TurbinePy App Overview

Take a closer look at the contents of the TurbinePy streaming app project directory and learn how to configure your streaming app to work with your production data on the Meroxa Platform.

App Project Files

Navigate to the root of the app project using cd /your/local/path/liveapp. Then, use tree to list the app project folders and files.

$ tree

The output you see will be exhaustive. Here is a shortened list of what should be included in the app project folder, along with a description.

├── .git # Contains information tracking changes and commits made to your app project.
├── .gitignore # Tells Git which files or folders to ignore in your app project.
├── README.md # Contains an introduction and guidance on building a streaming app using TurbinePy.
├── app.json # A configuration file for your streaming app.
├── fixtures # Fixtures are JSON-formatted data records you can develop against and run with your streaming app locally.
│   ├── demo-cdc.json # A JSON-formatted CDC data record for the example streaming app.
│   └── demo-no-cdc.json # A JSON-formatted non-CDC data record for the example streaming app.
├── main.py # The core of your streaming app code. Example code is written to get you started.
└── __init__.py # Allows us to treat the TurbinePy app as a package we can import and execute.

main.py

This is the core of your streaming app. Self-documented TurbinePy boilerplate code is already written to help you get started at /your/local/path/liveapp/main.py.

# Dependencies of the example data app
import hashlib
import logging
import sys
import typing as t

from turbine.runtime import Record, Runtime

logging.basicConfig(level=logging.INFO)


def anonymize(records: t.List[Record]) -> t.List[Record]:
    logging.info(f"processing {len(records)} record(s)")
    for record in records:
        logging.info(f"input: {record}")
        try:
            payload = record.value["payload"]

            # Hash the email
            payload["email"] = hashlib.sha256(
                payload["email"].encode("utf-8")
            ).hexdigest()

            logging.info(f"output: {record}")
        except Exception as e:
            print("Error occurred while parsing records: " + str(e))
            logging.info(f"output: {record}")
    return records


class App:
    @staticmethod
    async def run(turbine: Runtime):
        # To configure your datastores as resources on the Meroxa Platform
        # use the Meroxa Dashboard, CLI, or Meroxa Terraform Provider
        # For more details refer to: https://docs.meroxa.com/

        # Identify an upstream datastore for your data app
        # with the `resources` function
        # Replace `source_name` with the resource name the
        # datastore was configured with on the Meroxa platform.
        source = await turbine.resources("source_name")

        # Specify which upstream records to pull
        # with the `records` function
        # Replace `collection_name` with a table, collection,
        # or bucket name in your datastore.
        # If you need additional connector configurations, pass them
        # as an additional argument, i.e. {"incrementing.field.name": "id"}
        records = await source.records("collection_name")

        # Specify which secrets in environment variables should be passed
        # into the Process.
        # Replace 'PWD' with the name of the environment variable.
        turbine.register_secrets("PWD")

        # Specify what code to execute against upstream records
        # with the `process` function
        # Replace `anonymize` with the name of your function code.
        anonymized = await turbine.process(records, anonymize)

        # Identify a downstream datastore for your data app
        # with the `resources` function
        # Replace `destination_name` with the resource name the
        # datastore was configured with on the Meroxa platform.
        destination_db = await turbine.resources("destination_name")

        # Specify where to write records downstream
        # using the `write` function
        # Replace `collection_name` with a table, collection,
        # or bucket name in your datastore.
        # If you need additional connector configurations, pass them
        # as a third argument, i.e. {"behavior.on.null.values": "ignore"}
        await destination_db.write(anonymized, "collection_name")

TurbinePy Methods

The following are the methods available for use in TurbinePy code.

Using Resources

Once you have successfully created Resources for your streaming app on the Meroxa Platform, you may use the resources method to call them in your TurbinePy code.

In the following code example, source_name and destination_name are the names you’ve defined for your Resources on the Meroxa Platform. We recommend something descriptive and recognizable, e.g. pgproduction or dwh.

source = await turbine.resources("source_name")
# ...
destination_db = await turbine.resources("destination_name")

Exactly one Source Resource is required for your streaming app. However, you may enrich data records using any number of APIs through secrets. A Destination Resource is not required, and you may have multiple for your streaming app.

Streaming Data

To get data streaming into your app, use the records method. This is how you identify the records or events you want to stream in your TurbinePy code.

This can only be used once you have configured a Source Resource on the Meroxa Platform and have identified it in your TurbinePy code using the resources method.

You will want to pair the appropriate Source Resource variable name you defined earlier with the records method. In this example case, it is source.

The collection_name should be replaced with the name of your data collection construct in your datastore (e.g., table, collection, bucket, index, etc).

For example: Let’s say you have a table named Users in a relational database. With PostgreSQL using logical replication, you might use public.Users; with MySQL using the binlog, you might use Users.

records = await source.records("collection_name")
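
If a connector needs extra settings, the boilerplate comments suggest they can be passed alongside the collection name. A minimal sketch, assuming records accepts an optional configuration dictionary as an additional argument; the public.Users table and the incrementing.field.name setting are illustrative:

records = await source.records(
    "public.Users",
    {"incrementing.field.name": "id"},
)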

You may optionally use the write method to tell Turbine where to write the output of your streaming app.

Similar to records, this can only be used once you have configured a Destination Resource on the Meroxa Platform and have identified it in your TurbinePy code using the resources method.

You will want to pair the appropriate Destination Resource variable name you defined earlier with the write method. In this example case, it is destination_db.

The collection_name should be replaced with the name of your data collection construct in your datastore (e.g., table, collection, bucket, index, etc). In the case of multiple Destination Resources you will need to do this for each.

For example: Let’s say you have a bucket named logs in Amazon S3. This is where you’d define the bucket name that will serve as your destination.

await destination_db.write(anonymized, "collection_name")

# You'll notice `anonymized` referenced in this line of code; we'll speak more to this under `Processing Data`.

In some cases, a Resource may need special configuration to achieve a particular outcome. You can provide it by passing an additional argument containing the configuration key and the value for the desired setting.

await destination_db.write(anonymized, "collection_name", {"configuration_name":"setting_value"})
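
If your app has more than one Destination Resource, repeat the pattern for each. A minimal sketch, assuming two resources named dwh and s3archive have been configured on the Meroxa Platform; the collection names are illustrative:

warehouse = await turbine.resources("dwh")
archive = await turbine.resources("s3archive")

# The same processed stream can be written to multiple downstream datastores.
await warehouse.write(anonymized, "users")
await archive.write(anonymized, "user_archive")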

Processing Data

To start processing the data streaming through your application, you will need to write the custom code you want to run against the stream of data.

The process method determines how to process the records coming into your streaming app against a function.

First, you will need to write the custom code you would like to execute against the stream. In the example app’s main.py file, we have defined a function anonymize that looks for an email field in the record payload and applies a hash to that field.

def anonymize(records: t.List[Record]) -> t.List[Record]:
    logging.info(f"processing {len(records)} record(s)")
    for record in records:
        logging.info(f"input: {record}")
        try:
            payload = record.value["payload"]

            # Hash the email
            payload["email"] = hashlib.sha256(
                payload["email"].encode("utf-8")
            ).hexdigest()

            logging.info(f"output: {record}")
        except Exception as e:
            print("Error occurred while parsing records: " + str(e))
            logging.info(f"output: {record}")
    return records
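
With a CDC-formatted record, such as the one in fixtures/demo-cdc.json, the changed row is typically nested under payload["after"] rather than sitting at the top level of the payload. A minimal sketch of the same hashing logic adapted to that shape, assuming the usual before/after CDC envelope:

def anonymize_cdc(records: t.List[Record]) -> t.List[Record]:
    for record in records:
        try:
            # Assumes CDC records carry the changed row under payload["after"].
            after = record.value["payload"]["after"]
            after["email"] = hashlib.sha256(
                after["email"].encode("utf-8")
            ).hexdigest()
        except Exception as e:
            logging.error(f"Error occurred while parsing records: {e}")
    return records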

Next, you will use the process method, passing in the stream from the Source Resource by referencing the variable name you have given it. In the example case: records.

anonymized = await turbine.process(records, anonymize)

Once your streaming app is deployed on the Meroxa Platform, Meroxa will do the work to take each record or event that is streamed to your app and run your code against it. The Meroxa Platform will scale out the processing of records relative to the velocity of records streaming in.

Replace anonymize with the function you wish to process against the event or data stream.

You may also forgo process and do a direct end-to-end stream of records or events.

Simply remove the process call from the code and change the write call to use records instead of anonymized.

await destination_db.write(records, "collection_name")

Managing Secrets

The following example depicts how you would manage secrets used to authenticate with external systems, such as an API or integration you tap into to retrieve data for streaming and processing.

You will need to create environment variables and register them in your TurbinePy code with the register_secrets method so they are available to your process function.

# Register API Secret
turbine.register_secrets("PWD")

# Deploy function with source as input
enriched = await turbine.process(records, enrich_data)
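
Inside the process function itself, the registered variable can then be read from the environment. A minimal sketch, assuming the secret is exposed via os.environ in the process; enrich_data mirrors the hypothetical function named above, and the enrichment step is a placeholder:

import os

def enrich_data(records: t.List[Record]) -> t.List[Record]:
    # "PWD" is available here because it was registered with
    # turbine.register_secrets("PWD") before deployment.
    api_password = os.environ.get("PWD", "")
    for record in records:
        # Use the secret to authenticate with an external API here; this
        # placeholder only marks whether a credential was present.
        record.value["payload"]["enriched"] = bool(api_password)
    return records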

app.json

This file serves as a configuration file, used by the Meroxa CLI and Platform to obtain information about your streaming app. Upon initialization of a streaming app, the CLI will scaffold the file for you with the available options:

{
  "name": "liveapp",
  "language": "python",
  "environment": "common",
  "resources": {
    "source_name": "fixtures/path"
  }
}

Name — The name of your application. It is set upon initializing your streaming app and should not change.

Language — The language your streaming app will be written in. It is set upon initializing your streaming app and should not change.

Environment — common is the only available environment for streaming apps. The Meroxa Platform does have the ability to create isolated environments, but this feature is currently in private beta.

Resources — These are the named systems and databases that you'll use in your streaming app. The source_name needs to match the name of the Source Resource that you create on the Meroxa Platform by running the meroxa resources create command with the Meroxa CLI or by using the Dashboard. You can point to the path of the fixture that will be used to mock the Resource when you run meroxa apps run.

The source_name string in app.json should match the Resource name used in your code, so that the Resource variable tied to the records method maps to a fixture file in your fixtures/ directory.
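
For example, a sketch of an app.json where the Source Resource is named pgproduction (an illustrative name) and is mocked locally with the non-CDC fixture that ships with the example app:

{
  "name": "liveapp",
  "language": "python",
  "environment": "common",
  "resources": {
    "pgproduction": "fixtures/demo-no-cdc.json"
  }
}

With this mapping, the Resource returned by await turbine.resources("pgproduction") is mocked with fixtures/demo-no-cdc.json when you run meroxa apps run locally.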

Unit Testing

Those familiar with software development are likely already in the habit of writing unit tests. We encourage you to follow this convention throughout the development process to ensure each unit of code works as expected and as a way to identify any issues or bugs early on.

Unit testing is language-specific; you may use any testing framework of your choice.
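
For example, a minimal pytest-style sketch for the anonymize function in main.py, assuming the turbine dependency is installed so main.py can be imported; FakeRecord is a hypothetical stand-in that only mimics the value attribute the function actually reads:

import hashlib

from main import anonymize


class FakeRecord:
    # Hypothetical stand-in for turbine.runtime.Record.
    def __init__(self, value):
        self.value = value


def test_anonymize_hashes_email():
    record = FakeRecord({"payload": {"email": "user@example.com"}})

    anonymize([record])

    expected = hashlib.sha256("user@example.com".encode("utf-8")).hexdigest()
    assert record.value["payload"]["email"] == expected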