日志追踪traceid (django怎么实现日志链路追踪)

一,使用traceId概述

平时出现问题查询日志是程序员的解决方式,日志一般会从服务器的日志文件,然后查找自己需要的日志,或者日志输出到es中,在es中进行搜索日志,可是对于目前流行的微服务或者单体服务,将日志串起来去查看并不是一件容易的事情,一般微服务会调用多个系统,有http请求的,有mq的等会产生大量的日志,根据日志快速定位到具体的问题才是我们想要的解决方案,毕竟要用最短的时间找到问题所在,并快速解决。目前的elk搜集日志,也只是把所有的日志搜集起来,并没有将具体的日志按照请求串起来,所以这个目前需要在日志中添加traceId进行日志的追踪。

二,请求的源头

1,http请求

思路

在最开始请求系统时候生成一个全局唯一的traceId,放在http 请求header中,系统接收到请求后,从header中取出这个traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期,即当一个服务调用另外一个服务的时候,需要将traceId往下传递,从而形成一条链路。

实现

@Service
    public class TraceInterceptor implements HandlerInterceptor {
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
            String traceId = request.getHeader(ElkConstants.TRACE_ID);
            if (StringUtils.isNotEmpty(traceId)) {
                MDC.put(ElkConstants.TRACE_ID, traceId);
            } else {
                MDC.put(ElkConstants.TRACE_ID, UUID.randomUUID().toString());
            }
            return true;
        }

        @Override
        public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
            MDC.remove(ElkConstants.TRACE_ID);
        }
    }
@Configuration
    public class InterceptorConfig implements WebMvcConfigurer {

        @Resource
        private TraceInterceptor traceInterceptor;

        @Override
        public void addInterceptors(InterceptorRegistry registry) {
            registry.addInterceptor(traceInterceptor).addPathPatterns("/**");
        }
    }
public class ElkConstants {


    /**
     * TRACE_ID
     */
    public static final String TRACE_ID = "traceId";
}

2,定时任务Task

思路

在定时任务的时候生成一个全局唯一的traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期

实现

@Component
@Slf4j
public class SyncTask extends BaseTask {

    /**
     * 定时任务
     */
    public void sync() {
        //设置traceId
        setTraceId();
        //自己的业务逻辑
    }
}
public abstract class BaseTask {

    public void setTraceId() {
        MDC.put(ElkConstants.TRACE_ID, UUID.randomUUID().toString());
    }
}

3,微服务(Feign)

思路

在请求的时候生成一个全局唯一的traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期

实现

@Configuration
public class FeignInterceptor implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate requestTemplate) {
        requestTemplate.header(ElkConstants.TRACE_ID, (String) MDC.get(ElkConstants.TRACE_ID));
    }
}

4,Mq的方式

思路

使用mq进行消息发送的时候,可以将请求发送消息的traceId从mdc中取出来,我们可以放到消息体中,当做一个字段,然后在消息端消费的时候,从消息体中获取到traceId,并放入到MDC中,伴随着此次的请求

实现(kafka为列子,其他也可以按照思路进行不同的实现)

@Component
public class KafkaSender implements MessageSender {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    private KafkaConfig kafkaConfig;

    private static final Logger log = LoggerFactory.getLogger(KafkaSender.class);

    /**
     * 异步发送
     *
     * @param mqSendEvent
     * @return
     */
    @Override
    public boolean sendAsyncMessage(MqSendEvent mqSendEvent) {
       try {
            mqSendEvent.setTraceId(MDC.get("traceId"));
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(mqSendEvent.getTopic(),
                    JSON.toJSONString(mqSendEvent));
            future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                public void onFailure(Throwable ex) {
                    log.error("kafka sendAsyncMessage error, ex = {}, topic = {}, data = {}", ex, mqSendEvent.getTopic(),
                            JSON.toJSONString(mqSendEvent));
                }

                @Override
                public void onSuccess(SendResult<String, Object> result) {
                    log.info("kafka sendAsyncMessage success topic = {}, data = {}", mqSendEvent.getTopic(),
                            JSON.toJSONString(mqSendEvent));
                }
            });
        } finally {
            MDC.clear();
        }
        return true;
    }
}

public interface MessageSender {

    /**
     * 异步发送
     *
     * @param mqSendEvent
     */
    public boolean sendAsyncMessage(MqSendEvent mqSendEvent);
}
public class MqSendEvent implements Serializable {
    //topic
    private String topic;
    //数据
    private String data;
    //traceId数据
    private String traceId;

    public MqSendEvent(String topic, String data) {
        this.topic = topic;
        this.data = data;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    public String getTraceId() {
        return traceId;
    }

    public void setTraceId(String traceId) {
        this.traceId = traceId;
    }
}
@Component
@Slf4j
public class UserRegisterListener extends MessageBaseListener {

    /**
     * 监听消息
     *
     * @param consumerRecord
     * @param ack
     */
    @Override
    @KafkaListener(topics = {MessageConstants.REGISTER_USER_TOPIC}, containerFactory = MessageConstants.CONSUMER_CONTAINER_FACTORY_NAME)
    protected void receiverMessage(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        try {
            log.info("consumer message record:{}", consumerRecord);
            Optional<?> optional = Optional.ofNullable(consumerRecord.value());
            if (optional.isPresent()) {
                Object value = optional.get();
                MqSendEvent mqSendEvent = JSON.parseObject((String) value, MqSendEvent.class);
                JSONObject jsonObject = JSON.parseObject(mqSendEvent.getData());
                MDC.put(ElkConstants.TRACE_ID, mqSendEvent.getTraceId());
             //具体业务逻辑
            }
        } finally {
            MDC.clear();
        }
        ack.acknowledge();
    }
}

实现

三,MDC实现链路追踪的原理

1,概述

MDC(Mapped Diagnostic Contexts),是Slf4J类日志系统中实现分布式多线程日志数据传递的重要工具,用户可利用MDC将一些运行时的上下文数据打印出来。目前只有log4j和logback提供原生的MDC支持

2,使用(具体不同场景使用见(二,请求源头))

在logback配置文件中配置MDC容器中的变量%X{trackId}

 <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %X{trackId} [%15.15t] %class{39}.%method[%L] : %m%n</pattern>
            <charset>UTF-8</charset>f
        </encoder>
    </appender>

3,源码分析

MDC中的put方法

    public static void put(String key, String val) throws IllegalArgumentException {
        if (key == null) {
            throw new IllegalArgumentException("key parameter cannot be null");
        } else if (mdcAdapter == null) {
            throw new IllegalStateException("MDCAdapter cannot be null. See also http://www.slf4j.org/codes.html#null_MDCA");
        } else {
            mdcAdapter.put(key, val);
        }
    }

MDCAdapter接口

public interface MDCAdapter {
    //设置当前线程MDC上下文中指定key的值和value的值
    void put(String var1, String var2);
	// 获取当前线程MDC上下文中指定key的值
    String get(String var1);
	// 移除当前线程MDC上下文中指定key的值
    void remove(String var1);
	// 清空MDC上下文
    void clear();
	// 获取MDC上下文
    Map<String, String> getCopyOfContextMap();
	// 设置MDC上下文
    void setContextMap(Map<String, String> var1);
}

LogbackMDCAdapter实现

public class LogbackMDCAdapter implements MDCAdapter {
    final ThreadLocal<Map<String, String>> copyOnThreadLocal = new ThreadLocal();
    private static final int WRITE_OPERATION = 1;
    private static final int MAP_COPY_OPERATION = 2;
    //threadlocal线程级别的,这就是为什么需要remove或者clear的原因所在,
    //防止内存泄露,具体为啥,可以研究一下ThreadLocal底层原理就明白了
    final ThreadLocal<Integer> lastOperation = new ThreadLocal();

    public LogbackMDCAdapter() {
    }
    ......

可以看到LogbackMDCAdapter声明了类型为ThreadLocal的map。ThreadLocal 提供了线程本地的实例。ThreadLocal变量在线程之间隔离而在方法或类间能够共享,是属于线程单独私有的,线程之间相互隔离,到这里就可以大致理解为其实使用的就是ThreadLocal的私有线程的特性,大概就可以明白其中的原理了。

LogbackMDCAdapter 的put方法

    public void put(String key, String val) throws IllegalArgumentException {
        if (key == null) {
            throw new IllegalArgumentException("key cannot be null");
        } else {
            //复制当前线程的threadLocal
            Map<String, String> oldMap = (Map)this.copyOnThreadLocal.get();
            //设置当前操作为写操作,并返回上一次的操作
            Integer lastOp = this.getAndSetLastOperation(1);
            //上一次不是读操作或者null,已经初始化了,有内容的话在里面设置内容
            if (!this.wasLastOpReadOrNull(lastOp) && oldMap != null) {
                oldMap.put(key, val);
            } else {
                // 复制一个当前线程的副本
                Map<String, String> newMap = this.duplicateAndInsertNewMap(oldMap);
                newMap.put(key, val);
            }

        }
    }

    private Integer getAndSetLastOperation(int op) {
        //设置写操作,并返回上一次操作
        Integer lastOp = (Integer)this.lastOperation.get();
        this.lastOperation.set(op);
        return lastOp;
    }

    private boolean wasLastOpReadOrNull(Integer lastOp) {
        //判断操作类型 是null或者读操作
        return lastOp == null || lastOp == 2;
    }

创建线程安全的map放到threadLocal中

    private Map<String, String> duplicateAndInsertNewMap(Map<String, String> oldMap) {
        Map<String, String> newMap = Collections.synchronizedMap(new HashMap());
        if (oldMap != null) {
            synchronized(oldMap) {
                newMap.putAll(oldMap);
            }
        }

        this.copyOnThreadLocal.set(newMap);
        return newMap;
    }

get方法

    public String get(String key) {
        //从当前线程获取map
        Map<String, String> map = (Map)this.copyOnThreadLocal.get();
        return map != null && key != null ? (String)map.get(key) : null;
    }

remove方法

    public void remove(String key) {
        if (key != null) {
            Map<String, String> oldMap = (Map)this.copyOnThreadLocal.get();
            if (oldMap != null) {
                //设置为写操作
                Integer lastOp = this.getAndSetLastOperation(1);
                if (this.wasLastOpReadOrNull(lastOp)) {
                    //读操作或者null的时候,复制新map并移除当前key
                    Map<String, String> newMap = this.duplicateAndInsertNewMap(oldMap);
                    newMap.remove(key);
                } else {
                    oldMap.remove(key);
                }

            }
        }
    }

clear方法

    public void clear() {
        //设置为写操作
        this.lastOperation.set(1);
        //移除复制
        this.copyOnThreadLocal.remove();
    }

四,MDC使用总结

mdc就是基于Threadlocal进行的一个流程周转的标志物的传递,就是根据这种标志,可以追踪到日志的整体请求记录,便于进行定位到问题所在,而且对于用户的影响极小