
Polling

For PostgreSQL, when using the polling connection type, the connector periodically queries its data source, captures the result as a data record, and pushes it into a stream.

The connector can poll in various ways:

  • Polling incremental updates from tables
  • Polling bulk updates from tables
  • Polling incremental or bulk using a custom SELECT query

To add a new resource, see Add Postgres Resource.

Source Configuration

To configure PostgreSQL as a source, choose one of the query modes below and pass its options with --config, as in the examples that follow.

Query modes

The following query modes are supported:

Incremental Updates

In these modes, the connector keeps track of the latest rows it has already processed in each table and retrieves only rows that were added or updated after those.

The incremental query modes use the columns named by incrementing.column.name and timestamp.column.name to discover created or updated rows. To keep polling efficient, index these columns.

meroxa connector create from-postgres --from postgresDB --input User --config '{"incrementing.column.name": "id", "mode":"timestamp+incrementing", "timestamp.column.name":"updatedAt"}'
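The timestamp+incrementing filtering can be sketched in Python, using an in-memory SQLite table as a stand-in for PostgreSQL. This is a minimal illustration under stated assumptions, not the connector's actual query: it assumes the connector remembers the (updatedAt, id) pair of the last row it processed, and the table contents and the `poll` helper are hypothetical. The id column breaks ties between rows that share a timestamp, which lets a poll resume exactly where the previous one stopped.

```python
import sqlite3

# Hypothetical stand-in for the "User" table used in the command above.
conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE "User" (id INTEGER PRIMARY KEY, name TEXT, updatedAt INTEGER NOT NULL)')
conn.executemany('INSERT INTO "User" VALUES (?, ?, ?)', [(1, "ada", 100), (2, "bob", 105)])

def poll(conn, last_ts, last_id):
    """Return rows newer than the stored (timestamp, id) offset, oldest first."""
    # A row is new if its timestamp is later than the offset, or if it shares
    # the offset's timestamp but has a larger id (the id breaks the tie).
    return conn.execute(
        'SELECT id, name, updatedAt FROM "User" '
        'WHERE updatedAt > ? OR (updatedAt = ? AND id > ?) '
        'ORDER BY updatedAt, id',
        (last_ts, last_ts, last_id),
    ).fetchall()

# First poll: no offset yet, so start below every real value.
first = poll(conn, -1, -1)
last_ts, last_id = first[-1][2], first[-1][0]  # offset = last row processed

# Between polls: a row arrives in the same second as the offset, and row 1 is updated.
conn.execute('INSERT INTO "User" VALUES (?, ?, ?)', (3, "cy", 105))
conn.execute('UPDATE "User" SET name = ?, updatedAt = ? WHERE id = ?', ("ada2", 110, 1))
second = poll(conn, last_ts, last_id)
```

Row 3 shares the offset's timestamp but has a larger id, so it is still picked up; row 2 is not re-read, and row 1 comes back with its new name because its updatedAt moved past the offset.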

Bulk Mode

In this mode, the connector will query tables without any filtering, periodically retrieving all rows from them and publishing them to Meroxa.

meroxa connector create from-postgres --from postgresDB --input User --config '{"mode":"bulk"}'

Custom Query

Instead of relying on data source discovery, you can set a custom query for the connector to execute. This is useful when you need to join tables or select a subset of columns.

meroxa connector create from-postgres --from postgresDB --input User --config '{"mode":"bulk", "query": "SELECT non_sensitive FROM \"User\""}'
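When a custom query is combined with an incremental mode, the `query` option requires that the connector can append a WHERE clause to it. The hypothetical `incremental_query` helper below sketches that constraint in Python; it is an assumption about how such a filter could be built, not the connector's code. It uses `?` placeholders for illustration and double-quotes identifiers so that mixed-case names such as updatedAt keep their case in PostgreSQL.

```python
import re

def quote_ident(name):
    """Double-quote a PostgreSQL identifier so mixed-case names survive."""
    return '"' + name.replace('"', '""') + '"'

def incremental_query(custom_query, mode, incrementing_col=None, timestamp_col=None):
    """Hypothetical helper: append the incremental filter for `mode` to a custom SELECT."""
    if mode == "bulk":
        return custom_query  # bulk mode re-runs the query as written
    # The filter is appended as a new WHERE clause, so the query must not have one.
    # Crude check: it also matches the word inside string literals.
    if re.search(r"\bwhere\b", custom_query, re.IGNORECASE):
        raise ValueError("custom query already contains a WHERE clause")
    inc = quote_ident(incrementing_col) if incrementing_col else None
    ts = quote_ident(timestamp_col) if timestamp_col else None
    if mode == "incrementing":
        cond, order = f"{inc} > ?", inc
    elif mode == "timestamp":
        cond, order = f"{ts} > ?", ts
    elif mode == "timestamp+incrementing":
        cond, order = f"{ts} > ? OR ({ts} = ? AND {inc} > ?)", f"{ts}, {inc}"
    else:
        raise ValueError(f"unknown mode: {mode}")
    return f"{custom_query} WHERE {cond} ORDER BY {order}"
```

For example, `incremental_query('SELECT id, non_sensitive FROM "User"', "incrementing", incrementing_col="id")` yields a query ending in `WHERE "id" > ? ORDER BY "id"`, while a query that already filters with WHERE is rejected.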

Data Record

The payload within the Data Records generated by this connector takes the following format:

{ "schema": <schema of payload>, "payload": <result of polling query> }
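A consumer can split each record into its two parts. In this minimal Python sketch, only the top-level schema and payload keys come from the format above; the sample record, including the contents of schema, is hypothetical.

```python
import json

# Hypothetical record in the { "schema": ..., "payload": ... } shape described above.
# The schema contents are an assumption; only the two top-level keys come from this page.
raw = '{"schema": {"type": "struct"}, "payload": {"id": 4, "name": "dee", "updatedAt": 110}}'

def parse_record(raw):
    """Split a data record into its schema and the polled row."""
    record = json.loads(raw)
    return record["schema"], record["payload"]

schema, row = parse_record(raw)
```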

Configuration Options

The following configuration options are supported by this connector:

| Configuration | Description |
| --- | --- |
| incrementing.column.name | The name of the strictly incrementing column used to detect new rows. An empty value indicates that the column should be autodetected by looking for an auto-incrementing column. This column must not be nullable. |
| mode | The mode for updating a table each time it is polled. See Table Updating Modes. |
| query | If specified, the query used to select new or updated rows. Use this setting to join tables, select a subset of a table's columns, or filter data. When set, the connector copies data only through this query; whole-table copying is disabled. Incremental query modes can still be used, but to build the incremental query the connector must be able to append a WHERE clause, so the query should not contain one. If your query does include a WHERE clause, it must handle the incremental filtering itself. |
| timestamp.column.name | Comma-separated list of one or more timestamp columns used to detect new or modified rows with the SQL COALESCE function. On each poll, the connector discovers rows whose first non-null timestamp value is greater than the largest timestamp value seen previously. At least one of these columns should be non-nullable. |
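The COALESCE rule for timestamp.column.name can be sketched in plain Python. Assuming a hypothetical configuration of "timestamp.column.name": "updatedAt,createdAt", a row is discovered when its first non-null value among those columns, in that order, is newer than the largest timestamp seen so far; in SQL this corresponds to a filter such as COALESCE("updatedAt", "createdAt") > ?. The createdAt column and the sample rows are illustrative.

```python
def coalesce(*values):
    """Mirror SQL COALESCE: return the first argument that is not None."""
    return next((v for v in values if v is not None), None)

def discover(rows, ts_columns, last_seen):
    """Return rows whose first non-null timestamp (in ts_columns order) is newer than last_seen."""
    found = []
    for row in rows:
        ts = coalesce(*(row.get(col) for col in ts_columns))
        if ts is not None and ts > last_seen:
            found.append(row)
    return found

rows = [
    {"id": 1, "updatedAt": None, "createdAt": 90},   # never updated: falls back to createdAt
    {"id": 2, "updatedAt": 120, "createdAt": 80},    # updated after the last poll
    {"id": 3, "updatedAt": None, "createdAt": 130},  # created after the last poll
]
new_rows = discover(rows, ["updatedAt", "createdAt"], last_seen=100)
```

Rows 2 and 3 are discovered; row 1 falls back to its createdAt value of 90, which is not newer than the last timestamp seen.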

Table Updating Modes

The mode for updating a table each time it is polled. Options include:

  • bulk - perform a bulk load of the entire table each time it is polled
  • incrementing - use a strictly incrementing column on each table to detect only new rows. Note that this will not detect modifications or deletions of existing rows.
  • timestamp - use a timestamp (or timestamp-like) column to detect new and modified rows. This assumes the column is updated with each write, and that values are monotonically increasing, but not necessarily unique.
  • timestamp+incrementing - use two columns, a timestamp column that detects new and modified rows and a strictly incrementing column which provides a globally unique ID for updates so each row can be assigned a unique stream offset.
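The practical difference between the modes shows up when an existing row changes between polls. The Python sketch below models a hypothetical table as a dict of id to (name, updatedAt) and applies each mode's rule for which rows to pick up; it is a simplified model, not the connector's implementation.

```python
# Hypothetical table state at the previous poll: id -> (name, updatedAt).
before = {1: ("ada", 100), 2: ("bob", 105)}
# Between polls, row 1 is modified (its updatedAt is bumped) and row 3 is inserted.
after = {1: ("ada2", 110), 2: ("bob", 105), 3: ("cy", 112)}

def detect(mode, table, last_id, last_ts):
    """Return the ids a mode would pick up on the next poll (simplified model)."""
    if mode == "bulk":
        return sorted(table)  # every row, every time
    if mode == "incrementing":
        return sorted(i for i in table if i > last_id)  # only ids above the offset
    if mode == "timestamp":
        return sorted(i for i, (_, ts) in table.items() if ts > last_ts)
    raise ValueError(f"unsupported mode: {mode}")

# Offsets remembered after the previous poll.
offset_id = max(before)
offset_ts = max(ts for _, ts in before.values())
```

Only timestamp mode sees the modification to row 1; incrementing mode reports just the new row 3, and bulk mode re-reads everything. A deleted row would appear in none of the results.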

Considerations

Capturing Deletes

There are two ways to track DELETE operations:

  • Many applications perform soft deletes: instead of removing a row, they set a deletedAt timestamp or a deleted boolean column. In that case, a delete in the application is an update in the database, so it can be captured with the timestamp and timestamp+incrementing modes, provided the timestamp column is also updated when the row is soft-deleted.
  • Because this connector detects changes by polling, a removed row simply stops appearing in query results, which makes deletes hard to track. One common workaround is to set up a PostgreSQL trigger that fires on every DELETE and inserts a record of the deleted row into a dedicated table. That table can then be configured as a new source within Meroxa.
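A minimal sketch of the trigger approach follows. It uses Python's built-in sqlite3 module as a stand-in for PostgreSQL so that it runs anywhere, and the users, users_deleted, and users_capture_delete names are hypothetical. In PostgreSQL the same idea needs a PL/pgSQL trigger function that inserts OLD.id into the audit table, attached with CREATE TRIGGER ... AFTER DELETE ... FOR EACH ROW.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
-- Hypothetical audit table filled by the trigger; it can be polled like any other table.
CREATE TABLE users_deleted (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,  -- strictly incrementing, usable as incrementing.column.name
    user_id INTEGER NOT NULL,
    deleted_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Record the id of every deleted row (SQLite triggers fire once per row).
CREATE TRIGGER users_capture_delete AFTER DELETE ON users
BEGIN
    INSERT INTO users_deleted (user_id) VALUES (OLD.id);
END;
""")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "ada"), (2, "bob"), (3, "cy")])

# Two separate deletes so the audit order is deterministic.
conn.execute("DELETE FROM users WHERE id = 1")
conn.execute("DELETE FROM users WHERE id = 3")

remaining = conn.execute("SELECT id FROM users ORDER BY id").fetchall()
deleted = conn.execute("SELECT seq, user_id FROM users_deleted ORDER BY seq").fetchall()
```

Because seq only ever grows, the audit table fits the incrementing mode described above: each poll returns just the deletions recorded since the previous one.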