RedoMiner Connector
solutions.a2.cdc.oracle.OraCdcRedoMinerConnector reads changes directly from redo log files, without any additional software installed on the database server or inside the Oracle database itself.
Oracle Database Connections
The connector requires a connection to an Oracle database for:
- Reading information about table column data types and table key fields
- Reading the current database SCN and the topology of archived and online redo files
| Parameter Name | Type | Description |
|---|---|---|
| a2.jdbc.url | String | Oracle Database JDBC URL |
| a2.jdbc.username | String | JDBC connection username |
| a2.jdbc.password | String | JDBC connection password |
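A minimal connection fragment for the connector configuration might look as follows (property names are from the table above; the connector class is from this document; host, service name, and credentials are placeholders):

```properties
connector.class=solutions.a2.cdc.oracle.OraCdcRedoMinerConnector
a2.jdbc.url=jdbc:oracle:thin:@//dbhost.example.com:1521/ORCLPDB1
a2.jdbc.username=ORACDC
a2.jdbc.password=<secret>
```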
Selecting the Operating Mode
Online Redo Log Processing
With default settings, the connector processes only archived redo log files. To process online redo logs:
| Parameter Name | Value | Description |
|---|---|---|
| a2.process.online.redo.logs | true | Mandatory setting |
| a2.scn.query.interval.ms | Integer | Optional, minimum interval in ms between queries for the current SCN. Default - 60,000 |
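For example, to switch the connector to online redo log processing with a 30-second SCN polling floor (the interval value is illustrative):

```properties
a2.process.online.redo.logs=true
# Query the current SCN at most every 30 s instead of the 60,000 ms default
a2.scn.query.interval.ms=30000
```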
Memory Management
The connector supports both on-heap and off-heap transaction storage; off-heap is the default. Select the implementation with a2.transaction.implementation:
- ChronicleQueue (default) - uses Chronicle Queue for off-heap memory
- ArrayList - uses ArrayList for heap memory
For ArrayList, set initial capacity with:
| Parameter Name | Value | Description |
|---|---|---|
| a2.array.list.default.capacity | Integer | Optional, initial capacity of the ArrayList. Default - 32 |
Large Objects Processing
By default, large objects (BLOB, CLOB, NCLOB, extended RAW/VARCHAR2/NVARCHAR2, XMLTYPE, JSON, VECTOR) are not processed. To enable:
| Parameter Name | Value | Description |
|---|---|---|
| a2.transaction.implementation | ChronicleQueue | Mandatory setting |
| a2.process.lobs | true | Mandatory setting |
Set the following Apache Kafka parameters to accommodate large object sizes:
- Source connector: producer.max.request.size
- Kafka broker/controller: replica.fetch.max.bytes and message.max.bytes
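For example, to allow change records of up to 16 MiB end to end (the size is illustrative; tune it to your largest expected LOB):

```properties
# Source connector configuration
producer.max.request.size=16777216

# Kafka broker/controller (server.properties)
replica.fetch.max.bytes=16777216
message.max.bytes=16777216
```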
Oracle RAC
To work with Oracle RAC, set a2.use.rac=true. The connector detects RAC by querying V$ACTIVE_INSTANCES and starts a separate Kafka Connect task for each redo thread/RAC instance. Partition mapping: <KAFKA_PARTITION_NUMBER> = <THREAD#> - 1
Using a Physical Standby Database
To avoid load on the primary, use a standby database (MOUNTED, READ ONLY, or READ ONLY WITH APPLY). Required parameters:
| Parameter Name | Value | Description |
|---|---|---|
| a2.standby.activate | true | Mandatory |
| a2.standby.jdbc.url | String | JDBC URL to the physical standby |
| a2.standby.wallet.location | String | Oracle Wallet location for the standby connection |
| a2.standby.privilege | sysdg, sysbackup, or sysdba | Optional. Default - sysdg |
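A standby configuration fragment might look as follows (URL and wallet path are placeholders):

```properties
a2.standby.activate=true
a2.standby.jdbc.url=jdbc:oracle:thin:@//standby.example.com:1521/ORCLSTBY
a2.standby.wallet.location=/etc/oracle/wallets/standby
# sysdg is the default; sysbackup and sysdba are also accepted
a2.standby.privilege=sysdg
```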
Using asynchronous redo transport (LogXptMode=ASYNC / Maximum Performance) makes latency unpredictable. Using FASTSYNC (Maximum Availability) or SYNC (Maximum Protection) minimizes latency.
Methods for Reading Redo Log Files
The a2.storage.media parameter controls the reading method:
| Value | Description |
|---|---|
| FS | Default. Read from local file system |
| ASM | Read from Oracle ASM |
| SSH | Read from remote file system via SSH |
| SMB | Read from remote file system via SMB |
| BFILE | Read via Oracle BFILENAME() function over Oracle Net |
| TRANSFER | Copy from ASM via DBMS_FILE_TRANSFER, then read as BFILE |
Reading via SSH
Parameters for SSH redo log access:
| Parameter Name | Value | Description |
|---|---|---|
| a2.storage.media | SSH | Mandatory |
| a2.ssh.provider | sshj or maverick | Optional, SSH library. Default - sshj |
| a2.ssh.hostname | String | Mandatory, FQDN or IP of remote server |
| a2.ssh.port | String | Optional, SSH port. Default - 22 |
| a2.ssh.user | String | Mandatory, SSH username |
| a2.ssh.private.key | String | Path to private key file |
| a2.ssh.password | String | Password for authentication |
| a2.ssh.reconnect.ms | Integer | Optional, reconnect interval. Default - 86,400,000 (24h) |
| a2.ssh.strict.host.key.checking | Boolean | Optional. Default - false |
| a2.ssh.max.unconfirmed.reads | Integer | Optional, for sshj. Default - 256 |
| a2.ssh.buffer.size | Integer | Optional, for sshj. Default - 32768 |
You must specify either a2.ssh.private.key or a2.ssh.password.
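A minimal SSH configuration fragment (hostname, user, and key path are placeholders):

```properties
a2.storage.media=SSH
a2.ssh.hostname=dbhost.example.com
a2.ssh.user=oracle
# Exactly one of a2.ssh.private.key / a2.ssh.password must be set
a2.ssh.private.key=/home/kafka/.ssh/id_rsa
```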
Reading via BFILENAME()
Uses Oracle directory objects. Requires disabling Oracle Managed Files:
```sql
ALTER SYSTEM SET "_omf" = disabled SCOPE=SPFILE;
```
Create directory objects:
```sql
CREATE OR REPLACE DIRECTORY CDC_ONLINE AS '/oradata/online';
CREATE OR REPLACE DIRECTORY CDC_ARCHIVE AS '/oradata/archive';
```
For Amazon RDS for Oracle:
```sql
BEGIN
  RDSADMIN.RDSADMIN_MASTER_UTIL.CREATE_ONLINELOG_DIR;
  RDSADMIN.RDSADMIN_MASTER_UTIL.CREATE_ARCHIVELOG_DIR;
END;
/
```
Grant read permissions:
```sql
GRANT READ ON DIRECTORY CDC_ONLINE TO <username>;
GRANT READ ON DIRECTORY CDC_ARCHIVE TO <username>;
```
Parameters:
| Parameter Name | Value | Description |
|---|---|---|
| a2.storage.media | BFILE | Mandatory |
| a2.bfile.directory.online | String | Mandatory, directory for online redo logs |
| a2.bfile.directory.archive | String | Mandatory, directory for archived redo logs |
| a2.bfile.reconnect.ms | Integer | Optional. Default - 3,600,000 (1h) |
| a2.bfile.buffer.size | Integer | Optional. Default - 4,194,304 |
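With the directory objects from the SQL above, a minimal BFILE configuration fragment is:

```properties
a2.storage.media=BFILE
# Oracle directory objects created in the setup step above
a2.bfile.directory.online=CDC_ONLINE
a2.bfile.directory.archive=CDC_ARCHIVE
```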
Reading via SMB
Parameters for SMB redo log access:
| Parameter Name | Value | Description |
|---|---|---|
| a2.storage.media | SMB | Mandatory |
| a2.smb.server | String | Mandatory, FQDN or IP of SMB server |
| a2.smb.share.online | String | Mandatory, share with online redo logs |
| a2.smb.share.archive | String | Mandatory, share with archived redo logs |
| a2.smb.user | String | Mandatory, SMB username |
| a2.smb.password | String | Mandatory, SMB password |
| a2.smb.domain | String | Mandatory, Windows domain |
| a2.smb.timeout | Integer | Optional. Default - 180,000 |
| a2.smb.socket.timeout | Integer | Optional. Default - 180,000 |
| a2.smb.reconnect.ms | Integer | Optional. Default - 86,400,000 (24h) |
| a2.smb.buffer.size | Integer | Optional. Default - 1,048,576 |
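A minimal SMB configuration fragment (server, shares, domain, and credentials are placeholders):

```properties
a2.storage.media=SMB
a2.smb.server=winhost.example.com
a2.smb.share.online=redo_online
a2.smb.share.archive=redo_archive
a2.smb.user=svc_oracdc
a2.smb.password=<secret>
a2.smb.domain=EXAMPLE
```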
Direct File Access
Default mode (FS). Assumes that the connector either runs on the database server or that the redo log files are accessible through a local mount. Not recommended for production.
| Parameter Name | Value | Description |
|---|---|---|
| a2.storage.media | FS | Mandatory |
| a2.redo.filename.convert | String | Optional, converts redo log path. Format: ORIGINAL_PATH=NEW_PATH |
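The path conversion can be illustrated with a short sketch (a hypothetical Python helper mirroring the documented ORIGINAL_PATH=NEW_PATH format, not the connector's actual implementation):

```python
def convert_redo_path(rule: str, path: str) -> str:
    """Rewrite a redo log path using an ORIGINAL_PATH=NEW_PATH rule.

    Illustrative sketch of the documented a2.redo.filename.convert
    format; the connector's real code may differ.
    """
    original, new = rule.split("=", 1)
    if path.startswith(original):
        return new + path[len(original):]
    return path  # prefix does not match: leave the path unchanged

print(convert_redo_path("/u01/oradata=/mnt/oradata",
                        "/u01/oradata/ORCL/redo01.log"))
# -> /mnt/oradata/ORCL/redo01.log
```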
Direct ASM Access
Uses the internal Oracle ASM API (the DBMS_DISKGROUP package). The connector does not create, update, or delete any ASM objects.
| Parameter Name | Value | Description |
|---|---|---|
| a2.storage.media | ASM | Mandatory |
| a2.asm.jdbc.url | String | Mandatory, JDBC URL to ASM instance |
| a2.asm.username | String | Mandatory, must have SYSASM or SYSDBA |
| a2.asm.password | String | Mandatory |
| a2.asm.read.ahead | Boolean | Optional. Default - true |
| a2.asm.reconnect.ms | Integer | Optional. Default - 604,800,000 (1 week) |
| a2.asm.privilege | sysasm or sysdba | Optional. Default - sysasm |
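A minimal ASM configuration fragment (the JDBC URL and credentials are placeholders; check your ASM instance's actual connect string):

```properties
a2.storage.media=ASM
a2.asm.jdbc.url=jdbc:oracle:thin:@//asmhost.example.com:1521/+ASM
a2.asm.username=SYS
a2.asm.password=<secret>
a2.asm.privilege=sysasm
```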
Reading via DBMS_FILE_TRANSFER and BFILENAME
Extends the BFILENAME method. First perform the same BFILENAME setup, plus:
Create directory objects pointing at the ASM locations, plus a stage directory on the local file system:
```sql
CREATE OR REPLACE DIRECTORY CDC_ONLINE AS '+DATA/ASMTST/ONLINELOG';
CREATE OR REPLACE DIRECTORY CDC_ARCHIVE AS '+DATA/ASMTST/ARCHIVE';
CREATE OR REPLACE DIRECTORY CDC_STAGE AS '/var/tmp/stage';
```
Grant permissions:
```sql
GRANT READ ON DIRECTORY CDC_ONLINE TO <username>;
GRANT READ ON DIRECTORY CDC_ARCHIVE TO <username>;
GRANT READ, WRITE ON DIRECTORY CDC_STAGE TO <username>;
GRANT EXECUTE ON DBMS_FILE_TRANSFER TO <username>;
GRANT EXECUTE ON UTL_FILE TO <username>;
```
Additional parameters:
| Parameter Name | Value | Description |
|---|---|---|
| a2.storage.media | TRANSFER | Mandatory |
| a2.transfer.directory.stage | String | Mandatory, stage directory on file system |
In addition to the parameter above, all the a2.bfile.* parameters from the BFILE section are required.
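A minimal TRANSFER configuration fragment, reusing the directory objects created above:

```properties
a2.storage.media=TRANSFER
a2.transfer.directory.stage=CDC_STAGE
# All a2.bfile.* parameters are also required, e.g.:
a2.bfile.directory.online=CDC_ONLINE
a2.bfile.directory.archive=CDC_ARCHIVE
```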
Selecting Objects to Track
| Parameter Name | Value | Description |
|---|---|---|
| a2.include | String | Comma separated list of tables to include. Supports % wildcard |
| a2.exclude | String | Comma separated list of tables to exclude. Supports % wildcard |
| a2.table.list.style | static or dynamic | static (default): reads table list at startup. dynamic: builds the list on the fly |
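The % wildcard can be illustrated with a short sketch (a hypothetical Python helper; it assumes SQL-LIKE semantics where % matches any character sequence, and case-insensitive comparison as for unquoted Oracle identifiers):

```python
import re

def matches(pattern: str, table: str) -> bool:
    """Sketch of %-wildcard matching for a2.include / a2.exclude.

    Assumption: '%' behaves as in SQL LIKE and names compare
    case-insensitively; the connector's real matching may differ.
    """
    regex = "^" + ".*".join(re.escape(part) for part in pattern.split("%")) + "$"
    return re.match(regex, table, re.IGNORECASE) is not None

# a2.include=SCOTT.EMP%,HR.DEPARTMENTS
include = [p.strip() for p in "SCOTT.EMP%,HR.DEPARTMENTS".split(",")]
print(any(matches(p, "SCOTT.EMPLOYEES") for p in include))  # True
print(any(matches(p, "HR.JOBS") for p in include))          # False
```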
Selecting Table Keys
| Parameter Name | Value | Description |
|---|---|---|
| a2.pk.type | String | well_defined uses PK or NOT NULL unique key. any_unique allows nullable unique keys. Default - well_defined |
| a2.use.rowid.as.key | Boolean | When true and no appropriate key exists, uses ROWID as surrogate key. Default - true |
Override per table with a2.key.override:
- NONE - no key fields
- ROWID - use ROWID as the key
- INDEX(INDEX_NAME) - use the columns of the specified index
Supplemental Logging Requirements
| Parameter Name | Value | Description |
|---|---|---|
a2.supplemental.logging | ALL or NONE | Default: ALL. When ALL, requires SUPPLEMENTAL LOG DATA(ALL) COLUMNS on tables and SUPPLEMENTAL LOG DATA at database level. When NONE, minimal requirements. |
Before setting a2.supplemental.logging=NONE, consult with A2 Solutions at oracle@a2.solutions.
Data Type Mapping
When supplemental.logging = ALL
| Oracle Type | Kafka Connect Schema | Additional Information |
|---|---|---|
| DATE | int32 | org.apache.kafka.connect.data.Date |
| TIMESTAMP% | int64 | org.apache.kafka.connect.data.Timestamp |
| INTERVALYM | bytes | solutions.a2.cdc.oracle.data.OraIntervalYM |
| INTERVALDS | bytes | solutions.a2.cdc.oracle.data.OraIntervalDS |
| NUMBER | int8 | NUMBER(1,0) & NUMBER(2,0) |
| NUMBER | int16 | NUMBER(3,0) & NUMBER(4,0) |
| NUMBER | int32 | NUMBER(5,0) through NUMBER(8,0) |
| NUMBER | int64 | Other integers up to 10^18 |
| NUMBER | float64 | NUMBER without specified SCALE and PRECISION |
| NUMBER | bytes | org.apache.kafka.connect.data.Decimal |
| BINARY_FLOAT | float32 | |
| BINARY_DOUBLE | float64 | |
| RAW | bytes | |
| BLOB | bytes | solutions.a2.OraBlob |
| CHAR | string | |
| NCHAR | string | |
| VARCHAR2 | string | |
| NVARCHAR2 | string | |
| CLOB | string | solutions.a2.OraClob |
| NCLOB | string | solutions.a2.OraNClob |
| XMLTYPE | string | solutions.a2.OraXml |
| JSON | string | solutions.a2.OraJson |
| VECTOR | struct | solutions.a2.OraVector, contains 4 optional arrays |
| BOOLEAN | bool | |
When supplemental.logging = NONE
When a2.supplemental.logging is set to NONE, the same Oracle types are mapped but wrapped in solutions.a2.w.* schemas:
| Oracle Type | Kafka Connect Schema | Additional Information |
|---|---|---|
| DATE | int32 | solutions.a2.w.OraDate |
| TIMESTAMP% | int64 | solutions.a2.w.OraTimestamp |
| INTERVALYM | bytes | solutions.a2.w.OraIntervalYM |
| INTERVALDS | bytes | solutions.a2.w.OraIntervalDS |
| NUMBER | int8 | solutions.a2.w.OraNumber (NUMBER(1,0) & NUMBER(2,0)) |
| NUMBER | int16 | solutions.a2.w.OraNumber (NUMBER(3,0) & NUMBER(4,0)) |
| NUMBER | int32 | solutions.a2.w.OraNumber (NUMBER(5,0) through NUMBER(8,0)) |
| NUMBER | int64 | solutions.a2.w.OraNumber (Other integers up to 10^18) |
| NUMBER | float64 | solutions.a2.w.OraNumber (without specified SCALE and PRECISION) |
| NUMBER | bytes | solutions.a2.w.OraNumber (Decimal) |
| BINARY_FLOAT | float32 | solutions.a2.w.OraBinaryFloat |
| BINARY_DOUBLE | float64 | solutions.a2.w.OraBinaryDouble |
| RAW | bytes | solutions.a2.w.OraRaw |
| BLOB | bytes | solutions.a2.w.OraBlob |
| CHAR | string | solutions.a2.w.OraChar |
| NCHAR | string | solutions.a2.w.OraNChar |
| VARCHAR2 | string | solutions.a2.w.OraVarchar2 |
| NVARCHAR2 | string | solutions.a2.w.OraNVarchar2 |
| CLOB | string | solutions.a2.w.OraClob |
| NCLOB | string | solutions.a2.w.OraNClob |
| XMLTYPE | string | solutions.a2.w.OraXml |
| JSON | string | solutions.a2.w.OraJson |
| VECTOR | struct | solutions.a2.w.OraVector, contains 4 optional arrays |
| BOOLEAN | bool | solutions.a2.w.OraBoolean |
Overriding Data Type Mappings
The parameter a2.number.map.[PDB_NAME.]SCHEMA_NAME.TABLE_NAME.COL_NAME_OR_PATTERN overrides the default NUMBER type mapping. The column name supports the % wildcard.
Supported values:
| Value | Kafka Connect Schema |
|---|---|
| BOOL / BOOLEAN | bool |
| BYTE / TINYINT | int8 |
| SHORT / SMALLINT | int16 |
| INT / INTEGER | int32 |
| LONG / BIGINT | int64 |
| FLOAT | float32 |
| DOUBLE | float64 |
| DECIMAL([P],S) | bytes (Decimal) |
| NUMERIC([P],S) | bytes (Decimal) |
Before and After Values
By default, the connector sends only after-data for INSERT/UPDATE and before-data for DELETE. To send both before and after values, set a2.schema.type=debezium.
To filter no-change UPDATEs (e.g., UPDATE DEPT SET DNAME=DNAME WHERE DEPTNO=10), set a2.process.all.update.statements=false.
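A configuration fragment combining both options:

```properties
# Emit records carrying both before and after images
a2.schema.type=debezium
# Suppress UPDATE statements that change no column values
a2.process.all.update.statements=false
```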
Selecting a Kafka Topic
Set a2.topic.mapper to control topic assignment. Two built-in mappers are available:
OraCdcDefaultTopicNameMapper (default)
Generates a unique topic name per table using PDB, schema, and table name.
| Parameter Name | Value | Description |
|---|---|---|
| a2.topic.prefix | String | Prefix for topic name |
| a2.topic.name.style | TABLE, SCHEMA_TABLE, PDB_SCHEMA_TABLE | Default - TABLE |
| a2.topic.name.delimiter | _ or - or . | Default - _ |
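The documented behavior can be sketched as follows (a hypothetical Python helper, not the mapper's actual code; in particular, how a2.topic.prefix is joined to the rest of the name is an assumption):

```python
def topic_name(style: str, delimiter: str, pdb: str, schema: str, table: str,
               prefix: str = "") -> str:
    """Sketch of OraCdcDefaultTopicNameMapper's documented behavior:
    combine PDB, schema, and table names per a2.topic.name.style,
    joined with a2.topic.name.delimiter after a2.topic.prefix."""
    parts = {
        "TABLE": [table],
        "SCHEMA_TABLE": [schema, table],
        "PDB_SCHEMA_TABLE": [pdb, schema, table],
    }[style]
    return prefix + delimiter.join(parts)

print(topic_name("SCHEMA_TABLE", "_", "ORCLPDB1", "SCOTT", "EMP"))
# -> SCOTT_EMP
```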
OraCdcSingleTopicNameMapper
All change events go to a single topic set by a2.kafka.topic.
TDE Encryption Support
For Oracle Transparent Data Encryption (column encryption):
| Parameter Name | Value | Description |
|---|---|---|
| a2.tde.wallet.path | String | Full path to ewallet.p12 |
| a2.tde.wallet.password | String | Wallet password |
Currently supports column encryption only; tablespace encryption is planned for CY2026.
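A TDE configuration fragment (the wallet path and password are placeholders):

```properties
a2.tde.wallet.path=/etc/oracle/wallet/ewallet.p12
a2.tde.wallet.password=<secret>
```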