
Developing Turbine data apps with Go

To proceed with the following guide, you must have already gone through the setup steps and initialized your application.

Requirements

The application

When you run the meroxa apps init liveapp --lang go command, Turbine automatically scaffolds an example data app in an empty Git repository on your local machine.

Navigate to the root of your project and run the tree command to list its contents. You should see the following:

├── README.md # Contains an introduction and guidance on building a Turbine app using Go.
├── app.go # The base of your application code. Includes an example data app to get you started.
├── app_test.go # Unit test example that tests a function within the example data app.
├── app.json # A configuration file for your data app.
├── go.mod # Declares your module path, Go version, and dependencies.
├── go.sum # Records checksums for your direct and indirect dependencies.
└── fixtures # Fixtures are JSON-formatted data records you can develop against and run with your data app locally.
    ├── demo-cdc.json # A CDC data record sample for the example data app.
    └── demo-no-cdc.json # A non-CDC data record sample for the example data app.

Note: If your $GOPATH isn't set to where you are working, you can manually initialize your Go modules by running go mod init liveapp in the root of your project directory.

The codebase contains comments that describe each function and its purpose. All that awaits is your creativity demonstrated through code.

/path/to/liveapp/app.go
package main

import (
	// Dependencies of the example data app
	"crypto/md5"
	"encoding/hex"
	"log"

	// Dependencies of Turbine
	"github.com/meroxa/turbine-go"
	"github.com/meroxa/turbine-go/runner"
)

func main() {
	runner.Start(App{})
}

var _ turbine.App = (*App)(nil)

type App struct{}

func (a App) Run(v turbine.Turbine) error {
	// To configure your data stores as resources on the Meroxa Platform,
	// use the Meroxa Dashboard, CLI, or Meroxa Terraform Provider.
	// For more details, refer to: https://docs.meroxa.com/

	// Identify an upstream data store for your data app
	// with the `Resources` function.
	// Replace `source_name` with the resource name the
	// data store was configured with on Meroxa.
	source, err := v.Resources("source_name")
	if err != nil {
		return err
	}

	// Specify which upstream records to pull
	// with the `Records` function.
	// Replace `collection_name` with a table, collection,
	// or bucket name in your data store.
	// If your upstream resource requires additional configuration,
	// pass it as the second argument; otherwise pass nil.
	rr, err := source.Records("collection_name", map[string]interface{}{"incrementing.field.name": "id"})
	if err != nil {
		return err
	}

	// Specify what code to execute against upstream records
	// with the `Process` function.
	// Replace `Anonymize` with the name of your function code.
	res, _ := v.Process(rr, Anonymize{})

	// Identify a downstream data store for your data app
	// with the `Resources` function.
	// Replace `destination_name` with the resource name the
	// data store was configured with on Meroxa.
	dest, err := v.Resources("destination_name")
	if err != nil {
		return err
	}

	// Specify where to write records downstream
	// using the `Write` function.
	// Replace `collection_archive` with a table, collection,
	// or bucket name in your data store.
	err = dest.Write(res, "collection_archive", nil)
	if err != nil {
		return err
	}

	return nil
}

type Anonymize struct{}

func (f Anonymize) Process(stream []turbine.Record) ([]turbine.Record, []turbine.RecordWithError) {
	for i, r := range stream {
		// Only a non-empty string can be hashed; skip records without one
		// and keep processing the rest of the stream.
		e, ok := r.Payload.Get("customer_email").(string)
		if !ok || e == "" {
			log.Printf("unable to find customer_email value in record %d", i)
			continue
		}
		hashedEmail := consistentHash(e)
		err := r.Payload.Set("customer_email", hashedEmail)
		if err != nil {
			log.Printf("error setting value: %v", err)
			continue
		}
		stream[i] = r
	}
	return stream, nil
}

// consistentHash returns the hex-encoded MD5 digest of s,
// so the same input always produces the same output.
func consistentHash(s string) string {
	h := md5.Sum([]byte(s))
	return hex.EncodeToString(h[:])
}

Managing application data

Configuration

The app.json file is a configuration file that Meroxa uses to interpret details about a Turbine data app, including its name, codebase language, environment, and relevant data stores with their corresponding fixtures.

/path/to/liveapp/app.json
{
  "name": "liveapp",
  "language": "go",
  "environment": "common",
  "resources": {
    "source_name": "fixtures/demo-cdc.json"
  }
}

In this example, the app configuration identifies a resource named source_name that points to the demo-cdc.json fixture. Fixtures are JSON-formatted data record samples used in place of your production data when developing or running a data app locally.

Without changing anything, you can run the command meroxa app run from the root of your data app project and see a result. The records the application pulls come from the demo-cdc.json fixture.
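To make the shape of app.json concrete, here is a minimal Go sketch that decodes the configuration shown above into a struct. The AppConfig type and parseAppConfig function are hypothetical illustrations, not part of Turbine, which reads app.json on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// AppConfig is a hypothetical struct mirroring the app.json fields shown above.
// Turbine reads app.json itself; this only illustrates the file's shape.
type AppConfig struct {
	Name        string `json:"name"`
	Language    string `json:"language"`
	Environment string `json:"environment"`
	// Resources maps a resource name (as passed to v.Resources) to the
	// fixture file used in its place when running locally.
	Resources map[string]string `json:"resources"`
}

// parseAppConfig decodes the raw contents of an app.json file.
func parseAppConfig(data []byte) (AppConfig, error) {
	var cfg AppConfig
	if err := json.Unmarshal(data, &cfg); err != nil {
		return AppConfig{}, fmt.Errorf("parsing app.json: %w", err)
	}
	return cfg, nil
}

func main() {
	raw := []byte(`{
		"name": "liveapp",
		"language": "go",
		"environment": "common",
		"resources": {
			"source_name": "fixtures/demo-cdc.json"
		}
	}`)
	cfg, err := parseAppConfig(raw)
	if err != nil {
		panic(err)
	}
	// Print which fixture stands in for each resource during local runs.
	for name, fixture := range cfg.Resources {
		fmt.Printf("resource %q -> fixture %s\n", name, fixture)
	}
}
```

The resource names in the map are the same strings your code passes to v.Resources, which is how a local run knows which fixture to substitute for each data store.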

Using resources

To get your production data into your data app, you will need to create Meroxa resources for your datastores. Once you have a set of Meroxa resources, you can now call them within your data app.

To do this, use the Resources function within the Turbine section of your application code. In the example data app, it should appear something like this:

source, err := v.Resources("source_name")
// ...
destination, err := v.Resources("destination_name")

Replace source_name and destination_name with the names of your Meroxa resources.

Warning: Meroxa resources must be created before you deploy. If you attempt to deploy without them, you will encounter a Deployment failed error in the progressive deploy dialog.

Getting data in and out

Now that you have called your Meroxa resources in your data app, you must tell Turbine how to use them. For this, use the Records and Write functions.

Records

The Records function tells Turbine what records you want to use in your data app. Here you indicate the collection of records by naming them using whatever data collection construct applies to the datastore (e.g., table, collection, bucket, index, etc.).

To do this, use the Records function within the Turbine section of your application code. In the example data app, it should appear something like this:

records, err := source.Records("collection_name", nil)
if err != nil {
	return err
}

Replace collection_name with the name of the data collection construct in your data store. For example, say you have a table named Users in a relational database. For PostgreSQL using logical replication, you might use public.Users; for MySQL using binlog, you might use Users.

Write

The Write function tells Turbine where to write the output of your data app. Here you indicate the resource as well as whatever data collection construct applies to the datastore (e.g., table, collection, bucket, index, etc.).

To do this, use the Write function within the Turbine section of your application code. In the example data app, it should appear something like this:

	err = destination.Write(anonymized, "collection_archive", nil)
	if err != nil {
		return err
	}

	return nil
}

You'll notice `anonymized` referenced in this line of code; we'll cover it under Processing data.

Replace collection_archive with the name of the data collection construct you want to write to in your data store. For example, say you have a table named Users in a relational database. For PostgreSQL using logical replication, you might use public.Users; for MySQL using binlog, you might use Users. If the table does not exist and your credentials have permission to create tables, it will be created automatically in the data store.

Processing data

Now that you have called your Meroxa resources in your data app and told Turbine how to direct that data, you can decide what code you want to run against that stream of data.

Process

The Process function provides Turbine with the code you want to run against the record or event stream in your data app. This is where you write your custom code to achieve a specific outcome.

To do this, use the Process function within the Turbine section of your application code. In the example data app, this comes in two parts:

Part 1: Where you have written the custom code you wish to run against the record or event stream:

type Anonymize struct{}

func (f Anonymize) Process(stream []turbine.Record) ([]turbine.Record, []turbine.RecordWithError) {
	for i, r := range stream {
		// Only a non-empty string can be hashed; skip records without one
		// and keep processing the rest of the stream.
		e, ok := r.Payload.Get("customer_email").(string)
		if !ok || e == "" {
			log.Printf("unable to find customer_email value in record %d", i)
			continue
		}
		hashedEmail := consistentHash(e)
		err := r.Payload.Set("customer_email", hashedEmail)
		if err != nil {
			log.Printf("error setting value: %v", err)
			continue
		}
		stream[i] = r
	}
	return stream, nil
}

// consistentHash returns the hex-encoded MD5 digest of s,
// so the same input always produces the same output.
func consistentHash(s string) string {
	h := md5.Sum([]byte(s))
	return hex.EncodeToString(h[:])
}

In the example data app, we are using CDC events. The Process method on the Anonymize struct reads the customer_email field from each record's payload and replaces its value with an MD5 hash of that value.
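The same hashing logic can be tried outside Turbine. The sketch below is a hypothetical, standalone version that works on plain JSON payloads (assumed here to hold customer_email at the top level) instead of turbine.Record values, so you can experiment with it using only the Go standard library. The anonymizePayloads function is illustrative, not part of turbine-go.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"log"
)

// consistentHash matches the helper in the example app:
// the same input always yields the same hex-encoded MD5 digest.
func consistentHash(s string) string {
	h := md5.Sum([]byte(s))
	return hex.EncodeToString(h[:])
}

// anonymizePayloads is a hypothetical helper that hashes the customer_email
// field of each JSON payload. Payloads without a non-empty string
// customer_email are logged and passed through unchanged.
func anonymizePayloads(payloads [][]byte) ([][]byte, error) {
	out := make([][]byte, 0, len(payloads))
	for i, p := range payloads {
		var fields map[string]interface{}
		if err := json.Unmarshal(p, &fields); err != nil {
			return nil, fmt.Errorf("record %d: %w", i, err)
		}
		email, ok := fields["customer_email"].(string)
		if !ok || email == "" {
			log.Printf("unable to find customer_email value in record %d", i)
			out = append(out, p)
			continue
		}
		fields["customer_email"] = consistentHash(email)
		b, err := json.Marshal(fields)
		if err != nil {
			return nil, fmt.Errorf("record %d: %w", i, err)
		}
		out = append(out, b)
	}
	return out, nil
}

func main() {
	in := [][]byte{
		[]byte(`{"id": 1, "customer_email": "a"}`),
		[]byte(`{"id": 2}`),
	}
	out, err := anonymizePayloads(in)
	if err != nil {
		log.Fatal(err)
	}
	for _, b := range out {
		fmt.Println(string(b))
	}
}
```

Because the hash is deterministic, the same email always maps to the same value, so anonymized records can still be grouped or joined on that field.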

Part 2: Where you apply the Process function to the record or event stream:

anonymized, _ := v.Process(records, Anonymize{})

Rename Anonymize as appropriate and replace its contents with the code you wish to run against the event or data stream.

You can also decide to forgo Process and do a direct end-to-end stream of records or events.

You can accomplish this by removing the call to Process from your data app, removing the Anonymize struct (along with the consistentHash helper and any imports only they use, since Go rejects unused imports), and changing the Write call to take records as a parameter instead of anonymized:

	err = destination.Write(records, "collection_archive", nil)
	if err != nil {
		return err
	}

	return nil
}

Records and Events

All Turbine data app records and events are formatted in JSON and adhere to a common format with some variable differences depending on the datastore.

In the following example, notice that a record or event comes in two parts: schema and payload.

{ "schema": <schema>, "payload": <event data> }

Payload

The payload is defined by the resource from which you are pulling events or records. Depending on the configuration of your Meroxa resource and the capabilities of the underlying data store, you may see differences between record and event structures. Events are created as changes are captured through Change Data Capture (CDC), and an event's payload includes before and after values showing what changed.

Schema

The schema is defined by the resource from which you are pulling events or records. It is recorded automatically by Meroxa and captures changes over time. As with the payload, you may see differences between record and event structures; events show the same before and after characteristics.

{
  "field": "before",
  "fields": [
    { "field": "id", "optional": false, "type": "int32" },
    { "field": "category", "optional": false, "type": "string" },
    { "field": "product_type", "optional": false, "type": "string" },
    { "field": "product_name", "optional": false, "type": "string" },
    { "field": "stock", "optional": false, "type": "boolean" },
    { "field": "product_id", "optional": false, "type": "int32" },
    { "field": "shipping_address", "optional": false, "type": "string" },
    { "field": "customer_email", "optional": false, "type": "varchar" }
  ],
  "name": "resources.public.collection_name.Value",
  "optional": true,
  "type": "struct"
}

In this schema, each field's name (field), type (type), and optionality (optional) are captured.

Examples

Two demo fixtures have been automatically generated within /fixtures/ in your example data app to demonstrate both CDC and non-CDC formatted data records.

In most cases, Meroxa defaults to the CDC format, since our application framework is built around real-time data. However, you may configure some resources (e.g., PostgreSQL) to use polling instead, which produces slightly different formatting that your code will need to account for.

Unit testing

Those familiar with software development are likely already in the habit of writing unit tests. We encourage you to follow this convention throughout the development process to ensure each unit of code works as expected and as a way to identify any issues or bugs early on.

Unit testing is language-specific. Your example data app includes an example in app_test.go, but you may use any testing framework you choose.

Run your app locally

You can run your app locally throughout development to ensure your application is outputting the intended result.

To run your app locally, use the meroxa app run command either from the root of your application or with the --path argument set to your application's local path. Your app will run against the sample JSON data specified in app.json.

# You can run the app by referencing its local path
$ meroxa app run --path /path/to/liveapp

# Or run the app directly from the app directory
$ meroxa app run

You should receive a result that anonymizes the user record email.

What's next?

Once your data application is running as expected, you're ready to take the next steps and deploy!

Next up: Deploy your data application