SnowflakeTable is a sink that can be used to write data to a Snowflake table. To create a SnowflakeTable simply provide:

| Arg | Description |
| --- | --- |
| table | The name of the Snowflake table |
| database | The name of the database the table exists in |
| schema | The name of the schema the table exists in |
| bucket | An S3Bucket that the data will be staged in before it is written to Snowflake |
| snow_pipe | The name of the Snowflake pipe to use to load the data. If you are using BuildFlow to manage your resources, you can leave this out and one will be created for you when you run buildflow apply |
| snowflake_stage | The name of the Snowflake stage to use to load the data. If you are using BuildFlow to manage your resources, you can leave this out and one will be created for you when you run buildflow apply |
| account | The name of the Snowflake account to use |
| user | The name of the Snowflake user to authenticate as |
| private_key | The private key to use for authentication |

There are a lot of components involved in writing to Snowflake, so we recommend that you let BuildFlow manage all of them.

Utilities exist for reading in a private key file here.
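
The utility itself is not shown on this page. As a stand-in, here is a minimal standard-library sketch. It assumes the key is stored as a PEM text file and that private_key accepts the file contents as a string; both are assumptions, and load_private_key_text is a hypothetical helper, not a BuildFlow function.

```python
from pathlib import Path


def load_private_key_text(path: str) -> str:
    """Hypothetical helper: return the contents of a PEM key file as a string."""
    text = Path(path).expanduser().read_text(encoding="utf-8").strip()
    # Sanity check: PEM-encoded keys start with a "-----BEGIN ... KEY-----" header.
    if not text.startswith("-----BEGIN"):
        raise ValueError(f"{path} does not look like a PEM-encoded key")
    return text
```

Reading the key from a file at startup keeps the key material out of source control, which is usually preferable to pasting it into the SnowflakeTable call.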

from buildflow.io.snowflake import SnowflakeTable
from buildflow.io.aws import S3Bucket

bucket = S3Bucket(bucket_name="bucket")
snowflake_table = SnowflakeTable(
        table="table",
        database="database",
        schema="schema",
        bucket=bucket,
        user="...",
        private_key="...")

# `app` is assumed to be your existing BuildFlow application object.
app.manage(snowflake_table, bucket)

@app.consumer(source=..., sink=snowflake_table)
async def my_processor(elem):
    ...

Support for GCS buckets is coming soon.

Types

The SnowflakeTable sink expects an object that can be serialized into a JSON object. You can return a dataclass and we will automatically serialize it for you, or you can return a dictionary containing JSON-serializable objects.

If you would like to return a custom type that is not JSON-serializable, you can implement the to_json method on your class and we will use that to serialize your object.

class CustomType:
    def __init__(self, b: int):
        self.b = b

    def to_json(self):
        return {"b": self.b}

bucket = S3Bucket(bucket_name="bucket")
snowflake_table = SnowflakeTable(
        table="table",
        database="database",
        schema="schema",
        bucket=bucket,
        user="...",
        private_key="...")

app.manage(snowflake_table, bucket)

@app.consumer(source=..., sink=snowflake_table)
async def my_processor(elem: int) -> CustomType:
    return CustomType(b=elem + 1)
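
To make the three accepted return types concrete, the sketch below mimics the rules described in this section using only the standard library. It is an illustration, not BuildFlow's actual serialization code: to_json_record and Row are hypothetical names, and checking to_json before the dataclass case is an assumption.

```python
import dataclasses
import json
from typing import Any


def to_json_record(value: Any) -> str:
    """Hypothetical helper mirroring the three cases described above."""
    if hasattr(value, "to_json"):
        # Custom types supply their own JSON-serializable representation.
        payload = value.to_json()
    elif dataclasses.is_dataclass(value) and not isinstance(value, type):
        # Dataclass instances are converted field by field.
        payload = dataclasses.asdict(value)
    else:
        # Dictionaries (and other JSON-native values) are passed through as-is.
        payload = value
    # json.dumps raises TypeError if anything left over is not serializable.
    return json.dumps(payload)


@dataclasses.dataclass
class Row:
    a: int


class CustomType:
    def __init__(self, b: int):
        self.b = b

    def to_json(self):
        return {"b": self.b}


print(to_json_record(Row(a=1)))          # {"a": 1}
print(to_json_record({"a": 1}))          # {"a": 1}
print(to_json_record(CustomType(b=2)))   # {"b": 2}
```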

Resource Creation

If you are using BuildFlow’s built-in resource creation/management, you can use the SnowflakeTable primitive to create all required resources for you. The following resources will be created:

| Resource | Description |
| --- | --- |
| Snowflake Table | A Snowflake table with a table schema matching your output type |
| Snowflake Database | A Snowflake database to hold the table (only if the database_managed option is True) |
| Snowflake Schema | A Snowflake schema to hold the table (only if the schema_managed option is True) |
| Snowflake Stage | A Snowflake stage to load the data from the bucket, if the snowflake_stage option is not provided |
| Snowflake Pipe | A Snowflake pipe to load the data from the stage, if the snowflake_pipe option is not provided |
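
The conditions in the table can be summarized as a small decision function. This is only a sketch of the rules as stated on this page, not BuildFlow code: planned_resources is a hypothetical name, the pipe parameter uses the snow_pipe spelling from the argument list, and the managed flags are assumed to default to True.

```python
from typing import List, Optional


def planned_resources(
    database_managed: bool = True,
    schema_managed: bool = True,
    snowflake_stage: Optional[str] = None,
    snow_pipe: Optional[str] = None,
) -> List[str]:
    """Hypothetical helper: list the resources that would be created."""
    # The table itself is always created.
    resources = ["Snowflake Table"]
    if database_managed:
        resources.append("Snowflake Database")
    if schema_managed:
        resources.append("Snowflake Schema")
    # A stage and a pipe are only created when no existing one is named.
    if snowflake_stage is None:
        resources.append("Snowflake Stage")
    if snow_pipe is None:
        resources.append("Snowflake Pipe")
    return resources


print(planned_resources(database_managed=False, snowflake_stage="my_stage"))
# ['Snowflake Table', 'Snowflake Schema', 'Snowflake Pipe']
```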

Configuration Options

You can provide the following options to control resource management of the SnowflakeTable table:

| Option | Description |
| --- | --- |
| database_managed | Whether or not the database should be included in resource management / creation. Defaults to True. |
| schema_managed | Whether or not the schema should be included in resource management / creation. Defaults to True. |
| table_schema | The schema associated with the table. This can be the dataclass type that you are outputting. This is required if you are having BuildFlow manage your SnowflakeTable table. |

SnowflakeTable(...).options(database_managed=True, schema_managed=True)
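
Because table_schema can be the dataclass you output, it can help to think of the dataclass fields as the table's columns. The sketch below lists those fields using only the standard library. Reading and describe_fields are hypothetical names, and how BuildFlow maps Python types to Snowflake column types is not covered here.

```python
import dataclasses
from typing import List, Tuple


@dataclasses.dataclass
class Reading:
    sensor_id: str
    value: float


def describe_fields(schema_type: type) -> List[Tuple[str, str]]:
    """Hypothetical helper: return (field name, Python type name) pairs."""
    return [
        # Fall back to str() for annotations without a __name__ (e.g. some typing generics).
        (f.name, getattr(f.type, "__name__", str(f.type)))
        for f in dataclasses.fields(schema_type)
    ]


print(describe_fields(Reading))
# [('sensor_id', 'str'), ('value', 'float')]
```

The dataclass would then be passed as SnowflakeTable(...).options(table_schema=Reading), with a processor that returns Reading instances.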