Primitives
Primitives are resources that a processor may read from, write to, or reference. For example a primitive might be a Postgres Database managed by GCP, or a S3 bucket managed by AWS. Primitives can be uses as sources or sinks in your processors, and they can also be managed by your flow. If a primitive is managed it can automatically be created, managed, and destroyed with BuildFlow
BuildFlow provides many primitives out of the box, and you can also create your own custom primitives. All of our primitives can be found in our primitive docs
Primitive Resource Management
A primitive can be marked as managed by attaching it to your flow with the manage
method of your flow.
from buildflow import Flow
from buildflow.io.aws import AWSBucket
from buildflow.io.gcp import GCSBucket
app = Flow()
app.manage(AWSBucket("my-bucket"), GCSBucket("my-bucket"))
Now resource creation can be done by running the below command in the root directory of your project.
buildflow apply
And the resources and be destroyed by running:
buildflow destroy
Both of these commands will print out a preview of the resources that will be created or destroyed. If you are happy with the preview you can run type: yes
to accept it.
If you would like to simply see a preview you can run buildflow preview
You may notice that often an individual primitive creates and manages multiple resources in order to accomplish reading or writing.
For instance writing to Snowflake takes over 15 resources!
All resource management is powered by Pulumi, ideally you shouldn’t have to interact with pulumi directly but you can always use the pulumi cli directly if you ever run into issues.
Creating Custom Primitives
Implement the Primitive Interface
To create a custom primitive, you can implement the Primitive interface a description of all the methods is found below (bold indicates required).
If your primitive is specific to a cloud provider you can extend GCPPrimitive or AWSPrimitive.
By extending these we will know if you what type of credentials to pass to your primitive (GCP or AWS).
Method | Return Type | Description | |
---|---|---|---|
primitive_id | str | A unique ID for your primitive | |
pulumi_resources | List[pulumi.Resources] | Returns a list of pulumi resources that should be managed | |
source | SourceStrategy | Returns an implementation of SourceStrategy for reading data. | |
sink | SinkStrategy | Returns an implementation of SinkStrategy for reading data. | |
cloud_console_url | str | Returns a URL to the cloud console for this resource if applicable. |
You only need to implement the methods that your primitive supports. For example, if your primitive only supports reading data you only need to implement the source()
method, or if your primitive only supports being managed you only need to implement the pulumi_resources
method.
@dataclass
class CustomPrimitive(GCPPrimitive):
input_field1: str
input_field2: str
def sink(self, credentials: CredentialType):
return CustomSinkStrategy(input_field1=self.input_field1, input_field2=self.input_field2)
def source(self, credentials: CredentialType):
return CustomSourceStrategy(input_field1=self.input_field1, input_field2=self.input_field2)
def pulumi_resources(self, credentials: CredentialType, opts: pulumi.ResourceOptions):
return [pulumi_aws.s3.BucketV2(
opts=opts,
resource_name=self.bucket_name,
bucket=self.bucket_name,
force_destroy=self.force_destroy,
)]
Implement the Strategy Interfaces
Strategies define the actual logic for reading or writing data.
Source Strategy
Source strategies should implement the SourceStrategy interface.
Method | Return Type | Description | |
---|---|---|---|
pull | PullResponse | pull returns a batch of data from the source. | |
ack | None | ack acknowledges data pulled from the source. | |
backlog | int | backlog returns an integer representing the number of items in the backlog | |
max_batch_size | int | max_batch_size returns the max number of items that can be pulled at once. | |
pull_converter | Callable[[Any], Any] | pull_converter returns a function that can convert the output of the source into the user defined type |
class CustomAckInfo(AckInfo):
pass
class CustomPrimitiveSource(SourceStrategy):
async def pull(self) -> PullResponse:
return PullResponse(
payload=[1, 2, 3],
ack_info=CustomAckInfo(),
)
async def ack(self, to_ack: AckInfo, success: bool):
return
async def backlog(self) -> int:
return 0
def max_batch_size(self) -> int:
return -1
def pull_converter(self, user_defined_type: Type) -> Callable[[Any], Any]:
return converters.identify()
Sink Strategy
Sink strategies should implement the SinkStrategy interface.
Method | Return Type | Description |
---|---|---|
push | None | pushes data to the sink |
push_converter | Callable[[Any], Any] | Returns a function that can be used to convert the payload returned by the users pipeline into the data type that is expected by the sink. |
class CustomPrimitiveSink(SinkStrategy):
async def push(self, batch: Batch):
print("pushing data: ", batch)
def push_converter(self, user_defined_type: Type) -> Callable[[Any], Any]:
return converters.identify()
Using Your Custom Primitive
Once complete you can plug your custom primitive directly into the app.pipeline
decorator.
@app.pipeline(source=CustomPrimitive(), sink=CustomPrimitive())
def my_pipeline():
...