Help for sql.open_connection

Project

sql

Function

open_connection

Sample CLI

gway sql open-connection

References

['debug', 'info', 'resource', 'warning']

Full Code

def open_connection(datafile=None, *, 
            sql_engine="sqlite", autoload=False, force=False,
            row_factory=False, **dbopts):
    """
    Initialize or reuse a database connection.
    Caches connections by sql_engine, file path, and thread ID (if required).
    """

    # Determine base cache key
    base_key = (sql_engine, datafile or "default")

    # Determine if thread ID should be included in key
    if sql_engine in {"sqlite"}:
        thread_key = threading.get_ident()
    else:
        thread_key = "*"

    key = (base_key, thread_key)

    if key in _connection_cache:
        conn = _connection_cache[key]
        if row_factory:
            gw.warning("Row factory change requires close_connection(). Reconnect manually.")
        gw.debug(f"Reusing connection: {key}")
        return conn

    # Create connection
    if sql_engine == "sqlite":
        path = gw.resource(datafile or "work/data.sqlite")
        conn = sqlite3.connect(path)

        if row_factory:
            if row_factory is True:
                conn.row_factory = sqlite3.Row
            elif callable(row_factory):
                conn.row_factory = row_factory
            elif isinstance(row_factory, str):
                conn.row_factory = gw[row_factory]
            gw.debug(f"Configured row_factory: {conn.row_factory}")

        gw.info(f"Opened SQLite connection at {path}")

    elif sql_engine == "duckdb":
        import duckdb
        path = gw.resource(datafile or "work/data.duckdb")
        conn = duckdb.connect(path)
        gw.info(f"Opened DuckDB connection at {path}")

    elif sql_engine == "postgres":
        import psycopg2
        conn = psycopg2.connect(**dbopts)
        gw.info(f"Connected to Postgres at {dbopts.get('host', 'localhost')}")

    else:
        raise ValueError(f"Unsupported sql_engine: {sql_engine}")

    # Wrap and cache connection
    conn = WrappedConnection(conn)
    _connection_cache[key] = conn

    if autoload and sql_engine == "sqlite":
        load_csv(connection=conn, force=force)

    return conn