A Flow is the entity that is run by BuildFlow, and is the container for any processors you would like to run together.

Create a New Application

A flow is created by instantiating a Flow object:

from buildflow import Flow

app = Flow()

Attach Processors

All processor types can be attached directly to your flow either by using the @app.{PROCESSOR} decorator or by calling the app.add_{PROCESSOR} method.

Using @app decorators

This method is convenient if all of your processors are defined in the same file as your flow.

from buildflow import Flow

app = Flow()

@app.consumer(source=..., sink=...)
def consumer(...):
    ...

@app.collector("/collect", method="GET", sink=...)
def collector(...):
    ...

service = app.service()
@service.endpoint("/serve", method="GET")
def endpoint(...):
    ...

Using add methods

These methods are convenient if you would like to define your processors in a different file than your flow.

  • add_consumer: attaches a consumer to your flow
  • add_collector: attaches a collector to your flow
  • add_service: attaches a service to your flow

from buildflow import Flow, consumer, collector, Service, endpoint

app = Flow()

@consumer(source=..., sink=...)
def consumer_proc(...):
    ...
app.add_consumer(consumer_proc)

@collector("/collect", method="GET", sink=...)
def collector_proc(...):
    ...
app.add_collector(collector_proc)

service = Service()
@endpoint("/serve", method="GET")
def endpoint_proc(...):
    ...
service.add_endpoint(endpoint_proc)
app.add_service(service)
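
The two attachment styles above amount to the same registration step. The following is a minimal sketch of that idea using a toy stand-in class; ToyFlow and its internals are hypothetical and are not BuildFlow's implementation. It only illustrates why the decorator suits processors defined next to the flow, while the add method suits processors imported from elsewhere.

```python
from typing import Callable, List


class ToyFlow:
    """Toy stand-in for a flow. Hypothetical; NOT BuildFlow's Flow class."""

    def __init__(self) -> None:
        self.consumers: List[Callable] = []

    def add_consumer(self, fn: Callable) -> None:
        # Explicit attachment: works for a function defined in any module.
        self.consumers.append(fn)

    def consumer(self) -> Callable[[Callable], Callable]:
        # Decorator attachment: registers the function where it is defined.
        def decorator(fn: Callable) -> Callable:
            self.add_consumer(fn)
            return fn

        return decorator


app = ToyFlow()


@app.consumer()
def in_same_file(element):
    return element


def defined_elsewhere(element):  # imagine this was imported from another module
    return element


app.add_consumer(defined_elsewhere)

names = [fn.__name__ for fn in app.consumers]
print(names)  # → ['in_same_file', 'defined_elsewhere']
```

Either way the flow ends up holding both processors, so the choice is mostly about where the processor code lives.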

Managing Primitives

The Flow object is also responsible for managing the lifecycle of all primitives that are attached to it. Primitives can be attached to a flow by calling app.manage(...). When a primitive is managed, it can be created or updated by running buildflow apply, or destroyed by running buildflow destroy.

from buildflow import Flow
from buildflow.io.aws import AWSBucket
from buildflow.io.gcp import GCSBucket


app = Flow()
app.manage(AWSBucket("my-bucket"), GCSBucket("my-bucket"))

In the above example our Flow is responsible for managing both the AWS and GCP buckets. When we run buildflow apply both buckets will be created, and when we run buildflow destroy both buckets will be destroyed.
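
As a conceptual model only, the lifecycle described above can be sketched with a toy class. ToyLifecycle, its attributes, and the string identifiers are all hypothetical; real primitives are objects such as AWSBucket and GCSBucket, and the real work is done by the buildflow apply and buildflow destroy commands.

```python
from typing import List, Set


class ToyLifecycle:
    """Toy model of a flow that manages primitives. Hypothetical, not BuildFlow's code."""

    def __init__(self) -> None:
        self.managed: List[str] = []  # primitives the flow is responsible for
        self.existing: Set[str] = set()  # primitives that currently "exist"

    def manage(self, *primitives: str) -> None:
        # Record the primitives; nothing is created yet.
        self.managed.extend(primitives)

    def apply(self) -> None:
        # Analogous to `buildflow apply`: create every managed primitive.
        self.existing.update(self.managed)

    def destroy(self) -> None:
        # Analogous to `buildflow destroy`: remove every managed primitive.
        self.existing.difference_update(self.managed)


app = ToyLifecycle()
app.manage("aws:my-bucket", "gcp:my-bucket")
created_before_apply = sorted(app.existing)

app.apply()
after_apply = sorted(app.existing)

app.destroy()
after_destroy = sorted(app.existing)
```

The point of the sketch is that calling manage only records responsibility; resources change only when apply or destroy is run.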

Flow Options

The Flow object takes a single FlowOptions object for configuring the flow. In the FlowOptions object you can specify any configuration that you want applied to all processors in your flow. This includes options for configuring the infrastructure and the runtime. All options are optional and have reasonable defaults.

from buildflow import Flow, FlowOptions
app = Flow(flow_options=FlowOptions(...))

Runtime

The Runtime options control how the flow should operate when it is run. Your options are:

  • log_level - The log level to use for all processors in the flow. Defaults to INFO.

Credentials

Credentials options control how the Flow should authenticate with the cloud providers (AWS and GCP).

AWS Credentials

  • aws_access_key_id - The AWS access key id to use for authentication
  • aws_secret_access_key - The AWS secret access key to use for authentication
  • aws_session_token - The AWS session token to use for authentication

GCP Credentials

  • gcp_service_account_info - The JSON serialized string of the GCP service account to use for authentication. This can be a JSON key or a workload identity pool configuration.
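
One way to assemble the credential options listed above is to collect them into a dictionary of keyword arguments. This is a sketch under assumptions: credential_kwargs is a hypothetical helper, and it assumes FlowOptions accepts these options as keyword arguments with the names given in the bullets. The environment variable names are the standard AWS ones.

```python
import json
from typing import Dict, Mapping, Optional


def credential_kwargs(
    env: Mapping[str, str],
    service_account: Optional[dict] = None,
) -> Dict[str, str]:
    """Collect credential options, skipping any that are not set."""
    kwargs: Dict[str, str] = {}
    # Each option name maps to the standard AWS variable of the same name in upper case.
    for option in ("aws_access_key_id", "aws_secret_access_key", "aws_session_token"):
        value = env.get(option.upper())
        if value:
            kwargs[option] = value
    if service_account is not None:
        # The option expects a JSON serialized string, not a dict.
        kwargs["gcp_service_account_info"] = json.dumps(service_account)
    return kwargs


# Placeholder values for illustration only; never hard-code real secrets.
example_env = {"AWS_ACCESS_KEY_ID": "AKIAEXAMPLE", "AWS_SECRET_ACCESS_KEY": "example-secret"}
example_account = {"type": "service_account", "project_id": "example-project"}

kwargs = credential_kwargs(example_env, example_account)
# Assumed usage: Flow(flow_options=FlowOptions(**kwargs))
```

In practice you would likely pass os.environ as env and load the service account dict from a key file, so that no secret appears in source code.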

Infrastructure

  • stack - The name of the stack to consume. Defaults to local. This should reference a stack defined in your buildflow.yaml.
  • require_confirmation - Whether or not confirmation should be required before applying changes. Defaults to True.
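
The runtime and infrastructure options can be bundled the same way before being passed to FlowOptions. The sketch below is hypothetical: general_option_kwargs is not part of BuildFlow, and it assumes that log_level takes a standard Python logging level name and that FlowOptions accepts these option names as keyword arguments. The defaults mirror the ones documented above.

```python
import logging
from typing import Any, Dict


def general_option_kwargs(
    log_level: str = "INFO",
    stack: str = "local",
    require_confirmation: bool = True,
) -> Dict[str, Any]:
    """Validate and bundle the runtime and infrastructure options."""
    # Reject names that are not standard logging levels (DEBUG, INFO, WARNING, ...).
    if not isinstance(getattr(logging, log_level.upper(), None), int):
        raise ValueError(f"unknown log level: {log_level!r}")
    return {
        "log_level": log_level.upper(),
        "stack": stack,
        "require_confirmation": require_confirmation,
    }


defaults = general_option_kwargs()

# "prod" is a hypothetical stack name; it would need to exist in buildflow.yaml.
ci_options = general_option_kwargs(log_level="debug", stack="prod", require_confirmation=False)
# Assumed usage: Flow(flow_options=FlowOptions(**ci_options))
```

Turning off require_confirmation is mainly useful for non-interactive runs such as CI, where nobody is available to confirm the changes.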