Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,22 @@ private void runOnce(boolean drainAll) throws Exception {
sendLookups(lookups);
}

private void sendLookups(List<AbstractLookupQuery<?>> lookups) {
private void sendLookups(List<AbstractLookupQuery<?>> lookups) throws Exception {
if (lookups.isEmpty()) {
return;
}
// group by <leader, lookup type> to lookup batches
Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> lookupBatches =
groupByLeaderAndType(lookups);

// if no lookup batches, sleep a bit to avoid busy loop. This case will happen when there is
// no leader for all the lookup request in queue.
if (lookupBatches.isEmpty() && !lookupQueue.hasUnDrained()) {
// TODO: may use wait/notify mechanism to avoid active sleep, and use a dynamic sleep
// time based on the request waited time.
Thread.sleep(100);
}

// now, send the batches
lookupBatches.forEach(
(destAndType, batch) -> sendLookups(destAndType.f0, destAndType.f1, batch));
Expand All @@ -148,11 +157,10 @@ private Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> groupByLe
// lookup the leader node
TableBucket tb = lookup.tableBucket();
try {
// TODO this can be a re-triable operation. We should retry here instead of
// throwing exception.
leader = metadataUpdater.leaderFor(tb);
} catch (Exception e) {
lookup.future().completeExceptionally(e);
// if leader is not found, re-enqueue the lookup to send again.
reEnqueueLookup(lookup);
continue;
}
lookupBatchesByLeader
Expand All @@ -165,24 +173,16 @@ private Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> groupByLe
@VisibleForTesting
void sendLookups(
int destination, LookupType lookupType, List<AbstractLookupQuery<?>> lookupBatches) {
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(destination);
if (gateway == null) {
// TODO handle this exception, like retry.
throw new LeaderNotAvailableException(
"Server " + destination + " is not found in metadata cache.");
}

if (lookupType == LookupType.LOOKUP) {
sendLookupRequest(destination, gateway, lookupBatches);
sendLookupRequest(destination, lookupBatches);
} else if (lookupType == LookupType.PREFIX_LOOKUP) {
sendPrefixLookupRequest(destination, gateway, lookupBatches);
sendPrefixLookupRequest(destination, lookupBatches);
} else {
throw new IllegalArgumentException("Unsupported lookup type: " + lookupType);
}
}

private void sendLookupRequest(
int destination, TabletServerGateway gateway, List<AbstractLookupQuery<?>> lookups) {
private void sendLookupRequest(int destination, List<AbstractLookupQuery<?>> lookups) {
// table id -> (bucket -> lookups)
Map<Long, Map<TableBucket, LookupBatch>> lookupByTableId = new HashMap<>();
for (AbstractLookupQuery<?> abstractLookupQuery : lookups) {
Expand All @@ -195,6 +195,19 @@ private void sendLookupRequest(
.addLookup(lookup);
}

TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(destination);
if (gateway == null) {
lookupByTableId.forEach(
(tableId, lookupsByBucket) ->
handleLookupRequestException(
new LeaderNotAvailableException(
"Server "
+ destination
+ " is not found in metadata cache."),
destination,
lookupsByBucket));
}

lookupByTableId.forEach(
(tableId, lookupsByBucket) ->
sendLookupRequestAndHandleResponse(
Expand All @@ -206,9 +219,7 @@ private void sendLookupRequest(
}

private void sendPrefixLookupRequest(
int destination,
TabletServerGateway gateway,
List<AbstractLookupQuery<?>> prefixLookups) {
int destination, List<AbstractLookupQuery<?>> prefixLookups) {
// table id -> (bucket -> lookups)
Map<Long, Map<TableBucket, PrefixLookupBatch>> lookupByTableId = new HashMap<>();
for (AbstractLookupQuery<?> abstractLookupQuery : prefixLookups) {
Expand All @@ -221,6 +232,19 @@ private void sendPrefixLookupRequest(
.addLookup(prefixLookup);
}

TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(destination);
if (gateway == null) {
lookupByTableId.forEach(
(tableId, lookupsByBucket) ->
handlePrefixLookupException(
new LeaderNotAvailableException(
"Server "
+ destination
+ " is not found in metadata cache."),
destination,
lookupsByBucket));
}

lookupByTableId.forEach(
(tableId, prefixLookupBatch) ->
sendPrefixLookupRequestAndHandleResponse(
Expand Down Expand Up @@ -396,7 +420,6 @@ private void handlePrefixLookupException(
}

private void reEnqueueLookup(AbstractLookupQuery<?> lookup) {
lookup.incrementRetries();
lookupQueue.appendLookup(lookup);
}

Expand Down Expand Up @@ -455,6 +478,7 @@ private void handleLookupError(
tableBucket,
maxRetries - lookup.retries(),
error.formatErrMsg());
lookup.incrementRetries();
reEnqueueLookup(lookup);
} else {
LOG.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,10 @@ void testSendLookupRequestWithNotLeaderOrFollowerException() throws Exception {
assertThat(result).isNotDone();
lookupQueue.appendLookup(lookupQuery);

// Wait for all retries to complete and verify it eventually fails
assertThatThrownBy(() -> result.get(5, TimeUnit.SECONDS))
.isInstanceOf(ExecutionException.class)
.hasMessageContaining("Leader not found after retry");
// Wait for all retries to complete and verify it eventually fails. This case will be failed
// after timeout.
assertThatThrownBy(() -> result.get(2, TimeUnit.SECONDS))
.isInstanceOf(java.util.concurrent.TimeoutException.class);

// Verify that retries happened (should be 1, because server meta invalidated)
assertThat(lookupQuery.retries()).isEqualTo(1);
Expand Down Expand Up @@ -173,10 +173,10 @@ void testSendPrefixLookupRequestWithNotLeaderOrFollowerException() throws Except
assertThat(future).isNotDone();
lookupQueue.appendLookup(prefixLookupQuery);

// Wait for all retries to complete and verify it eventually fails
assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS))
.isInstanceOf(ExecutionException.class)
.hasMessageContaining("Leader not found after retry");
// Wait for all retries to complete and verify it eventually fails. This case will be failed
// after timeout.
assertThatThrownBy(() -> future.get(2, TimeUnit.SECONDS))
.isInstanceOf(java.util.concurrent.TimeoutException.class);

// Verify that retries happened (should be 1, because server meta invalidated)
assertThat(prefixLookupQuery.retries()).isEqualTo(1);
Expand Down