CloudTasksQueue

A GCP Cloud Tasks Queue.

Like all Resources, this class configures itself across multiple Environments.

For more information, see the official documentation.

Example Usage

import launchflow as lf

# Automatically creates / connects to a Cloud Tasks Queue in your GCP project
queue = lf.gcp.CloudTasksQueue("my-queue")

queue.enqueue("https://example.com/endpoint", json={"key": "value"})

initialization

Create a new CloudTasksQueue.

Args:

  • name (str): The name of the Cloud Tasks Queue.
  • location (Optional[str]): The location of the queue. If None, the environment's default region is used. Defaults to None.

enqueue

CloudTasksQueue.enqueue(url: str, method: str = "POST", headers: Optional[Dict[str, str]] = None, body: Optional[bytes] = None, json: Optional[Dict] = None, oauth_token: Optional[str] = None, oidc_token: Optional[str] = None, schedule_time: Optional[datetime.datetime] = None) -> "tasks.Task"

Enqueue a task in the Cloud Tasks queue.

Args:

  • url (str): The URL the task will call.
  • method (str): The HTTP method to use. Defaults to "POST".
  • headers (Optional[Dict[str, str]]): A dictionary of headers to include in the request.
  • body (Optional[bytes]): The body of the request. Only one of body or json can be provided.
  • json (Optional[Dict]): A dictionary to be serialized as JSON and sent as the body of the request. Only one of body or json can be provided.
  • oauth_token (Optional[str]): An OAuth token to include in the request.
  • oidc_token (Optional[str]): An OIDC token to include in the request.
  • schedule_time (Optional[datetime.datetime]): The time to schedule the task for.

Returns:

  • tasks.Task: The enqueued task.

Raises:

  • ImportError: If the google-cloud-tasks library is not installed.
  • ValueError: If both body and json are provided.
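The body/json rule can be pictured with a small standalone sketch. The helper `resolve_body` and its `json_payload` parameter are hypothetical and are not LaunchFlow's implementation; the sketch only illustrates the documented contract that supplying both arguments raises ValueError, and it assumes the dictionary is serialized to UTF-8 encoded JSON.

```python
import json
from typing import Dict, Optional


def resolve_body(
    body: Optional[bytes] = None, json_payload: Optional[Dict] = None
) -> Optional[bytes]:
    """Hypothetical helper mirroring the documented body/json contract."""
    if body is not None and json_payload is not None:
        # Documented behavior: only one of body or json can be provided.
        raise ValueError("Only one of body or json can be provided.")
    if json_payload is not None:
        # Assumption: the dictionary is sent as UTF-8 encoded JSON.
        return json.dumps(json_payload).encode("utf-8")
    return body
```

In practice this means choosing `json` for structured payloads and `body` for bytes you have already serialized yourself.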

Example usage:

import launchflow as lf

queue = lf.gcp.CloudTasksQueue("my-queue")

queue.enqueue("https://example.com/endpoint", json={"key": "value"})
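The `schedule_time` argument can be used to delay delivery. The sketch below is one possible way to do that: `enqueue_in` and `RecordingQueue` are hypothetical names introduced for illustration, the stand-in queue exists only so the example runs without GCP access, and the use of a timezone-aware UTC datetime is an assumption rather than a documented requirement.

```python
import datetime
from typing import Any, Dict


def enqueue_in(queue: Any, url: str, payload: Dict, delay_seconds: int) -> Any:
    """Hypothetical helper: enqueue a JSON task to run after a delay."""
    # Assumption: schedule_time is given as a timezone-aware UTC datetime.
    run_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
        seconds=delay_seconds
    )
    return queue.enqueue(url, json=payload, schedule_time=run_at)


class RecordingQueue:
    """Stand-in for a CloudTasksQueue that records calls instead of sending them."""

    def __init__(self) -> None:
        self.calls = []

    def enqueue(self, url, **kwargs):
        self.calls.append((url, kwargs))
        return kwargs


demo_queue = RecordingQueue()
enqueue_in(demo_queue, "https://example.com/endpoint", {"key": "value"}, delay_seconds=300)
```

With a real queue, an instance such as `lf.gcp.CloudTasksQueue("my-queue")` would be passed in place of `RecordingQueue()`.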

enqueue_async

async CloudTasksQueue.enqueue_async(url: str, method: str = "POST", headers: Optional[Dict[str, str]] = None, body: Optional[bytes] = None, json: Optional[Dict] = None, oauth_token: Optional[str] = None, oidc_token: Optional[str] = None, schedule_time: Optional[datetime.datetime] = None) -> "tasks.Task"

Asynchronously enqueue a task in the Cloud Tasks queue.

Args:

  • url (str): The URL the task will call.
  • method (str): The HTTP method to use. Defaults to "POST".
  • headers (Optional[Dict[str, str]]): A dictionary of headers to include in the request.
  • body (Optional[bytes]): The body of the request. Only one of body or json can be provided.
  • json (Optional[Dict]): A dictionary to be serialized as JSON and sent as the body of the request. Only one of body or json can be provided.
  • oauth_token (Optional[str]): An OAuth token to include in the request.
  • oidc_token (Optional[str]): An OIDC token to include in the request.
  • schedule_time (Optional[datetime.datetime]): The time to schedule the task for.

Returns:

  • tasks.Task: The enqueued task.

Raises:

  • ImportError: If the google-cloud-tasks library is not installed.
  • ValueError: If both body and json are provided.

Example usage:

import launchflow as lf

queue = lf.gcp.CloudTasksQueue("my-queue")

# Must be awaited from inside an async function (or another async context)
await queue.enqueue_async("https://example.com/endpoint", json={"key": "value"})
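Because `enqueue_async` is a coroutine, several tasks can be enqueued concurrently. The following is a minimal sketch of that pattern using `asyncio.gather`; `enqueue_many` and `FakeAsyncQueue` are hypothetical names, and the fake queue stands in for a real CloudTasksQueue so the example runs without GCP access.

```python
import asyncio
from typing import Any, Dict, List


async def enqueue_many(queue: Any, url: str, payloads: List[Dict]) -> List[Any]:
    """Hypothetical helper: enqueue one task per payload concurrently."""
    return await asyncio.gather(
        *(queue.enqueue_async(url, json=payload) for payload in payloads)
    )


class FakeAsyncQueue:
    """Stand-in for a CloudTasksQueue that echoes its arguments back."""

    async def enqueue_async(self, url, **kwargs):
        await asyncio.sleep(0)  # Yield control, as a real network call would.
        return {"url": url, **kwargs}


results = asyncio.run(
    enqueue_many(FakeAsyncQueue(), "https://example.com/endpoint", [{"id": 1}, {"id": 2}])
)
```

`asyncio.gather` returns results in the same order as the payloads, which makes it straightforward to match each returned task to its input.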