Primitives are resources that a processor may read from, write to, or reference. For example, a primitive might be a Postgres database managed by GCP or an S3 bucket managed by AWS. Primitives can be used as sources or sinks in your processors, and they can also be managed by your flow. If a primitive is managed, BuildFlow can create, manage, and destroy it automatically.

BuildFlow provides many primitives out of the box, and you can also create your own custom primitives. All of our primitives can be found in our primitive docs.

Primitive Resource Management

A primitive can be marked as managed by attaching it to your flow with the flow's manage method.

from buildflow import Flow
from buildflow.io.aws import AWSBucket
from buildflow.io.gcp import GCSBucket


app = Flow()
app.manage(AWSBucket("my-bucket"), GCSBucket("my-bucket"))

The resources can now be created by running the following command in the root directory of your project.

buildflow apply

The resources can be destroyed by running:

buildflow destroy

Both of these commands print a preview of the resources that will be created or destroyed. If you are happy with the preview, type yes to accept it.

If you would simply like to see a preview, you can run buildflow preview.

You may notice that often an individual primitive creates and manages multiple resources in order to accomplish reading or writing.

For instance, writing to Snowflake takes over 15 resources!

All resource management is powered by Pulumi. Ideally you shouldn't have to interact with Pulumi directly, but you can always use the pulumi CLI if you run into issues.

Creating Custom Primitives

Implement the Primitive Interface

To create a custom primitive, implement the Primitive interface. A description of all the methods is found below (bold indicates required).

If your primitive is specific to a cloud provider, you can extend GCPPrimitive or AWSPrimitive.

Extending one of these tells BuildFlow which type of credentials (GCP or AWS) to pass to your primitive.

| Method | Return Type | Description |
| --- | --- | --- |
| primitive_id | str | A unique ID for your primitive. |
| pulumi_resources | List[pulumi.Resource] | Returns a list of Pulumi resources that should be managed. |
| source | SourceStrategy | Returns an implementation of SourceStrategy for reading data. |
| sink | SinkStrategy | Returns an implementation of SinkStrategy for writing data. |
| cloud_console_url | str | Returns a URL to the cloud console for this resource, if applicable. |

You only need to implement the methods that your primitive supports. For example, if your primitive only supports reading data you only need to implement the source() method, or if your primitive only supports being managed you only need to implement the pulumi_resources method.

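from dataclasses import dataclass

import pulumi
import pulumi_aws

# AWSPrimitive and CredentialType come from BuildFlow; their imports are omitted here.


@dataclass
class CustomPrimitive(AWSPrimitive):
    # Defaults are illustrative and let the primitive be constructed without arguments.
    bucket_name: str = "my-custom-bucket"
    force_destroy: bool = False

    def primitive_id(self) -> str:
        # The bucket name doubles as the unique ID.
        return self.bucket_name

    def sink(self, credentials: CredentialType):
        # Strategy used when this primitive is a pipeline sink (defined below).
        return CustomPrimitiveSink()

    def source(self, credentials: CredentialType):
        # Strategy used when this primitive is a pipeline source (defined below).
        return CustomPrimitiveSource()

    def pulumi_resources(self, credentials: CredentialType, opts: pulumi.ResourceOptions):
        # Resources created by buildflow apply and removed by buildflow destroy.
        return [pulumi_aws.s3.BucketV2(
            resource_name=self.bucket_name,
            bucket=self.bucket_name,
            force_destroy=self.force_destroy,
            opts=opts,
        )]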
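
To illustrate the point that you only implement the methods your primitive supports, here is a minimal sketch of a read-only primitive. It does not import BuildFlow: PrimitiveStandIn and ListSource are hypothetical stand-ins written for this example, and raising NotImplementedError for unsupported methods is an assumption about how a base class might behave, not a statement about BuildFlow's actual implementation.

```python
from dataclasses import dataclass
from typing import Any, List


class PrimitiveStandIn:
    """Hypothetical stand-in for a primitive base class (not BuildFlow's)."""

    def primitive_id(self) -> str:
        raise NotImplementedError

    def source(self, credentials: Any = None):
        raise NotImplementedError(f"{type(self).__name__} cannot be used as a source")

    def sink(self, credentials: Any = None):
        raise NotImplementedError(f"{type(self).__name__} cannot be used as a sink")

    def pulumi_resources(self, credentials: Any, opts: Any) -> List[Any]:
        # A primitive that is not managed has no resources to create.
        return []


class ListSource:
    """Hypothetical source strategy that reads from an in-memory list."""

    def __init__(self, items: List[Any]) -> None:
        self.items = list(items)


@dataclass
class ReadOnlyPrimitive(PrimitiveStandIn):
    name: str

    def primitive_id(self) -> str:
        return self.name

    # Only source() is overridden, so this primitive supports reading only.
    def source(self, credentials: Any = None) -> ListSource:
        return ListSource([1, 2, 3])


primitive = ReadOnlyPrimitive(name="numbers")
print(primitive.primitive_id())
print(primitive.source().items)
try:
    primitive.sink()
except NotImplementedError as err:
    print(err)
```

Because sink and pulumi_resources are left at their defaults, using this primitive as a sink fails loudly, and attaching it to a flow would manage no resources.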

Implement the Strategy Interfaces

Strategies define the actual logic for reading or writing data.

Source Strategy

Source strategies should implement the SourceStrategy interface.

| Method | Return Type | Description |
| --- | --- | --- |
| pull | PullResponse | Returns a batch of data from the source. |
| ack | None | Acknowledges data pulled from the source. |
| backlog | int | Returns the number of items in the backlog. |
| max_batch_size | int | Returns the maximum number of items that can be pulled at once. |
| pull_converter | Callable[[Any], Any] | Returns a function that converts the output of the source into the user-defined type. |

from typing import Any, Callable, Type

# SourceStrategy, PullResponse, AckInfo, and converters come from BuildFlow;
# their imports are omitted here.


class CustomAckInfo(AckInfo):
    pass


class CustomPrimitiveSource(SourceStrategy):

    async def pull(self) -> PullResponse:
        # Return one batch plus the information needed to acknowledge it later.
        return PullResponse(
            payload=[1, 2, 3],
            ack_info=CustomAckInfo(),
        )

    async def ack(self, to_ack: AckInfo, success: bool):
        # Nothing to acknowledge for this static source.
        return

    async def backlog(self) -> int:
        return 0

    def max_batch_size(self) -> int:
        return -1

    def pull_converter(self, user_defined_type: Type) -> Callable[[Any], Any]:
        # Pass elements through unchanged.
        return converters.identity()
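
To make the relationship between pull, ack, and backlog more concrete, here is a minimal sketch of a source backed by an in-memory queue. It runs without BuildFlow: PullResponseStandIn and AckInfoStandIn are hypothetical stand-ins for PullResponse and AckInfo, and re-queuing items when ack is called with success=False is an assumed policy for this example, not BuildFlow's documented behavior.

```python
import asyncio
from collections import deque
from dataclasses import dataclass
from typing import Any, Deque, List


@dataclass
class AckInfoStandIn:
    """Hypothetical stand-in for AckInfo: remembers which items were pulled."""
    items: List[Any]


@dataclass
class PullResponseStandIn:
    """Hypothetical stand-in for PullResponse."""
    payload: List[Any]
    ack_info: AckInfoStandIn


class InMemoryQueueSource:
    def __init__(self, items: List[Any], batch_size: int = 2) -> None:
        self._queue: Deque[Any] = deque(items)
        self._batch_size = batch_size

    async def pull(self) -> PullResponseStandIn:
        # Take up to batch_size items off the front of the queue.
        count = min(self._batch_size, len(self._queue))
        batch = [self._queue.popleft() for _ in range(count)]
        return PullResponseStandIn(payload=batch, ack_info=AckInfoStandIn(items=batch))

    async def ack(self, to_ack: AckInfoStandIn, success: bool) -> None:
        if not success:
            # Put failed items back at the front, preserving their order.
            self._queue.extendleft(reversed(to_ack.items))

    async def backlog(self) -> int:
        return len(self._queue)

    def max_batch_size(self) -> int:
        return self._batch_size


async def demo():
    source = InMemoryQueueSource(["a", "b", "c"])
    first = await source.pull()
    await source.ack(first.ack_info, success=False)  # processing failed
    backlog_after_failure = await source.backlog()
    second = await source.pull()
    await source.ack(second.ack_info, success=True)  # processing succeeded
    return first.payload, backlog_after_failure, second.payload, await source.backlog()


result = asyncio.run(demo())
print(result)
```

In this sketch a failed batch is pulled again on the next call, while a successful ack leaves only the unread item in the backlog.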

Sink Strategy

Sink strategies should implement the SinkStrategy interface.

| Method | Return Type | Description |
| --- | --- | --- |
| push | None | Pushes a batch of data to the sink. |
| push_converter | Callable[[Any], Any] | Returns a function that converts the payload returned by the user's pipeline into the data type expected by the sink. |

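# SinkStrategy, Batch, and converters come from BuildFlow; their imports are omitted here.
class CustomPrimitiveSink(SinkStrategy):
    async def push(self, batch: Batch):
        # A real sink would write the batch to an external system.
        print("pushing data: ", batch)

    def push_converter(self, user_defined_type: Type) -> Callable[[Any], Any]:
        # Pass elements through unchanged.
        return converters.identity()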
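
As a sketch of a push_converter that does more than pass values through, the example below converts dataclass instances into dictionaries before they are pushed. It runs without BuildFlow: DictCollectingSink and Order are hypothetical names for this example, and calling the converter on each element before push is an assumption about how a runtime would wire the two methods together.

```python
import asyncio
import dataclasses
from typing import Any, Callable, Dict, List, Type


@dataclasses.dataclass
class Order:
    order_id: int
    total: float


class DictCollectingSink:
    """Hypothetical sink that stores rows in a list instead of a real service."""

    def __init__(self) -> None:
        self.rows: List[Dict[str, Any]] = []

    async def push(self, batch: List[Dict[str, Any]]) -> None:
        self.rows.extend(batch)

    def push_converter(self, user_defined_type: Type) -> Callable[[Any], Any]:
        # Dataclass outputs become dictionaries; anything else passes through.
        if dataclasses.is_dataclass(user_defined_type):
            return dataclasses.asdict
        return lambda value: value


async def demo() -> List[Dict[str, Any]]:
    sink = DictCollectingSink()
    convert = sink.push_converter(Order)
    outputs = [Order(order_id=1, total=9.5), Order(order_id=2, total=20.0)]
    # Assumed wiring: convert each pipeline output, then push the batch.
    await sink.push([convert(output) for output in outputs])
    return sink.rows


rows = asyncio.run(demo())
print(rows)
```

Choosing the converter from the user-defined type keeps the pipeline function free of sink-specific formatting.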

Using Your Custom Primitive

Once your primitive is complete, you can plug it directly into the app.pipeline decorator.

@app.pipeline(source=CustomPrimitive(), sink=CustomPrimitive())
def my_pipeline():
    ...