In this guide we’ll introduce you to BuildFlow and its core concepts by walking through a simple image classification application.

Our application will:

  1. Load our model from local storage in a dependency, ensuring the model is ready before we receive image uploads and can be shared across requests.
  2. Expose an endpoint that allows users to upload images
  3. When our endpoint receives an image, it will upload the image to a bucket (GCS or S3)
  4. When the image is uploaded it will trigger a notification to our queue (GCP Pub/Sub or AWS SQS)
  5. Our consumer will receive this notification and run the image through an image classification model using ImageAI
  6. The results of the image classification will be stored in a data warehouse (BigQuery or DuckDB)
  7. The data warehouse will be exposed by another endpoint

If all goes well, you should have a working image classification app like this:

  (Previews of the finished app: GCP, AWS, and Local.)

All the code for this tutorial can be found on GitHub; we will walk through the repo step by step.

Define your Project

Your project is defined by the presence of a buildflow.yaml file in the root of your project. This file is created when we use the BuildFlow CLI to create a new project, and doesn’t need to be updated unless you want to change your project’s configuration.

Our yaml file defines the following:

  • project: the name of our project; this can be updated to any string containing lower-case letters and hyphens
  • pulumi_config: our pulumi configuration; here we say we have one stack that is stored locally
  • entry_point: defines how our application is run
project: buildflow-gcp-image-classification-walkthrough
pulumi_config:
  pulumi_home: .buildflow/_pulumi
  stacks:
    - name: local
      backend_url: file://.buildflow/_pulumi/local
entry_point: main:app

Initialize your Flow

We define our flow in main.py, the entry point we specified in our buildflow.yaml file. app = Flow(...) creates our Flow object, which is a container for all of our business logic and resources. You’ll notice we call a couple of methods on our flow object.

  • manage: this method marks a primitive as managed, meaning it can be created, updated, and destroyed by BuildFlow.
  • add_service: attaches a service to our flow. A service is a container of endpoints; our service will allow users to upload images to our application.
  • add_consumer: attaches a consumer to our flow. A consumer lets us process image uploads asynchronously, so classification won’t block the user’s request!
from buildflow import Flow, FlowOptions

from image_classification import primitives
from image_classification.processors.consumer import classify_image
from image_classification.processors.service import service
from image_classification.settings import env

app = Flow(
    flow_options=FlowOptions(
        gcp_service_account_info=env.gcp_service_account_info, stack=env.env.value
    )
)
app.manage(
    primitives.bucket,
    primitives.dataset,
    primitives.table,
    primitives.file_change_stream,
)
app.add_service(service)
app.add_consumer(classify_image)

Create your Primitives

In primitives.py we define all of the primitives used by our application. Primitives are resources that a processor may read from, write to, or reference.

You may have noticed these same primitives were passed into app.manage in main.py; this means they will be created when you run buildflow run for the first time.

We define 4 primitives here:

  • bucket: a GCS bucket where we will upload user images to.
  • file_change_stream: a GCS File Change Stream which sets up notifications for when a file is uploaded to our bucket. This is what triggers our image upload model to be called.
  • dataset: a BigQuery dataset; this is a container for our BigQuery table.
  • table: a BigQuery table where we will store our image classification results.
from buildflow.io.gcp import (
    GCSFileChangeStream,
    BigQueryTable,
    GCSBucket,
    BigQueryDataset,
)

from image_classification.settings import env
from image_classification.schemas import ImageClassificationRow

bucket = GCSBucket(
    project_id=env.gcp_project_id,
    bucket_name=f"{env.gcp_project_id}-image-classification",
).options(force_destroy=True)
file_change_stream = GCSFileChangeStream(gcs_bucket=bucket)

dataset = BigQueryDataset(
    project_id=env.gcp_project_id,
    dataset_name="buildflow_image_classification_walkthrough",
)
table = BigQueryTable(dataset=dataset, table_name="image_classification").options(
    schema=ImageClassificationRow
)

Add your Dependencies

In dependencies.py we set up dependencies that will be used by our endpoints and consumer. Dependencies allow us to precompute and share code between processors. In our case we want to load the model when we start our application before any data is processed.

We do this by defining the ModelDep class and adding the decorator @dependency(scope=Scope.REPLICA), which annotates the class as a dependency that should be created before a replica starts to process data.

We also define BucketDep, a bucket dependency that lets us connect to the bucket associated with the bucket primitive we defined in primitives.py.

import os

from buildflow.dependencies import dependency, Scope
from buildflow.dependencies.bucket import BucketDependencyBuilder
from imageai.Classification import ImageClassification

from image_classification.primitives import bucket


@dependency(scope=Scope.REPLICA)
class ModelDep:
    def __init__(self) -> None:
        self.execution_path = os.path.dirname(os.path.realpath(__file__))
        self.prediction = ImageClassification()
        self.prediction.setModelTypeAsMobileNetV2()
        self.prediction.setModelPath("mobilenet_v2-b0353104.pth")
        self.prediction.loadModel()


BucketDep = BucketDependencyBuilder(bucket)

Add your Service

In service.py we define the actual endpoints that will be called by our users. First we create a service with service = Service(...), then attach endpoints to functions using @service.endpoint(...).

Remember, we attached our service to our flow in main.py with app.add_service.

We define three endpoints here:

  • index: this endpoint returns a simple HTML page that allows users to upload images.
  • image_upload: this endpoint lets users upload an image, which we then write to our remote bucket. It takes in an UploadFile, which is a file uploaded by the user, along with the bucket_dep dependency we defined earlier; this lets us upload the file to our bucket using the Google Cloud Storage client library.
  • image_upload_results: this endpoint returns a simple HTML page that displays the results of our image classification model. It does this by querying our BigQuery table and then rendering the results as a table.
from buildflow import Service
from buildflow.requests import UploadFile
from buildflow.responses import FileResponse, HTMLResponse
from google.cloud import bigquery
import pandas as pd

from image_classification.dependencies import BucketDep
from image_classification.primitives import table

service = Service(service_id="image-classification")


@service.endpoint("/", method="GET")
def index():
    return FileResponse("image_classification/processors/index.html")


@service.endpoint("/image-upload", method="POST")
def image_upload(image_file: UploadFile, bucket_dep: BucketDep):
    blob = bucket_dep.bucket.blob(image_file.filename)
    blob.upload_from_file(image_file.file)


def generate_inner_table(row):
    inner_df = pd.DataFrame.from_records(row["classifications"])
    return inner_df.to_html(classes="table-auto", index=False, border=1).replace(
        "\n", ""
    )


@service.endpoint("/image-upload-results", method="GET")
def image_upload_results():
    bigquery_client = bigquery.Client()
    results = bigquery_client.query(f"SELECT * FROM {table.primitive_id()}")
    df = results.to_dataframe()
    df["classifications"] = df.apply(generate_inner_table, axis=1)

    return HTMLResponse(
        df.to_html(
            classes="m-5 border-spacing-2 border-collapse border border-slate-500 table-auto".split(
                " "
            ),
            escape=False,
        ).replace("\n", "")
    )

Add Async Processing

consumer.py is where we send our images through the model. We do this by attaching the @consumer decorator to our classify_image function.

Remember, we attached our consumer to our flow in main.py with app.add_consumer.

Our consumer function takes in two arguments:

  • file_event: GCSFileChangeEvent - the input to our consumer, generated whenever an image is uploaded to our bucket (i.e. whenever our endpoint is called).
  • model: ModelDep - the dependency we defined in dependencies.py that loads the model.
import datetime
import os
import tempfile

from buildflow import consumer
from buildflow.types.gcp import GCSFileChangeEvent

from image_classification.dependencies import ModelDep
from image_classification.primitives import table, file_change_stream
from image_classification.schemas import Classification, ImageClassificationRow


@consumer(source=file_change_stream, sink=table)
def classify_image(
    file_event: GCSFileChangeEvent, model: ModelDep
) -> ImageClassificationRow:
    with tempfile.TemporaryDirectory() as td:
        file_path = os.path.join(td, file_event.file_path)
        # Download the bytes to a local file that can be sent through the model
        with open(file_path, "wb") as f:
            f.write(file_event.blob)
        predictions, probabilities = model.prediction.classifyImage(
            file_path, result_count=5
        )
    classifications = []
    for prediction, probability in zip(predictions, probabilities):
        classifications.append(Classification(prediction, probability))
    row = ImageClassificationRow(
        image_name=file_event.file_path,
        upload=datetime.datetime.utcnow().isoformat(),
        classifications=classifications,
    )
    print(row)
    return row

Other Files

Some other files we didn’t mention that are also in the repo:

  • settings.py - loads a .env file to configure our environment variables
  • schemas.py - defines our API schemas for requests and responses (see the sketch after this list)
  • requirements.txt - defines our python dependencies
  • index.html - a simple HTML page for uploading images
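
schemas.py isn’t shown in this guide, so here is a minimal sketch of what it might contain, assuming plain dataclasses. The shapes are inferred from how the consumer constructs Classification and ImageClassificationRow above; the field names inside Classification are illustrative assumptions, so check the repo for the real definitions.

import dataclasses
from typing import List


@dataclasses.dataclass
class Classification:
    # Field names are assumed; the consumer constructs this positionally with
    # the model's predicted label and its probability.
    label: str
    confidence: float


@dataclasses.dataclass
class ImageClassificationRow:
    image_name: str
    upload: str  # ISO-formatted timestamp of when the image was processed
    classifications: List[Classification]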

Running The Application

To run this example against resources created in GCP, use this GitHub repo.

In order to run this code you will need a GCP project that you have access to.

Follow the Google Cloud Walkthrough to create a new one if you need to.

1. Clone the GitHub Repo

git clone git@github.com:launchflow/buildflow-gcp-image-classification.git 
cd buildflow-gcp-image-classification

2. Install your requirements

    pip install -r requirements.txt

3. Create a .env file

Rename template.env to .env and update all of the variables.
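
The exact variable names are defined in template.env; as a purely illustrative example (these names are hypothetical, not the repo's), the finished .env will look something like:

    # Hypothetical variable names, for illustration only
    GCP_PROJECT_ID=your-gcp-project-id
    GCP_SERVICE_ACCOUNT_INFO=
    ENV=local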

4. Create your resources

Create all the resources that are required for your system.

    buildflow apply

Make sure you have logged in to GCP with: gcloud auth login --update-adc

This will output every primitive that will be created, and ask you to confirm before creating them:

    Primitives to create:
    ---------------------
    ├── GCSBucket
    │   └── gcs://<GCP_PROJECT_ID>-image-classification
    ├── BigQueryDataset
    │   └── <GCP_PROJECT_ID>.buildflow_image_classification_walkthrough
    ├── BigQueryTable
    │   └── <GCP_PROJECT_ID>.buildflow_image_classification_walkthrough.image_classification
    ├── GCSFileChangeStream
    │   └── <GCP_PROJECT_ID>-image-classification.<GCP_PROJECT_ID>-image-classification_topic
    ├── GCPPubSubSubscription
    │   └── <GCP_PROJECT_ID>/<GCP_PROJECT_ID>-image-classification_subscription
    └── GCPPubSubTopic
        └── <GCP_PROJECT_ID>/<GCP_PROJECT_ID>-image-classification_topic


    Would you like to apply these changes?
    Enter "y (yes)" to confirm, "n (no) to reject":

5. Run your project

Run your project with:

    buildflow run

Once running, you can visit http://localhost:8000 to view the app and begin uploading images!

We provide a picture of Caleb’s dog in the repo to help get you started!
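
If you’d prefer to test the upload endpoint from a script instead of the browser, here is a minimal sketch using the requests library. The multipart field name image_file is assumed to match the endpoint’s parameter name, and dog.png is a placeholder for whichever image you upload.

import requests

# Send an image to the /image-upload endpoint of the locally running app.
with open("dog.png", "rb") as f:
    response = requests.post(
        "http://localhost:8000/image-upload",
        files={"image_file": ("dog.png", f, "image/png")},
    )
response.raise_for_status()

# The classification results will show up at /image-upload-results once the
# consumer has processed the file change notification.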


What's next?

Now that you have a working image classification app, you can start customizing it to your needs, such as adding Google auth for user authentication, a Postgres database for permanent storage, or even hosting your own model in a private bucket.