CloudTasksQueue
A GCP Cloud Tasks Queue.
Like all Resources, this class configures itself across multiple Environments.
For more information see the official documentation.
Example Usage
1import launchflow as lf
2
3# Automatically creates / connects to a Cloud Tasks Queue in your GCP project
4queue = lf.gcp.CloudTasksQueue("my-queue")
5
6queue.enqueue("https://example.com/endpoint", json={"key": "value"})
initialization
Create a new CloudTasksQueue.
Args:
name (str)
: The name of the Cloud Tasks Queue.location (Optional[str])
: The location of the queue. If None, the environments default region is used. Defaults to None.
enqueue
1CloudTasksQueue.enqueue(url: str, method: str = "POST", headers: Optional[Dict[str, str]] = None, body: Optional[bytes] = None, json: Optional[Dict] = None, oauth_token: Optional[str] = None, oidc_token: Optional[str] = None, schedule_time: Optional[datetime.datetime] = None) -> "tasks.Task"
Enqueue a task in the Cloud Tasks queue.
Args:
url (str)
: The url the task will call.method (str)
: The HTTP method to use. Defaults to "POST".headers (Optional[Dict[str, str]])
: A dictionary of headers to include in the request.body (Optional[bytes])
: The body of the request. Only one ofbody
orjson
can be provided.json (Optional[Dict])
: A dictionary to be serialized as JSON and sent as the body of the request. Only one ofbody
orjson
can be provided.oauth_token (Optional[str])
: An OAuth token to include in the request.oidc_token (Optional[str])
: An OIDC token to include in the request.schedule_time (Optional[datetime.datetime])
: The time to schedule the task for.
Returns:
tasks.Task
: The created GCP cloud task.
Raises:
ImportError
: If the google-cloud-tasks library is not installed.ValueError
: If bothbody
andjson
are provided.
Example usage:
1import launchflow as lf
2
3queue = lf.gcp.CloudTasksQueue("my-queue")
4
5queue.enqueue("https://example.com/endpoint", json={"key": "value"})
enqueue_async
1async CloudTasksQueue.enqueue_async(url: str, method: str = "POST", headers: Optional[Dict[str, str]] = None, body: Optional[bytes] = None, json: Optional[Dict] = None, oauth_token: Optional[str] = None, oidc_token: Optional[str] = None, schedule_time: Optional[datetime.datetime] = None) -> "tasks.Task"
Asynchronously enqueue a task in the Cloud Tasks queue.
Args:
url (str)
: The url the task will call.method (str)
: The HTTP method to use. Defaults to "POST".headers (Optional[Dict[str, str]])
: A dictionary of headers to include in the request.body (Optional[bytes])
: The body of the request. Only one ofbody
orjson
can be provided.json (Optional[Dict])
: A dictionary to be serialized as JSON and sent as the body of the request. Only one ofbody
orjson
can be provided.oauth_token (Optional[str])
: An OAuth token to include in the request.oidc_token (Optional[str])
: An OIDC token to include in the request.schedule_time (Optional[datetime.datetime])
: The time to schedule the task for.
Returns:
tasks.Task
: The created GCP cloud task.
Raises:
ImportError
: If the google-cloud-tasks library is not installed.ValueError
: If bothbody
andjson
are provided.
Example usage:
1import launchflow as lf
2
3queue = lf.gcp.CloudTasksQueue("my-queue")
4
5await queue.enqueue_async("https://example.com/endpoint", json={"key": "value"})