DuckDB is a sink that can be used to write data to a DuckDB table. To create a DuckDB simply provide:

ArgDescription
databaseThe name of the DuckDB database the table exists in
tableThe name of the table
motherduck_tokenThe token to use for authenticating to MotherDuck
from buildflow.io.duckdb import DuckDB

duckdb_table = DuckDB(database="database", table="table")

@app.consumer(source=..., sink=duckdb_table)

Connecting to Motherduck

from buildflow.io.duckdb import DuckDB

duckdb_table = DuckDB(database="md:database", table="table", motherduck_token="XXXXX")

@app.consumer(source=..., sink=duckdb_table)

Types

The DuckDB sink expects an object that can be serialized in to a JSON object. You can return a dataclass and we will automatically serialize it for you, or you can return a dictionary containing JSON serializable objects.

If you would like to return a custom type that is not JSON serializable you can implement the to_json method on your class and we will use that to serialize your object.

class CustomType:
    def __init__(self, b: int):
        self.b = str

    def to_json(self):
        return {"b": self.b}

duckdb_table = DuckDB(database="database", table="table")

@app.consumer(source=..., sink=duckdb_table)
async def my_processor(elem: int) -> CustomType:
    return CustomType(b=elem + 1)