Principle of Binlog printing in gRPC
gRPC supports outputting request call parameters, header and other information to a file in binary form
Use
The dependency of binlog is in grpc-services
, so this dependency is required
- Specified when creating the Channel
BinaryLog binaryLog = BinaryLogs.createBinaryLog(new TempFileSink(), "*"); this.channel = ManagedChannelBuilder.forAddress(host, port) .usePlaintext() .setBinaryLog(binaryLog) .build();
When creating, you need to specify the printing method. *
means printing all methods. For specific specifications, please refer to Control Interface
You can also specify the method to be printed without specifying parameters when creating, by setting the environment variable GRPC_BINARY_LOG_CONFIG=*
If you need to specify the file location, you can override io.grpc.services.BinaryLogSink
to specify the file location.
Implementation
When the method is called, it will be judged whether there is a binlog object set. If so, the method will be encapsulated, and a processor and listener will be added; then ServerMethodDefinition
will be re-created; through the binary log interceptor io.grpc .services.BinlogHelper#getClientInterceptor
intercepts the request and writes to the log
- io.grpc.internal.ServerImpl.ServerTransportListenerImpl#startCall
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName, ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers, Context.CancellableContext context, StatsTraceContext statsTraceCtx, Tag tag) {<!-- --> // If binlog is not empty, that is, binlog needs to be recorded, add a request listener and method processor to record binlog ServerMethodDefinition<?, ?> wMethodDef = binlog == null ? interceptedDef : binlog.wrapMethodDefinition(interceptedDef); return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context, tag); }
- io.grpc.services.BinaryLogProvider#wrapMethodDefinition
public final <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition(ServerMethodDefinition<ReqT, RespT> oMethodDef) { // Get the binary log interceptor based on the method. If there is no such method, it will not be intercepted. ServerInterceptor binlogInterceptor = getServerInterceptor(oMethodDef.getMethodDescriptor().getFullMethodName()); if (binlogInterceptor == null) { return oMethodDef; } MethodDescriptor<byte[], byte[]> binMethod = BinaryLogProvider.toByteBufferMethod(oMethodDef.getMethodDescriptor()); // Wrapping method, adding processor and listener ServerMethodDefinition<byte[], byte[]> binDef = InternalServerInterceptors.wrapMethod(oMethodDef, binMethod); //Create processor ServerCallHandler<byte[], byte[]> binlogHandler = InternalServerInterceptors.interceptCallHandlerCreate(binlogInterceptor, binDef.getServerCallHandler()); //Create service method definition return ServerMethodDefinition.create(binMethod, binlogHandler); }
- io.grpc.services.BinlogHelper#getClientInterceptor
public ClientInterceptor getClientInterceptor(final long callId) {<!-- --> return new ClientInterceptor() {<!-- --> boolean trailersOnlyResponse = true; @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {<!-- --> final String methodName = method.getFullMethodName(); final String authority = next.authority(); final Deadline deadline = min(callOptions.getDeadline(), Context.current().getDeadline()); return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {<!-- --> @Override public void start(final ClientCall.Listener<RespT> responseListener, Metadata headers) {<!-- --> final Duration timeout = deadline == null ? null : Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS)); writer.logClientHeader( seq.getAndIncrement(), methodName, authority, authority timeout, headers, GrpcLogEntry.Logger.LOGGER_CLIENT, callId, /*peerAddress=*/ null); ClientCall.Listener<RespT> wListener = new SimpleForwardingClientCallListener<RespT>(responseListener) {<!-- --> @Override public void onMessage(RespT message) {<!-- --> writer.logRpcMessage( seq.getAndIncrement(), EventType.EVENT_TYPE_SERVER_MESSAGE, method.getResponseMarshaller(), message, GrpcLogEntry.Logger.LOGGER_CLIENT, callId); super.onMessage(message); } @Override public void onHeaders(Metadata headers) {<!-- --> trailersOnlyResponse = false; writer.logServerHeader( seq.getAndIncrement(), headers, GrpcLogEntry.Logger.LOGGER_CLIENT, callId, getPeerSocket(getAttributes())); super.onHeaders(headers); } @Override public void onClose(Status status, Metadata trailers) {<!-- --> SocketAddress peer = trailersOnlyResponse ? getPeerSocket(getAttributes()) : null; writer.logTrailer( seq.getAndIncrement(), status, trailers, GrpcLogEntry.Logger.LOGGER_CLIENT, callId, peer); super.onClose(status, trailers); } }; super.start(wListener, headers); } @Override public void sendMessage(ReqT message) {<!-- --> writer.logRpcMessage( seq.getAndIncrement(), EventType.EVENT_TYPE_CLIENT_MESSAGE, method.getRequestMarshaller(), message, GrpcLogEntry.Logger.LOGGER_CLIENT, callId); super.sendMessage(message); } @Override public void halfClose() {<!-- --> writer.logHalfClose( seq.getAndIncrement(), GrpcLogEntry.Logger.LOGGER_CLIENT, callId); super.halfClose(); } @Override public void cancel(String message, Throwable cause) {<!-- --> writer.logCancel( seq.getAndIncrement(), GrpcLogEntry.Logger.LOGGER_CLIENT, callId); super.cancel(message, cause); } }; } }; }