Collectors
The Collector pattern enables receiving HTTP requests and dumping the payload to a sink.
A collector is composed of:
- A
route
: this is the path that the endpoint will be available at. - A
method
: this is the HTTP method that the endpoint will be available at currently GET and POST are supported. - A
sink
primitive - Your processing Logic.
Collectors can easily be created with the collector
decorator method of your flow.
@app.collector(route="/", method="POST", sink=Primitive(...))
def my_collector(...):
...
Or you can use the @collector
decorator and attach it with the add_collector
method of your flow.
from buildflow import Flow, collector
@collector(route="/", method="POST", sink=Primitive(...))
def my_collector(...):
...
app = Flow()
flow.add_collector(my_collector)
Request and Output Type
The input of your collector will be one request from your HTTP endpoint. You can use a dataclass to define a JSON payload that you expect. If you do not want to process JSON see our endpoint guide for additional request types and parameters.
The element you return from your collector will be written to your sink
primitive. For instance if your sink type is an OLAP datastore (like GCP BigQuery or Snowflake) one row will be written for each element you return from your processor. Or if your sink is a queue (like Kafka, GCP Pub/Sub, or AWS SQS) then one message will be published for each element you return from your processor. See our source and sink types documentation for what type should be used for writting to a sink.
Collector Options
Collectors can be individually configured for different resource requirements.
The following options are available:
Option | Description |
---|---|
num_cpus | How many CPUs should be allocated to the processor. This can be any floating point number > 0. Defaults to 1 . |
log_level | log level for the processor. Defaults to logging.INFO . |
num_replicas | Number of replicas the consumer should start with. Defaults to 1 |
min_replicas | Minimum number of replicas the consumer should have. Defaults to 1 |
max_replicas | Maximum number of replicas the consumer should have. Defaults to 1 |
max_concurrent_queries | Maximum number of queries that are sent to a replica without receving a response. Defaults to 100 |
target_num_ongoing_requests_per_replica | Target number of ongoing requests per replica before scaling. Setting this lower will mean replicas are added faster, setting it higher will mean replicas are added slower. Defaults to 1 |
Example
@app.collector(route=..., method=..., sink=..., num_cpus=2, log_level=logging.DEBUG)
def my_processor():
...