diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 5bdef252a2f4a..dc2ab1d381fd8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -112,6 +112,7 @@ private PipeRawTabletInsertionEvent( this.isAligned = isAligned; this.sourceEvent = sourceEvent; this.needToReport = needToReport; + inheritSourceEventReportSkippingIfNecessary(); // Allocate empty memory block, will be resized later. this.allocatedMemoryBlock = @@ -399,6 +400,18 @@ public void markAsNeedToReport() { }); } this.needToReport = true; + inheritSourceEventReportSkippingIfNecessary(); + } + + private void inheritSourceEventReportSkippingIfNecessary() { + if (needToReport && shouldSkipReportOnCommitBecauseOfSourceEvent()) { + skipReportOnCommit(); + } + } + + private boolean shouldSkipReportOnCommitBecauseOfSourceEvent() { + return sourceEvent instanceof PipeTsFileInsertionEvent + && !((PipeTsFileInsertionEvent) sourceEvent).shouldReportGeneratedEventsOnCommit(); } // This getter is reserved for user-defined plugins diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 8f664761aa032..b4e1df26dbecd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -105,6 +105,8 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent protected volatile ProgressIndex overridingProgressIndex; private Set tableNames; private String tsFileDedupScopeID; + // False when generated tablet events should wait for an external progress report. + private volatile boolean shouldReportGeneratedEventsOnCommit = true; // This is set to check the tsFile paths by privilege private Map treeSchemaMap; @@ -462,6 +464,23 @@ public ProgressIndex forceGetProgressIndex() { return resource.getMaxProgressIndex(); } + public PipeTsFileInsertionEvent skipReportOnCommitAndGeneratedEvents() { + return setShouldReportGeneratedEventsOnCommit(false); + } + + public boolean shouldReportGeneratedEventsOnCommit() { + return shouldReportGeneratedEventsOnCommit; + } + + private PipeTsFileInsertionEvent setShouldReportGeneratedEventsOnCommit( + final boolean shouldReportGeneratedEventsOnCommit) { + this.shouldReportGeneratedEventsOnCommit = shouldReportGeneratedEventsOnCommit; + if (!shouldReportGeneratedEventsOnCommit) { + skipReportOnCommit(); + } + return this; + } + public void eliminateProgressIndex() { if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource) @@ -517,7 +536,8 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep startTime, endTime, isTsFileSealed) - .bindTsFileDedupScopeID(tsFileDedupScopeID); + .bindTsFileDedupScopeID(tsFileDedupScopeID) + .setShouldReportGeneratedEventsOnCommit(shouldReportGeneratedEventsOnCommit); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java index c5dc7662bbc72..7e8b5a1f71f25 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java @@ -75,6 +75,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STREAMING_DEFAULT_VALUE; @@ -107,6 +108,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_SNAPSHOT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_STREAMING_KEY; @@ -186,6 +188,16 @@ public void validate(final PipeParameterValidator validator) throws Exception { SOURCE_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString()) .validateAttributeValueRange( SOURCE_REALTIME_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString()) + .validateAttributeValueRange( + EXTRACTOR_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_KEY, + true, + Boolean.TRUE.toString(), + Boolean.FALSE.toString()) + .validateAttributeValueRange( + SOURCE_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_KEY, + true, + Boolean.TRUE.toString(), + Boolean.FALSE.toString()) .validate( args -> (boolean) args[0] || (boolean) args[1], "Should not set both history.enable and realtime.enable to false.", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index 36b84e1e12712..ad3b360aec28f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex; +import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex; @@ -58,6 +59,7 @@ import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; +import org.apache.iotdb.db.storageengine.dataregion.read.reader.common.MergeReaderPriority; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; @@ -98,6 +100,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STRICT_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STRICT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODS_DEFAULT_VALUE; @@ -110,6 +114,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_STRICT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_KEY; @@ -149,6 +154,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource private boolean sloppyTimeRange; // true to disable time range filter after extraction private boolean sloppyPattern; // true to disable pattern filter after extraction + private boolean shouldOrderHistoricalTsFileByQueryPriority = + EXTRACTOR_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_DEFAULT_VALUE; private Pair listeningOptionPair; private boolean shouldExtractInsertion; @@ -170,6 +177,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource new HashMap<>(); private final Map pendingResource2ReplicateIndexForIoTV2 = new HashMap<>(); + private ProgressIndex maxHistoricalProgressIndex = MinimumProgressIndex.INSTANCE; + private boolean shouldReportMaxHistoricalProgressIndex = false; private int extractedHistoricalTsFileCount = 0; private int extractedHistoricalDeletionCount = 0; @@ -185,6 +194,13 @@ public void validate(final PipeParameterValidator validator) { throw new PipeParameterNotValidException(e.getMessage()); } + shouldOrderHistoricalTsFileByQueryPriority = + parameters.getBooleanOrDefault( + Arrays.asList( + EXTRACTOR_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_KEY, + SOURCE_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_KEY), + EXTRACTOR_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_DEFAULT_VALUE); + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY)) { final boolean isStrictMode = parameters.getBooleanOrDefault( @@ -311,6 +327,12 @@ public void customize( throws IllegalPathException { shouldExtractInsertion = listeningOptionPair.getLeft(); shouldExtractDeletion = listeningOptionPair.getRight(); + shouldOrderHistoricalTsFileByQueryPriority = + parameters.getBooleanOrDefault( + Arrays.asList( + EXTRACTOR_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_KEY, + SOURCE_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_KEY), + EXTRACTOR_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_DEFAULT_VALUE); final PipeRuntimeEnvironment environment = configuration.getRuntimeEnvironment(); @@ -496,6 +518,8 @@ public synchronized void start() { hasBeenStarted = true; extractedHistoricalTsFileCount = 0; extractedHistoricalDeletionCount = 0; + maxHistoricalProgressIndex = MinimumProgressIndex.INSTANCE; + shouldReportMaxHistoricalProgressIndex = false; final DataRegion dataRegion = StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId)); @@ -519,15 +543,15 @@ public synchronized void start() { .ifPresent(manager -> extractDeletions(manager, originalResourceList)); } + if (shouldUseHistoricalTsFileQueryPriorityOrder()) { + prepareResourcesForHistoricalTsFileQueryPriorityOrder(originalResourceList); + } + // Sort tsFileResource and deletionResource long startTime = System.currentTimeMillis(); LOGGER.info( DataNodePipeMessages.PIPE_START_TO_SORT_ALL_EXTRACTED_RESOURCES, pipeName, dataRegionId); - originalResourceList.sort( - (o1, o2) -> - startIndex instanceof TimeWindowStateProgressIndex - ? Long.compare(o1.getFileStartTime(), o2.getFileStartTime()) - : o1.getProgressIndex().topologicalCompareTo(o2.getProgressIndex())); + sortExtractedResources(originalResourceList); pendingQueue = new ArrayDeque<>(originalResourceList); PipeTerminateEvent.initializeHistoricalTransferSummary( pipeName, @@ -546,6 +570,87 @@ public synchronized void start() { } } + private boolean shouldUseHistoricalTsFileQueryPriorityOrder() { + // Deletion resources only carry progressIndex. Keep the old progressIndex order when deletions + // are extracted together with TsFiles so insertion/deletion ordering semantics are unchanged. + return shouldOrderHistoricalTsFileByQueryPriority + && shouldExtractInsertion + && !shouldExtractDeletion; + } + + private void prepareResourcesForHistoricalTsFileQueryPriorityOrder( + final List resourceList) { + // Query-priority order is intentionally not compatible with progressIndex order, so report + // progress only after all selected historical TsFiles are supplied. This prefers possible + // retransmission over losing overwrite semantics. + resourceList.removeIf( + resource -> + resource instanceof TsFileResource + && !filteredTsFileResources2TableNames.containsKey(resource)); + updateMaxHistoricalProgressIndex(resourceList); + shouldReportMaxHistoricalProgressIndex = !resourceList.isEmpty(); + } + + private void updateMaxHistoricalProgressIndex(final List resourceList) { + for (final PersistentResource resource : resourceList) { + final ProgressIndex progressIndex = resource.getProgressIndex(); + if (Objects.nonNull(progressIndex)) { + maxHistoricalProgressIndex = + maxHistoricalProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex); + } + } + } + + private void sortExtractedResources(final List resourceList) { + if (shouldUseHistoricalTsFileQueryPriorityOrder()) { + // Send TsFiles from lower query/compaction priority to higher priority. For duplicated + // points, covered files are loaded first on the receiver and covering files are loaded later + // to preserve overwrite semantics. + resourceList.sort( + (o1, o2) -> + o1 instanceof TsFileResource && o2 instanceof TsFileResource + ? compareTsFileResourcesByQueryPriority((TsFileResource) o1, (TsFileResource) o2) + : comparePersistentResourcesByProgressIndex(o1, o2)); + return; + } + + resourceList.sort( + (o1, o2) -> + startIndex instanceof TimeWindowStateProgressIndex + ? Long.compare(o1.getFileStartTime(), o2.getFileStartTime()) + : comparePersistentResourcesByProgressIndex(o1, o2)); + } + + private int comparePersistentResourcesByProgressIndex( + final PersistentResource resource1, final PersistentResource resource2) { + return resource1.getProgressIndex().topologicalCompareTo(resource2.getProgressIndex()); + } + + private int compareTsFileResourcesByQueryPriority( + final TsFileResource resource1, final TsFileResource resource2) { + int result = + new MergeReaderPriority( + resource1.getTsFileID().timestamp, resource1.getVersion(), 0, resource1.isSeq()) + .compareTo( + new MergeReaderPriority( + resource2.getTsFileID().timestamp, + resource2.getVersion(), + 0, + resource2.isSeq())); + if (result != 0) { + return result; + } + + result = + Long.compare( + resource1.getTsFileID().compactionVersion, resource2.getTsFileID().compactionVersion); + if (result != 0) { + return result; + } + + return resource1.getTsFilePath().compareTo(resource2.getTsFilePath()); + } + private void flushTsFilesForExtraction(DataRegion dataRegion) { LOGGER.info(DataNodePipeMessages.PIPE_START_TO_FLUSH_DATA_REGION, pipeName, dataRegionId); @@ -554,7 +659,8 @@ private void flushTsFilesForExtraction(DataRegion dataRegion) { // Since a large number of consensus pipes are not created at the same time, resulting in no // serious waiting for locks. Therefore, the flush operation is always performed for the // consensus pipe, and the lastFlushed timestamp is not updated here. - if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) { + if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX) + || shouldUseHistoricalTsFileQueryPriorityOrder()) { dataRegion.syncCloseAllWorkingTsFileProcessors(); } else { dataRegion.asyncCloseAllWorkingTsFileProcessors(); @@ -881,27 +987,36 @@ public synchronized Event supply() { return null; } - final PersistentResource resource = pendingQueue.peek(); - if (resource == null) { - return supplyTerminateEvent(); - } + while (true) { + final PersistentResource resource = pendingQueue.peek(); + if (resource == null) { + if (shouldReportMaxHistoricalProgressIndex) { + shouldReportMaxHistoricalProgressIndex = false; + return supplyProgressReportEvent(maxHistoricalProgressIndex); + } + return supplyTerminateEvent(); + } + + if (resource instanceof TsFileResource) { + final TsFileResource tsFileResource = (TsFileResource) resource; + if (consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)) { + clearReplicateIndexForResource(tsFileResource); + pendingQueue.poll(); + if (shouldUseHistoricalTsFileQueryPriorityOrder()) { + continue; + } + return supplyProgressReportEvent(tsFileResource.getMaxProgressIndex()); + } - if (resource instanceof TsFileResource) { - final TsFileResource tsFileResource = (TsFileResource) resource; - if (consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)) { - clearReplicateIndexForResource(tsFileResource); + final Event event = supplyTsFileEvent(tsFileResource); pendingQueue.poll(); - return supplyProgressReportEvent(tsFileResource.getMaxProgressIndex()); + return event; } - final Event event = supplyTsFileEvent(tsFileResource); + final Event event = supplyDeletionEvent((DeletionResource) resource); pendingQueue.poll(); return event; } - - final Event event = supplyDeletionEvent((DeletionResource) resource); - pendingQueue.poll(); - return event; } private Event supplyTerminateEvent() { @@ -982,7 +1097,9 @@ protected Event supplyProgressReportEvent(final ProgressIndex progressIndex) { protected Event supplyTsFileEvent(final TsFileResource resource) { if (!filteredTsFileResources2TableNames.containsKey(resource)) { clearReplicateIndexForResource(resource); - return supplyProgressReportEvent(resource.getMaxProgressIndex()); + return shouldUseHistoricalTsFileQueryPriorityOrder() + ? null + : supplyProgressReportEvent(resource.getMaxProgressIndex()); } boolean shouldUnpinResource = false; @@ -1010,6 +1127,10 @@ protected Event supplyTsFileEvent(final TsFileResource resource) { historicalDataExtractionStartTime, historicalDataExtractionEndTime); + if (shouldUseHistoricalTsFileQueryPriorityOrder()) { + event.skipReportOnCommitAndGeneratedEvents(); + } + // if using IoTV2, assign a replicateIndex for this event if (shouldAssignReplicateIndexForIoTV2(event)) { event.setReplicateIndexForIoTV2(assignReplicateIndexForResource(resource)); @@ -1081,7 +1202,6 @@ private Event supplyDeletionEvent(final DeletionResource deletionResource) { cliHostname, skipIfNoPrivileges, false); - // if using IoTV2, assign a replicateIndex for this historical deletion event if (shouldAssignReplicateIndexForIoTV2(event)) { event.setReplicateIndexForIoTV2(assignReplicateIndexForResource(deletionResource)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java index cfe35ef041de9..f5c3f1fa05c8c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTablePatternParser; import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTreePatternParser; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode; @@ -53,6 +54,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import java.time.LocalDate; import java.util.ArrayList; @@ -262,6 +264,63 @@ private void createTablet() { new Tablet(deviceId, Arrays.asList(schemas), times, values, null, times.length); } + @Test + public void markAsNeedToReportShouldInheritSourceTsFileGeneratedReportSkipping() + throws Exception { + final PipeTsFileInsertionEvent sourceEvent = Mockito.mock(PipeTsFileInsertionEvent.class); + Mockito.when(sourceEvent.shouldReportGeneratedEventsOnCommit()).thenReturn(true); + final PipeRawTabletInsertionEvent tabletEvent = + new PipeRawTabletInsertionEvent( + false, + null, + null, + null, + tabletForInsertTabletNode, + false, + null, + 0, + null, + sourceEvent, + false); + + tabletEvent.markAsNeedToReport(); + Assert.assertTrue(tabletEvent.isShouldReportOnCommit()); + + Mockito.when(sourceEvent.shouldReportGeneratedEventsOnCommit()).thenReturn(false); + final PipeRawTabletInsertionEvent skippedTabletEvent = + new PipeRawTabletInsertionEvent( + false, + null, + null, + null, + tabletForInsertTabletNode, + false, + null, + 0, + null, + sourceEvent, + false); + + skippedTabletEvent.markAsNeedToReport(); + Assert.assertFalse(skippedTabletEvent.isShouldReportOnCommit()); + + final PipeRawTabletInsertionEvent constructorSkippedTabletEvent = + new PipeRawTabletInsertionEvent( + false, + null, + null, + null, + tabletForInsertTabletNode, + false, + null, + 0, + null, + sourceEvent, + true); + + Assert.assertFalse(constructorSkippedTabletEvent.isShouldReportOnCommit()); + } + @Test public void convertToTabletForTest() throws Exception { TabletInsertionEventTreePatternParser container1 = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java index edffbb5b31c1f..804df8d5722ce 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java @@ -181,6 +181,215 @@ public void testSupplyRetriesSameTsFileAfterEventCreationFailure() throws Except } } + @Test + public void testHistoricalTsFileQueryPriorityOrderDefaultsToTrue() throws Exception { + final PipeHistoricalDataRegionTsFileAndDeletionSource source = + new PipeHistoricalDataRegionTsFileAndDeletionSource(); + + source.validate(new PipeParameterValidator(new PipeParameters(new HashMap<>()))); + + Assert.assertTrue( + (Boolean) getPrivateField(source, "shouldOrderHistoricalTsFileByQueryPriority")); + } + + @Test + public void testHistoricalTsFileQueryPriorityOrderMatchesQueryCoverage() throws Exception { + final PipeHistoricalDataRegionTsFileAndDeletionSource source = + new PipeHistoricalDataRegionTsFileAndDeletionSource(); + final File tempDir = Files.createTempDirectory("pipeHistoricalTsFileOrder").toFile(); + + try { + final TsFileResource seqLowerVersionNewerFileTimestamp = + createTsFileResource(tempDir, "300-1-0-0.tsfile"); + seqLowerVersionNewerFileTimestamp.setSeq(true); + final TsFileResource seqSameVersionOlderFileTimestamp = + createTsFileResource(tempDir, "100-2-0-0.tsfile"); + seqSameVersionOlderFileTimestamp.setSeq(true); + final TsFileResource seqSameVersionNewerFileTimestamp = + createTsFileResource(tempDir, "200-2-0-0.tsfile"); + seqSameVersionNewerFileTimestamp.setSeq(true); + final TsFileResource seqHigherVersionOlderFileTimestamp = + createTsFileResource(tempDir, "50-3-0-0.tsfile"); + seqHigherVersionOlderFileTimestamp.setSeq(true); + final TsFileResource unseqLowerVersionOldestFileTimestamp = + createTsFileResource(tempDir, "1-1-0-0.tsfile"); + unseqLowerVersionOldestFileTimestamp.setSeq(false); + + setPrivateField(source, "shouldOrderHistoricalTsFileByQueryPriority", true); + setPrivateField(source, "shouldExtractInsertion", true); + setPrivateField(source, "shouldExtractDeletion", false); + setPrivateField(source, "startIndex", MinimumProgressIndex.INSTANCE); + + final List resources = + new ArrayList<>( + Arrays.asList( + unseqLowerVersionOldestFileTimestamp, + seqHigherVersionOlderFileTimestamp, + seqSameVersionNewerFileTimestamp, + seqSameVersionOlderFileTimestamp, + seqLowerVersionNewerFileTimestamp)); + sortExtractedResources(source, resources); + + Assert.assertEquals( + Arrays.asList( + seqLowerVersionNewerFileTimestamp, + seqSameVersionOlderFileTimestamp, + seqSameVersionNewerFileTimestamp, + seqHigherVersionOlderFileTimestamp, + unseqLowerVersionOldestFileTimestamp), + resources); + } finally { + FileUtils.deleteFileOrDirectory(tempDir); + } + } + + @Test + public void testHistoricalTsFileQueryPriorityOrderCanBeDisabled() throws Exception { + final PipeHistoricalDataRegionTsFileAndDeletionSource source = + new PipeHistoricalDataRegionTsFileAndDeletionSource(); + final PipeParameters parameters = + new PipeParameters( + new HashMap() { + { + put( + PipeSourceConstant.SOURCE_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_KEY, + Boolean.FALSE.toString()); + } + }); + final File tempDir = Files.createTempDirectory("pipeHistoricalTsFileProgressOrder").toFile(); + + try { + source.validate(new PipeParameterValidator(parameters)); + final TsFileResource earlierProgressIndex = createTsFileResource(tempDir, "300-1-0-0.tsfile"); + earlierProgressIndex.updateProgressIndex(new SimpleProgressIndex(0, 1)); + final TsFileResource laterProgressIndex = createTsFileResource(tempDir, "100-1-0-0.tsfile"); + laterProgressIndex.updateProgressIndex(new SimpleProgressIndex(0, 2)); + + setPrivateField(source, "shouldExtractInsertion", true); + setPrivateField(source, "shouldExtractDeletion", false); + setPrivateField(source, "startIndex", MinimumProgressIndex.INSTANCE); + + final List resources = + new ArrayList<>(Arrays.asList(laterProgressIndex, earlierProgressIndex)); + sortExtractedResources(source, resources); + + Assert.assertFalse( + (Boolean) getPrivateField(source, "shouldOrderHistoricalTsFileByQueryPriority")); + Assert.assertEquals(Arrays.asList(earlierProgressIndex, laterProgressIndex), resources); + } finally { + FileUtils.deleteFileOrDirectory(tempDir); + } + } + + @Test + public void testHistoricalTsFileQueryPriorityOrderCanBeDisabledByExtractorKey() throws Exception { + final PipeHistoricalDataRegionTsFileAndDeletionSource source = + new PipeHistoricalDataRegionTsFileAndDeletionSource(); + final PipeParameters parameters = + new PipeParameters( + new HashMap() { + { + put( + PipeSourceConstant.EXTRACTOR_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_KEY, + Boolean.FALSE.toString()); + } + }); + + source.validate(new PipeParameterValidator(parameters)); + + Assert.assertFalse( + (Boolean) getPrivateField(source, "shouldOrderHistoricalTsFileByQueryPriority")); + } + + @Test + public void testHistoricalTsFileQueryPriorityOrderFallsBackWhenDeletionExtracted() + throws Exception { + final PipeHistoricalDataRegionTsFileAndDeletionSource source = + new PipeHistoricalDataRegionTsFileAndDeletionSource(); + final File tempDir = + Files.createTempDirectory("pipeHistoricalTsFileDeletionProgressOrder").toFile(); + + try { + final TsFileResource higherQueryPriorityEarlierProgress = + createTsFileResource(tempDir, "300-3-0-0.tsfile"); + higherQueryPriorityEarlierProgress.setSeq(true); + higherQueryPriorityEarlierProgress.updateProgressIndex(new SimpleProgressIndex(0, 1)); + final TsFileResource lowerQueryPriorityLaterProgress = + createTsFileResource(tempDir, "100-1-0-0.tsfile"); + lowerQueryPriorityLaterProgress.setSeq(true); + lowerQueryPriorityLaterProgress.updateProgressIndex(new SimpleProgressIndex(0, 2)); + + setPrivateField(source, "shouldOrderHistoricalTsFileByQueryPriority", true); + setPrivateField(source, "shouldExtractInsertion", true); + setPrivateField(source, "shouldExtractDeletion", true); + setPrivateField(source, "startIndex", MinimumProgressIndex.INSTANCE); + + final List resources = + new ArrayList<>( + Arrays.asList(lowerQueryPriorityLaterProgress, higherQueryPriorityEarlierProgress)); + sortExtractedResources(source, resources); + + Assert.assertEquals( + Arrays.asList(higherQueryPriorityEarlierProgress, lowerQueryPriorityLaterProgress), + resources); + } finally { + FileUtils.deleteFileOrDirectory(tempDir); + } + } + + @Test + public void testQueryPriorityOrderReportsProgressAfterAllHistoricalResources() throws Exception { + final PipeHistoricalDataRegionTsFileAndDeletionSource source = + new PipeHistoricalDataRegionTsFileAndDeletionSource(); + final ProgressIndex expectedProgressIndex = new SimpleProgressIndex(0, 10); + + setPrivateField(source, "hasBeenStarted", true); + setPrivateField(source, "pipeName", "pipe"); + setPrivateField(source, "creationTime", 1L); + setPrivateField(source, "dataRegionId", 1); + setPrivateField(source, "pipeTaskMeta", new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)); + setPrivateField(source, "pendingQueue", new ArrayDeque()); + setPrivateField(source, "maxHistoricalProgressIndex", expectedProgressIndex); + setPrivateField(source, "shouldReportMaxHistoricalProgressIndex", true); + + final Event event = source.supply(); + + Assert.assertTrue(event instanceof ProgressReportEvent); + Assert.assertEquals(expectedProgressIndex, ((ProgressReportEvent) event).getProgressIndex()); + Assert.assertFalse((Boolean) getPrivateField(source, "shouldReportMaxHistoricalProgressIndex")); + } + + @Test + @SuppressWarnings("unchecked") + public void testQueryPriorityOrderProgressOnlyCoversSelectedResources() throws Exception { + final PipeHistoricalDataRegionTsFileAndDeletionSource source = + new PipeHistoricalDataRegionTsFileAndDeletionSource(); + final File tempDir = Files.createTempDirectory("pipeHistoricalTsFileSelectedProgress").toFile(); + + try { + final TsFileResource selectedResource = createTsFileResource(tempDir, "100-1-0-0.tsfile"); + selectedResource.updateProgressIndex(new SimpleProgressIndex(0, 1)); + final TsFileResource filteredResource = createTsFileResource(tempDir, "200-1-0-0.tsfile"); + filteredResource.updateProgressIndex(new SimpleProgressIndex(0, 100)); + + ((Map>) + getPrivateField(source, "filteredTsFileResources2TableNames")) + .put(selectedResource, Set.of()); + + final List resources = + new ArrayList<>(Arrays.asList(filteredResource, selectedResource)); + prepareResourcesForHistoricalTsFileQueryPriorityOrder(source, resources); + + Assert.assertEquals(Arrays.asList(selectedResource), resources); + Assert.assertEquals( + new SimpleProgressIndex(0, 1), getPrivateField(source, "maxHistoricalProgressIndex")); + Assert.assertTrue( + (Boolean) getPrivateField(source, "shouldReportMaxHistoricalProgressIndex")); + } finally { + FileUtils.deleteFileOrDirectory(tempDir); + } + } + @Test public void testReplicateIndexShouldBeStableBeforeResourceConsumed() throws Exception { final TestablePipeHistoricalDataRegionTsFileAndDeletionSource source = @@ -302,6 +511,28 @@ private static ProgressIndex hybridProgressIndex( return result; } + private static void sortExtractedResources( + final PipeHistoricalDataRegionTsFileAndDeletionSource source, + final List resources) + throws ReflectiveOperationException { + final Method method = + PipeHistoricalDataRegionTsFileAndDeletionSource.class.getDeclaredMethod( + "sortExtractedResources", List.class); + method.setAccessible(true); + method.invoke(source, resources); + } + + private static void prepareResourcesForHistoricalTsFileQueryPriorityOrder( + final PipeHistoricalDataRegionTsFileAndDeletionSource source, + final List resources) + throws ReflectiveOperationException { + final Method method = + PipeHistoricalDataRegionTsFileAndDeletionSource.class.getDeclaredMethod( + "prepareResourcesForHistoricalTsFileQueryPriorityOrder", List.class); + method.setAccessible(true); + method.invoke(source, resources); + } + private static void setPrivateField( final PipeHistoricalDataRegionTsFileAndDeletionSource source, final String fieldName, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java index e755ef3c6019e..523fa9b0698f5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java @@ -101,6 +101,11 @@ public class PipeSourceConstant { public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE = "path"; public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_ALL_VALUE = "all"; public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE = ""; + public static final String EXTRACTOR_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_KEY = + "extractor.history.tsfile.order-by-query-priority"; + public static final String SOURCE_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_KEY = + "source.history.tsfile.order-by-query-priority"; + public static final boolean EXTRACTOR_HISTORY_TSFILE_ORDER_BY_QUERY_PRIORITY_DEFAULT_VALUE = true; public static final String EXTRACTOR_MODS_ENABLE_KEY = "extractor.mods.enable"; public static final String SOURCE_MODS_ENABLE_KEY = "source.mods.enable"; public static final boolean EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE = false;