Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -671,10 +671,63 @@ public List<T> get(List<String> paths, List<Stat> stats, int options, boolean th
return _baseAccessor.get(serverPaths, stats, options, throwException);
}

// TODO: add cache
@Override
public Stat[] getStats(List<String> paths, int options) {
if (paths == null || paths.isEmpty()) {
return new Stat[0];
}

final int size = paths.size();
List<String> serverPaths = prependChroot(paths);

Stat[] stats = new Stat[size];

boolean needRead = false;
boolean[] needReads = new boolean[size]; // init to false

Cache<T> cache = getCache(serverPaths);
if (cache != null) {
try {
cache.lockRead();
// Try to get stats from cache first
for (int i = 0; i < size; i++) {
ZNode zNode = cache.get(serverPaths.get(i));
if (zNode != null) {
stats[i] = zNode.getStat();
} else {
needRead = true;
needReads[i] = true;
}
}
} finally {
cache.unlockRead();
}

// cache miss, fall back to zk and update cache
if (needRead) {
cache.lockWrite();
try {
// Batch read data and stats for cache misses to populate cache
List<Stat> readStatsList = new ArrayList<>(Collections.<Stat>nCopies(size, null));
List<T> readRecords = _baseAccessor.get(serverPaths, readStatsList, needReads, false);

for (int i = 0; i < size; i++) {
if (needReads[i]) {
stats[i] = readStatsList.get(i);
if (readStatsList.get(i) != null) {
cache.update(serverPaths.get(i), readRecords.get(i), readStatsList.get(i));
}
}
}
} finally {
cache.unlockWrite();
}
}

return stats;
}

// no cache
return _baseAccessor.getStats(serverPaths, options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,137 @@ public void testZkCacheCallbackExternalOpNoChroot() throws Exception {
deleteCluster(clusterName);
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}

@Test
public void testGetStatsWithCache() throws Exception {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));

String curStatePath = PropertyPathBuilder.instanceCurrentState(clusterName, "localhost_8901");
ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_gZkClient);

List<String> cachePaths = Arrays.asList(curStatePath);
ZkCacheBaseDataAccessor<ZNRecord> accessor =
new ZkCacheBaseDataAccessor<>(baseAccessor, null, null, cachePaths);

// Create 5 current state nodes
List<String> paths = new ArrayList<>();
for (int i = 0; i < 5; i++) {
String path = curStatePath + "/session_0/TestDB" + i;
paths.add(path);
boolean success = accessor.create(path, new ZNRecord("TestDB" + i), AccessOption.PERSISTENT);
Assert.assertTrue(success, "Should succeed in create: " + path);
}

// First call - populates cache
org.apache.zookeeper.data.Stat[] stats = accessor.getStats(paths, 0);
Assert.assertNotNull(stats);
Assert.assertEquals(stats.length, 5);
for (int i = 0; i < 5; i++) {
Assert.assertNotNull(stats[i], "Stat should exist for TestDB" + i);
Assert.assertEquals(stats[i].getVersion(), 0);
}

// Verify cache is populated after first getStats call
for (int i = 0; i < 5; i++) {
String path = curStatePath + "/session_0/TestDB" + i;
Assert.assertNotNull(accessor._zkCache._cache.get(path),
"Cache should contain TestDB" + i + " after first getStats call");
}

// Second call - should hit cache
org.apache.zookeeper.data.Stat[] cachedStats = accessor.getStats(paths, 0);
Assert.assertEquals(cachedStats.length, 5);
for (int i = 0; i < 5; i++) {
Assert.assertNotNull(cachedStats[i]);
Assert.assertEquals(cachedStats[i].getVersion(), 0);
}

// Verify data still in cache and matches
for (int i = 0; i < 5; i++) {
String path = curStatePath + "/session_0/TestDB" + i;
Assert.assertNotNull(accessor._zkCache._cache.get(path),
"Cache should still contain TestDB" + i);
// Verify we got the same stat from cache (version should match)
Assert.assertEquals(
accessor._zkCache._cache.get(path).getStat().getVersion(),
cachedStats[i].getVersion(),
"Cached stat version should match returned stat for TestDB" + i);
}

deleteCluster(clusterName);
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}

@Test
public void testGetStatsAfterUpdate() throws Exception {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));

String curStatePath = PropertyPathBuilder.instanceCurrentState(clusterName, "localhost_8901");
ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_gZkClient);

List<String> cachePaths = Arrays.asList(curStatePath);
ZkCacheBaseDataAccessor<ZNRecord> accessor =
new ZkCacheBaseDataAccessor<>(baseAccessor, null, null, cachePaths);

// Create nodes
List<String> paths = new ArrayList<>();
for (int i = 0; i < 3; i++) {
String path = curStatePath + "/session_0/TestDB" + i;
paths.add(path);
accessor.create(path, new ZNRecord("TestDB" + i), AccessOption.PERSISTENT);
}

// Get initial stats - populates cache
org.apache.zookeeper.data.Stat[] initialStats = accessor.getStats(paths, 0);
for (int i = 0; i < 3; i++) {
Assert.assertEquals(initialStats[i].getVersion(), 0);
}

// Verify cache is populated with version 0
for (int i = 0; i < 3; i++) {
String path = curStatePath + "/session_0/TestDB" + i;
Assert.assertNotNull(accessor._zkCache._cache.get(path),
"Cache should contain TestDB" + i);
Assert.assertEquals(
accessor._zkCache._cache.get(path).getStat().getVersion(), 0,
"Cached version should be 0 for TestDB" + i);
}

// Update nodes
for (int i = 0; i < 3; i++) {
ZNRecord updatedRecord = new ZNRecord("TestDB" + i);
updatedRecord.setSimpleField("key", "value");
accessor.set(paths.get(i), updatedRecord, AccessOption.PERSISTENT);
}

// Get stats after update - should reflect new version from updated cache
org.apache.zookeeper.data.Stat[] updatedStats = accessor.getStats(paths, 0);
for (int i = 0; i < 3; i++) {
Assert.assertEquals(updatedStats[i].getVersion(), 1, "Version should be 1 after update");
}

// Verify cache is updated with new version
for (int i = 0; i < 3; i++) {
String path = curStatePath + "/session_0/TestDB" + i;
Assert.assertNotNull(accessor._zkCache._cache.get(path),
"Cache should still contain TestDB" + i);
Assert.assertEquals(
accessor._zkCache._cache.get(path).getStat().getVersion(), 1,
"Cached version should be updated to 1 for TestDB" + i);
// Verify the cached stat matches what getStats returned
Assert.assertEquals(
accessor._zkCache._cache.get(path).getStat().getVersion(),
updatedStats[i].getVersion(),
"Cached stat should match returned stat for TestDB" + i);
}

deleteCluster(clusterName);
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
}
Loading