A processor is where you define your business logic. We offer three processor patterns to support a wide variety of use cases.

| Pattern | Description |
| --- | --- |
| Endpoints | Takes in data from an HTTP endpoint via a GET or POST request and returns a user-defined response. |
| Consumers | Event-driven processor that reads data from a source and writes data to a sink. |
| Collectors | Takes in data from an HTTP endpoint via a GET or POST request, writes it to a sink, then returns a success or failure response. |
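To make the differences between the three patterns concrete, here is a toy, in-memory model of how each one moves data. It is not BuildFlow's API: `run_endpoint`, `run_collector`, and `run_consumer` are hypothetical helpers, and plain Python lists stand in for real sources and sinks.

```python
from typing import Callable, Iterable, List

# Hypothetical model only: lists play the role of sources and sinks.

def run_endpoint(handler: Callable[[dict], dict], request: dict) -> dict:
    # Endpoint: the handler's return value is the HTTP response.
    return handler(request)

def run_collector(handler: Callable[[dict], dict], request: dict, sink: List[dict]) -> dict:
    # Collector: the handler's output goes to the sink; the caller
    # only learns whether the write succeeded.
    try:
        sink.append(handler(request))
        return {"status": "success"}
    except Exception:
        return {"status": "failure"}

def run_consumer(handler: Callable[[dict], dict], source: Iterable[dict], sink: List[dict]) -> None:
    # Consumer: no HTTP request at all; every element read from the
    # source is processed and written to the sink.
    for elem in source:
        sink.append(handler(elem))

def add_one(elem: dict) -> dict:
    return {"b": elem["a"] + 1}

sink: List[dict] = []
print(run_endpoint(add_one, {"a": 1}))          # {'b': 2}
print(run_collector(add_one, {"a": 2}, sink))   # {'status': 'success'}
run_consumer(add_one, [{"a": 3}, {"a": 4}], sink)
print(sink)                                      # [{'b': 3}, {'b': 4}, {'b': 5}]
```

The key distinction the sketch highlights is where the handler's output ends up: in the response (endpoint), in a sink with only a status returned (collector), or in a sink with no HTTP caller at all (consumer).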

Some additional patterns we hope to support in the future are open WebSocket connections and batch pipelines.

Async Processors

TL;DR: If you use any libraries that require await, make your processor async.

All BuildFlow processor patterns run in an async runtime, so you can make your processor async by adding the async keyword to its definition:

@app.consumer(source=Primitive(...), sink=Primitive(...))
async def my_consumer(...):
    ...
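Why does async matter? An `async def` processor can `await` I/O without blocking the runtime, so several elements can be in flight at once. The sketch below uses only `asyncio`: `fake_io_call` is a hypothetical stand-in for an async library call (for example an async HTTP or database client), and `asyncio.gather` stands in for however BuildFlow actually schedules processor calls, which this page does not describe.

```python
import asyncio
import time

async def fake_io_call(elem: int) -> int:
    # Hypothetical stand-in for an awaitable library call (HTTP, DB, ...).
    await asyncio.sleep(0.1)
    return elem * 2

async def my_consumer(elem: int) -> int:
    # `await` is only legal inside `async def`, which is why a processor
    # that uses an async library must itself be async.
    return await fake_io_call(elem)

async def main() -> list:
    # Ten calls awaited concurrently: each one yields while it waits,
    # so the total is close to 0.1 s rather than 10 * 0.1 s.
    return await asyncio.gather(*(my_consumer(i) for i in range(10)))

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```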

Async With Ray

The async runtime is powered by Ray, so you can use any of the functionality of Ray Core to add parallelism to your processor. This is recommended if your processor needs to perform long-running or computationally expensive tasks.

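import time

import ray

@ray.remote
def long_task(elem):
    # Runs as a separate Ray task, so the sleep does not block the processor.
    time.sleep(10)
    return elem

@app.consumer(source=ResourceType(...), sink=ResourceType(...))
async def my_consumer(elem):
    # Ray ObjectRefs are awaitable, which is why this processor must be async.
    return await long_task.remote(elem)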

Custom Types

BuildFlow supports custom types for both the input and the output of your processor. If you are receiving or writing a JSON payload, you can use a dataclass to get automatic serialization and deserialization.

from dataclasses import dataclass

@dataclass
class InputType:
    a: int

@dataclass
class OutputType:
    b: int

@app.consumer(source=Primitive(...), sink=Primitive(...))
async def my_consumer(elem: InputType) -> OutputType:
    return OutputType(b=elem.a + 1)
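Conceptually, the dataclass describes the JSON shape: an incoming payload such as `{"a": 1}` becomes `InputType(a=1)`, and the returned `OutputType` is turned back into JSON. The standard-library sketch below approximates that round trip under the assumption that payload keys match field names exactly; BuildFlow's actual (de)serialization code is not shown on this page and may handle nested types, validation, and errors differently.

```python
import json
from dataclasses import asdict, dataclass

@dataclass
class InputType:
    a: int

@dataclass
class OutputType:
    b: int

def my_consumer(elem: InputType) -> OutputType:
    # Synchronous here only to keep the sketch short.
    return OutputType(b=elem.a + 1)

# Incoming JSON -> InputType (assumes payload keys match the field names).
payload = '{"a": 1}'
elem = InputType(**json.loads(payload))

# OutputType -> outgoing JSON.
result = my_consumer(elem)
print(json.dumps(asdict(result)))  # {"b": 2}
```

A payload with an unexpected key would raise a `TypeError` in this sketch, since `InputType(**...)` rejects unknown keyword arguments.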

For Consumers

For consumers, the input type defines the data coming from your source, and the output type defines what will be written to your sink. For more details on how to use additional types with sources and sinks, see our source and sink documentation.

For Collectors

For collectors, the input type defines the JSON payload expected from the HTTP request, and the output type defines what will be written to your sink. For more details on how to use additional types with sources and sinks, see our source and sink documentation.

For Endpoints

For endpoints, the input type defines the JSON payload expected from the HTTP request, and the output type defines the JSON payload sent back in the response. For more details on the request and response payload types specific to endpoints, see our endpoint documentation.

Autoscaling Options

The following options control how the autoscaler scales an individual processor. All of them are optional and have reasonable defaults:

| Option | Description |
| --- | --- |
| num_replicas | Number of replicas the processor should start with. Defaults to 1. |
| min_replicas | Minimum number of replicas the processor should have. Defaults to 1. |
| max_replicas | Maximum number of replicas the processor should have. Defaults to 1. |

All of these options can be set when creating a processor of any type. For example:

@app.consumer(source=..., sink=..., min_replicas=1, max_replicas=10)
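Per the table, a processor starts at `num_replicas` and the autoscaler keeps the count between `min_replicas` and `max_replicas`. The sketch below models only that bounding. `ReplicaOptions` and `clamp_replicas` are hypothetical names, the `min <= num <= max` validation is an assumption rather than documented behavior, and the scaling signal that produces `desired` (backlog, request rate, and so on) is left out because it is processor-specific.

```python
from dataclasses import dataclass

@dataclass
class ReplicaOptions:
    # Hypothetical mirror of the documented options and their defaults.
    num_replicas: int = 1
    min_replicas: int = 1
    max_replicas: int = 1

    def __post_init__(self) -> None:
        # Assumption: a sensible configuration starts inside its own bounds.
        if not (self.min_replicas <= self.num_replicas <= self.max_replicas):
            raise ValueError("expected min_replicas <= num_replicas <= max_replicas")

def clamp_replicas(desired: int, opts: ReplicaOptions) -> int:
    # Whatever count the autoscaler wants, the result stays within [min, max].
    return max(opts.min_replicas, min(desired, opts.max_replicas))

opts = ReplicaOptions(min_replicas=1, max_replicas=10)
print(clamp_replicas(25, opts))             # 10
print(clamp_replicas(0, opts))              # 1
print(clamp_replicas(4, ReplicaOptions()))  # 1
```

Under this model, the documented defaults (all three options set to 1) collapse the range to a single replica, so raising max_replicas, as in the decorator example above, is what gives the autoscaler room to scale.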

In addition to these shared options, each processor type has its own options for configuring autoscaling. See the documentation for each processor type (endpoints, collectors, consumers) for more details.