In this guide we will go over how to load and serve a model that is stored in a remote bucket (GCS or S3). We will load the model at server startup and ensure it is reused across requests. All the code for this guide can be found on GitHub.

To do this we will use a bucket primitive to reference where the model is stored, a custom dependency to download and cache the model once per replica, and a service endpoint that calls the model and streams back the results.

To see a full working project example with a UI included, see our Llama2 AI chat example.

1. Initialize Flow

First we create a new BuildFlow Flow. This will be the main entry point to our application.

app = Flow()
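
The snippets in the following steps also assume a handful of imports. The module paths below are assumptions based on BuildFlow's examples and may differ between versions, so check the BuildFlow reference docs if an import fails:

import logging
import os
from asyncio import Lock

# NOTE: these BuildFlow import paths are assumptions and may vary by version.
from buildflow import Flow
from buildflow.dependencies import Scope, dependency
from buildflow.dependencies.bucket import BucketDependencyBuilder
from buildflow.io.aws import S3Bucket
from buildflow.responses import StreamingResponse

# llama-cpp-python provides the Llama class used to run the GGUF model.
from llama_cpp import Llama
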
2. Attach a Service

We then attach a service to our flow object. We will later attach endpoints that can be called by the user.

service = app.service(service_id="model-loading")
3. Define our Bucket Primitive

Next we define a primitive to reference the bucket where our model is stored, and create a dependency to open a connection to that bucket.

bucket = S3Bucket(bucket_name="launchflow-llama-models")
ModelBucketDep = BucketDependencyBuilder(bucket)
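
The intro mentions the model can live in either GCS or S3. If your model is stored in GCS instead, the same pattern should apply with a GCS bucket primitive; the class name and constructor argument below are assumptions, so check BuildFlow's primitive reference for the exact spelling:

# Hypothetical GCS variant of the same setup (names assumed, not verified):
# from buildflow.io.gcp import GCSBucket
bucket = GCSBucket(bucket_name="launchflow-llama-models")
ModelBucketDep = BucketDependencyBuilder(bucket)
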
4. Create our Custom Model Dependency

Now we define our custom dependency to download our model from the bucket and store it locally. This dependency consumes the bucket dependency we created in the previous step to get a connection to the bucket. We create the dependency by annotating our class with @dependency(scope=Scope.REPLICA); this marks the class as a dependency that can be injected into any processor and ensures that only one instance is created per replica.

We use asyncio.Lock to ensure our model is only called by one request at a time. This is because the model engine we’re using (llama.cpp) does not support batching.

@dependency(scope=Scope.REPLICA)
class ModelDep:
    def __init__(self, bucket_dep: ModelBucketDep):
        # Download the model file from the bucket once per replica, skipping
        # the download if it is already cached on local disk.
        full_path = os.path.abspath("llama-2-7b-chat.Q8_0.gguf")
        if not os.path.exists(full_path):
            logging.info("Downloading model from S3")
            bucket_dep.bucket.download_file("llama-2-7b-chat.Q8_0.gguf", full_path)
        self._llama_model = Llama(full_path, verbose=False)
        self._lock = Lock()

    async def model(self):
        # Wait for exclusive access to the model; callers must call release()
        # once they are done streaming their response.
        await self._lock.acquire()
        return self._llama_model

    def release(self):
        self._lock.release()
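
Because the caller has to remember to call release(), one optional refinement (not part of the guide's code) is to wrap the acquire/release pair in an async context manager. A minimal sketch, assuming the ModelDep class above:

from contextlib import asynccontextmanager


# Hypothetical helper: pairs model() and release() so the lock is always
# returned, even if the caller raises while generating a response.
@asynccontextmanager
async def borrow_model(model_dep: ModelDep):
    model = await model_dep.model()  # waits for the per-replica lock
    try:
        yield model
    finally:
        model_dep.release()  # always give the lock back

Note that for the streaming endpoint in the next step, the lock has to stay held until the generator finishes producing chunks, which is why the guide releases it inside iter_chunks rather than around the endpoint body.
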
5. Call our Model

Now that we have loaded our model, let's call it from an endpoint! We can do this by attaching the @service.endpoint decorator to a function that consumes a string prompt and our ModelDep dependency. We then pass the prompt to the model and return the result.

We use a StreamingResponse to allow results to be streamed back to the user as they become available.

@service.endpoint(route="/chat", method="POST")
async def chat(
    prompt: str,
    model_dep: ModelDep,
) -> StreamingResponse:
    llama_chat_messages = [{"role": "user", "content": prompt}]

    # Acquire the model; this waits until no other request is using it.
    model = await model_dep.model()
    try:
        chat_prediction = model.create_chat_completion(llama_chat_messages, stream=True)
    except Exception:
        model_dep.release()
        raise

    def iter_chunks():
        try:
            for chunk in chat_prediction:
                if chunk.get("choices", []):
                    choice = chunk["choices"][0]
                    message_chunk = choice.get("delta", {}).get("content", "")
                    yield message_chunk
        finally:
            # Release the lock once the full response has been streamed so the
            # next request can use the model.
            model_dep.release()

    return StreamingResponse(iter_chunks())
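
For context on why iter_chunks reaches into choices[0]["delta"]["content"]: llama-cpp-python's streaming chat completions yield OpenAI-style chunk dictionaries. The snippet below is illustrative of the shape only; the values are made up and some fields are omitted:

# Roughly what one streamed chunk looks like (illustrative, not an exact payload).
example_chunk = {
    "object": "chat.completion.chunk",
    "choices": [
        {
            "index": 0,
            "delta": {"content": "Hello"},
            "finish_reason": None,
        }
    ],
}

# The endpoint above pulls out example_chunk["choices"][0]["delta"]["content"].
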
6. Run the Code

You can run this guide locally with the VS Code extension or by running buildflow run in the project's root directory.

Once it's running, you can navigate to http://localhost:8000 to see the Swagger UI and test out the endpoint.
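
You can also hit the endpoint from a small script. The sketch below uses the requests library and assumes the prompt is accepted as a query parameter; if your endpoint expects a JSON body instead, swap params for json:

import requests

# Stream the /chat response chunk by chunk. The URL and the way the prompt is
# passed (query parameter here) are assumptions; adjust to match your endpoint.
with requests.post(
    "http://localhost:8000/chat",
    params={"prompt": "Tell me a joke about llamas"},
    stream=True,
) as response:
    response.raise_for_status()
    for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
        print(chunk, end="", flush=True)
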