JDBC Sink Connectors
oracdc includes two sink connectors optimized for delivering data from Kafka topics to PostgreSQL or Oracle Database.
JdbcSinkConnector
Class: solutions.a2.kafka.sink.JdbcSinkConnector
The primary sink connector for delivering CDC data to a target database. Use this connector when the source connector is configured with a2.supplemental.logging=ALL (the default).
Connection Parameters
| Parameter | Type | Description |
|---|---|---|
| a2.jdbc.url | String | JDBC connection URL for the target database |
| a2.jdbc.username | String | JDBC connection username |
| a2.jdbc.password | String | JDBC connection password |
| a2.wallet.location | String | Oracle Wallet location (alternative to username/password) |
| a2.connection.init.sql | String | SQL statement(s) executed for all new connections |
Table Mapping Parameters
| Parameter | Type | Description |
|---|---|---|
| a2.autocreate | Boolean | When true, creates missing tables automatically. Default: false |
| a2.table.mapper | String | Class for mapping topic names to table names |
| a2.table.name.prefix | String | Prefix added to target table names |
| a2.table.name.suffix | String | Suffix added to target table names |
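To illustrate how the prefix and suffix settings combine, here is a minimal sketch. It assumes the base table name equals the topic name, which is an assumption about the default mapper and not something stated above. The function `target_table_name` is hypothetical and is not part of oracdc.

```python
def target_table_name(topic: str, prefix: str = "", suffix: str = "") -> str:
    """Build a target table name from a topic name.

    Assumption: the base table name is the topic name itself; the real
    mapping is decided by the class configured in a2.table.mapper.
    """
    return f"{prefix}{topic}{suffix}"


# Example: a2.table.name.prefix=STG_ and a2.table.name.suffix=_CDC
name = target_table_name("HR_EMPLOYEES", prefix="STG_", suffix="_CDC")
print(name)
```

With both settings left empty, the sketch returns the topic name unchanged.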
Data Processing Parameters
| Parameter | Type | Description |
|---|---|---|
| a2.batch.size | Integer | Maximum number of rows per batch. Default: 1000 |
| a2.pk.string.length | Integer | String length for primary key columns |
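The effect of a2.batch.size as an upper bound on rows per batch can be pictured with the sketch below. It only illustrates the idea of splitting a stream of rows into bounded batches. The helper `batches` is hypothetical, and the connector's actual flushing logic is not described here.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batches(rows: Iterable[T], batch_size: int = 1000) -> Iterator[List[T]]:
    """Yield lists of at most batch_size rows, preserving row order."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batch: List[T] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # Emit the final, possibly smaller, batch.
        yield batch


# 1200 rows with a batch size of 500 give two full batches and one partial batch.
sizes = [len(b) for b in batches(range(1200), batch_size=500)]
print(sizes)
```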
Schema Evolution
The sink connector supports schema evolution, automatically adapting to changes in the source schema. When new columns are added to the source table, the sink connector can add corresponding columns to the target table (when a2.autocreate=true).
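The column-addition step described above can be sketched as a comparison between the columns already in the target table and the columns in the incoming schema. This is an illustration only: the function `add_column_statements` is hypothetical, the SQL types are placeholders chosen for the example, and the statement uses PostgreSQL syntax (Oracle Database omits the COLUMN keyword). The DDL that the connector actually issues is not shown in this document.

```python
from typing import Dict, Iterable, List


def add_column_statements(
    table: str,
    target_columns: Iterable[str],
    source_columns: Dict[str, str],
) -> List[str]:
    """Return one ALTER TABLE statement per source column missing in the target.

    Column names are compared case-insensitively. source_columns maps a
    column name to an already-chosen SQL type (assumed for this sketch).
    """
    existing = {name.lower() for name in target_columns}
    return [
        f"ALTER TABLE {table} ADD COLUMN {name} {sql_type}"
        for name, sql_type in source_columns.items()
        if name.lower() not in existing
    ]


statements = add_column_statements(
    "hr_employees",
    target_columns=["id", "name"],
    source_columns={"ID": "bigint", "NAME": "text", "EMAIL": "text"},
)
for statement in statements:
    print(statement)
```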
Example Configuration
name=oracdc-sink
connector.class=solutions.a2.kafka.sink.JdbcSinkConnector
tasks.max=1
topics=HR_EMPLOYEES,HR_DEPARTMENTS
a2.jdbc.url=jdbc:postgresql://localhost:5432/targetdb
a2.jdbc.username=replicator
a2.jdbc.password=secret
a2.autocreate=true
a2.batch.size=500
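When Kafka Connect runs in distributed mode, connectors are usually created through its REST API, which expects a JSON body with a `name` field and a `config` map instead of a properties file. The sketch below converts the properties shown above into that shape. The helper `properties_to_connect_payload` is hypothetical and handles only simple `key=value` lines; it does not send anything over the network.

```python
import json

PROPERTIES = """\
name=oracdc-sink
connector.class=solutions.a2.kafka.sink.JdbcSinkConnector
tasks.max=1
topics=HR_EMPLOYEES,HR_DEPARTMENTS
a2.jdbc.url=jdbc:postgresql://localhost:5432/targetdb
a2.jdbc.username=replicator
a2.jdbc.password=secret
a2.autocreate=true
a2.batch.size=500
"""


def properties_to_connect_payload(text: str) -> dict:
    """Convert simple key=value properties into a Kafka Connect REST payload."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")  # split on the first '=' only
        config[key.strip()] = value.strip()
    # The connector name moves to the top level of the payload.
    name = config.pop("name")
    return {"name": name, "config": config}


payload = properties_to_connect_payload(PROPERTIES)
print(json.dumps(payload, indent=2))
```

The resulting JSON can be sent in a POST request to the `/connectors` endpoint of a Connect worker. All values in the REST payload are strings, so `a2.batch.size` stays `"500"`.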
WrappedDataJdbcSinkConnector
Class: solutions.a2.kafka.sink.WrappedDataJdbcSinkConnector
Use this connector when consuming data produced by oracdc with a2.supplemental.logging=NONE. In this mode, the source connector wraps Oracle-native data types in solutions.a2.w.* schemas, and WrappedDataJdbcSinkConnector unwraps these types for correct delivery to the target database.
It accepts the same parameters as JdbcSinkConnector.
Choose the sink connector to match the source connector's setting:
- JdbcSinkConnector: when the source uses a2.supplemental.logging=ALL (default, recommended)
- WrappedDataJdbcSinkConnector: when the source uses a2.supplemental.logging=NONE