Oracle Database
Oracle Database, otherwise known as Oracle DB, is a multi-model database management system commonly used for running online transaction processing (OLTP) and data warehousing. Turbine streaming apps can read data in real time from your Oracle Database via the records
function.
Meroxa supports all Standard Edition Amazon RDS instances of Oracle Database versions 21
, 19
, 18
, 12
, or 11.2
as resources. If you require Enterprise Edition or hosting and connection support beyond Amazon RDS, please contact us.
Setup
Networking
To add Oracle Database as a resource, the database must be accessible by Meroxa. If not publicly accessible, here are some ways to give Meroxa access:
Resource Configuration
Use the meroxa resource create
command to configure your Oracle Database resource.
The following example depicts how this command is used to create an Oracle Database resource named my-oracle-db
on the Meroxa Platform.
$ meroxa resource create my-oracle-db \
--type oracle \
--url oracle://user:[email protected]:1521/database
Using Oracle as a Source with Turbine
- TurbineJs
- TurbineGo
- TurbinePy
- TurbineRb
TurbineJs
This example shows how to call the users
source table from the Oracle Database Resource my-oracle-db
. The minimum configuration for source connections requires the orderingColumn
to be set to a temporal or incrementing column, in this example we set it to id
.
exports.App = class App {
async run(turbine) {
let source = await turbine.resources("my-oracle-db");
let records = await source.records("users", {"orderingColumn": "id"});
}
};
TurbineGo
This example shows how to call the users
source table from the Oracle Database Resource my-oracle-db
. The minimum configuration for source connections requires the orderingColumn
to be set to a temporal or incrementing column, in this example we set it to id
.
package main
func main() {
runner.Start(App{})
}
var _ turbine.App = (*App)(nil)
type App struct{}
func (a App) Run(v turbine.Turbine) error {
source, err := v.Resources("my-oracle-db")
if err != nil {
return err
}
rr, err := source.Records("users", turbine.ConnectionOptions{
turbine.ConnectionOption{
Field: "orderingColumn",
Value: "id",
}})
if err != nil {
return err
}
return nil
}
TurbinePy
This example shows how to call the users
source table from the Oracle Database Resource my-oracle-db
. The minimum configuration for source connections requires the orderingColumn
to be set to a temporal or incrementing column, in this example we set it to id
.
class App:
@staticmethod
async def run(turbine: Turbine):
source = await turbine.resources("my-oracle-db")
records = await source.records("users", {"orderingColumn": "id"})
TurbineRb
This example shows how to call the users
source table from the Oracle Database Resource my-oracle-db
. The minimum configuration for source connections requires the orderingColumn
to be set to a temporal or incrementing column, in this example we set it to id
.
# frozen_string_literal: true
require "rubygems"
require "bundler/setup"
require "turbine_rb"
class MyApp
def call(app)
Sourcedb = app.reSource(name: "my-oracle-db")
records = sourcedb.records(collection: "users", configs: {"orderingColumn" => "id"})
end
end
Configurations
The following Source connection configurations are supported:
Configuration | Required? | Example | Description |
---|---|---|---|
orderingColumn | Required | id | The column name that the connector will use for ordering rows. Column must contain unique values and suitable for sorting, otherwise the snapshot won't work correctly. |
keyColumns | Optional | id,uuid | If the field is empty, the connector makes a request to the database and uses the received list of primary keys of the specified table. If the table does not contain primary keys, the connector uses the value of the orderingColumn field as the keyColumns value. |
snapshot | Optional, default is true | false | Enables or disables the ability to snapshot the entire table before starting CDC mode. |
columns | Optional | id,name,age | A list of column names that should be included in each record's payload, by default includes all columns. |
batchSize | Optional, default is 1000 | 100 | Sets the size of rows batch. Min value is 1 and max value is 100000 . |
Supported Connections
The Oracle Database resource on the Meroxa Platform leverages Change Data Capture (CDC) for Source connections.
Change Data Capture (CDC)
This connection implements CDC features for Oracle by adding a tracking table and a trigger to populate it. Your Oracle Database user must have privileges to CREATE TABLE
and ADMINISTER DATABASE TRIGGER
for the source table. We will walk you through the process of granting Oracle DB user privileges later in this documentation.
The tracking table and trigger name have the same names as a source table with the prefix MEROXA
. The tracking table has all the same columns as the source table plus three additional columns:
Column | Description |
---|---|
CONNECTOR_TRACKING_ID | Auto increment index for the position. |
CONNECTOR_OPERATION_TYPE | Operation type: INSERT , UPDATE , or DELETE . |
CONNECTOR_TRACKING_CREATED_AT | Date when the event was added to the tracking table. |
An event record will be written in a tracking table when data is added, changed, or deleted from an Oracle Database table. The queries retrieving these event records from the tracking table are similar to those used in Snapshot mode but with CONNECTOR_TRACKING_ID
as the ordering column.
An Ack method will collect the CONNECTOR_TRACKING_ID
of the event records successfully applied and are later removed from the tracking table every 5 seconds or when the connection is closed.
Position example:
{
"mode": "snapshot",
"last_processed_val": 1
}
The mode
field represents a mode of the iterator (snapshot
or cdc
).
The last_processed_val
field represents the last processed element value, and it depends on the iterator mode.
Things to know
Changes sometimes need to be made to columns in an Oracle Database table. The changes must also be applied to the tracking table when this happens by your Oracle Database administrator.
All tracking information only exists within the Oracle Database. Upon deletion of the tracking table, the tracking process will restart from the beginning by initiating a new snapshot of the table, which could lead to unintended replication of data downstream.
Data Record Format
Data records from Oracle Database will take on the following format:
{
"schema": {
"type": "struct",
"name": "resource_17464_094473.Envelope",
"fields": [
{
"type": "struct",
"optional": true,
"fields": [
{
"type": "string",
"field": "PRODUCT_NAME"
},
{
"type": "string",
"field": "PRODUCT_TYPE"
},
{
"type": "string",
"field": "SHIPPING_ADDRESS"
},
{
"type": "string",
"field": "CATEGORY"
},
{
"type": "string",
"field": "EMAIL"
},
{
"type": "double",
"field": "ID"
}
],
"field": "before"
},
{
"type": "struct",
"optional": true,
"fields": [
{
"type": "string",
"field": "PRODUCT_TYPE"
},
{
"type": "string",
"field": "SHIPPING_ADDRESS"
},
{
"type": "string",
"field": "CATEGORY"
},
{
"type": "string",
"field": "EMAIL"
},
{
"type": "double",
"field": "ID"
},
{
"type": "string",
"field": "PRODUCT_NAME"
}
],
"field": "after"
},
{
"type": "struct",
"optional": true,
"fields": [
{
"type": "string",
"field": "opencdc.createdAt"
},
{
"type": "string",
"field": "opencdc.version"
},
{
"type": "string",
"field": "opencdc.readAt"
},
{
"type": "string",
"field": "conduit.source.connector.id"
},
{
"type": "string",
"field": "oracle.table"
}
],
"field": "source"
},
{
"type": "string",
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"optional": true,
"fields": [
{
"type": "string",
"field": "id"
},
{
"type": "int64",
"field": "total_order"
},
{
"type": "int64",
"field": "data_collection_order"
}
],
"field": "transaction"
}
]
},
"payload": {
"before": null,
"after": {
"CATEGORY": "camping",
"EMAIL": "[email protected]",
"ID": 1,
"PRODUCT_NAME": "Forte 35 Sleeping Bag \ufffd\ufffd\ufffd Womens",
"PRODUCT_TYPE": "sleeping-bag",
"SHIPPING_ADDRESS": "9718 East Virginia Avenue Silver Spring, MD 20901"
},
"source": {
"conduit.source.connector.id": "22a2786f-4a83-4ea9-a5c9-76ffaff1353b",
"opencdc.createdAt": "1678894545152703306",
"opencdc.readAt": "1678894545152704451",
"opencdc.version": "v1",
"oracle.table": "ORDERS"
},
"op": "r",
"ts_ms": 1678894545152,
"transaction": null
}
}
Amazon RDS Setup
Requirements
- Configure Security Groups
- Configure Network ACLs (only if your database is within a VPC)
- Enable Oracle Database User Privileges (required for CDC Connection)
Configure Security Groups
To allow access to your Oracle Database from Meroxa IPs, follow these steps:
- Go to Amazon RDS in your AWS account.
- Go to Databases in the Amazon RDS menu.
- Click into the Oracle Database you'd like to use with Meroxa.
- In the Connectivity & security section, under Security and VPC security groups, click your security group by clicking the blue hyperlink.
- You should now see a list of Security Groups, click into the selected security group by clicking the blue hyperlink under Security group ID.
- Under Inbound rules, click the Edit inbound rules button.
- Click the Add rule button.
- Set Protocol to
TCP
. - Set Port to the port of your database. For Oracle Database it's
1521
by default. - Set the Source to include the Meroxa IPs.
Configure Network ACLs
You only need to update this if your database is within a VPC.
To allow access to your Oracle Database from Meroxa IPs, follow these steps:
- Go to Amazon RDS in your AWS account.
- Go to Databases in the Amazon RDS menu.
- Click into the Oracle Database instance you'd like to use.
- In the Connectivity & security section, under Networking and VPC, click your VPC by clicking the blue hyperlink.
- You should now see a list of Your VPCs. Click into the selected VPC by clicking the blue hyperlink under VPC ID.
- In your VPC Details, click the blue hyperlink under Main network ACL.
- You should now see a list of Network ACLs. Click into the selected Network ACL by clicking the blue hyperlink under Network ACL ID.
- You will update both the Inbound and Outbound Rules of the Network ACL to contain
ALL - 0.0.0.0/0 - ALLOW
. - If your Inbound and Outbound Rules do not contain
ALL - 0.0.0.0/0 - ALLOW
—update the Source to allow Meroxa IPs both Inbound and Outbound.
Oracle Database User Privileges
When using Amazon RDS you can connect with your Oracle Database in two ways: Oracle SQL developer or SQL*Plus. In both cases you will need an Oracle DB endpoint.
Oracle Database Endpoint in Amazon RDS
Here are instructions on how to find your Amazon RDS Oracle Database endpoint and port.
- Go to Amazon RDS in your AWS account.
- Go to Databases in the Amazon RDS menu.
- Click into the Oracle Database instance you'd like to use.
- In the Connectivity & security section, under Endpoint & port, find the Endpoint and Port for your Oracle DB instance.
Granting Privileges
If you do not have privileges to grant Oracle Database user privileges, please ask your Oracle Database administrator.
The user used to connect with the resource must have privileges to CREATE TABLE
and ADMINISTER DATABASE TRIGGER
. To enable the required CDC capabilities, run the following queries against your Oracle DB instance using Oracle SQL developer or SQL*Plus.
-- Create a 'meroxa' user with a very secure password.
SQL> CREATE USER meroxa IDENTIFIED BY very-secure-password;
-- Grant the CREATE TABLE privilege to the 'meroxa' user on the 'users' table.
SQL> GRANT CREATE TABLE TO meroxa ON users;
-- Grant the ADMINISTER DATABASE TRIGGER privilege to the 'meroxa' user on the 'users' table.
SQL> GRANT ADMINISTER DATABASE TRIGGER TO meroxa ON users;