diff --git a/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/pom.xml
new file mode 100644
index 0000000000..cf10d852f1
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/pom.xml
@@ -0,0 +1,54 @@
+
+
+
+
+
+ apm-sdk-plugin
+ org.apache.skywalking
+ 9.3.0-SNAPSHOT
+
+ 4.0.0
+
+ apm-kafka-3.7.x-plugin
+
+
+ 3.7.0
+
+
+
+
+ org.apache.skywalking
+ apm-kafka-commons
+ ${project.version}
+ provided
+
+
+ org.apache.skywalking
+ apm-kafka-plugin
+ ${project.version}
+ provided
+
+
+ org.apache.kafka
+ kafka-clients
+ ${kafka-clients.version}
+ provided
+
+
+
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka37/Kafka37ConsumerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka37/Kafka37ConsumerInterceptor.java
new file mode 100644
index 0000000000..cca615cdc1
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka37/Kafka37ConsumerInterceptor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka37;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.header.Header;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.apache.skywalking.apm.plugin.kafka.ConsumerEnhanceRequiredInfo;
+import org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor;
+
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+/**
+ * kafak3.7 removed the method named pollForFetches with return type Fetch.
+ * When the kafka version < 3.7, the enhance method could be: org.apache.kafka.clients.consumer.KafkaConsumer#pollForFetches.
+ * But when the kafka version >= 3.7, the enhance method should be: org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration).
+ * And the return type was also changed, from org.apache.kafka.clients.consumer.internals.Fetch to org.apache.kafka.clients.consumer.ConsumerRecords.
+ */
+public class Kafka37ConsumerInterceptor extends KafkaConsumerInterceptor {
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Object ret) throws Throwable {
+ if (ret instanceof ConsumerRecords) {
+ ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
+ ConsumerRecords, ?> consumerRecords = (ConsumerRecords, ?>) ret;
+ if (consumerRecords.count() == 0) {
+ return ret;
+ }
+ for (ConsumerRecord, ?> consumerRecord : consumerRecords) {
+ if (consumerRecord == null) {
+ continue;
+ }
+ ContextCarrier contextCarrier = new ContextCarrier();
+ CarrierItem next = contextCarrier.items();
+ while (next.hasNext()) {
+ next = next.next();
+ Iterator iterator = consumerRecord.headers().headers(next.getHeadKey()).iterator();
+ if (iterator.hasNext()) {
+ next.setHeadValue(new String(iterator.next().value(), StandardCharsets.UTF_8));
+ }
+ }
+ String operationName = OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME + requiredInfo.getGroupId();
+ AbstractSpan activeSpan = ContextManager.createEntrySpan(operationName, contextCarrier).start(requiredInfo.getStartTime());
+ activeSpan.setComponent(ComponentsDefine.KAFKA_CONSUMER);
+ SpanLayer.asMQ(activeSpan);
+ Tags.MQ_BROKER.set(activeSpan, requiredInfo.getBrokerServers());
+ Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopics());
+ activeSpan.setPeer(requiredInfo.getBrokerServers());
+ ContextManager.stopSpan();
+ }
+ }
+ return ret;
+ }
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka37/define/Kafka37ConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka37/define/Kafka37ConsumerInstrumentation.java
new file mode 100644
index 0000000000..7b0a35e8e3
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka37/define/Kafka37ConsumerInstrumentation.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka37.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.plugin.kafka.define.KafkaConsumerInstrumentation;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.returns;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+public class Kafka37ConsumerInstrumentation extends KafkaConsumerInstrumentation {
+
+ public static final String ENHANCE_CLASS_37 = "org.apache.kafka.clients.consumer.KafkaConsumer";
+
+ public static final String INTERCEPTOR_CLASS_37 = "org.apache.skywalking.apm.plugin.kafka37.Kafka37ConsumerInterceptor";
+
+ // Kafka 3.7.x's pull message method's name is "poll"
+ public static final String ENHANCE_METHOD_37 = "poll";
+
+ // Kafka 3.7.x's pull message method's return type is "ConsumerRecords"
+ public static final String ENHANCE_RETURN_TYPE_37 = "org.apache.kafka.clients.consumer.ConsumerRecords";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS_37);
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[]{
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher getMethodsMatcher() {
+ return named(ENHANCE_METHOD_37).and(returns(named(ENHANCE_RETURN_TYPE_37)));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return INTERCEPTOR_CLASS_37;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/resources/skywalking-plugin.def
new file mode 100644
index 0000000000..38dd523050
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/resources/skywalking-plugin.def
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka.define.CallbackInstrumentation
+kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka37.define.Kafka37ConsumerInstrumentation
+kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerInstrumentation
+kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation
+kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/Kafka37ConsumerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/Kafka37ConsumerInterceptorTest.java
new file mode 100644
index 0000000000..cef56ed627
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/Kafka37ConsumerInterceptorTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.skywalking.apm.agent.core.context.SW8CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.helper.SegmentRefHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.apache.skywalking.apm.plugin.kafka37.Kafka37ConsumerInterceptor;
+import org.hamcrest.MatcherAssert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.KAFKA_CONSUMER;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+@RunWith(TracingSegmentRunner.class)
+public class Kafka37ConsumerInterceptorTest {
+
+ @SegmentStoragePoint
+ private SegmentStorage segmentStorage;
+
+ @Rule
+ public AgentServiceRule serviceRule = new AgentServiceRule();
+ @Rule
+ public MockitoRule rule = MockitoJUnit.rule();
+
+ private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo;
+
+ private Kafka37ConsumerInterceptor consumerInterceptor;
+
+ private EnhancedInstance consumerInstance = new EnhancedInstance() {
+ @Override
+ public Object getSkyWalkingDynamicField() {
+ return consumerEnhanceRequiredInfo;
+ }
+
+ @Override
+ public void setSkyWalkingDynamicField(Object value) {
+ consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo) value;
+ }
+ };
+
+ private Map> messages;
+
+ @Before
+ public void setUp() {
+ consumerInterceptor = new Kafka37ConsumerInterceptor();
+ consumerEnhanceRequiredInfo = new ConsumerEnhanceRequiredInfo();
+
+ List topics = new ArrayList();
+ topics.add("test");
+ topics.add("test-1");
+ consumerEnhanceRequiredInfo.setTopics(topics);
+ List brokers = new ArrayList();
+ brokers.add("localhost:9092");
+ brokers.add("localhost:19092");
+ consumerEnhanceRequiredInfo.setBrokerServers(brokers);
+ consumerEnhanceRequiredInfo.setGroupId("test");
+
+ messages = new HashMap>();
+ TopicPartition topicPartition = new TopicPartition("test", 1);
+ List records = new ArrayList();
+ ConsumerRecord consumerRecord = new ConsumerRecord("test", 1, 0, "1", "1");
+ consumerRecord.headers()
+ .add(
+ SW8CarrierItem.HEADER_NAME,
+ "1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA="
+ .getBytes()
+ );
+ records.add(consumerRecord);
+ messages.put(topicPartition, records);
+ }
+
+ @Test
+ public void testConsumerWithoutMessage() throws Throwable {
+ consumerInterceptor.beforeMethod(consumerInstance, null, new Object[0], new Class[0], null);
+ consumerInterceptor.afterMethod(
+ consumerInstance, null, new Object[0], new Class[0], new ConsumerRecords(messages));
+
+ List traceSegments = segmentStorage.getTraceSegments();
+ assertThat(traceSegments.size(), is(0));
+ }
+
+ @Test
+ public void testConsumerWithMessage() throws Throwable {
+ consumerInterceptor.beforeMethod(consumerInstance, null, new Object[0], new Class[0], null);
+ consumerInterceptor.afterMethod(consumerInstance, null, new Object[0], new Class[0], messages);
+
+ List traceSegments = segmentStorage.getTraceSegments();
+ assertThat(traceSegments.size(), is(1));
+
+ TraceSegment traceSegment = traceSegments.get(0);
+ assertNotNull(traceSegment.getRef());
+ assertTraceSegmentRef(traceSegment.getRef());
+
+ List spans = SegmentHelper.getSpans(traceSegment);
+ assertThat(spans.size(), is(1));
+ assertConsumerSpan(spans.get(0));
+ }
+
+ private void assertConsumerSpan(AbstractTracingSpan span) {
+ SpanAssert.assertLayer(span, SpanLayer.MQ);
+ SpanAssert.assertComponent(span, KAFKA_CONSUMER);
+ SpanAssert.assertTagSize(span, 2);
+ SpanAssert.assertTag(span, 0, "localhost:9092;localhost:19092");
+ SpanAssert.assertTag(span, 1, "test;test-1");
+ }
+
+ private void assertTraceSegmentRef(TraceSegmentRef ref) {
+ MatcherAssert.assertThat(SegmentRefHelper.getParentServiceInstance(ref), is("instance"));
+ MatcherAssert.assertThat(SegmentRefHelper.getSpanId(ref), is(3));
+ MatcherAssert.assertThat(SegmentRefHelper.getTraceSegmentId(ref).toString(), is("3.4.5"));
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pom.xml
index bc2aa1f68c..4ba8783058 100644
--- a/apm-sniffer/apm-sdk-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/pom.xml
@@ -62,6 +62,7 @@
httpasyncclient-4.x-plugin
kafka-commons
kafka-plugin
+ kafka-3.7.x-plugin
servicecomb-plugin
hystrix-1.x-plugin
sofarpc-plugin