feat: Enable CDC checkpointing on Postgres (#24373)
* Fix the error reading offset file meanwhile Debezium is writing. Enable CDC checkpointing to Postgres. Minor change in the variable name to fit the type. * Add final statement on exception ;) * Add comments to CDC Checkpoint tests. Clean a bit. * Bump connector versioning * Add log message * Fix changelog * auto-bump connector version * Manually generate definitions --------- Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres
|
||||
|
||||
COPY --from=build /airbyte /airbyte
|
||||
|
||||
LABEL io.airbyte.version=2.0.11
|
||||
LABEL io.airbyte.version=2.0.12
|
||||
LABEL io.airbyte.name=airbyte/source-postgres
|
||||
|
||||
@@ -105,4 +105,8 @@ public class PostgresCdcStateHandler implements CdcStateHandler {
|
||||
return Integer.parseInt(lsnA) == Integer.parseInt(lsnB);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCdcCheckpointEnabled() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,7 +53,6 @@ import javax.sql.DataSource;
|
||||
import org.jooq.DSLContext;
|
||||
import org.jooq.SQLDialect;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
@@ -548,9 +547,8 @@ public class CdcPostgresSourceTest extends CdcSourceTest {
|
||||
* We can ensure that more than one `STATE` type of message is sent, but we are not able to assert the
|
||||
* exact number of messages sent as depends on Debezium.
|
||||
*
|
||||
* @throws Exception
|
||||
* @throws Exception Exception happening in the test.
|
||||
*/
|
||||
/* TODO: Re-enable when connector allows CDC checkpointing
|
||||
@Test
|
||||
protected void verifyCheckpointStatesByRecords() throws Exception {
|
||||
// We require a huge amount of records, otherwise Debezium will notify directly the last offset.
|
||||
@@ -582,19 +580,16 @@ public class CdcPostgresSourceTest extends CdcSourceTest {
|
||||
.toListAndClose(secondBatchIterator);
|
||||
|
||||
final List<AirbyteStateMessage> stateMessagesCDC = extractStateMessages(dataFromSecondBatch);
|
||||
assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size());
|
||||
assertTrue(stateMessagesCDC.size() > 1);
|
||||
assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count());
|
||||
assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state.");
|
||||
assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states.");
|
||||
}
|
||||
*/
|
||||
|
||||
/** This test verify that multiple states are sent during the CDC process based on time ranges. We can
|
||||
* ensure that more than one `STATE` type of message is sent, but we are not able to assert the exact
|
||||
* number of messages sent as depends on Debezium.
|
||||
*
|
||||
* @throws Exception
|
||||
* @throws Exception Exception happening in the test.
|
||||
*/
|
||||
/* TODO: Re-enable when connector allows CDC checkpointing
|
||||
@Test
|
||||
protected void verifyCheckpointStatesBySeconds() throws Exception {
|
||||
// We require a huge amount of records, otherwise Debezium will notify directly the last offset.
|
||||
@@ -626,9 +621,7 @@ public class CdcPostgresSourceTest extends CdcSourceTest {
|
||||
.toListAndClose(secondBatchIterator);
|
||||
|
||||
final List<AirbyteStateMessage> stateMessagesCDC = extractStateMessages(dataFromSecondBatch);
|
||||
assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size());
|
||||
assertTrue(stateMessagesCDC.size() > 1);
|
||||
assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count());
|
||||
assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state.");
|
||||
assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states.");
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user