Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -250,14 +250,14 @@ public static Map<Long, String> createPartitions(
new TabletServerInfo(2, "rack2")
});

// register partition assignments
zkClient.registerPartitionAssignment(
// register partition assignments and metadata
zkClient.registerPartitionAssignmentAndMetadata(
partitionId,
partition,
new PartitionAssignment(
tableInfo.getTableId(), assignment.getBucketAssignments()));

// register partition
zkClient.registerPartition(tablePath, tableInfo.getTableId(), partition, partitionId);
tableInfo.getTableId(), assignment.getBucketAssignments()),
tablePath,
tableInfo.getTableId());
}
return newPartitionIds;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,10 +438,9 @@ public void createPartition(

try {
long partitionId = zookeeperClient.getPartitionIdAndIncrement();
// register partition assignments to zk first
zookeeperClient.registerPartitionAssignment(partitionId, partitionAssignment);
// then register the partition metadata to zk
zookeeperClient.registerPartition(tablePath, tableId, partitionName, partitionId);
// register partition assignments and partition metadata to zk in transaction
zookeeperClient.registerPartitionAssignmentAndMetadata(
partitionId, partitionName, partitionAssignment, tablePath, tableId);
LOG.info(
"Register partition {} to zookeeper for table [{}].", partitionName, tablePath);
} catch (KeeperException.NodeExistsException nodeExistsException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,16 +190,6 @@ public void registerTableAssignment(long tableId, TableAssignment tableAssignmen
LOG.info("Registered table assignment {} for table id {}.", tableAssignment, tableId);
}

/** Register partition assignment to ZK. */
public void registerPartitionAssignment(
long partitionId, PartitionAssignment partitionAssignment) throws Exception {
String path = PartitionIdZNode.path(partitionId);
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(path, PartitionIdZNode.encode(partitionAssignment));
}

/** Get the table assignment in ZK. */
public Optional<TableAssignment> getTableAssignment(long tableId) throws Exception {
Optional<byte[]> bytes = getOrEmpty(TableIdZNode.path(tableId));
Expand Down Expand Up @@ -502,23 +492,68 @@ public int getPartitionNumber(TablePath tablePath) throws Exception {
return stat.getNumChildren();
}

/** Create a partition for a table in ZK. */
public void registerPartition(
TablePath tablePath, long tableId, String partitionName, long partitionId)
throws Exception {
String path = PartitionZNode.path(tablePath, partitionName);
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(path, PartitionZNode.encode(new TablePartition(tableId, partitionId)));
}

/** Delete a partition for a table in ZK. */
public void deletePartition(TablePath tablePath, String partitionName) throws Exception {
String path = PartitionZNode.path(tablePath, partitionName);
zkClient.delete().forPath(path);
}

/** Register partition assignment and metadata in transaction. */
public void registerPartitionAssignmentAndMetadata(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with this method, we can remove methods registerPartition & registerPartitionAssignment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes!

long partitionId,
String partitionName,
PartitionAssignment partitionAssignment,
TablePath tablePath,
long tableId)
throws Exception {
// Merge "registerPartitionAssignment()" and "registerPartition()"
// into one transaction. This is to avoid the case that the partition assignment is
// registered
// but the partition metadata is not registered.

// Create parent dictionary in advance.
try {
String tabletServerPartitionParentPath = ZkData.PartitionIdsZNode.path();
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(tabletServerPartitionParentPath);
} catch (KeeperException.NodeExistsException e) {
// ignore
}
try {
String metadataPartitionParentPath = PartitionsZNode.path(tablePath);
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(metadataPartitionParentPath);
} catch (KeeperException.NodeExistsException e) {
// ignore
}

List<CuratorOp> ops = new ArrayList<>(2);
String tabletServerPartitionPath = PartitionIdZNode.path(partitionId);
CuratorOp tabletServerPartitionNode =
zkClient.transactionOp()
.create()
.withMode(CreateMode.PERSISTENT)
.forPath(
tabletServerPartitionPath,
PartitionIdZNode.encode(partitionAssignment));

String metadataPath = PartitionZNode.path(tablePath, partitionName);
CuratorOp metadataPartitionNode =
zkClient.transactionOp()
.create()
.withMode(CreateMode.PERSISTENT)
.forPath(
metadataPath,
PartitionZNode.encode(new TablePartition(tableId, partitionId)));

ops.add(tabletServerPartitionNode);
ops.add(metadataPartitionNode);
zkClient.transaction().forOperations(ops);
}
// --------------------------------------------------------------------------------------------
// Schema
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -802,10 +802,10 @@ private Tuple2<PartitionIdName, PartitionIdName> preparePartitionAssignment(
long partition2Id = zookeeperClient.getPartitionIdAndIncrement();
String partition1Name = "2024";
String partition2Name = "2025";
zookeeperClient.registerPartitionAssignment(partition1Id, partitionAssignment);
zookeeperClient.registerPartition(tablePath, tableId, partition1Name, partition1Id);
zookeeperClient.registerPartitionAssignment(partition2Id, partitionAssignment);
zookeeperClient.registerPartition(tablePath, tableId, partition2Name, partition2Id);
zookeeperClient.registerPartitionAssignmentAndMetadata(
partition1Id, partition1Name, partitionAssignment, tablePath, tableId);
zookeeperClient.registerPartitionAssignmentAndMetadata(
partition2Id, partition2Name, partitionAssignment, tablePath, tableId);

return Tuple2.of(
new PartitionIdName(partition1Id, partition1Name),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,18 @@ void testCreateAndDropPartition() throws Exception {

PartitionAssignment partitionAssignment =
new PartitionAssignment(tableId, createAssignment().getBucketAssignments());
zookeeperClient.registerPartitionAssignment(
zookeeperClient.getPartitionIdAndIncrement(), partitionAssignment);
String partitionName = "2024";
zookeeperClient.registerPartitionAssignmentAndMetadata(
zookeeperClient.getPartitionIdAndIncrement(),
partitionName,
partitionAssignment,
DATA1_TABLE_PATH,
tableId);

// create partition
long partitionId = 1L;
tableManager.onCreateNewPartition(
DATA1_TABLE_PATH, tableId, partitionId, "2024", partitionAssignment);
DATA1_TABLE_PATH, tableId, partitionId, partitionName, partitionAssignment);

// all replicas should be online
checkReplicaOnline(tableId, partitionId, partitionAssignment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,11 @@ void testPartitionedTable() throws Exception {
new TabletServerInfo(2, "rack2")
})
.getBucketAssignments());
// register assignment
zookeeperClient.registerPartitionAssignment(1L, partitionAssignment);
zookeeperClient.registerPartitionAssignment(2L, partitionAssignment);

// register partitions
zookeeperClient.registerPartition(tablePath, tableId, "2011", 1L);
zookeeperClient.registerPartition(tablePath, tableId, "2022", 2L);
// register assignment and metadata
zookeeperClient.registerPartitionAssignmentAndMetadata(
1L, "2011", partitionAssignment, tablePath, tableId);
zookeeperClient.registerPartitionAssignmentAndMetadata(
2L, "2022", partitionAssignment, tablePath, tableId);

// create partitions events
expectedEvents.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.alibaba.fluss.server.zk;

import com.alibaba.fluss.cluster.Endpoint;
import com.alibaba.fluss.cluster.TabletServerInfo;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.metadata.Schema;
Expand All @@ -31,6 +32,7 @@
import com.alibaba.fluss.server.zk.data.BucketSnapshot;
import com.alibaba.fluss.server.zk.data.CoordinatorAddress;
import com.alibaba.fluss.server.zk.data.LeaderAndIsr;
import com.alibaba.fluss.server.zk.data.PartitionAssignment;
import com.alibaba.fluss.server.zk.data.TableAssignment;
import com.alibaba.fluss.server.zk.data.TableRegistration;
import com.alibaba.fluss.server.zk.data.TabletServerRegistration;
Expand All @@ -57,6 +59,7 @@
import java.util.Optional;
import java.util.Set;

import static com.alibaba.fluss.server.utils.TableAssignmentUtils.generateAssignment;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -411,8 +414,22 @@ void testPartition() throws Exception {
assertThat(partitions).isEmpty();

// test create new partitions
zookeeperClient.registerPartition(tablePath, tableId, "p1", 1L);
zookeeperClient.registerPartition(tablePath, tableId, "p2", 2L);
PartitionAssignment partitionAssignment =
new PartitionAssignment(
tableId,
generateAssignment(
3,
3,
new TabletServerInfo[] {
new TabletServerInfo(0, "rack0"),
new TabletServerInfo(1, "rack1"),
new TabletServerInfo(2, "rack2")
})
.getBucketAssignments());
zookeeperClient.registerPartitionAssignmentAndMetadata(
1L, "p1", partitionAssignment, tablePath, tableId);
zookeeperClient.registerPartitionAssignmentAndMetadata(
2L, "p2", partitionAssignment, tablePath, tableId);

// check created partitions
partitions = zookeeperClient.getPartitions(tablePath);
Expand Down