掘金 后端 ( ) • 2024-06-15 14:31

theme: vue-pro

前言

本文基于SkyWalking8.6.0分析SkyWalking客户端(javaagent)的tracing部分。

  1. 理解SkyWalking中trace相关模型;
  2. trace跨进程传播原理;
  3. trace跨线程传播原理;
  4. 跨线程span开启和停止;

一、Segment和Span

1、TraceSegment

A {@link TraceSegment} means the segment, which exists in current {@link Thread}. And the distributed trace is formed by multi {@link TraceSegment}s, because the distributed trace crosses multi-processes, multi-threads.

在OpenTelemetry中Trace由n个Span构成;

在SkyWalking中Trace由m个Segment构成,每个Segment包含n个Span。

每个Segment包含一个线程中的Span。

当TraceSegment构造时,会分配全局唯一segmentId和traceId,链路中第一个Segment的traceId是链路的traceId。

GlobalIdGenerator#generate:segmentId和traceId的生成算法。

id=进程id(uuid).线程id.线程自增序列

其中线程自增序列(nextSeq)=时间戳+4位自增序列(0-9999)

即通过segmentId和traceId可以反向定位segment和trace生成时间。

TraceSegment#relatedGlobalTrace:

如果当前Segment非链路中第一个Segment,traceId会被覆盖为链路上下文中的traceId。

2、AbstractTracingSpan

AbstractTracingSpan是Span的抽象实现,包含了众多关键属性。

  1. spanId:span在segment下的唯一标识,从0开始自增;
  2. parentSpanId:父span的id,-1代表当前span是segment中的第一个span;
  3. operationName:操作名,如SpringMVC的{POST}/api/v1/x,RabbitMQ消费者的RabbitMQ/Topic/{exchange}/Queue/{routingKey}/Consumer;
  4. tags:标签,如http请求有http.method、status_code,db有db.statement,mq有mq.broker、mq.topic,部分标签支持搜索(searchableTracesTags);
  5. startTime/endTime:span的开始时间和结束时间,注意这里都是客户端时间
  6. componentId:组件id,官方支持的组件见ComponentsDefine,如11-Feign、14-SpringMVC;

AbstractTracingSpan#start:记录开始时间。

AbstractTracingSpan#finish:记录结束时间,并将span加入segment。

3、Span类型

span有三种类型:Local/Entry/Exist。

LocalSpan

一个普通的span,线程内部操作。

比如:

  1. 自定义LocalSpan,apm-toolkit-trace提供的Trace注解
  2. 跨线程的Runnable是LocalSpan,apm-toolkit-trace提供了TraceCrossThread注解
  3. SpringSchedule是LocalSpan;

StackBasedTracingSpan

Entry和Exist类型Span都继承StackBasedTracingSpan,特点是基于栈的数据结构可以多次start和finish

StackBasedTracingSpan解决一次用户代码调用经过多个skywalking插件的问题,比如一个普通springboot应用,入口流量会经过tomcat和springmvc插件,但是只需要记录一个EntrySpan。

StackBasedTracingSpan持有stackDepth代表当前栈的深度,每次start栈深度+1,每次finish栈深度-1。

StackBasedTracingSpan复写了父类AbstractTracingSpan的finish方法当且仅当span最后一次finish后,即最后一个插件执行完成后,才真正执行finish,记录span结束时间并加入segment。

EntrySpan

EntrySpan一般是流量入口,一个Segment(线程)中最多只有一个EntrySpan。

EntrySpan维护了一个currentMaxDepth,用于记录到达的最大深度。

EntrySpan#start:

首次start记录startTime每次调用start都会清空一些span的属性,如componentId(Tomcat是1,SpringMVC是14)。

EntrySpan#tag:

以tag为例,只有最后一次调用Span#start的业务才能记录tag数据

如Tomcat插件+SpringMVC插件:

  1. Tomcat:请求先经过Tomcat插件,创建EntrySpan,记录开始和结束时间;
  2. SpringMVC:在经过Tomcat之后,经过SpringMVC插件,这里不会再创建新EntrySpan,而是清空并覆盖Tomcat记录的数据(operationName、tag、componentId等);

ExitSpan

ExitSpan一般是流量出口,比如一次rpc调用、db查询。

ExistSpan#start:首次开启ExistSpan,记录开始时间。

ExitSpan#tag:只有首次开启ExistSpan的业务能记录tag、componentId等信息。

举例,如RestTemplate使用OkHttp客户端,对于用户来说只有一个get请求调用,但是实际经过了skywalking两个插件拦截:okhttp和RestTemplate。

由于先走RestTemplate插件,所以RestTemplate插件开启ExitSpan,记录开始时间、结束时间、tag、componentId等信息。

RestTemplate template = new RestTemplate();
OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
template.setRequestFactory(factory);
template.getForEntity("http://service/api", String.class);

4、TraceSegmentRef

TraceSegment和Span可以通过TraceSegmentRef关联到上游segment的一个span。

TraceSegment会关联1个ref。

Span在rpc场景下只会关联1个ref,在批量消费场景下会关联n个ref。

通过TraceSegmentRef将不同进程线程之间的Segment/Span进行连接。

跨进程,通过ContextCarrier构造。

跨线程,通过ContextSnapshot构造。

二、TracingContext

TracingContext管理当前线程的Segment和Span,Span通过一个栈数据结构维护。

TracingContext构造会创建Segment,分配trace_id和segment_id。

TracingContext维护的是一个LinkedList的物理Span栈,这些Span最终都要收集在segment里发送给OAP,常常是1个EntrySpan入口流量+n个ExitSpan出口流量。

Entry/ExitSpan是一个插件栈,比如对于一次rpc调用,可能经过多个插件拦截,需要确定由哪个插件来设置span的属性。

1、创建Span

EntrySpan

TracingContext#createEntrySpan:创建EntrySpan

  1. 默认SPAN_LIMIT_PER_SEGMENT=300,一个segment中只会有300个span;
  2. 找栈顶span;
  3. 如果栈顶span是EntrySpan,覆盖operationName并重新start(清空span属性);
  4. 如果栈顶span非EntrySpan或栈空,创建新的EntrySpan,start并入栈;
  5. 每个新span的spanId由TracingContext通过从0自增的spanIdGenerator管理;
  6. span之间的父子关系,通过parentSpanId维护,栈底span的parentSpanId是-1;

ExitSpan

TracingContext#createExitSpan:类似EntrySpan,优先取栈顶ExitSpan覆盖。

LocalSpan

TracingContext#createLocalSpan:LocalSpan直接创建启动并入栈。

2、查询

当前segmentId

TracingContext#getSegmentId:

当前激活的Span

TracingContext#activeSpan:从activeSpanStack栈中获取栈顶Span。

当前spanId

TracingContext#getSpanId:

3、停止Span

TracingContext#stopSpan:

  1. 校验栈顶span与入参span一致;
  2. 如果是NoopSpan(不采集空实现),span出栈;
  3. 如果是Entry/Exist/Local,执行span的finish方法,如果span#finish成功,即Span中所有插件都完成,当前span出栈;
  4. 执行TracingContext#finish;

TracingContext#finish当TracingContext中所有Span都出栈后,segment会被发送给OAP

注:关于Segment的序列化和发送OAP细节,下一章再分析。

三、ContextManager

ContextManager用ThreadLocal管理当前线程的TracingContext

注:RuntimeContext用于plugin中传递上下文参数,底层是个map。

和TracingContext暴露了类似方法,只不过是静态方法,可以通过ThreadLocal管理当前线程的TracingContext。查询方法不再赘述,通过ThreadLocal拿到TracingContext即可。

1、创建Span

EntrySpan

ContextManager#createEntrySpan:

对于非跨进程的情况,没有ContextCarrier,执行ContextManager#getOrCreate创建TracingContext,再通过TracingContext创建EntrySpan。

注:OPERATION_NAME_THRESHOLD默认150,如果operationName超出长度会被截断到该阈值,比如接口名太长会被截断;

ContextManager#getOrCreate:如果当前线程还没创建TracingContext,则委派ContextManagerExtendService创建放入ThreadLocal。

ContextManagerExtendService#createTraceContext:

  1. 默认KEEP_TRACING=false,如果客户端与oap的连接断开(如果客户端是将trace发送到kafka,这里是和kafka的连接状态),不收集trace;
  2. 对于IGNORE_SUFFIX指定的部分后缀,不采集trace,如jpg;
  3. SamplingService决定是否采集Trace,通过SAMPLE_N_PER_3_SECS可配置采样率,即3s内采样n个trace,默认SAMPLE_N_PER_3_SECS=-1代表全量采样

ExitSpan

ContextManager#createExitSpan有两个重载方法。

  1. 带ContextCarrier,常见的是远程调用,比如feign、dubbo、producer;
  2. 不带ContextCarrier,常见的是db操作,比如mysql、redis;

逻辑同EntrySpan。

LocalSpan

ContextManager#createLocalSpan:同EntrySpan

2、停止Span

ContextManager#stopSpan:建议使用有Span入参的重载方法。

当TracingContext的activeSpanStack清空后,ThreadLocal移除当前TracingContext。

四、TracingContext跨进程传播

1、CorrelationContext

TracingContext除了存储segment和span外,还有CorrelationContext

public class TracingContext {
    private final CorrelationContext correlationContext;
    // ...
}

CorrelationContext是面向业务的扩展信息,用map存储数据,可以在链路上传播业务数据

apm-toolkit-traceTraceContext工具类,可以在服务消费方注入业务id。

TraceContext.putCorrelation("order_id", order.getId());

在服务提供方可以获取业务id。

Optional<String> orderId = TraceContext.getCorrelation("order_id");

CorrelationContextPutInterceptor拦截putCorrelation,将kv存储到当前TracingContext的CorrelationContext中。

CorrelationContextGetInterceptor拦截getCorrelation,从TracingContext的CorrelationContext中获取kv。

CorrelationContext#put:

除了跨进程传播业务数据外,通过配置在agent侧配置correlation.auto_tag_keys=业务key,还能自动在当前span用业务key打tag。

在OAP侧配置searchableTracesTags=业务key,就能通过tag=业务key查询trace。

2、ExtensionContext

TracingContext还有ExtensionContext

public class TracingContext {
    private final ExtensionContext extensionContext;
    // ...
}

ExtensionContext是面向skywalking内部的扩展信息:

  1. skipAnalysis:是否跳过OAP analysis,默认false,这个属性暂时忽略,涉及OAP;
  2. sendingTimestamp:专门用于mq场景,producer记录发送时间,consumer侧收消息后计算时间差,用于统计接收消息延迟
public class ExtensionContext {
    private boolean skipAnalysis;
    private Long sendingTimestamp;
}

在consumer的EntrySpan里,会记录tag=transmission.latency。(单位毫秒)

3、ContextCarrier

进程之间通过ContextCarrier的数据结构传递TracingContext

ContextCarrier包含三部分:

  1. trace基础信息:如traceId、segmentId、spanId等;
  2. ExtensionContext:面向skywalking自己的扩展信息;
  3. CorrelationContext:面向业务的扩展信息;
public class ContextCarrier implements Serializable {
    // 链路traceId
    private String traceId;
    // 服务消费方 ExitSpan所属segmentId
    private String traceSegmentId;
    // 服务消费方 ExitSpan的spanId
    private int spanId = -1;
    // 服务消费方 服务名
    private String parentService = Constants.EMPTY_STRING;
    // 服务消费方 instanceId
    private String parentServiceInstance = Constants.EMPTY_STRING;
    // 服务消费方 operationName
    private String parentEndpoint;
    // 服务消费方得到服务提供方的ip:port
    private String addressUsedAtClient;
    // 面向OAP的扩展信息
    private ExtensionContext extensionContext = new ExtensionContext();
    // 面向用户的扩展信息
    private CorrelationContext correlationContext = new CorrelationContext();
}

服务消费方

DefaultHttpClientInterceptor#beforeMethod:以feign客户端为例

  1. 构造ContextCarrier
  2. ContextManager创建ExitSpan,将TracingContext注入ContextCarrier
  3. ContextCarrier.items,对ContextCarrier序列化
  4. headers.put,将ContextCarrier注入http请求头

ContextManager#createExitSpan:创建ExitSpan最后阶段,调用TracingContext#inject。

TracingContext#inject:将TracingContext中的属性注入ContextCarrier:

  1. traceId:链路id;
  2. traceSegmentId:当前TracingContext的Segment的id;
  3. spanId:取当前ExitSpan(activeSpan),该spanId注入ContextCarrier将来作为服务提供方segment的父spanId;
  4. parentService:agent配置的service_name;
  5. parentServiceInstance:当前实例id;
  6. parentEndpoint:当前TracingContext的Segment下的第一个span的operationName,比如在SpringMVC后的feign调用,是SpringMVC的operationName,但是在ServletFilter里的feign调用,是Tomcat的operationName;
  7. addressUsedAtClient:服务提供方的ip和port;

ContextCarrier#items:构造CarrierItem迭代器,用于后续将ContextCarrier的数据放入请求头。

每个CarrierItem是一个kv对,代表ContextCarrier的一个部分。

CarrierItem的实现类都会在构造时对关注的ContextCarrier中部分数据进行序列化。

SW8CarrierItem的key是sw8,value对应ContextCarrier序列化数据。

ContextCarrier#serialize:ContextCarrier序列化包含trace基本信息,每个字段用base64编码,用-拼接。

SW8CorrelationCarrierItem的key是sw8-correlation,value对应CorrelationContext序列化数据。

SW8ExtensionCarrierItem的key是sw8-x,value对应ExtensionContext序列化数据。

对于feign客户端,会迭代所有CarrierItem实现类的kv,注入http请求头。

服务提供方

TomcatInvokeInterceptor#beforeMethod:服务提供方以tomcat为例

  1. 构造ContextCarrier
  2. CarrierItem.setHeadValue:反序列化并注入ContextCarrier
  3. ContextManager创建EntrySpan,将ContextCarrier恢复到TracingContext;

迭代所有CarrierItem的setHeadValue方法,将请求头中的sw8、sw8-correlation sw8-x反序列化注入ContextCarrier。

ContextManager#createEntrySpan:

将ContextCarrier数据重新注入当前线程的TracingContext。

注:如果上游进程采集了trace,下游进程的trace数据一定会采样

TracingContext#extract:在服务提供者侧,ContextCarrier会构成一个TraceSegmentRef对象。ref对象会关联至segment和当前激活的EntrySpan,至此完成trace跨进程传播。

注:segment会关联1个ref,span在rpc场景下只会关联1个ref,span在批量消费场景下会关联n个ref。

TraceSegmentRef

五、TracingContext跨线程传播

ContextManager#capture:父线程,获取当前线程的TracingContext,执行capture。

TracingContext#capture:将TracingContext数据提取为ContextSnapshot

ContextManager#continued:子线程拿到snapshot,判断snapshot的segmentId与当前线程segmentId不一致,执行TracingContext#continue。

TracingContext#continued:利用ContextSnapshot构造TraceSegmentRef,关联ref至当前线程segment和激活Span。

如利用apm-toolkit-trace提供的RunnableWrapper实现跨线程传播。

本质是CallableOrRunnableActivation插件拦截了被TraceCrossThread注解修饰的类。

CallableOrRunnableConstructInterceptor

Runnable构造时,将当前线程的ContextSnapshot保存到Runnable的一个成员变量里。

注:skywalking插件增强的类都会实现EnhancedInstance接口,用一个Object成员变量 _$EnhancedClassField_ws用于存储扩展数据,这里setSkyWalkingDynamicField就是将snapshot保存到这个成员变量中。

CallableOrRunnableInvokeInterceptor

Runnable执行前,获取成员变量中的ContextSnapshot,注入当前线程的TracingContext。

六、跨线程Span

ContextManager通过ThreadLocal管理TracingContext,对于TracingContext内部的segment和span操作,都是同一线程下的操作。

在一些特殊情况下,一个Span可能在不同的线程中start和finish。

AbstractTracingSpan实现了AsyncSpan顶级接口。

AsyncSpan用于跨线程操作span,prepareForAsyncasyncFinish需要配对使用。

public interface AsyncSpan {
    AbstractSpan prepareForAsync();
    AbstractSpan asyncFinish();
}

比如SpringWebFlux中提供的WebClient客户端,发送请求和接收响应的线程不同。

final WebClient webClient = WebClient.builder().build();
public Mono<String> webClient() {
    return webClient.get().uri("https://www.baidu.com")
    .retrieve().bodyToMono(String.class);
}

WebFluxWebClientInstrumentation定义了拦截点,即ExchangeFunctions$DefaultExchangeFunctionexchange方法。

WebFluxWebClientInterceptor#beforeMethod:

  1. 创建ExitSpan;
  2. prepareForAsync;
  3. 停止ExitSpan;
  4. 将ExitSpan放入DefaultExchangeFunction的增强字段_$EnhancedClassField_ws;

WebFluxWebClientInterceptor#afterMethod:

从DefaultExchangeFunction的增强字段_$EnhancedClassField_ws中获取ExitSpan,在响应结束后执行span#asyncFinish。

AbstractTracingSpan#prepareForAsync底层调用TracingContext#awaitFinishAsync,更新TracingContext的isRunningInAsyncMode=true,并对于一个asyncSpanCounter进行自增。

ContextManager停止当前ExitSpan,当前线程TracingContext的span栈会移除ExitSpan。

TracingContext#stopSpan:停止span

TracingContext#finish:由于asyncSpanCounter不为0,TracingContext不会结束segment,也不会将segment发送给OAP。

AbstractTracingSpan#asyncFinish:另一个线程通过各种方式拿到ExitSpan(比如WebClient从DefaultExchangeFunction的增强字段_$EnhancedClassField_ws拿到)记录span结束时间。

注意,上个线程的TracingContext仍然被ExitSpan持有

TracingContext#asyncStop:asyncSpanCounter自减,再次调用finish。

asyncSpanCounter减少到0,就能真正结束segment,将segment发送至OAP。

对于Segment来说,由A线程创建,由B线程最终发送给OAP。

总结

模型概述

ContextManager

SkyWalking插件通过ContextManger获取或创建当前线程(ThreadLocal)TracingContext

TracingContext

TracingContext包含一个TraceSegment

TracingContext使用LinkedList栈管理n个Span

当所有Span出栈后,TraceSegment关闭,发送至OAP。

TraceSegment

Segment通过segmentId唯一标识,id生成算法=进程id(uuid).线程id.线程自增序列

其中线程自增序列(nextSeq)=时间戳+4位自增序列(0-9999)

如果Segment是trace中第一个Segment,还会生成traceId,生成算法与segmentId相同。

Segment持有n个结束的Span。

Span

Span代表一个操作,由插件创建和关闭,Span记录了众多有用的属性:

  1. operationName:重要,比如SpringMVC是接口名,如{POST}/api/v1/x;
  2. spanId:span在segment下的唯一标识,从0开始自增;
  3. parentSpanId:父span的id,-1代表当前span是segment中的第一个span;
  4. tags:标签,如http请求有http.method、status_code,db有db.statement,mq有mq.broker、mq.topic,部分标签支持搜索(searchableTracesTags);
  5. startTime/endTime:span的开始时间和结束时间,客户端时间
  6. componentId:组件id,如11-Feign、14-SpringMVC;

Span有两个核心方法:

  1. start:开启span,记录开始时间;
  2. finish:关闭span,记录结束时间,并加入所属Segment;

Span有三种类型:

  1. EntrySpan:入口Span,比如Tomcat、SpringMVC、Consumer;
  2. ExitSpan:出口Span,比如rpc、db;
  3. LocalSpan:本地Span,比如线程切换、SpringSchedule;

其中EntrySpan和ExitSpan比较特殊,是一个基于计数器实现的逻辑栈(StackBasedTracingSpan) ,可以多次start和finish。

TracingContext中同类型活跃Span(栈顶Span)不会重复创建而是复用,这是为了解决一次用户代码调用经过多个skywalking插件的问题

如一个普通springboot应用入口流量经过tomcat和springmvc,只会生成一个EntrySpan,tomcat记录span开始和结束时间,springmvc记录operationName、componentId、tag等信息。

如RestTemplate使用OkHttp进行远程调用,只会生成一个ExitSpan,所有信息都由RestTemplate记录。

无论Entry还是Exit,简单来说:

  1. 距离用户最近的插件,记录operationName、componentId、tag;
  2. 第一个经过的插件记录,记录开始和结束时间;

跨进程传播

CorrelationContext

TracingContext中还维护了一个CorrelationContext

CorrelationContext是面向业务的扩展信息,用map存储数据,可以在链路上传播业务数据

apm-toolkit-traceTraceContext工具类提供了CorrelationContext的读写方法。

TraceContext.putCorrelation("order_id", order.getId());
Optional<String> orderId = TraceContext.getCorrelation("order_id");

ExtensionContext

TracingContext中的ExtensionContext,是面向skywalking内部的扩展信息,也可以在链路上传播。

ExtensionContext有两个属性:

  1. skipAnalysis:是否跳过OAP analysis,默认false;
  2. sendingTimestamp:专门用于mq场景,producer记录发送时间,consumer侧收消息后计算时间差,用于统计接收消息延迟

ContextCarrier

进程间通过ContextCarrier序列化和传播数据。

ContextCarrier包含三部分:

  1. trace基础信息:如traceId、segmentId、spanId等;
  2. ExtensionContext:面向skywalking自己的扩展信息;
  3. CorrelationContext:面向业务的扩展信息;

客户端将TracingContext中的trace基础信息存储到ContextCarrier:

  1. traceId:链路id;
  2. traceSegmentId:当前TracingContext的Segment的id;
  3. spanId:当前ExitSpan的spanId;
  4. parentService:agent配置的service_name;
  5. parentServiceInstance:当前实例id;
  6. parentEndpoint:当前Segment下的第一个span的operationName;(注意span的operationName在不同插件会被覆盖,不一定是segment上报时的operationName)
  7. addressUsedAtClient:服务端的ip和port;

以http调用为例,ContextCarrier的三部分数据会通过请求头传递到服务端:

  1. trace基础信息:请求头=sw8;
  2. ExtensionContext:请求头=sw8-x;
  3. CorrelationContext:请求头=sw8-correlation;

服务端将ContextCarrier反序列化,原封不动地转换为一个TraceSegmentRef,注入自己的TracingContext。

TracingSegment关联1个TraceSegmentRef。

批量消费场景,Span关联n个TraceSegmentRef;普通rpc场景,Span关联1个TraceSegmentRef。

跨线程传播

跨线程和跨进程类似。

父线程将TracingContext的trace信息保存为ContextSnapshot

ContextSnapshot模型与ContextCarrier取数逻辑一致。

子线程拿到ContextSnapshot,转换为TraceSegmentRef,注入自己的TracingContext。

线程之间需要传递ContextSnapshot,比如通过skywalking后的类的 _$EnhancedClassField_ws成员变量。

跨线程Span

TracingContext通过ThreadLocal维护,本质上所有Segment、Span操作都在一个线程中。

有一些特殊情况,Span会跨线程,比如A线程开启Span,B线程停止Span。

比如一次rpc调用,发送请求和接收响应的线程不是同一个线程。

SkyWalking的所有Span都实现了AsyncSpan:

  1. prepareForAsync:开启异步span,TracingContext的asyncSpanCounter计数器+1;
  2. asyncFinish:停止异步span,TracingContext的asyncSpanCounter计数器-1;

虽然A线程的Span栈将Span弹出了,但是Span仍然关联着TracingContext,由另一个线程拿到这个异步Span(比如通过 _$EnhancedClassField_ws拿到),通过Span关联的TracingContext操作计数器。

当TracingContext开启异步Span后,只有asyncSpanCounter归零才会真正将Segment发送给OAP。

Agent的一些配置

  1. agent.span_limit_per_segment:默认300,一个segment中最多300个span;
  2. agent.operation_name_threshold:默认150,span的operationName超出150会被截断;
  3. agent.ignore_suffix:采样相关,以这些后缀为结尾的operationName不会采集Trace,默认配置如.jpg,.js等资源文件;
  4. agent. sample_n_per_3_secs:采样频率,配置n代表3s采样n个trace,默认-1全量采样
  5. agent.keep_tracing:采样相关,默认false,如果客户端与OAP断开连接(或kafka),则不采集Trace;
  6. correlation.auto_tag_keys:默认空,correlation的key支持自动打tag到当前span上;

欢迎大家评论或私信讨论问题。

本文原创,未经许可不得转载。

欢迎关注公众号【程序猿阿越】。