Consumers
The Consumer pattern enables building event-driven architectures. You define your business logic in a function and attach where it should read data from and where it should write data to.
A consumer is composed of:
Consumers can easily be created with the consumer
decorator method of your flow.
@app.consumer(source=Primitive(...), sink=Primitive(...))
def my_consumer(...):
...
Or you can use the @consumer
decorator and attach it with the add_consumer
method of your flow.
from buildflow import Flow, consumer
@consumer(source=Primitive(...), sink=Primitive(...))
def my_consumer(...):
...
app = Flow()
flow.add_consumer(my_consumer)
Input and Output
The input of your consumer will be one element that is read from your source
primitive. For instance if your source type is a queue (like Kafka, GCP Pub/Sub, or AWS SQS) then your processor will receive one message at a time that is read from the queue.
The element you return from your consumer will be written to your sink
primitive. For instance if your sink type is an OLAP datastore (like GCP BigQuery or Snowflake) one row will be written for each element you return from your processor, or if you sink is a queue (like Kafka, GCP Pub/Sub, or AWS SQS) then one message will be published for each element you return from your processor.
For more details on what these types should / can be for your source and sink see our source and sink types documentation.
Consumer Options
Consumers can be individually configured for different resource requirements.
The following options are available:
Option | Description |
---|---|
num_cpus | How many CPUs should be allocated to the processor. This can be any floating point number > 0. Defaults to 1 . |
num_concurrent_tasks | number of current tasks to read from your source with. Defaults to 1 . |
log_level | Log level for the processor. Defaults to logging.INFO . |
num_replicas | Number of replicas the consumer should start with. Defaults to 1 |
min_replicas | Minimum number of replicas the consumer should have. Defaults to 1 |
max_replicas | Maximum number of replicas the consumer should have. Defaults to 1 |
autoscale_frequency_secs | How often the consumer should check if it needs to scale up or down. Defaults to 60 |
consumer_backlog_burn_threshold | Threshold for how many seconds it will take to burn down a backlog before scaling up. Increasing this number will cause your consumer to scale up more aggresively. Defaults to 60 . |
consumer_cpu_percent_target | Target cpu percentage for scaling down. Increasing this number will cause your consumer to scale down more aggresively. Defaults to 25 . |
Example
Decorator example:
@app.consumer(
source=...,
sink=...,
num_cpus=2,
num_concurrent_tasks=2,
log_level=logging.DEBUG,
num_replicas=10,
)
def my_conusmer():
...