A processor is where you will define your actual business logic. We offer three different patterns of processors to support a wide variety of use cases.
| Pattern | Description |
|---|---|
| Endpoints | Takes in data from an HTTP endpoint via a GET or POST request and returns a user-defined response. |
| Consumers | Event-driven processor that reads data from a source and writes data to a sink. |
| Collectors | Takes in data from an HTTP endpoint via a GET or POST request, writes it to a sink, then returns a success or failure response. |
Some additional patterns we hope to support in the future are open websocket connections and batch pipelines.
TL;DR: If you are using any libraries that require you to use `await`, you should make your processor async.
All BuildFlow processor patterns run in an async runtime, so you can make your processor async by simply adding the `async` keyword:

```python
@app.consumer(source=Primitive(...), sink=Primitive(...))
async def my_consumer(...):
    ...
```
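To see why `await` forces the surrounding function to be async, here is a plain-`asyncio` sketch that is independent of BuildFlow (the function names are illustrative, not part of any API):

```python
import asyncio

async def fetch_data() -> str:
    # Stand-in for a call into an async library (e.g. an async HTTP client).
    await asyncio.sleep(0)
    return "payload"

async def my_processor() -> str:
    # `await` is only legal inside an `async def` function, which is why a
    # processor that awaits anything must itself be declared async.
    return await fetch_data()

result = asyncio.run(my_processor())
print(result)  # payload
```
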
Async With Ray
The async runtime is powered by Ray, which means you can use any of the functionality of Ray core to add increased parallelism to your processor. This is recommended if you need to perform any long-running or computationally expensive tasks in your processor.
```python
import time

import ray

@ray.remote
def long_task(elem):
    time.sleep(10)
    return elem

@app.consumer(source=ResourceType(...), sink=ResourceType(...))
async def my_consumer(elem):
    # Awaiting the Ray ObjectRef requires the processor to be async.
    return await long_task.remote(elem)
```
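If you just need to keep a blocking call from stalling the async runtime and don't need Ray's distributed execution, the standard library offers a similar offloading pattern. This is a sketch of the same idea using `asyncio.to_thread`, not BuildFlow or Ray functionality:

```python
import asyncio
import time

def long_task(elem: int) -> int:
    # A blocking call that would otherwise stall the event loop.
    time.sleep(0.1)
    return elem

async def my_consumer(elem: int) -> int:
    # Run the blocking call in a worker thread and await its result,
    # keeping the async runtime free to process other elements.
    return await asyncio.to_thread(long_task, elem)

print(asyncio.run(my_consumer(41)))  # 41
```
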
BuildFlow supports using custom types for both the input and output of your processor. If you are receiving or writing a JSON payload, you can simply use a dataclass to get automatic serialization and deserialization.
```python
from dataclasses import dataclass

@dataclass
class InputType:
    a: int

@dataclass
class OutputType:
    b: int

@app.consumer(source=Primitive(...), sink=Primitive(...))
async def my_consumer(elem: InputType) -> OutputType:
    return OutputType(b=elem.a + 1)
```
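Conceptually, the automatic serialization and deserialization behaves like a standard-library dataclass-to-JSON round trip. The following is an illustrative sketch of that idea, not BuildFlow's actual implementation:

```python
import json
from dataclasses import asdict, dataclass

@dataclass
class InputType:
    a: int

@dataclass
class OutputType:
    b: int

# Deserialization: JSON payload -> dataclass (what happens to your input).
payload = json.loads('{"a": 1}')
elem = InputType(**payload)

# The processor's business logic.
out = OutputType(b=elem.a + 1)

# Serialization: dataclass -> JSON (what happens to your output).
print(json.dumps(asdict(out)))  # {"b": 2}
```
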
For consumers, the input type defines what data type is coming from your source, and the output type defines what will be written to your sink. For more details on how to use additional types with sources and sinks, see our source and sink documentation.
For collectors, the input type defines the JSON payload that is expected from the HTTP request. The output type defines what will be written to your sink. For more details on how to use additional types with sources and sinks, see our source and sink documentation.
For endpoints, the input type defines the JSON payload that is expected from the HTTP request. The output type defines the JSON payload that will be sent back in the response. For more details on different types of request and response payloads specific to endpoints, see our endpoint documentation.
These options control how the autoscaler scales individual processors and can be passed when creating a processor of any type. All options are optional and have reasonable defaults. Your options are:
| Option | Description |
|---|---|
| `num_replicas` | Number of replicas the consumer should start with. Defaults to |
| `min_replicas` | Minimum number of replicas the consumer should have. Defaults to |
| `max_replicas` | Maximum number of replicas the consumer should have. Defaults to |
All of these options can be set when creating a processor of any type. For example:
```python
@app.consumer(source=..., sink=..., min_replicas=1, max_replicas=10)
```