launchflow.gcp.bigquery

BigQueryDataset

class BigQueryDataset(GCPResource[BigQueryDatasetConnectionInfo])

A BigQuery Dataset resource.

Example usage:

from google.cloud import bigquery
import launchflow as lf

# Automatically configures / deploys a BigQuery Dataset in your GCP project
dataset = lf.gcp.BigQueryDataset("my-dataset")

schema = [
    bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
]
table = dataset.create_table("table_name", schema=schema)

dataset.insert_table_data("table_name", [{"name": "Alice", "age": 30}])

# You can also use the underlying resource directly
# For example, for a table with columns name,age
query = f"""
SELECT name, age
FROM `{dataset.dataset_id}.table_name`
WHERE age > 10
ORDER BY age DESC
"""

for row in dataset.client().query(query):
    print(row)

__init__

def __init__(name: str, *, location="US") -> None

Create a new BigQuery Dataset resource.

Args:

  • name: The name of the dataset. This must be globally unique.
  • location: The location of the dataset. Defaults to "US".

dataset_id

@property
def dataset_id() -> str

Get the dataset id.

Returns:

  • The dataset id.

get_table_uuid

def get_table_uuid(table_name: str) -> str

Get the table UUID, {project_id}.{dataset_id}.{table_id}.

Args:

  • table_name: The name of the table.

Returns:

  • The table UUID.

client

@lru_cache
def client() -> "bigquery.Client"

Get the BigQuery Client object.

Returns:

  • The BigQuery Client object.

dataset

@lru_cache
def dataset() -> "bigquery.Dataset"

Get the BigQuery Dataset object.

Returns:

  • The BigQuery Dataset object.

create_table

def create_table(table_name: str,
                 *,
                 schema: "Optional[List[bigquery.SchemaField]]" = None
                 ) -> "bigquery.Table"

Create a table in the dataset.

Args:

  • table_name: The name of the table to create.
  • schema: The schema of the table. Not required and defaults to None.

Returns:

  • The created BigQuery Table object.

delete_table

def delete_table(table_name: str) -> None

Delete a table from the dataset.

Args:

  • table_name: The name of the table to delete.

load_table_data_from_csv

def load_table_data_from_csv(table_name: str, file_path: Path) -> None

Load data from a CSV file into a table.

Args:

  • table_name: The name of the table to load the data into.
  • file_path: The path to the CSV file to load.

insert_table_data

def insert_table_data(table_name: str,
                      rows_to_insert: List[Dict[Any, Any]]) -> None

Insert in-memory data into a table. There seems to be a bug in BigQuery where, if a table name is re-used (created and then deleted recently), streaming to it won't work. If you encounter an unexpected 404 error, try changing the table name.

Args:

  • table_name: The name of the table to insert the data into.
  • rows_to_insert: The data to insert into the table.

Raises: ValueError if there were errors when inserting the data.