GCSBucket is a sink primitive for writing files to a Google Cloud Storage (GCS) bucket. It takes a base file path and creates one unique file per replica. To create a GCSBucket, provide:

Arg          Description
project_id   The project ID where the GCS bucket lives.
bucket_name  The name of the bucket.
file_path    The base path of the file to write to. A unique file path is generated from this path for each replica, in the format {file_path}-{uuid}.{extension}.
file_format  The file format to write. Valid options are JSON, CSV, and PARQUET.

file_path and file_format are only used if you are using GCSBucket as a sink.
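
To illustrate the naming scheme, here is a minimal sketch of how a per-replica path in the {file_path}-{uuid}.{extension} format could be built. The helper name replica_file_path and the format-to-extension mapping are assumptions made for illustration; this is not BuildFlow's actual implementation.

```python
import uuid

# Hypothetical mapping from file_format to a file extension (an assumption).
EXTENSIONS = {"JSON": "json", "CSV": "csv", "PARQUET": "parquet"}


def replica_file_path(file_path: str, file_format: str) -> str:
    """Build a unique path in the form {file_path}-{uuid}.{extension}."""
    extension = EXTENSIONS[file_format]
    # uuid4 is random, so each replica calling this gets a different path.
    return f"{file_path}-{uuid.uuid4()}.{extension}"


print(replica_file_path("path/to/file_in_bucket", "PARQUET"))
```

Because the UUID is random, two replicas given the same base path should not overwrite each other's files.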

Example usage:

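from buildflow.io.gcp import GCSBucket

# `app` is your BuildFlow application object, created elsewhere.
bucket = GCSBucket(
    project_id="project",
    bucket_name="bucket",
    file_path="path/to/file_in_bucket",
    file_format="PARQUET",
)

# Register the bucket so BuildFlow can manage it as a resource.
app.manage(bucket)


# Whatever the consumer returns is written to the bucket.
@app.consumer(source=..., sink=bucket)
async def my_processor(elem: dict) -> dict:
    return elem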

Types

The GCSBucket sink expects an object that can be serialized into a JSON object. You can return a dataclass, which we will serialize for you automatically, or a dictionary containing JSON-serializable objects.
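
As a rough illustration of the two return styles, the sketch below uses only the standard library to show that a dataclass and an equivalent dictionary produce the same JSON-serializable payload. The Reading class is hypothetical, and dataclasses.asdict is used here as an assumed stand-in for the automatic serialization; BuildFlow's internals may differ.

```python
import dataclasses
import json


@dataclasses.dataclass
class Reading:  # hypothetical element type, for illustration only
    sensor_id: str
    value: float


# Option 1: a dataclass, converted to a plain dict.
as_dataclass = dataclasses.asdict(Reading(sensor_id="a1", value=2.5))

# Option 2: a dictionary of JSON-serializable values.
as_dict = {"sensor_id": "a1", "value": 2.5}

# Both describe the same JSON object.
encoded = json.dumps(as_dataclass)
print(encoded)
```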

If you would like to return a custom type that is not JSON serializable, you can implement a to_json method on your class, and we will use it to serialize your object.

class CustomType:
    def __init__(self, b: int):
        self.b = b

    def to_json(self):
        return {"b": self.b}

bucket = GCSBucket(
    project_id="project",
    bucket_name="bucket",
    file_path="path/to/file_in_bucket",
    file_format="PARQUET",
)

app.manage(bucket)

@app.consumer(source=..., sink=bucket)
async def my_processor(elem: int) -> CustomType:
    return CustomType(b=elem + 1)
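
The serialization rules described in this section can be approximated with the following sketch. The helper to_json_compatible and the Point dataclass are hypothetical names, and the order in which the checks run is an assumption; the sketch only mirrors the behavior described above and is not BuildFlow's code.

```python
import dataclasses
from typing import Any


def to_json_compatible(obj: Any) -> Any:
    """Hypothetical helper mirroring the serialization rules described above."""
    # Custom types: use the user-provided to_json method.
    if hasattr(obj, "to_json"):
        return obj.to_json()
    # Dataclass instances: convert to a plain dictionary.
    if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
        return dataclasses.asdict(obj)
    # Dictionaries (and other values) are assumed to be JSON serializable already.
    return obj


class CustomType:
    def __init__(self, b: int):
        self.b = b

    def to_json(self):
        return {"b": self.b}


@dataclasses.dataclass
class Point:  # hypothetical dataclass, for illustration only
    x: int
    y: int


print(to_json_compatible(CustomType(b=2)))
print(to_json_compatible(Point(x=1, y=2)))
print(to_json_compatible({"already": "a dict"}))
```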

Resource Creation

If you are using BuildFlow's built-in resource creation and management, you can use the GCSBucket primitive to create a GCS bucket in the project you provide.

Configuration Options

You can provide the following options to control resource management of the GCS bucket:

Option         Description
force_destroy  If True, destroying the bucket also deletes any objects it contains. If False, destroy fails when the bucket contains objects. Defaults to False.
bucket_region  The region to create the bucket in.

primitive = GCSBucket(...).options(force_destroy=True, bucket_region="US")