Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/plugins-jdk17-test.1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ jobs:
- resteasy-6.x-scenario
- gateway-4.x-scenario
- httpexchange-scenario
- activemq-artemis-2.x-scenario
steps:
- uses: actions/checkout@v2
with:
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Release Notes.
* Support tracing for async producing, batch sync consuming, and batch async consuming in rocketMQ-client-java-5.x-plugin.
* Convert the Redisson span into an async span.
* Rename system env name from `sw_plugin_kafka_producer_config` to `SW_PLUGIN_KAFKA_PRODUCER_CONFIG`.
* Support for ActiveMQ-Artemis messaging tracing.

#### Documentation

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-sdk-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>9.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>apm-activemq-artemis-jakarta-client-2.x-plugin</artifactId>
<name>activemq-artemis-jakarta-client-2.x-plugin</name>
<packaging>jar</packaging>

<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<artemis-jakarta-client.version>2.31.2</artemis-jakarta-client.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jakarta-client</artifactId>
<version>${artemis-jakarta-client.version}</version>
<scope>provided</scope>
<exclusions>
</exclusions>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.skywalking.apm.plugin.activemq.artemis.jakarta.client;

import java.util.Map;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
import org.apache.skywalking.apm.plugin.activemq.artemis.jakarta.client.define.EnhanceInfo;

/**
* {@link MessageConsumerConstructorInterceptor} get enhance data from the constructor of {@link org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer}
*/
public class MessageConsumerConstructorInterceptor implements InstanceConstructorInterceptor {
private static final String DEFAULT_HOST = "localhost";
private static final String DEFAULT_PORT = "61616";
private static final String HOST_KEY = "host";
private static final String PORT_KEY = "port";

@Override
public void onConstruct(final EnhancedInstance objInst, final Object[] allArguments) throws Throwable {
ActiveMQConnection connection = (ActiveMQConnection) allArguments[1];
ActiveMQDestination destination = (ActiveMQDestination) allArguments[5];
Map<String, Object> paramMap = connection.getSessionFactory().getConnectorConfiguration().getParams();
EnhanceInfo enhanceInfo = new EnhanceInfo();
enhanceInfo.setBrokerUrl(paramMap.getOrDefault(HOST_KEY, DEFAULT_HOST) + ":" + paramMap.getOrDefault(PORT_KEY
, DEFAULT_PORT));
enhanceInfo.setName(destination.getName());
enhanceInfo.setAddress(destination.getAddress());
enhanceInfo.setType(destination.getType());
objInst.setSkyWalkingDynamicField(enhanceInfo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.skywalking.apm.plugin.activemq.artemis.jakarta.client;

import java.lang.reflect.Method;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.activemq.artemis.jakarta.client.define.EnhanceInfo;

/**
* {@link MessageConsumerInterceptor} create entry span when the method {@link org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer#getMessage(long, boolean)} execute
*/
public class MessageConsumerInterceptor implements InstanceMethodsAroundInterceptor {
private static final String OPERATION_NAME_PREFIX = "ActiveMQ/";
private static final String CONSUMER_OPERATION_NAME_SUFFIX = "/Consumer";
public static final StringTag MQ_MESSAGE_ID = new StringTag("mq.message.id");
private static final String QUEUE = "Queue";
private static final String TOPIC = "Topic";

@Override
public void beforeMethod(final EnhancedInstance objInst,
final Method method,
final Object[] allArguments,
final Class<?>[] classes,
final MethodInterceptResult methodInterceptResult) throws Throwable {
}

@Override
public Object afterMethod(final EnhancedInstance objInst,
final Method method,
final Object[] objects,
final Class<?>[] classes,
final Object ret) throws Throwable {
ActiveMQMessage message = (ActiveMQMessage) ret;
if (message == null) {
return ret;
}
ContextCarrier contextCarrier = getContextCarrierFromMessage(message);
EnhanceInfo enhanceInfo = (EnhanceInfo) objInst.getSkyWalkingDynamicField();
boolean queue = isQueue(enhanceInfo.getType());
AbstractSpan activeSpan = ContextManager.createEntrySpan(
buildOperationName(queue, enhanceInfo.getName()),
contextCarrier
);
Tags.MQ_BROKER.set(activeSpan, enhanceInfo.getBrokerUrl());
if (queue) {
Tags.MQ_QUEUE.set(activeSpan, enhanceInfo.getName());
} else {
Tags.MQ_TOPIC.set(activeSpan, enhanceInfo.getName());
}
activeSpan.tag(MQ_MESSAGE_ID, message.getJMSMessageID());
activeSpan.setPeer(enhanceInfo.getBrokerUrl());
activeSpan.setComponent(ComponentsDefine.ACTIVEMQ_CONSUMER);
SpanLayer.asMQ(activeSpan);
ContextManager.stopSpan(activeSpan);
return ret;
}

@Override
public void handleMethodException(final EnhancedInstance enhancedInstance,
final Method method,
final Object[] objects,
final Class<?>[] classes,
final Throwable t) {
ContextManager.activeSpan().log(t);
}

private ContextCarrier getContextCarrierFromMessage(ActiveMQMessage message) {
ContextCarrier contextCarrier = new ContextCarrier();

CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(message.getCoreMessage().getStringProperty(next.getHeadKey().replace("-", "_")));
}

return contextCarrier;
}

private boolean isQueue(ActiveMQDestination.TYPE type) {
return ActiveMQDestination.TYPE.QUEUE.equals(type) || ActiveMQDestination.TYPE.TEMP_QUEUE.equals(type);
}

private String buildOperationName(boolean isQueue, String name) {
if (isQueue) {
return OPERATION_NAME_PREFIX + QUEUE + "/" + name + CONSUMER_OPERATION_NAME_SUFFIX;
} else {
return OPERATION_NAME_PREFIX + TOPIC + "/" + name + CONSUMER_OPERATION_NAME_SUFFIX;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.skywalking.apm.plugin.activemq.artemis.jakarta.client;

import java.util.Map;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
import org.apache.skywalking.apm.plugin.activemq.artemis.jakarta.client.define.EnhanceInfo;

/**
* {@link MessageProducerConstructorInterceptor} get enhance data from the constructor of {@link org.apache.activemq.artemis.jms.client.ActiveMQMessageProducer}
*/
public class MessageProducerConstructorInterceptor implements InstanceConstructorInterceptor {
private static final String DEFAULT_HOST = "localhost";
private static final String DEFAULT_PORT = "61616";
private static final String HOST_KEY = "host";
private static final String PORT_KEY = "port";

@Override
public void onConstruct(final EnhancedInstance objInst, final Object[] allArguments) throws Throwable {
ActiveMQConnection connection = (ActiveMQConnection) allArguments[0];
Map<String, Object> paramMap = connection.getSessionFactory().getConnectorConfiguration().getParams();
ActiveMQDestination destination = (ActiveMQDestination) allArguments[2];
EnhanceInfo enhanceInfo = new EnhanceInfo();
enhanceInfo.setBrokerUrl(paramMap.getOrDefault(HOST_KEY, DEFAULT_HOST) + ":" + paramMap.getOrDefault(PORT_KEY
, DEFAULT_PORT));
enhanceInfo.setName(destination.getName());
enhanceInfo.setAddress(destination.getAddress());
enhanceInfo.setType(destination.getType());
objInst.setSkyWalkingDynamicField(enhanceInfo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.skywalking.apm.plugin.activemq.artemis.jakarta.client;

import jakarta.jms.Message;
import java.lang.reflect.Method;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.activemq.artemis.jakarta.client.define.EnhanceInfo;

/**
* {@link MessageProducerInterceptor} create exit span when the method {@link org.apache.activemq.artemis.jms.client.ActiveMQMessageProducer#doSendx}
* execute
*/
public class MessageProducerInterceptor implements InstanceMethodsAroundInterceptor {
private static final String OPERATION_NAME_PREFIX = "ActiveMQ/";
private static final String PRODUCER_OPERATION_NAME_SUFFIX = "/Producer";
private static final StringTag MQ_MESSAGE_ID = new StringTag("mq.message.id");
private static final String QUEUE = "Queue";
private static final String TOPIC = "Topic";

@Override
public void beforeMethod(final EnhancedInstance objInst,
final Method method,
final Object[] allArguments,
final Class<?>[] classes,
final MethodInterceptResult methodInterceptResult) throws Throwable {
ContextCarrier contextCarrier = new ContextCarrier();
Message message = (Message) allArguments[1];
EnhanceInfo enhanceInfo = (EnhanceInfo) objInst.getSkyWalkingDynamicField();
boolean queue = isQueue(enhanceInfo.getType());
AbstractSpan activeSpan = ContextManager.createExitSpan(
buildOperationName(queue, enhanceInfo.getName()),
contextCarrier, enhanceInfo.getBrokerUrl()
);
contextCarrier.extensionInjector().injectSendingTimestamp();
Tags.MQ_BROKER.set(activeSpan, enhanceInfo.getBrokerUrl());
if (queue) {
Tags.MQ_QUEUE.set(activeSpan, enhanceInfo.getName());
} else {
Tags.MQ_TOPIC.set(activeSpan, enhanceInfo.getName());
}
SpanLayer.asMQ(activeSpan);
activeSpan.setComponent(ComponentsDefine.ACTIVEMQ_PRODUCER);
CarrierItem next = contextCarrier.items();

while (next.hasNext()) {
next = next.next();
message.setStringProperty(next.getHeadKey().replace("-", "_"), next.getHeadValue());
}
}

@Override
public Object afterMethod(final EnhancedInstance enhancedInstance,
final Method method,
final Object[] allArguments,
final Class<?>[] classes,
final Object ret) throws Throwable {
AbstractSpan activeSpan = ContextManager.activeSpan();
Message message = (Message) allArguments[1];
activeSpan.tag(MQ_MESSAGE_ID, message.getJMSMessageID());
ContextManager.stopSpan();
return ret;
}

@Override
public void handleMethodException(final EnhancedInstance enhancedInstance,
final Method method,
final Object[] objects,
final Class<?>[] classes,
final Throwable t) {
ContextManager.activeSpan().log(t);
}

private boolean isQueue(ActiveMQDestination.TYPE type) {
return ActiveMQDestination.TYPE.QUEUE.equals(type) || ActiveMQDestination.TYPE.TEMP_QUEUE.equals(type);
}

private String buildOperationName(boolean isQueue, String name) {
if (isQueue) {
return OPERATION_NAME_PREFIX + QUEUE + "/" + name + PRODUCER_OPERATION_NAME_SUFFIX;
} else {
return OPERATION_NAME_PREFIX + TOPIC + "/" + name + PRODUCER_OPERATION_NAME_SUFFIX;
}
}
}
Loading