Automatically repair lost data partitions#17279
Automatically repair lost data partitions#17279zerolbsony wants to merge 42 commits intoapache:masterfrom
Conversation
… the DataPartitionTableIntegrityCheckProcedure.
Adjust method that record the earliest timeslot id for every database
Remove two unit testes only run successful in local environment
…tion Information instead of scanning data directories in the DataNode; Correct the logic that retry after the step failed; Correct skipDataNodes and failedDataNodes serialization and deserialization.
… repair_lost_data_partition
There was a problem hiding this comment.
Pull request overview
This PR introduces an automated “data partition table integrity check + repair” flow intended to run after ConfigNode restart by adding new DataNode RPCs for (1) earliest timeslot discovery and (2) DataPartitionTable regeneration from local TsFiles, plus a ConfigNode-side procedure to orchestrate detection/repair.
Changes:
- Extend the DataNode Thrift RPC API with partition-table integrity check requests/responses.
- Add a DataNode-side
DataPartitionTableGeneratorand supporting utilities (rate limiter, time-partition helpers) to rebuildDataPartitionTableby scanning TsFile resources. - Add a ConfigNode procedure and startup/registration hooks to trigger integrity check and write repaired partition tables back via consensus.
Reviewed changes
Copilot reviewed 29 out of 29 changed files in this pull request and generated 26 comments.
Show a summary per file
| File | Description |
|---|---|
| iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift | Adds new DataNode RPC structs + methods for integrity check and partition-table generation. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/rateLimiter/LeakyBucketRateLimiter.java | Introduces a leaky-bucket throughput limiter used during scanning. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java | Adds helpers to convert partitionId ↔ time and a new satisfyPartitionId overload. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java | Attempts to prevent duplicate consensus group IDs per time partition. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/DataPartitionTableGeneratorState.java | Adds a status-code enum for generator task state. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java | Adds thread names for the new scan/recovery pools. |
| iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template | Adds new recovery-related configuration parameters. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java | Adds a limiter-aware getDevices method signature. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java | Implements limiter-aware device scanning from .resource files. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java | Implements limiter-aware getDevices (no-op for in-memory index). |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java | Adds getDevices(limiter) delegating to the time index. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java | Implements new RPC endpoints and local filesystem scanning + DataPartitionTable serialization. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java | Adds operation types for new RPC error handling. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/partition/DataPartitionTableGenerator.java | Adds generator that rebuilds a DataPartitionTable from local TsFiles. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java | Loads new recovery properties into DataNode config. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | Adds new config fields + accessors for recovery tuning. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java | Adds restart-time background task intended to trigger integrity check. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java | Registers a new procedure type for integrity check. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java | Adds procedure deserialization/dispatch for the new procedure. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DataPartitionTableIntegrityCheckProcedureState.java | Adds state-machine states for the integrity check procedure. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java | Implements the integrity check and repair orchestration procedure. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/ConfigNodeProcedureEnv.java | Adds a new env class (currently appears redundant/unreferenced). |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java | Adds registration-time trigger that can launch the integrity check. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java | Adds a dataPartitionTableIntegrityCheck() convenience API. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java | Loads the new wait-timeout property into ConfigNode config. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java | Adds ConfigNode config field for “wait all DataNodes up” timeout. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java | Wires new sync actions to the new DataNode RPC calls. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java | Adds new sync request types for integrity check and generation. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java | Adds logging members (no new test coverage for the new RPCs). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| if (timeslot | ||
| < databaseEarliestRegionMap.get(databaseName)) { | ||
| databaseEarliestRegionMap.put(databaseName, timeslot); | ||
| } |
| assignedDataPartition.put( | ||
| lostDataPartitionsOfDatabases.stream().findFirst().get(), finalDataPartitionTable); |
| } | ||
|
|
||
| status = TaskStatus.IN_PROGRESS; | ||
| return CompletableFuture.runAsync(this::generateDataPartitionTableByMemory); |
| * Environment object for ConfigNode procedures. Provides access to ConfigManager and other | ||
| * necessary components. | ||
| */ | ||
| public class ConfigNodeProcedureEnv { | ||
|
|
||
| private final ConfigManager configManager; | ||
|
|
||
| public ConfigNodeProcedureEnv(ConfigManager configManager) { | ||
| this.configManager = configManager; | ||
| } | ||
|
|
||
| public ConfigManager getConfigManager() { | ||
| return configManager; | ||
| } |
| try { | ||
| dataPartitionTableCheckFuture.get(); | ||
| } catch (ExecutionException | InterruptedException e) { | ||
| LOGGER.error("Data partition table check task execute failed", e); |
| int earliestTimeslotsSize = byteBuffer.getInt(); | ||
| earliestTimeslots = new ConcurrentHashMap<>(); | ||
| for (int i = 0; i < earliestTimeslotsSize; i++) { | ||
| String database = String.valueOf(byteBuffer.getChar()); |
| try { | ||
| Files.list(dataDir.toPath()) | ||
| .filter(Files::isDirectory) | ||
| .forEach( | ||
| sequenceTypePath -> { | ||
| try { | ||
| Files.list(sequenceTypePath) |
...e/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
Show resolved
Hide resolved
| // Trigger integrity check asynchronously | ||
| try { | ||
| configManager.getProcedureManager().dataPartitionTableIntegrityCheck(); | ||
| LOGGER.info("Data partition table integrity check procedure submitted successfully"); |
There was a problem hiding this comment.
These codes have some problems, can't pass tests.
|
|
||
| /* Need use these parameters when repair data partition table */ | ||
| private int partitionTableRecoverWorkerNum = 10; | ||
| private int partitionTableRecoverMaxReadBytesPerSecond = 1000; |
|
|
||
| private long forceWalPeriodForConfigNodeSimpleInMs = 100; | ||
|
|
||
| private long partitionTableRecoverWaitAllDnUpTimeout = 60000; |
There was a problem hiding this comment.
Refactor this parameter
| private long partitionTableRecoverWaitAllDnUpTimeout = 60000; | |
| private long partitionTableRecoverWaitAllDnUpTimeoutInMs = 60000; |
| dataPartitionTableCheckExecutor.submit( | ||
| () -> { | ||
| LOGGER.info( | ||
| "Prepare to start dataPartitionTableIntegrityCheck after all datanodes are started up"); |
There was a problem hiding this comment.
Better to append a prefix for all of your logs appended, e.g.,
| "Prepare to start dataPartitionTableIntegrityCheck after all datanodes are started up"); | |
| "[DataPartitionIntegrity] Prepare to start dataPartitionTableIntegrityCheck after all datanodes are started up"); |
| () -> { | ||
| LOGGER.info( | ||
| "Prepare to start dataPartitionTableIntegrityCheck after all datanodes are started up"); | ||
| // Thread.sleep(CONF.getPartitionTableRecoverWaitAllDnUpTimeout()); |
| latch.countDown(); | ||
| } else { | ||
| LOGGER.info("No running datanodes found, waiting..."); | ||
| Thread.sleep(5000); // 等待5秒后重新检查 |
There was a problem hiding this comment.
| Thread.sleep(5000); // 等待5秒后重新检查 | |
| Thread.sleep(5000); |
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
Show resolved
Hide resolved
| .forEach( | ||
| slot -> { | ||
| if (!TimePartitionUtils.satisfyPartitionId( | ||
| slot.getStartTime(), earliestTimeslot)) { | ||
| lostDataPartitionsOfDatabases.add(database); | ||
| LOG.warn( | ||
| "Database {} has lost timeslot {} in its data table partition, and this issue needs to be repaired", | ||
| database, | ||
| earliestTimeslot); | ||
| } |
There was a problem hiding this comment.
Rectify these codes. The only thing u need to check is whether the database's earlist timeSlot greater than the DataNode's.
...che/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
Show resolved
Hide resolved
| getClusterSchemaManager().adjustMaxRegionGroupNum(); | ||
|
|
||
| // Check if all DataNodes are registered and trigger integrity check if needed | ||
| checkAndTriggerIntegrityCheck(); |
| ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName(), | ||
| new ThreadPoolExecutor.CallerRunsPolicy()); | ||
|
|
||
| private Map<String, Long> databaseEarliestRegionMap = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
This could be set to a local variable.
There was a problem hiding this comment.
The executor is no longer a global parameter of class.
There was a problem hiding this comment.
Change this map into a local variable.
There was a problem hiding this comment.
Change this map into a local variable.
Changed yet
...che/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
Show resolved
Hide resolved
…eted in the progress caused by rpc timeout
…is from ConfigNode, the data partition table lost.
| .min(Comparator.comparingLong(TTimePartitionSlot::getStartTime)) | ||
| .orElse(null); | ||
|
|
||
| if (!TimePartitionUtils.satisfyPartitionId(localEarliestSlot.getStartTime(), earliestTimeslot)) { |
There was a problem hiding this comment.
Judge whether earliestTimeSlot.startTime < localEarliestSlot.startTime
| setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES); | ||
| return Flow.HAS_MORE_STATE; |
There was a problem hiding this comment.
| setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES); | |
| return Flow.HAS_MORE_STATE; | |
| setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES_HEART_BEAT); | |
| return Flow.HAS_MORE_STATE; |
...che/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
Show resolved
Hide resolved
| /* Need use these parameters when repair data partition table */ | ||
| private int partitionTableRecoverWorkerNum = 10; | ||
| // Rate limit set to 10 MB/s | ||
| private int partitionTableRecoverMaxReadBytesPerSecond = 10; |
There was a problem hiding this comment.
MaxReadMBs instead of Bytes
| ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName(), | ||
| new ThreadPoolExecutor.CallerRunsPolicy()); | ||
|
|
||
| private Map<String, Long> databaseEarliestRegionMap = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Change this map into a local variable.
| private Map<String, Long> databaseEarliestRegionMap = new ConcurrentHashMap<>(); | ||
|
|
||
| // Must be lower than the RPC request timeout, in milliseconds | ||
| private static final long timeoutMs = 50000; |
There was a problem hiding this comment.
DO NOT append member variables easily.
...e/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
Outdated
Show resolved
Hide resolved
...e/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
Show resolved
Hide resolved
|
|
||
| public class DataNodeInternalRPCServiceImplTest { | ||
|
|
||
| private static final Logger LOG = |
There was a problem hiding this comment.
I forgot to clean it. I coded some unit tests for test rpc interfaces on the local ago.
| return startPartition <= partitionId && endPartition >= partitionId; | ||
| } | ||
|
|
||
| public static boolean satisfyPartitionId(long startTime, long partitionId) { |
There was a problem hiding this comment.
No need to update this class in our current implementation.
The overflow problem still exist, that's no way to resolve, previously, the user set timePartitionOrigin to Long.MIN_VALUE. In this case, adding it to partitionId = -1 will indeed cause an overflow. However, the partition table in the system only accepts timestamps of the long type and does not support bigint timestamps. Therefore, if an overflow actually occurs, we have to accept the outcome where the program is interrupted by an exception being thrown.
...che/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
Outdated
Show resolved
Hide resolved
...che/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
Outdated
Show resolved
Hide resolved
...che/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
Outdated
Show resolved
Hide resolved
...che/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
Show resolved
Hide resolved
...che/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
Show resolved
Hide resolved
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
Outdated
Show resolved
Hide resolved
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
Outdated
Show resolved
Hide resolved
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
Show resolved
Hide resolved
...commons/src/main/java/org/apache/iotdb/commons/utils/rateLimiter/LeakyBucketRateLimiter.java
Outdated
Show resolved
Hide resolved
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java
Outdated
Show resolved
Hide resolved
...che/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
Outdated
Show resolved
Hide resolved
...b-core/datanode/src/main/java/org/apache/iotdb/db/partition/DataPartitionTableGenerator.java
Outdated
Show resolved
Hide resolved
...e/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
Outdated
Show resolved
Hide resolved
...e/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
Outdated
Show resolved
Hide resolved
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
Show resolved
Hide resolved
...commons/src/main/java/org/apache/iotdb/commons/utils/rateLimiter/LeakyBucketRateLimiter.java
Outdated
Show resolved
Hide resolved
Adjust new method to compute progress of data partition table generation
After the ConfigNode restarts, it will automatically start checking whether the data partition tables are complete.