diff --git a/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/pom.xml new file mode 100644 index 0000000000..cf10d852f1 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/pom.xml @@ -0,0 +1,54 @@ + + + + + + apm-sdk-plugin + org.apache.skywalking + 9.3.0-SNAPSHOT + + 4.0.0 + + apm-kafka-3.7.x-plugin + + + 3.7.0 + + + + + org.apache.skywalking + apm-kafka-commons + ${project.version} + provided + + + org.apache.skywalking + apm-kafka-plugin + ${project.version} + provided + + + org.apache.kafka + kafka-clients + ${kafka-clients.version} + provided + + + \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka37/Kafka37ConsumerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka37/Kafka37ConsumerInterceptor.java new file mode 100644 index 0000000000..cca615cdc1 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka37/Kafka37ConsumerInterceptor.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.kafka37; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.header.Header; +import org.apache.skywalking.apm.agent.core.context.CarrierItem; +import org.apache.skywalking.apm.agent.core.context.ContextCarrier; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; +import org.apache.skywalking.apm.plugin.kafka.ConsumerEnhanceRequiredInfo; +import org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor; + +import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; + +/** + * kafak3.7 removed the method named pollForFetches with return type Fetch. + * When the kafka version < 3.7, the enhance method could be: org.apache.kafka.clients.consumer.KafkaConsumer#pollForFetches. + * But when the kafka version >= 3.7, the enhance method should be: org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration). + * And the return type was also changed, from org.apache.kafka.clients.consumer.internals.Fetch to org.apache.kafka.clients.consumer.ConsumerRecords. + */ +public class Kafka37ConsumerInterceptor extends KafkaConsumerInterceptor { + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + if (ret instanceof ConsumerRecords) { + ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField(); + ConsumerRecords consumerRecords = (ConsumerRecords) ret; + if (consumerRecords.count() == 0) { + return ret; + } + for (ConsumerRecord consumerRecord : consumerRecords) { + if (consumerRecord == null) { + continue; + } + ContextCarrier contextCarrier = new ContextCarrier(); + CarrierItem next = contextCarrier.items(); + while (next.hasNext()) { + next = next.next(); + Iterator
iterator = consumerRecord.headers().headers(next.getHeadKey()).iterator(); + if (iterator.hasNext()) { + next.setHeadValue(new String(iterator.next().value(), StandardCharsets.UTF_8)); + } + } + String operationName = OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME + requiredInfo.getGroupId(); + AbstractSpan activeSpan = ContextManager.createEntrySpan(operationName, contextCarrier).start(requiredInfo.getStartTime()); + activeSpan.setComponent(ComponentsDefine.KAFKA_CONSUMER); + SpanLayer.asMQ(activeSpan); + Tags.MQ_BROKER.set(activeSpan, requiredInfo.getBrokerServers()); + Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopics()); + activeSpan.setPeer(requiredInfo.getBrokerServers()); + ContextManager.stopSpan(); + } + } + return ret; + } +} \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka37/define/Kafka37ConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka37/define/Kafka37ConsumerInstrumentation.java new file mode 100644 index 0000000000..7b0a35e8e3 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka37/define/Kafka37ConsumerInstrumentation.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.kafka37.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.apache.skywalking.apm.plugin.kafka.define.KafkaConsumerInstrumentation; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +public class Kafka37ConsumerInstrumentation extends KafkaConsumerInstrumentation { + + public static final String ENHANCE_CLASS_37 = "org.apache.kafka.clients.consumer.KafkaConsumer"; + + public static final String INTERCEPTOR_CLASS_37 = "org.apache.skywalking.apm.plugin.kafka37.Kafka37ConsumerInterceptor"; + + // Kafka 3.7.x's pull message method's name is "poll" + public static final String ENHANCE_METHOD_37 = "poll"; + + // Kafka 3.7.x's pull message method's return type is "ConsumerRecords" + public static final String ENHANCE_RETURN_TYPE_37 = "org.apache.kafka.clients.consumer.ConsumerRecords"; + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANCE_CLASS_37); + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[]{ + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named(ENHANCE_METHOD_37).and(returns(named(ENHANCE_RETURN_TYPE_37))); + } + + @Override + public String getMethodsInterceptor() { + return INTERCEPTOR_CLASS_37; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/resources/skywalking-plugin.def new file mode 100644 index 0000000000..38dd523050 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/main/resources/skywalking-plugin.def @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka.define.CallbackInstrumentation +kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka37.define.Kafka37ConsumerInstrumentation +kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerInstrumentation +kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation +kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/Kafka37ConsumerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/Kafka37ConsumerInterceptorTest.java new file mode 100644 index 0000000000..cef56ed627 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/kafka-3.7.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/Kafka37ConsumerInterceptorTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.skywalking.apm.agent.core.context.SW8CarrierItem; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; +import org.apache.skywalking.apm.agent.test.helper.SegmentRefHelper; +import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; +import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.apache.skywalking.apm.agent.test.tools.SpanAssert; +import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.apache.skywalking.apm.plugin.kafka37.Kafka37ConsumerInterceptor; +import org.hamcrest.MatcherAssert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.KAFKA_CONSUMER; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; + +@RunWith(TracingSegmentRunner.class) +public class Kafka37ConsumerInterceptorTest { + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + @Rule + public AgentServiceRule serviceRule = new AgentServiceRule(); + @Rule + public MockitoRule rule = MockitoJUnit.rule(); + + private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo; + + private Kafka37ConsumerInterceptor consumerInterceptor; + + private EnhancedInstance consumerInstance = new EnhancedInstance() { + @Override + public Object getSkyWalkingDynamicField() { + return consumerEnhanceRequiredInfo; + } + + @Override + public void setSkyWalkingDynamicField(Object value) { + consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo) value; + } + }; + + private Map> messages; + + @Before + public void setUp() { + consumerInterceptor = new Kafka37ConsumerInterceptor(); + consumerEnhanceRequiredInfo = new ConsumerEnhanceRequiredInfo(); + + List topics = new ArrayList(); + topics.add("test"); + topics.add("test-1"); + consumerEnhanceRequiredInfo.setTopics(topics); + List brokers = new ArrayList(); + brokers.add("localhost:9092"); + brokers.add("localhost:19092"); + consumerEnhanceRequiredInfo.setBrokerServers(brokers); + consumerEnhanceRequiredInfo.setGroupId("test"); + + messages = new HashMap>(); + TopicPartition topicPartition = new TopicPartition("test", 1); + List records = new ArrayList(); + ConsumerRecord consumerRecord = new ConsumerRecord("test", 1, 0, "1", "1"); + consumerRecord.headers() + .add( + SW8CarrierItem.HEADER_NAME, + "1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=" + .getBytes() + ); + records.add(consumerRecord); + messages.put(topicPartition, records); + } + + @Test + public void testConsumerWithoutMessage() throws Throwable { + consumerInterceptor.beforeMethod(consumerInstance, null, new Object[0], new Class[0], null); + consumerInterceptor.afterMethod( + consumerInstance, null, new Object[0], new Class[0], new ConsumerRecords(messages)); + + List traceSegments = segmentStorage.getTraceSegments(); + assertThat(traceSegments.size(), is(0)); + } + + @Test + public void testConsumerWithMessage() throws Throwable { + consumerInterceptor.beforeMethod(consumerInstance, null, new Object[0], new Class[0], null); + consumerInterceptor.afterMethod(consumerInstance, null, new Object[0], new Class[0], messages); + + List traceSegments = segmentStorage.getTraceSegments(); + assertThat(traceSegments.size(), is(1)); + + TraceSegment traceSegment = traceSegments.get(0); + assertNotNull(traceSegment.getRef()); + assertTraceSegmentRef(traceSegment.getRef()); + + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + assertConsumerSpan(spans.get(0)); + } + + private void assertConsumerSpan(AbstractTracingSpan span) { + SpanAssert.assertLayer(span, SpanLayer.MQ); + SpanAssert.assertComponent(span, KAFKA_CONSUMER); + SpanAssert.assertTagSize(span, 2); + SpanAssert.assertTag(span, 0, "localhost:9092;localhost:19092"); + SpanAssert.assertTag(span, 1, "test;test-1"); + } + + private void assertTraceSegmentRef(TraceSegmentRef ref) { + MatcherAssert.assertThat(SegmentRefHelper.getParentServiceInstance(ref), is("instance")); + MatcherAssert.assertThat(SegmentRefHelper.getSpanId(ref), is(3)); + MatcherAssert.assertThat(SegmentRefHelper.getTraceSegmentId(ref).toString(), is("3.4.5")); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pom.xml index bc2aa1f68c..4ba8783058 100644 --- a/apm-sniffer/apm-sdk-plugin/pom.xml +++ b/apm-sniffer/apm-sdk-plugin/pom.xml @@ -62,6 +62,7 @@ httpasyncclient-4.x-plugin kafka-commons kafka-plugin + kafka-3.7.x-plugin servicecomb-plugin hystrix-1.x-plugin sofarpc-plugin