Microservice framework TraceId scheme

Microservice framework TraceId scheme

  • 1. Background and purpose
  • 2. MDC Mechanism
  • 3. TraceId scheme under dubbo framework
    • 3.1 A user requests link tracking
      • 3.1.1 web layer
      • 3.1.2 RPC call layer
    • 3.2 Thread pool
    • 3.3 Others

1. Background and purpose

In the project, as the business of the project becomes more and more complex, and the micro-service of the project, etc., the phenomenon such as: the same operation log is scattered in the ordinary project, which causes great trouble to troubleshoot the problem by querying the log. Therefore, there is an urgent need for a means to trace the current link operation log.

By implementing the traceId function, obtain the following log information of the current operation link:

  1. The current operation http request has traceId log information on the web side
  2. The current operating java server has the dubbo consumer log information with traceId
  3. The log information of the dubbo provider with traceId on the currently operating java server
  4. Other service log information that can be strung together, such as MQ, asynchronous threads, etc…

2. MDC mechanism

MDC (Mapped Diagnostic Context) is a mechanism provided by log4j and logback to facilitate logging under multi-threaded conditions. MDC can be regarded as a Map bound to the current thread, similar to ThreadLocal, which can add key-value pairs to it. The content contained in the MDC can be accessed by code executing in the same thread. The child thread of the current thread will inherit the content of the MDC in its parent thread (only initialized once when the child thread is created does not apply to the thread pool). When you need to record logs, you only need to get them from MDC.

3. TraceId scheme under dubbo framework

First define a general TraceId generation tool class, as follows:

import org.springframework.util.StringUtils;

public class TraceIdUtils {<!-- -->
    public static final String X_TRACE_ID = "X-TraceId";
    /**
     * Store traceId through ThreadLocal to ensure that the same thread can get the same traceId
     */
    private static final ThreadLocal<String> TRACE_ID_LOCAL = new ThreadLocal();

    private TraceIdUtils() {<!-- -->
    }

    // Generate a traceId
    public static String genTraceId() {<!-- -->
        String traceId = genTraceIdNotCached();
        TRACE_ID_LOCAL.set(traceId);
        return traceId;
    }

    public static String genTraceIdNotCached() {<!-- -->
        // TODO Note that TraceId is generally generated by the Snowflake Algorithm, which is convenient and simple, and directly uses the timestamp (actual production needs to define the Snowflake Algorithm for generation)
        return String. valueOf(System. currentTimeMillis());
    }

    // get traceId
    public static String getTraceId() {<!-- -->
        return (String)TRACE_ID_LOCAL. get();
    }

    // get traceId
    public static String gen2GetTraceId() {<!-- -->
        if (TRACE_ID_LOCAL. get() == null) {<!-- -->
            return genTraceId();
        }
        return (String)TRACE_ID_LOCAL. get();
    }


    public static void clear() {<!-- -->
        TRACE_ID_LOCAL. remove();
    }
}

3.1 A user requests link tracking

3.1.1 web layer

Define Interceptor, and add

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.StringUtils;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.ModelAndView;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class WebTraceIdInterceptor implements HandlerInterceptor {<!-- -->
    private static final Logger log = LoggerFactory. getLogger(WebTraceIdInterceptor. class);

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {<!-- -->
        String traceId = MDC.get(TraceIdUtils.X_TRACE_ID);
        if (StringUtils.isEmpty(traceId)) {<!-- -->
            // Generate a traceId
            traceId = TraceIdUtils.genTraceIdWithSW();
        }
        TraceIdUtils.setTraceId(traceId);
        MDC.put(TraceIdUtils.X_TRACE_ID, traceId);
        return true;
    }

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {<!-- -->
        TraceIdUtils. clear();
        MDC. clear();
    }
}

@Configuration
public class WebConfig implements WebMvcConfigurer {<!-- -->

    @Bean
    public WebTraceIdInterceptor webTraceIdInterceptor() {<!-- -->
        return new WebTraceIdInterceptor();
    }

    @Override
    public void addInterceptors(InterceptorRegistry registry) {<!-- -->
        // add web traceId interceptor
        registry. addInterceptor(webTraceIdInterceptor())
        .addPathPatterns("/**") // intercept all requests
        .order(Integer.MIN_VALUE); // The smaller the order, the higher the priority
    }
}

3.1.2 RPC call layer

It is processed through Filter and dubbo attachments parameters.

  • Defines BaseFilter : used to handle common logic.
  • ConsumerTraceLogFilter: Consumer Filter
  • ProviderTraceLogFilter: service provider Filter
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONSValidator;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.Result;
import org.springframework.util.ClassUtils;

import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class BaseFilter {<!-- -->
    private static final Integer SUB_LENGTH = 2000;

    public List<Object> obtainMethodArguments(Object[] args) {<!-- -->
        if (args == null) {<!-- -->
            return Collections.emptyList();
        }
        List<Object> params = Arrays. asList(args);
        return params. stream(). filter(arg -> !isExclude(arg)). collect(Collectors. toList());
    }

    public boolean isExclude(Object arg) {<!-- -->
        if (arg == null) {<!-- -->
            return true;
        } else if (ClassUtils. isAssignable(File. class, arg. getClass())) {<!-- -->
            return true;
        } else if (ClassUtils. isAssignable(InputStream. class, arg. getClass())) {<!-- -->
            return true;
        } else {<!-- -->
            return ClassUtils.isAssignable(OutputStream.class, arg.getClass());
        }
    }

    public String subLength(Result result) {<!-- -->
        final String resultVal = JSON.toJSONString(result.getValue());
        final JSONValidator validator = JSONValidator. from(resultVal);
        String str = resultVal;
        if (validator.validate()) {<!-- -->
            final JSONValidator.Type type = validator.getType();
            if (JSONSValidator.Type.Array.equals(type) & amp; & amp; resultVal.length() > SUB_LENGTH) {<!-- -->
                final JSONArray objects = JSON. parseArray(str);
                str = this. subLength(resultVal);
                str = str + "json array size=" + objects. size();
            }
        }
        return this. subLength(str);
    }

    public String subLength(String str) {<!-- -->
        if (StringUtils. isBlank(str)) {<!-- -->
            return str;
        } else {<!-- -->
            return str.length() < SUB_LENGTH ? str : str.substring(0, SUB_LENGTH).concat("...");
        }
    }
}

import com.alibaba.fastjson.JSON;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

@Activate(group = {<!-- -->Constants. CONSUMER}, order = -100)
public class ConsumerTraceLogFilter extends BaseFilter implements Filter {<!-- -->
    private static final Logger log = LoggerFactory. getLogger(ConsumerTraceLogFilter. class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {<!-- -->
        String traceId = MDC.get(TraceIdUtils.X_TRACE_ID);
        if (StringUtils.isEmpty(traceId)) {<!-- -->
            // Generate a traceId
            traceId = TraceIdUtils.genTraceIdWithSW();
        }
        try {<!-- -->
            if (StringUtils.isEmpty(traceId)) {<!-- -->
                traceId = String.valueOf(System.currentTimeMillis());
            }
            invocation.getAttachments().put(TraceIdUtils.X_TRACE_ID, traceId);
            long begin = System. currentTimeMillis();
            // execute method
            final Result result = invoker.invoke(invocation);
            String params = CollectionUtils.isEmpty(this.obtainMethodArguments(invocation.getArguments())) ? null : this.subLength(JSON.toJSONString(invocation.getArguments()));
            log.info("consumer rpc {}#{} cost={}ms params = {} result={}",
                    invoker.getInterface().getName(),
                    invocation. getMethodName(),
                    System.currentTimeMillis() - begin,
                    params,
                    result.getValue() == null ? null : this.subLength(result));
            return result;
        } finally {<!-- -->
            TraceIdUtils.setTraceId(traceId);
            MDC.put(TraceIdUtils.X_TRACE_ID, traceId);
        }
    }
}

import com.alibaba.fastjson.JSON;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

@Activate(group = {<!-- -->Constants. PROVIDER}, order = -100)
public class ProviderTraceLogFilter extends BaseFilter implements Filter {<!-- -->
    private static final Logger log = LoggerFactory. getLogger(ProviderTraceLogFilter. class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {<!-- -->
        try {<!-- -->
            String params = CollectionUtils.isEmpty(this.obtainMethodArguments(invocation.getArguments())) ? null : this.subLength(JSON.toJSONString(invocation.getArguments()));
            String interfaceName = invoker. getInterface(). getName();
            String methodName = invocation. getMethodName();
            String traceId = invocation.getAttachments().get(TraceIdUtils.X_TRACE_ID);
            if (StringUtils.isEmpty(traceId)) {<!-- -->
                traceId = TraceIdUtils.genTraceIdWithSW();
                log.warn("rpc producer traceId is empty and actively generates interfaceName={}, methodName={}", interfaceName, methodName);
            }
            TraceIdUtils.setTraceId(traceId);
            MDC.put(TraceIdUtils.X_TRACE_ID, traceId);
            long begin = System. currentTimeMillis();
            Result result = invoker.invoke(invocation);
            log.info("provider rpc {}#{} cost={}ms params = {} result={}",
                    interfaceName,
                    methodName,
                    System.currentTimeMillis() - begin,
                    params,
                    result.getValue() == null ? null : this.subLength(result));
            return result;
        } finally {<!-- -->
            TraceIdUtils. clear();
            MDC. clear();
        }
    }
}

3.2 Thread pool

Writing a TaskDecorator

import org.apache.commons.lang3.StringUtils;
import org.slf4j.MDC;
import org.springframework.core.task.TaskDecorator;

import java.util.Map;

public class TraceIdTaskDecorator implements TaskDecorator {<!-- -->

    @Override
    public Runnable decorate(Runnable runnable) {<!-- -->
        // Get the current thread calling MDC context
        Map<String, String> contextMap = MDC. getCopyOfContextMap();
        return () -> {<!-- -->
            try {<!-- -->
                if (contextMap != null) {<!-- -->
                    // Set the current thread context map
                    MDC.setContextMap(contextMap);
                }
                String traceId = MDC.get(TraceIdUtils.X_TRACE_ID);
                if (StringUtils.isBlank(traceId)) {<!-- -->
                    traceId = TraceIdUtils.genTraceIdWithSW();
                    MDC.put(TraceIdUtils.X_TRACE_ID, traceId);
                }
                // Put it in threadLocal, the asynchronous thread RPC calls ConsumerTraceLogFilter is obtained from ThreadLocal
                TraceIdUtils.setTraceId(traceId);
                MDC.put(TraceIdUtils.X_TRACE_ID, traceId);
                runnable. run();
            } finally {<!-- -->
                TraceIdUtils. clear();
                MDC. clear();
            }
        };
    }
}

Add TaskDecorator
Method 1: Use org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setTaskDecorator(new TraceIdTaskDecorator());

Method 2: Override the ThreadPoolTaskExecutor.execute(Runable) method

private ExecutorService pool = new ThreadPoolExecutor(1,
            5,
            60L,
            TimeUnit. SECONDS,
            new ArrayBlockingQueue<>(10),
            new DefaultThreadFactory("fefault-thread-factory")) {<!-- -->
        @Override
        public void execute(Runnable command) {<!-- -->
        // Override the execute method
            Runnable decorated = new TraceIdTaskDecorator().decorate(command);
            super. execute(decorated);
        }
    };

3.3 Other

It can be processed by defining an aspect interceptor, which can be used in other scenarios such as job and mq consumer.

1. Define TraceId annotation

@Target({<!-- -->ElementType.METHOD})
@Retention(RetentionPolicy. RUNTIME)
@Inherited
public @interface TraceId {<!-- -->
}

2. Define aspect

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Aspect
@Component
public class TraceIdAspect {<!-- -->

    @Pointcut("@annotation(xxx.xxx.TraceId)")
    public void pointcut() {<!-- -->}

    @Around("pointcut()")
    public Object around(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {<!-- -->
        try {<!-- -->
            String traceId = MDC.get(TraceIdUtils.X_TRACE_ID);
            if (StringUtils.isEmpty(traceId)) {<!-- -->
                // Generate a traceId
                traceId = TraceIdUtils.genTraceIdWithSW();
            }
            TraceIdUtils.setTraceId(traceId);
            MDC.put(TraceIdUtils.X_TRACE_ID, traceId);
            return proceedingJoinPoint. proceed();
        } finally {<!-- -->
            TraceIdUtils. clear();
            MDC. clear();
        }
    }
}

3. Add annotations to the method that needs to add log traceId

@Component
@Slf4j
public class TestTraceId {<!-- -->
@TraceId
    public void test(Message msg) {<!-- -->
    log.info("xxxxxxxxx");
    }
}