In this guide we will go over how to schedule async tasks in your application. All the code for this guide can be found on GitHub.

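To do this we will use an endpoint to receive requests, a queue (GCP Pub/Sub or AWS SQS) to hold the tasks, and a consumer to process them.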

For a full working project that includes a UI, see our Image Classification example.

1

Initialize Flow

First we create a new BuildFlow Flow. This will be the main entry point to our application.

from buildflow import Flow

app = Flow()

2

Attach a Service

We then attach a service to our flow object. We will later attach an endpoint for receiving data.

service = app.service(service_id="schedule-async-tasks")

3

Define Queue Primitives

Next we define the primitives used to enqueue data for async tasks, and mark them as managed by BuildFlow.

If you would like to use AWS, simply comment out the GCP lines and uncomment the AWS lines.

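import os

# Import paths assume a recent BuildFlow release; adjust if your version differs.
from buildflow.io.gcp import GCPPubSubSubscription, GCPPubSubTopic
# from buildflow.io.aws import SQSQueue

gcp_pubsub_topic = GCPPubSubTopic(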
    project_id=os.environ["GCP_PROJECT_ID"], topic_name="sample-topic"
)
gcp_pubsub_subscription = GCPPubSubSubscription(
    project_id=os.environ["GCP_PROJECT_ID"],
    subscription_name="sample-subscription",
).options(topic=gcp_pubsub_topic)

# sqs_queue = SQSQueue(queue_name="sample-queue")

app.manage(gcp_pubsub_topic, gcp_pubsub_subscription)
# app.manage(sqs_queue)

4

Create a Sink Dependency

Now we create a sink dependency to push data to our queue.

Swap the comments on these lines if you are using AWS.

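# Import path assumes a recent BuildFlow release.
from buildflow.dependencies.sink import SinkDependencyBuilder

Sink = SinkDependencyBuilder(gcp_pubsub_topic)
# Sink = SinkDependencyBuilder(sqs_queue)

5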

Attach Endpoint for Receiving Data

Next we attach an endpoint to our service to receive data from the user.

This endpoint takes in two arguments.

  1. The number provided by the user
  2. The sink we defined in the previous step

We then call sink.push to push a JSON object to our queue.

Notice that sink.push is an async method so we must use await to call it, and we mark our endpoint processor as async by using async def.

@service.endpoint("/square", method="GET")
async def square_endpoint(num: int, sink: Sink):
    await sink.push({"num": num})
    return {"success": True}
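To see why the await matters independently of BuildFlow, here is a minimal sketch that swaps the real sink for an in-memory stand-in. InMemorySink and demo are hypothetical names used only for illustration; they are not part of the BuildFlow API, and the real sink talks to Pub/Sub or SQS rather than an asyncio.Queue.

```python
import asyncio


class InMemorySink:
    """Hypothetical stand-in for a queue-backed sink; not part of BuildFlow."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()

    async def push(self, element: dict) -> None:
        # A real sink would perform network I/O here, which is why push
        # is a coroutine that has to be awaited.
        await self.queue.put(element)


async def square_endpoint(num: int, sink: InMemorySink) -> dict:
    # Without `await`, sink.push(...) would only create a coroutine object
    # and nothing would be enqueued.
    await sink.push({"num": num})
    return {"success": True}


async def demo() -> tuple:
    # Call the endpoint function directly and inspect what landed on the queue.
    sink = InMemorySink()
    response = await square_endpoint(4, sink)
    queued = sink.queue.get_nowait()
    return response, queued


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The endpoint returns as soon as the element is enqueued; the squaring itself happens later in the consumer, which is what makes the task asynchronous from the caller's point of view.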
6

Consume the Queue

Finally we consume the data that was put on the queue using a consumer.

Swap the comments on these lines if you are using AWS.

from typing import Dict

# @app.consumer(source=sqs_queue)
@app.consumer(source=gcp_pubsub_subscription)
def square_consumer(element: Dict[str, int]):
    square = element["num"] * element["num"]
    print(f"square of: {element['num']} is {square}")

All this consumer does is print the square of the number that was pushed to the queue. You could optionally add a sink to allow the consumer to push data to another primitive.
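As a rough sketch of that idea, the consumer logic can return its result instead of only printing it, so that something downstream can receive it. This guide does not show how BuildFlow attaches a sink to a consumer, so the example below uses a plain Python list as a stand-in for the second primitive; drain is a hypothetical helper, not a BuildFlow function.

```python
from typing import Dict, List


def square_consumer(element: Dict[str, int]) -> Dict[str, int]:
    """Compute the square and return it instead of only printing it."""
    num = element["num"]
    return {"num": num, "square": num * num}


def drain(messages: List[Dict[str, int]]) -> List[Dict[str, int]]:
    """Hypothetical helper: run the consumer over queued messages in order."""
    downstream = []  # stands in for a second queue or table
    for message in messages:
        downstream.append(square_consumer(message))
    return downstream


if __name__ == "__main__":
    print(drain([{"num": 3}, {"num": -4}]))
```

Keeping the computation in a function that returns a value also makes the consumer easy to unit test without any cloud resources.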

7

Run the Code

If you're using GCP, make sure the environment variable GCP_PROJECT_ID is set before running any of the commands below.
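If you want a clearer error than a bare KeyError when the variable is missing, a small check like the following can help. This is only a sketch: require_env is a hypothetical helper and is not provided by BuildFlow.

```python
import os
from typing import Mapping


def require_env(name: str, environ: Mapping[str, str] = os.environ) -> str:
    """Return the value of an environment variable or fail with a clear message."""
    value = environ.get(name, "").strip()
    if not value:
        raise RuntimeError(
            f"Set {name} before running buildflow apply or buildflow run"
        )
    return value


# Example usage in place of os.environ["GCP_PROJECT_ID"]:
# project_id = require_env("GCP_PROJECT_ID")
```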

First create all the resources with the VSCode extension or the CLI:

buildflow apply

Once completed you can run the code with the VSCode extension or the CLI:

buildflow run

Now you can visit http://localhost:8000/docs to open the Swagger UI, send a request, and watch it be enqueued and processed by your consumer.
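You can also send the request from a script instead of the Swagger UI. The sketch below uses only the Python standard library and assumes that num is passed as a query parameter on the GET /square endpoint defined earlier; build_square_url and request_square are hypothetical helper names.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:8000"  # address used in this guide


def build_square_url(num: int, base_url: str = BASE_URL) -> str:
    """Build the request URL, assuming `num` is sent as a query parameter."""
    query = urllib.parse.urlencode({"num": num})
    return f"{base_url}/square?{query}"


def request_square(num: int, base_url: str = BASE_URL) -> dict:
    """Send the GET request; the app must already be running via `buildflow run`."""
    url = build_square_url(num, base_url)
    with urllib.request.urlopen(url, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the application running, calling request_square(5) should return the endpoint's JSON response, and the consumer's log line for that number should appear shortly afterwards in the terminal where buildflow run is active.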