Skip to content

DuckDB

Installation

You need to install the duckdb library to use this in blendsql.

Bases: Database

An in-memory DuckDB database connection. Can be initialized via any of the available class methods.

Examples:

import pandas as pd

from blendsql.db import DuckDB

# Load a single dataframe (stored under the default table name "w")
db = DuckDB.from_pandas(
    pd.DataFrame(
        {
            "name": ["John", "Parker"],
            "age": [12, 26]
        },
    )
)
# Or, load multiple dataframes
db = DuckDB.from_pandas(
    {
        "students": pd.DataFrame(
            {
                "name": ["John", "Parker"],
                "age": [12, 26]
            },
        ),
        "classes": pd.DataFrame(
            {
                "class": ["Physics 101", "Chemistry"],
                "size": [50, 32]
            },
        ),
    }
)
Source code in blendsql/db/_duckdb.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
@attrs
class DuckDB(Database):
    """An in-memory DuckDB database connection.
    Can be initialized via any of the available class methods.

    Examples:
        ```python
        import pandas as pd

        from blendsql.db import DuckDB
        db = DuckDB.from_pandas(
            pd.DataFrame(
                {
                    "name": ["John", "Parker"],
                    "age": [12, 26]
                },
            )
        )
        # Or, load multiple dataframes
        db = DuckDB.from_pandas(
            {
                "students": pd.DataFrame(
                    {
                        "name": ["John", "Parker"],
                        "age": [12, 26]
                    },
                ),
                "classes": pd.DataFrame(
                    {
                        "class": ["Physics 101", "Chemistry"],
                        "size": [50, 32]
                    },
                ),
            }
        )
        ```
    """

    # The duckdb connection that every query issued by this class runs against.
    con: "DuckDBPyConnection" = attrib()
    # Description of the data source: a label for pandas inputs, or the
    # resolved file path for an attached SQLite database.
    db_url: str = attrib()

    # We use below to track which tables we should drop on '_reset_connection'.
    # Declared as a per-instance attrs field (kept out of `__init__`): a bare
    # class-level `set()` would be shared, and mutated, by every instance.
    temp_tables: Set[str] = attrib(init=False, factory=set)

    @classmethod
    def from_pandas(
        cls, data: Union[Dict[str, pd.DataFrame], pd.DataFrame], tablename: str = "w"
    ):
        """Create an in-memory database from one or many pandas dataframes.

        Args:
            data: Either a single dataframe, or a dict mapping table names
                to dataframes.
            tablename: Name of the table to create when `data` is a single
                dataframe. Ignored when `data` is a dict.

        Returns:
            A new instance wrapping a fresh `:memory:` duckdb connection.

        Raises:
            ImportError: If duckdb is not installed.
            ValueError: If `data` is neither a dataframe nor a dict.
        """
        if not _has_duckdb:
            raise ImportError(
                "Please install duckdb with `pip install duckdb`!"
            ) from None
        import duckdb

        # Validate (and normalize to a name -> dataframe mapping) *before*
        # opening a connection, so bad input doesn't leave a connection behind.
        if isinstance(data, pd.DataFrame):
            db_url = "Local pandas table"
            frames = {tablename: data}
        elif isinstance(data, dict):
            db_url = f"Local pandas tables {', '.join(data.keys())}"
            frames = data
        else:
            raise ValueError(
                "Unknown datatype passed to `Pandas`!\nWe expect either a single dataframe, or a dictionary mapping many tables from {tablename: df}"
            )

        con = duckdb.connect(database=":memory:")
        for name, _df in frames.items():
            # The SQL text refers to the local variable `_df` by name (duckdb's
            # replacement scan picks it up), so the dataframe must be bound to
            # exactly that name in this frame.
            con.sql(f"CREATE TABLE {name} AS SELECT * FROM _df")
        return cls(con=con, db_url=db_url)

    @classmethod
    def from_sqlite(cls, db_url: str):
        """Attach an existing SQLite database file through duckdb's sqlite extension.

        TODO: any point in this if we already have dedicated SQLite database class
        and it's faster?

        Args:
            db_url: Path to the SQLite file; it is resolved to an absolute path.

        Returns:
            A new instance whose connection has the SQLite file attached as
            `sqlite_db` and selected via `USE`.

        Raises:
            ImportError: If duckdb is not installed.
        """
        if not _has_duckdb:
            # The requirement is quoted so the hint can be pasted into a shell
            # without `<` being treated as a redirect.
            raise ImportError(
                'Please install duckdb with `pip install "duckdb<1"`!'
            ) from None
        import duckdb

        con = duckdb.connect(database=":memory:")
        db_url = str(Path(db_url).resolve())
        # Double any single quotes so the path is a valid SQL string literal.
        path_literal = db_url.replace("'", "''")
        con.sql("INSTALL sqlite;")
        con.sql("LOAD sqlite;")
        con.sql(f"ATTACH '{path_literal}' AS sqlite_db (TYPE sqlite);")
        con.sql("USE sqlite_db")
        return cls(con=con, db_url=db_url)

    def _reset_connection(self):
        """Reset connection, so that temp tables are cleared."""
        for tablename in self.temp_tables:
            self.con.execute(
                f'DROP TABLE IF EXISTS "{double_quote_escape(tablename)}"'
            )
        self.temp_tables = set()

    def has_temp_table(self, tablename: str) -> bool:
        """Return True if `tablename` is listed by `SHOW TABLES`."""
        return tablename in self.tables()

    @cached_property
    def sqlglot_schema(self) -> dict:
        """Returns database schema as a dictionary, in the format that
        sqlglot.optimizer expects.

        The result is computed once per instance and then cached.

        Examples:
            ```python
            db.sqlglot_schema
            > {"x": {"A": "INT", "B": "INT", "C": "INT", "D": "INT", "Z": "STRING"}}
            ```
        """
        schema: Dict[str, dict] = {}
        for tablename in self.tables():
            # The same quoted identifier serves as the schema key and as the
            # DESCRIBE target, so names with spaces or keywords still resolve.
            quoted_table = f'"{double_quote_escape(tablename)}"'
            described = self.con.execute(
                f"SELECT column_name, column_type FROM (DESCRIBE {quoted_table})"
            ).fetchall()
            schema[quoted_table] = {
                '"' + column_name + '"': column_type
                for column_name, column_type in described
            }
        return schema

    def tables(self) -> List[str]:
        """Return the table names listed by `SHOW TABLES`."""
        return self.execute_to_list("SHOW TABLES;")

    def iter_columns(self, tablename: str) -> Generator[str, None, None]:
        """Yield the column names of `tablename`, as reported by `DESCRIBE`.

        `tablename` is interpolated into the query as-is (not quoted here).
        """
        for row in self.con.execute(
            f"SELECT column_name FROM (DESCRIBE {tablename})"
        ).fetchall():
            yield row[0]

    def schema_string(self, use_tables: Optional[Collection[str]] = None) -> str:
        """Converts the database to a series of 'CREATE TABLE' statements.

        Not implemented yet: `use_tables` is ignored and None is returned.
        """
        # TODO
        return None

    def to_temp_table(self, df: pd.DataFrame, tablename: str):
        """Technically, when duckdb is run in-memory (as is the default),
        all created tables are temporary tables (since they expire at the
        end of the session). So, we don't really need to insert 'TEMP' keyword here?

        Args:
            df: Dataframe to materialize. The SQL text refers to it by the
                name `df`, so the parameter must keep that name.
            tablename: Name of the temp table to create or replace. It is
                recorded in `temp_tables` so `_reset_connection` can drop it.
        """
        # DuckDB has this cool 'CREATE OR REPLACE' syntax
        # https://duckdb.org/docs/sql/statements/create_table.html#create-or-replace
        self.con.sql(
            f'CREATE OR REPLACE TEMP TABLE "{double_quote_escape(tablename)}" AS SELECT * FROM df'
        )
        self.temp_tables.add(tablename)
        logger.debug(Fore.CYAN + f"Created temp table {tablename}" + Fore.RESET)

    def execute_to_df(self, query: str, params: Optional[dict] = None) -> pd.DataFrame:
        """Run `query` and return the result as a dataframe.

        `params` is accepted but not forwarded to duckdb.
        On params with duckdb: https://github.com/duckdb/duckdb/issues/9853#issuecomment-1832732933
        """
        return self.con.sql(query).df()

    def execute_to_list(
        self, query: str, to_type: Optional[Callable] = lambda x: x
    ) -> list:
        """Run `query` and return the first column of every row as a list.

        Args:
            query: SQL to execute.
            to_type: Callable applied to each value. Passing None leaves the
                values unchanged.
        """
        convert = to_type if to_type is not None else (lambda value: value)
        return [convert(row[0]) for row in self.con.execute(query).fetchall()]

from_pandas(data, tablename='w') classmethod

Source code in blendsql/db/_duckdb.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@classmethod
def from_pandas(
    cls, data: Union[Dict[str, pd.DataFrame], pd.DataFrame], tablename: str = "w"
):
    """Create an in-memory database from one or many pandas dataframes.

    Args:
        data: Either a single dataframe, or a dict mapping table names
            to dataframes.
        tablename: Name of the table to create when `data` is a single
            dataframe. Ignored when `data` is a dict.

    Returns:
        A new instance wrapping a fresh `:memory:` duckdb connection.

    Raises:
        ImportError: If duckdb is not installed.
        ValueError: If `data` is neither a dataframe nor a dict.
    """
    if not _has_duckdb:
        raise ImportError(
            "Please install duckdb with `pip install duckdb`!"
        ) from None
    import duckdb

    # Validate (and normalize to a name -> dataframe mapping) *before*
    # opening a connection, so bad input doesn't leave a connection behind.
    if isinstance(data, pd.DataFrame):
        db_url = "Local pandas table"
        frames = {tablename: data}
    elif isinstance(data, dict):
        db_url = f"Local pandas tables {', '.join(data.keys())}"
        frames = data
    else:
        raise ValueError(
            "Unknown datatype passed to `Pandas`!\nWe expect either a single dataframe, or a dictionary mapping many tables from {tablename: df}"
        )

    con = duckdb.connect(database=":memory:")
    for name, _df in frames.items():
        # The SQL text refers to the local variable `_df` by name (duckdb's
        # replacement scan picks it up), so the dataframe must be bound to
        # exactly that name in this frame.
        con.sql(f"CREATE TABLE {name} AS SELECT * FROM _df")
    return cls(con=con, db_url=db_url)

from_sqlite(db_url) classmethod

TODO: is there any point in this, given that we already have a dedicated SQLite database class and it's faster?

Source code in blendsql/db/_duckdb.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
@classmethod
def from_sqlite(cls, db_url: str):
    """Attach an existing SQLite database file through duckdb's sqlite extension.

    TODO: any point in this if we already have dedicated SQLite database class
    and it's faster?

    Args:
        db_url: Path to the SQLite file; it is resolved to an absolute path.

    Returns:
        A new instance whose connection has the SQLite file attached as
        `sqlite_db` and selected via `USE`.

    Raises:
        ImportError: If duckdb is not installed.
    """
    if not _has_duckdb:
        # The requirement is quoted so the hint can be pasted into a shell
        # without `<` being treated as a redirect.
        raise ImportError(
            'Please install duckdb with `pip install "duckdb<1"`!'
        ) from None
    import duckdb

    con = duckdb.connect(database=":memory:")
    db_url = str(Path(db_url).resolve())
    # Double any single quotes so the path is a valid SQL string literal.
    path_literal = db_url.replace("'", "''")
    con.sql("INSTALL sqlite;")
    con.sql("LOAD sqlite;")
    con.sql(f"ATTACH '{path_literal}' AS sqlite_db (TYPE sqlite);")
    con.sql("USE sqlite_db")
    return cls(con=con, db_url=db_url)