验证码: 看不清楚,换一张 查询 注册会员,免验证
  • {{ basic.site_slogan }}
  • 打开微信扫一扫,
    您还可以在这里找到我们哟

    关注我们

Java gRPC拦截器如何实现分布式日志链路追踪器

阅读:824 来源:乙速云 作者:代码code

Java gRPC拦截器如何实现分布式日志链路追踪器

跨进程链路追踪原理

想要实现跨进程间的分布式链路追踪,就要在发起远程调用的时候通过请求头或者公共的自定义域将链路参数放进去,然后服务端收到请求后将链路参数从请求头或者自定义域中或取出来,就这样一层一层的将链路参数传递下去直至调用结束。

JAVA的gRPC库io.grpc提供了在RPC调用中客户端和服务端的拦截器(Interceptor),通过客户端拦截器我们可以将链路追踪的参数放到gRPC调用的Metadata中,通过服务端拦截器能够从Metadata中获取到链路追踪所传递的参数;io.grpc提供的客户端拦截器和服务端拦截器分别是io.grpc.ClientInterceptorio.grpc.ServerInterceptor

代码实现

maven依赖

    
        
            io.grpc
            grpc-all
            ${grpc.version}
            provided
        
        
            net.devh
            grpc-server-spring-boot-starter
            ${grpc.starter.version}
            provided
        
        
            net.devh
            grpc-client-spring-boot-starter
            ${grpc.starter.version}
            provided
        
        
            io.github.redick01
            log-helper-spring-boot-starter-common
            1.0.3-RELEASE
        
    

拦截器实现

@Slf4j
@GrpcGlobalClientInterceptor
@GrpcGlobalServerInterceptor
public class GrpcInterceptor extends AbstractInterceptor implements ServerInterceptor, ClientInterceptor {
    // 链路追踪参数traceId
    private static final Metadata.Key TRACE = Metadata.Key.of("traceId", Metadata.ASCII_STRING_MARSHALLER)
    // 链路追踪参数spanId
    private static final Metadata.Key SPAN = Metadata.Key.of("spanId", Metadata.ASCII_STRING_MARSHALLER);
    // 链路追踪参数parentId
    private static final Metadata.Key PARENT = Metadata.Key.of("parentId", Metadata.ASCII_STRING_MARSHALLER);
    @Override
    public  ClientCall interceptCall(
            MethodDescriptor methodDescriptor, CallOptions callOptions,
            Channel channel) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        try {
            return new ForwardingClientCall.SimpleForwardingClientCall(channel.newCall(methodDescriptor, callOptions)) {
                @Override
                public void start(Listener responseListener, Metadata headers) {
                    // 客户端传递链路追中数据,将数据放到headers中
                    String traceId = traceId();
                    if (StringUtils.isNotBlank(traceId)) {
                        headers.put(TRACE, traceId);
                        headers.put(SPAN, spanId());
                        headers.put(PARENT, parentId());
                    }
                    // 继续下一步
                    super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener(responseListener) {
                        @Override
                        public void onHeaders(Metadata headers) {
                            // 服务端传递回来的header
                            super.onHeaders(headers);
                        }
                    }, headers);
                }
            };
        } finally {
            stopWatch.stop();
            log.info(LogUtil.marker(stopWatch.getTime()), "GRPC调用耗时");
        }
    }
    @Override
    public  Listener interceptCall(ServerCall serverCall,
            Metadata headers, ServerCallHandler serverCallHandler) {
        // 服务端从headers中获取到链路追踪参数
        String traceId = headers.get(TRACE);
        String spanId = headers.get(SPAN);
        String parentId = headers.get(PARENT);
        // 构建当前进程的链路追踪数据并体现在日志中
        Tracer.trace(traceId, spanId, parentId);
        log.info(LogUtil.marker(), "开始处理");
        return serverCallHandler.startCall(new ForwardingServerCall.SimpleForwardingServerCall(serverCall) {
            @Override
            public void sendHeaders(Metadata responseHeaders) {
                super.sendHeaders(responseHeaders);
            }
            @Override
            public void close(Status status, Metadata trailers) {
                super.close(status, trailers);
            }
        }, headers);
    }
}

客户端使用

客户端使用代码如下,该使用示例是在我开源的日志工具中的例子,我这里通过springboot自动装配将GrpcInterceptor交由spring容器管理。所以可以直接通过自动注入的方式使用。

@RestController
public class TestController {
    @GrpcClient("userClient")
    private UserServiceGrpc.UserServiceBlockingStub userService;
    @Autowired
    private GrpcInterceptor grpcInterceptor;
    //@LogMarker(businessDescription = "获取用户名")
    @GetMapping("/getUser")
    public String getUser()     {
        User user = User.newBuilder()
                .setUserId(100)
                .putHobbys("pingpong", "play pingpong")
                .setCode(200)
                .build();
        Channel channel = ClientInterceptors.intercept(userService.getChannel(), grpcInterceptor);
        userService = UserServiceGrpc.newBlockingStub(channel);
        User u = userService.getUser(user);
        return u.getName();
    }
}
分享到:
*特别声明:以上内容来自于网络收集,著作权属原作者所有,如有侵权,请联系我们: hlamps#outlook.com (#换成@)。
相关文章
{{ v.title }}
{{ v.description||(cleanHtml(v.content)).substr(0,100)+'···' }}
你可能感兴趣
推荐阅读 更多>