TurbineRb App Overview

Take a closer look at the contents of a TurbineRb streaming app project directory, and learn how to configure your streaming app to work with your production data on the Meroxa Platform.

App Project Files

Navigate to the root of the app project using cd /your/local/path/liveapp. Then, use tree to list the app project folders and files.

$ tree

The output you see will be exhaustive. Here is a shortened list of what should be included in the app project folder, along with a description.

├── Gemfile # Describes gem dependencies required to run a Ruby program.
├── app.json # A configuration file for your data app. 
├── app.rb # The base of your application code. Includes an example data app to get you started. 
└── fixtures # Fixtures are JSON-formatted data records you can develop against and run with your data app locally.
   └── demo.json # A data record sample for the example data app.

app.rb

This is the core of your streaming app. Self-documenting TurbineRb boilerplate code is already written at /your/local/path/liveapp/app.rb to help you get started.

# frozen_string_literal: true

require "rubygems"
require "bundler/setup"
require "turbine_rb"

class MyApp
  def call(app)
    # To configure resources for your production datastores
    # on Meroxa, use the Dashboard, CLI, or Terraform Provider
    # For more details refer to: http://docs.meroxa.com/
    #
    # Identify the upstream datastore with the `resource` function
    # Replace `demopg` with the resource name configured on Meroxa
    database = app.resource(name: "demopg")

    # Specify which upstream records to pull
    # with the `records` function
    # Replace `collection_name` with a table, collection,
    # or bucket name in your data store.
    # If a configuration is needed for your source,
    # you can pass it as a second argument to the `records` function. For example:
    # database.records(collection: "collection_name", configs: {"incrementing.column.name" => "id"})
    records = database.records(collection: "collection_name")

    # Register secrets to be available in the function:
    # app.register_secrets("MY_ENV_TEST")

    # Register several secrets at once:
    # app.register_secrets(["MY_ENV_TEST", "MY_OTHER_ENV_TEST"])

    # Specify the code to execute against `records` with the `process` function.
    # Replace `Passthrough` with your desired function.
    # Ensure desired function matches `Passthrough`'s function signature.
    processed_records = app.process(records: records, process: Passthrough.new)

    # Specify where to write records using the `write` function.
    # Replace `collection_archive` with whatever data organisation method
    # is relevant to the datastore (e.g., table, bucket, collection, etc.)
    # If additional connector configs are needed, provide another argument. For example:
    # database.write(
    #   records: processed_records,
    #   collection: "collection_archive",
    #   configs: {"behavior.on.null.values": "ignore"})
    database.write(records: processed_records, collection: "collection_archive")
  end
end

class Passthrough < TurbineRb::Process
  def call(records:)
    puts "got records: #{records}"
    # To get the value of unformatted records, use record .value getter method
    # records.map { |r| puts r.value }
    #
    # To transform unformatted records, use record .value setter method
    # records.map { |r| r.value = "newdata" }
    #
    # To get the value of json formatted records, use record .get method
    # records.map { |r| puts r.get("message") }
    #
    # To transform json formatted records, use record .set methods
    # records.map { |r| r.set('message', 'goodbye') }
    records
  end
end

TurbineRb.register(MyApp.new)

TurbineRb Methods

The following are the methods available for use in TurbineRb code.

Using Resources

Once you have successfully created Resources for your streaming app on the Meroxa Platform, you may use the resource method to reference them in your TurbineRb code.

In the following code example, source_name and destination_name are the names you’ve defined for your Resources on the Meroxa Platform. We recommend something descriptive and recognizable, like pgproduction or dwh.

source = app.resource(name: 'source_name')
# ...
destination = app.resource(name: 'destination_name')

Your streaming app requires exactly one Source Resource. However, you may enrich data records using any number of APIs through secrets. A Destination Resource is optional, and your streaming app may have more than one.

Streaming Data

To get data streaming into your app, use the records method. This is how you identify the records or events you want to stream in your TurbineRb code.

This can only be used once you have configured a Source Resource on the Meroxa Platform and have identified it in your TurbineRb code using the resource method.

You will want to pair the appropriate Source Resource variable name you defined earlier with the records method. In this example case, it is source.

The collection_name should be replaced with the name of your data collection construct in your datastore (e.g., table, collection, bucket, index, etc).

For example, let’s say you have a table named Users in a relational database. With PostgreSQL using logical replication, you might use public.Users; with MySQL using binlog, you might use Users.

records = source.records(collection: 'collection_name')

You may optionally use the write method to tell Turbine where to write the output of your streaming app.

Similar to records, this can only be used once you have configured a Destination Resource on the Meroxa Platform and have identified it in your TurbineRb code using the resource method.

You will want to pair the appropriate Destination Resource variable name you defined earlier with the write method. In this example case, it is destination.

The collection_name should be replaced with the name of your data collection construct in your datastore (e.g., table, collection, bucket, index, etc). In the case of multiple Destination Resources you will need to do this for each.

For example, let’s say you have a bucket named logs in Amazon S3. This is where you’d specify the bucket name that will serve as your destination.

destination.write(records: processed_records, collection: "collection_archive")

# You'll notice `processed_records` referenced in this line of code. We'll speak more to this under `Processing Data`.

In some cases, a Resource may need special configuration to achieve a certain outcome. This can be done by passing a configs argument that maps each configuration name to the value of the desired setting.

destination.write(records: processed_records, collection: "collection_name", configs: {"configuration_name" => "setting_value"})

Processing Data

To start processing the data streaming through your application, you will need to write the custom code you want to run against the stream.

The process method runs a function you provide against the records coming into your streaming app.

First, write the custom code you would like to execute against the stream. In the example app’s app.rb file we have defined a class, Passthrough, that prints the incoming records and returns them unchanged. Its comments show how to read and modify records: the value getter and setter for unformatted records, and the get and set methods for JSON-formatted records.

class Passthrough < TurbineRb::Process
  def call(records:)
    puts "got records: #{records}"
    # To get the value of unformatted records, use record .value getter method
    # records.map { |r| puts r.value }
    #
    # To transform unformatted records, use record .value setter method
    # records.map { |r| r.value = "newdata" }
    #
    # To get the value of json formatted records, use record .get method
    # records.map { |r| puts r.get("message") }
    #
    # To transform json formatted records, use record .set methods
    # records.map { |r| r.set('message', 'goodbye') }
    records
  end
end
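
As one illustration of a custom function, the sketch below hashes a customer_email field using the get and set record methods mentioned in the boilerplate comments. HashEmail and StubRecord are hypothetical names: StubRecord is only a stand-in so the snippet runs without the turbine_rb gem, and a real function would subclass TurbineRb::Process as Passthrough does. The flat customer_email key is also an assumption; your records may nest fields differently.

```ruby
require "digest"

# Hypothetical stand-in for a Turbine record, exposing only the
# `get` and `set` methods mentioned in the boilerplate comments.
class StubRecord
  def initialize(data)
    @data = data
  end

  def get(key)
    @data[key]
  end

  def set(key, value)
    @data[key] = value
  end
end

# Hypothetical process function. In a real app this would be declared as
# `class HashEmail < TurbineRb::Process`, matching Passthrough's signature.
class HashEmail
  def call(records:)
    records.each do |record|
      email = record.get("customer_email")
      next if email.nil?

      # Replace the address with its SHA-256 hex digest.
      record.set("customer_email", Digest::SHA256.hexdigest(email))
    end
    records
  end
end

sample = [StubRecord.new({ "customer_email" => "jane@example.com" })]
result = HashEmail.new.call(records: sample)
puts result.first.get("customer_email")
```

Records without a customer_email value are passed through untouched, so the function can sit in front of mixed data without raising.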

Next, call the process method, passing the stream from the Source Resource by the variable name you gave it (records in this example).

processed_records = app.process(records: records, process: Passthrough.new)

Once your streaming app is deployed on the Meroxa Platform, Meroxa will do the work to take each record or event that is streamed to your app and run your code against it. The Meroxa Platform will scale out the processing of records relative to the velocity of records streaming in.

Replace Passthrough.new with an instance of the function you wish to run against the event or data stream.

You may also forgo process and do a direct end-to-end stream of records or events.

Simply eliminate the process method in the code and change the write code to call records instead of processed_records.

database.write(records: records, collection: "collection_name")

Managing Secrets

The following example shows how to manage secrets used to authenticate with external systems, such as an API or integration you call to retrieve data for streaming and processing.

You will need to create environment variables and register them in your TurbineRb code with the register_secrets method so that they are available in your function.

app.register_secrets("MY_ENV_TEST")

# Register several secrets at once:
app.register_secrets(["MY_ENV_TEST", "MY_OTHER_ENV_TEST"])
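
Inside a process function, a secret would then typically be read from the environment. The sketch below assumes that registered secrets surface as ordinary environment variables (ENV) at runtime, which the boilerplate comment in app.rb implies but does not spell out. MY_ENV_TEST is the placeholder name from app.rb, and fetch_secret is a hypothetical helper, not part of TurbineRb.

```ruby
# Hypothetical helper: read a secret from the environment and fail fast
# with a clear message when it has not been set.
def fetch_secret(name, env = ENV)
  env.fetch(name) do
    raise KeyError, "secret #{name} is not set; register it and export it before running"
  end
end

# Simulate a locally exported secret so the sketch runs on its own.
ENV["MY_ENV_TEST"] ||= "local-dev-token"

token = fetch_secret("MY_ENV_TEST")
puts "secret loaded (#{token.length} characters)"
```

Failing early on a missing secret tends to be easier to debug than an authentication error from the external system later in the stream.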

app.json

This file serves as a configuration file, used by the Meroxa CLI and Platform to obtain information about your streaming app. When you initialize a streaming app, the CLI scaffolds the file for you with the available options:

{
  "name": "liveapp",
  "language": "ruby",
  "environment": "common",
  "resources": {
    "source_name": "fixtures/path"
  }
}

Name — The name of your application. It is set upon initializing your streaming app and should not change.

Language — The language your streaming app will be written in. It is set upon initializing your streaming app and should not change.

Environment — common is the only available environment for streaming apps. The Meroxa Platform does have the ability to create isolated environments, but this feature is currently in private beta.

Resources — These are the named systems and databases that you'll use in your streaming app. The source_name must match the name of the Source Resource that you create on the Meroxa Platform, either with the meroxa resources create command in the Meroxa CLI or in the Dashboard. Its value is the path to the fixture file that will be used to mock the Resource when you run meroxa apps run.

In other words, the name you pass to app.resource in app.rb should match a key under resources in app.json, so that the records call on that Resource reads from the corresponding fixture file in /fixtures/.
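
To make the mapping concrete, the following sketch parses an app.json shaped like the one above and reports any resource whose fixture path does not exist on disk. missing_fixtures is a hypothetical helper, not part of the Meroxa CLI, and the temporary project it builds is purely illustrative.

```ruby
require "json"
require "tmpdir"

# Hypothetical helper: return the resource names whose fixture path
# (relative to the app directory) does not exist.
def missing_fixtures(app_dir)
  config = JSON.parse(File.read(File.join(app_dir, "app.json")))
  missing = config.fetch("resources", {}).reject do |_name, fixture_path|
    File.exist?(File.join(app_dir, fixture_path))
  end
  missing.keys
end

# Build a throwaway project so the sketch is runnable as-is.
Dir.mktmpdir do |dir|
  Dir.mkdir(File.join(dir, "fixtures"))
  File.write(File.join(dir, "fixtures", "demo.json"), "{}")

  config = {
    "name" => "liveapp",
    "language" => "ruby",
    "environment" => "common",
    "resources" => { "demopg" => "fixtures/demo.json" }
  }
  File.write(File.join(dir, "app.json"), JSON.generate(config))

  puts "missing fixtures: #{missing_fixtures(dir).inspect}"
end
```

A check like this could run before a local run to catch a renamed Resource or a mistyped fixture path.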

Unit Testing

Those familiar with software development are likely already in the habit of writing unit tests. We encourage you to follow this convention throughout the development process to ensure each unit of code works as expected and as a way to identify any issues or bugs early on.

Unit testing is language-specific; you may use any testing framework of your choice.
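
For instance, a process function can be tested in isolation by calling it with hand-built records. The sketch below uses Minitest, which ships with standard Ruby installs, though any framework would do. RedactMessage and StubRecord are hypothetical: StubRecord imitates only the get and set record methods from the boilerplate comments so the test runs without the turbine_rb gem.

```ruby
require "minitest/autorun"

# Hypothetical stand-in for a Turbine record (only `get` and `set`).
class StubRecord
  def initialize(data)
    @data = data
  end

  def get(key)
    @data[key]
  end

  def set(key, value)
    @data[key] = value
  end
end

# Hypothetical process function under test; a real one would
# subclass TurbineRb::Process.
class RedactMessage
  def call(records:)
    records.each { |r| r.set("message", "[redacted]") }
  end
end

class RedactMessageTest < Minitest::Test
  def test_replaces_message_and_keeps_record_count
    input = [
      StubRecord.new({ "message" => "hello" }),
      StubRecord.new({ "message" => "bye" })
    ]

    output = RedactMessage.new.call(records: input)

    assert_equal 2, output.length
    assert_equal ["[redacted]", "[redacted]"], output.map { |r| r.get("message") }
  end
end
```

Saved as a file and run with ruby, the test executes automatically when the script exits.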

Bundle install

In your local app project directory, run the following command to ensure you have all the dependencies needed in the Gemfile before running or deploying your application:

$ bundle install