PostgreSQL Source

The platform supports two connection types for PostgreSQL resources used as Sources: Change Data Capture (CDC) and Polling.

Change Data Capture (CDC)

Change Data Capture (CDC) requires logical replication to be enabled for the PostgreSQL resource, both on the database itself and for the credentials used on the platform. To learn more about PostgreSQL logical replication capabilities, see the PostgreSQL logical replication documentation.

Database Configuration

1. Set wal_level to logical.

The wal_level setting in postgresql.conf must be set to logical. Note that this will increase WAL volume for tables configured with REPLICA IDENTITY FULL.

Check what the wal_level value is set to by running the following command in psql:

SHOW wal_level;

If you do not have the appropriate permissions, you may need to contact the administrator of the PostgreSQL database instance to check and set this for you.
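If you do manage the server yourself, one way to update the setting is with ALTER SYSTEM (a sketch; wal_level is only read at server start, so a restart is required):

ALTER SYSTEM SET wal_level = logical;
-- wal_level is read only at server start; restart PostgreSQL for it to take effect.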

2. Enable REPLICA IDENTITY.

REPLICA IDENTITY is a PostgreSQL-specific, table-level setting that determines how much information is recorded for UPDATE and DELETE events.

Without REPLICA IDENTITY FULL, Meroxa will still capture INSERT, UPDATE, and DELETE events, but it will only see the resulting change in the after portion of the data record. It will not see the before portion of the data record.

To capture full changes, including both the before and after portions, run the following command in psql:

ALTER TABLE "table_name" REPLICA IDENTITY FULL;
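To verify the setting, you can query the system catalog (a quick check; 'f' means FULL):

SELECT relreplident FROM pg_class WHERE oid = 'table_name'::regclass;
-- 'f' = FULL, 'd' = DEFAULT (primary key only), 'n' = NOTHING, 'i' = USING INDEX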

Database User Permissions

For logical replication to be used, the configuration must include database credentials with the appropriate user permissions to read the table and create a replication slot.

Create a new user with replication enabled by running the following command in psql:

CREATE USER meroxa WITH REPLICATION ENCRYPTED PASSWORD 'secure-password-here';
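The user also needs read access to the tables you want to capture. A minimal sketch, assuming a users table in the public schema (substitute your own schema and table names):

GRANT USAGE ON SCHEMA public TO meroxa;
GRANT SELECT ON public.users TO meroxa;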

If the user credentials provided do not have permission to create a replication slot, Meroxa will automatically fall back to Polling for changes.

Resource Configuration

When creating a resource, you must set logical_replication to true. You can accomplish this in either the Meroxa CLI or the Dashboard. See the PostgreSQL Resource Configuration documentation for more details.
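For example, from the CLI (a sketch only; flag names vary between CLI versions, so confirm against meroxa help before running):

# Illustrative only -- flag names are assumptions; check `meroxa help resources create`.
meroxa resources create pg_db \
  --type postgres \
  --url "postgres://meroxa:secure-password-here@host:5432/dbname" \
  --metadata '{"logical_replication":true}'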

Change Data Capture (CDC) Data Record Format

{
  "schema": {
    // redacted ...
  },
  "payload": {
    // redacted ...
    "after": {
      "id": 1,
      "category": "camping",
      "product_type": "sleeping-bag",
      "product_name": "Forte 35 Sleeping Bag — Womens",
      "stock": true,
      "product_id": 361632,
      "shipping_address": "9718 East Virginia Avenue Silver Spring, MD 20901",
      "customer_email": "[email protected]"
    },
    "source": {
    // redacted ...
    },
    // redacted ...
  }
}

Polling

Requirements

  • Enable database user permissions to read the table.
  • Provide incremental configuration in your Turbine code.

Database User Permissions

For polling to be used, the configuration must include database credentials with the appropriate user permission to read the table.

Query Modes

The polling connection requires configuration that specifies how updates are polled from tables: incrementally, in bulk, or with a custom SELECT query.

There are three query modes supported for the polling connection type:

Incremental Updates

With the incremental updates mode, the connection keeps track of the latest rows it has already processed in each table and retrieves only rows that were added or updated since the last poll.

Incremental query mode uses designated columns to discover created or updated rows. For efficiency, we recommend indexing these columns, as shown below.
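For example, a composite index covering the timestamp and incrementing columns used in the configuration below (table and column names are hypothetical):

CREATE INDEX users_updated_at_id_idx ON users ("updatedAt", id);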

To configure incremental mode, pass the following configuration to the records method in your Turbine code, after the table name:

.records("tablename", {"incrementing.column.name": "id", "mode":"timestamp+incrementing", "timestamp.column.name":"updatedAt"});

Bulk Mode

With bulk mode, the connection queries tables without any filtering, periodically retrieving all rows and sending them to the Turbine streaming application.

To configure bulk mode, pass the following configuration to the records method in your Turbine code, after the table name:

.records("tablename", {"mode":"bulk"});

Custom Query

With the custom query mode, the connection executes a custom query. This may be useful when joining tables or selecting a subset of columns. It can be used in combination with Incremental Updates and Bulk Mode. For example:

.records("tablename", {"mode":"bulk", "query" "SELECT non_sensitive from User"});

Polling Configurations

  • incrementing.column.name: The name of the strictly incrementing column used to detect new rows. An empty value indicates the column should be autodetected by looking for an auto-incrementing column. This column may not be nullable.
  • mode: The mode for updating a table each time it is polled. See Table Updating Modes.
  • query: If specified, the query to perform to select new or updated rows. Use this setting if you want to join tables, select subsets of columns in a table, or filter data. If used, this connection will only copy data using this query; whole-table copying will be disabled. Different query modes may still be used for incremental updates, but in order to properly construct the incremental query, it must be possible to append a WHERE clause to it (i.e., no WHERE clauses may be used). If you use a WHERE clause, it must handle incremental queries itself.
  • timestamp.column.name: Comma-separated list of one or more timestamp columns used to detect new or modified rows via the COALESCE SQL function. Rows whose first non-null timestamp value is greater than the largest previous timestamp value seen will be discovered with each poll. At least one column should not be nullable.

Table Updating Modes

The mode for updating a table each time it is polled. Options include:

  • bulk: perform a bulk load of the entire table each time it is polled
  • incrementing: use a strictly incrementing column on each table to detect only new rows. Note that this will not detect modifications or deletions of existing rows.
  • timestamp: use a timestamp (or timestamp-like) column to detect new and modified rows. This assumes the column is updated with each write, and that values are monotonically incrementing, but not necessarily unique.
  • timestamp+incrementing: use two columns, a timestamp column that detects new and modified rows and a strictly incrementing column which provides a globally unique ID for updates so each row can be assigned a unique stream offset.

Capturing Deletes

There are two common options for tracking DELETE operations:

  • Most applications only perform a soft delete: instead of removing the record, they update a deletedAt timestamp or a boolean deleted column. In that case, a DELETE operation in your application is an UPDATE operation in the database, so deletes can be captured with the timestamp and timestamp+incrementing modes.
  • Because polling detects changes by querying the table, the absence of data is hard to track. One common workaround is to set up a PostgreSQL trigger that captures every DELETE operation and INSERTs a row into a dedicated delete-record table. This new table can then be configured as a new source within Meroxa, as sketched below.
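A minimal sketch of such a trigger, assuming a users table with an integer id (all names here are illustrative):

-- Hypothetical audit table for deleted rows.
CREATE TABLE users_deletes (
  id         integer,
  deleted_at timestamptz NOT NULL DEFAULT now()
);

-- Trigger function that records each deleted row's primary key.
CREATE OR REPLACE FUNCTION record_user_delete() RETURNS trigger AS $$
BEGIN
  INSERT INTO users_deletes (id) VALUES (OLD.id);
  RETURN OLD;
END;
$$ LANGUAGE plpgsql;

-- Fire after every DELETE on users.
-- On PostgreSQL 10 and earlier, use EXECUTE PROCEDURE instead of EXECUTE FUNCTION.
CREATE TRIGGER users_delete_audit
  AFTER DELETE ON users
  FOR EACH ROW EXECUTE FUNCTION record_user_delete();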

Polling Data Record Format

{
  "schema": {
    // redacted ...
  },
  "payload": {
    "id": 1,
    "category": "camping",
    "product_type": "sleeping-bag",
    "product_name": "Forte 35 Sleeping Bag — Womens",
    "stock": true,
    "product_id": 361632,
    "shipping_address": "9718 East Virginia Avenue Silver Spring, MD 20901",
    "customer_email": "[email protected]"
  }
}

Turbine Code Examples

Once you have configured a PostgreSQL resource, you can use it as a source in your streaming application by using the resources and records methods in your Turbine code.

In this example, the PostgreSQL resource is named pg_db. We want to ingest data from the users table. If your table is in a schema other than public, you'll need to qualify the name, like so: schema_name.users. This will allow your streaming app to ingest a stream of data from PostgreSQL that can later be adapted using a Function.

  exports.App = class App {
    async run(turbine) {
      // Reference the PostgreSQL resource configured on the platform.
      let source = await turbine.resources("pg_db");
      // Stream records from the "users" table.
      let records = await source.records("users");
    }
  };
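
The same records call accepts a schema-qualified table name and, for the Polling connection type, the configuration options described above. A sketch combining both (column names are hypothetical):

  exports.App = class App {
    async run(turbine) {
      let source = await turbine.resources("pg_db");
      // Poll a table in a non-default schema using timestamp+incrementing mode.
      let records = await source.records("schema_name.users", {
        "mode": "timestamp+incrementing",
        "incrementing.column.name": "id",
        "timestamp.column.name": "updatedAt"
      });
    }
  };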

To learn more about how to use these Turbine methods to stream data from your PostgreSQL source to your Turbine streaming application using JavaScript, check out the Turbine App Overview for JavaScript.