BigQuery
BigQueryDataset
BigQueryDataset is a primitive type that can only be managed. In general, it is used as a container for your BigQueryTables. To create a BigQueryDataset, simply provide:
Arg | Description |
---|---|
project_id | The name of the GCP project the dataset exists in |
dataset_name | The name of the dataset |
Example usage:
from buildflow import Flow
from buildflow.io.gcp import BigQueryDataset

app = Flow()
dataset = BigQueryDataset(project_id="project", dataset_name="dataset")
app.manage(dataset)
Resource Creation
If you are using BuildFlow's built-in resource creation/management, you can use the BigQueryDataset primitive to create a BigQuery dataset in your provided project.
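Once the dataset is attached with app.manage, it will be created when you run buildflow apply and removed when you run buildflow destroy.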
BigQueryTable
BigQueryTable is a sink that can be used to write data to a BigQuery table. To create a BigQueryTable, simply provide:
Arg | Description |
---|---|
dataset | The BigQueryDataset object that contains the table |
table_name | The name of the table |
from buildflow import Flow
from buildflow.io.gcp import BigQueryDataset, BigQueryTable

app = Flow()
dataset = BigQueryDataset(project_id="project", dataset_name="dataset")
table = BigQueryTable(dataset=dataset, table_name="table")
app.manage(dataset, table)

@app.consumer(source=..., sink=table)
async def my_consumer(elem):
    ...
Types
The BigQueryTable sink expects an object that can be serialized into a JSON object. You can return a dataclass and we will automatically serialize it for you, or you can return a dictionary containing JSON-serializable objects.
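For example, a consumer that returns a dataclass might look like this (a minimal sketch reusing the app and table objects from the example above; the source and field names are placeholders):

import dataclasses

@dataclasses.dataclass
class Row:
    b: int

@app.consumer(source=..., sink=table)
async def my_dataclass_processor(elem: int) -> Row:
    # Dataclass return values are converted to JSON automatically.
    return Row(b=elem + 1)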
If you would like to return a custom type that is not JSON serializable, you can implement the to_json method on your class and we will use that to serialize your object.
from buildflow import Flow
from buildflow.io.gcp import BigQueryDataset, BigQueryTable

app = Flow()

class CustomType:
    def __init__(self, b: int):
        self.b = b

    def to_json(self):
        # Called by the sink to serialize this object before it is written.
        return {"b": self.b}

dataset = BigQueryDataset(project_id="project", dataset_name="dataset")
table = BigQueryTable(dataset=dataset, table_name="table")
app.manage(dataset, table)

@app.consumer(source=..., sink=table)
async def my_processor(elem: int) -> CustomType:
    return CustomType(b=elem + 1)
Resource Creation
If you are using BuildFlow's built-in resource creation/management, you can use the BigQueryTable primitive to create a BigQuery table in your provided project.
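Note that a schema is required when BuildFlow manages the table: running buildflow apply creates the table with columns derived from the dataclass you pass to the schema option described below.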
Configuration Options
You can provide the following options to control resource management of the BigQuery table:
Option | Description |
---|---|
destroy_projection | Whether or not the table should be destroyed when buildflow destroy is run. Defaults to False. |
batch_size | The number of rows to batch together in one write request to BigQuery. Defaults to 10,000. If you produce more rows than batch_size, the rows are split into multiple requests, where the number of requests equals len(rows) / batch_size (rounded up); for example, 25,000 rows with the default batch_size are written in 3 requests. |
schema | The schema associated with the table. This can be the dataclass type that you are outputting. This is required if you are having BuildFlow manage your BigQuery table. |
import dataclasses

@dataclasses.dataclass
class MySchema:
    field1: int
    field2: str

BigQueryTable(...).options(destroy_projection=True, batch_size=100_000, schema=MySchema)
Defining a Schema
should be a dataclass that represents the schema of your table. Each field corresponds to one column of your table, and the field's type determines the column's type. The following types are supported:
Python Type | BigQuery Field Type |
---|---|
int | INTEGER |
str | STRING |
float | FLOAT |
bytes | BYTES |
bool | BOOL |
datetime.datetime | TIMESTAMP |
datetime.date | DATE |
datetime.time | TIME |
Optional[…] | NULLABLE[…] |
List[…] | REPEATED[…] |
dataclass | RECORD[…] |
Using a nested dataclass will result in a nested RECORD column in your table.
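For example, a schema sketch combining nullable, repeated, and nested fields (the names here are illustrative, not part of the API):

import dataclasses
import datetime
from typing import List, Optional

@dataclasses.dataclass
class Address:
    city: str      # STRING
    zip_code: str  # STRING

@dataclasses.dataclass
class UserSchema:
    user_id: int                    # INTEGER
    signup_time: datetime.datetime  # TIMESTAMP
    nickname: Optional[str]         # NULLABLE STRING
    scores: List[float]             # REPEATED FLOAT
    address: Address                # RECORD with city and zip_code columns

BigQueryTable(...).options(schema=UserSchema)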