Skip to content

[flink] Support stream read Chain Table#8262

Open
yunfengzhou-hub wants to merge 2 commits into
apache:masterfrom
yunfengzhou-hub:chain-table-streaming
Open

[flink] Support stream read Chain Table#8262
yunfengzhou-hub wants to merge 2 commits into
apache:masterfrom
yunfengzhou-hub:chain-table-streaming

Conversation

@yunfengzhou-hub

Copy link
Copy Markdown
Contributor

Purpose

Chain Table (chain-table.enabled=true) separates data into a snapshot branch (batch-imported full partitions) and a delta branch (incremental updates). Prior to this change, streaming read was not supported because the standard DataTableStreamScan is unaware of the two-branch architecture.

This PR introduces ChainTableFileStoreTable (a wrapper over FallbackReadFileStoreTable) and ChainTableStreamScan which implements a two-phase streaming scan: Phase 1 does a full load by reading delta data pinned to the current snapshot and merging snapshot files for overlapping partitions; Phase 2 incrementally monitors the delta branch only, returning DataSplit(isStreaming=true) for changelog passthrough. The snapshot-pinning strategy makes the Phase 1 / Phase 2 boundary deterministic — no overlap or data loss regardless of concurrent commits.

Tests

Added FlinkChainTableITCase with 16 tests (all passing, ~75s):

  • Full load with snapshot+delta overlap, empty delta, empty snapshot
  • Changelog passthrough (-U/+U) with changelog-producer=input
  • Snapshot OVERWRITE does not trigger streaming output
  • Stateless restart and stateful restart (MiniCluster checkpoint/restore)
  • WHERE predicate forwarding, withShard forwarding
  • scan.mode=latest bypass, changelog-producer=none rejection
  • restore(id, scanAll=true) and restore(null, scanAll=true) state reset
  • chain-partition-keys group partition streaming

@yunfengzhou-hub yunfengzhou-hub force-pushed the chain-table-streaming branch 2 times, most recently from 49a4f20 to f2dc523 Compare June 17, 2026 14:50
@yunfengzhou-hub yunfengzhou-hub marked this pull request as ready for review June 18, 2026 01:42
@yunfengzhou-hub yunfengzhou-hub changed the title [POC][flink] Support stream read Chain Table [flink] Support stream read Chain Table Jun 18, 2026
@Nullable
@Override
public Long checkpoint() {
return nextDeltaSnapshotId;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Blocking] checkpoint() returns a stale snapshot id in Phase 2, causing duplicate consumption after recovery.

checkpoint() returns nextDeltaSnapshotId, which is set only once in captureDeltaPosition() (the Phase-1 boundary, latestId + 1). In Phase 2, plan() delegates to deltaStreamScan.plan(), which advances the delta scan's internal cursor, but nextDeltaSnapshotId is never written back — so checkpoint() is frozen at the boundary.

The Flink enumerator persists scan.checkpoint() on every checkpoint (ContinuousFileSplitEnumerator#snapshotState). So during Phase 2 it keeps storing the boundary value; on failure, restore(boundary) makes the delta stream re-read every snapshot consumed since Phase 1 → large-scale duplicates.

Note that watermark() and startingContext() already delegate to deltaStreamScan when startingDonecheckpoint() is the one that was missed.

Secondary effect: because checkpoint() never changes, ContinuousFileSplitEnumerator#scanNextSnapshot never increments handledSnapshotCount, so scan.max-snapshot-count backpressure is silently defeated (the splitMaxNum guard still applies). The same fix resolves this.

Suggested fix:

@Nullable
@Override
public Long checkpoint() {
    if (startingDone) {
        return deltaStreamScan.checkpoint();
    }
    return nextDeltaSnapshotId;
}

* Converts a {@link DataSplit} to a {@link ChainSplit} where all files belong to the given
* branch. The partition value is preserved as-is (no rewriting).
*/
private static ChainSplit dataSplitToChainSplit(DataSplit dataSplit, String branch) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Maintainability] ChainSplit construction is duplicated in three places.

The per-file loop that builds fileBranchMapping + fileBucketPathMapping and then calls new ChainSplit(...) appears here in dataSplitToChainSplit(), and twice in ChainGroupReadTable (plan() around L249 and around L408). Any future change to how a ChainSplit is built (e.g. adding a field) has to be applied in all three; missing one would silently diverge the read/write paths.

Suggest extracting a single factory, e.g. ChainSplit.from(DataSplit dataSplit, String branch), and using it everywhere.


// 2. Read all snapshot branch data, grouped by partition.
// Reuse batchScan.mainScan which has predicates/shard already applied.
Map<BinaryRow, List<DataSplit>> snapshotSplitsByPartition =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Performance] planStarting() reads file-level splits for every snapshot partition, then keeps only the latest per group.

groupByPartition(batchScan.mainScan) runs a full batch scan that reads the manifests and file lists of all snapshot-branch partitions, but only the latest chain partition per group is kept (steps 3-4). With many historical full-dump partitions (e.g. a daily ODS dump over a year), this reads hundreds of partitions' metadata to keep one — slow startup and heavy manifest I/O (especially on object stores).

The batch path already does the cheaper thing: ChainGroupReadTable.plan() uses newChainPartitionListingScan(...).listPartitions() (partition metadata only) to locate partitions before scanning files. Suggest the same here: list partitions first, pick the latest per group, then withPartitionFilter(...) to scan only that partition. The pinned-delta scan (L166) has the same pattern.

}

@Override
public void restore(@Nullable Long nextSnapshotId) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Minor] restore(null) does not reset startingDone, so it cannot actually re-run Phase 1.

When nextSnapshotId == null the if body is skipped, leaving startingDone unchanged. On an instance that already finished Phase 1 (startingDone == true), a subsequent restore(null) + plan() enters Phase 2 (deltaStreamScan.plan()) instead of re-running planStarting().

In the Flink runtime this is harmless because restore is always called on a fresh scan instance (startingDone defaults to false). But it contradicts the intent of testStreamingReadRestoreAfterNewData, whose comment states "restore(null) re-runs Phase 1". Suggest resetting startingDone = false here so the documented semantics hold.

if (bucketDir.startsWith("bucket-")) {
try {
bucketId = Integer.parseInt(bucketDir.substring("bucket-".length()));
} catch (NumberFormatException ignored) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Minor] Silent catch hides bucket-id parse failures.

catch (NumberFormatException ignored) {} swallows the failure and falls back to bucketId = 0, which would route all affected splits to the same reader subtask (skew) with no trace. The bucket-{N} layout is a stable convention so this shouldn't happen in practice, but a LOG.warn(...) here would make any future regression diagnosable rather than silent.

*/
@Test
@Timeout(180)
public void testStreamingReadChainTableStatefulRestart() throws Exception {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Test gap] This stateful-restart test doesn't exercise the checkpoint/restore path that actually regresses.

The checkpoint here is triggered before Phase 2 has consumed any new delta snapshot (no delta is written between the Phase-1 write at L1009 and the checkpoint at L1024). So the frozen nextDeltaSnapshotId happens to equal the real delta cursor, and the "no duplicates" assertion (L1088) passes even if checkpoint() is broken.

The checkpoint() regression (see comment on ChainTableStreamScan#checkpoint) only manifests when Phase 2 has already advanced past the boundary. Suggest adding a case: after Phase 1, write delta and let Phase 2 consume ≥1 snapshot, then checkpoint → cancel → restart, and assert no duplicates.

JobClient jobClient = tableResult.getJobClient().get();

// Wait for data to flow to the sink
Thread.sleep(5000);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Test, minor] Timing-based synchronization is flaky-prone, and debug prints are left in.

This class uses ~19 Thread.sleep(2000-5000) calls to synchronize with the streaming job, which tends to be flaky under CI load; there are also a few leftover System.err.println("[TEST] ...") debug statements. Consider polling for a condition instead of fixed sleeps, and removing the prints.

@yunfengzhou-hub yunfengzhou-hub force-pushed the chain-table-streaming branch from f2dc523 to 3e00ac2 Compare June 18, 2026 11:48
@yunfengzhou-hub yunfengzhou-hub force-pushed the chain-table-streaming branch from 3e00ac2 to 6d67d97 Compare June 18, 2026 13:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants