Skip to content

Commit 2ea9753

Browse files
committed
Fix Span not finished in gateway plugin when the gateway request timeout.
1 parent 4b24b7d commit 2ea9753

File tree

4 files changed

+51
-9
lines changed

4 files changed

+51
-9
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ Release Notes.
1616
* Grpc plugin support trace client async generic call(without grpc stubs), support Method type: `UNARY``SERVER_STREAMING`.
1717
* Enhance Apache ShenYu (incubating) plugin: support trace `grpc`,`sofarpc`,`motan`,`tars` rpc proxy.
1818
* Add primary endpoint name to log events.
19+
* Fix Span not finished in gateway plugin when the gateway request timeout.
1920

2021
#### Documentation
2122

apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v3x/HttpClientFinalizerResponseConnectionInterceptor.java

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
2525
import org.apache.skywalking.apm.plugin.spring.cloud.gateway.v3x.define.EnhanceObjectCache;
2626
import org.reactivestreams.Publisher;
27+
import reactor.core.publisher.Flux;
28+
import reactor.core.publisher.SignalType;
2729
import reactor.netty.Connection;
2830
import reactor.netty.http.client.HttpClientResponse;
2931

@@ -39,7 +41,7 @@ public class HttpClientFinalizerResponseConnectionInterceptor implements Instanc
3941

4042
@Override
4143
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
42-
MethodInterceptResult result) throws Throwable {
44+
MethodInterceptResult result) {
4345
BiFunction<? super HttpClientResponse, ? super Connection, ? extends Publisher> finalReceiver = (BiFunction<? super HttpClientResponse, ? super Connection, ? extends Publisher>) allArguments[0];
4446
EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField();
4547
allArguments[0] = (BiFunction<HttpClientResponse, Connection, Publisher>) (response, connection) -> {
@@ -53,20 +55,48 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
5355
cache.getSpan().errorOccurred();
5456
}
5557
Tags.HTTP_RESPONSE_STATUS_CODE.set(cache.getSpan(), response.status().code());
56-
cache.getSpan().asyncFinish();
57-
}
58-
59-
if (cache.getSpan1() != null) {
60-
cache.getSpan1().asyncFinish();
6158
}
59+
6260
return publisher;
6361
};
6462
}
6563

6664
@Override
6765
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
68-
Object ret) throws Throwable {
69-
return ret;
66+
Object ret) {
67+
Flux<?> responseFlux = (Flux<?>) ret;
68+
69+
responseFlux = responseFlux
70+
.doOnError(e -> {
71+
EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField();
72+
if (cache == null) {
73+
return;
74+
}
75+
76+
if (cache.getSpan() != null) {
77+
cache.getSpan().errorOccurred();
78+
cache.getSpan().log(e);
79+
}
80+
})
81+
.doFinally(signalType -> {
82+
EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField();
83+
if (cache == null) {
84+
return;
85+
}
86+
// receive the response. Stop the span.
87+
if (cache.getSpan() != null) {
88+
if (signalType == SignalType.CANCEL) {
89+
cache.getSpan().errorOccurred();
90+
}
91+
cache.getSpan().asyncFinish();
92+
}
93+
94+
if (cache.getSpan1() != null) {
95+
cache.getSpan1().asyncFinish();
96+
}
97+
});
98+
99+
return responseFlux;
70100
}
71101

72102
@Override

apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v3x/HttpClientFinalizerInterceptorTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.powermock.modules.junit4.PowerMockRunner;
4141
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
4242
import org.reactivestreams.Publisher;
43+
import reactor.core.publisher.Flux;
4344
import reactor.netty.Connection;
4445
import reactor.netty.NettyOutbound;
4546
import reactor.netty.http.client.HttpClientRequest;
@@ -151,9 +152,13 @@ private void executeSendRequest() throws Throwable {
151152
Object[] responseConnectionArguments = new Object[]{originalResponseConnectionBiFunction};
152153
responseConnectionInterceptor
153154
.beforeMethod(enhancedInstance, null, responseConnectionArguments, null, null);
154-
responseConnectionInterceptor.afterMethod(enhancedInstance, null, new Object[0], null, enhancedInstance);
155+
Flux flux = Flux.just(0);
156+
157+
flux = (Flux) responseConnectionInterceptor.afterMethod(enhancedInstance, null, new Object[0], null, flux);
158+
155159
((BiFunction<? super HttpClientResponse, ? super Connection, ? extends Publisher<Void>>) responseConnectionArguments[0])
156160
.apply(mockResponse, null);
161+
flux.blockFirst();
157162
}
158163

159164
private void assertUpstreamSpan(AbstractSpan span) {

test/plugin/scenarios/gateway-3.x-scenario/gateway-projectA-scenario/src/main/resources/application.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,14 @@ server:
2020
spring:
2121
cloud:
2222
gateway:
23+
httpclient:
24+
connect-timeout: 2000
2325
routes:
2426
- id: provider_route
2527
uri: http://localhost:18070
2628
predicates:
2729
- Path=/provider/b/*
30+
- id: provider_timeout
31+
uri: http://1.2.3.4:18070
32+
predicates:
33+
- Path=/provider/timeout/*

0 commit comments

Comments
 (0)