Skip to main content

MongoDB

MongoDB is a NoSQL database that uses JSON-like documents with optional schemas. It can be used as an upstream or downstream resource in your Turbine streaming apps using the records or write functions to a select collection in a database.

Meroxa supports self-hosted and Mongo Atlas instances of MongoDB versions 3.4.x or later.

Setup

Resource Configuration

Use the meroxa resource create command to configure your MongoDB resource.

The following example depicts how this command is used to create an MongoDB resource named mongo with the minimum configuration required.

$ meroxa resource create mongo \
--type mongodb \
--url "mongodb://$MONGO_USER:$MONGO_PASS@$MONGO_URL:$MONGO_PORT"

In the example above, replace following variables with valid credentials from your MongoDB environment:

  • $MONGO_USER - MongoDB Username
  • $MONGO_PASS - MongoDB Password
  • $MONGO_URL - MongoDB URL
  • $MONGO_PORT - MongoDB Port (e.g., 27017).

Meroxa accepts any valid Mongo Connection String URI.

Connections

Operations Log (Oplog)

The MongoDB resources leverage the native oplog (as used internally within replica sets) to capture changes from a collection.

The following configuration is supported for this resource:

ConfigurationDestination
collectionWhitelist filter for extracting a subset of fields from elastic-search JSON documents. The whitelist filter supports nested fields. To provide multiple fields use ; as separator (e.g. customer;order.qty;order.price).
index.prefixIndices prefix to include in copying.
incrementing.field.nameAn incremental/temporal field such as a timestamp or an incrementing id.

Data Record Format

Data records from MongoDB using Oplog events will take on the following format:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": true,
        "name": "io.debezium.data.Json",
        "version": 1,
        "field": "after"
      },
      {
        "type": "string",
        "optional": true,
        "name": "io.debezium.data.Json",
        "version": 1,
        "field": "patch"
      },
      {
        "type": "string",
        "optional": true,
        "name": "io.debezium.data.Json",
        "version": 1,
        "field": "filter"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": false,
            "field": "rs"
          },
          {
            "type": "string",
            "optional": false,
            "field": "collection"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "ord"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "h"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "tord"
          },
          {
            "type": "string",
            "optional": true,
            "field": "stxnid"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mongo.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": true,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "resource.public.orders.Envelope"
  },
  "payload": {
    "after": "{\"_id\": {\"$oid\": \"6258ed4929e237f48c3e5e52\"},\"id\": {\"$numberLong\": \"1\"},\"category\": \"camping\",\"product_type\": \"sleeping-bag\",\"product_name\": \"Forte 35 Sleeping Bag - Womens\",\"stock\": true,\"product_id\": {\"$numberLong\": \"361632\"},\"shipping_address\": \"9718 East Virginia Avenue Silver Spring, MD 20901\",\"email\": \"[email protected]\"}",
    "patch": null,
    "filter": null,
    "source": {
      "version": "1.2.5.Final",
      "connector": "mongodb",
      "name": "resource",
      "ts_ms": 1650321082000,
      "snapshot": "true",
      "db": "public",
      "rs": "atlas-mhg1us-shard-0",
      "collection": "orders",
      "ord": 5,
      "h": null,
      "tord": null,
      "stxnid": null
    },
    "op": "r",
    "ts_ms": 1650418309976,
    "transaction": null
  }
}