Flows
A Flow is the entity that is run by BuildFlow, and is the container for any processors you would like to run together.
Create a New Application
A flow is created by instantiating a `Flow` object:

```python
from buildflow import Flow

app = Flow()
```
Attach Processors
All processor types can be attached directly to your flow, either by using the `@app.{PROCESSOR}` decorator or by calling the `app.add_{PROCESSOR}` method.
Using `app` decorators
This method is convenient if all of your processors are defined in the same file as your flow.
```python
from buildflow import Flow

app = Flow()

@app.consumer(source=..., sink=...)
def consumer(...):
    ...

@app.collector("/collect", method="GET", sink=...)
def collector(...):
    ...

service = app.service()

@service.endpoint("/serve", method="GET")
def endpoint(...):
    ...
```
Using add methods
These methods are convenient if you would like to define your processors in a different file than your flow.

- `add_consumer`: attaches a consumer to your flow
- `add_collector`: attaches a collector to your flow
- `add_service`: attaches a service to your flow
```python
from buildflow import Flow, consumer, collector, Service, endpoint

app = Flow()

@consumer(source=..., sink=...)
def consumer_proc(...):
    ...

app.add_consumer(consumer_proc)

@collector("/collect", method="GET", sink=...)
def collector_proc(...):
    ...

app.add_collector(collector_proc)

service = Service()

@endpoint("/serve", method="GET")
def endpoint_proc(...):
    ...

service.add_endpoint(endpoint_proc)
app.add_service(service)
```
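Both attachment styles register the same processor with the flow; the decorator form simply wraps the explicit registration. As a toy illustration of this registration pattern (this is not BuildFlow's actual implementation; the class and attribute names here are invented):

```python
class ToyFlow:
    """Toy stand-in for a flow that registers processors two ways."""

    def __init__(self):
        self.processors = []

    def consumer(self, source=None, sink=None):
        # Decorator factory: registers the wrapped function, returns it unchanged.
        def decorator(fn):
            self.processors.append(("consumer", fn.__name__))
            return fn
        return decorator

    def add_consumer(self, fn):
        # Explicit registration, for processors defined in another module.
        self.processors.append(("consumer", fn.__name__))


toy = ToyFlow()

@toy.consumer(source="queue", sink="table")
def handle(payload):
    return payload

def handle_elsewhere(payload):
    return payload

toy.add_consumer(handle_elsewhere)
print(toy.processors)  # [('consumer', 'handle'), ('consumer', 'handle_elsewhere')]
```

Either way, the flow ends up holding the same registry of processors to run.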
Managing Primitives
The Flow object is also responsible for managing the lifecycle of all primitives that are attached to it. Primitives can be attached to a flow by calling `app.manage(...)`. Once a primitive is managed, it can be created / updated by running `buildflow apply` or destroyed with `buildflow destroy`.
```python
from buildflow import Flow
from buildflow.io.aws import AWSBucket
from buildflow.io.gcp import GCSBucket

app = Flow()
app.manage(AWSBucket("my-bucket"), GCSBucket("my-bucket"))
```
In the above example, our Flow is responsible for managing both the AWS and the GCP bucket. When we run `buildflow apply` both buckets will be created, and when we run `buildflow destroy` both buckets will be destroyed.
Flow Options
The Flow object takes one `FlowOptions` object for configuring the flow.
In the `FlowOptions` object you can specify any configuration that you want applied to all processors in your flow.
This includes options for configuring the infrastructure and the runtime. All options are optional and have reasonable defaults.
```python
from buildflow import Flow, FlowOptions

app = Flow(flow_options=FlowOptions(...))
```
Runtime
The Runtime options control how the node should operate when it is run. Your options are:
- log_level - The log level to use for all processors in the node. Defaults to `INFO`.
Credentials
Credentials options control how the Flow should authenticate with the cloud providers (AWS and GCP).
AWS Credentials
- aws_access_key_id - The AWS access key id to use for authentication
- aws_secret_access_key - The AWS secret access key to use for authentication
- aws_session_token - The AWS session token to use for authentication
GCP Credentials
- gcp_service_account_info - The JSON serialized string of the GCP service account to use for authentication. This can be a JSON key or a workload identity pool configuration.
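Since `gcp_service_account_info` expects a JSON-serialized string rather than a parsed object, a key you hold as a Python dict must be serialized first. A minimal sketch, assuming you have the key material in hand (the field values below are hypothetical placeholders):

```python
import json

# Hypothetical service-account key held as a dict.
service_account = {
    "type": "service_account",
    "project_id": "my-project",
    "client_email": "sa@my-project.iam.gserviceaccount.com",
}

# gcp_service_account_info expects the JSON *string* form of the key.
gcp_service_account_info = json.dumps(service_account)
assert isinstance(gcp_service_account_info, str)
```

The same applies when the value is a workload identity pool configuration: pass the raw JSON text, not a parsed dict.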
Infrastructure
- stack - The name of the stack to use. Defaults to `local`. This stack should reference a stack in your buildflow.yaml.
- require_confirmation - Whether or not confirmation should be required before applying changes. Defaults to `True`.