DuckDB Table
DuckDB is a sink that writes data to a DuckDB table. To create a DuckDB sink, provide the following arguments:
Arg | Description |
---|---|
database | The name of the DuckDB database the table exists in |
table | The name of the table |
motherduck_token | The token to use for authenticating to MotherDuck |
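from buildflow.io.duckdb import DuckDB

duckdb_table = DuckDB(database="database", table="table")

# `app` is your existing BuildFlow application object.
@app.consumer(source=..., sink=duckdb_table)
async def my_processor(elem):
    ...  # return a JSON-serializable value to write to the table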
Connecting to MotherDuck
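from buildflow.io.duckdb import DuckDB

# The "md:" prefix selects a MotherDuck database; replace "XXXXX" with your token.
duckdb_table = DuckDB(database="md:database", table="table", motherduck_token="XXXXX")

@app.consumer(source=..., sink=duckdb_table)
async def my_processor(elem):
    ...  # return a JSON-serializable value to write to the table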
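Hard-coding the token, as in the MotherDuck snippet above, is convenient for a demo but easy to leak into version control. One option is to read it from the environment instead. The sketch below only builds the keyword arguments for the sink and does not import BuildFlow; the helper name `duckdb_sink_kwargs` and the `MOTHERDUCK_TOKEN` variable name are assumptions made for illustration and are not part of the BuildFlow API.

```python
import os
from typing import Dict, Mapping, Optional


def duckdb_sink_kwargs(
    database: str,
    table: str,
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Build keyword arguments for DuckDB(...), adding a token for MotherDuck databases."""
    env = os.environ if env is None else env
    kwargs = {"database": database, "table": table}
    # Databases named with the "md:" prefix are treated as MotherDuck databases.
    if database.startswith("md:"):
        token = env.get("MOTHERDUCK_TOKEN")  # hypothetical variable name
        if not token:
            raise RuntimeError("MOTHERDUCK_TOKEN is not set")
        kwargs["motherduck_token"] = token
    return kwargs


# A local database needs no token; a MotherDuck database picks it up from env.
local_kwargs = duckdb_sink_kwargs("database", "table", env={})
cloud_kwargs = duckdb_sink_kwargs(
    "md:database", "table", env={"MOTHERDUCK_TOKEN": "XXXXX"}
)
```

With BuildFlow installed, the result could then be passed along as `DuckDB(**cloud_kwargs)`.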
Types
The DuckDB sink expects an object that can be serialized into a JSON object. You can return a dataclass,
and we will automatically serialize it for you, or you can return a dictionary containing JSON-serializable objects.
If you would like to return a custom type that is not JSON serializable, you can implement a to_json
method on your class, and we will use that to serialize your object.
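from buildflow.io.duckdb import DuckDB


class CustomType:
    def __init__(self, b: int):
        # Store the value passed to the constructor.
        self.b = b

    def to_json(self):
        # Return a JSON-serializable dictionary for the sink to write.
        return {"b": self.b}


duckdb_table = DuckDB(database="database", table="table")


@app.consumer(source=..., sink=duckdb_table)
async def my_processor(elem: int) -> CustomType:
    return CustomType(b=elem + 1)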
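To make the three accepted return shapes concrete, here is a minimal, standard-library-only sketch of how each one could be reduced to a JSON-compatible dictionary. It is an illustration of the idea, not BuildFlow's actual serialization code: the `to_row` helper and the `Reading` dataclass are hypothetical names introduced for this example.

```python
import dataclasses
import json
from typing import Any, Dict


@dataclasses.dataclass
class Reading:
    # Hypothetical schema used only for this illustration.
    sensor: str
    value: float


class CustomType:
    def __init__(self, b: int):
        self.b = b

    def to_json(self) -> Dict[str, Any]:
        return {"b": self.b}


def to_row(obj: Any) -> Dict[str, Any]:
    """Illustrative stand-in for the conversion step; not BuildFlow's code."""
    if hasattr(obj, "to_json"):
        # Custom types supply their own conversion.
        return obj.to_json()
    if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
        # Dataclass instances are converted field by field.
        return dataclasses.asdict(obj)
    if isinstance(obj, dict):
        # Dictionaries are passed through unchanged.
        return obj
    raise TypeError(f"Cannot convert {type(obj).__name__} to a row")


rows = [to_row(Reading("a", 1.5)), to_row({"b": 2}), to_row(CustomType(b=3))]
# json.dumps raises TypeError if a row still holds a non-serializable value.
encoded = [json.dumps(row) for row in rows]
```

Running `json.dumps` on a sample return value like this is a quick way to check, before wiring up the sink, that a processor's output is JSON serializable.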