Processors
A processor is where you will define your actual business logic. We offer three different patterns of processors to support a wide variety of use cases.
Pattern | Description |
---|---|
Endpoints | Takes in data from an HTTP endpoint via a GET or POST request and returns a user-defined response. |
Consumers | Event-driven processor that reads data from a source and writes data to a sink. |
Collectors | Takes in data from an HTTP endpoint via a GET or POST request, writes it to a sink, and returns a success or failure response. |
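At a glance, each pattern corresponds to a decorator on your BuildFlow app. The sketch below is illustrative only: `Primitive(...)` stands in for whatever source and sink primitives your application uses, and the `route` and `method` parameters shown for the endpoint and collector are assumptions; see each pattern's documentation for the exact decorator signature.
```python
# Illustrative sketch of the three processor patterns. Primitive(...) and the
# route/method parameters are placeholders, not exact signatures.

# Endpoint: HTTP request in, user-defined response out.
@app.endpoint(route="/hello", method="POST")
async def my_endpoint(request):
    ...

# Consumer: reads from a source, writes to a sink.
@app.consumer(source=Primitive(...), sink=Primitive(...))
async def my_consumer(elem):
    ...

# Collector: HTTP request in, written to a sink, success/failure response out.
@app.collector(route="/collect", method="POST", sink=Primitive(...))
async def my_collector(request):
    ...
```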
Some additional patterns we hope to support in the future are open websocket connections and batch pipelines.
Async Processors
TL;DR: If you are using any libraries that require you to use `await`, you should make your processor async.
All BuildFlow processor patterns run in an async runtime, so you can make your processor async by simply adding the `async` keyword:
```python
@app.consumer(source=Primitive(...), sink=Primitive(...))
async def my_consumer(...):
    ...
```
Async With Ray
The async runtime is powered by Ray, which means you can use any of the functionality of Ray Core to add increased parallelism to your processor. This is recommended if you need to perform any long-running or computationally expensive tasks in your processor. For example:
```python
import time

import ray

@ray.remote
def long_task(elem):
    # Simulate a long-running or expensive task.
    time.sleep(10)
    return elem

@app.consumer(source=ResourceType(...), sink=ResourceType(...))
async def my_consumer(elem):
    # Await the Ray task so the async runtime is not blocked.
    return await long_task.remote(elem)
```
Custom Types
BuildFlow supports using custom types for both the input and the output of your processor. If you are receiving or writing a JSON payload, you can simply use a `dataclass` to get automatic serialization and deserialization:
```python
from dataclasses import dataclass

@dataclass
class InputType:
    a: int

@dataclass
class OutputType:
    b: int

@app.consumer(source=Primitive(...), sink=Primitive(...))
async def my_consumer(elem: InputType) -> OutputType:
    return OutputType(b=elem.a + 1)
```
For Consumers
For consumers, the input type defines the type of data coming from your source, and the output type defines what will be written to your sink. For more details on how to use additional types with sources and sinks, see our source and sink documentation.
For Collectors
For collectors, the input type defines the JSON payload that is expected from the HTTP request, and the output type defines what will be written to your sink. For more details on how to use additional types with sources and sinks, see our source and sink documentation.
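As a rough sketch (assuming a `route`/`method` style decorator and a placeholder sink primitive, which may differ from the exact collector signature), a typed collector could look like this:
```python
from dataclasses import dataclass

@dataclass
class CollectRequest:
    user_id: int

@dataclass
class CollectRecord:
    user_id: int
    processed: bool

# route, method, and the sink primitive are placeholder assumptions.
@app.collector(route="/collect", method="POST", sink=Primitive(...))
async def my_collector(request: CollectRequest) -> CollectRecord:
    # The returned dataclass is written to the sink; the HTTP caller
    # receives a success or failure response.
    return CollectRecord(user_id=request.user_id, processed=True)
```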
For Endpoints
For endpoints, the input type defines the JSON payload that is expected from the HTTP request, and the output type defines the JSON payload that will be sent back in the response. For more details on the different request and response payload types specific to endpoints, see our endpoint documentation.
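As a minimal sketch (the `route` and `method` parameters are assumptions; see the endpoint documentation for the exact decorator signature), a typed endpoint could look like this:
```python
from dataclasses import dataclass

@dataclass
class EchoRequest:
    message: str

@dataclass
class EchoResponse:
    message: str
    length: int

# route and method are placeholder assumptions for this sketch.
@app.endpoint(route="/echo", method="POST")
async def my_endpoint(request: EchoRequest) -> EchoResponse:
    # The returned dataclass is serialized as the JSON response body.
    return EchoResponse(message=request.message, length=len(request.message))
```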
Autoscaling Options
These options control how the autoscaler scales individual processors and can be passed when creating a processor of any type. All options are optional and have reasonable defaults. Your options are:
Option | Description |
---|---|
num_replicas | Number of replicas the processor should start with. Defaults to 1. |
min_replicas | Minimum number of replicas the processor should have. Defaults to 1. |
max_replicas | Maximum number of replicas the processor should have. Defaults to 1. |
All of these options can be set when creating a processor of any type. For example:
```python
@app.consumer(source=..., sink=..., min_replicas=1, max_replicas=10)
```
Different processor types have different options for configuring autoscaling. See the specific processor type documentation for more details (endpoints, collectors, consumers).