
LogMiner Connector

solutions.a2.cdc.oracle.OraCdcLogMinerConnector uses Oracle LogMiner as the source for data changes. The connector minimizes the side effects of using Oracle LogMiner. Even for Oracle RDBMS versions with DBMS_LOGMNR.CONTINUOUS_MINE support, it does not use it. Instead, it reads V$LOGMNR_CONTENTS and saves information when OPERATION is INSERT, DELETE, or UPDATE in Java off-heap memory structures provided by Chronicle Queue. This approach minimizes database server load but requires additional disk space on the oracdc server.

Online Redo Log Processing

Since version 2.0, the connector supports online redo log processing when a2.process.online.redo.logs=true.

ArrayList Mode

Starting from version 1.5, ArrayList (heap) mode is also supported via a2.transaction.implementation=ArrayList. Note that LOB support is not available in this mode.
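A minimal configuration fragment combining both of the options above might look like this (illustrative only; all other connector parameters are omitted):

```
# Use the heap-backed transaction buffer (no LOB support in this mode)
a2.transaction.implementation=ArrayList
# Optionally process online redo logs as well (version 2.0+)
a2.process.online.redo.logs=true
```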

Supported Oracle Configurations

| # | Configuration | V$DATABASE.OPEN_MODE | Description |
|---|---------------|----------------------|-------------|
| 1 | Standalone / Primary Database | READ WRITE | Standard primary database deployment |
| 2 | Active DataGuard Physical Standby | READ ONLY | Read-only standby with Active DataGuard |
| 3 | Physical Standby (MOUNTED) | MOUNTED | Uses LogMiner on the standby with limited dictionary queries on the primary. Auto-detects redo threads for Oracle RAC. Set a2.standby.activate=true. |
| 4 | Distributed Configuration | N/A | Source database generates redo logs; a compatible mining database processes them |
| 5 | Oracle RAC | N/A | Full support for Real Application Clusters |

Large Objects (LOBs) and XMLTYPE

LOB support is available from version 0.9.7 when a2.process.lobs=true. CLOB is supported only for columns where DBA_LOBS.FORMAT='ENDIAN NEUTRAL'. XMLTYPE is supported from version 0.9.8.2.

warning

When working with large objects, ensure you set appropriate Apache Kafka parameters for large message sizes:

  • Source connector: producer.max.request.size
  • Kafka broker/controller: replica.fetch.max.bytes and message.max.bytes
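For example (the 50 MiB values below are illustrative; size them to your largest expected LOB):

```
# Source connector configuration
producer.max.request.size=52428800

# Kafka broker / controller configuration
replica.fetch.max.bytes=52428800
message.max.bytes=52428800
```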

LOB_TRIM and LOB_ERASE Operations

Oracle LogMiner generates unparseable output for LOB_TRIM and LOB_ERASE operations when there is no dictionary connection. oracdc currently ignores these operations but logs debug-level information (since version 1.2.2).

LOB Transformation Feature

Available since version 0.9.8.3

For tables with large BLOBs that need to be delivered to systems with limited LOB support (such as Snowflake), you can implement custom LOB transformations. Create a class implementing the OraCdcLobTransformationsIntf interface:

package com.example.oracdc;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import solutions.a2.cdc.oracle.OraColumn;
import solutions.a2.cdc.oracle.data.OraCdcLobTransformationsIntf;

public class TransformScannedDataInBlobs implements OraCdcLobTransformationsIntf {

    @Override
    public Schema transformSchema(String pdbName, String tableOwner,
            String tableName, OraColumn lobColumn, SchemaBuilder valueSchema) {
        if ("FND_LOBS".equals(tableName) &&
                "FILE_DATA".equals(lobColumn.getColumnName())) {
            // Replace the raw BLOB field with a struct holding an S3 URL
            final SchemaBuilder transformedSchemaBuilder = SchemaBuilder
                    .struct().optional()
                    .name(lobColumn.getColumnName()).version(1);
            transformedSchemaBuilder.field("S3_URL", Schema.OPTIONAL_STRING_SCHEMA);
            valueSchema.field(lobColumn.getColumnName(),
                    transformedSchemaBuilder.build());
            return transformedSchemaBuilder.build();
        } else {
            // Fall back to the default transformation for all other columns
            return OraCdcLobTransformationsIntf.super.transformSchema(
                    pdbName, tableOwner, tableName, lobColumn, valueSchema);
        }
    }

    @Override
    public Struct transformData(String pdbName, String tableOwner,
            String tableName, OraColumn lobColumn, byte[] content,
            Struct keyStruct, Schema valueSchema) {
        if ("FND_LOBS".equals(tableName) &&
                "FILE_DATA".equals(lobColumn.getColumnName())) {
            final Struct valueStruct = new Struct(valueSchema);
            // Upload 'content' to S3 and obtain the object URL (implementation omitted)
            final String s3ObjectKey = null; // replace with the actual S3 object URL
            valueStruct.put("S3_URL", s3ObjectKey);
            return valueStruct;
        }
        return null;
    }
}

Then configure the connector to use your transformation class:

a2.process.lobs=true
a2.lob.transformation.class=com.example.oracdc.TransformScannedDataInBlobs

DDL Support and Schema Evolution

oracdc supports the following ALTER TABLE DDL clauses:

  1. ADD column(s)
  2. MODIFY column(s)
  3. DROP column(s)
  4. RENAME column
  5. SET UNUSED column(s)

Schema Evolution Algorithm

  1. First encounter: oracdc reads the table definition from the data dictionary, creates an immutable key schema ([PDB_NAME:]OWNER.TABLE_NAME.Key, version=1) and a mutable value schema (version=1).
  2. After DDL parsing and comparison: If a change is detected, the value schema version is incremented.
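The versioning rule can be sketched with a toy model (plain Java, not the connector's actual implementation): the key schema stays immutable at version 1, while the value schema version is bumped whenever the observed column list changes after DDL parsing.

```java
import java.util.List;

// Toy model of oracdc's schema-evolution rule (illustrative only).
public class SchemaEvolutionSketch {
    static final int KEY_SCHEMA_VERSION = 1; // immutable key schema
    private int valueSchemaVersion = 1;      // mutable value schema
    private List<String> columns;

    SchemaEvolutionSketch(List<String> initialColumns) {
        this.columns = List.copyOf(initialColumns); // first encounter: version=1
    }

    // Called after a DDL statement has been parsed and compared
    void onDdl(List<String> newColumns) {
        if (!columns.equals(newColumns)) {
            valueSchemaVersion++;            // change detected: bump value schema version
            columns = List.copyOf(newColumns);
        }
    }

    int valueSchemaVersion() { return valueSchemaVersion; }

    public static void main(String[] args) {
        SchemaEvolutionSketch s = new SchemaEvolutionSketch(List.of("ID", "NAME"));
        s.onDdl(List.of("ID", "NAME"));          // no change: still version 1
        s.onDdl(List.of("ID", "NAME", "EMAIL")); // ADD column: version 2
        System.out.println(s.valueSchemaVersion());
    }
}
```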

Error Resiliency

oracdc is resilient to Oracle database shutdown and restart events during DBMS_LOGMNR.ADD_LOGFILE calls or when waiting for new archived redo logs. The ORA-17410 error is intercepted, and the connector performs reconnection after a backoff time set by the a2.connection.backoff parameter.
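The recovery pattern amounts to a retry loop with a backoff sleep between attempts. A generic sketch (not oracdc's actual code; the backoffMs parameter stands in for a2.connection.backoff):

```java
import java.util.concurrent.Callable;

// Generic reconnect-with-backoff sketch (illustrative; not oracdc's actual code).
public class BackoffRetry {
    // Retries 'action' until it succeeds or 'maxAttempts' is exhausted,
    // sleeping 'backoffMs' between attempts (cf. a2.connection.backoff).
    static <T> T retry(Callable<T> action, int maxAttempts, long backoffMs) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {          // e.g. an intercepted ORA-17410
                last = e;
                if (attempt < maxAttempts) Thread.sleep(backoffMs);
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Fails twice, then "reconnects" successfully on the third attempt
        String result = retry(() -> {
            if (++calls[0] < 3) throw new RuntimeException("ORA-17410");
            return "connected";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```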

Setting Up the Oracle Database

Enable ARCHIVELOG Mode

First, check the current archive log mode:

SELECT LOG_MODE FROM V$DATABASE;

If the database is in NOARCHIVELOG mode, enable archiving:

SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;

Enable Supplemental Logging

You can enable supplemental logging for the whole database:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

Or, for better performance, enable it for specific tables only (recommended):

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER TABLE <OWNER>.<TABLE_NAME> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

tip

Enabling supplemental logging at the table level rather than the database level can significantly reduce redo log generation and improve performance.

Verify Database-Level Settings

SELECT SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK,
SUPPLEMENTAL_LOG_DATA_UI, SUPPLEMENTAL_LOG_DATA_FK,
SUPPLEMENTAL_LOG_DATA_ALL
FROM V$DATABASE;

Verify Table-Level Settings

SELECT LOG_GROUP_NAME, TABLE_NAME,
DECODE(ALWAYS, 'ALWAYS', 'Unconditional', NULL, 'Conditional') ALWAYS
FROM DBA_LOG_GROUPS;

Create Oracle User for LogMiner

For CDB (Container Database)

CREATE USER C##ORACDC IDENTIFIED BY ORACDC
DEFAULT TABLESPACE SYSAUX
TEMPORARY TABLESPACE TEMP
QUOTA UNLIMITED ON SYSAUX
CONTAINER=ALL;

ALTER USER C##ORACDC SET CONTAINER_DATA=ALL CONTAINER=CURRENT;

GRANT
CREATE SESSION,
SET CONTAINER,
SELECT ANY TRANSACTION,
SELECT ANY DICTIONARY,
EXECUTE_CATALOG_ROLE,
LOGMINING
TO C##ORACDC
CONTAINER=ALL;

For Non-CDB or PDB Connection (19.10+)

CREATE USER ORACDC IDENTIFIED BY ORACDC
DEFAULT TABLESPACE SYSAUX
TEMPORARY TABLESPACE TEMP
QUOTA UNLIMITED ON SYSAUX;

GRANT
CREATE SESSION,
SELECT ANY TRANSACTION,
SELECT ANY DICTIONARY,
EXECUTE_CATALOG_ROLE,
LOGMINING
TO ORACDC;

Database Settings Check Utility

Use the built-in utility to verify your Oracle database is correctly configured for oracdc:

java -cp oracdc-kafka-standalone.jar \
solutions.a2.cdc.oracle.utils.OracleSetupCheck \
--jdbc-url <ORA-JDBC-URL> \
--user <ACCOUNT> --password <PASSWORD>

Using Docker:

docker run --rm -it a2solutions/oracdc oraCheck.sh \
--jdbc-url <ORA-JDBC-URL> \
--user <ACCOUNT> --password <PASSWORD>

CDB Architecture

In a CDB environment, oracdc must connect to CDB$ROOT. Starting from Oracle 19c RU 10 and Oracle 21c, you can connect either to CDB$ROOT or to an individual PDB.

Physical Standby Configuration (MOUNTED Mode)

Connecting to a physical standby in MOUNTED mode requires the SYSDBA privilege. Create an Oracle Wallet for the standby connection user and set a2.standby.activate=true in the connector configuration.
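A minimal configuration fragment for this scenario (only the parameter shown is taken from this document; the wallet itself and the standby TNS entries must be set up separately as described above):

```
# Enable mining against a physical standby in MOUNTED mode
a2.standby.activate=true
# The standby connection must authenticate as SYSDBA via the Oracle Wallet
# created for the standby connection user
```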

Distributed Database Configuration

To set up a distributed configuration:

  1. Copy the oracdc-kafka standalone jar to both the source and target servers.
  2. On the source server, start the SourceDatabaseShipmentAgent with the --bind-address and --port parameters.
  3. On the target server, start the TargetDatabaseShipmentAgent with a connection to the source agent and the --file-destination parameter.
  4. Configure the connector with a2.distributed.activate=true and the appropriate distributed parameters.
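The agent startup commands might look like the following sketch. The fully qualified class names are assumptions modeled on the OracleSetupCheck utility shown below; verify them, and the exact connection parameters for the target agent, against your oracdc distribution:

```
# On the source server (class name and package path are assumptions)
java -cp oracdc-kafka-standalone.jar \
  solutions.a2.cdc.oracle.utils.SourceDatabaseShipmentAgent \
  --bind-address 0.0.0.0 --port 21521

# On the target server
java -cp oracdc-kafka-standalone.jar \
  solutions.a2.cdc.oracle.utils.TargetDatabaseShipmentAgent \
  <connection-parameters-to-source-agent> \
  --file-destination /path/to/redo/staging
```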

Data Type Mapping

The LogMiner connector follows the standard oracdc type mapping. When a2.oracdc.schemas=false (the default), the mapping is compatible with the Confluent JDBC Sink connector.

| Oracle Type | Kafka Connect Schema | Additional Information |
|-------------|----------------------|------------------------|
| DATE | int32 | org.apache.kafka.connect.data.Date |
| TIMESTAMP% | int64 | org.apache.kafka.connect.data.Timestamp |
| INTERVAL YEAR TO MONTH | bytes | solutions.a2.cdc.oracle.data.OraIntervalYM |
| INTERVAL DAY TO SECOND | bytes | solutions.a2.cdc.oracle.data.OraIntervalDS |
| NUMBER(1,0), NUMBER(2,0) | int8 | |
| NUMBER(3,0), NUMBER(4,0) | int16 | |
| NUMBER(5,0) through NUMBER(8,0) | int32 | |
| NUMBER (other integers up to 10^18) | int64 | |
| NUMBER (no scale/precision) | float64 | |
| NUMBER (all other) | bytes | org.apache.kafka.connect.data.Decimal |
| BINARY_FLOAT | float32 | |
| BINARY_DOUBLE | float64 | |
| RAW | bytes | |
| BLOB | bytes | solutions.a2.OraBlob |
| CHAR | string | |
| NCHAR | string | |
| VARCHAR2 | string | |
| NVARCHAR2 | string | |
| CLOB | string | solutions.a2.OraClob |
| NCLOB | string | solutions.a2.OraNClob |
| XMLTYPE | string | solutions.a2.OraXml |
note

When a2.oracdc.schemas=true, oracdc uses its own extensions for Oracle NUMBER, TIMESTAMP WITH [LOCAL] TIMEZONE, INTERVAL YEAR TO MONTH, and INTERVAL DAY TO SECOND types.
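The NUMBER rows of the mapping table can be restated as a small helper (an illustrative sketch of the table's rules, not the connector's actual code):

```java
// Illustrative restatement of the NUMBER mapping rules from the table above.
public class NumberTypeMapping {
    // precision/scale may be null for a plain NUMBER declared without them
    static String map(Integer precision, Integer scale) {
        if (precision == null && scale == null) {
            return "float64";                  // NUMBER (no scale/precision)
        }
        if (precision != null && scale != null && scale == 0) {
            if (precision <= 2)  return "int8";   // NUMBER(1,0), NUMBER(2,0)
            if (precision <= 4)  return "int16";  // NUMBER(3,0), NUMBER(4,0)
            if (precision <= 8)  return "int32";  // NUMBER(5,0)..NUMBER(8,0)
            if (precision <= 18) return "int64";  // other integers up to 10^18
        }
        return "bytes";                        // org.apache.kafka.connect.data.Decimal
    }

    public static void main(String[] args) {
        System.out.println(map(2, 0));      // int8
        System.out.println(map(7, 0));      // int32
        System.out.println(map(null, null)); // float64
        System.out.println(map(38, 10));    // bytes
    }
}
```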

Known Issues

1. Chronicle Queue Disk Space Messages

Chronicle Queue may produce excessive disk space monitoring messages. To suppress them, pass the corresponding system property via KAFKA_OPTS:

export KAFKA_OPTS="-Dchronicle.disk.monitor.disable=true"

2. Chronicle Queue Statistics Collection

Chronicle Queue may collect usage statistics. To avoid this, either switch to ArrayList mode or follow the Chronicle Queue opt-out instructions.

3. Excessive LogMiner Output in alert.log

Oracle LogMiner may generate excessive output in the database alert log. Refer to Oracle Support Notes 1632209.1 and 1485217.1 for mitigation steps.

Performance Tips

  1. Disable transaction auditing -- Consider setting _transaction_auditing=false after consulting with Oracle Support.
  2. Table-level supplemental logging -- Set supplemental logging at the table level rather than the database level to reduce redo generation.
  3. File system sizing -- Ensure proper file system sizing for Chronicle Queue storage.
  4. Open files limits -- Configure appropriate open files limits for the OS user running oracdc.
  5. Enable LogMiner tracing -- Set a2.logminer.trace=true to identify performance bottlenecks.
  6. Increase SDU -- Increase the Session Data Unit (SDU) size for network optimization.
  7. Review JMX metrics -- Monitor JMX metrics and tune a2.fetch.size accordingly.
  8. Monitor Chronicle Queue storage -- Use JMX metrics such as MaxTransactionSizeMiB to monitor Chronicle Queue storage usage.
  9. System tuning -- Set appropriate values for vm.max_map_count and -XX:MaxDirectMemorySize.
  10. Use Java 17 LTS or later -- Ensure you are running Java 17 LTS or later with the required module exports configured.
  11. Oracle patching -- Review Oracle GoldenGate and Database patch recommendations for LogMiner stability.
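For item 9, an illustrative starting point (the values below are examples only, not recommendations; size them for your workload and available memory):

```
# Raise the memory-map limit for Chronicle Queue's memory-mapped files
sysctl -w vm.max_map_count=262144

# Cap direct (off-heap) memory for the connector JVM
export KAFKA_OPTS="-XX:MaxDirectMemorySize=4g"
```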