SQS
SQSQueue is a sink and source primitive that can be used to read and write messages to an SQS queue. To create a SQSQueue provide:
Arg | Description |
---|---|
queue_name | The name of the queue |
aws_region | The region the queue exists in |
aws_account_id | The account id the queue exists in |
Example sink usage:
from buildflow.io.aws import SQSQueue
queue = SQSQueue(
queue_name="queue_name",
aws_region="us-east-1",
aws_account_id="1234567890"
)
app.manage(queue)
@app.consumer(source=..., sink=queue)
Example source usage:
from buildflow.io.aws import SQSQueue
queue = SQSQueue(
queue_name="queue_name",
aws_region="us-east-1",
aws_account_id="1234567890"
)
app.manage(queue)
@app.consumer(sink=queue, source=...)
:::
Types
The SQSQueue sink expects an object that can be serialized into a string. You can return a dataclass
and we will automatically serialize it to JSON, or you can return a string object directly.
If you have a custom type you can implement the to_string
method to return the bytes you want to send to SQS.
class CustomType:
def __init__(self, data: str):
self.data = str
def to_string(self):
return self.data
@app.pipeline(source=..., sink=SQSQueue(...))
async def my_processor(elem: str) -> CustomType:
return CustomType(data=elem)
The SQSSource
source returns a string
object by default. However you can also provide us a type that you would like us to deserialize the string into. If you provide a dataclass we will automatically deserialize the string into a JSON object and then create your dataclass. If you provide a custom type you can implement the from_string
method to deserialize the string into your type.
class CustomType:
def __init__(self, data: str):
self.decoded_data = str
@classmethod
def from_string(cls, data: str):
return cls(data)
@app.pipeline(source=SQSQueue(...), sink=...)
async def my_processor(elem: CustomType) -> CustomType:
return elem
Resource Creation
If you are using BuildFlow’s built in resource creation/management you can use the SQSQueue primitive to create a queue in your AWS account.