BigQueryDataset

BigQueryDataset is a primitive type that can only be managed. In general, it is used as a container for your BigQueryTables. To create a BigQueryDataset, simply provide:

Arg             Description
project_id      The name of the GCP project the dataset exists in
dataset_name    The name of the dataset

Example usage:

from buildflow.io.gcp import BigQueryDataset

dataset = BigQueryDataset(project_id="project", dataset_name="dataset")

app.manage(dataset)

Resource Creation

If you are using BuildFlow’s built-in resource creation/management, you can use the BigQueryDataset primitive to create a BigQuery dataset in your provided project.
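
For example, a minimal sketch of the management flow. This assumes your application object is a buildflow Flow instance and that resources are created and destroyed with the buildflow apply and buildflow destroy CLI commands:

from buildflow import Flow
from buildflow.io.gcp import BigQueryDataset

app = Flow()

# Attaching the dataset lets BuildFlow create it when `buildflow apply` is run
# and remove it when `buildflow destroy` is run.
dataset = BigQueryDataset(project_id="project", dataset_name="dataset")
app.manage(dataset)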

BigQueryTable

BigQueryTable is a sink that can be used to write data to a BigQuery table. To create a BigQueryTable, simply provide:

Arg           Description
dataset       The BigQueryDataset object that contains the table
table_name    The name of the table

from buildflow.io.gcp import BigQueryDataset, BigQueryTable

dataset = BigQueryDataset(project_id="project", dataset_name="dataset")
table = BigQueryTable(dataset=dataset, table_name="table")

app.manage(dataset, table)

@app.consumer(source=..., sink=table)

Types

The BigQueryTable sink expects an object that can be serialized into a JSON object. You can return a dataclass and we will automatically serialize it for you, or you can return a dictionary containing JSON-serializable objects.
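
For example, a minimal sketch of returning a dataclass from a consumer. This reuses the app and table objects from the example above, and OutputRow is an illustrative name:

import dataclasses


@dataclasses.dataclass
class OutputRow:
    b: int


@app.consumer(source=..., sink=table)
async def my_processor(elem: int) -> OutputRow:
    # The dataclass is serialized to a JSON row before being written to BigQuery.
    return OutputRow(b=elem + 1)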

If you would like to return a custom type that is not JSON serializable, you can implement the to_json method on your class and we will use that to serialize your object.

class CustomType:
    def __init__(self, b: int):
        self.b = b

    def to_json(self):
        return {"b": self.b}


dataset = BigQueryDataset(project_id="project", dataset_name="dataset")
table = BigQueryTable(dataset=dataset, table_name="table")

app.manage(dataset, table)

@app.consumer(source=..., sink=table)
async def my_processor(elem: int) -> CustomType:
    return CustomType(b=elem + 1)

Resource Creation

If you are using BuildFlow’s built-in resource creation/management, you can use the BigQueryTable primitive to create a BigQuery table in your provided project.

Configuration Options

You can provide the following options to control resource management of the BigQuery table:

Option               Description
destroy_projection   Whether or not the table should be destroyed when buildflow destroy is run. Defaults to False.
batch_size           The number of rows to batch together in one write request to BigQuery. Defaults to 10,000. If you produce more rows than batch_size, the rows will be split into multiple requests, where the number of requests equals len(rows) / batch_size (rounded up).
schema               The schema associated with the table. This can be the dataclass type that you are outputting. This is required if you are having BuildFlow manage your BigQuery table.

import dataclasses


@dataclasses.dataclass
class MySchema:
    field1: int
    field2: str

BigQueryTable(...).options(destroy_projection=True, batch_size=100_000, schema=MySchema)

Defining a Schema

The schema you pass to options should be a dataclass that represents the schema of your table. Each field will correspond to one column of your table. The type of the field will be used to determine the type of the column. The following types are supported:

Python Type          BigQuery Field Type
int                  INTEGER
str                  STRING
float                FLOAT
bytes                BYTES
bool                 BOOL
datetime.datetime    TIMESTAMP
datetime.date        DATE
datetime.time        TIME
Optional[…]          NULLABLE[…]
List[…]              REPEATED[…]
dataclass            RECORD[…]

Using a nested dataclass will result in a nested RECORD column in your table.
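
For example, a sketch of a schema with nested and repeated fields. The field names are illustrative:

import dataclasses
import datetime
from typing import List, Optional


@dataclasses.dataclass
class Address:
    street: str
    zip_code: Optional[str]  # NULLABLE STRING column


@dataclasses.dataclass
class UserRow:
    user_id: int                   # INTEGER column
    signed_up: datetime.datetime   # TIMESTAMP column
    addresses: List[Address]       # REPEATED RECORD column


BigQueryTable(...).options(schema=UserRow)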