Developing Turbine data apps with Python

To proceed with the following guide, you must have already completed the setup steps.

Requirements

The application

When you run the `meroxa apps init liveapp --lang py` command, Turbine automatically scaffolds an example data app in an empty Git repository on your local machine.

Navigate to the root of your project and run the tree command to list its contents. You should see the following:

```
├── README.md       # Contains an introduction and guidance on building a Turbine app using Python.
├── main.py         # The base of your application code. Includes an example data app to get you started.
├── app.json        # A configuration file for running your data app locally.
├── __init__.py
└── fixtures        # Fixtures are JSON-formatted data records you can develop against and run with your data app locally.
    ├── demo-cdc.json    # A Change Data Capture (CDC) data record sample for the example data app.
    └── demo-no-cdc.json # A data record sample for the example data app.
```

The codebase contains comments that describe each function and its purpose. All that awaits is your creativity demonstrated through code.

/path/to/liveapp/main.py

```python
# Dependencies of the example data app
import hashlib
import logging
import sys
import typing as t

from turbine.runtime import Record, Runtime

logging.basicConfig(level=logging.INFO)


def anonymize(records: t.List[Record]) -> t.List[Record]:
    logging.info(f"processing {len(records)} record(s)")
    for record in records:
        logging.info(f"input: {record}")
        try:
            payload = record.value["payload"]

            # Hash the email
            payload["email"] = hashlib.sha256(
                payload["email"].encode("utf-8")
            ).hexdigest()

            logging.info(f"output: {record}")
        except Exception as e:
            print("Error occurred while parsing records: " + str(e))
            logging.info(f"output: {record}")
    return records


class App:
    @staticmethod
    async def run(turbine: Runtime):
        # To configure your datastores as resources on the Meroxa Platform,
        # use the Meroxa Dashboard, CLI, or Meroxa Terraform Provider.
        # For more details refer to: https://docs.meroxa.com/

        # Identify an upstream datastore for your data app
        # with the `resources` function.
        # Replace `source_name` with the resource name the
        # datastore was configured with on the Meroxa platform.
        source = await turbine.resources("source_name")

        # Specify which upstream records to pull
        # with the `records` function.
        # Replace `collection_name` with a table, collection,
        # or bucket name in your datastore.
        # If you need additional connector configurations, replace '{}'
        # with the key and value, i.e. {"incrementing.field.name": "id"}
        records = await source.records("collection_name")

        # Specify which secrets in environment variables should be passed
        # into the process function.
        # Replace 'PWD' with the name of the environment variable.
        turbine.register_secrets("PWD")

        # Specify what code to execute against upstream records
        # with the `process` function.
        # Replace `anonymize` with the name of your function code.
        anonymized = await turbine.process(records, anonymize)

        # Identify a downstream datastore for your data app
        # with the `resources` function.
        # Replace `destination_name` with the resource name the
        # datastore was configured with on the Meroxa platform.
        destination_db = await turbine.resources("destination_name")

        # Specify where to write records downstream
        # using the `write` function.
        # Replace `collection_name` with a table, collection,
        # or bucket name in your datastore.
        # If you need additional connector configurations, replace '{}'
        # with the key and value, i.e. {"behavior.on.null.values": "ignore"}
        await destination_db.write(anonymized, "collection_name")
```

Managing application data

Local Configuration

`app.json` is a configuration file that Meroxa uses to interpret details about a Turbine data app while developing or running locally. Configuration options include the name, codebase language, environment, and relevant datastores with their corresponding fixtures.

/path/to/liveapp/app.json

```json
{
  "name": "liveapp",
  "language": "python",
  "environment": "common",
  "resources": {
    "source_name": "fixtures/demo-cdc.json"
  }
}
```

In this example, the app configuration identifies a resource named source_name that points to the demo-cdc.json fixture. Fixtures are JSON-formatted data record samples used in place of your production data when developing or running a data app locally.
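Because fixtures are plain JSON, you can create and inspect them with standard tooling. A minimal sketch of loading a fixture the way a local run would (the file path and record shape here are illustrative, not the exact scaffolded `demo-cdc.json`):

```python
import json
import tempfile

# Illustrative fixture content; the scaffolded demo-cdc.json has its own shape.
fixture = {
    "source_name": [
        {"payload": {"email": "user@example.com", "id": 1}}
    ]
}

# Write the fixture to disk, then re-read it as a local run would.
with tempfile.NamedTemporaryFile("w+", suffix=".json", delete=False) as f:
    json.dump(fixture, f)
    path = f.name

with open(path) as f:
    loaded = json.load(f)

print(len(loaded["source_name"]))  # 1 sample record
```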

Run your app locally

Without changing anything, you can run the data app project and see a result. The records the application pulls come from the `demo-cdc.json` fixture.

To run your app locally, use the `meroxa app run` command, either at the root of your application or with the `--path` argument pointing to your application's local path. Your app will run against the sample JSON data specified in `app.json`.

# You can run the app by referencing its local path
$ meroxa app run --path /path/to/liveapp

# Or run the app directly from the app directory
$ meroxa app run

You should receive a result that anonymizes the user record email.

Using resources

To get your production data into your data app, you need to create Meroxa resources for your datastores. Once created, you can reference those resources within your data app.

To do this, use the resources function within the Turbine section of your application code. In the example data app, it should appear something like this:

source = await turbine.resources("source_name")
# ...
destination_db = await turbine.resources("destination_name")

Replace source_name and destination_name with the names of your Meroxa resources.

note

Accessing Meroxa resources while running a data app locally is not yet available.

warning

Meroxa resources must be created before you deploy. If you attempt to deploy without them, you will encounter a `Deployment failed` error in the progressive deploy dialog.

Getting data in and out

Now that you have called your Meroxa resources in your data app, you must tell Turbine how to use them. For this, you use the `records` and `write` functions.

records

The `records` function tells Turbine which records you want to use in your data app. Here you name the collection of records using whatever data collection construct applies to the datastore (e.g., table, collection, bucket, index).

To do this, use the records function within the Turbine section of your application code. In the example data app, it should appear something like this:

records = await source.records("collection_name")

Replace `collection_name` with the name of the data collection construct in your datastore. Say you have a table named `Users` in a relational database: for PostgreSQL using logical replication you might use `public.Users`, while for MySQL using binlog you would use `Users`.

write

The write function tells Turbine where to write the output of your data app. Here you indicate the resource as well as whatever data collection construct applies to the datastore (e.g., table, collection, bucket, index, etc.).

To do this, use the write function within the Turbine section of your application code. In the example data app, it should appear something like this:

await destination_db.write(anonymized, "collection_name")

Notice `anonymized` referenced in this line of code; we cover it below under Processing data.

Replace `collection_name` with the name of the data collection construct you want to write to in your datastore. Say you have a table named `Users` in a relational database: for PostgreSQL using logical replication you might use `public.Users`, while for MySQL using binlog you would use `Users`. If this table does not exist, and your credentials have permission to create tables, it will be generated automatically in the datastore.

Processing data

Now that you have called your Meroxa resources in your data app and told Turbine how to direct that data, you can decide what code to run against that stream of data.

process

The process function provides Turbine with the code you want to run against the record or event stream in your data app. This is where you write your custom code to achieve a specific outcome.

To do this, use the process function within the Turbine section of your application code. In the example data app, this comes in two parts:

Part 1: Where you have written the custom code you wish to run against the record or event stream:

```python
def anonymize(records: t.List[Record]) -> t.List[Record]:
    logging.info(f"processing {len(records)} record(s)")
    for record in records:
        logging.info(f"input: {record}")
        try:
            payload = record.value["payload"]

            # Hash the email
            payload["email"] = hashlib.sha256(
                payload["email"].encode("utf-8")
            ).hexdigest()

            logging.info(f"output: {record}")
        except Exception as e:
            print("Error occurred while parsing records: " + str(e))
            logging.info(f"output: {record}")
    return records
```

In the example data app, we are using CDC events. The example function `anonymize` looks for an `email` field: Turbine searches the event payload for that field and applies the hash to it.
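The hashing step itself is plain Python, so it is easy to verify in isolation. A sketch with a simplified payload (the record shape here is trimmed down for illustration):

```python
import hashlib

# A simplified record payload, mimicking the shape the example app expects.
payload = {"email": "user@example.com", "id": 4}

# Same transformation as in `anonymize`: replace the email with its SHA-256 hex digest.
payload["email"] = hashlib.sha256(payload["email"].encode("utf-8")).hexdigest()

print(payload["email"])  # a 64-character hex string; the original address is unrecoverable
```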

Part 2: Where you apply the process function to the record or event stream:

anonymized = await turbine.process(records, anonymize)

Replace anonymize with the function you wish to process against the event or data stream.

Direct data stream

You can also decide to forgo process and do a direct end-to-end stream of records or events.

Simply eliminate the process code in the Turbine section of your data app, remove the `anonymize` function code, and change the write code to pass `records` instead of `anonymized`:

await destination_db.write(records, "collection_name")
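Put together, a direct end-to-end stream reduces `run` to resources, records, and write. The following sketch uses stand-in stub classes to make the data flow runnable on its own; the stubs are illustrative and not the real `turbine.runtime` API:

```python
import asyncio

# Illustrative stand-ins for the Turbine runtime, just to show the data flow.
class StubSource:
    async def records(self, collection):
        return [{"payload": {"email": "user@example.com"}}]

class StubDestination:
    def __init__(self):
        self.written = {}

    async def write(self, records, collection):
        self.written[collection] = records

class StubTurbine:
    def __init__(self):
        self.dest = StubDestination()

    async def resources(self, name):
        return self.dest if name == "destination_name" else StubSource()

class App:
    @staticmethod
    async def run(turbine):
        source = await turbine.resources("source_name")
        records = await source.records("collection_name")
        # No `process` step: records flow straight through to the destination.
        destination_db = await turbine.resources("destination_name")
        await destination_db.write(records, "collection_name")

turbine = StubTurbine()
asyncio.run(App.run(turbine))
print(len(turbine.dest.written["collection_name"]))  # 1
```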

Records and Events

All Turbine data app records and events are formatted in JSON and adhere to a common format with some variable differences depending on the datastore.

In the following example, notice a record or event comes in two parts: schema and payload.

{ "schema": <schema>, "payload": <event data> }

Payload

The payload is defined by the resource from which you are pulling events or records. Depending on the configuration of your Meroxa resource and the capabilities of the underlying datastore, you may see variability between a record and an event structure. Events are created as changes are captured through Change Data Capture (CDC); the payload includes a `before` and an `after` depicting what changes took place.

```json
{
  "payload": {
    "after": {
      "action": "logged in",
      "email": "[email protected]",
      "id": 4,
      "user_id": 101
    },
    "before": null,
    "op": "r",
    "source": {
      "connector": "postgresql",
      "db": "databae",
      "lsn": 537810437656,
      "name": "resource-7109-171736",
      "schema": "public",
      "snapshot": "true",
      "table": "user_activity",
      "ts_ms": 1644362356743,
      "txId": 8680,
      "version": "1.2.5.Final",
      "xmin": null
    },
    "transaction": null,
    "ts_ms": 1644362356743
  }
}
```
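In Python, the `before`/`after` pair makes it straightforward to see what a CDC event changed. A short sketch against a trimmed-down version of the event above:

```python
# A trimmed-down CDC event in the shape shown above.
event = {
    "payload": {
        "before": None,
        "after": {
            "action": "logged in",
            "email": "user@example.com",
            "id": 4,
            "user_id": 101,
        },
        "op": "r",
    }
}

payload = event["payload"]
if payload["before"] is None:
    # No prior state: this is an initial read or an insert.
    print("new row:", payload["after"]["id"])  # new row: 4
else:
    # Diff the two states to see which fields changed.
    changed = {
        k: (payload["before"][k], payload["after"][k])
        for k in payload["after"]
        if payload["before"].get(k) != payload["after"][k]
    }
    print("changed fields:", changed)
```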

Schema

The schema is defined by the resource from where you are pulling events or records. It is automatically recorded by Meroxa and captures changes over time. Similar to payload, you may see variability between a record or an event structure. Events will show the same before and after characteristics.

```json
{
  "schema": {
    "fields": [
      {
        "field": "before",
        "fields": [
          { "field": "id", "optional": false, "type": "int32" },
          { "field": "user_id", "optional": true, "type": "int32" },
          { "field": "email", "optional": true, "type": "string" },
          { "field": "action", "optional": true, "type": "string" }
        ],
        "name": "resource_7109_171736.public.user_activity.Value",
        "optional": true,
        "type": "struct"
      },
      {
        "field": "after",
        "fields": [
          { "field": "id", "optional": false, "type": "int32" },
          { "field": "user_id", "optional": true, "type": "int32" },
          { "field": "email", "optional": true, "type": "string" },
          { "field": "action", "optional": true, "type": "string" }
        ],
        "name": "resource_7109_171736.public.user_activity.Value",
        "optional": true,
        "type": "struct"
      }
    ],
    "name": "resource_7109_171736.public.user_activity.Envelope",
    "optional": false,
    "type": "struct"
  }
}
```


Look closely: in this `schema`, for each `payload` field the `type`, `optional` (optionality), and `field` (name) are captured.

### Examples
Two demo fixtures, `demo-cdc.json` and `demo-no-cdc.json`, have been automatically generated within `/fixtures/` in your example data app to demonstrate both CDC and non-CDC formatted data records.

In most cases, Meroxa defaults to the CDC format, as we built our application framework around real-time data. However, you may configure specific resources to use polling instead (e.g. PostgreSQL), which results in slightly different formatting that your code will need to account for.
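One way to account for that variability is to normalize records before processing them. A hedged sketch (the exact non-CDC shape depends on your resource configuration; both shapes below are illustrative, so check your own fixtures):

```python
def extract_fields(record_value):
    """Return the row data from either a CDC-style or a flat payload.

    Assumes CDC payloads carry the row under payload["after"] and
    non-CDC (polling) payloads carry the row directly in the payload.
    These shapes are illustrative; verify against your fixtures.
    """
    payload = record_value["payload"]
    if isinstance(payload, dict) and "after" in payload:
        return payload["after"]  # CDC event
    return payload  # flat, non-CDC record

cdc = {"payload": {"before": None, "after": {"email": "a@example.com"}}}
flat = {"payload": {"email": "a@example.com"}}

print(extract_fields(cdc) == extract_fields(flat))  # True
```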

## Unit testing
Those familiar with software development are likely already in the habit of writing unit tests. We encourage you to follow this convention throughout the development process to ensure each unit of code works as expected and as a way to identify any issues or bugs early on.

Unit testing is language-specific. We have provided an example in your example data app. However, you may use any testing framework of your choice.
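For instance, because `anonymize` is a plain function over record-like objects, you can exercise it with the standard library's `unittest` and a minimal stand-in record. The `FakeRecord` class below is illustrative, not part of the Turbine SDK, and the function is trimmed of logging for brevity:

```python
import hashlib
import unittest

class FakeRecord:
    """Minimal stand-in for turbine.runtime.Record, for testing only."""
    def __init__(self, value):
        self.value = value

def anonymize(records):
    # Same transformation as the example app, without the logging.
    for record in records:
        payload = record.value["payload"]
        payload["email"] = hashlib.sha256(
            payload["email"].encode("utf-8")
        ).hexdigest()
    return records

class AnonymizeTest(unittest.TestCase):
    def test_email_is_hashed(self):
        record = FakeRecord({"payload": {"email": "user@example.com"}})
        out = anonymize([record])
        hashed = out[0].value["payload"]["email"]
        self.assertEqual(len(hashed), 64)
        self.assertNotEqual(hashed, "user@example.com")

unittest.main(exit=False, argv=["anonymize_test"])
```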

## What's next?
Once your data application is running as expected, you're ready to take the next steps and deploy!

**Next up:** [Deploy your data application](/turbine/deployment)