S3Bucket is a sink primitive that writes files to an S3 bucket. It takes a base file path and creates one unique file per replica. To create an S3Bucket, provide:

| Arg | Description |
| --- | --- |
| bucket_name | The name of the bucket. |
| file_path | The base path of the file to write to. We take this path and generate a unique file path for each replica in the following format: {file_path}-{uuid}.{extension} |
| file_format | The file format to write. Valid options are: JSON, CSV, and PARQUET. |
| aws_region | The region the bucket exists in. |

file_path and file_format are only used if you are using S3Bucket as a sink.
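
The per-replica naming scheme described above, {file_path}-{uuid}.{extension}, can be sketched in plain Python. This is only an illustration, not BuildFlow's actual implementation: the helper name `replica_file_path` and the format-to-extension mapping are assumptions made for the example.

```python
import uuid

# Hypothetical mapping from file_format to a file extension (an assumption,
# not taken from BuildFlow's source).
_EXTENSIONS = {"JSON": "json", "CSV": "csv", "PARQUET": "parquet"}


def replica_file_path(file_path: str, file_format: str) -> str:
    """Build a path of the form {file_path}-{uuid}.{extension}."""
    extension = _EXTENSIONS[file_format.upper()]
    # A fresh UUID per call keeps replicas from writing to the same object.
    return f"{file_path}-{uuid.uuid4()}.{extension}"


# Two replicas sharing one base path end up with two distinct object keys.
first = replica_file_path("path/to/file_in_bucket", "PARQUET")
second = replica_file_path("path/to/file_in_bucket", "PARQUET")
print(first)
print(second)
```

Because each replica gets its own suffix, replicas never need to coordinate writes to a shared object.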

Example usage:

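from buildflow import Flow
from buildflow.io.aws import S3Bucket

# The flow that owns the consumer and the managed bucket.
app = Flow()

bucket = S3Bucket(
    bucket_name="bucket",
    aws_region="us-east-1",
    file_path="path/to/file_in_bucket",
    file_format="PARQUET"
)

# Let BuildFlow manage (create/destroy) the bucket.
app.manage(bucket)

@app.consumer(source=..., sink=bucket)
def process(elem: dict) -> dict:
    # Whatever is returned here is written to the bucket.
    return elem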

Types

The S3Bucket sink expects an object that can be serialized into a JSON object. You can return a dataclass, which we serialize for you automatically, or a dictionary containing JSON-serializable objects.

If you would like to return a custom type that is not JSON serializable, implement the to_json method on your class and we will use it to serialize your object.

bucket = S3Bucket(
    bucket_name="bucket",
    aws_region="us-east-1",
    file_path="path/to/file_in_bucket",
    file_format="PARQUET"
)

app.manage(bucket)

class CustomType:
    def __init__(self, b: int):
        self.b = b

    def to_json(self):
        return {"b": self.b}

@app.consumer(source=..., sink=bucket)
async def my_processor(elem: int) -> CustomType:
    return CustomType(b=elem + 1)
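
To make the three accepted return types concrete, here is a minimal standalone sketch of how such a dispatch could work. It does not use BuildFlow: the helper `to_serializable`, the `Reading` dataclass, and the order of the checks are assumptions made for illustration, not the library's actual logic.

```python
import dataclasses
import json
from typing import Any


def to_serializable(elem: Any) -> Any:
    """Hypothetical helper: turn a returned element into JSON-ready data."""
    # Custom types opt in by providing a to_json method.
    if hasattr(elem, "to_json"):
        return elem.to_json()
    # Dataclass instances are converted field by field.
    if dataclasses.is_dataclass(elem) and not isinstance(elem, type):
        return dataclasses.asdict(elem)
    # Dictionaries of JSON-serializable values pass through unchanged.
    return elem


@dataclasses.dataclass
class Reading:
    sensor: str
    value: float


class CustomType:
    def __init__(self, b: int):
        self.b = b

    def to_json(self):
        return {"b": self.b}


rows = [
    to_serializable(x)
    for x in (Reading("a", 1.5), {"k": [1, 2]}, CustomType(b=2))
]
encoded = json.dumps(rows)
print(encoded)
```

All three elements end up as plain dictionaries, which is what lets a single sink write them in any of the supported file formats.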

Resource Creation

If you are using BuildFlow's built-in resource creation/management, you can use the S3Bucket primitive to create a bucket in your AWS account.

Configuration Options

You can provide the following options to control resource management of the S3 bucket:

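| Option | Description |
| --- | --- |
| force_destroy | If True, the bucket's objects are deleted on destroy so the bucket can be removed even when it is not empty. If False, destroy will fail if the bucket contains objects. Defaults to False. |

primitive = S3Bucket(...).options(force_destroy=True)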