Snowflake Table
SnowflakeTable is a sink that can be used to write data to a Snowflake table. To create a SnowflakeTable simply provide:
Arg | Description |
---|---|
table | The name of the snowflake table |
database | The name of the database the table exists in |
schema | The name of the schema the table exists in |
bucket | A S3Bucket that the data will be staged in before writing to Snowflake. |
snow_pipe | The name of the Snowflake pipe to use to load the data. If you are using BuildFlow to managed your resources you can leave this out and one will be created for you when you run buildflow apply . |
snowflake_stage | The name of the Snowflake stage to use to load the data. If you are using BuildFlow to managed your resources you can leave this out and one will be created for you when you run buildflow apply . |
account | The name of the Snowflake account to use |
user | The name of the Snowflake user to use to authenticate as |
private_key | The private key to use for authentication |
There are a lot of components involved in writing to Snowflake so we recommend you allow buildflow to manage all of these.
Utilities exist for reading in a private key file here.
from buildflow.io.snowflake import SnowflakeTable
from buildflow.io.aws import S3Bucket
bucket = S3Bucket(bucket_name="bucket")
snowflake_table = SnowflakeTable(
table="table",
database="database",
schema="schema",
bucket=bucket,
user="...",
private_key="...")
app.manage(snowflake_table, bucket)
@app.consumer(source=..., sink=snowflake_table)
...
Support for GCS buckets is coming soon.
Types
The SnowflakeTable sink expects an object that can be serialized in to a JSON object. You can return a dataclass
and we will automatically serialize it for you, or you can return a dictionary containing JSON serializable objects.
If you would like to return a custom type that is not JSON serializable you can implement the to_json
method on your class and we will use that to serialize your object.
class CustomType:
def __init__(self, b: int):
self.b = str
def to_json(self):
return {"b": self.b}
bucket = S3Bucket(bucket_name="bucket")
snowflake_table = SnowflakeTable(
table="table",
database="database",
schema="schema",
bucket=bucket,
user="...",
private_key="...")
app.manage(snowflake_table, bucket)
@app.consumer(source=..., sink=snowflake_table)
async def my_processor(elem: int) -> CustomType:
return CustomType(b=elem + 1)
Resource Creation
If you are using BuildFlow’s built in resource creation/management you can use the SnowflakeTable primitive to create a all required resources for you. The following resources will be created:
Resource | Description |
---|---|
Snowflake Table | A snowflake table with a table schema matching your output type |
Snowflake Database | A snowflake database to hold the table (only if database_managed option is True) |
Snowflake Schema | A snowflake schema to hold the table (only if schema_managed option is True) |
Snowflake Stage | A snowflake stage to load the data from the bucket if the snowflake_stage option is not provided |
Snowflake Pipe | A snowflake pipe to load the data from the stage if the snowflake_pipe option is not provided |
Configuration Options
You can provide the following options to control resource management of the SnowflakeTable table:
Options | Description |
---|---|
database_managed | Whether or not the database should be included in resource management / creation. Defaults to True . |
schema_managed | Whether or not the database should be included in resource management / creation. Defaults to True . |
table_schema | The schema associated with the table. This can be your dataclass type that you are outputing. This is required if you are having buildflow manage your SnowflakeTable table. |
SnowflakeTable(...).options(database_managed=True, schema_managed=True)