diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java index 2be1a4f769..49dab5e987 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java @@ -671,10 +671,63 @@ public List get(List paths, List stats, int options, boolean th return _baseAccessor.get(serverPaths, stats, options, throwException); } - // TODO: add cache @Override public Stat[] getStats(List paths, int options) { + if (paths == null || paths.isEmpty()) { + return new Stat[0]; + } + + final int size = paths.size(); List serverPaths = prependChroot(paths); + + Stat[] stats = new Stat[size]; + + boolean needRead = false; + boolean[] needReads = new boolean[size]; // init to false + + Cache cache = getCache(serverPaths); + if (cache != null) { + try { + cache.lockRead(); + // Try to get stats from cache first + for (int i = 0; i < size; i++) { + ZNode zNode = cache.get(serverPaths.get(i)); + if (zNode != null) { + stats[i] = zNode.getStat(); + } else { + needRead = true; + needReads[i] = true; + } + } + } finally { + cache.unlockRead(); + } + + // cache miss, fall back to zk and update cache + if (needRead) { + cache.lockWrite(); + try { + // Batch read data and stats for cache misses to populate cache + List readStatsList = new ArrayList<>(Collections.nCopies(size, null)); + List readRecords = _baseAccessor.get(serverPaths, readStatsList, needReads, false); + + for (int i = 0; i < size; i++) { + if (needReads[i]) { + stats[i] = readStatsList.get(i); + if (readStatsList.get(i) != null) { + cache.update(serverPaths.get(i), readRecords.get(i), readStatsList.get(i)); + } + } + } + } finally { + cache.unlockWrite(); + } + } + + return stats; + } + + // no cache return _baseAccessor.getStats(serverPaths, options); } diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java index bd2b381de3..8bf5955532 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java @@ -187,4 +187,137 @@ public void testZkCacheCallbackExternalOpNoChroot() throws Exception { deleteCluster(clusterName); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } + + @Test + public void testGetStatsWithCache() throws Exception { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + String curStatePath = PropertyPathBuilder.instanceCurrentState(clusterName, "localhost_8901"); + ZkBaseDataAccessor baseAccessor = new ZkBaseDataAccessor<>(_gZkClient); + + List cachePaths = Arrays.asList(curStatePath); + ZkCacheBaseDataAccessor accessor = + new ZkCacheBaseDataAccessor<>(baseAccessor, null, null, cachePaths); + + // Create 5 current state nodes + List paths = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + String path = curStatePath + "/session_0/TestDB" + i; + paths.add(path); + boolean success = accessor.create(path, new ZNRecord("TestDB" + i), AccessOption.PERSISTENT); + Assert.assertTrue(success, "Should succeed in create: " + path); + } + + // First call - populates cache + org.apache.zookeeper.data.Stat[] stats = accessor.getStats(paths, 0); + Assert.assertNotNull(stats); + Assert.assertEquals(stats.length, 5); + for (int i = 0; i < 5; i++) { + Assert.assertNotNull(stats[i], "Stat should exist for TestDB" + i); + Assert.assertEquals(stats[i].getVersion(), 0); + } + + // Verify cache is populated after first getStats call + for (int i = 0; i < 5; i++) { + String path = curStatePath + "/session_0/TestDB" + i; + Assert.assertNotNull(accessor._zkCache._cache.get(path), + "Cache should contain TestDB" + i + " after first getStats call"); + } + + // Second call - should hit cache + org.apache.zookeeper.data.Stat[] cachedStats = accessor.getStats(paths, 0); + Assert.assertEquals(cachedStats.length, 5); + for (int i = 0; i < 5; i++) { + Assert.assertNotNull(cachedStats[i]); + Assert.assertEquals(cachedStats[i].getVersion(), 0); + } + + // Verify data still in cache and matches + for (int i = 0; i < 5; i++) { + String path = curStatePath + "/session_0/TestDB" + i; + Assert.assertNotNull(accessor._zkCache._cache.get(path), + "Cache should still contain TestDB" + i); + // Verify we got the same stat from cache (version should match) + Assert.assertEquals( + accessor._zkCache._cache.get(path).getStat().getVersion(), + cachedStats[i].getVersion(), + "Cached stat version should match returned stat for TestDB" + i); + } + + deleteCluster(clusterName); + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testGetStatsAfterUpdate() throws Exception { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + String curStatePath = PropertyPathBuilder.instanceCurrentState(clusterName, "localhost_8901"); + ZkBaseDataAccessor baseAccessor = new ZkBaseDataAccessor<>(_gZkClient); + + List cachePaths = Arrays.asList(curStatePath); + ZkCacheBaseDataAccessor accessor = + new ZkCacheBaseDataAccessor<>(baseAccessor, null, null, cachePaths); + + // Create nodes + List paths = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + String path = curStatePath + "/session_0/TestDB" + i; + paths.add(path); + accessor.create(path, new ZNRecord("TestDB" + i), AccessOption.PERSISTENT); + } + + // Get initial stats - populates cache + org.apache.zookeeper.data.Stat[] initialStats = accessor.getStats(paths, 0); + for (int i = 0; i < 3; i++) { + Assert.assertEquals(initialStats[i].getVersion(), 0); + } + + // Verify cache is populated with version 0 + for (int i = 0; i < 3; i++) { + String path = curStatePath + "/session_0/TestDB" + i; + Assert.assertNotNull(accessor._zkCache._cache.get(path), + "Cache should contain TestDB" + i); + Assert.assertEquals( + accessor._zkCache._cache.get(path).getStat().getVersion(), 0, + "Cached version should be 0 for TestDB" + i); + } + + // Update nodes + for (int i = 0; i < 3; i++) { + ZNRecord updatedRecord = new ZNRecord("TestDB" + i); + updatedRecord.setSimpleField("key", "value"); + accessor.set(paths.get(i), updatedRecord, AccessOption.PERSISTENT); + } + + // Get stats after update - should reflect new version from updated cache + org.apache.zookeeper.data.Stat[] updatedStats = accessor.getStats(paths, 0); + for (int i = 0; i < 3; i++) { + Assert.assertEquals(updatedStats[i].getVersion(), 1, "Version should be 1 after update"); + } + + // Verify cache is updated with new version + for (int i = 0; i < 3; i++) { + String path = curStatePath + "/session_0/TestDB" + i; + Assert.assertNotNull(accessor._zkCache._cache.get(path), + "Cache should still contain TestDB" + i); + Assert.assertEquals( + accessor._zkCache._cache.get(path).getStat().getVersion(), 1, + "Cached version should be updated to 1 for TestDB" + i); + // Verify the cached stat matches what getStats returned + Assert.assertEquals( + accessor._zkCache._cache.get(path).getStat().getVersion(), + updatedStats[i].getVersion(), + "Cached stat should match returned stat for TestDB" + i); + } + + deleteCluster(clusterName); + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } }