CloudTasksQueue
A GCP Cloud Tasks Queue.
Like all Resources, this class configures itself across multiple Environments.
For more information see the official documentation.
Example Usage
import launchflow as lf

# Automatically creates / connects to a Cloud Tasks Queue in your GCP project
queue = lf.gcp.CloudTasksQueue("my-queue")

queue.enqueue("https://example.com/endpoint", json={"key": "value"})
initialization
Create a new CloudTasksQueue.
Args:
name (str): The name of the Cloud Tasks Queue.
location (Optional[str]): The location of the queue. If None, the environment's default region is used. Defaults to None.
enqueue
CloudTasksQueue.enqueue(url: str, method: str = "POST", headers: Optional[Dict[str, str]] = None, body: Optional[bytes] = None, json: Optional[Dict] = None, oauth_token: Optional[str] = None, oidc_token: Optional[str] = None, schedule_time: Optional[datetime.datetime] = None) -> "tasks.Task"
Enqueue a task in the Cloud Tasks queue.
Args:
url (str): The URL the task will call.
method (str): The HTTP method to use. Defaults to "POST".
headers (Optional[Dict[str, str]]): A dictionary of headers to include in the request.
body (Optional[bytes]): The body of the request. Only one of body or json can be provided.
json (Optional[Dict]): A dictionary to be serialized as JSON and sent as the body of the request. Only one of body or json can be provided.
oauth_token (Optional[str]): An OAuth token to include in the request.
oidc_token (Optional[str]): An OIDC token to include in the request.
schedule_time (Optional[datetime.datetime]): The time to schedule the task for.
Returns:
tasks.Task: The created GCP Cloud Task.
Raises:
ImportError: If the google-cloud-tasks library is not installed.
ValueError: If both body and json are provided.
Example usage:
import launchflow as lf

queue = lf.gcp.CloudTasksQueue("my-queue")

queue.enqueue("https://example.com/endpoint", json={"key": "value"})
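The body / json exclusivity and the schedule_time argument can be illustrated with a small wrapper. This is only a sketch: build_enqueue_kwargs and enqueue_delayed are hypothetical helpers, not part of launchflow, and passing a timezone-aware UTC datetime as schedule_time is an assumption rather than a documented requirement.

```python
import datetime
from typing import Any, Dict, Optional


def build_enqueue_kwargs(
    payload: Optional[Dict[str, Any]] = None,
    raw_body: Optional[bytes] = None,
    delay_seconds: int = 0,
) -> Dict[str, Any]:
    """Hypothetical helper: assemble keyword arguments for queue.enqueue()."""
    # Mirror the documented rule: only one of body or json may be provided.
    if payload is not None and raw_body is not None:
        raise ValueError("Provide only one of payload or raw_body")

    kwargs: Dict[str, Any] = {}
    if payload is not None:
        kwargs["json"] = payload
    if raw_body is not None:
        kwargs["body"] = raw_body
    if delay_seconds > 0:
        # Assumption: schedule_time is given as a timezone-aware UTC datetime.
        now = datetime.datetime.now(datetime.timezone.utc)
        kwargs["schedule_time"] = now + datetime.timedelta(seconds=delay_seconds)
    return kwargs


def enqueue_delayed(queue: Any, url: str, payload: Dict[str, Any], delay_seconds: int) -> Any:
    """Hypothetical helper: enqueue a JSON task to run after a delay.

    `queue` is expected to be a CloudTasksQueue instance.
    """
    kwargs = build_enqueue_kwargs(payload=payload, delay_seconds=delay_seconds)
    return queue.enqueue(url, **kwargs)
```

With a queue created as in the example above, enqueue_delayed(queue, "https://example.com/endpoint", {"key": "value"}, 60) would request delivery roughly one minute later.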
enqueue_async
async CloudTasksQueue.enqueue_async(url: str, method: str = "POST", headers: Optional[Dict[str, str]] = None, body: Optional[bytes] = None, json: Optional[Dict] = None, oauth_token: Optional[str] = None, oidc_token: Optional[str] = None, schedule_time: Optional[datetime.datetime] = None) -> "tasks.Task"
Asynchronously enqueue a task in the Cloud Tasks queue.
Args:
url (str): The URL the task will call.
method (str): The HTTP method to use. Defaults to "POST".
headers (Optional[Dict[str, str]]): A dictionary of headers to include in the request.
body (Optional[bytes]): The body of the request. Only one of body or json can be provided.
json (Optional[Dict]): A dictionary to be serialized as JSON and sent as the body of the request. Only one of body or json can be provided.
oauth_token (Optional[str]): An OAuth token to include in the request.
oidc_token (Optional[str]): An OIDC token to include in the request.
schedule_time (Optional[datetime.datetime]): The time to schedule the task for.
Returns:
tasks.Task: The created GCP Cloud Task.
Raises:
ImportError: If the google-cloud-tasks library is not installed.
ValueError: If both body and json are provided.
Example usage:
import launchflow as lf

queue = lf.gcp.CloudTasksQueue("my-queue")

await queue.enqueue_async("https://example.com/endpoint", json={"key": "value"})
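Because enqueue_async is a coroutine, several tasks can be submitted concurrently from async code. The following is a minimal sketch: enqueue_many is a hypothetical helper, not part of launchflow, and it assumes only the enqueue_async signature documented above.

```python
import asyncio
from typing import Any, Dict, List


async def enqueue_many(queue: Any, url: str, payloads: List[Dict[str, Any]]) -> List[Any]:
    """Hypothetical helper: enqueue one JSON task per payload concurrently.

    `queue` is expected to be a CloudTasksQueue instance. Results are
    returned in the same order as `payloads`.
    """
    coros = [queue.enqueue_async(url, json=payload) for payload in payloads]
    # asyncio.gather runs the coroutines concurrently and preserves input order.
    return await asyncio.gather(*coros)
```

Inside an async function this could be called as await enqueue_many(queue, "https://example.com/endpoint", [{"key": "a"}, {"key": "b"}]). If any single enqueue raises, gather propagates the first exception by default.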

