
Developing Turbine data apps with Python

warning

This development guide is still a work in progress...

To proceed with this guide, you must have already gone through the setup steps and initialized your application.

Requirements

The application

When you run the meroxa apps init command, Turbine automatically scaffolds an example data app in an empty Git repository on your local machine.

Navigate to the root of your project and run the ls -a command to list its contents. You should see the following:

├── README.md # Contains an introduction and guidance on building a Turbine app using Python.
├── main.py # The base of your application code. Includes an example data app to get you started.
├── app.json # A configuration file for your data app.
├── __init__.py
└── fixtures # Fixtures are JSON-formatted data records you can develop against and run with your data app locally.
    ├── demo-cdc.json # A CDC data record sample for the example data app.
    └── demo-no-cdc.json # A non-CDC data record sample for the example data app.

The codebase contains comments that describe each function and its purpose. All that awaits is your creativity demonstrated through code.

/path/to/liveapp/main.py
# Dependencies of the example data app
import hashlib
import typing as t

from turbine import Turbine
from turbine.runtime import Record


def anonymize(records: t.List[Record]) -> t.List[Record]:
    updated = []
    for record in records:
        value_to_update = record.value
        hashed_email = hashlib.sha256(
            value_to_update["payload"]["after"]["email"].encode()
        ).hexdigest()
        value_to_update["payload"]["after"]["email"] = hashed_email
        updated.append(
            Record(key=record.key, value=value_to_update, timestamp=record.timestamp)
        )
    return updated


class App:
    @staticmethod
    async def run(turbine: Turbine):
        # To configure your datastores as resources on the Meroxa Platform,
        # use the Meroxa Dashboard, CLI, or Meroxa Terraform Provider.
        # For more details refer to: https://docs.meroxa.com/

        # Identify an upstream datastore for your data app
        # with the `resources` function.
        # Replace `source_name` with the resource name the
        # datastore was configured with on Meroxa.
        source = await turbine.resources("source_name")

        # Specify which upstream records to pull
        # with the `records` function.
        # Replace `collection_name` with a table, collection,
        # or bucket name in your datastore.
        records = await source.records("collection_name")

        # Specify what code to execute against upstream records
        # with the `process` function.
        # Replace `anonymize` with the name of your function code.
        anonymized = await turbine.process(records, anonymize)

        # Identify a downstream datastore for your data app
        # with the `resources` function.
        # Replace `destination_name` with the resource name the
        # datastore was configured with on Meroxa.
        destination_db = await turbine.resources("destination_name")

        # Specify where to write records downstream
        # using the `write` function.
        # Replace `collection_name` with a table, collection,
        # or bucket name in your datastore.
        await destination_db.write(anonymized, "collection_name")

Managing application data

Configuration

The app.json file is a configuration file that Meroxa uses to interpret details about a Turbine data app, including the name, codebase language, environment, and relevant data stores with their corresponding fixtures.

/path/to/liveapp/app.json
{
  "name": "liveapp",
  "language": "python",
  "environment": "common",
  "resources": {
    "source_name": "fixtures/demo-cdc.json"
  }
}

In this example, the app configuration identifies a resource named source_name that points to the demo-cdc.json fixture. Fixtures are JSON-formatted data record samples used in place of your production data when developing or running a data app locally.

Without changing anything, you can run the command meroxa app run within the root of your data app project and see a result. The records the application pulls come from the data records contained in the demo-cdc.json fixture.

Using resources

To get your production data into your data app, you will need to create Meroxa resources for your datastores. Once you have a set of Meroxa resources, you can call them within your data app.

To do this, use the resources function within the Turbine section of your application code. In the example data app, it should appear something like this:

source = await turbine.resources("source_name")
# ...
destination_db = await turbine.resources("destination_name")

Replace source_name and destination_name with the names of your Meroxa resources.

warning

Meroxa resources must be created before you deploy. If you attempt to deploy without them, you will encounter a Deployment failed error in the progressive deploy dialog.

Getting data in and out

Now that you have called your Meroxa resources in your data app, you must direct Turbine on how to use them. For this, you can use the records and write functions.

records

The records function tells Turbine which records you want to use in your data app. Here you indicate the collection of records by naming them using whatever data collection construct applies to the datastore (e.g., table, collection, bucket, index, etc.).

To do this, use the records function within the Turbine section of your application code. In the example data app, it should appear something like this:

records = await source.records("collection_name")

Replace collection_name with the name of the data collection construct in your datastore. Say you have a table named Users in a relational database: for PostgreSQL using logical replication you might use public.Users, or Users for MySQL using binlog.

write

The write function tells Turbine where to write the output of your data app. Here you indicate the resource as well as whatever data collection construct applies to the datastore (e.g., table, collection, bucket, index, etc.).

To do this, use the write function within the Turbine section of your application code. In the example data app, it should appear something like this:

await destination_db.write(anonymized, "collection_name")

# You'll notice `anonymized` referenced in this line of code; we cover this under `Processing data`.

Replace collection_name with the name of the data collection construct you want written to in your datastore. Say you have a table named Users in a relational database: for PostgreSQL using logical replication you might use public.Users, or Users for MySQL using binlog. If this table does not exist, it will be created automatically in the datastore, assuming your credentials have permission to create tables.

Processing data

Now that you have called your Meroxa resources in your data app and told Turbine how you want to direct that data, you can now decide what sort of code you want to process against that stream of data.

process

The process function tells Turbine what code you want to process against the record or event stream in your data app. This is where you write your custom code to achieve a specific outcome.

To do this, use the process function within the Turbine section of your application code. In the example data app, this comes in two parts:

Part 1: Where you have written the custom code you wish to process against the record or event stream:

def anonymize(records: t.List[Record]) -> t.List[Record]:
    updated = []
    for record in records:
        value_to_update = record.value
        hashed_email = hashlib.sha256(
            value_to_update["payload"]["after"]["email"].encode()
        ).hexdigest()
        value_to_update["payload"]["after"]["email"] = hashed_email
        updated.append(
            Record(key=record.key, value=value_to_update, timestamp=record.timestamp)
        )
    return updated

In the example data app, we are using CDC events. You will see that the example function anonymize looks for record.value["payload"]["after"]["email"]. Turbine will search for an email field in the event payload and apply the hash to that field.
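To see what the hashing step does in isolation, here is a minimal sketch; the email value is hypothetical, standing in for record.value["payload"]["after"]["email"]:

```python
import hashlib

# Hypothetical email value standing in for
# record.value["payload"]["after"]["email"]
email = "jane@example.com"

# SHA-256 is deterministic: the same input always produces the same
# 64-character hexadecimal digest, so a hashed email can still serve
# as a stable identifier downstream without exposing the raw address
digest = hashlib.sha256(email.encode()).hexdigest()
print(len(digest))  # 64
```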

Part 2: Where you apply the process function to the record or event stream:

anonymized = await turbine.process(records, anonymize)

Replace anonymize with the function you wish to process against the event or data stream.

You can also decide to forgo process and do a direct end-to-end stream of records or events.

Simply eliminate the process code in the Turbine section of your data app, remove the anonymize function, and change the write code to pass records instead of anonymized:

await destination_db.write(records, "collection_name")

Records and Events

All Turbine data app records and events are formatted in JSON and adhere to a common format with some variable differences depending on the datastore.

In the following example, notice that a record or event comes in two parts: schema and payload.

{ "schema": <schema>, "payload": <event data> }

Payload

The payload is defined by the resource from where you are pulling events or records. Depending on the configuration of your Meroxa resource and the capabilities of the underlying datastore, you may see variability between a record or an event structure. Events are created as changes are captured through Change Data Capture (CDC)—the payload will include a before and after within the payload depicting what changes took place.
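As a rough sketch of this shape (the field names and values below are illustrative, not taken from a real resource), a CDC event might look like the following, shown here as a Python dict:

```python
# Illustrative CDC event; field names and values are hypothetical
cdc_event = {
    "schema": {"type": "struct"},  # schema definition abbreviated
    "payload": {
        "before": {"id": 1, "email": "jane@example.com"},
        "after": {"id": 1, "email": "jane.doe@example.com"},
    },
}

# CDC events carry both the prior and the updated row state
print(cdc_event["payload"]["before"]["email"])  # jane@example.com
print(cdc_event["payload"]["after"]["email"])   # jane.doe@example.com
```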

Schema

The schema is defined by the resource from where you are pulling events or records. It is automatically recorded by Meroxa and captures changes over time. Similar to payload, you may see variability between a record or an event structure. Events will show the same before and after characteristics.

{
  "field": "before",
  "fields": [
    { "field": "id", "optional": false, "type": "int32" },
    { "field": "category", "optional": false, "type": "string" },
    { "field": "product_type", "optional": false, "type": "string" },
    { "field": "product_name", "optional": false, "type": "string" },
    { "field": "stock", "optional": false, "type": "boolean" },
    { "field": "product_id", "optional": false, "type": "int32" },
    { "field": "shipping_address", "optional": false, "type": "string" },
    { "field": "customer_email", "optional": false, "type": "varchar" }
  ],
  "name": "resources.public.collection_name.Value",
  "optional": true,
  "type": "struct"
}

Look closely: in this schema, the type, optional (optionality), and field (name) are captured for each payload field.

Examples

Two demo fixtures have been automatically generated within /fixtures/ in your example data app to demonstrate both CDC and non-CDC formatted data records.

In most cases, Meroxa defaults to the CDC format, as the application framework is built around real-time data. However, you may configure specific resources to use polling instead (e.g., PostgreSQL), which results in slightly altered formatting that your code will need to account for.
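One way to account for both shapes in your processing code is to check whether the payload carries the CDC-style after key before reading fields. The helper below is an illustrative sketch, not part of the Turbine API, and it assumes polled (non-CDC) records carry the row directly in the payload:

```python
def extract_row(value: dict) -> dict:
    """Return the current row state from either a CDC or non-CDC record.

    CDC records nest the row under payload["after"]; polled (non-CDC)
    records are assumed here to carry the row directly in payload.
    Illustrative helper only, not part of the Turbine API.
    """
    payload = value["payload"]
    if isinstance(payload, dict) and "after" in payload:
        return payload["after"]
    return payload


# CDC-style record
cdc = {"payload": {"before": None, "after": {"email": "a@example.com"}}}
# Non-CDC-style (polled) record
plain = {"payload": {"email": "b@example.com"}}

print(extract_row(cdc)["email"])    # a@example.com
print(extract_row(plain)["email"])  # b@example.com
```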

Unit testing

Those familiar with software development are likely already in the habit of writing unit tests. We encourage you to follow this convention throughout the development process to ensure each unit of code works as expected and as a way to identify any issues or bugs early on.

Unit testing is language-specific. We have provided an example in your example data app. However, you may use any testing framework of your choice.
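As a sketch of what a unit test for the example anonymize function could look like, the following uses the standard-library unittest framework. The lightweight Record stand-in is an assumption, included only so the test runs without the Turbine runtime installed:

```python
import hashlib
import unittest
from dataclasses import dataclass
from typing import Any


@dataclass
class Record:
    # Minimal stand-in for turbine.runtime.Record, so the test
    # runs without the Turbine runtime installed
    key: str
    value: Any
    timestamp: float


def anonymize(records):
    # Same logic as the example data app's anonymize function
    updated = []
    for record in records:
        value_to_update = record.value
        hashed_email = hashlib.sha256(
            value_to_update["payload"]["after"]["email"].encode()
        ).hexdigest()
        value_to_update["payload"]["after"]["email"] = hashed_email
        updated.append(
            Record(key=record.key, value=value_to_update, timestamp=record.timestamp)
        )
    return updated


class TestAnonymize(unittest.TestCase):
    def test_email_is_hashed(self):
        email = "jane@example.com"
        record = Record(
            key="1",
            value={"payload": {"after": {"email": email}}},
            timestamp=0.0,
        )
        (result,) = anonymize([record])
        expected = hashlib.sha256(email.encode()).hexdigest()
        self.assertEqual(result.value["payload"]["after"]["email"], expected)


if __name__ == "__main__":
    unittest.main()
```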

Run your app locally

You can run your app locally throughout development to ensure your application is outputting the intended result.

To run your app locally, use the meroxa app run command either at the root of your application or by using the --path argument with the local path to your application.

# You can run the app by referencing its local path
$ meroxa app run --path /path/to/liveapp

# Or run the app directly from the app directory
$ meroxa app run

You should receive a result that anonymizes the user record email.

What's next?

Once your data application is running as expected, you're ready to take the next steps and deploy!

Next up: Deploy your data application