The Consumer pattern enables building event-driven architectures. You define your business logic in a function and attach a source for it to read data from and a sink for it to write data to.

A consumer is composed of: a source primitive to read elements from, a sink primitive to write elements to, and the function that contains your processing logic.

Consumers can easily be created with the consumer decorator method of your flow.

from buildflow import Flow

app = Flow()

@app.consumer(source=Primitive(...), sink=Primitive(...))
def my_consumer(...):
    ...

Or you can use the @consumer decorator and attach it with the add_consumer method of your flow.

from buildflow import Flow, consumer

@consumer(source=Primitive(...), sink=Primitive(...))
def my_consumer(...):
    ...

app = Flow()
app.add_consumer(my_consumer)

Input and Output

The input of your consumer will be one element read from your source primitive. For instance, if your source is a queue (like Kafka, GCP Pub/Sub, or AWS SQS), your consumer will receive one message at a time as it is read from the queue.

The element you return from your consumer will be written to your sink primitive. For instance, if your sink is an OLAP datastore (like GCP BigQuery or Snowflake), one row will be written for each element you return. If your sink is a queue (like Kafka, GCP Pub/Sub, or AWS SQS), one message will be published for each element you return.
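To make the one-in/one-out contract concrete, here is a minimal sketch: the consumer receives a single element from the source, and the returned value is what gets written to the sink. The Order and OrderSummary dataclasses and the elided source/sink are illustrative only; your element types depend on your primitives.

from dataclasses import dataclass

from buildflow import Flow

app = Flow()

# Illustrative element schemas; the real types depend on your primitives.
@dataclass
class Order:
    order_id: str
    amount: float

@dataclass
class OrderSummary:
    order_id: str
    amount_usd: float

@app.consumer(source=..., sink=...)
def summarize_order(order: Order) -> OrderSummary:
    # Called once per element read from the source. The returned element
    # is written to the sink: one row (OLAP) or one message (queue).
    return OrderSummary(order_id=order.order_id, amount_usd=order.amount)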

For more details on what these types can be for your source and sink, see our source and sink types documentation.

Consumer Options

Consumers can be individually configured for different resource requirements.

The following options are available:

Option | Description
------ | -----------
num_cpus | How many CPUs should be allocated to the consumer. This can be any floating point number > 0. Defaults to 1.
num_concurrent_tasks | Number of concurrent tasks to read from your source with. Defaults to 1.
log_level | Log level for the consumer. Defaults to logging.INFO.
num_replicas | Number of replicas the consumer should start with. Defaults to 1.
min_replicas | Minimum number of replicas the consumer should have. Defaults to 1.
max_replicas | Maximum number of replicas the consumer should have. Defaults to 1.
autoscale_frequency_secs | How often (in seconds) the consumer should check whether it needs to scale up or down. Defaults to 60.
consumer_backlog_burn_threshold | Threshold, in seconds, for how long it would take to burn down the current backlog before scaling up. Increasing this number will cause your consumer to scale up more aggressively. Defaults to 60.
consumer_cpu_percent_target | Target CPU percentage for scaling down. Increasing this number will cause your consumer to scale down more aggressively. Defaults to 25.
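As a sketch of how the autoscaling options fit together (source and sink elided; the option values here are illustrative, and the exact scaling behavior depends on the runtime):

@app.consumer(
    source=...,
    sink=...,
    num_cpus=0.5,                         # half a CPU per replica
    min_replicas=1,                       # never drop below one replica
    max_replicas=8,                       # cap scale-out at eight replicas
    autoscale_frequency_secs=30,          # re-evaluate scaling every 30 seconds
    consumer_backlog_burn_threshold=120,  # higher value = scale up more aggressively
    consumer_cpu_percent_target=25,       # scale down when CPU sits below this target
)
def autoscaled_consumer(...):
    ...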

Example

Decorator example:

import logging

@app.consumer(
    source=...,
    sink=...,
    num_cpus=2,
    num_concurrent_tasks=2,
    log_level=logging.DEBUG,
    num_replicas=10,
)
def my_consumer():
    ...
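
The standalone @consumer decorator takes options as well; this sketch assumes it accepts the same keyword arguments as app.consumer:

import logging

from buildflow import Flow, consumer

@consumer(
    source=...,
    sink=...,
    num_cpus=2,
    num_concurrent_tasks=2,
    log_level=logging.DEBUG,
    num_replicas=10,
)
def my_consumer():
    ...

app = Flow()
app.add_consumer(my_consumer)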