LogMiner Connector
`solutions.a2.cdc.oracle.OraCdcLogMinerConnector` uses Oracle LogMiner as the source for data changes. The connector minimizes the side effects of using Oracle LogMiner: even on Oracle RDBMS versions that support `DBMS_LOGMNR.CONTINUOUS_MINE`, it does not use that option. Instead, it reads `V$LOGMNR_CONTENTS` and saves information for `INSERT`, `DELETE`, and `UPDATE` operations in Java off-heap memory structures provided by Chronicle Queue. This approach minimizes database server load but requires additional disk space on the server running oracdc.
Since version 2.0, the connector supports online redo log processing when `a2.process.online.redo.logs=true`.
Starting from version 1.5, ArrayList (heap) mode is also supported via `a2.transaction.implementation=ArrayList`. Note that LOB support is not available in this mode. A configuration sketch follows.
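Both features are opt-in connector properties; for example (only the two parameter names come from this section, the rest of the connector configuration is unchanged):

# Buffer transactions on the Java heap instead of Chronicle Queue (no LOB support)
a2.transaction.implementation=ArrayList
# Mine the online redo logs in addition to archived logs (version 2.0+)
a2.process.online.redo.logs=true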
Supported Oracle Configurations
| # | Configuration | V$DATABASE.OPEN_MODE | Description |
|---|---|---|---|
| 1 | Standalone / Primary Database | READ WRITE | Standard primary database deployment |
| 2 | Active DataGuard Physical Standby | READ ONLY | Read-only standby with Active DataGuard |
| 3 | Physical Standby (MOUNTED) | MOUNTED | Uses LogMiner on standby with limited dictionary queries on primary. Auto-detects redo threads for Oracle RAC. Set a2.standby.activate=true. |
| 4 | Distributed Configuration | N/A | Source database generates redo logs, a compatible mining database processes them |
| 5 | Oracle RAC | N/A | Full support for Real Application Clusters |
Large Objects (LOBs) and XMLTYPE
LOB support is available from version 0.9.7 when a2.process.lobs=true. CLOB is supported only for columns where DBA_LOBS.FORMAT='ENDIAN NEUTRAL'. XMLTYPE is supported from version 0.9.8.2.
When working with large objects, ensure you set appropriate Apache Kafka parameters for large message sizes:
- Source connector: `producer.max.request.size`
- Kafka broker/controller: `replica.fetch.max.bytes` and `message.max.bytes`
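For example, to allow messages of roughly 16 MiB (the size is an illustrative assumption; choose a value that exceeds your largest expected LOB):

# Source connector side
producer.max.request.size=16777216

# Broker/controller side (replica.fetch.max.bytes must be at least message.max.bytes)
message.max.bytes=16777216
replica.fetch.max.bytes=16777216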
LOB_TRIM and LOB_ERASE Operations
Oracle LogMiner generates unparseable output for LOB_TRIM and LOB_ERASE operations when there is no dictionary connection. oracdc currently ignores these operations but logs debug-level information (since version 1.2.2).
LOB Transformation Feature
Available since version 0.9.8.3
For tables with large BLOBs that need to be delivered to systems with limited LOB support (such as Snowflake), you can implement custom LOB transformations. Create a class implementing the OraCdcLobTransformationsIntf interface:
package com.example.oracdc;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import solutions.a2.cdc.oracle.OraColumn;
import solutions.a2.cdc.oracle.data.OraCdcLobTransformationsIntf;

public class TransformScannedDataInBlobs implements OraCdcLobTransformationsIntf {

    @Override
    public Schema transformSchema(String pdbName, String tableOwner,
            String tableName, OraColumn lobColumn, SchemaBuilder valueSchema) {
        if ("FND_LOBS".equals(tableName) &&
                "FILE_DATA".equals(lobColumn.getColumnName())) {
            // Replace the BLOB column with a struct that carries an S3 URL instead
            final SchemaBuilder transformedSchemaBuilder = SchemaBuilder
                    .struct().optional()
                    .name(lobColumn.getColumnName()).version(1);
            transformedSchemaBuilder.field("S3_URL", Schema.OPTIONAL_STRING_SCHEMA);
            valueSchema.field(lobColumn.getColumnName(),
                    transformedSchemaBuilder.build());
            return transformedSchemaBuilder.build();
        } else {
            // All other LOB columns keep the default transformation
            return OraCdcLobTransformationsIntf.super.transformSchema(
                    pdbName, tableOwner, tableName, lobColumn, valueSchema);
        }
    }

    @Override
    public Struct transformData(String pdbName, String tableOwner,
            String tableName, OraColumn lobColumn, byte[] content,
            Struct keyStruct, Schema valueSchema) {
        if ("FND_LOBS".equals(tableName) &&
                "FILE_DATA".equals(lobColumn.getColumnName())) {
            final Struct valueStruct = new Struct(valueSchema);
            // Upload the LOB content to S3 and store only the object URL
            final String s3ObjectKey = uploadToS3(keyStruct, content);
            valueStruct.put("S3_URL", s3ObjectKey);
            return valueStruct;
        }
        return null;
    }

    // Hypothetical helper, not part of the oracdc API: upload the LOB
    // (e.g., with the AWS SDK) and return the resulting object URL
    private String uploadToS3(Struct keyStruct, byte[] content) {
        return "s3://example-bucket/" + Integer.toHexString(keyStruct.hashCode());
    }
}
Then configure the connector to use your transformation class:
a2.process.lobs=true
a2.lob.transformation.class=com.example.oracdc.TransformScannedDataInBlobs
DDL Support and Schema Evolution
oracdc supports the following ALTER TABLE DDL clauses:
- `ADD` column(s)
- `MODIFY` column(s)
- `DROP` column(s)
- `RENAME` column
- `SET UNUSED` column(s)
Schema Evolution Algorithm
- First encounter: oracdc reads the table definition from the data dictionary and creates an immutable key schema (`[PDB_NAME:]OWNER.TABLE_NAME.Key`, version=1) and a mutable value schema (version=1).
- After DDL parsing and comparison: if a change is detected, the value schema version is incremented.
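For example (a hedged illustration; the table and column names are hypothetical), after a supported DDL statement such as

ALTER TABLE HR.EMPLOYEES ADD (BADGE_ID VARCHAR2(30));

oracdc parses the statement, detects the new column, and emits subsequent records for HR.EMPLOYEES with value schema version 2, while the key schema `HR.EMPLOYEES.Key` stays at version 1.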
Error Resiliency
oracdc is resilient to Oracle database shutdown and restart events during DBMS_LOGMNR.ADD_LOGFILE calls or while waiting for new archived redo logs. The ORA-17410 error is intercepted, and the connector reconnects after the backoff time set by the `a2.connection.backoff` parameter.
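For example (the value is an illustrative assumption; only the parameter name comes from this section):

# Time to wait before attempting to reconnect after ORA-17410
a2.connection.backoff=30000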
Setting Up the Oracle Database
Enable ARCHIVELOG Mode
First, check the current archive log mode:
SELECT LOG_MODE FROM V$DATABASE;
If the database is in NOARCHIVELOG mode, enable archiving:
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
Enable Supplemental Logging
You can enable supplemental logging for the whole database:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
Or, for better performance, enable it for specific tables only (recommended):
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER TABLE <OWNER>.<TABLE_NAME> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
Enabling supplemental logging at the table level rather than the database level can significantly reduce redo log generation and improve performance.
Verify Database-Level Settings
SELECT SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK,
SUPPLEMENTAL_LOG_DATA_UI, SUPPLEMENTAL_LOG_DATA_FK,
SUPPLEMENTAL_LOG_DATA_ALL
FROM V$DATABASE;
Verify Table-Level Settings
SELECT LOG_GROUP_NAME, TABLE_NAME,
DECODE(ALWAYS, 'ALWAYS', 'Unconditional', NULL, 'Conditional') ALWAYS
FROM DBA_LOG_GROUPS;
Create Oracle User for LogMiner
For CDB (Container Database)
CREATE USER C##ORACDC IDENTIFIED BY ORACDC
DEFAULT TABLESPACE SYSAUX
TEMPORARY TABLESPACE TEMP
QUOTA UNLIMITED ON SYSAUX
CONTAINER=ALL;
ALTER USER C##ORACDC SET CONTAINER_DATA=ALL CONTAINER=CURRENT;
GRANT
CREATE SESSION,
SET CONTAINER,
SELECT ANY TRANSACTION,
SELECT ANY DICTIONARY,
EXECUTE_CATALOG_ROLE,
LOGMINING
TO C##ORACDC
CONTAINER=ALL;
For Non-CDB or PDB Connection (19.10+)
CREATE USER ORACDC IDENTIFIED BY ORACDC
DEFAULT TABLESPACE SYSAUX
TEMPORARY TABLESPACE TEMP
QUOTA UNLIMITED ON SYSAUX;
GRANT
CREATE SESSION,
SELECT ANY TRANSACTION,
SELECT ANY DICTIONARY,
EXECUTE_CATALOG_ROLE,
LOGMINING
TO ORACDC;
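To confirm the grants took effect, you can query the standard data dictionary views from a DBA account (this check is generic Oracle SQL, not an oracdc-specific utility):
SELECT GRANTEE, PRIVILEGE
  FROM DBA_SYS_PRIVS
 WHERE GRANTEE IN ('ORACDC', 'C##ORACDC')
UNION ALL
SELECT GRANTEE, GRANTED_ROLE
  FROM DBA_ROLE_PRIVS
 WHERE GRANTEE IN ('ORACDC', 'C##ORACDC');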
Database Settings Check Utility
Use the built-in utility to verify your Oracle database is correctly configured for oracdc:
java -cp oracdc-kafka-standalone.jar \
solutions.a2.cdc.oracle.utils.OracleSetupCheck \
--jdbc-url <ORA-JDBC-URL> \
--user <ACCOUNT> --password <PASSWORD>
Using Docker:
docker run --rm -it a2solutions/oracdc oraCheck.sh \
--jdbc-url <ORA-JDBC-URL> \
--user <ACCOUNT> --password <PASSWORD>
CDB Architecture
In a CDB environment, oracdc must connect to CDB$ROOT. Starting from Oracle 19c RU 10 and Oracle 21c, you can connect either to CDB$ROOT or to an individual PDB.
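For example (host, port, and service names are hypothetical):
# Connect to CDB$ROOT
jdbc:oracle:thin:@//dbhost:1521/ORCLCDB
# Or, with Oracle 19c RU 10+ or 21c, directly to a PDB
jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1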
Physical Standby Configuration (MOUNTED Mode)
Connecting to a physical standby in MOUNTED mode requires the SYSDBA privilege. Create an Oracle Wallet for the standby connection user and set a2.standby.activate=true in the connector configuration.
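A minimal sketch of creating the wallet with Oracle's mkstore utility (placeholders follow the convention used elsewhere in this document; mkstore prompts for the wallet and account passwords, and the TNS alias must resolve via tnsnames.ora):
mkstore -wrl <WALLET-DIRECTORY> -create
mkstore -wrl <WALLET-DIRECTORY> -createCredential <TNS-ALIAS> <SYSDBA-ACCOUNT>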
Distributed Database Configuration
To set up a distributed configuration:
- Copy the `oracdc-kafka-standalone` jar to both the source and target servers.
- On the source server, start the `SourceDatabaseShipmentAgent` with the `--bind-address` and `--port` parameters.
- On the target server, start the `TargetDatabaseShipmentAgent` with a connection to the source agent and the `--file-destination` parameter.
- Configure the connector with `a2.distributed.activate=true` and the appropriate distributed parameters. A command sketch follows this list.
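A hedged sketch of starting the agents (the package name is an assumption modeled on the OracleSetupCheck utility above, and the port and path values are illustrative; only --bind-address, --port, and --file-destination are named in this section):

# Source server: ship redo logs from this host
java -cp oracdc-kafka-standalone.jar \
  solutions.a2.cdc.oracle.utils.SourceDatabaseShipmentAgent \
  --bind-address 0.0.0.0 --port 21521

# Target (mining) server: receive redo into a local directory; add the
# source-agent connection parameters per the utility's --help output
java -cp oracdc-kafka-standalone.jar \
  solutions.a2.cdc.oracle.utils.TargetDatabaseShipmentAgent \
  --file-destination /u01/oracdc/redo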
Data Type Mapping
The LogMiner connector follows the standard oracdc type mapping. When a2.oracdc.schemas=false (the default), the mapping is compatible with the Confluent JDBC Sink connector.
| Oracle Type | Kafka Connect Schema | Additional Information |
|---|---|---|
| DATE | int32 | org.apache.kafka.connect.data.Date |
| TIMESTAMP% | int64 | org.apache.kafka.connect.data.Timestamp |
| INTERVAL YEAR TO MONTH | bytes | solutions.a2.cdc.oracle.data.OraIntervalYM |
| INTERVAL DAY TO SECOND | bytes | solutions.a2.cdc.oracle.data.OraIntervalDS |
| NUMBER(1,0), NUMBER(2,0) | int8 | |
| NUMBER(3,0), NUMBER(4,0) | int16 | |
| NUMBER(5,0) through NUMBER(8,0) | int32 | |
| NUMBER (other integers up to 10^18) | int64 | |
| NUMBER (no scale/precision) | float64 | |
| NUMBER (all other) | bytes | org.apache.kafka.connect.data.Decimal |
| BINARY_FLOAT | float32 | |
| BINARY_DOUBLE | float64 | |
| RAW | bytes | |
| BLOB | bytes | solutions.a2.OraBlob |
| CHAR | string | |
| NCHAR | string | |
| VARCHAR2 | string | |
| NVARCHAR2 | string | |
| CLOB | string | solutions.a2.OraClob |
| NCLOB | string | solutions.a2.OraNClob |
| XMLTYPE | string | solutions.a2.OraXml |
When a2.oracdc.schemas=true, oracdc uses its own extensions for Oracle NUMBER, TIMESTAMP WITH [LOCAL] TIMEZONE, INTERVAL YEAR TO MONTH, and INTERVAL DAY TO SECOND types.
Known Issues
1. Chronicle Queue Disk Space Messages
Chronicle Queue may produce excessive disk space monitoring messages. To suppress them, set the following Java system property in KAFKA_OPTS:
-Dchronicle.disk.monitor.disable=true
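For example, in the environment that starts the Kafka Connect worker:
export KAFKA_OPTS="$KAFKA_OPTS -Dchronicle.disk.monitor.disable=true"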
2. Chronicle Queue Statistics Collection
Chronicle Queue may collect usage statistics. To avoid this, either switch to ArrayList mode (`a2.transaction.implementation=ArrayList`) or follow the Chronicle Queue opt-out instructions.
3. Excessive LogMiner Output in alert.log
Oracle LogMiner may generate excessive output in the database alert log. Refer to Oracle Support Notes 1632209.1 and 1485217.1 for mitigation steps.
Performance Tips
- Disable transaction auditing -- consider setting `_transaction_auditing=false` after consulting with Oracle Support.
- Table-level supplemental logging -- set supplemental logging at the table level rather than the database level to reduce redo generation.
- File system sizing -- ensure proper file system sizing for Chronicle Queue storage.
- Open files limits -- configure appropriate open files limits for the OS user running oracdc.
- Enable LogMiner tracing -- set `a2.logminer.trace=true` to identify performance bottlenecks.
- Increase SDU -- increase the Session Data Unit (SDU) size for network optimization.
- Review JMX metrics -- monitor JMX metrics and tune `a2.fetch.size` accordingly.
- Monitor Chronicle Queue storage -- use JMX metrics such as `MaxTransactionSizeMiB` to monitor Chronicle Queue storage usage.
- System tuning -- set appropriate values for `vm.max_map_count` and `-XX:MaxDirectMemorySize` (see the sketch after this list).
- Use Java 17 LTS or later -- ensure you are running Java 17 LTS or later with the required module exports configured.
- Oracle patching -- review Oracle GoldenGate and Database patch recommendations for LogMiner stability.
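A hedged sketch of the OS and JVM settings named above (all values are illustrative assumptions, and the exact set of module exports depends on the Chronicle Queue version; consult its Java 17 documentation for the full list):

# Linux: raise the memory-map limit used by Chronicle Queue's memory-mapped files
sysctl -w vm.max_map_count=262144

# JVM options for the Kafka Connect worker
export KAFKA_OPTS="$KAFKA_OPTS \
  -XX:MaxDirectMemorySize=2g \
  --add-exports=java.base/jdk.internal.ref=ALL-UNNAMED \
  --add-exports=java.base/sun.nio.ch=ALL-UNNAMED \
  --add-opens=java.base/java.lang.reflect=ALL-UNNAMED"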