[server] Fix zk partition residual when using dynamic partition#1187
luoyuxia merged 2 commits into apache:main
Conversation
```java
    }
});
if (!lock.isLocked() && lock.getQueueLength() == 0) {
    createPartitionLockMap.remove(physicalTablePath, lock);
```
If any exception happens in the `inLock(...)` code fragment above, this lock won't be removed.
```java
    }
});
if (!lock.isLocked() && lock.getQueueLength() == 0) {
    createPartitionLockMap.remove(physicalTablePath, lock);
```
Consider the following case:
- Thread A reaches this point and is about to remove lock1.
- Thread B acquires lock1 at https://github.com/alibaba/fluss/pull/1187/files#diff-4623210f8554fe6ea208a8dfc413f997d6d10b8c64cfc3aa530f3620ddbd09ccR401
- Thread A removes lock1 from the map.
- Thread C finds no lock for the partition and creates lock2.

There is still a race between Thread B and Thread C.
TBH, I haven't thought of a good way to solve it yet. Maybe a Guava cache?
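One way to close the race described above is reference counting: the lock entry is only removed inside the map's atomic `compute`, and only when no thread still holds a reference. The sketch below is illustrative (the `PartitionLocks` class and its names are hypothetical, not Fluss code), but it shows the general pattern with plain JDK types:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: reference-counted per-key locks whose cleanup
// cannot race with a concurrent acquirer, because both acquisition and
// removal go through ConcurrentHashMap.compute() on the same key.
class PartitionLocks<K> {
    private static final class Holder {
        final ReentrantLock lock = new ReentrantLock();
        int refCount; // mutated only inside compute(), so it is safely published
    }

    private final ConcurrentHashMap<K, Holder> locks = new ConcurrentHashMap<>();

    public void withLock(K key, Runnable action) {
        // Atomically fetch-or-create the holder and bump its refCount;
        // a concurrent cleanup can never remove a holder someone just got.
        Holder h = locks.compute(key, (k, v) -> {
            if (v == null) {
                v = new Holder();
            }
            v.refCount++;
            return v;
        });
        h.lock.lock();
        try {
            action.run();
        } finally {
            h.lock.unlock();
            // Drop our reference atomically; remove the entry only when
            // no other thread still references it. Runs even on exception.
            locks.compute(key, (k, v) -> {
                if (v == null) {
                    return null;
                }
                v.refCount--;
                return v.refCount == 0 ? null : v;
            });
        }
    }

    int size() {
        return locks.size();
    }
}
```

Because cleanup happens in the `finally` block, this also addresses the earlier comment about locks leaking when the critical section throws.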
```java
try {
    long partitionId = zookeeperClient.getPartitionIdAndIncrement();
    // register partition assignments to zk first
    zookeeperClient.registerPartitionAssignment(
```
Can we wrap registerPartitionAssignment and registerPartition into a ZK transaction? We let ZK do the concurrency control so that we won't need the lock, and we'll have cleaner code.
The only downside is that we may fire multiple ZK transactions for the same partition. But that shouldn't happen frequently, so I think it's fine, since the current implementation also fires multiple ZK operations for the same partition.
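The reviewer's idea could look roughly like the following sketch, assuming a Curator client and illustrative path/payload arguments (the method name mirrors the one added later in the PR, but the body here is a hypothetical reconstruction, not the actual Fluss code). It is not runnable without a live ZooKeeper ensemble:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorOp;

class PartitionRegistrar {
    /**
     * Sketch: create the assignment and metadata nodes in a single ZK
     * transaction, so concurrency control is delegated to ZooKeeper.
     */
    static void registerPartitionAssignmentAndMetadata(
            CuratorFramework client,
            String assignmentPath,
            byte[] assignmentData,
            String metadataPath,
            byte[] metadataData)
            throws Exception {
        CuratorOp createAssignment =
                client.transactionOp().create().forPath(assignmentPath, assignmentData);
        CuratorOp createMetadata =
                client.transactionOp().create().forPath(metadataPath, metadataData);
        // Both creates commit atomically. If another thread already created
        // the partition, the whole transaction fails (NodeExistsException)
        // and no residual node is left behind in ZooKeeper.
        client.transaction().forOperations(createAssignment, createMetadata);
    }
}
```

The losing thread simply sees the transaction fail, so no in-process lock map is needed at all.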
Very valuable suggestion. I have changed the solution.
Pull Request Overview
This PR adds per-partition locking around the createPartition flow to prevent residual entries in ZooKeeper when dynamic partitions are created concurrently.
- Introduce a `createPartitionLockMap` to manage a `ReentrantLock` per `PhysicalTablePath`
- Wrap the partition creation logic in `inLock` to enforce mutual exclusion
- Clean up locks from the map after creation completes
Comments suppressed due to low confidence (1)
fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java:396
- The new per-partition locking logic in `createPartition` introduces concurrency behavior that should be covered by unit or integration tests to verify correct mutual exclusion and lock cleanup under both success and failure scenarios.
```java
PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName);
```
```java
});
if (!lock.isLocked() && lock.getQueueLength() == 0) {
    createPartitionLockMap.remove(physicalTablePath, lock);
```
The cleanup removal of the ReentrantLock from createPartitionLockMap will not execute if an exception is thrown inside inLock, potentially leaking locks. Consider wrapping both the inLock call and the cleanup removal logic in a try/finally to ensure removal always runs.
Suggested change:
```diff
-    });
-    if (!lock.isLocked() && lock.getQueueLength() == 0) {
-        createPartitionLockMap.remove(physicalTablePath, lock);
+    });
+} finally {
+    if (!lock.isLocked() && lock.getQueueLength() == 0) {
+        createPartitionLockMap.remove(physicalTablePath, lock);
+    }
```
Force-pushed from 4392148 to 4ab0eb9 (Compare)
@luoyuxia Thanks! I refactored my code to solve the problem by using a ZK transaction instead of the lock map! PTAL
```java
}

/** Register partition assignment and metadata in transaction. */
public void registerPartitionAssignmentAndMetadata(
```
With this method, we can remove the registerPartition and registerPartitionAssignment methods.
Purpose
Linked issue: close #1185
Brief change log
Tests
API and Format
Documentation