Skip to content

Commit 73ad9b5

Browse files
HADOOP-19864 Followup: rejection of unregistered protocols. (#8470)
Server will immediately reject any protocols for which no handler is registered. --------- Contributed by Steve Loughran Co-authored-by: Cheng Pan <pan3793@gmail.com> Contains content written by github copilot
1 parent ff66216 commit 73ad9b5

3 files changed

Lines changed: 66 additions & 15 deletions

File tree

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,9 +1095,22 @@ Map<ProtoNameVer, ProtoClassProtoImpl> getProtocolImplMap(RPC.RpcKind rpcKind) {
10951095
new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10));
10961096
}
10971097
}
1098-
return protocolImplMapArray.get(rpcKind.ordinal());
1098+
return protocolImplMapArray.get(rpcKind.ordinal());
10991099
}
1100-
1100+
1101+
/**
1102+
* Returns {@code true} only if at least one protocol has been registered
1103+
* on this server instance for the given {@link RPC.RpcKind}.
1104+
* Used to reject incoming requests for unsupported RPC kinds before any
1105+
* deserialization of the request payload takes place.
1106+
* @param rpcKind the RPC kind from the incoming request header.
1107+
* @return {@code true} if at least one protocol is registered for this kind.
1108+
*/
1109+
boolean hasRegisteredProtocols(RPC.RpcKind rpcKind) {
1110+
Map<ProtoNameVer, ProtoClassProtoImpl> implMap = getProtocolImplMap(rpcKind);
1111+
return implMap != null && !implMap.isEmpty();
1112+
}
1113+
11011114
// Register protocol and its impl for rpc calls
11021115
void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass,
11031116
Object protocolImpl) {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -350,13 +350,13 @@ public static Server get() {
350350
* after the call returns.
351351
*/
352352
private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
353-
353+
354354
/** @return Get the current call. */
355355
@VisibleForTesting
356356
public static ThreadLocal<Call> getCurCall() {
357357
return CurCall;
358358
}
359-
359+
360360
/**
361361
* Returns the currently active RPC call's sequential ID number. A negative
362362
* call ID indicates an invalid value, such as if there is no currently active
@@ -2869,28 +2869,41 @@ private void checkRpcHeaders(RpcRequestHeaderProto header)
28692869
private void processRpcRequest(RpcRequestHeaderProto header,
28702870
RpcWritable.Buffer buffer) throws RpcServerException,
28712871
InterruptedException {
2872-
Class<? extends Writable> rpcRequestClass =
2872+
// Reject requests for RPC kinds with no registered protocols on this
2873+
// server instance. This prevents deserialization of untrusted payloads
2874+
// for unsupported kinds. See HADOOP-19864.
2875+
if (Server.this instanceof RPC.Server server) {
2876+
RPC.Server server = (RPC.ServerServer.this;
2877+
final RPC.RpcKind kind = ProtoUtil.convert(header.getRpcKind());
2878+
if (!server.hasRegisteredProtocols(kind)) {
2879+
final String err = "No protocols registered on this server for RpcKind "
2880+
+ header.getRpcKind()
2881+
+ ". Rejecting request without deserialization.";
2882+
LOG.info("{} Client: {}", err, getHostAddress());
2883+
throw new FatalRpcServerException(
2884+
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
2885+
}
2886+
}
2887+
Class<? extends Writable> rpcRequestClass =
28732888
getRpcRequestWrapper(header.getRpcKind());
28742889
if (rpcRequestClass == null) {
2875-
LOG.warn("Unknown rpc kind " + header.getRpcKind() +
2876-
" from client " + getHostAddress());
2877-
final String err = "Unknown rpc kind in rpc header" +
2878-
header.getRpcKind();
2890+
LOG.warn("Unknown rpc kind {} from client {}", header.getRpcKind(), getHostAddress());
28792891
throw new FatalRpcServerException(
2880-
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
2892+
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
2893+
"Unknown rpc kind in rpc header " + header.getRpcKind());
28812894
}
28822895
Writable rpcRequest;
28832896
try { //Read the rpc request
28842897
rpcRequest = buffer.newInstance(rpcRequestClass, conf);
28852898
} catch (RpcServerException rse) { // lets tests inject failures.
28862899
throw rse;
28872900
} catch (Throwable t) { // includes runtime exception from newInstance
2888-
LOG.warn("Unable to read call parameters for client " +
2889-
getHostAddress() + "on connection protocol " +
2890-
this.protocolName + " for rpcKind " + header.getRpcKind(), t);
2891-
String err = "IPC server unable to read call parameters: "+ t.getMessage();
2901+
LOG.warn(
2902+
"Unable to read call parameters for client {} on connection protocol {} for rpcKind {}",
2903+
getHostAddress(), this.protocolName, header.getRpcKind(), t);
28922904
throw new FatalRpcServerException(
2893-
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
2905+
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
2906+
"IPC server unable to read call parameters: " + t.getMessage());
28942907
}
28952908

28962909
Span span = null;

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2099,6 +2099,31 @@ public void testNumTotalRequestsMetrics() throws Exception {
20992099
}
21002100

21012101

2102+
/**
2103+
* Test that a Protobuf-only RPC server rejects requests for RpcKinds
2104+
* that have no registered protocols, without deserializing the payload.
2105+
*/
2106+
@Test
2107+
@Timeout(value = 30)
2108+
public void testUnregisteredRpcKindRejectedWithoutDeserialization()
2109+
throws Exception {
2110+
// Standard test server: only RPC_PROTOCOL_BUFFER protocols are registered.
2111+
RPC.Server server = setupTestServer(conf, 1);
2112+
try {
2113+
// RPC_PROTOCOL_BUFFER has registered protocols — must be accepted.
2114+
assertThat(server.hasRegisteredProtocols(RPC.RpcKind.RPC_PROTOCOL_BUFFER))
2115+
.as("RPC_PROTOCOL_BUFFER should have registered protocols")
2116+
.isTrue();
2117+
2118+
// RPC_BUILTIN has no protocols registered on this server — must be rejected.
2119+
assertThat(server.hasRegisteredProtocols(RPC.RpcKind.RPC_BUILTIN))
2120+
.as("RPC_BUILTIN should have no registered protocols on a Protobuf-only server")
2121+
.isFalse();
2122+
} finally {
2123+
server.stop();
2124+
}
2125+
}
2126+
21022127
public static void main(String[] args) throws Exception {
21032128
new TestRPC().testCallsInternal(conf);
21042129
}

0 commit comments

Comments
 (0)