BuildFlow allows you to define custom dependencies for your processor that can be injected whenever your processor is called. This enable you to easily integrate other components into your processor. BuildFlow provides many ready to use dependencies to make integrations even quicker.
What do dependencies enable?
- Shared logic: dependencies can be used across processor
- Stateful processors: dependencies can be scoped allowing the results to be “cached”
- Shared state: global dependencies mean processors can share state across an entire flow
- Security, Authentication: common dependencies can be used to ensure you are authenticating requests the same way everytime
How do you use dependencies?
Lets start with a super simple example of a common string that should be injected into every endpoint.
from buildflow import Flow from buildflow.dependencies import dependency, Scope @dependency(Scope.GLOBAL) class MyStringDependency: def __init__(self): self.my_string = "HELLO!" app = Flow() service = app.service() @service.endpoint("/bob", method="GET") def bob(my_string_dep: MyStringDependency): return my_string_dep.my_string + " BOB!" @service.endpoint("/sally", method="GET") def sally(my_string_dep: MyStringDependency): return my_string_dep.my_string + " SALLY!"
That’s it! You simply define your class and add it as an argument to your procssor.
Even though we only use endpoints in the above example, dependencies can be used with any processor pattern (endpoints, consumers, or collectors),
We define the
MyStringDependency class by attaching the
@dependency decorator to it. This decorator can be attached to any class you define.
The decorator takes a single argument
scope which defines where the dependency is available. A dependency will always be cached for the lifetime of the scope.
There are several options for scope:
|PROCESS||A new dependency will be made available for every call to your processor and will be destroyed once the call to your processor is over.|
|REPLICA||A new dependency will be made available when ever a replica is created and destroyed whenever the replica is drained.|
|GLOBAL||A new dependency will be made available when your flow is started and destroyed whenever the flow is drained.|
|NO_SCOPE||A new dependency will be created every time this dependency is accessed (nothing will be cached).|
In our above example we mark the dependency as
GLOBAL which means it will be created when the flow is started and destroyed when the flow is drained. This means that the dependency will be available for the lifetime of the flow and will be shared across all processors. So no matter how many times you call your processor it will always reference the same string object.
GLOBAL scoped dependencies must be serializable since they are stored in an object store that is shared across replicas.
Dependencies can also depend on other dependencies. This allows you build any hierarchy of dependencies you may need. For example, maybe you want to inject a database connection into your processor, but also have a common dependency for fetching some common state from the database. This can be done by having two dependencies:
- Your first dependency sets up the database connection
- Your second dependency depends on the first and fetches the common state
from buildflow import Flow from buildflow.dependencies import dependency, Scope from my_db import connect @dependency(Scope.REPLICA) class ConnectionDep: def __init__(self): self.connection = connect @dependency(Scope.REPLICA) class DBStateDep: def __init__(self, connection_dep: ConnectionDep): self.state = connection_dep.connection.run_query() app = Flow() service = app.service() @service.endpoint("/", method="GET") def index(connection_dep: ConnectionDep, db_state_dep: DBStateDep): ...
In this example you can see that both the
index processor and
DBStateDep both depend on the
ConnectionDep so they will both actually get the same connection object.
You’ll also note that we marked our
ConnectDep dependency as
REPLICA scoped so this database connection will be kept alive and reused for all calls to your processor (unless a new replica is added for a scaling event).
If you are using your dependency in an endpoint or collector you can reference the
starlette.requests.Request) object in your dependency to access things specific to your request.
Your dependency must be marked as
PROCESS scoped to use the request object.
from buildflow import Flow from buildflow.dependencies import dependency, Scope from buildflow.requests import Request @dependency(Scope.PROCESS) class ExtractHeaderDep: def __init__(self, request: Request): self.header = request.headers.get("X-My-Header") app = Flow() service = app.service() @service.endpoint("/bob", method="GET") def bob(my_header_dep: ExtractHeaderDep): return my_header_dep.header