Skip to content

Commit 3feef5a

Browse files
committed
Fixed an issue where intercept methods for kafka3.7 were invalid
1 parent 2e08217 commit 3feef5a

File tree

2 files changed

+178
-83
lines changed

2 files changed

+178
-83
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.kafka;
20+
21+
import org.apache.kafka.clients.consumer.ConsumerRecord;
22+
import org.apache.kafka.clients.consumer.ConsumerRecords;
23+
import org.apache.kafka.clients.consumer.internals.Fetch;
24+
import org.apache.kafka.common.TopicPartition;
25+
import org.apache.kafka.common.header.Header;
26+
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
27+
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
28+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
29+
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
30+
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
31+
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
32+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
33+
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
34+
35+
import java.lang.reflect.Method;
36+
import java.nio.charset.StandardCharsets;
37+
import java.util.Iterator;
38+
import java.util.List;
39+
import java.util.Map;
40+
41+
public class Kafka3_7ConsumerInterceptor extends KafkaConsumerInterceptor {
42+
43+
@Override
44+
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
45+
if (ret instanceof ConsumerRecords) {
46+
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
47+
ConsumerRecords<?, ?> consumerRecords = (ConsumerRecords<?, ?>) ret;
48+
if (consumerRecords.count() == 0) {
49+
return ret;
50+
}
51+
for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
52+
if (consumerRecord == null) {
53+
continue;
54+
}
55+
ContextCarrier contextCarrier = new ContextCarrier();
56+
CarrierItem next = contextCarrier.items();
57+
while (next.hasNext()) {
58+
next = next.next();
59+
Iterator<Header> iterator = consumerRecord.headers().headers(next.getHeadKey()).iterator();
60+
if (iterator.hasNext()) {
61+
next.setHeadValue(new String(iterator.next().value(), StandardCharsets.UTF_8));
62+
}
63+
}
64+
String operationName = OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME + requiredInfo.getGroupId();
65+
AbstractSpan activeSpan = ContextManager.createEntrySpan(operationName, contextCarrier).start(requiredInfo.getStartTime());
66+
activeSpan.setComponent(ComponentsDefine.KAFKA_CONSUMER);
67+
SpanLayer.asMQ(activeSpan);
68+
Tags.MQ_BROKER.set(activeSpan, requiredInfo.getBrokerServers());
69+
Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopics());
70+
activeSpan.setPeer(requiredInfo.getBrokerServers());
71+
ContextManager.stopSpan();
72+
}
73+
}
74+
return ret;
75+
}
76+
}

apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java

Lines changed: 102 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
5050
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor";
5151
public static final String INTERCEPTOR_CLASS_KAFKA3_2 = "org.apache.skywalking.apm.plugin.kafka.Kafka3ConsumerInterceptor";
5252
public static final String ENHANCE_METHOD = "pollOnce";
53+
public static final String ENHANCE_METHOD_KAFKA3_7 = "poll";
54+
public static final String INTERCEPTOR_CLASS_KAFKA3_7 = "org.apache.skywalking.apm.plugin.kafka.Kafka3_7ConsumerInterceptor";
5355
public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches";
5456
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer";
5557
public static final String SUBSCRIBE_METHOD = "subscribe";
@@ -62,104 +64,121 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
6264

6365
@Override
6466
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
65-
return new ConstructorInterceptPoint[] {
66-
new ConstructorInterceptPoint() {
67-
@Override
68-
public ElementMatcher<MethodDescription> getConstructorMatcher() {
69-
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
70-
}
67+
return new ConstructorInterceptPoint[]{
68+
new ConstructorInterceptPoint() {
69+
@Override
70+
public ElementMatcher<MethodDescription> getConstructorMatcher() {
71+
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
72+
}
7173

72-
@Override
73-
public String getConstructorInterceptor() {
74-
return CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS;
75-
}
76-
},
77-
new ConstructorInterceptPoint() {
78-
@Override
79-
public ElementMatcher<MethodDescription> getConstructorMatcher() {
80-
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_MAP_TYPE);
81-
}
82-
83-
@Override
84-
public String getConstructorInterceptor() {
85-
return MAP_CONSTRUCTOR_INTERCEPTOR_CLASS;
86-
}
87-
},
74+
@Override
75+
public String getConstructorInterceptor() {
76+
return CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS;
77+
}
78+
},
79+
new ConstructorInterceptPoint() {
80+
@Override
81+
public ElementMatcher<MethodDescription> getConstructorMatcher() {
82+
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_MAP_TYPE);
83+
}
84+
85+
@Override
86+
public String getConstructorInterceptor() {
87+
return MAP_CONSTRUCTOR_INTERCEPTOR_CLASS;
88+
}
89+
},
8890

8991
};
9092
}
9193

9294
@Override
9395
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
94-
return new InstanceMethodsInterceptPoint[] {
95-
new InstanceMethodsInterceptPoint() {
96-
@Override
97-
public ElementMatcher<MethodDescription> getMethodsMatcher() {
98-
// targeting Kafka Client < 3.2
99-
return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD).and(returns(Map.class)));
100-
}
96+
return new InstanceMethodsInterceptPoint[]{
97+
new InstanceMethodsInterceptPoint() {
98+
@Override
99+
public ElementMatcher<MethodDescription> getMethodsMatcher() {
100+
// targeting Kafka Client < 3.2
101+
return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD).and(returns(Map.class)));
102+
}
101103

102-
@Override
103-
public String getMethodsInterceptor() {
104-
return INTERCEPTOR_CLASS;
105-
}
104+
@Override
105+
public String getMethodsInterceptor() {
106+
return INTERCEPTOR_CLASS;
107+
}
106108

107-
@Override
108-
public boolean isOverrideArgs() {
109-
return false;
110-
}
111-
},
112-
new InstanceMethodsInterceptPoint() {
113-
@Override
114-
public ElementMatcher<MethodDescription> getMethodsMatcher() {
115-
// targeting Kafka Client >= 3.2
116-
return named(ENHANCE_COMPATIBLE_METHOD).and(returns(named("org.apache.kafka.clients.consumer.internals.Fetch")));
117-
}
109+
@Override
110+
public boolean isOverrideArgs() {
111+
return false;
112+
}
113+
},
114+
new InstanceMethodsInterceptPoint() {
115+
@Override
116+
public ElementMatcher<MethodDescription> getMethodsMatcher() {
117+
// targeting Kafka Client >= 3.2
118+
return named(ENHANCE_COMPATIBLE_METHOD).and(returns(named("org.apache.kafka.clients.consumer.internals.Fetch")));
119+
}
118120

119-
@Override
120-
public String getMethodsInterceptor() {
121-
return INTERCEPTOR_CLASS_KAFKA3_2;
122-
}
121+
@Override
122+
public String getMethodsInterceptor() {
123+
return INTERCEPTOR_CLASS_KAFKA3_2;
124+
}
123125

124-
@Override
125-
public boolean isOverrideArgs() {
126-
return false;
127-
}
128-
},
129-
new InstanceMethodsInterceptPoint() {
130-
@Override
131-
public ElementMatcher<MethodDescription> getMethodsMatcher() {
132-
return named(SUBSCRIBE_METHOD)
133-
.and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_NAME));
134-
}
126+
@Override
127+
public boolean isOverrideArgs() {
128+
return false;
129+
}
130+
},
131+
new InstanceMethodsInterceptPoint() {
132+
@Override
133+
public ElementMatcher<MethodDescription> getMethodsMatcher() {
134+
// targeting Kafka Client >= 3.7
135+
return named(ENHANCE_METHOD_KAFKA3_7).and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords")));
136+
}
135137

136-
@Override
137-
public String getMethodsInterceptor() {
138-
return SUBSCRIBE_INTERCEPT_CLASS;
139-
}
138+
@Override
139+
public String getMethodsInterceptor() {
140+
return INTERCEPTOR_CLASS_KAFKA3_7;
141+
}
140142

141-
@Override
142-
public boolean isOverrideArgs() {
143-
return false;
144-
}
145-
},
146-
new InstanceMethodsInterceptPoint() {
147-
@Override
148-
public ElementMatcher<MethodDescription> getMethodsMatcher() {
149-
return named(SUBSCRIBE_METHOD)
150-
.and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_PATTERN));
151-
}
143+
@Override
144+
public boolean isOverrideArgs() {
145+
return false;
146+
}
147+
},
148+
new InstanceMethodsInterceptPoint() {
149+
@Override
150+
public ElementMatcher<MethodDescription> getMethodsMatcher() {
151+
return named(SUBSCRIBE_METHOD)
152+
.and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_NAME));
153+
}
152154

153-
@Override
154-
public String getMethodsInterceptor() {
155-
return SUBSCRIBE_INTERCEPT_CLASS;
156-
}
155+
@Override
156+
public String getMethodsInterceptor() {
157+
return SUBSCRIBE_INTERCEPT_CLASS;
158+
}
157159

158-
@Override
159-
public boolean isOverrideArgs() {
160-
return false;
161-
}
162-
},
160+
@Override
161+
public boolean isOverrideArgs() {
162+
return false;
163+
}
164+
},
165+
new InstanceMethodsInterceptPoint() {
166+
@Override
167+
public ElementMatcher<MethodDescription> getMethodsMatcher() {
168+
return named(SUBSCRIBE_METHOD)
169+
.and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_PATTERN));
170+
}
171+
172+
@Override
173+
public String getMethodsInterceptor() {
174+
return SUBSCRIBE_INTERCEPT_CLASS;
175+
}
176+
177+
@Override
178+
public boolean isOverrideArgs() {
179+
return false;
180+
}
181+
},
163182
new InstanceMethodsInterceptPoint() {
164183
@Override
165184
public ElementMatcher<MethodDescription> getMethodsMatcher() {

0 commit comments

Comments
 (0)