GCS Bucket
GCSBucket is a sink primitive that can be used to write various files to a GCS bucket. The sink primitive takes in a file path and will create one unique file per replica. To create a GCSBucket provide:
Arg | Description |
---|---|
project_id | The project ID where the GCS bucket lives |
bucket_name | The name of the bucket |
file_path | The base path of the file to write to. We will take this path and generate a unique file path for each replica in the following format: {file_path}-{uuid}.{extension} |
file_format | The file format to write valid options are: JSON, CSV, and PARQUET |
file_path
and file_format
are only used if you are using GCSBucket as a sink.
Example usage:
from buildflow.io.gcp import GCSBucket
bucket = GCSBucket(
project_id="project",
bucket_name="bucket",
file_path="path/to/file_in_bucket",
file_format="PARQUET")
app.manage(bucket)
@app.consumer(source=..., sink=bucket)
Types
The GCSBucket sink expects an object that can be serialized in to a JSON object. You can return a dataclass
and we will automatically serialize it for you, or you can return a dictionary containing JSON serializable objects.
If you would like to return a custom type that is not JSON serializable you can implement the to_json
method on your class and we will use that to serialize your object.
class CustomType:
def __init__(self, b: int):
self.b = str
def to_json(self):
return {"b": self.b}
bucket = GCSBucket(
project_id="project",
bucket_name="bucket",
file_path="path/to/file_in_bucket",
file_format="PARQUET")
app.manage(bucket)
@app.consumer(source=..., sink=bucket)
async def my_processor(elem: int) -> CustomType:
return CustomType(b=elem + 1)
Resource Creation
If you are using BuildFlow’s built in resource creation/management you can use the GCSBucket primitive to create a GCS Bucket in your provided project.
Configuration Options
You can provide the following options to control resource management of the GCS bucket:
Options | Description |
---|---|
force_destroy | If true destroy will fail if the bucket contains objects. Defaults to False . |
bucket_region | The region to create the bucket in. |
primitive = GCSBucket(...).options(force_destroy=True, bucket_region="US")