Help for sql.load_csv

Project

sql

Function

load_csv

Sample CLI

gway sql load-csv

References

['debug', 'info', 'resource', 'warning']

Full Code

def _quote_identifier(name):
    """Return *name* quoted as an SQLite identifier.

    Double-quote quoting is used because an embedded quote can be escaped by
    doubling it, whereas bracket quoting has no way to escape a "]" (which
    appears in real-world headers such as "Weight [kg]").
    """
    return '"' + name.replace('"', '""') + '"'


def _dedupe_headers(headers):
    """Strip header names and make them unique, ignoring case.

    A repeated name gets a numeric suffix (``name_1``, ``name_2``, ...)
    because SQLite column names are case-insensitive and must be unique.
    """
    seen = set()
    unique = []
    for header in headers:
        base = header.strip()
        candidate = base
        suffix = 1
        while candidate.lower() in seen:
            candidate = f"{base}_{suffix}"
            suffix += 1
        unique.append(candidate)
        seen.add(candidate.lower())
    return unique


def _pad_rows(rows, width):
    """Yield data rows, skipping blank lines and padding short rows with None.

    csv.reader yields ``[]`` for a blank line; such rows carry no data.
    Rows longer than *width* are passed through untouched so that the
    database driver still rejects them instead of silently losing values.
    """
    for row in rows:
        if not row:
            continue
        if len(row) < width:
            row = row + [None] * (width - len(row))
        yield row


def load_csv(*, connection=None, folder="data", force=False):
    """
    Recursively loads CSVs from a folder into SQLite tables.
    Table names are derived from folder/file paths.

    Each ``*.csv`` file becomes one table. The table name is the file name
    without extension, prefixed by the names of its parent folders (relative
    to ``folder``) joined with "_"; any "-" is replaced by "_". Column names
    come from the first row of the file and column types are inferred from
    the first data row via ``infer_type``.

    Args:
        connection: Open DB-API connection to an SQLite database. Required.
        folder: Resource folder to scan; resolved through ``gw.resource``.
        force: When true, tables that already exist are dropped and
            reloaded. Otherwise existing tables are left untouched.

    Raises:
        AssertionError: If no connection is supplied.

    Errors raised while reading a file or executing SQL propagate to the
    caller after the pending transaction has been rolled back.
    """
    # Raised explicitly (rather than via ``assert``) so the check survives
    # ``python -O``; the exception type is unchanged for existing callers.
    if not connection:
        raise AssertionError("load_csv requires a database connection")
    base_path = gw.resource(folder)

    def load_file(cursor, csv_path, table_name):
        """Load a single CSV file into ``table_name``."""
        # newline="" is what the csv module requires to parse quoted fields
        # containing line breaks correctly; utf-8-sig drops a leading BOM so
        # it does not end up inside the first column name.
        with open(csv_path, "r", encoding="utf-8-sig", newline="") as f:
            reader = csv.reader(f)
            columns = _dedupe_headers(next(reader, None) or [])
            rows = _pad_rows(reader, len(columns))
            sample_row = next(rows, None) if columns else None
            if sample_row is None:
                gw.warning(f"Skipping empty CSV: {csv_path}")
                return

            # Padded cells are None (csv itself only yields strings); they
            # carry no type information, so those columns default to TEXT.
            types = [
                "TEXT" if value is None else infer_type(value)
                for value in sample_row[:len(columns)]
            ]

            cursor.execute(
                "SELECT name FROM sqlite_master "
                "WHERE type='table' AND name=?", (table_name,)
            )
            exists = cursor.fetchone()

            if exists and not force:
                gw.debug(f"Skipped existing table: {table_name}")
                return

            quoted_table = _quote_identifier(table_name)
            quoted_columns = [_quote_identifier(name) for name in columns]

            if exists:
                cursor.execute(f"DROP TABLE IF EXISTS {quoted_table}")
                gw.info(f"Dropped existing table: {table_name}")

            colspec = ", ".join(
                f"{name} {sql_type}"
                for name, sql_type in zip(quoted_columns, types)
            )
            create = f"CREATE TABLE {quoted_table} ({colspec})"
            insert = (
                f"INSERT INTO {quoted_table} "
                f"({', '.join(quoted_columns)}) "
                f"VALUES ({', '.join('?' for _ in columns)})"
            )

            try:
                cursor.execute(create)
                cursor.execute(insert, sample_row)
                cursor.executemany(insert, rows)
            except Exception:
                # Do not leave a half-finished transaction open on the
                # caller's connection; the original error is re-raised.
                connection.rollback()
                raise
            connection.commit()

            gw.info(
                f"Loaded table '{table_name}' with "
                f"{len(columns)} columns"
            )

    def load_folder(path, prefix=""):
        """Load every CSV in ``path`` and recurse into its sub-folders."""
        cursor = connection.cursor()
        try:
            # Sorted so that load order (and therefore the outcome of any
            # table-name collision) does not depend on the file system.
            for item in sorted(os.listdir(path)):
                full_path = os.path.join(path, item)
                if os.path.isdir(full_path):
                    sub_prefix = f"{prefix}_{item}" if prefix else item
                    load_folder(full_path, sub_prefix)
                elif item.endswith(".csv"):
                    base_name = os.path.splitext(item)[0]
                    table_name = (
                        f"{prefix}_{base_name}" if prefix else base_name
                    )
                    load_file(
                        cursor, full_path, table_name.replace("-", "_")
                    )
        finally:
            cursor.close()

    load_folder(base_path)