Project
Function
Sample CLI
gway sql open-connection
References
['debug', 'info', 'resource', 'warning']
Full Code
def open_connection(datafile=None, *,
                    sql_engine="sqlite", autoload=False, force=False,
                    row_factory=False, **dbopts):
    """
    Initialize or reuse a database connection.

    Caches connections by sql_engine, file path, and thread ID (if required).
    On a cache hit the cached connection is returned as-is: a requested
    ``row_factory`` is NOT applied (a warning is logged instead) and
    ``autoload`` is not re-run.

    Args:
        datafile: Resource path of the database file. Defaults to
            "work/data.sqlite" for sqlite and "work/data.duckdb" for duckdb.
            Not used to connect to postgres, but still part of the cache key.
        sql_engine: One of "sqlite", "duckdb" or "postgres".
        autoload: If true and the engine is sqlite, call ``load_csv`` on the
            newly created connection.
        force: Forwarded to ``load_csv`` when ``autoload`` is used.
        row_factory: sqlite only. ``True`` selects ``sqlite3.Row``; a callable
            is used directly; a string is looked up through ``gw[...]``.
            Any other truthy value is ignored with a warning.
        **dbopts: Passed to ``psycopg2.connect`` for the postgres engine.

    Returns:
        The new or cached connection, wrapped in ``WrappedConnection``.

    Raises:
        ValueError: If ``sql_engine`` is not a supported engine.
    """
    # Determine base cache key
    # NOTE(review): dbopts are not part of the key, so a second postgres call
    # with different options gets the first cached connection - confirm that
    # this is intended before relying on multiple postgres targets.
    base_key = (sql_engine, datafile or "default")

    # sqlite3 connections are, by default, only usable from the thread that
    # created them, so each thread gets its own cached connection.
    if sql_engine in {"sqlite"}:
        thread_key = threading.get_ident()
    else:
        thread_key = "*"
    key = (base_key, thread_key)

    if key in _connection_cache:
        conn = _connection_cache[key]
        if row_factory:
            gw.warning("Row factory change requires close_connection(). Reconnect manually.")
        gw.debug(f"Reusing connection: {key}")
        return conn

    # Create connection
    if sql_engine == "sqlite":
        path = gw.resource(datafile or "work/data.sqlite")
        conn = sqlite3.connect(path)
        if row_factory:
            try:
                if row_factory is True:
                    conn.row_factory = sqlite3.Row
                elif callable(row_factory):
                    conn.row_factory = row_factory
                elif isinstance(row_factory, str):
                    conn.row_factory = gw[row_factory]
                else:
                    # Previously dropped silently; make the misuse visible.
                    gw.warning(f"Ignoring unsupported row_factory: {row_factory!r}")
            except Exception:
                # The connection is neither cached nor returned yet, so it
                # must be closed here or it would leak.
                conn.close()
                raise
            gw.debug(f"Configured row_factory: {conn.row_factory}")
        gw.info(f"Opened SQLite connection at {path}")
    elif sql_engine == "duckdb":
        import duckdb  # optional dependency, imported only when requested
        path = gw.resource(datafile or "work/data.duckdb")
        conn = duckdb.connect(path)
        gw.info(f"Opened DuckDB connection at {path}")
    elif sql_engine == "postgres":
        import psycopg2  # optional dependency, imported only when requested
        conn = psycopg2.connect(**dbopts)
        gw.info(f"Connected to Postgres at {dbopts.get('host', 'localhost')}")
    else:
        raise ValueError(f"Unsupported sql_engine: {sql_engine}")

    # Wrap and cache connection
    conn = WrappedConnection(conn)
    _connection_cache[key] = conn

    if autoload and sql_engine == "sqlite":
        load_csv(connection=conn, force=force)
    return conn