TurbineJs App Overview

Take a closer look at the contents of the TurbineJs streaming app project directory, and learn how to configure your streaming app to work with your production data on the Meroxa Platform.

App Project Files

Navigate to the root of the app project using cd /your/local/path/liveapp. Then, use tree to list the app project folders and files.

$ tree

The full output will be lengthy. Here is a shortened list of what the app project folder should contain, with a description of each item.

├── .git # Contains information tracking changes and commits made to your app project.
├── .gitignore # Tells Git which files or folders to ignore in your app project.
├── README.md # Contains an introduction and guidance on building a streaming app using TurbineJs.
├── app.json # A configuration file for your streaming app.
├── fixtures # Fixtures are JSON-formatted data records you can develop against and run with your streaming app locally.
│   ├── demo-cdc.json # A JSON-formatted CDC data record for the example streaming app.
│   └── demo-no-cdc.json # A JSON-formatted non-CDC data record for the example streaming app.
├── index.js # The core of your streaming app code. Example code is written to get you started.
├── index.test.js # Unit test example that tests a function within the example code.
├── node_modules # A subdirectory containing your Node.js libraries.
├── package-lock.json # Manages the exact versions of every package installed. 
└── package.json # Manages the dependencies, scripts, versions, etc for your streaming app.

index.js

This is the core of your streaming app. Self-documented TurbineJs boilerplate code is already written to help you get started at /your/local/path/liveapp/index.js.

// Import any of your relevant dependencies
const stringHash = require("string-hash");

// Sample helper that you can use in your app
function iAmHelping(str) {
  return `~~~${str}~~~`;
}

// Create function to guard check for unexpected record schemas
function isAttributePresent(attr) {
  return typeof attr !== "undefined" && attr !== null;
}

exports.App = class App {
  // Create a custom named function on the App to be applied to your records
  anonymize(records) {
    records.forEach((record) => {
      let payload = record.value.payload;

      // If CDC formatted records / `demo-cdc.json` fixture in use `payload.after.customer_email`
      // If non-CDC formatted records / `demo-no-cdc.json` fixture in use `payload.customer_email`
      // Remember to reflect changes in the `app.json` configuration
      if (
        isAttributePresent(payload.after) &&
        isAttributePresent(payload.after.customer_email)
      ) {
        payload.after.customer_email = iAmHelping(
          stringHash(payload.after.customer_email).toString()
        );
      }
    });

    return records;
  }

  async run(turbine) {
    // To configure resources for your production data on Meroxa, 
    // You can use the Dashboard, CLI, or Terraform Provider.

    // For more details refer to our documentation on supported
    // Sources (upstream datastore) https://docs.meroxa.com/sources
    // Destinations (downstream datastore) https://docs.meroxa.com/destinations

    // Identify the upstream datastore with the `resources` function
    // Replace `source_name` with the resource name configured on Meroxa
    let source = await turbine.resources("source_name");

    // Specify which `source` records to pull with the `records` function
    // Replace `collection_name` with whatever data organisation method
    // is relevant to the datastore (e.g., table, bucket, collection, etc.)
    let records = await source.records("collection_name");

    // Specify the code to execute against `records` with the `process` function
    // Replace `this.anonymize` with the function you want to apply
    let anonymized = await turbine.process(records, this.anonymize);

    // Identify the downstream datastore with the `resources` function
    // Replace `destination_name` with the resource name configured on Meroxa
    let destination = await turbine.resources("destination_name");

    // Specify where to write records to your `destination` using the `write` function
    // Replace `collection_archive` with whatever data organisation method
    // is relevant to the datastore (e.g., table, bucket, collection, etc.)
    await destination.write(anonymized, "collection_archive");
  }
};

TurbineJs Methods

The following are the methods available for use in TurbineJs code.

Using Resources

Once you have successfully created Resources for your streaming app on the Meroxa Platform, you may use the resources method to call them in your TurbineJs code.

In the following code example, source_name and destination_name are the names you’ve defined for your Resources on the Meroxa Platform. We recommend something descriptive and recognizable, such as pgproduction or dwh.

let source = await turbine.resources("source_name");
// ...
let destination = await turbine.resources("destination_name");

Your streaming app requires exactly one Source Resource. However, you may enrich data records using any number of APIs through secrets. A Destination Resource is optional, and your streaming app may have more than one.
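
As a rough sketch of the one-source, many-destinations shape described above, the run method below reads from a single Source Resource and writes the same processed records to two Destination Resources. The resource name s3archive, the collection names, and the passThrough function are placeholders. The makeFakeTurbine object is a hypothetical in-memory stand-in used only to exercise the code locally; it is not part of TurbineJs and does not reflect how records are represented on the Meroxa Platform.

```javascript
// Sketch only: one Source Resource fanned out to two Destination Resources.
// All resource and collection names here are placeholders.
class App {
  // Placeholder processing function; returns the records unchanged.
  passThrough(records) {
    return records;
  }

  async run(turbine) {
    const source = await turbine.resources("pgproduction");
    const records = await source.records("public.Users");
    const processed = await turbine.process(records, this.passThrough);

    // Each Destination Resource gets its own `resources` and `write` call.
    const warehouse = await turbine.resources("dwh");
    await warehouse.write(processed, "users_copy");

    const archive = await turbine.resources("s3archive");
    await archive.write(processed, "users_archive");
  }
}

// Hypothetical in-memory stand-in for the `turbine` argument (not part of TurbineJs).
function makeFakeTurbine(sampleRecords) {
  const writes = [];
  return {
    writes,
    async resources(name) {
      return {
        async records(collection) {
          return sampleRecords;
        },
        async write(records, collection) {
          writes.push({ resource: name, collection, count: records.length });
        },
      };
    },
    async process(records, fn) {
      return fn(records);
    },
  };
}

// Run the app against the stand-in and show which writes were requested.
const fakeTurbine = makeFakeTurbine([{ value: { payload: { id: 1 } } }]);
new App().run(fakeTurbine).then(() => console.log(fakeTurbine.writes));
```

The point of the sketch is only the call pattern: one resources and records pair for the source, then one resources and write pair per destination.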

Streaming Data

To get data streaming into your app, use the records method. This is how you identify the records or events you want to stream in your TurbineJs code.

This can only be used once you have configured a Source Resource on the Meroxa Platform and have identified it in your TurbineJs code using the resources method.

You will want to pair the appropriate Source Resource variable name you defined earlier with the records method. In this example case, it is source.

The collection_name should be replaced with the name of your data collection construct in your datastore (e.g., table, collection, bucket, index, etc).

For example, let’s say you have a table named Users in a relational database. For PostgreSQL using logical replication, you might use public.Users; for MySQL using binlog, you might use Users.

let records = await source.records("collection_name");

You may optionally use the write method to tell Turbine where to write the output of your streaming app.

Similar to records, this can only be used once you have configured a Destination Resource on the Meroxa Platform and have identified it in your TurbineJs code using the resources method.

You will want to pair the appropriate Destination Resource variable name you defined earlier with the write method. In this example case, it is destination.

The collection_name should be replaced with the name of your data collection construct in your datastore (e.g., table, collection, bucket, index, etc). In the case of multiple Destination Resources you will need to do this for each.

For example: Let’s say you have a bucket named logs in Amazon S3. This is where you’d define the bucket name that will serve as your destination.

await destination.write(anonymized, "logs");

In some cases, special configurations for certain Resources may be necessary to achieve certain outcomes. This can be done by including the following argument with the configuration and value associated with the desired setting.

let records = await source.records("collection_name", {"configuration_name":"setting_value"});

Processing Data

To process the data streaming through your application, you first need to write the custom code that will run against the stream.

The process method runs a function against the records coming into your streaming app.

First, write the custom code you would like to execute against the stream. The following example code uses CDC events. In the example app’s index.js file, we have defined a function, anonymize, that looks for payload.after.customer_email. If the event payload contains a customer_email field, the function replaces its value with a hash.

anonymize(records) {
  records.forEach((record) => {
    let payload = record.value.payload;

    // If CDC formatted records / `demo-cdc.json` fixture in use `payload.after.customer_email`
    // If non-CDC formatted records / `demo-no-cdc.json` fixture in use `payload.customer_email`
    // Remember to reflect changes in the `app.json` configuration
    if (
      isAttributePresent(payload.after) &&
      isAttributePresent(payload.after.customer_email)
    ) {
      payload.after.customer_email = iAmHelping(
        stringHash(payload.after.customer_email).toString()
      );
    }
  });

  return records;
}

Next, use the process method, passing in the stream of records from the Source Resource by referencing the variable name you gave it. In the example case, that variable is records.

let anonymized = await turbine.process(records, this.anonymize);

Once your streaming app is deployed on the Meroxa Platform, Meroxa will do the work to take each record or event that is streamed to your app and run your code against it. The Meroxa Platform will scale out the processing of records relative to the velocity of records streaming in.

Replace this.anonymize with the function you wish to process against the event or data stream.
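
The comments in the example function note that CDC records keep the field at payload.after.customer_email, while non-CDC records keep it at payload.customer_email. One possible replacement function that tolerates both shapes is sketched below. The name anonymizeEither and the sample records are hypothetical, and Node's built-in crypto module stands in for the string-hash package so the snippet runs without extra dependencies.

```javascript
const crypto = require("crypto");

// Guard against missing attributes, as in the example app.
function isAttributePresent(attr) {
  return typeof attr !== "undefined" && attr !== null;
}

// Stand-in for `stringHash`: a hex SHA-256 digest from Node's standard library.
function hashEmail(email) {
  return crypto.createHash("sha256").update(String(email)).digest("hex");
}

class App {
  // Hypothetical function: hashes `customer_email` in CDC or non-CDC payloads.
  anonymizeEither(records) {
    records.forEach((record) => {
      const payload = record.value.payload;
      // CDC records nest the row under `after`; non-CDC records do not.
      const row = isAttributePresent(payload.after) ? payload.after : payload;
      if (isAttributePresent(row.customer_email)) {
        row.customer_email = hashEmail(row.customer_email);
      }
    });
    return records;
  }
}

// Made-up sample records in both shapes.
const sample = [
  { value: { payload: { after: { customer_email: "a@example.com" } } } },
  { value: { payload: { customer_email: "b@example.com" } } },
  { value: { payload: { after: { id: 7 } } } }, // no email: left untouched
];
console.log(JSON.stringify(new App().anonymizeEither(sample), null, 2));
```

Following the pattern of the example app, such a function would be passed to the process method as this.anonymizeEither in place of this.anonymize.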

You may also forgo process and do a direct end-to-end stream of records or events.

Simply eliminate the process method in the code and change the write code to call records instead of anonymized.

await destination.write(records, "collection_archive");

Managing Secrets

The following example shows how to manage secrets used to authenticate with external systems, such as an API or integration you call to retrieve data while streaming and processing records.

You will need to create environment variables and reference them in the TurbineJs code by passing an additional parameter to the process method.

let anonymized = await turbine.process(records, this.anonymize, { SECRET_KEY: process.env.SECRET_KEY });
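
Because process.env.SECRET_KEY evaluates to undefined when the variable is not set, it can help to fail early before passing it along. The helpers below are a hypothetical convenience, not part of TurbineJs; the names requireEnv and buildSecrets and the example value are illustrative only.

```javascript
// Hypothetical helper: read an environment variable or fail with a clear message.
function requireEnv(name) {
  const value = process.env[name];
  if (typeof value === "undefined" || value === "") {
    throw new Error(`Environment variable ${name} is not set`);
  }
  return value;
}

// Build the object that would be passed as the third argument to `process`.
function buildSecrets(names) {
  const secrets = {};
  for (const name of names) {
    secrets[name] = requireEnv(name);
  }
  return secrets;
}

// The value is set inline here only so the snippet runs on its own.
process.env.SECRET_KEY = "example-value";

// Log the secret names, never the values.
console.log(Object.keys(buildSecrets(["SECRET_KEY"])));
```

The resulting object has the same shape as { SECRET_KEY: process.env.SECRET_KEY }, so it could be passed in the same position in the call shown above.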

app.json

This file serves as a configuration file, used by the Meroxa CLI and Platform to obtain information about your streaming app. When you initialize a streaming app, the CLI scaffolds the file for you with the available options:

{
  "name": "liveapp",
  "language": "javascript",
  "environment": "common",
  "resources": {
    "source_name": "fixtures/path"
  }
}

Name — The name of your application. It is set upon initializing your streaming app and should not change.

Language — The language your streaming app will be written in. It is set upon initializing your streaming app and should not change.

Environment — common is the only available environment for streaming apps. The Meroxa Platform does have the ability to create isolated environments, but this feature is currently in private beta.

Resources — These are the named systems and databases that you'll use in your streaming app. The source_name needs to match the name of the Source Resource that you create on the Meroxa Platform, either by running the meroxa resources create command with the Meroxa CLI or by using the Dashboard. The value is the path to the fixture file that will be used to mock the Resource when you run meroxa apps run.

The Resource name you pass to the resources method in your code should match a key under resources in your app.json. That key maps the Resource variable used with the .records method to a fixture file in your /fixtures/ directory.
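
To make the mapping concrete, the sketch below mimics how a Resource name could be resolved to a fixture path from the resources block of app.json. It only illustrates the lookup and is not the Meroxa CLI's actual implementation; resolveFixturePath is a hypothetical name, and the fixture path is taken from the demo files listed earlier.

```javascript
const path = require("path");

// An app.json-like object; in a real project this would be read from app.json.
const appConfig = {
  name: "liveapp",
  language: "javascript",
  environment: "common",
  resources: {
    source_name: "fixtures/demo-cdc.json",
  },
};

// Hypothetical lookup: Resource name -> absolute fixture path under the app root.
function resolveFixturePath(config, resourceName, appRoot) {
  const resources = config.resources || {};
  if (!Object.prototype.hasOwnProperty.call(resources, resourceName)) {
    throw new Error(
      `No fixture configured for resource "${resourceName}" in app.json`
    );
  }
  return path.resolve(appRoot, resources[resourceName]);
}

console.log(
  resolveFixturePath(appConfig, "source_name", "/your/local/path/liveapp")
);
```

A name that appears in the code but not under resources would raise an error in this sketch, which mirrors why the two names need to match.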

Unit Testing

Those familiar with software development are likely already in the habit of writing unit tests. We encourage you to follow this convention throughout the development process to ensure each unit of code works as expected and as a way to identify any issues or bugs early on.

Unit testing is language-specific. We have provided an example unit test in your TurbineJs boilerplate code called index.test.js. This test uses QUnit, and you can run it with the npm run test command in the root of your TurbineJs streaming app project directory. You may replace what we have provided in your initialized app with any testing framework of your choice, such as Jest or Mocha.

To add more test files, follow the <name-of-test>.test.js file naming convention. This lets you grow your test suite as you build, modify, and scale your app.