Developing Turbine data apps with Go
To proceed with this guide, you must have already completed the setup steps and initialized your application.
Requirements
The application
When you run the meroxa apps init liveapp --lang go command, Turbine automatically scaffolds an example data app in an empty Git repository on your local machine.
Navigate to the root of your project and run the tree command to list its contents. You should see the following:
.
├── README.md # Contains an introduction and guidance on building a Turbine app using Go.
├── app.go # The base of your application code. Includes an example data app to get you started.
├── app_test.go # Unit test example that tests a function within the example data app.
├── app.json # A configuration file for running your data app locally.
├── go.mod # Declares your module path, Go version, and dependencies.
├── go.sum # Contains the expected checksums of your direct and indirect dependencies.
└── fixtures # Fixtures are JSON-formatted data records you can develop against and run with your data app locally.
    ├── demo-cdc.json # A Change Data Capture (CDC) data record sample for the example data app.
    └── demo-no-cdc.json # A non-CDC data record sample for the example data app.
Note: If your $GOPATH isn't set to where you are working, you can manually initialize your Go module by running go mod init liveapp in the root of your project directory.
The generated app.go contains comments that describe each function and its purpose. All that awaits is your creativity, demonstrated through code.
package main

import (
	// Dependencies of the example data app
	"crypto/md5"
	"encoding/hex"
	"log"

	// Dependencies of Turbine
	"github.com/meroxa/turbine-go"
	"github.com/meroxa/turbine-go/runner"
)

func main() {
	runner.Start(App{})
}

var _ turbine.App = (*App)(nil)

type App struct{}

func (a App) Run(v turbine.Turbine) error {
	// To configure your data stores as resources on the Meroxa Platform
	// use the Meroxa Dashboard, CLI, or Meroxa Terraform Provider.
	// For more details refer to: https://docs.meroxa.com/
	//
	// Identify an upstream data store for your data app
	// with the `Resources` function
	// Replace `source_name` with the resource name the
	// data store was configured with on Meroxa.
	source, err := v.Resources("source_name")
	if err != nil {
		return err
	}

	// Specify which upstream records to pull
	// with the `Records` function
	// Replace `collection_name` with a table, collection,
	// or bucket name in your data store.
	// If a configuration is needed for your source,
	// you can pass it as a second argument to the `Records` function. For example:
	//
	// source.Records("collection_name", turbine.ConnectionOptions{turbine.ResourceConfig{Field: "incrementing.field.name", Value: "id"}})
	records, err := source.Records("collection_name", nil)
	if err != nil {
		return err
	}

	// Specify what code to execute against upstream records
	// with the `Process` function
	// Replace `Anonymize` with the name of your function code.
	anonymizedRecords := v.Process(records, Anonymize{})

	// Identify a downstream data store for your data app
	// with the `Resources` function
	// Replace `destination_name` with the resource name the
	// data store was configured with on Meroxa.
	dest, err := v.Resources("destination_name")
	if err != nil {
		return err
	}

	// Specify where to write records downstream
	// using the `Write` function
	// Replace `collection_archive` with a table, collection,
	// or bucket name in your data store.
	// If a configuration is needed, use `WriteWithConfig` instead, e.g.:
	//
	// dest.WriteWithConfig(
	// 	anonymizedRecords,
	// 	"my-archive",
	// 	turbine.ConnectionOptions{turbine.ResourceConfig{Field: "buffer.flush.time", Value: "10"}},
	// )
	err = dest.Write(anonymizedRecords, "collection_archive")
	if err != nil {
		return err
	}

	return nil
}

type Anonymize struct{}

func (f Anonymize) Process(stream []turbine.Record) []turbine.Record {
	for i, record := range stream {
		// A type assertion catches both a missing field and a non-string value;
		// fmt.Sprintf("%s", nil) would yield "%!s(<nil>)", never "".
		email, ok := record.Payload.Get("after.customer_email").(string)
		if !ok || email == "" {
			log.Printf("unable to find customer_email value in record %d\n", i)
			continue // skip this record but keep anonymizing the rest of the batch
		}
		hashedEmail := consistentHash(email)
		err := record.Payload.Set("after.customer_email", hashedEmail)
		if err != nil {
			log.Println("error setting value:", err)
			continue
		}
		stream[i] = record
	}
	return stream
}

// consistentHash returns the hex-encoded MD5 digest of s, so the same
// input always maps to the same output.
func consistentHash(s string) string {
	h := md5.Sum([]byte(s))
	return hex.EncodeToString(h[:])
}
Managing application data
Local configuration
The app.json file is a configuration file that Meroxa uses to interpret details about a Turbine data app while you develop or run it locally. Configuration options include the app name, codebase language, environment, and the relevant data stores with their corresponding fixtures.
{
  "name": "liveapp",
  "language": "go",
  "environment": "common",
  "resources": {
    "source_name": "fixtures/demo-cdc.json"
  }
}
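To see how the keys of app.json relate to one another, here is a minimal sketch that decodes the example file with encoding/json. The AppConfig type and its field names are hypothetical, chosen only to mirror the keys shown above; the Meroxa CLI has its own internal representation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// AppConfig is a hypothetical struct mirroring the keys of app.json;
// it is not a type exported by turbine-go or the Meroxa CLI.
// Resources maps a resource name (as passed to v.Resources) to a fixture file.
type AppConfig struct {
	Name        string            `json:"name"`
	Language    string            `json:"language"`
	Environment string            `json:"environment"`
	Resources   map[string]string `json:"resources"`
}

const exampleAppJSON = `{
  "name": "liveapp",
  "language": "go",
  "environment": "common",
  "resources": {
    "source_name": "fixtures/demo-cdc.json"
  }
}`

// parseAppConfig decodes raw app.json bytes into an AppConfig.
func parseAppConfig(data []byte) (AppConfig, error) {
	var cfg AppConfig
	if err := json.Unmarshal(data, &cfg); err != nil {
		return AppConfig{}, err
	}
	return cfg, nil
}

func main() {
	cfg, err := parseAppConfig([]byte(exampleAppJSON))
	if err != nil {
		log.Fatal(err)
	}
	for name, fixture := range cfg.Resources {
		// prints: resource "source_name" -> fixture fixtures/demo-cdc.json
		fmt.Printf("resource %q -> fixture %s\n", name, fixture)
	}
}
```

The resource name on the left is the same string your code passes to v.Resources, which is how local runs find the matching fixture.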
In this example, the app configuration identifies a resource named source_name that points to the demo-cdc.json fixture. Fixtures are JSON-formatted data record samples used in place of your production data when developing or running a data app locally.
Run your app locally
Without changing anything, you can run the data app project and see a result. The records the application pulls come from the demo-cdc.json fixture.
To run your app locally, use the meroxa app run command, either at the root of your application or with the --path argument set to your application's local path. Your app will run against the sample JSON data specified in app.json.
# You can run the app by referencing its local path
$ meroxa app run --path /path/to/liveapp
# Or run the app directly from the app directory
$ meroxa app run
You should see output in which the customer_email value of each record has been replaced by its MD5 hash.
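The hashing is deterministic: consistentHash in app.go returns the hex-encoded MD5 digest of its input, so the same email always maps to the same 32-character string. The standalone sketch below copies that helper so you can check this outside of Turbine.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// consistentHash is copied from the example app.go: it returns the
// hex-encoded MD5 digest of s.
func consistentHash(s string) string {
	h := md5.Sum([]byte(s))
	return hex.EncodeToString(h[:])
}

func main() {
	a := consistentHash("hello")
	b := consistentHash("hello")
	fmt.Println(a)      // 5d41402abc4b2a76b9719d911017c592
	fmt.Println(a == b) // true
	fmt.Println(len(a)) // 32
}
```

Because the output is stable, hashed emails can still be joined or deduplicated downstream without exposing the original address.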
Using resources
To get your production data into your data app, you need to create Meroxa resources for your datastores. Once you have created them, you can reference them within your data app.
To do this, use the Resources function in the Run method (the Turbine section) of your application code. In the example data app, it looks like this:
source, err := v.Resources("source_name")
// ...
dest, err := v.Resources("destination_name")
Replace source_name and destination_name with the names of your Meroxa resources.
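Resources returns an error alongside the resource, and the example app returns it from Run. When an app references several resources, wrapping each error with the resource name makes a misspelled or missing resource easy to spot. The sketch below uses a hypothetical resourceLookup interface and fakeTurbine type as stand-ins for turbine.Turbine so it runs without the turbine-go module; lookupAll is likewise an illustrative helper, not part of Turbine.

```go
package main

import (
	"errors"
	"fmt"
)

// resourceLookup is a hypothetical stand-in for the Resources method of
// turbine.Turbine; the real method returns a Turbine resource, not a string.
type resourceLookup interface {
	Resources(name string) (string, error)
}

// fakeTurbine resolves only the names in known, mimicking configured resources.
type fakeTurbine struct{ known map[string]bool }

func (f fakeTurbine) Resources(name string) (string, error) {
	if !f.known[name] {
		return "", errors.New("resource not found")
	}
	return name, nil
}

// lookupAll resolves every name and stops at the first failure,
// wrapping the error with the name that could not be resolved.
func lookupAll(v resourceLookup, names ...string) (map[string]string, error) {
	out := make(map[string]string, len(names))
	for _, n := range names {
		r, err := v.Resources(n)
		if err != nil {
			return nil, fmt.Errorf("resource %q: %w", n, err)
		}
		out[n] = r
	}
	return out, nil
}

func main() {
	v := fakeTurbine{known: map[string]bool{"source_name": true}}
	if _, err := lookupAll(v, "source_name", "destination_name"); err != nil {
		fmt.Println(err) // resource "destination_name": resource not found
	}
}
```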
Note: Accessing Meroxa resources while running a data app locally is not yet available. Local runs use the fixtures mapped in app.json instead.
Warning: Meroxa resources must be created before you deploy. If you attempt to deploy without them, you will encounter a Deployment failed error in the progressive deploy dialog.
Getting data in and out
Now that you have referenced your Meroxa resources in your data app, you need to tell Turbine how to use them. For this, use the Records and Write functions.
Records
The Records function tells Turbine which records you want to use in your data app. You identify them by the data collection construct that applies to the datastore (e.g., a table, collection, bucket, or index).
To do this, call Records on your source resource in the Run method of your application code. In the example data app, it looks like this:
records, err := source.Records("collection_name", nil)
if err != nil {
	return err
}
Replace collection_name with the name of the data collection construct in your datastore. For example, say you have a table named Users in a relational database: with PostgreSQL using logical replication you might use public.Users, and with MySQL using binlog you might use Users.
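If the same app logic may run against different datastores, you can keep the naming rule in one place. The collectionFor helper below is hypothetical, and its dbKind values and schema-qualification rule are assumptions based only on the PostgreSQL and MySQL examples in this section; check your resource's documentation for other datastores.

```go
package main

import "fmt"

// collectionFor is a hypothetical helper returning the collection name to pass
// to source.Records, following this guide's examples: schema-qualified for
// PostgreSQL (defaulting to the public schema), bare table name for MySQL.
func collectionFor(dbKind, schema, table string) (string, error) {
	switch dbKind {
	case "postgres":
		if schema == "" {
			schema = "public" // PostgreSQL's default schema
		}
		return schema + "." + table, nil
	case "mysql":
		return table, nil
	default:
		return "", fmt.Errorf("unsupported datastore %q", dbKind)
	}
}

func main() {
	pg, _ := collectionFor("postgres", "", "Users")
	my, _ := collectionFor("mysql", "", "Users")
	fmt.Println(pg) // public.Users
	fmt.Println(my) // Users
	// In a Turbine app you would then call, for example: source.Records(pg, nil)
}
```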
Write
The Write function tells Turbine where to write the output of your data app. You call it on the destination resource and name whatever data collection construct applies to that datastore (e.g., a table, collection, bucket, or index).
To do this, call Write on your destination resource in the Run method of your application code. In the example data app, it looks like this:
err = dest.Write(anonymizedRecords, "collection_archive")
if err != nil {
	return err
}
The anonymizedRecords value passed to Write comes from the Process step, which is covered under Processing data.
Replace collection_archive with the name of the data collection construct you want to write to in your datastore. For example, to write to a table named Users in a relational database, you might use public.Users for PostgreSQL or Users for MySQL. If the table does not exist and your credentials have permission to create tables, it will be created automatically in the datastore.
Processing data
Now that you have referenced your Meroxa resources in your data app and told Turbine where to read and write data, you can decide what code to run against that stream of data.
Process
The Process function provides Turbine with the code you want to run against the record or event stream in your data app. This is where you write custom code to achieve a specific outcome.
To do this, use the Process function in the Run method of your application code. In the example data app, this comes in two parts:
Part 1: The custom code you want to run against the record or event stream:
type Anonymize struct{}

func (f Anonymize) Process(stream []turbine.Record) []turbine.Record {
	for i, record := range stream {
		// A type assertion catches both a missing field and a non-string value.
		email, ok := record.Payload.Get("after.customer_email").(string)
		if !ok || email == "" {
			log.Printf("unable to find customer_email value in record %d\n", i)
			continue // skip this record but keep anonymizing the rest of the batch
		}
		hashedEmail := consistentHash(email)
		err := record.Payload.Set("after.customer_email", hashedEmail)
		if err != nil {
			log.Println("error setting value:", err)
			continue
		}
		stream[i] = record
	}
	return stream
}

func consistentHash(s string) string {
	h := md5.Sum([]byte(s))
	return hex.EncodeToString(h[:])
}
The example data app uses CDC events. The Process method on the Anonymize struct reads the after.customer_email path, which refers to record.payload.after.customer_email in each event. When that field holds a value, the method replaces it with its hash.
Part 2: Where you apply the Process function to the record or event stream:
anonymizedRecords := v.Process(records, Anonymize{})
Rename Anonymize as appropriate and replace its contents with the code you want to run against the event or data stream.
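As an illustration of what a replacement processor might look like, here is a sketch with the same method shape as Anonymize (a batch of records in, the modified batch out). It uses a hypothetical Record stand-in holding a key and a raw JSON payload instead of turbine.Record, so it runs with only the standard library; RedactAddress, which blanks the after.shipping_address field, is purely illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Record is a hypothetical stand-in for turbine.Record. Payload holds only
// the payload portion of the event (for CDC, the before/after images).
type Record struct {
	Key     string
	Payload []byte
}

// RedactAddress is an illustrative processor shaped like Anonymize.
type RedactAddress struct{}

func (RedactAddress) Process(stream []Record) []Record {
	for i, r := range stream {
		var doc map[string]any
		if err := json.Unmarshal(r.Payload, &doc); err != nil {
			log.Printf("record %d: invalid JSON: %v", i, err)
			continue // leave malformed records untouched
		}
		after, ok := doc["after"].(map[string]any)
		if !ok {
			continue // no "after" image, so nothing to redact
		}
		after["shipping_address"] = "REDACTED"
		out, err := json.Marshal(doc)
		if err != nil {
			log.Printf("record %d: %v", i, err)
			continue
		}
		stream[i].Payload = out
	}
	return stream
}

func main() {
	in := []Record{{Key: "1", Payload: []byte(`{"after":{"id":1,"shipping_address":"1 Main St"}}`)}}
	out := RedactAddress{}.Process(in)
	fmt.Println(string(out[0].Payload)) // {"after":{"id":1,"shipping_address":"REDACTED"}}
}
```

Like Anonymize, it logs and skips records it cannot handle rather than aborting the whole batch.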
Direct data stream
You can also forgo Process and stream records or events directly from end to end.
To do this, remove the call to Process and the Anonymize struct from your data app (along with any imports that only Anonymize used, such as log, since Go rejects unused imports), and pass records to Write instead of anonymizedRecords:
err = dest.Write(records, "collection_archive")
if err != nil {
	return err
}
Records and events
All Turbine data app records and events are formatted as JSON and follow a common format, with some variation depending on the datastore.
As the following example shows, a record or event has two parts: schema and payload.
{ "schema": <schema>, "payload": <event data> }
Payload
The payload is defined by the resource from which you are pulling events or records. Depending on the configuration of your Meroxa resource and the capabilities of the underlying datastore, the payload may have a record structure or an event structure. Events are created as changes are captured through Change Data Capture (CDC); their payload includes before and after fields that show what changed.
Schema
The schema is defined by the resource from which you are pulling events or records. Meroxa records it automatically and tracks changes to it over time. As with payload, its structure may differ between records and events; event schemas show the same before and after characteristics.
{
  "field": "before",
  "fields": [
    { "field": "id", "optional": false, "type": "int32" },
    { "field": "category", "optional": false, "type": "string" },
    { "field": "product_type", "optional": false, "type": "string" },
    { "field": "product_name", "optional": false, "type": "string" },
    { "field": "stock", "optional": false, "type": "boolean" },
    { "field": "product_id", "optional": false, "type": "int32" },
    { "field": "shipping_address", "optional": false, "type": "string" },
    { "field": "customer_email", "optional": false, "type": "varchar" }
  ],
  "name": "resources.public.collection_name.Value",
  "optional": true,
  "type": "struct"
}
In this schema, each payload field is described by its type, whether it is optional, and its field name.
Examples
Two demo fixtures, demo-cdc.json and demo-no-cdc.json, are generated in the fixtures/ directory of your example data app to demonstrate CDC and non-CDC formatted data records.
In most cases, Meroxa defaults to the CDC format, because the application framework is built around real-time data. However, some resources (e.g., PostgreSQL) can be configured to use polling instead, which produces a slightly different record format that your code will need to account for.
Unit testing
Those familiar with software development are likely already in the habit of writing unit tests. We encourage you to follow this practice throughout development, both to confirm that each unit of code works as expected and to catch issues or bugs early.
Unit testing is language-specific. The example data app includes a unit test in app_test.go, but you may use any testing framework you prefer.
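For instance, a table-driven test for the consistentHash helper could look like the following. In your project, the Test function would live in app_test.go and run with go test; here it sits in one file next to a copy of consistentHash, with a main that runs the same table, so the sketch is self-contained. The expected values are the standard MD5 digests of each input.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"testing"
)

// consistentHash is copied from the example app.go.
func consistentHash(s string) string {
	h := md5.Sum([]byte(s))
	return hex.EncodeToString(h[:])
}

// hashCases pairs inputs with their expected MD5 hex digests.
var hashCases = []struct {
	in, want string
}{
	{"", "d41d8cd98f00b204e9800998ecf8427e"},
	{"abc", "900150983cd24fb0d6963f7d28e17f72"},
	{"hello", "5d41402abc4b2a76b9719d911017c592"},
}

// TestConsistentHash is a table-driven unit test; in a real project it
// belongs in app_test.go and runs with `go test`.
func TestConsistentHash(t *testing.T) {
	for _, tc := range hashCases {
		if got := consistentHash(tc.in); got != tc.want {
			t.Errorf("consistentHash(%q) = %s, want %s", tc.in, got, tc.want)
		}
	}
}

func main() {
	// Outside of `go test`, check the same table directly.
	for _, tc := range hashCases {
		fmt.Printf("%q ok=%t\n", tc.in, consistentHash(tc.in) == tc.want)
	}
}
```

Adding a row to hashCases is all it takes to cover a new input, which is why table-driven tests are a common Go idiom.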
What's next?
Once your data application is running as expected, you're ready to take the next steps and deploy!
Next up: Deploy your data application