Primitives are resources that a processor may read from, write to, or reference. For example, a primitive might be a Postgres database managed by GCP or an S3 bucket managed by AWS. Primitives can be used as sources or sinks in your processors, and they can also be managed by your flow. A managed primitive can be created, managed, and destroyed automatically with BuildFlow.
BuildFlow provides many primitives out of the box, and you can also create your own custom primitives. All of our primitives can be found in our primitive docs.
Primitive Resource Management
To mark a primitive as managed, attach it to your flow with the flow's manage method.
from buildflow import Flow
from buildflow.io.aws import AWSBucket
from buildflow.io.gcp import GCSBucket

app = Flow()
# Attach both buckets to the flow so BuildFlow manages their lifecycle.
app.manage(AWSBucket("my-bucket"), GCSBucket("my-bucket"))
You can now create the resources by running the following command in the root directory of your project.
The resources can be destroyed by running:
Both of these commands print a preview of the resources that will be created or destroyed. If you are happy with the preview, type yes to accept it.
If you only want to see a preview, you can run
You may notice that often an individual primitive creates and manages multiple resources in order to accomplish reading or writing.
For instance, writing to Snowflake takes over 15 resources!
All resource management is powered by Pulumi. Ideally you should not have to interact with Pulumi directly, but you can always use the Pulumi CLI if you run into issues.
Creating Custom Primitives
Implement the Primitive Interface
To create a custom primitive, implement the Primitive interface. All of its methods are described below (bold indicates required).
|Method||Return type||Description|
|primitive_id||str||A unique ID for your primitive.|
|pulumi_resources||List[pulumi.Resource]||Returns a list of Pulumi resources that should be managed.|
|source||SourceStrategy||Returns an implementation of SourceStrategy for reading data.|
|sink||SinkStrategy||Returns an implementation of SinkStrategy for writing data.|
|cloud_console_url||str||Returns a URL to the cloud console for this resource, if applicable.|
You only need to implement the methods that your primitive supports. For example, if your primitive only supports reading data, you only need to implement the source() method. If your primitive only supports being managed, you only need to implement the pulumi_resources() method.
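from dataclasses import dataclass

import pulumi
import pulumi_aws

# GCPPrimitive and CredentialType come from BuildFlow; CustomSinkStrategy and
# CustomSourceStrategy are your own strategy implementations (see the next section).


@dataclass
class CustomPrimitive(GCPPrimitive):
    input_field1: str
    input_field2: str
    # Fields read by pulumi_resources() below.
    bucket_name: str = "my-bucket"
    force_destroy: bool = False

    def sink(self, credentials: CredentialType):
        return CustomSinkStrategy(
            input_field1=self.input_field1, input_field2=self.input_field2
        )

    def source(self, credentials: CredentialType):
        return CustomSourceStrategy(
            input_field1=self.input_field1, input_field2=self.input_field2
        )

    def pulumi_resources(
        self, credentials: CredentialType, opts: pulumi.ResourceOptions
    ):
        # Every resource returned here is created and destroyed with the flow.
        return [
            pulumi_aws.s3.BucketV2(
                opts=opts,
                resource_name=self.bucket_name,
                bucket=self.bucket_name,
                force_destroy=self.force_destroy,
            )
        ]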
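The rule that a primitive implements only the methods it supports can be illustrated without BuildFlow installed. The sketch below is a stand-in, not BuildFlow's actual base class: `SketchPrimitive`, `ReadOnlyPrimitive`, and `supported_methods` are hypothetical names, and raising `NotImplementedError` for unsupported methods is an assumption made for illustration.

```python
from dataclasses import dataclass
from typing import List


class SketchPrimitive:
    """Hypothetical stand-in for a primitive base class (not BuildFlow's)."""

    def source(self):
        raise NotImplementedError("this primitive cannot be used as a source")

    def sink(self):
        raise NotImplementedError("this primitive cannot be used as a sink")

    def pulumi_resources(self):
        raise NotImplementedError("this primitive cannot be managed")


@dataclass
class ReadOnlyPrimitive(SketchPrimitive):
    """Supports reading only, so it overrides source() and nothing else."""

    path: str

    def source(self):
        # A real primitive would return a SourceStrategy; a string stands in here.
        return f"source-strategy-for:{self.path}"


def supported_methods(primitive: SketchPrimitive) -> List[str]:
    """List which optional methods the primitive's class overrides."""
    names = ["source", "sink", "pulumi_resources"]
    return [
        name
        for name in names
        if getattr(type(primitive), name) is not getattr(SketchPrimitive, name)
    ]


primitive = ReadOnlyPrimitive(path="/tmp/data.csv")
print(supported_methods(primitive))
```

A primitive written this way can be used as a source, while any attempt to use it as a sink or to manage it fails with a clear error instead of silently doing nothing.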
Implement the Strategy Interfaces
Strategies define the actual logic for reading or writing data.
Source strategies should implement the SourceStrategy interface.
|Method||Return type||Description|
|pull||PullResponse||Returns a batch of data from the source.|
|ack||None||Acknowledges data pulled from the source.|
|backlog||int||Returns the number of items in the backlog.|
|max_batch_size||int||Returns the maximum number of items that can be pulled at once.|
|pull_converter||Callable[[Any], Any]||Returns a function that converts the output of the source into the user-defined type.|
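from typing import Any, Callable, Type

# AckInfo, PullResponse, SourceStrategy, and converters come from BuildFlow.


class CustomAckInfo(AckInfo):
    pass


class CustomPrimitiveSource(SourceStrategy):
    async def pull(self) -> PullResponse:
        # Return one batch of elements plus the info needed to ack them later.
        return PullResponse(
            payload=[1, 2, 3],
            ack_info=CustomAckInfo(),
        )

    async def ack(self, to_ack: AckInfo, success: bool):
        # Nothing to acknowledge for this static source.
        return

    async def backlog(self) -> int:
        return 0

    def max_batch_size(self) -> int:
        return -1

    def pull_converter(self, user_defined_type: Type) -> Callable[[Any], Any]:
        # Pass elements through unchanged.
        return converters.identity()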
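To make the relationship between pull, ack, and backlog more concrete, here is a minimal in-memory sketch that runs without BuildFlow. `SketchAckInfo`, `SketchPullResponse`, and `InMemorySource` are hypothetical stand-ins that only mirror the method names above. Re-queuing items when an ack reports failure is an assumption made for illustration, not documented BuildFlow behavior.

```python
import asyncio
from collections import deque
from dataclasses import dataclass
from typing import Any, Deque, List


@dataclass
class SketchAckInfo:
    """Hypothetical ack payload: the items handed out by one pull."""

    items: List[Any]


@dataclass
class SketchPullResponse:
    """Hypothetical stand-in for PullResponse."""

    payload: List[Any]
    ack_info: SketchAckInfo


class InMemorySource:
    """Stand-in source that mirrors the SourceStrategy method names."""

    def __init__(self, items: List[Any], batch_size: int = 2):
        self._queue: Deque[Any] = deque(items)
        self._batch_size = batch_size

    async def pull(self) -> SketchPullResponse:
        # Hand out at most one batch of items from the front of the queue.
        count = min(self._batch_size, len(self._queue))
        batch = [self._queue.popleft() for _ in range(count)]
        return SketchPullResponse(payload=batch, ack_info=SketchAckInfo(batch))

    async def ack(self, to_ack: SketchAckInfo, success: bool) -> None:
        # Assumption: on failure, items go back to the front to be retried in order.
        if not success:
            self._queue.extendleft(reversed(to_ack.items))

    async def backlog(self) -> int:
        return len(self._queue)

    def max_batch_size(self) -> int:
        return self._batch_size


async def demo() -> List[Any]:
    source = InMemorySource([1, 2, 3])
    first = await source.pull()
    await source.ack(first.ack_info, success=False)  # failed: 1 and 2 are re-queued
    second = await source.pull()
    await source.ack(second.ack_info, success=True)
    return [first.payload, second.payload, await source.backlog()]


result = asyncio.run(demo())
print(result)
```

The second pull returns the same two items as the first because the failed ack put them back, and the backlog afterwards holds the one item that has not been pulled yet.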
Sink strategies should implement the SinkStrategy interface.
|Method||Return type||Description|
|push||None||Pushes a batch of data to the sink.|
|push_converter||Callable[[Any], Any]||Returns a function that converts the payload returned by the user's pipeline into the data type expected by the sink.|
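from typing import Any, Callable, Type

# Batch, SinkStrategy, and converters come from BuildFlow.


class CustomPrimitiveSink(SinkStrategy):
    async def push(self, batch: Batch):
        # A real sink would write the batch to an external system.
        print("pushing data: ", batch)

    def push_converter(self, user_defined_type: Type) -> Callable[[Any], Any]:
        # Pass elements through unchanged.
        return converters.identity()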
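A push converter is most useful when the pipeline's output type differs from what the sink stores. The sketch below is a self-contained illustration with hypothetical names (`Reading`, `InMemorySink`); it assumes the sink wants plain dictionaries and that the user returns dataclass instances. It does not use BuildFlow's classes.

```python
import asyncio
import dataclasses
from typing import Any, Callable, Dict, List, Type


@dataclasses.dataclass
class Reading:
    """Hypothetical user-defined output type."""

    sensor: str
    value: float


class InMemorySink:
    """Stand-in sink that mirrors the SinkStrategy method names."""

    def __init__(self) -> None:
        self.rows: List[Dict[str, Any]] = []

    async def push(self, batch: List[Dict[str, Any]]) -> None:
        # A real sink would write to an external system; this one keeps rows in memory.
        self.rows.extend(batch)

    def push_converter(
        self, user_defined_type: Type
    ) -> Callable[[Any], Dict[str, Any]]:
        # Dataclasses become dicts; anything else is assumed to be a dict already.
        if dataclasses.is_dataclass(user_defined_type):
            return dataclasses.asdict
        return lambda element: element


async def demo() -> List[Dict[str, Any]]:
    sink = InMemorySink()
    convert = sink.push_converter(Reading)
    outputs = [Reading("a", 1.5), Reading("b", 2.0)]
    await sink.push([convert(output) for output in outputs])
    return sink.rows


rows = asyncio.run(demo())
print(rows)
```

Choosing the converter once per user-defined type, rather than inspecting every element inside push, keeps the per-batch work limited to the conversion itself.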
Using Your Custom Primitive
Once complete, you can plug your custom primitive directly into the pipeline decorator:
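# Placeholder values are passed for the two required fields of CustomPrimitive.
@app.pipeline(
    source=CustomPrimitive(input_field1="value1", input_field2="value2"),
    sink=CustomPrimitive(input_field1="value1", input_field2="value2"),
)
def my_pipeline():
    ...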
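It can help to picture how the source and sink strategies fit around a pipeline function. The sketch below is a conceptual model only, not BuildFlow's runtime: `ListSource`, `ListSink`, and `run_once` are hypothetical, and the order of steps (pull, convert, process, convert, push, ack) is an assumption based on the method descriptions above.

```python
import asyncio
from typing import Any, Callable, List


class ListSource:
    """Hypothetical source: hands out its whole list once, then nothing."""

    def __init__(self, items: List[Any]):
        self._items = list(items)
        self.acked: List[bool] = []

    async def pull(self) -> List[Any]:
        batch, self._items = self._items, []
        return batch

    async def ack(self, success: bool) -> None:
        self.acked.append(success)

    def pull_converter(self, user_type: type) -> Callable[[Any], Any]:
        # Use the type itself as the converter, e.g. int("3") == 3.
        return user_type


class ListSink:
    """Hypothetical sink: collects everything pushed to it."""

    def __init__(self) -> None:
        self.received: List[Any] = []

    async def push(self, batch: List[Any]) -> None:
        self.received.extend(batch)

    def push_converter(self, user_type: type) -> Callable[[Any], Any]:
        return lambda element: element


async def run_once(source: ListSource, sink: ListSink,
                   process: Callable[[Any], Any], in_type: type, out_type: type) -> None:
    """Pull one batch, run the user function on each element, push, then ack."""
    to_input = source.pull_converter(in_type)
    to_output = sink.push_converter(out_type)
    batch = await source.pull()
    try:
        results = [to_output(process(to_input(element))) for element in batch]
        await sink.push(results)
    except Exception:
        await source.ack(success=False)  # report the failure, then re-raise
        raise
    await source.ack(success=True)


def double(x: int) -> int:
    return x * 2


source = ListSource(["1", "2", "3"])
sink = ListSink()
asyncio.run(run_once(source, sink, double, int, int))
print(sink.received, source.acked)
```

In this model the user function only ever sees values of its own declared types; the strategies own all conversion to and from the external systems.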