The Endpoint pattern enables serving HTTP traffic via an API.

An endpoint is composed of:

  • A route: the path that the endpoint will be available at.
  • A method: the HTTP method that the endpoint responds to. Currently GET and POST are supported.
  • Your processing logic.

Endpoints are created as part of a Service. A Service is a collection of endpoints that all share the same set of resources.

A service can be created by calling app.service(). This will create a service that is automatically attached to the flow.

from buildflow import Flow

app = Flow()
service = app.service()

You can also manually create a service and attach it with app.add_service(service):

from buildflow import Flow, Service

app = Flow()
service = Service()
app.add_service(service)

Endpoints can be attached to a service with the @service.endpoint(...) decorator.

from buildflow import Flow

app = Flow()
service = app.service()

@service.endpoint(route="/", method="POST")
def my_endpoint(...):
    ...

Or by calling service.add_endpoint(...) and using the @endpoint decorator.

from buildflow import Flow, endpoint

app = Flow()
service = app.service()

@endpoint(route="/", method="POST")
def my_endpoint(...):
    ...
service.add_endpoint(my_endpoint)

Request and Response Type

JSON Request Body and Response

The input to your endpoint is a single HTTP request. If your endpoint processes a JSON payload, you can annotate the argument with a dataclass type and the JSON will be automatically deserialized into that type. Responses work the same way: if you return a dataclass, it will be serialized to JSON.

from dataclasses import dataclass
from buildflow import Flow

@dataclass
class MyType:
    val: int

app = Flow()
service = app.service()

@service.endpoint(route="/", method="POST")
def my_endpoint(request: MyType) -> MyType:
    return request
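
As a rough illustration of the round trip described above, the following stdlib-only sketch mimics what happens to a JSON body: it is parsed, loaded into the dataclass, and the returned dataclass is serialized back to JSON. The helper handle_json is hypothetical and is not BuildFlow's actual implementation, which may validate and convert types differently.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class MyType:
    val: int


def my_endpoint(request: MyType) -> MyType:
    return request


def handle_json(body: str) -> str:
    # Hypothetical helper: parse the JSON body into the annotated dataclass,
    # call the endpoint, and serialize the returned dataclass back to JSON.
    request = MyType(**json.loads(body))
    response = my_endpoint(request)
    return json.dumps(asdict(response))


print(handle_json('{"val": 1}'))  # {"val": 1}
```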

Query parameters

Endpoints can also accept query parameters. The query is the set of key-value pairs that follow the ? in the URL.

from buildflow import Flow

app = Flow()
service = app.service()

@service.endpoint(route="/my_endpoint", method="GET")
def my_endpoint(my_int: int) -> int:
    return my_int

Requests to this URL would look like:

http://localhost/my_endpoint?my_int=1
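
To make the mapping concrete, here is a sketch of how the query string in that URL corresponds to the my_int argument. It uses only urllib.parse. The helper extract_my_int is hypothetical; with BuildFlow you would not write this yourself, since the endpoint receives the value as a typed argument.

```python
from urllib.parse import parse_qs, urlencode, urlsplit

# Build the request URL from a dict of query parameters.
url = "http://localhost/my_endpoint?" + urlencode({"my_int": 1})


def extract_my_int(request_url: str) -> int:
    # Hypothetical helper: take the text after the "?", split it into
    # key-value pairs, and convert the value to the annotated type (int).
    query = parse_qs(urlsplit(request_url).query)
    return int(query["my_int"][0])


print(url)
print(extract_my_int(url))
```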

Raw Request

If you need access to the entire request, you can annotate your endpoint's argument with Request, imported via from buildflow.requests import Request. If you do this, no validation will be performed, but you will have access to all of the request data.

buildflow.requests.Request is just a proxy for the starlette.requests.Request object.

We simply provide it as a convenience to avoid having to import starlette.requests.Request directly.

from buildflow import Flow
from buildflow.requests import Request

app = Flow()
service = app.service()

@service.endpoint(route="/", method="POST")
def my_endpoint(request: Request):
    ...

Other Request Types

We also offer additional request types for receiving data other than JSON, such as form data or files. These are available in the buildflow.requests module.

Note that these are just proxy imports for FastAPI request types, provided for convenience. You can find more details on these in the FastAPI docs.

Request Type  Description
Form          Process a form from an HTTP body. Proxy for fastapi.Form.
UploadFile    Receive a file from an HTTP request. Proxy for fastapi.UploadFile.

Other Response Types

We also offer additional response types that can be used to return different types other than JSON. These can all be found in the buildflow.responses module.

These responses are all proxies for different starlette.responses objects, and can be used interchangeably.

Response Type      Description
FileResponse       Return a file from disk. Proxy for starlette.responses.FileResponse.
HTMLResponse       Return an HTML file. Proxy for starlette.responses.HTMLResponse.
JSONResponse       Return a JSON response. Proxy for starlette.responses.JSONResponse.
PlainTextResponse  Return a plain text response. Proxy for starlette.responses.PlainTextResponse.
RedirectResponse   Return a redirect. Proxy for starlette.responses.RedirectResponse.
StreamingResponse  Returns a streaming response. Proxy for starlette.responses.StreamingResponse.

Middleware

You can attach middleware to your service using its add_middleware method. Any middleware provided by Starlette can be used. We also provide some convenience imports in buildflow.middleware.

CORS middleware

For additional details see the starlette CORS middleware.

from buildflow import Service
from buildflow.middleware import CORSMiddleware

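# Example value (an assumption): the origins allowed to call your service.
origins = ["http://localhost:3000"]

service = Service()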
service.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

SessionMiddleware

For additional details see the starlette SessionMiddleware.

from buildflow import Service
from buildflow.middleware import SessionMiddleware

service = Service()
service.add_middleware(SessionMiddleware, secret_key=session_secret)
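
The snippet above leaves session_secret undefined. One possible way to obtain a value, offered as an assumption rather than a BuildFlow recommendation, is to read it from an environment variable in deployment so it stays stable across restarts, and to fall back to a random string from Python's secrets module during local development. The variable name SESSION_SECRET is hypothetical.

```python
import os
import secrets

# Use the value from the environment when present (hypothetical variable
# name); otherwise fall back to a freshly generated random secret, which
# is only suitable for local development because it changes on every start.
session_secret = os.environ.get("SESSION_SECRET") or secrets.token_hex(32)
```

With a secret that changes on every start, sessions signed before a restart can no longer be verified, which is why a fixed value is preferable outside of development.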

OpenAPI Docs

BuildFlow also automatically provides OpenAPI docs for your services. You can access these at the /docs endpoint.

Service Options

Services can be individually configured for different resource requirements.

The following options are available:

Option                                   Description
service_id                               A unique ID for your service. Defaults to a random UUID.
base_route                               Base route for all endpoints in the service. Defaults to /. This must be unique for all services in a flow.
num_cpus                                 How many CPUs should be allocated to the processor. This can be any floating point number > 0. Defaults to 1.
log_level                                Log level for the processor. Defaults to logging.INFO.
num_replicas                             Number of replicas the service should start with. Defaults to 1.
min_replicas                             Minimum number of replicas the service should have. Defaults to 1.
max_replicas                             Maximum number of replicas the service should have. Defaults to 1.
max_concurrent_queries                   Maximum number of queries that are sent to a replica without receiving a response. Defaults to 100.
target_num_ongoing_requests_per_replica  Target number of ongoing requests per replica before scaling. A lower value means replicas are added faster; a higher value means they are added slower. Defaults to 1.
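
To build intuition for how target_num_ongoing_requests_per_replica interacts with min_replicas and max_replicas, here is a simplified model. It is an assumption for illustration only, not BuildFlow's actual autoscaling algorithm: it divides the number of ongoing requests by the target and clamps the result to the replica bounds. The function desired_replicas is hypothetical.

```python
import math


def desired_replicas(
    ongoing_requests: int,
    target_num_ongoing_requests_per_replica: float = 1,
    min_replicas: int = 1,
    max_replicas: int = 1,
) -> int:
    # Replicas needed so each handles roughly the target number of requests.
    wanted = math.ceil(ongoing_requests / target_num_ongoing_requests_per_replica)
    # Clamp to the configured bounds.
    return max(min_replicas, min(max_replicas, wanted))


# With the defaults (max_replicas=1) this model never goes past one replica.
print(desired_replicas(50))
# A lower target asks for more replicas at the same load than a higher one.
print(desired_replicas(50, target_num_ongoing_requests_per_replica=5, max_replicas=20))
print(desired_replicas(50, target_num_ongoing_requests_per_replica=25, max_replicas=20))
```

Under this model, raising max_replicas above its default of 1 is what allows any scaling to happen at all; the target then controls how aggressively replicas are requested for a given load.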

Example

from buildflow import Flow
from buildflow.requests import Request

app = Flow()
service = app.service(num_cpus=2)

@service.endpoint(route="/", method="POST")
def my_endpoint(request: Request):
    ...