Amazon Web Services
S3 File Change Stream
S3FileChangeStream is a source primitive that can be used to subscribe to file changes on a given S3 bucket. To create a S3FileChangeStream provide:
Arg | Description |
---|---|
s3_bucket | S3Bucket to subscribe to change events from. NOTE: you do not need to provide file path or file format. |
event_types | List of event types to subscribe to. Defaults to: [S3ChangeStreamEventType.OBJECT_CREATED_ALL] |
You must mark your S3FileChangeStream primitive as managed to have the underlying resources created.
from buildflow.io.aws import S3Bucket, S3FileChangeStream
from buildflow.types.aws import S3FileChangeEvent
bucket = S3Bucket(bucket_name="my-bucket")
source = S3FileChangeStream(s3_bucket=bucket)
app.manage(bucket, source)
@app.consumer(source=source, sink=...)
def consumer(elem: S3FileChangeEvent):
...
With event types:
from buildflow.io.aws import S3Bucket, S3FileChangeStream
from buildflow.types.aws import S3FileChangeEvent, S3ChangeStreamEventType
bucket = S3Bucket(bucket_name="my-bucket")
source = S3FileChangeStream(
s3_bucket=bucket,
event_types=[S3ChangeStreamEventType.OBJECT_CREATED_ALL, S3ChangeStreamEventType.OBJECT_REMOVED_ALL])
app.manage(bucket, source)
@app.consumer(source=source, sink=...)
def consumer(elem: S3FileChangeEvent):
...
Types
The S3FileChangeStream source always returns a S3FileChangeEvent
object. This object contains the following fields:
Field | Description |
---|---|
event_type | The buildflow.types.aws.S3ChangeStreamEventType event type |
metadata | Any metadata that was associated with the file change event. This is a dictionary of strings. |
blob | The raw contents of the file. This is lazily fetched. |
Resource Creation
If you are using BuildFlow’s built in resource creation/management you can use the S3FileChangeStream primitive type to create and manage all resources that are need to subscribe to a S3 bucket.
The following resources will be created:
- S3 Bucket
- SQS Queue
- S3 Bucket Notification
- SQS Queue Policy