Dataclass Payloads

All of our Sources and Sinks support automatically parsing JSON payloads into a user defined dataclass. That means if you are reading from a source or writing to a sink you can always specify your type hint as a dataclass and we will know how to read and convert to your type.

@dataclass
class InputType:
    a: int

@dataclass
class OutputType:
    b: int

@app.consumer(source=Primitive(...), sink=Primitive(...))
async def my_consumer(elem: InputType) -> OutputType:
    return(OutputType(b=elem.a + 1))

If you do not provide a type hint you for the input you will receive the raw data from the source. For more info on what this is see the Primitives section for the specific source and sink you are using.

What if a dataclass isn’t enough?

If you need any custom serialization or deserialization you can provide a custom type as well. All you need to do is have your custom type provide a method that tell us how to serialize or deserialize the type.

The method you need to implement will be dependent on the resource you are using. For example if you are using GCPPubSubSubscription as a source you can add a from_bytes class method to have automatic deserialization. If you are using BigQueryTable as a sink you can add a to_json instance method to have automatic serialization.

See the Primitives section for your specific source and sink for more info.

Example:


class CustomType:
    def __init__(self, decoded_data: str):
        self.decoded_data = str

    @classmethod
    def from_bytes(cls, data: bytes):
        return cls(data.decode())

    def to_json(self):
        return {"decoded_data": self.decoded_data}

@app.consumer(source=Primitive(...), sink=Primitive(...))
async def my_consumer(elem: CustomType) -> CustomType:
    return elem

Multi-Row Output

When writing to a sink each element you return will result in a row being written to your sink. This means you can return multiple elements and BuildFlow will flatten in to multiple writes to your sink. To accomplish this you can either return a List or Tuple and BuildFlow will flatten each element into it’s own row.

@app.collector(route="/", method="POST", sink=Primitive(...))
def my_collector(message):
    return [output(message), output1(message), output2(message)]

You can also use yields to return multiple elements that should be written to your sink.

@app.collector(route="/", method="POST", ink=Primitive(...))
def my_collector(message):
    if message.startswith("hello"):
        yield "world"
    if message.endswith("world"):
        yield "!"

If you want your row to be a list you can wrap it in a Tuple or List to avoid flattening.

@app.collector(route="/", method="POST", sink=Primitive(...))
def my_collector(message):
    return (
        # Row 1
        [
            output(message),
            output1(message),
            output2(message),
        ],
        # Row 2
        [
            output(message),
            output1(message),
            output2(message),
        ],
    )