
RedoMiner Connector

solutions.a2.cdc.oracle.OraCdcRedoMinerConnector reads changes directly from redo log files, without requiring any additional software to be installed on the database server or in the Oracle database itself.

Oracle Database Connections

The connector requires a connection to an Oracle database for:

  1. Reading information about table column data types and table key fields
  2. Reading the current database SCN and the topology of archived and online redo files
| Parameter Name | Type | Description |
|---|---|---|
| a2.jdbc.url | String | Oracle Database JDBC URL |
| a2.jdbc.username | String | JDBC connection username |
| a2.jdbc.password | String | JDBC connection password |
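
Assembled into a Kafka Connect configuration, the connection parameters might look like this (the URL, credentials, and host are illustrative placeholders):

```properties
# Illustrative sketch -- replace host, service name, and credentials with your own
connector.class=solutions.a2.cdc.oracle.OraCdcRedoMinerConnector
a2.jdbc.url=jdbc:oracle:thin:@//db.example.com:1521/ORCLPDB1
a2.jdbc.username=cdc_user
a2.jdbc.password=cdc_password
```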

Selecting the Operating Mode

Online Redo Log Processing

With default settings, the connector processes only archived redo log files. To process online redo logs:

| Parameter Name | Value | Description |
|---|---|---|
| a2.process.online.redo.logs | true | Mandatory setting |
| a2.scn.query.interval.ms | Integer | Optional, minimum interval in ms between queries for the current SCN. Default: 60,000 |
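
For example, a fragment that enables online redo processing and polls the current SCN every 30 seconds (the interval is illustrative):

```properties
a2.process.online.redo.logs=true
# Poll the current SCN every 30 s instead of the 60,000 ms default
a2.scn.query.interval.ms=30000
```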

Memory Management

The connector supports both on-heap and off-heap memory; off-heap is the default. Select the implementation with a2.transaction.implementation:

  • ChronicleQueue (default) - uses Chronicle Queue for off-heap memory
  • ArrayList - uses ArrayList for heap memory

For ArrayList, set the initial capacity with:

| Parameter Name | Value | Description |
|---|---|---|
| a2.array.list.default.capacity | Integer | Optional, initial capacity. Default: 32 |
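
A sketch that switches to on-heap memory with a larger initial capacity (the capacity value is illustrative):

```properties
# Use the heap-based ArrayList instead of the default off-heap Chronicle Queue
a2.transaction.implementation=ArrayList
# Pre-size buffers for workloads with larger transactions (default is 32)
a2.array.list.default.capacity=128
```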

Large Objects Processing

By default, large objects (BLOB, CLOB, NCLOB, extended RAW/VARCHAR2/NVARCHAR2, XMLTYPE, JSON, VECTOR) are not processed. To enable:

| Parameter Name | Value | Description |
|---|---|---|
| a2.transaction.implementation | ChronicleQueue | Mandatory setting |
| a2.process.lobs | true | Mandatory setting |
Warning: when large objects are enabled, also set the Apache Kafka parameters that limit the maximum record size:

  • Source connector: producer.max.request.size
  • Kafka broker/controller: replica.fetch.max.bytes and message.max.bytes
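
A connector-side sketch, assuming a maximum large-object size of about 32 MiB (the size is illustrative; the matching broker/controller settings must be changed in the broker configuration):

```properties
a2.transaction.implementation=ChronicleQueue
a2.process.lobs=true
# Producer-side limit for records carrying large objects (32 MiB here)
producer.max.request.size=33554432
```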

Oracle RAC

To work with Oracle RAC, set a2.use.rac=true. The connector detects RAC by querying V$ACTIVE_INSTANCES and starts a separate Kafka Connect task for each redo thread (RAC instance). Kafka partitions map to redo threads as <KAFKA_PARTITION_NUMBER> = <THREAD#> - 1.
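
A minimal sketch for a two-node cluster (the node count is illustrative):

```properties
a2.use.rac=true
# Standard Kafka Connect setting: allow one task per redo thread/instance
tasks.max=2
```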

Using a Physical Standby Database

To avoid additional load on the primary database, the connector can read redo from a physical standby database opened in MOUNTED, READ ONLY, or READ ONLY WITH APPLY mode. Required parameters:

| Parameter Name | Value | Description |
|---|---|---|
| a2.standby.activate | true | Mandatory |
| a2.standby.jdbc.url | String | JDBC URL of the physical standby |
| a2.standby.wallet.location | String | Oracle Wallet location for the standby connection |
| a2.standby.privilege | String | Optional, one of sysdg, sysbackup, or sysdba. Default: sysdg |
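
A sketch of a standby configuration (the URL and wallet path are illustrative):

```properties
a2.standby.activate=true
a2.standby.jdbc.url=jdbc:oracle:thin:@//standby.example.com:1521/ORCLSTB
a2.standby.wallet.location=/etc/oracle/wallet/standby
a2.standby.privilege=sysdg
```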
Note: asynchronous redo transport (LogXptMode=ASYNC / Maximum Performance) makes latency unpredictable. FASTSYNC (Maximum Availability) or SYNC (Maximum Protection) minimizes latency.

Methods for Reading Redo Log Files

The a2.storage.media parameter controls the reading method:

| Value | Description |
|---|---|
| FS | Default. Read from the local file system |
| ASM | Read from Oracle ASM |
| SSH | Read from a remote file system via SSH |
| SMB | Read from a remote file system via SMB |
| BFILE | Read via the Oracle BFILENAME() function over Oracle Net |
| TRANSFER | Copy from ASM via DBMS_FILE_TRANSFER, then read as BFILE |

Reading via SSH

Parameters for SSH redo log access:

| Parameter Name | Value | Description |
|---|---|---|
| a2.storage.media | SSH | Mandatory |
| a2.ssh.provider | sshj or maverick | Optional, SSH library. Default: sshj |
| a2.ssh.hostname | String | Mandatory, FQDN or IP address of the remote server |
| a2.ssh.port | String | Optional, SSH port. Default: 22 |
| a2.ssh.user | String | Mandatory, SSH username |
| a2.ssh.private.key | String | Path to the private key file |
| a2.ssh.password | String | Password for authentication |
| a2.ssh.reconnect.ms | Integer | Optional, reconnect interval. Default: 86,400,000 (24 h) |
| a2.ssh.strict.host.key.checking | Boolean | Optional. Default: false |
| a2.ssh.max.unconfirmed.reads | Integer | Optional, sshj only. Default: 256 |
| a2.ssh.buffer.size | Integer | Optional, sshj only. Default: 32,768 |
Important: you must specify either a2.ssh.private.key or a2.ssh.password.
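
A sketch using key-based authentication (the hostname, user, and key path are illustrative):

```properties
a2.storage.media=SSH
a2.ssh.hostname=db.example.com
a2.ssh.user=oracle
# Either a private key or a password must be set; a key is used here
a2.ssh.private.key=/home/kafka/.ssh/id_rsa
```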

Reading via BFILENAME()

Uses Oracle directory objects. Requires disabling Oracle Managed Files:

ALTER SYSTEM SET "_omf" = disabled SCOPE=SPFILE;

Create directory objects:

CREATE OR REPLACE DIRECTORY CDC_ONLINE AS '/oradata/online';
CREATE OR REPLACE DIRECTORY CDC_ARCHIVE AS '/oradata/archive';

For Amazon RDS for Oracle:

BEGIN
  RDSADMIN.RDSADMIN_MASTER_UTIL.CREATE_ONLINELOG_DIR;
  RDSADMIN.RDSADMIN_MASTER_UTIL.CREATE_ARCHIVELOG_DIR;
END;
/

Grant read permissions:

GRANT READ ON DIRECTORY CDC_ONLINE TO <username>;
GRANT READ ON DIRECTORY CDC_ARCHIVE TO <username>;

Parameters:

| Parameter Name | Value | Description |
|---|---|---|
| a2.storage.media | BFILE | Mandatory |
| a2.bfile.directory.online | String | Mandatory, directory object for online redo logs |
| a2.bfile.directory.archive | String | Mandatory, directory object for archived redo logs |
| a2.bfile.reconnect.ms | Integer | Optional. Default: 3,600,000 (1 h) |
| a2.bfile.buffer.size | Integer | Optional. Default: 4,194,304 |
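
A sketch that points the connector at the directory objects created above:

```properties
a2.storage.media=BFILE
a2.bfile.directory.online=CDC_ONLINE
a2.bfile.directory.archive=CDC_ARCHIVE
```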

Reading via SMB

Parameters for SMB redo log access:

| Parameter Name | Value | Description |
|---|---|---|
| a2.storage.media | SMB | Mandatory |
| a2.smb.server | String | Mandatory, FQDN or IP address of the SMB server |
| a2.smb.share.online | String | Mandatory, share with online redo logs |
| a2.smb.share.archive | String | Mandatory, share with archived redo logs |
| a2.smb.user | String | Mandatory, SMB username |
| a2.smb.password | String | Mandatory, SMB password |
| a2.smb.domain | String | Mandatory, Windows domain |
| a2.smb.timeout | Integer | Optional. Default: 180,000 |
| a2.smb.socket.timeout | Integer | Optional. Default: 180,000 |
| a2.smb.reconnect.ms | Integer | Optional. Default: 86,400,000 (24 h) |
| a2.smb.buffer.size | Integer | Optional. Default: 1,048,576 |
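
A sketch of an SMB configuration (server, share names, and credentials are illustrative):

```properties
a2.storage.media=SMB
a2.smb.server=winhost.example.com
a2.smb.share.online=online_redo
a2.smb.share.archive=archived_redo
a2.smb.user=svc_oracdc
a2.smb.password=secret
a2.smb.domain=EXAMPLE
```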

Direct File Access

This is the default mode (FS). It assumes that either the connector runs on the database server itself or the redo files are mounted locally. Not recommended for production.

| Parameter Name | Value | Description |
|---|---|---|
| a2.storage.media | FS | Mandatory |
| a2.redo.filename.convert | String | Optional, converts redo log paths. Format: ORIGINAL_PATH=NEW_PATH |
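
A sketch where redo files recorded under /oradata are mounted locally under /mnt/oradata (both paths are illustrative):

```properties
a2.storage.media=FS
# Remap the paths recorded in the control file to the local mount point
a2.redo.filename.convert=/oradata/ORCL/=/mnt/oradata/ORCL/
```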

Direct ASM Access

Uses the internal Oracle ASM API DBMS_DISKGROUP. The connector does not create, update, or delete any ASM objects.

| Parameter Name | Value | Description |
|---|---|---|
| a2.storage.media | ASM | Mandatory |
| a2.asm.jdbc.url | String | Mandatory, JDBC URL of the ASM instance |
| a2.asm.username | String | Mandatory, must have SYSASM or SYSDBA privilege |
| a2.asm.password | String | Mandatory |
| a2.asm.read.ahead | Boolean | Optional. Default: true |
| a2.asm.reconnect.ms | Integer | Optional. Default: 604,800,000 (1 week) |
| a2.asm.privilege | sysasm or sysdba | Optional. Default: sysasm |
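
A sketch of an ASM configuration (the URL and credentials are illustrative):

```properties
a2.storage.media=ASM
a2.asm.jdbc.url=jdbc:oracle:thin:@asmhost.example.com:1521:+ASM
a2.asm.username=asm_cdc
a2.asm.password=secret
a2.asm.privilege=sysasm
```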

Reading via DBMS_FILE_TRANSFER and BFILENAME

Extends the BFILENAME method. First perform the same BFILENAME setup, then additionally:

Create directory objects, including a stage directory on the local file system:

CREATE OR REPLACE DIRECTORY CDC_ONLINE AS '+DATA/ASMTST/ONLINELOG';
CREATE OR REPLACE DIRECTORY CDC_ARCHIVE AS '+DATA/ASMTST/ARCHIVE';
CREATE OR REPLACE DIRECTORY CDC_STAGE AS '/var/tmp/stage';

Grant permissions:

GRANT READ ON DIRECTORY CDC_ONLINE TO <username>;
GRANT READ ON DIRECTORY CDC_ARCHIVE TO <username>;
GRANT READ,WRITE ON DIRECTORY CDC_STAGE TO <username>;
GRANT EXECUTE ON DBMS_FILE_TRANSFER TO <username>;
GRANT EXECUTE ON UTL_FILE TO <username>;

Additional parameters:

| Parameter Name | Value | Description |
|---|---|---|
| a2.storage.media | TRANSFER | Mandatory |
| a2.transfer.directory.stage | String | Mandatory, stage directory on the file system |

In addition to the parameters above, all a2.bfile.* parameters from the BFILE section are required.
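
A sketch combining the TRANSFER-specific and BFILE parameters, reusing the directory objects created above:

```properties
a2.storage.media=TRANSFER
a2.transfer.directory.stage=CDC_STAGE
a2.bfile.directory.online=CDC_ONLINE
a2.bfile.directory.archive=CDC_ARCHIVE
```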

Selecting Objects to Track

| Parameter Name | Value | Description |
|---|---|---|
| a2.include | String | Comma-separated list of tables to include. Supports the % wildcard |
| a2.exclude | String | Comma-separated list of tables to exclude. Supports the % wildcard |
| a2.table.list.style | static or dynamic | static (default): reads the table list at startup. dynamic: builds the list on the fly |
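
For example, to track every table in the SCOTT schema except its audit tables (the schema and naming pattern are illustrative):

```properties
a2.include=SCOTT.%
a2.exclude=SCOTT.AUDIT%
```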

Selecting Table Keys

| Parameter Name | Value | Description |
|---|---|---|
| a2.pk.type | String | well_defined uses the primary key or a NOT NULL unique key; any_unique also allows unique keys with nullable columns. Default: well_defined |
| a2.use.rowid.as.key | Boolean | When true and no appropriate key exists, ROWID is used as a surrogate key. Default: true |

Override per table with a2.key.override:

  • NONE - no key fields
  • ROWID - use ROWID as key
  • INDEX(INDEX_NAME) - use specific index columns

Supplemental Logging Requirements

| Parameter Name | Value | Description |
|---|---|---|
| a2.supplemental.logging | ALL or NONE | Default: ALL. When ALL, SUPPLEMENTAL LOG DATA (ALL) COLUMNS is required on tracked tables and SUPPLEMENTAL LOG DATA at the database level. When NONE, only minimal supplemental logging is required. |
Warning: before setting a2.supplemental.logging=NONE, consult A2 Solutions at oracle@a2.solutions.

Data Type Mapping

When supplemental.logging = ALL

| Oracle Type | Kafka Connect Schema | Additional Information |
|---|---|---|
| DATE | int32 | org.apache.kafka.connect.data.Date |
| TIMESTAMP% | int64 | org.apache.kafka.connect.data.Timestamp |
| INTERVALYM | bytes | solutions.a2.cdc.oracle.data.OraIntervalYM |
| INTERVALDS | bytes | solutions.a2.cdc.oracle.data.OraIntervalDS |
| NUMBER | int8 | NUMBER(1,0) & NUMBER(2,0) |
| NUMBER | int16 | NUMBER(3,0) & NUMBER(4,0) |
| NUMBER | int32 | NUMBER(5,0) through NUMBER(8,0) |
| NUMBER | int64 | Other integers up to 10^18 |
| NUMBER | float64 | NUMBER without specified SCALE and PRECISION |
| NUMBER | bytes | org.apache.kafka.connect.data.Decimal |
| BINARY_FLOAT | float32 | |
| BINARY_DOUBLE | float64 | |
| RAW | bytes | |
| BLOB | bytes | solutions.a2.OraBlob |
| CHAR | string | |
| NCHAR | string | |
| VARCHAR2 | string | |
| NVARCHAR2 | string | |
| CLOB | string | solutions.a2.OraClob |
| NCLOB | string | solutions.a2.OraNClob |
| XMLTYPE | string | solutions.a2.OraXml |
| JSON | string | solutions.a2.OraJson |
| VECTOR | struct | solutions.a2.OraVector, contains 4 optional arrays |
| BOOLEAN | bool | |

When supplemental.logging = NONE

When a2.supplemental.logging is set to NONE, the same Oracle types are mapped but wrapped in solutions.a2.w.* schemas:

| Oracle Type | Kafka Connect Schema | Additional Information |
|---|---|---|
| DATE | int32 | solutions.a2.w.OraDate |
| TIMESTAMP% | int64 | solutions.a2.w.OraTimestamp |
| INTERVALYM | bytes | solutions.a2.w.OraIntervalYM |
| INTERVALDS | bytes | solutions.a2.w.OraIntervalDS |
| NUMBER | int8 | solutions.a2.w.OraNumber (NUMBER(1,0) & NUMBER(2,0)) |
| NUMBER | int16 | solutions.a2.w.OraNumber (NUMBER(3,0) & NUMBER(4,0)) |
| NUMBER | int32 | solutions.a2.w.OraNumber (NUMBER(5,0) through NUMBER(8,0)) |
| NUMBER | int64 | solutions.a2.w.OraNumber (Other integers up to 10^18) |
| NUMBER | float64 | solutions.a2.w.OraNumber (without specified SCALE and PRECISION) |
| NUMBER | bytes | solutions.a2.w.OraNumber (Decimal) |
| BINARY_FLOAT | float32 | solutions.a2.w.OraBinaryFloat |
| BINARY_DOUBLE | float64 | solutions.a2.w.OraBinaryDouble |
| RAW | bytes | solutions.a2.w.OraRaw |
| BLOB | bytes | solutions.a2.w.OraBlob |
| CHAR | string | solutions.a2.w.OraChar |
| NCHAR | string | solutions.a2.w.OraNChar |
| VARCHAR2 | string | solutions.a2.w.OraVarchar2 |
| NVARCHAR2 | string | solutions.a2.w.OraNVarchar2 |
| CLOB | string | solutions.a2.w.OraClob |
| NCLOB | string | solutions.a2.w.OraNClob |
| XMLTYPE | string | solutions.a2.w.OraXml |
| JSON | string | solutions.a2.w.OraJson |
| VECTOR | struct | solutions.a2.w.OraVector, contains 4 optional arrays |
| BOOLEAN | bool | solutions.a2.w.OraBoolean |

Overriding Data Type Mappings

The parameter a2.number.map.[PDB_NAME.]SCHEMA_NAME.TABLE_NAME.COL_NAME_OR_PATTERN overrides the default NUMBER type mapping. The column name supports the % wildcard.

Supported values:

| Value | Kafka Connect Schema |
|---|---|
| BOOL / BOOLEAN | bool |
| BYTE / TINYINT | int8 |
| SHORT / SMALLINT | int16 |
| INT / INTEGER | int32 |
| LONG / BIGINT | int64 |
| FLOAT | float32 |
| DOUBLE | float64 |
| DECIMAL([P],S) | bytes (Decimal) |
| NUMERIC([P],S) | bytes (Decimal) |
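
For example, following the documented parameter pattern (the schema, table, and column names are illustrative):

```properties
# Map SCOTT.DEPT.DEPTNO to int16
a2.number.map.SCOTT.DEPT.DEPTNO=SHORT
# Map every column matching %_ID in SCOTT.EMP to int64
a2.number.map.SCOTT.EMP.%_ID=LONG
```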

Before and After Values

By default, the connector sends only after-data for INSERT/UPDATE and before-data for DELETE. To send both before and after values, set a2.schema.type=debezium.

To filter no-change UPDATEs (e.g., UPDATE DEPT SET DNAME=DNAME WHERE DEPTNO=10), set a2.process.all.update.statements=false.
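
A fragment combining both settings:

```properties
# Emit Debezium-style records with both before and after images
a2.schema.type=debezium
# Skip UPDATE statements that change no column values
a2.process.all.update.statements=false
```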

Selecting a Kafka Topic

Set a2.topic.mapper to control topic assignment. Two built-in mappers are available:

OraCdcDefaultTopicNameMapper (default)

Generates a unique topic name per table using PDB, schema, and table name.

| Parameter Name | Value | Description |
|---|---|---|
| a2.topic.prefix | String | Prefix for the topic name |
| a2.topic.name.style | TABLE, SCHEMA_TABLE, or PDB_SCHEMA_TABLE | Default: TABLE |
| a2.topic.name.delimiter | _ or - or . | Default: _ |
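
A sketch of the default mapper's parameters (the prefix and style are illustrative; the exact resulting topic name is determined by the mapper):

```properties
a2.topic.prefix=oracdc
a2.topic.name.style=SCHEMA_TABLE
a2.topic.name.delimiter=_
```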

OraCdcSingleTopicNameMapper

All change events go to a single topic set by a2.kafka.topic.

TDE Encryption Support

For Oracle Transparent Data Encryption (column encryption):

| Parameter Name | Value | Description |
|---|---|---|
| a2.tde.wallet.path | String | Full path to ewallet.p12 |
| a2.tde.wallet.password | String | Wallet password |
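
A sketch of a TDE configuration (the wallet path and password are illustrative):

```properties
a2.tde.wallet.path=/etc/oracle/wallet/ewallet.p12
a2.tde.wallet.password=wallet_secret
```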
Note: the connector currently supports column encryption only; tablespace encryption support is planned for CY2026.