Schedule Async Tasks
In this guide we will go over how to schedule async tasks in your application. All the code for this guide can be found on GitHub.
To do this we will use:
- an endpoint to receive requests from the user
- queue primitives to schedule tasks on a queue: Pub/Sub for GCP and SQS for AWS
- a sink dependency to push data to our queue
- a consumer for processing data pushed to the queue
For a full working project that includes a UI, see our Image Classification example.
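Before wiring up the real primitives, it can help to see the overall shape of the pattern. The sketch below is a standard-library stand-in, not BuildFlow code: an asyncio.Queue plays the role of the Pub/Sub topic or SQS queue, and the function names endpoint, consumer, and demo are illustrative assumptions.

```python
import asyncio
from typing import Dict, List


async def endpoint(num: int, queue: asyncio.Queue) -> Dict[str, bool]:
    # Stand-in for the HTTP endpoint: enqueue the work and return right away.
    await queue.put({"num": num})
    return {"success": True}


async def consumer(queue: asyncio.Queue, results: List[int]) -> None:
    # Stand-in for the consumer: pull items off the queue and process them.
    while True:
        element = await queue.get()
        results.append(element["num"] * element["num"])
        queue.task_done()


async def demo() -> List[int]:
    queue: asyncio.Queue = asyncio.Queue()
    results: List[int] = []
    worker = asyncio.create_task(consumer(queue, results))
    for num in (2, 3, 4):
        await endpoint(num, queue)
    await queue.join()  # wait until every queued item has been processed
    worker.cancel()
    return results
```

The caller of endpoint gets a response as soon as the item is queued; the squaring happens later in the consumer. The rest of this guide builds the same flow with a managed cloud queue in place of the in-process one.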
Initialize Flow
First we create a new BuildFlow Flow. This will be the main entry point to our application.
app = Flow()
Attach a Service
We then attach a service to our flow object. We will later attach an endpoint for receiving data.
service = app.service(service_id="schedule-async-tasks")
Define Queue Primitives
Next we define primitives to enqueue data for async tasks, and mark them as managed by BuildFlow.
- For GCP we use a Pub/Sub topic and Pub/Sub subscription
- For AWS we use an SQS queue
If you would like to use AWS, simply comment out the GCP lines and uncomment the AWS lines.
gcp_pubsub_topic = GCPPubSubTopic(
    project_id=os.environ["GCP_PROJECT_ID"], topic_name="sample-topic"
)
gcp_pubsub_subscription = GCPPubSubSubscription(
    project_id=os.environ["GCP_PROJECT_ID"],
    subscription_name="sample-subscription",
).options(topic=gcp_pubsub_topic)
# sqs_queue = SQSQueue(queue_name="sample-queue")
app.manage(gcp_pubsub_topic, gcp_pubsub_subscription)
# app.manage(sqs_queue)
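The GCP lines above read GCP_PROJECT_ID with os.environ[...], so an unset variable surfaces as a bare KeyError. As an optional sketch, a small helper can fail fast with a clearer message. The name require_env is hypothetical and not part of BuildFlow.

```python
import os


def require_env(name: str) -> str:
    # Hypothetical helper: fail fast with a readable message if a variable is unset.
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Set the {name} environment variable before running.")
    return value
```

With this in place, project_id=require_env("GCP_PROJECT_ID") could replace the direct os.environ lookups.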
Create a Sink Dependency
Now we create a sink dependency to push data to our queue.
Swap the comments on these lines if you are using AWS.
Sink = SinkDependencyBuilder(gcp_pubsub_topic)
# Sink = SinkDependencyBuilder(sqs_queue)
Attach Endpoint for Receiving Data
Next we attach an endpoint to our service to receive data from the user.
This endpoint takes in two arguments.
- The number provided by the user
- The sink we defined in the previous step
We then call sink.push to push a JSON object to our queue. Notice that sink.push is an async method, so we must use await to call it, and we mark our endpoint processor as async by using async def.
@service.endpoint("/square", method="GET")
async def square_endpoint(num: int, sink: Sink):
    await sink.push({"num": num})
    return {"success": True}
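One way to exercise the endpoint logic without any cloud resources is to call the coroutine directly with a stand-in sink. The sketch below assumes only that the sink exposes an async push method, as described above. InMemorySink and run_demo are hypothetical names for illustration, and the BuildFlow decorator is omitted.

```python
import asyncio
from typing import Any, Dict, List


class InMemorySink:
    """Hypothetical test double exposing the same async push() shape."""

    def __init__(self) -> None:
        self.pushed: List[Any] = []

    async def push(self, element: Any) -> None:
        # Record the element instead of sending it to a real queue.
        self.pushed.append(element)


async def square_endpoint(num: int, sink: InMemorySink) -> Dict[str, bool]:
    # Same body as the endpoint above, minus the BuildFlow decorator.
    await sink.push({"num": num})
    return {"success": True}


def run_demo() -> InMemorySink:
    sink = InMemorySink()
    response = asyncio.run(square_endpoint(4, sink))
    assert response == {"success": True}
    return sink
```

Calling sink.push(...) without await would only create a coroutine object and nothing would be pushed, which is why the endpoint must be declared with async def.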
Consume the Queue
Finally we consume the data that was put on the queue using a consumer.
Swap the comments on these lines if you are using AWS.
# @app.consumer(source=sqs_queue)
@app.consumer(source=gcp_pubsub_subscription)
def square_consumer(element: Dict[str, int]):
    square = element["num"] * element["num"]
    print(f"square of: {element['num']} is {square}")
All this consumer does is print the square of the number that was pushed to the queue. You could optionally add a sink to allow the consumer to push data to another primitive.
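As the processing grows beyond a print statement, it may be worth separating the computation from the I/O so it can be tested without a queue. The following is a rough sketch of that split, not the guide's actual code: compute_square and the forward callback are hypothetical, with forward standing in for a push to another primitive.

```python
from typing import Callable, Dict, Optional


def compute_square(element: Dict[str, int]) -> Dict[str, int]:
    # Pure function: no I/O, so it can be tested without a queue.
    return {"num": element["num"], "square": element["num"] * element["num"]}


def square_consumer(
    element: Dict[str, int],
    forward: Optional[Callable[[Dict[str, int]], None]] = None,
) -> None:
    result = compute_square(element)
    print(f"square of: {result['num']} is {result['square']}")
    if forward is not None:
        # Hypothetical hook standing in for a push to another primitive.
        forward(result)
```

In a real flow the decorated consumer would keep its BuildFlow signature and delegate to compute_square.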
Run the Code
If you’re using GCP, ensure you set the environment variable GCP_PROJECT_ID before running any of the commands below.
First create all the resources with the VSCode extension or the CLI:
buildflow apply
Once completed you can run the code with the VSCode extension or the CLI:
buildflow run
Now you can visit http://localhost:8000/docs to open the Swagger UI, send a request, and watch it be enqueued and processed by your consumer.
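If you prefer a script to the Swagger UI, a request can also be sent with the Python standard library. This is a minimal sketch that assumes num is passed as a query parameter on the GET /square endpoint and that the app is listening on localhost:8000. The helper names build_square_url and send_square_request are illustrative.

```python
import json
import urllib.parse
import urllib.request
from typing import Any


def build_square_url(num: int, base_url: str = "http://localhost:8000") -> str:
    # Assumption: `num` travels as a query parameter on the GET request.
    query = urllib.parse.urlencode({"num": num})
    return f"{base_url}/square?{query}"


def send_square_request(num: int, base_url: str = "http://localhost:8000") -> Any:
    # Requires the app to be running locally (buildflow run).
    with urllib.request.urlopen(build_square_url(num, base_url), timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

With the app running, calling send_square_request(4) should return the endpoint's JSON response, and the consumer's output should show up in the terminal where buildflow run is active.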