Binlog printing principle in gRPC

Principle of Binlog printing in gRPC

gRPC can write the details of each call (request parameters, headers, and other information) to a file in binary form.

Use

The binlog classes are part of grpc-services, so that dependency must be added to the project.

  • Specified when creating the Channel
// "*" selects every method for logging; the sink passed here is a TempFileSink.
BinaryLog binaryLog = BinaryLogs.createBinaryLog(new TempFileSink(), "*");

// Configure the channel step by step, attaching the binary log before building it.
ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port);
channelBuilder.usePlaintext();
channelBuilder.setBinaryLog(binaryLog);
this.channel = channelBuilder.build();

When creating the binary log you must specify which methods to log; * means all methods are logged. For the full syntax, refer to the Control Interface specification.
Alternatively, you can create the binary log without arguments and choose the methods to log through the environment variable GRPC_BINARY_LOG_CONFIG, for example GRPC_BINARY_LOG_CONFIG=*
If you need to control where the log file is written, provide your own implementation of io.grpc.services.BinaryLogSink that writes to the desired location.

Implementation

When a method is called, gRPC checks whether a binlog object has been set. If so, the method definition is wrapped (a call handler and a listener are added) and a new ServerMethodDefinition is created. The binary log interceptor, for example the one returned by io.grpc.services.BinlogHelper#getClientInterceptor, then intercepts the request and writes it to the log.

  • io.grpc.internal.ServerImpl.ServerTransportListenerImpl#startCall
 private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
        ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers,
        Context.CancellableContext context, StatsTraceContext statsTraceCtx, Tag tag) {<!-- -->

      // If binlog is not empty, that is, binlog needs to be recorded, add a request listener and method processor to record binlog
      ServerMethodDefinition<?, ?> wMethodDef = binlog == null ? interceptedDef : binlog.wrapMethodDefinition(interceptedDef);

      return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context, tag);
    }
  • io.grpc.services.BinaryLogProvider#wrapMethodDefinition
 /**
  * Wraps a server method definition so that its calls pass through the binary log interceptor.
  *
  * @param oMethodDef the method definition to wrap
  * @return {@code oMethodDef} itself when no binlog interceptor exists for its full method name;
  *     otherwise a new {@code byte[]}-typed definition whose handler runs the binlog interceptor
  */
 public final <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition(ServerMethodDefinition<ReqT, RespT> oMethodDef) {
    final MethodDescriptor<ReqT, RespT> descriptor = oMethodDef.getMethodDescriptor();

    // The interceptor is looked up by full method name; a null result means nothing is wrapped.
    final ServerInterceptor interceptor = getServerInterceptor(descriptor.getFullMethodName());
    if (interceptor != null) {
      // Re-type the method to byte[] request/response and adapt the original definition to it.
      final MethodDescriptor<byte[], byte[]> rawMethod = BinaryLogProvider.toByteBufferMethod(descriptor);
      final ServerMethodDefinition<byte[], byte[]> rawDef = InternalServerInterceptors.wrapMethod(oMethodDef, rawMethod);

      // Put the binlog interceptor in front of the adapted call handler.
      final ServerCallHandler<byte[], byte[]> loggingHandler =
              InternalServerInterceptors.interceptCallHandlerCreate(interceptor, rawDef.getServerCallHandler());

      // Build the replacement definition from the raw method and the intercepting handler.
      return ServerMethodDefinition.create(rawMethod, loggingHandler);
    }
    return oMethodDef;
  }
  • io.grpc.services.BinlogHelper#getClientInterceptor
 public ClientInterceptor getClientInterceptor(final long callId) {<!-- -->
    return new ClientInterceptor() {<!-- -->
      boolean trailersOnlyResponse = true;

      @Override
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
              final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {<!-- -->
        final String methodName = method.getFullMethodName();
        final String authority = next.authority();
        final Deadline deadline = min(callOptions.getDeadline(), Context.current().getDeadline());

        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {<!-- -->
          @Override
          public void start(final ClientCall.Listener<RespT> responseListener, Metadata headers) {<!-- -->
            final Duration timeout = deadline == null ? null
                    : Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS));
            writer.logClientHeader(
                    seq.getAndIncrement(),
                    methodName,
                    authority, authority
                    timeout,
                    headers,
                    GrpcLogEntry.Logger.LOGGER_CLIENT,
                    callId,
                    /*peerAddress=*/ null);
            ClientCall.Listener<RespT> wListener =
                    new SimpleForwardingClientCallListener<RespT>(responseListener) {<!-- -->
                      @Override
                      public void onMessage(RespT message) {<!-- -->
                        writer.logRpcMessage(
                                seq.getAndIncrement(),
                                EventType.EVENT_TYPE_SERVER_MESSAGE,
                                method.getResponseMarshaller(),
                                message,
                                GrpcLogEntry.Logger.LOGGER_CLIENT,
                                callId);
                        super.onMessage(message);
                      }

                      @Override
                      public void onHeaders(Metadata headers) {<!-- -->
                        trailersOnlyResponse = false;
                        writer.logServerHeader(
                                seq.getAndIncrement(),
                                headers,
                                GrpcLogEntry.Logger.LOGGER_CLIENT,
                                callId,
                                getPeerSocket(getAttributes()));
                        super.onHeaders(headers);
                      }

                      @Override
                      public void onClose(Status status, Metadata trailers) {<!-- -->
                        SocketAddress peer = trailersOnlyResponse
                                ? getPeerSocket(getAttributes()) : null;
                        writer.logTrailer(
                                seq.getAndIncrement(),
                                status,
                                trailers,
                                GrpcLogEntry.Logger.LOGGER_CLIENT,
                                callId,
                                peer);
                        super.onClose(status, trailers);
                      }
                    };
            super.start(wListener, headers);
          }

          @Override
          public void sendMessage(ReqT message) {<!-- -->
            writer.logRpcMessage(
                    seq.getAndIncrement(),
                    EventType.EVENT_TYPE_CLIENT_MESSAGE,
                    method.getRequestMarshaller(),
                    message,
                    GrpcLogEntry.Logger.LOGGER_CLIENT,
                    callId);
            super.sendMessage(message);
          }

          @Override
          public void halfClose() {<!-- -->
            writer.logHalfClose(
                    seq.getAndIncrement(),
                    GrpcLogEntry.Logger.LOGGER_CLIENT,
                    callId);
            super.halfClose();
          }

          @Override
          public void cancel(String message, Throwable cause) {<!-- -->
            writer.logCancel(
                    seq.getAndIncrement(),
                    GrpcLogEntry.Logger.LOGGER_CLIENT,
                    callId);
            super.cancel(message, cause);
          }
        };
      }
    };
  }