- Bug
- Resolution: Done
- Critical
- 2.0.0.Alpha2
- None
Bug report
LOB-type data is inconsistent between source and sink after modifying the primary key
What Debezium connector do you use and what version?
oracle-connector
What is the captured database version and mode of deployment?
(E.g. on-premises, with a specific cloud provider, etc.)
Oracle 19c
What behaviour do you see?
After modifying the primary key in the source DB, I got the "__debezium_unavailable_value" placeholder in a LOB-type column in the sink DB.
Do you see the same behaviour using the latest released Debezium version?
(Ideally, also verify with latest Alpha/Beta/CR version)
Yes, reproduced with v2.0 and v1.9.
How to reproduce the issue using our tutorial deployment?
Just modify the primary key of a row in the source DB.
Implementation ideas (optional)
The logic is flawed in the emitUpdateRecord method declared in RelationalChangeRecordEmitter.java:
@Override
protected void emitUpdateRecord(Receiver<P> receiver, TableSchema tableSchema) throws InterruptedException {
    Object[] oldColumnValues = getOldColumnValues();
    Object[] newColumnValues = getNewColumnValues();

    Struct oldKey = tableSchema.keyFromColumnData(oldColumnValues);
    Struct newKey = tableSchema.keyFromColumnData(newColumnValues);

    Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
    Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);

    if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
        LOGGER.warn("no new values found for table '{}' from update message at '{}'; skipping record",
                tableSchema, getOffset().getSourceInfo());
        return;
    }
    // some configurations does not provide old values in case of updates
    // in this case we handle all updates as regular ones
    if (oldKey == null || Objects.equals(oldKey, newKey)) {
        Struct envelope = tableSchema.getEnvelopeSchema().update(oldValue, newValue,
                getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
        receiver.changeRecord(getPartition(), tableSchema, Operation.UPDATE, newKey, envelope, getOffset(), null);
    }
    // PK update -> emit as delete and re-insert with new key
    else {
        ConnectHeaders headers = new ConnectHeaders();
        headers.add(PK_UPDATE_NEWKEY_FIELD, newKey, tableSchema.keySchema());

        Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue,
                getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
        receiver.changeRecord(getPartition(), tableSchema, Operation.DELETE, oldKey, envelope, getOffset(), headers);

        headers = new ConnectHeaders();
        headers.add(PK_UPDATE_OLDKEY_FIELD, oldKey, tableSchema.keySchema());

        envelope = tableSchema.getEnvelopeSchema().create(newValue,
                getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
        receiver.changeRecord(getPartition(), tableSchema, Operation.CREATE, newKey, envelope, getOffset(), headers);
    }
}
This turns the update event into two new events: a delete followed by a create. The create event is what causes the error: since the LOB column was not changed by the update, its real value is not re-emitted, and the create record carries the unavailable-value placeholder instead of the actual data.
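The data-loss mechanism can be illustrated with a minimal sketch in plain Java (not Debezium code; the table, key, and values are hypothetical). Assuming the connector substitutes the "__debezium_unavailable_value" placeholder for LOB columns that are absent from the change event, a sink that applies the delete + create pair verbatim replaces the real LOB content with the placeholder:

```java
import java.util.HashMap;
import java.util.Map;

public class PkUpdateLobLoss {
    // Default placeholder the Oracle connector emits for unavailable LOB values.
    static final String PLACEHOLDER = "__debezium_unavailable_value";

    // Simulated sink table: primary key -> LOB column value.
    static final Map<Integer, String> sink = new HashMap<>();

    // The PK update arrives at the sink as a delete of the old key...
    static void applyDelete(int oldKey) {
        sink.remove(oldKey);
    }

    // ...followed by a create under the new key, applied verbatim.
    static void applyCreate(int newKey, String lobValue) {
        sink.put(newKey, lobValue);
    }

    public static void main(String[] args) {
        // State replicated to the sink before the PK update.
        sink.put(1, "real LOB content");

        // Source runs e.g. UPDATE t SET pk = 2 WHERE pk = 1; the LOB column
        // is untouched, so the re-emitted create event carries the
        // placeholder instead of the real value.
        applyDelete(1);
        applyCreate(2, PLACEHOLDER);

        // The sink row now holds the placeholder, not "real LOB content".
        System.out.println(sink.get(2));
    }
}
```

A placeholder-aware sink could treat the marker in a plain update event as "keep the current value", but the synthetic create event gives it no old row to fall back on, which is why the inconsistency appears only after a primary-key change.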