Endpoints
The Endpoint pattern enables serving HTTP traffic via an API.
An endpoint is composed of:
- A
route
: this is the path that the endpoint will be available at. - A
method
: this is the HTTP method that the endpoint will be available at currently GET and POST are supported. - Your processing Logic.
Endpoints are created as part of a Service
. A Service is a collection of endpoints that all share the same set of resources.
A service can be created by calling app.service()
. This will create a service that is automatically attached to the flow.
from buildflow import Flow
app = Flow()
service = app.service()
You can also manually create a service and attach it with app.add_service(service)
from buildflow import Flow, Service
app = Flow()
service = Service()
app.add_service(service)
Endpoints can be attached to a service with the @service.endpoint(...)
decorator.
from buildflow import Flow
app = Flow()
service = app.service()
@service.endpoint(route="/", method="POST")
def my_endpoint(...):
...
Or by calling service.add_endpoint(...)
and using the @endpoint
decorator.
from buildflow import Flow, endpoint
app = Flow()
service = app.service()
@endpoint(route="/", method="POST")
def my_endpoint(...):
...
service.add_endpoint(my_endpoint)
Request and Response Type
JSON Request Body and Response
The input of your collector will be one request from your HTTP endpoint. If you endpoint is processing a JSON payload you can specify a dataclass type, and the JSON will be automatically deserialized into that type. Similar for responses, if you return a dataclass it will be serialized to JSON.
from dataclasses import dataclass
from buildflow import Flow
@dataclass
class MyType:
val: int
app = Flow()
service = app.service()
@service.endpoint(route="/", method="POST")
def my_endpoint(request: MyType) -> MyType:
return request
Query parameters
With endpoints you can also define query parameters. The query is the key-value pairs that go after the ?
in the URL.
from buildflow import Flow
service = app.service()
@service.endpoint(route="/my_endpoint", method="GET")
def my_endpoint(my_int: int) -> int:
return request
Requests to this URL would look like:
http://localhost/my_endpoint?my_int=1
Raw Request
If you need access to the entire request you can also import from buildflow.requests import Request
. If you do this no validation will be performed but you will have access to all the request data.
buildflow.requests.Request
is just a proxy for the starlette.requests.Request object.
We simply provide it as a convenience to avoid having to import starlette.requests.Request
directly.
from buildflow import Flow
from buildflow.requests import Request
app = Flow()
service = app.service()
@service.endpoint(route="/", method="POST")
def my_endpoint(request: Request):
...
Other Request Types
We also offer additional request types that can be used to receive different types other than JSON. Such as form data or files. These are available in the buildflow.requests
module.
Note these are just proxy imports for FastAPI request types for convenience. You can find more details on these in the FastAPI docs.
Request Type | Description |
---|---|
Form | Process a form from an http body. Proxy for fastapi.Form. |
UploadFile | Return a file from and http request. Proxy for fastapi.UploadFile. |
Other Response Types
We also offer additional response types that can be used to return different types other than JSON.
These can all be found in the buildflow.responses
module.
These responses are all proxies for different starlette.responses objects, and can be used interchangeably.
Response Type | Description |
---|---|
FileResponse | Return a file from disk. Proxy for starlette.responses.FileResponse. |
HTMLResponse | Return an HTML file. Proxy for starlette.responses.HTMLResponse. |
JSONResponse | Return a JSON response. Proxy for starlette.responses.JSONResponse. |
PlainTextResponse | Return a plain text response. Proxy for starlette.responses.PlainTextResponse. |
RedirectResponse | Return a redirect. Proxy for starlette.responses.RedirectResponse. |
StreamingResponse | Returns a streaming response. Proxy for starlette.responses.StreamingResponse. |
Middleware
You can attach middle ware to your service using the add_middleware
method on your service. You can use any middleware provided by Starlette. We also provide some convenience imports at buildflow.middleware
.
CORS middleware
For additional details see the starlette CORS middleware.
from buildflow import Service
from buildflow.middleware import CORSMiddleware
service = Service()
service.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
SessionMiddleware
For additional details see the starlette SessionMiddleware.
from buildflow import Service
from buildflow.middleware import SessionMiddleware
service = Service()
service.add_middleware(SessionMiddleware, secret_key=session_secret)
Open API Docs
BuildFlow also automatically provides open API docs for your serivces. You can access these at the /docs
endpoint.
Service Options
Services can be individually configured for different resource requirements.
The following options are available:
Option | Description |
---|---|
service_id | A unique ID for your service. Defaults to a random UUID. |
base_route | Base route for all endpoints in the service. Defaults to / . This must be unique for all services in a flow. |
num_cpus | How many CPUs should be allocated to the processor. This can be any floating point number > 0. Defaults to 1 . |
log_level | Log level for the processor. Defaults to logging.INFO . |
num_replicas | Number of replicas the consumer should start with. Defaults to 1 |
min_replicas | Minimum number of replicas the consumer should have. Defaults to 1 |
max_replicas | Maximum number of replicas the consumer should have. Defaults to 1 |
max_concurrent_queries | Maximum number of queries that are sent to a replica without receving a response. Defaults to 100 |
target_num_ongoing_requests_per_replica | Target number of ongoing requests per replica before scaling. Setting this lower will mean replicas are added faster, setting it higher will mean replicas are added slower. Defaults to 1 |
Example
from buildflow import Flow
from buildflow.requests import Request
app = Flow()
service = app.service(num_cpus=2)
@service.endpoint(route="/", method="POST")
def my_endpoint(request: Request):
...