launchflow.fastapi
sqlalchemy_depends
1def sqlalchemy_depends(engine: Engine,
2 autocommit: bool = False,
3 autoflush: bool = False,
4 expire_on_commit: bool = False)
Returns a dependency that returns a SQLAlchemy Session for use in FastAPI.
Args:
engine
: A SQLAlchemy engine to use for creating the session.autocommit
: Whether to autocommit the session after a commit.autoflush
: Whether to autoflush the session after a commit.expire_on_commit
: Whether to expire all instances after a commit.
Example usage:
1import launchflow as lf
2
3from fastapi import FastAPI, Depends
4from sqlalchemy import text
5from sqlalchemy.orm import Session
6
7# Create the Postgres database instance
8postgres = lf.gcp.CloudSQLPostgres("my-pg-db")
9
10# Create the SQLAlchemy engine (connection pool)
11engine = postgres.sqlalchemy_engine()
12
13# Create the FastAPI dependency
14session = lf.fastapi.sqlalchemy_depends(engine)
15
16app = FastAPI()
17
18@app.get("/")
19def read_root(db: Session = Depends(session)):
20 # Use the session to query the database
21 return db.execute(text("SELECT 1")).scalar_one_or_none()
sqlalchemy_async_depends
1def sqlalchemy_async_depends(autocommit: bool = False,
2 autoflush: bool = False,
3 expire_on_commit: bool = False)
Returns a dependency that returns a SQLAlchemy AsyncSession for use in FastAPI.
Args:
engine
: A SQLAlchemy engine to use for creating the session.autocommit
: Whether to autocommit the session after a commit.autoflush
: Whether to autoflush the session after a commit.expire_on_commit
: Whether to expire all instances after a commit.
Example usage:
1from contextlib import asynccontextmanager
2
3import launchflow as lf
4
5from fastapi import FastAPI, Depends
6from sqlalchemy import text
7from sqlalchemy.ext.asyncio import AsyncSession
8
9# Create the Postgres database instance
10postgres = lf.gcp.CloudSQLPostgres("my-pg-db")
11
12# Create the FastAPI dependency
13async_session = lf.fastapi.sqlalchemy_async_depends()
14
15
16@asynccontextmanager
17async def lifespan(app: FastAPI):
18 # Create the SQLAlchemy async engine (connection pool)
19 engine = await postgres.sqlalchemy_async_engine()
20 # Setup the async session dependency
21 async_session.setup(engine)
22 yield
23
24app = FastAPI(lifespan=lifespan)
25
26@app.get("/")
27async def query_postgres(db: AsyncSession = Depends(async_session)):
28 # Use the async session to query the database
29 return await db.execute(text("SELECT 1")).scalar_one_or_none()