Tutorial: Image Classification
In this guide we’ll introduce you to BuildFlow and its core concepts by walking through a simple image classification application.


Our application will:
- Load our model from local storage in a dependency, ensuring the model is ready before we receive image uploads and can be shared across requests.
- Expose an endpoint that allows users to upload images.
- When our endpoint receives an image, it will upload the image to a bucket (GCS or S3).
- When the image is uploaded, it will trigger a notification to our queue (GCP Pub/Sub or AWS SQS).
- Our consumer will receive this notification and run the image through an image classification model using ImageAI.
- The results of the image classification will be stored in a data warehouse (BigQuery or DuckDB).
- The data warehouse will be exposed by another endpoint.
If all goes well, you should have a working image classification app like this:

(Architecture diagram, with GCP, AWS, and Local variants.)
All the code for this tutorial can be found on GitHub; we will be walking through this repo step by step.
Define your Project
Your project is defined by the presence of a buildflow.yaml file in the root of your project.
This file is created when we use the BuildFlow CLI to create a new project, and it doesn’t need to be updated unless you need to change your project configuration.
Our yaml file defines the following:
- project: the name of our project; this can be updated to any string containing lower-case letters and hyphens
- pulumi_config: our pulumi configuration; here we say we have one stack that is stored locally
- entry_point: defines how our application is run
```yaml
project: buildflow-gcp-image-classification-walkthrough
pulumi_config:
  pulumi_home: .buildflow/_pulumi
  stacks:
    - name: local
      backend_url: file://.buildflow/_pulumi/local
entry_point: main:app
```
Initialize your Flow
We define our flow in main.py; this is the entry point we defined in our buildflow.yaml file. app = Flow(...) creates our Flow object, which is a container for all of our business logic and resources.
You’ll notice we call a couple methods on our flow object.
- manage: this method marks a primitive as managed, meaning it can be created, updated, and destroyed by BuildFlow.
- add_service: attaches a service to our flow. A service is a container of endpoints; our service will allow users to upload images to our application.
- add_consumer: attaches a consumer to our flow. A consumer allows us to asynchronously process image uploads, meaning it won’t block the user’s request!
```python
from buildflow import Flow, FlowOptions

from image_classification import primitives
from image_classification.processors.consumer import classify_image
from image_classification.processors.service import service
from image_classification.settings import env

app = Flow(
    flow_options=FlowOptions(
        gcp_service_account_info=env.gcp_service_account_info, stack=env.env.value
    )
)
app.manage(
    primitives.bucket,
    primitives.dataset,
    primitives.table,
    primitives.file_change_stream,
)
app.add_service(service)
app.add_consumer(classify_image)
```
Create your Primitives
In primitives.py we define all of the primitives used by our application. Primitives are resources that a processor may read from, write to, or reference.
You may have noticed these same primitives were passed into app.manage in main.py; this means they will be created when you run buildflow run for the first time.
We define 4 primitives here:
- bucket: a GCS bucket where we will upload user images.
- file_change_stream: a GCS file change stream that sets up notifications for when a file is uploaded to our bucket. This is what triggers our image classification consumer.
- dataset: a BigQuery dataset; this is a container for our BigQuery table.
- table: a BigQuery table where we will store our image classification results.
```python
from buildflow.io.gcp import (
    GCSFileChangeStream,
    BigQueryTable,
    GCSBucket,
    BigQueryDataset,
)

from image_classification.settings import env
from image_classification.schemas import ImageClassificationRow

bucket = GCSBucket(
    project_id=env.gcp_project_id,
    bucket_name=f"{env.gcp_project_id}-image-classification",
).options(force_destroy=True)
file_change_stream = GCSFileChangeStream(gcs_bucket=bucket)
dataset = BigQueryDataset(
    project_id=env.gcp_project_id,
    dataset_name="buildflow_image_classification_walkthrough",
)
table = BigQueryTable(dataset=dataset, table_name="image_classification").options(
    schema=ImageClassificationRow
)
```
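The ImageClassificationRow class used as the table schema (and the Classification class used by our consumer) live in schemas.py, which is covered under Other Files below. As a rough sketch of what such a schema might look like, assuming plain dataclasses (the repo file is the source of truth):

```python
# Hypothetical sketch of schemas.py, for orientation only.
# Field names mirror what consumer.py populates later in this tutorial.
from dataclasses import dataclass
from typing import List


@dataclass
class Classification:
    classification: str  # label predicted by ImageAI
    confidence: float  # probability returned alongside the label


@dataclass
class ImageClassificationRow:
    image_name: str
    upload: str  # ISO-formatted upload timestamp
    classifications: List[Classification]
```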
Add your Dependencies
In dependencies.py we set up the dependencies that will be used by our endpoints and consumer. Dependencies allow us to precompute and share code between processors. In our case we want to load the model when we start our application, before any data is processed.
We do this by defining the ModelDep class and adding the decorator @dependency(scope=Scope.REPLICA), which annotates the class as a dependency that should be created before a replica starts processing data.
We also define BucketDep, a bucket dependency that allows us to connect to the bucket associated with the bucket primitive we defined in primitives.py.
```python
import os

from buildflow.dependencies import dependency, Scope
from buildflow.dependencies.bucket import BucketDependencyBuilder
from imageai.Classification import ImageClassification

from image_classification.primitives import bucket


@dependency(scope=Scope.REPLICA)
class ModelDep:
    def __init__(self) -> None:
        self.execution_path = os.path.dirname(os.path.realpath(__file__))
        self.prediction = ImageClassification()
        self.prediction.setModelTypeAsMobileNetV2()
        self.prediction.setModelPath("mobilenet_v2-b0353104.pth")
        self.prediction.loadModel()


BucketDep = BucketDependencyBuilder(bucket)
```
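Dependencies are consumed by annotating a processor argument with the dependency class; BuildFlow then builds and injects the instance for you. A hypothetical endpoint, just to illustrate the injection pattern (the real usage appears in service.py and consumer.py below):

```python
# Hypothetical illustration only (not part of the repo): the argument type
# annotations are what tell BuildFlow which dependencies to inject at runtime.
from buildflow import Service
from buildflow.requests import UploadFile

from image_classification.dependencies import BucketDep, ModelDep

demo_service = Service(service_id="dependency-demo")


@demo_service.endpoint("/classify-demo", method="POST")
def classify_demo(image_file: UploadFile, model: ModelDep, bucket_dep: BucketDep):
    # model.prediction is loaded once per replica (Scope.REPLICA), and
    # bucket_dep.bucket is a client for the bucket primitive defined earlier.
    ...
```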
Add your Service
In service.py we define the actual endpoints that will be called by our users. First we create a service with service = Service(...), then attach endpoints to functions using @service.endpoint(...).
Remember, we attached our service to our flow in main.py with app.add_service.
We define three endpoints here:
- index: this endpoint returns a simple HTML page that allows users to upload images.
- image_upload: this endpoint allows users to upload an image, which we then upload to our remote bucket. It takes in an UploadFile, which is a file that has been uploaded by a user. We also take in the bucket_dep dependency we defined earlier; this allows us to upload the file to our bucket using the Google Cloud Storage client library.
- image_upload_results: this endpoint returns a simple HTML page that displays the results of our image classification model. It does this by querying our BigQuery table and then rendering the results as a table.
```python
from buildflow import Service
from buildflow.requests import UploadFile
from buildflow.responses import FileResponse, HTMLResponse
from google.cloud import bigquery
import pandas as pd

from image_classification.dependencies import BucketDep
from image_classification.primitives import table

service = Service(service_id="image-classification")


@service.endpoint("/", method="GET")
def index():
    return FileResponse("image_classification/processors/index.html")


@service.endpoint("/image-upload", method="POST")
def image_upload(image_file: UploadFile, bucket_dep: BucketDep):
    blob = bucket_dep.bucket.blob(image_file.filename)
    blob.upload_from_file(image_file.file)


def generate_inner_table(row):
    inner_df = pd.DataFrame.from_records(row["classifications"])
    return inner_df.to_html(classes="table-auto", index=False, border=1).replace(
        "\n", ""
    )


@service.endpoint("/image-upload-results", method="GET")
def image_upload_results():
    bigquery_client = bigquery.Client()
    results = bigquery_client.query(f"SELECT * FROM {table.primitive_id()}")
    df = results.to_dataframe()
    df["classifications"] = df.apply(generate_inner_table, axis=1)
    return HTMLResponse(
        df.to_html(
            classes="m-5 border-spacing-2 border-collapse border border-slate-500 table-auto".split(
                " "
            ),
            escape=False,
        ).replace("\n", "")
    )
```
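Once the app is running (see Running The Application below), you can also exercise the upload endpoint from a script instead of the HTML page. A minimal sketch, assuming the default http://localhost:8000 address and that the multipart field is named after the image_file argument; the file name here is just a placeholder:

```python
# Hypothetical client script: POST an image to the /image-upload endpoint.
import requests

with open("my_image.png", "rb") as f:  # placeholder path; use any local image
    response = requests.post(
        "http://localhost:8000/image-upload",
        files={"image_file": ("my_image.png", f, "image/png")},
    )
print(response.status_code)
```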
Add Async Processing
consumer.py is where we send our images through the model. We do this by attaching the @consumer decorator to our classify_image function.
Remember, we attached our consumer to our flow in main.py with app.add_consumer.
Our consumer function takes in two arguments:
- file_event: GCSFileChangeEvent - this is the input to our consumer, generated whenever an image is uploaded to our bucket (i.e. whenever our endpoint is called).
- model: ModelDep - this is the dependency we defined in dependencies.py that loads the model.
```python
import datetime
import os
import tempfile

from buildflow import consumer
from buildflow.types.gcp import GCSFileChangeEvent

from image_classification.dependencies import ModelDep
from image_classification.primitives import table, file_change_stream
from image_classification.schemas import Classification, ImageClassificationRow


@consumer(source=file_change_stream, sink=table)
def classify_image(
    file_event: GCSFileChangeEvent, model: ModelDep
) -> ImageClassificationRow:
    with tempfile.TemporaryDirectory() as td:
        file_path = os.path.join(td, file_event.file_path)
        # Download the bytes to a local file that can be sent through the model
        with open(file_path, "wb") as f:
            f.write(file_event.blob)
        predictions, probabilities = model.prediction.classifyImage(
            file_path, result_count=5
        )
    classifications = []
    for prediction, probability in zip(predictions, probabilities):
        classifications.append(Classification(prediction, probability))
    row = ImageClassificationRow(
        image_name=file_event.file_path,
        upload=datetime.datetime.utcnow().isoformat(),
        classifications=classifications,
    )
    print(row)
    return row
```
Other Files
Some other files we didn’t mention that are also in the repo:
- settings.py - loads a .env file for configuring our environment variables (a rough sketch of the idea follows this list)
- schemas.py - defines our API schemas for requests and responses
- requirements.txt - defines our Python dependencies
- index.html - a simple HTML page for uploading images
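settings.py is not reproduced in this walkthrough. As a rough sketch of the idea, assuming python-dotenv and hypothetical environment variable names (the repo file is the source of truth for the exact fields):

```python
# Hypothetical sketch of settings.py: load .env and expose typed settings.
import enum
import os
from dataclasses import dataclass
from typing import Optional

from dotenv import load_dotenv

load_dotenv()


class Environment(enum.Enum):
    LOCAL = "local"  # matches the "local" stack in buildflow.yaml


@dataclass
class Settings:
    env: Environment
    gcp_project_id: str
    gcp_service_account_info: Optional[str]


# The environment variable names below are assumptions for illustration.
env = Settings(
    env=Environment(os.getenv("ENV", "local")),
    gcp_project_id=os.environ["GCP_PROJECT_ID"],
    gcp_service_account_info=os.getenv("GCP_SERVICE_ACCOUNT_INFO"),
)
```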
Running The Application
To run this example with resources created on GCP, use this GitHub repo.
In order to run this code you will need a GCP project that you have access to.
Follow the Google Cloud walkthrough to create a new one if you need to.
Clone the GitHub Repo
git clone git@github.com:launchflow/buildflow-gcp-image-classification.git
cd buildflow-gcp-image-classification
Install your requirements
pip install -r requirements.txt
Create a .env file
Rename the template.env file to .env and update all of the variables.
Create your resources
Create all the resources that are required for your system.
buildflow apply
Make sure you have logged in to GCP with: gcloud auth login --update-adc
This will output every primitive that will be created, and ask you to confirm before creating them:
```
Primitives to create:
---------------------
├── GCSBucket
│   └── gcs://<GCP_PROJECT_ID>-image-classification
├── BigQueryDataset
│   └── <GCP_PROJECT_ID>.buildflow_image_classification_walkthrough
├── BigQueryTable
│   └── <GCP_PROJECT_ID>.buildflow_image_classification_walkthrough.image_classification
├── GCSFileChangeStream
│   └── <GCP_PROJECT_ID>-image-classification.<GCP_PROJECT_ID>-image-classification_topic
├── GCPPubSubSubscription
│   └── <GCP_PROJECT_ID>/<GCP_PROJECT_ID>-image-classification_subscription
└── GCPPubSubTopic
    └── <GCP_PROJECT_ID>/<GCP_PROJECT_ID>-image-classification_topic

Would you like to apply these changes?
Enter "y (yes)" to confirm, "n (no) to reject":
```
Run your project
Run your project with:
buildflow run
Once running, you can visit http://localhost:8000 to start uploading images!
We provide a picture of Caleb’s dog in the repo to help get you started!
What's next?
Now that you have a working image classification app, you can start to customize it to your needs, such as adding Google auth for user authentication, a Postgres database for permanent storage, or even hosting your own model in a private bucket.