diff --git a/CHANGES.md b/CHANGES.md index e45e028d24..cdc8681f77 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -16,6 +16,7 @@ Release Notes. * Grpc plugin support trace client async generic call(without grpc stubs), support Method type: `UNARY`、`SERVER_STREAMING`. * Enhance Apache ShenYu (incubating) plugin: support trace `grpc`,`sofarpc`,`motan`,`tars` rpc proxy. * Add primary endpoint name to log events. +* Fix Span not finished in gateway plugin when the gateway request timeout. #### Documentation diff --git a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientFinalizerResponseConnectionInterceptor.java b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientFinalizerResponseConnectionInterceptor.java index 1d67b7c0f6..9d26794b17 100644 --- a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientFinalizerResponseConnectionInterceptor.java +++ b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientFinalizerResponseConnectionInterceptor.java @@ -19,46 +19,74 @@ import java.lang.reflect.Method; import java.util.function.BiFunction; + +import io.netty.handler.codec.http.HttpResponseStatus; import org.apache.skywalking.apm.agent.core.context.tag.Tags; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; import org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.define.EnhanceObjectCache; import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.SignalType; import reactor.netty.Connection; import reactor.netty.http.client.HttpClientResponse; public class HttpClientFinalizerResponseConnectionInterceptor implements InstanceMethodsAroundInterceptor { @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, - MethodInterceptResult result) throws Throwable { + MethodInterceptResult result) { BiFunction finalReceiver = (BiFunction) allArguments[0]; EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField(); - allArguments[0] = new BiFunction() { - @Override - public Publisher apply(final HttpClientResponse response, final Connection connection) { - Publisher publisher = finalReceiver.apply(response, connection); - // receive the response. Stop the span. - if (cache.getSpan() != null) { - if (response.status().code() >= 400) { - cache.getSpan().errorOccurred(); - } - Tags.HTTP_RESPONSE_STATUS_CODE.set(cache.getSpan(), response.status().code()); - cache.getSpan().asyncFinish(); - } - - if (cache.getSpan1() != null) { - cache.getSpan1().asyncFinish(); + allArguments[0] = (BiFunction) (response, connection) -> { + Publisher publisher = finalReceiver.apply(response, connection); + // receive the response. + if (cache.getSpan() != null) { + if (response.status().code() >= HttpResponseStatus.BAD_REQUEST.code()) { + cache.getSpan().errorOccurred(); } - return publisher; + Tags.HTTP_RESPONSE_STATUS_CODE.set(cache.getSpan(), response.status().code()); } + return publisher; }; } @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, - Object ret) throws Throwable { - return ret; + Object ret) { + Flux responseFlux = (Flux) ret; + + responseFlux = responseFlux + .doOnError(e -> { + EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField(); + if (cache == null) { + return; + } + + if (cache.getSpan() != null) { + cache.getSpan().errorOccurred(); + cache.getSpan().log(e); + } + }) + .doFinally(signalType -> { + EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField(); + if (cache == null) { + return; + } + // do finally. Finish the span. + if (cache.getSpan() != null) { + if (signalType == SignalType.CANCEL) { + cache.getSpan().errorOccurred(); + } + cache.getSpan().asyncFinish(); + } + + if (cache.getSpan1() != null) { + cache.getSpan1().asyncFinish(); + } + }); + + return responseFlux; } @Override diff --git a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v3x/HttpClientFinalizerResponseConnectionInterceptor.java b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v3x/HttpClientFinalizerResponseConnectionInterceptor.java index 0ef159d44a..de484c86db 100644 --- a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v3x/HttpClientFinalizerResponseConnectionInterceptor.java +++ b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v3x/HttpClientFinalizerResponseConnectionInterceptor.java @@ -24,6 +24,8 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; import org.apache.skywalking.apm.plugin.spring.cloud.gateway.v3x.define.EnhanceObjectCache; import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.SignalType; import reactor.netty.Connection; import reactor.netty.http.client.HttpClientResponse; @@ -39,7 +41,7 @@ public class HttpClientFinalizerResponseConnectionInterceptor implements Instanc @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, - MethodInterceptResult result) throws Throwable { + MethodInterceptResult result) { BiFunction finalReceiver = (BiFunction) allArguments[0]; EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField(); allArguments[0] = (BiFunction) (response, connection) -> { @@ -47,26 +49,54 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr if (cache == null) { return publisher; } - // receive the response. Stop the span. + // receive the response. if (cache.getSpan() != null) { if (response.status().code() >= HttpResponseStatus.BAD_REQUEST.code()) { cache.getSpan().errorOccurred(); } Tags.HTTP_RESPONSE_STATUS_CODE.set(cache.getSpan(), response.status().code()); - cache.getSpan().asyncFinish(); - } - - if (cache.getSpan1() != null) { - cache.getSpan1().asyncFinish(); } + return publisher; }; } @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, - Object ret) throws Throwable { - return ret; + Object ret) { + Flux responseFlux = (Flux) ret; + + responseFlux = responseFlux + .doOnError(e -> { + EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField(); + if (cache == null) { + return; + } + + if (cache.getSpan() != null) { + cache.getSpan().errorOccurred(); + cache.getSpan().log(e); + } + }) + .doFinally(signalType -> { + EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField(); + if (cache == null) { + return; + } + // do finally. Finish the span. + if (cache.getSpan() != null) { + if (signalType == SignalType.CANCEL) { + cache.getSpan().errorOccurred(); + } + cache.getSpan().asyncFinish(); + } + + if (cache.getSpan1() != null) { + cache.getSpan1().asyncFinish(); + } + }); + + return responseFlux; } @Override diff --git a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v3x/HttpClientFinalizerInterceptorTest.java b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v3x/HttpClientFinalizerInterceptorTest.java index 7723f02d57..b3bfbde116 100644 --- a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v3x/HttpClientFinalizerInterceptorTest.java +++ b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v3x/HttpClientFinalizerInterceptorTest.java @@ -40,6 +40,7 @@ import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.modules.junit4.PowerMockRunnerDelegate; import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import reactor.netty.Connection; import reactor.netty.NettyOutbound; import reactor.netty.http.client.HttpClientRequest; @@ -151,9 +152,13 @@ private void executeSendRequest() throws Throwable { Object[] responseConnectionArguments = new Object[]{originalResponseConnectionBiFunction}; responseConnectionInterceptor .beforeMethod(enhancedInstance, null, responseConnectionArguments, null, null); - responseConnectionInterceptor.afterMethod(enhancedInstance, null, new Object[0], null, enhancedInstance); + Flux flux = Flux.just(0); + + flux = (Flux) responseConnectionInterceptor.afterMethod(enhancedInstance, null, new Object[0], null, flux); + ((BiFunction>) responseConnectionArguments[0]) .apply(mockResponse, null); + flux.blockFirst(); } private void assertUpstreamSpan(AbstractSpan span) { diff --git a/test/plugin/scenarios/gateway-2.1.x-scenario/config/expectedData.yaml b/test/plugin/scenarios/gateway-2.1.x-scenario/config/expectedData.yaml index f317ca6840..bcd4005215 100644 --- a/test/plugin/scenarios/gateway-2.1.x-scenario/config/expectedData.yaml +++ b/test/plugin/scenarios/gateway-2.1.x-scenario/config/expectedData.yaml @@ -19,6 +19,26 @@ segmentItems: segments: - segmentId: not null spans: + - operationName: /provider/timeout/error + parentSpanId: 0 + spanId: 1 + isError: true + spanType: Exit + tags: + - { key: url, value: not null } + - { key: http.method, value: GET } + - { key: http.status_code, value: '500' } + logs: + - logEvent: + - { key: event, value: error } + - { key: error.kind, value: not null } + - { key: message, value: not null } + - { key: stack, value: not null } + - logEvent: + - { key: event, value: error } + - { key: error.kind, value: not null } + - { key: message, value: not null } + - { key: stack, value: not null } - operationName: GET:/provider/b/testcase parentSpanId: -1 spanId: 0 @@ -42,21 +62,49 @@ segmentItems: segments: - segmentId: not null spans: - - operationName: /provider/b/testcase + - operationName: /provider/timeout/error parentSpanId: -1 spanId: 0 spanLayer: Http - startTime: nq 0 - endTime: nq 0 - componentId: 67 - isError: false + isError: true spanType: Entry - peer: not null tags: - - {key: url, value: 'http://localhost:8080/provider/b/testcase'} + - {key: url, value: 'http://localhost:8080/provider/timeout/error' } - {key: http.method, value: GET} - - {key: http.status_code, value: '200'} - skipAnalysis: 'false' + - {key: http.status_code, value: '500'} + - segmentId: not null + spans: + - operationName: SpringCloudGateway/sendRequest + parentSpanId: 0 + spanId: 1 + spanLayer: Http + startTime: nq 0 + endTime: nq 0 + componentId: 61 + isError: true + spanType: Exit + peer: 1.2.3.4:18070 + skipAnalysis: false + tags: + - { key: url, value: not null } + logs: + - logEvent: + - { key: event, value: error } + - { key: error.kind, value: not null } + - { key: message, value: not null } + - { key: stack, value: not null } + - operationName: SpringCloudGateway/RoutingFilter + parentSpanId: -1 + spanId: 0 + startTime: nq 0 + endTime: nq 0 + componentId: 61 + spanType: Local + refs: + - { parentEndpoint: '/provider/timeout/error', networkAddress: '', refType: CrossThread, + parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not + null, parentService: not null, traceId: not null } + skipAnalysis: 'false' - segmentId: not null spans: - operationName: SpringCloudGateway/sendRequest @@ -87,3 +135,20 @@ segmentItems: parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not null, parentService: not null, traceId: not null} skipAnalysis: 'false' + - segmentId: not null + spans: + - operationName: /provider/b/testcase + parentSpanId: -1 + spanId: 0 + spanLayer: Http + startTime: nq 0 + endTime: nq 0 + componentId: 67 + isError: false + spanType: Entry + peer: not null + tags: + - { key: url, value: 'http://localhost:8080/provider/b/testcase' } + - { key: http.method, value: GET } + - { key: http.status_code, value: '200' } + skipAnalysis: 'false' \ No newline at end of file diff --git a/test/plugin/scenarios/gateway-2.1.x-scenario/gateway-projectA-scenario/src/main/resources/application.yml b/test/plugin/scenarios/gateway-2.1.x-scenario/gateway-projectA-scenario/src/main/resources/application.yml index da55c58a41..7639e7ce8a 100644 --- a/test/plugin/scenarios/gateway-2.1.x-scenario/gateway-projectA-scenario/src/main/resources/application.yml +++ b/test/plugin/scenarios/gateway-2.1.x-scenario/gateway-projectA-scenario/src/main/resources/application.yml @@ -20,8 +20,14 @@ server: spring: cloud: gateway: + httpclient: + connect-timeout: 2000 routes: - id: provider_route uri: http://localhost:18070 predicates: - Path=/provider/b/* + - id: provider_timeout + uri: http://1.2.3.4:18070 + predicates: + - Path=/provider/timeout/* diff --git a/test/plugin/scenarios/gateway-2.1.x-scenario/gateway-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/gateway/projectB/controller/TestController.java b/test/plugin/scenarios/gateway-2.1.x-scenario/gateway-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/gateway/projectB/controller/TestController.java index cb8b88ebad..923d7fd7ff 100644 --- a/test/plugin/scenarios/gateway-2.1.x-scenario/gateway-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/gateway/projectB/controller/TestController.java +++ b/test/plugin/scenarios/gateway-2.1.x-scenario/gateway-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/gateway/projectB/controller/TestController.java @@ -19,12 +19,17 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.client.RestTemplate; @RestController public class TestController { @RequestMapping("/provider/b/testcase") public String testcase() { + try { + new RestTemplate().getForEntity("http://localhost:8080/provider/timeout/error", String.class); + } catch (Exception e) { + } return "1"; } diff --git a/test/plugin/scenarios/gateway-3.x-scenario/config/expectedData.yaml b/test/plugin/scenarios/gateway-3.x-scenario/config/expectedData.yaml index f317ca6840..2eaea24149 100644 --- a/test/plugin/scenarios/gateway-3.x-scenario/config/expectedData.yaml +++ b/test/plugin/scenarios/gateway-3.x-scenario/config/expectedData.yaml @@ -19,6 +19,26 @@ segmentItems: segments: - segmentId: not null spans: + - operationName: /provider/timeout/error + parentSpanId: 0 + spanId: 1 + isError: true + spanType: Exit + tags: + - { key: url, value: not null } + - { key: http.method, value: GET } + - { key: http.status_code, value: '500' } + logs: + - logEvent: + - { key: event, value: error } + - { key: error.kind, value: not null } + - { key: message, value: not null } + - {key: stack, value: not null} + - logEvent: + - { key: event, value: error } + - { key: error.kind, value: not null } + - { key: message, value: not null } + - { key: stack, value: not null } - operationName: GET:/provider/b/testcase parentSpanId: -1 spanId: 0 @@ -42,20 +62,48 @@ segmentItems: segments: - segmentId: not null spans: - - operationName: /provider/b/testcase + - operationName: /provider/timeout/error parentSpanId: -1 spanId: 0 spanLayer: Http - startTime: nq 0 - endTime: nq 0 - componentId: 67 - isError: false + isError: true spanType: Entry - peer: not null tags: - - {key: url, value: 'http://localhost:8080/provider/b/testcase'} + - {key: url, value: 'http://localhost:8080/provider/timeout/error' } - {key: http.method, value: GET} - - {key: http.status_code, value: '200'} + - {key: http.status_code, value: '500'} + - segmentId: not null + spans: + - operationName: SpringCloudGateway/sendRequest + parentSpanId: 0 + spanId: 1 + spanLayer: Http + startTime: nq 0 + endTime: nq 0 + componentId: 61 + isError: true + spanType: Exit + peer: 1.2.3.4:18070 + skipAnalysis: false + tags: + - { key: url, value: not null } + logs: + - logEvent: + - { key: event, value: error } + - { key: error.kind, value: not null } + - { key: message, value: not null } + - { key: stack, value: not null} + - operationName: SpringCloudGateway/RoutingFilter + parentSpanId: -1 + spanId: 0 + startTime: nq 0 + endTime: nq 0 + componentId: 61 + spanType: Local + refs: + - { parentEndpoint: '/provider/timeout/error', networkAddress: '', refType: CrossThread, + parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not + null, parentService: not null, traceId: not null } skipAnalysis: 'false' - segmentId: not null spans: @@ -87,3 +135,20 @@ segmentItems: parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not null, parentService: not null, traceId: not null} skipAnalysis: 'false' + - segmentId: not null + spans: + - operationName: /provider/b/testcase + parentSpanId: -1 + spanId: 0 + spanLayer: Http + startTime: nq 0 + endTime: nq 0 + componentId: 67 + isError: false + spanType: Entry + peer: not null + tags: + - { key: url, value: 'http://localhost:8080/provider/b/testcase' } + - { key: http.method, value: GET } + - { key: http.status_code, value: '200' } + skipAnalysis: 'false' diff --git a/test/plugin/scenarios/gateway-3.x-scenario/gateway-projectA-scenario/src/main/resources/application.yml b/test/plugin/scenarios/gateway-3.x-scenario/gateway-projectA-scenario/src/main/resources/application.yml index da55c58a41..7639e7ce8a 100644 --- a/test/plugin/scenarios/gateway-3.x-scenario/gateway-projectA-scenario/src/main/resources/application.yml +++ b/test/plugin/scenarios/gateway-3.x-scenario/gateway-projectA-scenario/src/main/resources/application.yml @@ -20,8 +20,14 @@ server: spring: cloud: gateway: + httpclient: + connect-timeout: 2000 routes: - id: provider_route uri: http://localhost:18070 predicates: - Path=/provider/b/* + - id: provider_timeout + uri: http://1.2.3.4:18070 + predicates: + - Path=/provider/timeout/* diff --git a/test/plugin/scenarios/gateway-3.x-scenario/gateway-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/gateway/projectB/controller/TestController.java b/test/plugin/scenarios/gateway-3.x-scenario/gateway-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/gateway/projectB/controller/TestController.java index cb8b88ebad..923d7fd7ff 100644 --- a/test/plugin/scenarios/gateway-3.x-scenario/gateway-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/gateway/projectB/controller/TestController.java +++ b/test/plugin/scenarios/gateway-3.x-scenario/gateway-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/gateway/projectB/controller/TestController.java @@ -19,12 +19,17 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.client.RestTemplate; @RestController public class TestController { @RequestMapping("/provider/b/testcase") public String testcase() { + try { + new RestTemplate().getForEntity("http://localhost:8080/provider/timeout/error", String.class); + } catch (Exception e) { + } return "1"; }