Improving turbine-js function ergonomics

@jmar910

Our latest Meroxa CLI release, v2.4.0, includes one fix and two improvements for turbine-js streaming apps that make them easier to write and reduce boilerplate:

  • Added an unwrap transform, which lets users optionally unwrap CDC-formatted records in their data app before sending them on to destinations. As a side effect, this also fixes streaming apps that read from MySQL, pass records through a function, and write to PostgreSQL (MySQL -> fn -> PG).

  • Added get and set methods to individual records, which let users read and write data on a record regardless of its underlying format.

Here are some details:

Unwrap Transform

The records argument passed to the data app's function now comes with an optional but important transform:

```js
records.unwrap();
```

Use this transform in the data app function to unwrap CDC-formatted records into the format that destinations expect. Currently, most destinations do not accept CDC-formatted data (Amazon S3 is an exception).

A user should call it whenever CDC records are going to a destination, usually at the end of the data app function.

A user should not call it when CDC records are going to an S3 destination and the CDC format needs to be preserved.

The transform only operates on CDC-formatted data and is a no-op otherwise.
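To make "unwrapping" concrete, here is a minimal plain-JavaScript sketch of what it means for a single record. It assumes a Debezium-style envelope in which the row's new state lives under `value.payload.after`, the same layout the original `anonymize` example at the end of this post reaches into. `isCdc` and `unwrapRecord` are hypothetical names, not part of turbine-js, and the real `records.unwrap()` may handle more cases (deletes, schemas, keys) differently.

```javascript
// Hypothetical sketch of CDC unwrapping; NOT the turbine-js implementation.
// Assumes a Debezium-style envelope: { value: { payload: { before, after, op } } }.

// Treat a record as CDC-formatted if its payload has an `op` and an `after` key
// (an assumption made for this sketch).
function isCdc(record) {
  const payload = record && record.value && record.value.payload;
  return (
    payload !== null &&
    typeof payload === "object" &&
    "op" in payload &&
    "after" in payload
  );
}

// Replace the CDC envelope with the row's new state; leave other records untouched.
function unwrapRecord(record) {
  if (!isCdc(record)) {
    return record; // no-op for non-CDC data, like records.unwrap()
  }
  return { ...record, value: record.value.payload.after };
}

const cdcRecord = {
  key: "1",
  value: {
    payload: {
      op: "c",
      before: null,
      after: { id: 1, customer_email: "jane@example.com" },
    },
  },
};
const plainRecord = { key: "2", value: { id: 2, customer_email: "sam@example.com" } };

console.log(unwrapRecord(cdcRecord).value); // { id: 1, customer_email: 'jane@example.com' }
console.log(unwrapRecord(plainRecord) === plainRecord); // true
```

The sketch returns a new record rather than mutating the input, which keeps the example easy to reason about; the actual transform works on the records collection in place.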

Accessing records with get/set

The records argument in the data app's function is an array of records that can be iterated over. Each record in the array now has two methods for reading from and writing to the record as it passes through the function:

```js
record.get('key')
record.set('key', 'some_value')
```
Tip:
  • Use record.get('nested.key.down.some.levels') to read nested keys. Unescaped dots separate levels of nesting, so a nested path is distinguished from a key that itself contains a dot.

  • Use record.set('nested.key\\.with\\.dot', 'somevalue') to set nested keys, escaping with \\ any dot that is part of a key name.
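The escaping rule in the tip can be illustrated with a small path parser. This is a hypothetical sketch: `splitPath`, `getPath` and `setPath` are illustrative names, not turbine-js exports, and the library's own parser may treat edge cases (such as a backslash before something other than a dot) differently. Here an unescaped dot starts a new level and a backslash-escaped dot stays inside the key name.

```javascript
// Hypothetical dotted-path helpers; illustrative only, not the turbine-js implementation.

// Split a path such as "a.b\\.c" (the characters a.b\.c) into ["a", "b.c"]:
// an unescaped dot separates levels, "\." is a literal dot inside a key.
function splitPath(path) {
  const segments = [];
  let current = "";
  for (let i = 0; i < path.length; i++) {
    const ch = path[i];
    if (ch === "\\" && path[i + 1] === ".") {
      current += "."; // escaped dot stays inside the key
      i++; // skip the dot we just consumed
    } else if (ch === ".") {
      segments.push(current); // unescaped dot ends the current key
      current = "";
    } else {
      current += ch;
    }
  }
  segments.push(current);
  return segments;
}

// Read a nested value, returning undefined if any level is missing.
function getPath(obj, path) {
  let node = obj;
  for (const key of splitPath(path)) {
    if (node === null || typeof node !== "object") return undefined;
    node = node[key];
  }
  return node;
}

// Write a nested value, creating intermediate objects as needed.
function setPath(obj, path, value) {
  const keys = splitPath(path);
  let node = obj;
  for (const key of keys.slice(0, -1)) {
    if (node[key] === null || typeof node[key] !== "object") node[key] = {};
    node = node[key];
  }
  node[keys[keys.length - 1]] = value;
  return obj;
}

const data = { nested: { key: { down: "deep value" } } };
console.log(getPath(data, "nested.key.down")); // deep value
setPath(data, "nested.key\\.with\\.dot", "somevalue");
console.log(Object.keys(data.nested)); // [ 'key', 'key.with.dot' ]
```

Note that `\\` in a JavaScript string literal is a single backslash character, which is why the escaped path is written with doubled backslashes in source code.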

We now recommend get/set as the preferred way to access data on record objects. They work whether or not the data is CDC-formatted, as long as the record data is a valid JSON object.
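One way to picture why the same get/set call works on both formats: resolve keys against `value.payload.after` when the record looks like a CDC envelope, and against `value` otherwise. The sketch below is a hypothetical illustration under that assumption (top-level keys only, no path escaping; `dataRoot`, `getField` and `setField` are made-up names), not a description of how turbine-js is implemented.

```javascript
// Hypothetical sketch of format-agnostic field access; NOT the turbine-js implementation.
// Assumption: CDC records wrap the row as value.payload.after; other records keep it in value.

// Pick the object that holds the row's fields.
function dataRoot(record) {
  const payload = record.value && record.value.payload;
  if (payload && typeof payload === "object" && "after" in payload) {
    return payload.after; // CDC envelope: operate on the row's new state
  }
  return record.value; // plain JSON record
}

function getField(record, key) {
  const root = dataRoot(record);
  return root == null ? undefined : root[key];
}

function setField(record, key, value) {
  const root = dataRoot(record);
  if (root != null) root[key] = value; // nothing to update, e.g. a delete with after = null
}

const cdc = { value: { payload: { op: "u", after: { customer_email: "a@example.com" } } } };
const plain = { value: { customer_email: "b@example.com" } };

// The same calls work on both shapes.
for (const record of [cdc, plain]) {
  setField(record, "customer_email", getField(record, "customer_email").toUpperCase());
}
console.log(cdc.value.payload.after.customer_email); // A@EXAMPLE.COM
console.log(plain.value.customer_email); // B@EXAMPLE.COM
```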

Putting it all together

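Before:

```js
// A method on the data app class. isAttributePresent, iAmHelping and
// stringHash are helper functions assumed to be defined elsewhere in the app.
anonymize(records) {
  records.forEach((record) => {
    // Reach into the CDC envelope by hand
    let payload = record.value.payload;
    if (
      isAttributePresent(payload.after) &&
      isAttributePresent(payload.after.customer_email)
    ) {
      payload.after.customer_email = iAmHelping(
        stringHash(payload.after.customer_email)
      );
    }
  });

  return records;
}
```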
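After:

```js
anonymize(records) {
  records.forEach((record) => {
    // get/set work on CDC and non-CDC records alike
    const email = record.get("customer_email");
    // Skip records without an email (e.g. deletes), as the previous version did
    if (email !== undefined && email !== null) {
      record.set("customer_email", iAmHelping(stringHash(email)));
    }
  });

  // Used for most destinations: strip the CDC envelope before writing
  records.unwrap();

  return records;
}
```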