From 4cb04269abd31e496aed98c461bf6b7369e4eb61 Mon Sep 17 00:00:00 2001 From: Nero Date: Sun, 7 Jun 2026 22:55:57 +0800 Subject: [PATCH] [lake] Stabilize Paimon tiering IT --- .../testutils/FlinkPaimonTieringTestBase.java | 71 ++++++++------ .../paimon/tiering/PaimonTieringITCase.java | 95 ++++++++++++++----- 2 files changed, 113 insertions(+), 53 deletions(-) diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java index 90ab136fde..10dbbe9e50 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java @@ -64,7 +64,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -450,13 +449,25 @@ protected void waitUntilBucketSynced(TableBucket tb) { protected void checkDataInPaimonPrimaryKeyTable( TablePath tablePath, List expectedRows) throws Exception { - Iterator paimonRowIterator = - getPaimonRowCloseableIterator(tablePath); - for (InternalRow expectedRow : expectedRows) { - org.apache.paimon.data.InternalRow row = paimonRowIterator.next(); - assertThat(row.getInt(0)).isEqualTo(expectedRow.getInt(0)); - assertThat(row.getString(1).toString()).isEqualTo(expectedRow.getString(1).toString()); - } + retry( + Duration.ofMinutes(1), + () -> { + try (CloseableIterator paimonRowIterator = + getPaimonRowCloseableIterator(tablePath)) { + Map actualRows = new HashMap<>(); + while (paimonRowIterator.hasNext()) { + org.apache.paimon.data.InternalRow row = paimonRowIterator.next(); + actualRows.put(row.getInt(0), row.getString(1).toString()); + } + + Map expectedRowsByKey = new HashMap<>(); + for (InternalRow expectedRow : expectedRows) { + expectedRowsByKey.put( + expectedRow.getInt(0), expectedRow.getString(1).toString()); + } + assertThat(actualRows).isEqualTo(expectedRowsByKey); + } + }); } protected CloseableIterator getPaimonRowCloseableIterator( @@ -475,24 +486,30 @@ protected CloseableIterator getPaimonRowClos protected void checkFlussOffsetsInSnapshot( TablePath tablePath, Map expectedOffsets) throws Exception { - FileStoreTable table = - (FileStoreTable) - getPaimonCatalog() - .getTable( - Identifier.create( - tablePath.getDatabaseName(), - tablePath.getTableName())); - Snapshot snapshot = table.snapshotManager().latestSnapshot(); - assertThat(snapshot).isNotNull(); - - String offsetFile = snapshot.properties().get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY); - Map recordedOffsets = - new LakeTable( - new LakeTable.LakeSnapshotMetadata( - // don't care about snapshot id - -1, new FsPath(offsetFile), null)) - .getOrReadLatestTableSnapshot() - .getBucketLogEndOffset(); - assertThat(recordedOffsets).isEqualTo(expectedOffsets); + retry( + Duration.ofMinutes(1), + () -> { + FileStoreTable table = + (FileStoreTable) + getPaimonCatalog() + .getTable( + Identifier.create( + tablePath.getDatabaseName(), + tablePath.getTableName())); + Snapshot snapshot = table.snapshotManager().latestSnapshot(); + assertThat(snapshot).isNotNull(); + + String offsetFile = + snapshot.properties().get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY); + assertThat(offsetFile).isNotNull(); + Map recordedOffsets = + new LakeTable( + new LakeTable.LakeSnapshotMetadata( + // don't care about snapshot id + -1, new FsPath(offsetFile), null)) + .getOrReadLatestTableSnapshot() + .getBucketLogEndOffset(); + assertThat(recordedOffsets).isEqualTo(expectedOffsets); + }); } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java index 3d4da4fe50..07596636c5 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java @@ -68,6 +68,7 @@ import java.util.stream.Stream; import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.assertj.core.api.Assertions.assertThat; /** IT case for tiering tables to paimon. */ @@ -496,19 +497,39 @@ private Tuple2 createPartitionedTable( private void checkDataInPaimonAppendOnlyTable( TablePath tablePath, List expectedRows, long startingOffset) throws Exception { - Iterator paimonRowIterator = - getPaimonRowCloseableIterator(tablePath); - Iterator flussRowIterator = expectedRows.iterator(); - while (paimonRowIterator.hasNext()) { - org.apache.paimon.data.InternalRow row = paimonRowIterator.next(); - InternalRow flussRow = flussRowIterator.next(); - assertThat(row.getInt(0)).isEqualTo(flussRow.getInt(0)); - assertThat(row.getString(1).toString()).isEqualTo(flussRow.getString(1).toString()); - // system columns are always the last three: __bucket, __offset, __timestamp - int offsetIndex = row.getFieldCount() - 2; - assertThat(row.getLong(offsetIndex)).isEqualTo(startingOffset++); - } - assertThat(flussRowIterator.hasNext()).isFalse(); + retry( + Duration.ofMinutes(1), + () -> { + try (CloseableIterator paimonRowIterator = + getPaimonRowCloseableIterator(tablePath)) { + List actualRows = new ArrayList<>(); + while (paimonRowIterator.hasNext()) { + org.apache.paimon.data.InternalRow row = paimonRowIterator.next(); + // system columns are always the last three: + // __bucket, __offset, __timestamp + int offsetIndex = row.getFieldCount() - 2; + actualRows.add( + row.getInt(0) + + "|" + + row.getString(1).toString() + + "|" + + row.getLong(offsetIndex)); + } + + List expectedPaimonRows = new ArrayList<>(); + long offset = startingOffset; + for (InternalRow flussRow : expectedRows) { + expectedPaimonRows.add( + flussRow.getInt(0) + + "|" + + flussRow.getString(1).toString() + + "|" + + offset++); + } + assertThat(actualRows) + .containsExactlyInAnyOrderElementsOf(expectedPaimonRows); + } + }); } private void checkDataInPaimonAppendOnlyPartitionedTable( @@ -517,19 +538,41 @@ private void checkDataInPaimonAppendOnlyPartitionedTable( List expectedRows, long startingOffset) throws Exception { - Iterator paimonRowIterator = - getPaimonRowCloseableIterator(tablePath, partitionSpec); - Iterator flussRowIterator = expectedRows.iterator(); - while (paimonRowIterator.hasNext()) { - org.apache.paimon.data.InternalRow row = paimonRowIterator.next(); - InternalRow flussRow = flussRowIterator.next(); - assertThat(row.getInt(0)).isEqualTo(flussRow.getInt(0)); - assertThat(row.getString(1).toString()).isEqualTo(flussRow.getString(1).toString()); - assertThat(row.getString(2).toString()).isEqualTo(flussRow.getString(2).toString()); - // the idx 3 is __bucket, so use 4 - assertThat(row.getLong(4)).isEqualTo(startingOffset++); - } - assertThat(flussRowIterator.hasNext()).isFalse(); + retry( + Duration.ofMinutes(1), + () -> { + try (CloseableIterator paimonRowIterator = + getPaimonRowCloseableIterator(tablePath, partitionSpec)) { + List actualRows = new ArrayList<>(); + while (paimonRowIterator.hasNext()) { + org.apache.paimon.data.InternalRow row = paimonRowIterator.next(); + // the idx 3 is __bucket, so use 4 + actualRows.add( + row.getInt(0) + + "|" + + row.getString(1).toString() + + "|" + + row.getString(2).toString() + + "|" + + row.getLong(4)); + } + + List expectedPaimonRows = new ArrayList<>(); + long offset = startingOffset; + for (InternalRow flussRow : expectedRows) { + expectedPaimonRows.add( + flussRow.getInt(0) + + "|" + + flussRow.getString(1).toString() + + "|" + + flussRow.getString(2).toString() + + "|" + + offset++); + } + assertThat(actualRows) + .containsExactlyInAnyOrderElementsOf(expectedPaimonRows); + } + }); } private CloseableIterator getPaimonRowCloseableIterator(