Skip to content

Commit 2027a98

Browse files
Support async profiler feature (#720)
1 parent 576550a commit 2027a98

File tree

16 files changed

+866
-3
lines changed

16 files changed

+866
-3
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.network.trace.component.command;
20+
21+
import org.apache.skywalking.apm.network.common.v3.Command;
22+
import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
23+
24+
import java.util.List;
25+
import java.util.Objects;
26+
27+
public class AsyncProfilerTaskCommand extends BaseCommand implements Serializable, Deserializable<AsyncProfilerTaskCommand> {
28+
public static final Deserializable<AsyncProfilerTaskCommand> DESERIALIZER = new AsyncProfilerTaskCommand("", "", 0, null, "", 0);
29+
public static final String NAME = "AsyncProfilerTaskQuery";
30+
31+
/**
32+
* async-profiler taskId
33+
*/
34+
private final String taskId;
35+
/**
36+
* run profiling for duration (second)
37+
*/
38+
private final int duration;
39+
/**
40+
* async profiler extended parameters. Here is a table of optional parameters.
41+
*
42+
* <p>lock[=DURATION] - profile contended locks overflowing the DURATION ns bucket (default: 10us)</p>
43+
* <p>alloc[=BYTES] - profile allocations with BYTES interval</p>
44+
* <p>interval=N - sampling interval in ns (default: 10'000'000, i.e. 10 ms)</p>
45+
* <p>jstackdepth=N - maximum Java stack depth (default: 2048)</p>
46+
* <p>chunksize=N - approximate size of JFR chunk in bytes (default: 100 MB) </p>
47+
* <p>chunktime=N - duration of JFR chunk in seconds (default: 1 hour) </p>
48+
* details @see <a href="https://github.com/async-profiler/async-profiler/blob/master/src/arguments.cpp#L44">async-profiler argument</a>
49+
*/
50+
private final String execArgs;
51+
/**
52+
* task create time
53+
*/
54+
private final long createTime;
55+
56+
public AsyncProfilerTaskCommand(String serialNumber, String taskId, int duration,
57+
List<String> events, String execArgs, long createTime) {
58+
super(NAME, serialNumber);
59+
this.taskId = taskId;
60+
this.duration = duration;
61+
this.createTime = createTime;
62+
String comma = ",";
63+
StringBuilder sb = new StringBuilder();
64+
if (Objects.nonNull(events) && !events.isEmpty()) {
65+
sb.append("event=")
66+
.append(String.join(comma, events))
67+
.append(comma);
68+
}
69+
if (execArgs != null && !execArgs.isEmpty()) {
70+
sb.append(execArgs);
71+
}
72+
this.execArgs = sb.toString();
73+
}
74+
75+
public AsyncProfilerTaskCommand(String serialNumber, String taskId, int duration,
76+
String execArgs, long createTime) {
77+
super(NAME, serialNumber);
78+
this.taskId = taskId;
79+
this.duration = duration;
80+
this.execArgs = execArgs;
81+
this.createTime = createTime;
82+
}
83+
84+
@Override
85+
public AsyncProfilerTaskCommand deserialize(Command command) {
86+
final List<KeyStringValuePair> argsList = command.getArgsList();
87+
String taskId = null;
88+
int duration = 0;
89+
String execArgs = null;
90+
long createTime = 0;
91+
String serialNumber = null;
92+
for (final KeyStringValuePair pair : argsList) {
93+
if ("SerialNumber".equals(pair.getKey())) {
94+
serialNumber = pair.getValue();
95+
} else if ("TaskId".equals(pair.getKey())) {
96+
taskId = pair.getValue();
97+
} else if ("Duration".equals(pair.getKey())) {
98+
duration = Integer.parseInt(pair.getValue());
99+
} else if ("ExecArgs".equals(pair.getKey())) {
100+
execArgs = pair.getValue();
101+
} else if ("CreateTime".equals(pair.getKey())) {
102+
createTime = Long.parseLong(pair.getValue());
103+
}
104+
}
105+
return new AsyncProfilerTaskCommand(serialNumber, taskId, duration, execArgs, createTime);
106+
}
107+
108+
@Override
109+
public Command.Builder serialize() {
110+
final Command.Builder builder = commandBuilder();
111+
builder.addArgs(KeyStringValuePair.newBuilder().setKey("TaskId").setValue(taskId))
112+
.addArgs(KeyStringValuePair.newBuilder().setKey("Duration").setValue(String.valueOf(duration)))
113+
.addArgs(KeyStringValuePair.newBuilder().setKey("ExecArgs").setValue(execArgs))
114+
.addArgs(KeyStringValuePair.newBuilder().setKey("CreateTime").setValue(String.valueOf(createTime)));
115+
return builder;
116+
}
117+
118+
public String getTaskId() {
119+
return taskId;
120+
}
121+
122+
public int getDuration() {
123+
return duration;
124+
}
125+
126+
public String getExecArgs() {
127+
return execArgs;
128+
}
129+
130+
public long getCreateTime() {
131+
return createTime;
132+
}
133+
}

apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/CommandDeserializer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ public static BaseCommand deserialize(final Command command) {
2727
return ProfileTaskCommand.DESERIALIZER.deserialize(command);
2828
} else if (ConfigurationDiscoveryCommand.NAME.equals(commandName)) {
2929
return ConfigurationDiscoveryCommand.DESERIALIZER.deserialize(command);
30+
} else if (AsyncProfilerTaskCommand.NAME.equals(commandName)) {
31+
return AsyncProfilerTaskCommand.DESERIALIZER.deserialize(command);
3032
}
33+
3134
throw new UnsupportedCommandException(command);
3235
}
3336

apm-sniffer/apm-agent-core/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,10 @@
143143
<artifactId>jmh-generator-annprocess</artifactId>
144144
<scope>test</scope>
145145
</dependency>
146+
<dependency>
147+
<groupId>tools.profiler</groupId>
148+
<artifactId>async-profiler</artifactId>
149+
</dependency>
146150
</dependencies>
147151
<dependencyManagement>
148152
<dependencies>
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.agent.core.asyncprofiler;
20+
21+
import com.google.protobuf.ByteString;
22+
import io.grpc.Channel;
23+
import io.grpc.stub.ClientCallStreamObserver;
24+
import io.grpc.stub.ClientResponseObserver;
25+
import io.grpc.stub.StreamObserver;
26+
import org.apache.skywalking.apm.agent.core.boot.BootService;
27+
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
28+
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
29+
import org.apache.skywalking.apm.agent.core.conf.Config;
30+
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
31+
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
32+
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
33+
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
34+
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
35+
import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus;
36+
import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerCollectionResponse;
37+
import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerData;
38+
import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerMetaData;
39+
import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerTaskGrpc;
40+
import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilingStatus;
41+
42+
import java.io.IOException;
43+
import java.nio.ByteBuffer;
44+
import java.nio.channels.FileChannel;
45+
import java.util.Objects;
46+
import java.util.concurrent.TimeUnit;
47+
48+
import static org.apache.skywalking.apm.agent.core.conf.Config.AsyncProfiler.DATA_CHUNK_SIZE;
49+
import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
50+
51+
@DefaultImplementor
52+
public class AsyncProfilerDataSender implements BootService, GRPCChannelListener {
53+
private static final ILog LOGGER = LogManager.getLogger(AsyncProfilerDataSender.class);
54+
55+
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
56+
57+
private volatile AsyncProfilerTaskGrpc.AsyncProfilerTaskStub asyncProfilerTaskStub;
58+
59+
@Override
60+
public void prepare() throws Throwable {
61+
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
62+
}
63+
64+
@Override
65+
public void boot() throws Throwable {
66+
67+
}
68+
69+
@Override
70+
public void onComplete() throws Throwable {
71+
72+
}
73+
74+
@Override
75+
public void shutdown() throws Throwable {
76+
77+
}
78+
79+
@Override
80+
public void statusChanged(GRPCChannelStatus status) {
81+
if (GRPCChannelStatus.CONNECTED.equals(status)) {
82+
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
83+
asyncProfilerTaskStub = AsyncProfilerTaskGrpc.newStub(channel);
84+
} else {
85+
asyncProfilerTaskStub = null;
86+
}
87+
this.status = status;
88+
}
89+
90+
public void sendData(AsyncProfilerTask task, FileChannel channel) throws IOException, InterruptedException {
91+
if (status != GRPCChannelStatus.CONNECTED || Objects.isNull(channel) || !channel.isOpen()) {
92+
return;
93+
}
94+
95+
int size = Math.toIntExact(channel.size());
96+
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
97+
StreamObserver<AsyncProfilerData> dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter(
98+
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
99+
).collect(new ClientResponseObserver<AsyncProfilerData, AsyncProfilerCollectionResponse>() {
100+
ClientCallStreamObserver<AsyncProfilerData> requestStream;
101+
102+
@Override
103+
public void beforeStart(ClientCallStreamObserver<AsyncProfilerData> requestStream) {
104+
this.requestStream = requestStream;
105+
}
106+
107+
@Override
108+
public void onNext(AsyncProfilerCollectionResponse value) {
109+
if (AsyncProfilingStatus.TERMINATED_BY_OVERSIZE.equals(value.getType())) {
110+
LOGGER.warn("JFR is too large to be received by the oap server");
111+
} else {
112+
ByteBuffer buf = ByteBuffer.allocateDirect(DATA_CHUNK_SIZE);
113+
try {
114+
while (channel.read(buf) > 0) {
115+
buf.flip();
116+
AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder()
117+
.setContent(ByteString.copyFrom(buf))
118+
.build();
119+
requestStream.onNext(asyncProfilerData);
120+
buf.clear();
121+
}
122+
} catch (IOException e) {
123+
LOGGER.error("Failed to read JFR file and failed to upload to oap", e);
124+
}
125+
}
126+
127+
requestStream.onCompleted();
128+
}
129+
130+
@Override
131+
public void onError(Throwable t) {
132+
status.finished();
133+
LOGGER.error(t, "Send async profiler task data to collector fail with a grpc internal exception.");
134+
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
135+
}
136+
137+
@Override
138+
public void onCompleted() {
139+
status.finished();
140+
}
141+
});
142+
AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder()
143+
.setService(Config.Agent.SERVICE_NAME)
144+
.setServiceInstance(Config.Agent.INSTANCE_NAME)
145+
.setType(AsyncProfilingStatus.PROFILING_SUCCESS)
146+
.setContentSize(size)
147+
.setTaskId(task.getTaskId())
148+
.build();
149+
AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder().setMetaData(metaData).build();
150+
dataStreamObserver.onNext(asyncProfilerData);
151+
152+
status.wait4Finish();
153+
}
154+
155+
public void sendError(AsyncProfilerTask task, String errorMessage) {
156+
if (status != GRPCChannelStatus.CONNECTED) {
157+
return;
158+
}
159+
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
160+
StreamObserver<AsyncProfilerData> dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter(
161+
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
162+
).collect(new StreamObserver<AsyncProfilerCollectionResponse>() {
163+
@Override
164+
public void onNext(AsyncProfilerCollectionResponse value) {
165+
}
166+
167+
@Override
168+
public void onError(Throwable t) {
169+
status.finished();
170+
LOGGER.error(t, "Send async profiler task execute error fail with a grpc internal exception.");
171+
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
172+
}
173+
174+
@Override
175+
public void onCompleted() {
176+
status.finished();
177+
}
178+
});
179+
AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder()
180+
.setService(Config.Agent.SERVICE_NAME)
181+
.setServiceInstance(Config.Agent.INSTANCE_NAME)
182+
.setTaskId(task.getTaskId())
183+
.setType(AsyncProfilingStatus.EXECUTION_TASK_ERROR)
184+
.setContentSize(-1)
185+
.build();
186+
AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder()
187+
.setMetaData(metaData)
188+
.setErrorMessage(errorMessage)
189+
.build();
190+
dataStreamObserver.onNext(asyncProfilerData);
191+
dataStreamObserver.onCompleted();
192+
status.wait4Finish();
193+
}
194+
}

0 commit comments

Comments
 (0)