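APM (Application Performance Management & Monitoring) tools monitor system performance in real time and help pinpoint specific problems. zipkin is an APM system that Twitter built on the ideas of Google's Dapper. It focuses on monitoring distributed call chains and recording call latency.
Architecture
The architecture of zipkin is fairly simple, as shown in the figure below: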
- collector: collects Spans
- storage: stores Spans
- API: queries call chains
- UI: provides the display pages
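A Span is the basic unit of data that zipkin records, and it usually represents one request in a call chain. Its data structure is shown in the figure below:
A complete call chain contains at least one Span, and all Spans on the chain share the same unique traceId. Each Span has its own distinct id, and most Spans also have a parentId, which holds the id of the preceding Span in the call chain. This lets zipkin assemble the Spans gathered by the collector into a call chain, as shown in the figure below: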
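To make the relationship between traceId, id, and parentId concrete, here is a minimal sketch of how Spans that share a traceId could be arranged into a tree. `SimpleSpan`, `childrenByParent`, and `render` are hypothetical names used only for illustration; they are not zipkin's own classes or its actual assembly logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TraceTreeSketch {

    /** Hypothetical, simplified span: only the three ids and a name. */
    static final class SimpleSpan {
        final long traceId;
        final long id;
        final Long parentId; // null for the root span
        final String name;

        SimpleSpan(long traceId, long id, Long parentId, String name) {
            this.traceId = traceId;
            this.id = id;
            this.parentId = parentId;
            this.name = name;
        }
    }

    /** Groups spans by parentId; root spans end up under the key null. */
    static Map<Long, List<SimpleSpan>> childrenByParent(List<SimpleSpan> spans) {
        Map<Long, List<SimpleSpan>> children = new LinkedHashMap<>();
        for (SimpleSpan span : spans) {
            children.computeIfAbsent(span.parentId, k -> new ArrayList<>()).add(span);
        }
        return children;
    }

    /** Renders the tree depth-first, indenting each level by two spaces. */
    static String render(Map<Long, List<SimpleSpan>> children, Long parentId, int depth) {
        StringBuilder out = new StringBuilder();
        List<SimpleSpan> level = children.getOrDefault(parentId, Collections.<SimpleSpan>emptyList());
        for (SimpleSpan span : level) {
            for (int i = 0; i < depth; i++) {
                out.append("  ");
            }
            out.append(span.name).append('\n');
            out.append(render(children, span.id, depth + 1));
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Same shape as the example later in the article: 1 -> 2 -> (3, 4).
        List<SimpleSpan> spans = Arrays.asList(
                new SimpleSpan(1L, 1L, null, "service1"),
                new SimpleSpan(1L, 2L, 1L, "service2"),
                new SimpleSpan(1L, 3L, 2L, "service3"),
                new SimpleSpan(1L, 4L, 2L, "service4"));
        System.out.print(render(childrenByParent(spans), null, 0));
    }
}
```

The sketch assumes that all spans passed in already belong to one trace; a real server would first group by traceId.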
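zipkin records four main events, also called Annotations:
- cs - Client Send: the client sends the request
- sr - Server Receive: the server receives the request
- ss - Server Send: the server finishes processing and sends the result back to the client
- cr - Client Receive: the client receives the response from the server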
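The four annotations are enough to derive the usual latency figures. The following is a small sketch, not zipkin code: `AnnotationTimingSketch` and its method names are made up for illustration, and timestamps are assumed to be in microseconds.

```java
final class AnnotationTimingSketch {

    /** Time the client waited: from sending the request (cs) to receiving the response (cr). */
    static long clientDuration(long cs, long cr) {
        return cr - cs;
    }

    /** Time the server spent on the request: from receiving it (sr) to sending the response (ss). */
    static long serverDuration(long sr, long ss) {
        return ss - sr;
    }

    /** Time spent outside the server (network transfer, queuing and so on). */
    static long outsideServer(long cs, long sr, long ss, long cr) {
        return clientDuration(cs, cr) - serverDuration(sr, ss);
    }

    public static void main(String[] args) {
        long cs = 1_000, sr = 1_200, ss = 1_700, cr = 2_000; // microseconds
        System.out.println("client waited:  " + clientDuration(cs, cr));
        System.out.println("server worked:  " + serverDuration(sr, ss));
        System.out.println("outside server: " + outsideServer(cs, sr, ss, cr));
    }
}
```

Each duration is computed from two timestamps taken on the same host, so the result does not depend on the client and server clocks being synchronized. Subtracting sr from cs directly would.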
Deploying zipkin
According to the official zipkin documentation, there are three ways to start it:
- Start with docker:
docker run -d -p 9411:9411 openzipkin/zipkin
- Start with Java:
curl -sSL https://zipkin.io/quickstart.sh | bash -s
java -jar zipkin.jar
- Start from source:
# get the latest source
git clone https://github.com/openzipkin/zipkin
cd zipkin
# Build the server and also make its dependencies
./mvnw -DskipTests --also-make -pl zipkin-server clean install
# Run the server
java -jar ./zipkin-server/target/zipkin-server-*exec.jar
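Once it is running, open http://<server-address>:9411 to see the following page:
A zipkin example
A simple example shows what zipkin does. zipkin provides an SDK for Java projects called brave, which supports both HTTP and gRPC. The example below is based on HTTP.
Adding brave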
Create a new Spring Boot project with the following dependencies in the pom file:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>io.zipkin.brave</groupId>
        <artifactId>brave-core</artifactId>
        <version>3.9.0</version>
    </dependency>
    <dependency>
        <groupId>io.zipkin.brave</groupId>
        <artifactId>brave-http</artifactId>
        <version>3.9.0</version>
    </dependency>
    <dependency>
        <groupId>io.zipkin.brave</groupId>
        <artifactId>brave-spancollector-http</artifactId>
        <version>3.9.0</version>
    </dependency>
    <dependency>
        <groupId>io.zipkin.brave</groupId>
        <artifactId>brave-web-servlet-filter</artifactId>
        <version>3.9.0</version>
    </dependency>
    <dependency>
        <groupId>io.zipkin.brave</groupId>
        <artifactId>brave-okhttp</artifactId>
        <version>3.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.13</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.1</version>
    </dependency>
</dependencies>
Edit application.properties and add the following:
com.zipkin.serviceName=service1
com.zipkin.url=http://<zipkin-server-address>:9411
com.zipkin.connectTimeout=6000
com.zipkin.readTimeout=6000
com.zipkin.flushInterval=1
com.zipkin.compressionEnabled=true
server.port=8080
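The settings prefixed with com.zipkin are mainly used for HttpSpanCollector.Config. They end up applied to the HttpURLConnection that sends spans to the zipkin server.
The zipkin configuration class
Define a zipkin configuration class that declares the collector, brave, and related beans.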
@Configuration
public class ZipkinConfiguration {
    @Autowired
    private ZipkinProperties properties;
    @Bean
    public SpanCollector spanCollector() {
        HttpSpanCollector.Config config = HttpSpanCollector.Config.builder()
                .connectTimeout(properties.getConnectTimeout())
                .readTimeout(properties.getReadTimeout())
                .compressionEnabled(properties.isCompressionEnabled())
                .flushInterval(properties.getFlushInterval())
                .build();
        return HttpSpanCollector.create(properties.getUrl(), config, new EmptySpanCollectorMetricsHandler());
    }
    @Bean
    public Brave brave(SpanCollector spanCollector){
        Brave.Builder builder = new Brave.Builder(properties.getServiceName());
        builder.spanCollector(spanCollector);
        builder.traceSampler(Sampler.ALWAYS_SAMPLE);
        Brave brave = builder.build();
        return brave;
    }
    @Bean
    public BraveServletFilter braveServletFilter(Brave brave){
        BraveServletFilter filter =
                new BraveServletFilter(brave.serverRequestInterceptor(),
                        brave.serverResponseInterceptor(),
                        new DefaultSpanNameProvider());
        return filter;
    }
    @Bean
    public OkHttpClient okHttpClient(Brave brave){
        OkHttpClient client = new OkHttpClient.Builder()
                .addInterceptor(new BraveOkHttpRequestResponseInterceptor(
                        brave.clientRequestInterceptor(),
                        brave.clientResponseInterceptor(),
                        new DefaultSpanNameProvider()))
                .build();
        return client;
    }
}
ZipkinProperties reads the settings from application.properties:
@Configuration
@ConfigurationProperties(prefix = "com.zipkin")
public class ZipkinProperties {
    private String serviceName;
    private String url;
    private int connectTimeout;
    private int readTimeout;
    private int flushInterval;
    private boolean compressionEnabled;
    // getters and setters omitted
}
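The example needs four services: the user calls service1, service1 calls service2, and service2 calls service3 and service4, which forms a call chain. For simplicity, all four methods live in the same controller, and four different jars are built by changing the application configuration. The controller code is as follows: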
@RestController
@RequestMapping("/")
public class DemoController {
    @Autowired
    private OkHttpClient client;
    private Random random = new Random();
    @RequestMapping("service1")
    public String service1() throws InterruptedException, IOException {
        int sleep = random.nextInt(100);
        TimeUnit.MILLISECONDS.sleep(sleep);
        Request request = new Request.Builder().url("http://localhost:8020/service2").get().build();
        Response response = client.newCall(request).execute();
        // string() reads (and closes) the body; toString() would only print the object reference
        return " [service1 sleep " + sleep + " ms]" + response.body().string();
    }
    @RequestMapping("service2")
    public String service2() throws InterruptedException, IOException {
        StringBuilder result = new StringBuilder();
        int sleep = random.nextInt(100);
        TimeUnit.MILLISECONDS.sleep(sleep);
        Request request = new Request.Builder().url("http://localhost:8030/service3").get().build();
        Response response = client.newCall(request).execute();
        result.append(response.body().string());
        request = new Request.Builder().url("http://localhost:8040/service4").get().build();
        response = client.newCall(request).execute();
        result.append(response.body().string());
        return " [service2 sleep " + sleep + " ms]" + result.toString();
    }
    @RequestMapping("service3")
    public String service3() throws InterruptedException{
        int sleep = random.nextInt(100);
        TimeUnit.MILLISECONDS.sleep(sleep);
        return " [service3 sleep " + sleep + " ms]";
    }
    @RequestMapping("service4")
    public String service4() throws InterruptedException{
        int sleep = random.nextInt(100);
        TimeUnit.MILLISECONDS.sleep(sleep);
        return " [service4 sleep " + sleep + " ms]";
    }
}
By changing serviceName and server.port in application.properties, build four jars: service1, service2, service3, and service4. Start them with the following commands:
nohup java -jar service4.jar &
nohup java -jar service3.jar &
nohup java -jar service2.jar &
nohup java -jar service1.jar &
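Then visit http://<service1-server-address>:8080/service1. After a successful request, the call chain can be looked up on zipkin's query page, which clearly shows the time spent at each step of the chain and the details of each Span.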
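Source code analysis
Based on the example above, this section analyzes SpanCollector, Brave, and BraveServletFilter.
SpanCollector
The configuration class ZipkinConfiguration defines a SpanCollector whose concrete implementation is HttpSpanCollector.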
@Bean
public SpanCollector spanCollector() {
    HttpSpanCollector.Config config = HttpSpanCollector.Config.builder()
            .connectTimeout(properties.getConnectTimeout())
            .readTimeout(properties.getReadTimeout())
            .compressionEnabled(properties.isCompressionEnabled())
            .flushInterval(properties.getFlushInterval())
            .build();
    return HttpSpanCollector.create(properties.getUrl(), config, new EmptySpanCollectorMetricsHandler());
}
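Config is an abstract nested class of HttpSpanCollector. It holds the parameters for collecting spans over HTTP; any parameter that is not set falls back to its default.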
@AutoValue
public static abstract class Config {
    public static Builder builder() {
        return new AutoValue_HttpSpanCollector_Config.Builder()
                .connectTimeout(10 * 1000)
                .readTimeout(60 * 1000)
                .compressionEnabled(false)
                .flushInterval(1);
    }
    //other codes
}
Creating the HttpSpanCollector:
public static HttpSpanCollector create(String baseUrl, Config config, SpanCollectorMetricsHandler metrics) {
    return new HttpSpanCollector(baseUrl, config, metrics);
}
// Visible for testing. Ex when tests need to explicitly control flushing, set interval to 0.
HttpSpanCollector(String baseUrl, Config config, SpanCollectorMetricsHandler metrics) {
    super(SpanCodec.JSON, metrics, config.flushInterval());
    this.url = baseUrl + (baseUrl.endsWith("/") ? "" : "/") + "api/v1/spans";
    this.config = config;
}
url is the address of the zipkin server that receives the spans, and metrics is the handler invoked when spans are submitted or dropped.
/**
 * Monitor {@linkplain SpanCollector} by implementing reactions to these events, e.g. updating suitable metrics.
 *
 * See DropwizardMetricsScribeCollectorMetricsHandlerExample in isSampled sources for an example.
 */
public interface SpanCollectorMetricsHandler {
    /**
     * Called when spans are submitted to SpanCollector for processing.
     *
     * @param quantity the number of spans accepted.
     */
    void incrementAcceptedSpans(int quantity);
    /**
     * Called when spans become lost for any reason and won't be delivered to the target collector.
     *
     * @param quantity the number of spans dropped.
     */
    void incrementDroppedSpans(int quantity);
}
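The example earlier uses EmptySpanCollectorMetricsHandler, which ignores both events. As an illustration of what a non-empty handler might look like, the sketch below keeps two counters. The interface is re-declared locally only so the snippet compiles on its own, and `CountingMetricsHandler` is a hypothetical name, not a class shipped with brave.

```java
import java.util.concurrent.atomic.AtomicLong;

final class MetricsHandlerSketch {

    /** Local copy of the two-method interface shown above. */
    interface SpanCollectorMetricsHandler {
        void incrementAcceptedSpans(int quantity);
        void incrementDroppedSpans(int quantity);
    }

    /** Hypothetical handler that simply counts accepted and dropped spans. */
    static final class CountingMetricsHandler implements SpanCollectorMetricsHandler {
        private final AtomicLong accepted = new AtomicLong();
        private final AtomicLong dropped = new AtomicLong();

        @Override
        public void incrementAcceptedSpans(int quantity) {
            accepted.addAndGet(quantity);
        }

        @Override
        public void incrementDroppedSpans(int quantity) {
            dropped.addAndGet(quantity);
        }

        long acceptedCount() {
            return accepted.get();
        }

        long droppedCount() {
            return dropped.get();
        }
    }

    public static void main(String[] args) {
        CountingMetricsHandler metrics = new CountingMetricsHandler();
        metrics.incrementAcceptedSpans(5);
        metrics.incrementDroppedSpans(2);
        System.out.println("accepted=" + metrics.acceptedCount() + ", dropped=" + metrics.droppedCount());
    }
}
```

AtomicLong is used because the events can arrive from different threads: request threads report accepted spans, while the flushing thread reports dropped ones.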
HttpSpanCollector overrides sendSpans, which is what finally sends the spans to the zipkin server.
@Override
protected void sendSpans(byte[] json) throws IOException {
    // intentionally not closing the connection, so as to use keep-alives
    HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
    connection.setConnectTimeout(config.connectTimeout());
    connection.setReadTimeout(config.readTimeout());
    connection.setRequestMethod("POST");
    connection.addRequestProperty("Content-Type", "application/json");
    if (config.compressionEnabled()) {
        connection.addRequestProperty("Content-Encoding", "gzip");
        ByteArrayOutputStream gzipped = new ByteArrayOutputStream();
        try (GZIPOutputStream compressor = new GZIPOutputStream(gzipped)) {
            compressor.write(json);
        }
        json = gzipped.toByteArray();
    }
    connection.setDoOutput(true);
    connection.setFixedLengthStreamingMode(json.length);
    connection.getOutputStream().write(json);
    try (InputStream in = connection.getInputStream()) {
        while (in.read() != -1) ; // skip
    } catch (IOException e) {
        try (InputStream err = connection.getErrorStream()) {
            if (err != null) { // possible, if the connection was dropped
                while (err.read() != -1) ; // skip
            }
        }
        throw e;
    }
}
The class diagram of HttpSpanCollector is shown in the figure below:
AbstractSpanCollector, the parent class of HttpSpanCollector, implements reportSpans, which calls sendSpans.
@Override
protected void reportSpans(List<Span> drained) throws IOException {
    byte[] encoded = codec.writeSpans(drained);
    sendSpans(encoded);
}
reportSpans is in turn called by flush, which is implemented in FlushingSpanCollector, the parent class of AbstractSpanCollector.
@Override
public void flush() {
    if (pending.isEmpty()) return;
    List<Span> drained = new ArrayList<Span>(pending.size());
    pending.drainTo(drained);
    if (drained.isEmpty()) return;
    int spanCount = drained.size();
    try {
        reportSpans(drained);
    } catch (IOException e) {
        metrics.incrementDroppedSpans(spanCount);
    } catch (RuntimeException e) {
        metrics.incrementDroppedSpans(spanCount);
    }
}
FlushingSpanCollector holds a blocking queue and a nested Runnable class, Flusher.
private final BlockingQueue<Span> pending = new LinkedBlockingQueue<Span>(1000);
@Nullable // for testing
private final Flusher flusher;
static final class Flusher implements Runnable {
    final Flushable flushable;
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    Flusher(Flushable flushable, int flushInterval) {
        this.flushable = flushable;
        this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
    }
    @Override
    public void run() {
        try {
            flushable.flush();
        } catch (IOException ignored) {
        }
    }
}
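Flusher creates a scheduled thread pool with a single thread and calls flush() with a fixed delay of flushInterval seconds between runs. Each flush drains all elements from the blocking queue into an ArrayList and uploads them through reportSpans.
brave
The Brave class makes collecting Spans more convenient.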
@Bean
public Brave brave(SpanCollector spanCollector){
    Brave.Builder builder = new Brave.Builder(properties.getServiceName());
    builder.spanCollector(spanCollector);
    builder.traceSampler(Sampler.ALWAYS_SAMPLE);
    Brave brave = builder.build();
    return brave;
}
The arguments passed to the builder are used to create the state.
public Builder() {
    this("unknown");
}
public Builder(String serviceName) {
    try {
        int ip = toInt(getLocalHostLANAddress());
        state = new ThreadLocalServerClientAndLocalSpanState(ip, 0, serviceName);
    } catch (UnknownHostException e) {
        throw new IllegalStateException("Unable to get Inet address", e);
    }
}
public Builder(int ip, int port, String serviceName) {
    state = new ThreadLocalServerClientAndLocalSpanState(ip, port, serviceName);
}
public Builder(ServerClientAndLocalSpanState state) {
    this.state = Util.checkNotNull(state, "state must be specified.");
}
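state is a ServerClientAndLocalSpanState, which is an interface.
It combines the server, client, and local span state interfaces, all of which extend the CommonSpanState interface.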
public interface CommonSpanState {
    Boolean sample();
    Endpoint endpoint();
}
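sample indicates whether the request is sampled, and endpoint records the node information: ip, port, and service_name. ThreadLocalServerClientAndLocalSpanState, the implementation of ServerClientAndLocalSpanState, stores the tracing state of the current thread in ThreadLocal variables.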
public final class ThreadLocalServerClientAndLocalSpanState implements ServerClientAndLocalSpanState {
    private final static ThreadLocal<ServerSpan> currentServerSpan = new ThreadLocal<ServerSpan>() {
        @Override
        protected ServerSpan initialValue() {
            return ServerSpan.create(null);
        }
    };
    private final static ThreadLocal<Span> currentClientSpan = new ThreadLocal<Span>();
    private final static ThreadLocal<Span> currentLocalSpan = new ThreadLocal<Span>();
    //other codes
}
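Keeping the state in ThreadLocal variables has one practical consequence: a span set on one thread is invisible to every other thread. The following minimal sketch demonstrates that behaviour with a plain String standing in for the span; `ThreadLocalStateSketch` and its methods are illustrative names, not part of brave.

```java
final class ThreadLocalStateSketch {

    // Each thread gets its own independent slot; the initial value is null.
    private static final ThreadLocal<String> CURRENT_SPAN = new ThreadLocal<>();

    static void setCurrentSpan(String spanName) {
        CURRENT_SPAN.set(spanName);
    }

    static String currentSpan() {
        return CURRENT_SPAN.get();
    }

    /** Starts a new thread and returns the span that thread sees. */
    static String spanSeenByOtherThread() {
        final String[] seen = new String[1];
        Thread worker = new Thread(() -> seen[0] = currentSpan());
        worker.start();
        try {
            worker.join(); // join() guarantees the write to seen[0] is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        setCurrentSpan("service1");
        System.out.println("request thread sees: " + currentSpan());
        System.out.println("worker thread sees:  " + spanSeenByOtherThread());
    }
}
```

In other words, if request handling hands work to another thread, the tracing state does not follow automatically and has to be passed over explicitly.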
The Brave class bundles a set of tracers, interceptors, and thread binders.
public class Brave {
    private final ServerTracer serverTracer;
    private final ClientTracer clientTracer;
    private final LocalTracer localTracer;
    private final ServerRequestInterceptor serverRequestInterceptor;
    private final ServerResponseInterceptor serverResponseInterceptor;
    private final ClientRequestInterceptor clientRequestInterceptor;
    private final ClientResponseInterceptor clientResponseInterceptor;
    private final AnnotationSubmitter serverSpanAnnotationSubmitter;
    private final ServerSpanThreadBinder serverSpanThreadBinder;
    private final ClientSpanThreadBinder clientSpanThreadBinder;
    //other codes
    private Brave(Builder builder) {
        serverTracer = ServerTracer.builder()
                .randomGenerator(builder.random)
                .spanCollector(builder.spanCollector)
                .state(builder.state)
                .traceSampler(builder.sampler).build();
        clientTracer = ClientTracer.builder()
                .randomGenerator(builder.random)
                .spanCollector(builder.spanCollector)
                .state(builder.state)
                .traceSampler(builder.sampler).build();
        localTracer = LocalTracer.builder()
                .randomGenerator(builder.random)
                .spanCollector(builder.spanCollector)
                .spanAndEndpoint(SpanAndEndpoint.LocalSpanAndEndpoint.create(builder.state))
                .traceSampler(builder.sampler).build();
        serverRequestInterceptor = new ServerRequestInterceptor(serverTracer);
        serverResponseInterceptor = new ServerResponseInterceptor(serverTracer);
        clientRequestInterceptor = new ClientRequestInterceptor(clientTracer);
        clientResponseInterceptor = new ClientResponseInterceptor(clientTracer);
        serverSpanAnnotationSubmitter = AnnotationSubmitter.create(SpanAndEndpoint.ServerSpanAndEndpoint.create(builder.state));
        serverSpanThreadBinder = new ServerSpanThreadBinder(builder.state);
        clientSpanThreadBinder = new ClientSpanThreadBinder(builder.state);
    }
}
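A Tracer records trace information; the server, client, and local tracers each handle their own kind of span. These are the APIs brave exposes for setting span information, creating a new span and obtaining its spanId, and so on.
BraveServletFilter
BraveServletFilter is the interception feature provided by the http module; it is simply a servlet filter.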
@Bean
public BraveServletFilter braveServletFilter(Brave brave){
    BraveServletFilter filter =
            new BraveServletFilter(brave.serverRequestInterceptor(),
                    brave.serverResponseInterceptor(),
                    new DefaultSpanNameProvider());
    return filter;
}
public BraveServletFilter(ServerRequestInterceptor requestInterceptor, ServerResponseInterceptor responseInterceptor, SpanNameProvider spanNameProvider) {
    this.requestInterceptor = requestInterceptor;
    this.responseInterceptor = responseInterceptor;
    this.spanNameProvider = spanNameProvider;
}
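requestInterceptor and responseInterceptor handle the TraceData of the request and the response respectively and write it to the current tracer. The data is extracted from the request and response headers by a RequestAdapter and a ResponseAdapter. Each of these classes has a server and a client implementation. Taking the server side as an example, ServerRequestInterceptor handles requests arriving at the server, and its entry point is handle.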
public void handle(ServerRequestAdapter adapter) {
    serverTracer.clearCurrentSpan();
    final TraceData traceData = adapter.getTraceData();
    Boolean sample = traceData.getSample();
    if (sample != null && Boolean.FALSE.equals(sample)) {
        serverTracer.setStateNoTracing();
        LOGGER.fine("Received indication that we should NOT trace.");
    } else {
        if (traceData.getSpanId() != null) {
            LOGGER.fine("Received span information as part of request.");
            SpanId spanId = traceData.getSpanId();
            serverTracer.setStateCurrentTrace(spanId.traceId, spanId.spanId,
                    spanId.nullableParentId(), adapter.getSpanName());
        } else {
            LOGGER.fine("Received no span state.");
            serverTracer.setStateUnknown(adapter.getSpanName());
        }
        serverTracer.setServerReceived();
        for(KeyValueAnnotation annotation : adapter.requestAnnotations())
        {
            serverTracer.submitBinaryAnnotation(annotation.getKey(), annotation.getValue());
        }
    }
}
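The method first clears the span bound to the current thread and then obtains the trace data through ServerRequestAdapter, whose getTraceData is implemented as follows: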
@Override
public TraceData getTraceData() {
    final String sampled = serverRequest.getHttpHeaderValue(BraveHttpHeaders.Sampled.getName());
    if (sampled != null) {
        if (sampled.equals("0") || sampled.toLowerCase().equals("false")) {
            return TraceData.builder().sample(false).build();
        } else {
            final String parentSpanId = serverRequest.getHttpHeaderValue(BraveHttpHeaders.ParentSpanId.getName());
            final String traceId = serverRequest.getHttpHeaderValue(BraveHttpHeaders.TraceId.getName());
            final String spanId = serverRequest.getHttpHeaderValue(BraveHttpHeaders.SpanId.getName());
            if (traceId != null && spanId != null) {
                SpanId span = getSpanId(traceId, spanId, parentSpanId);
                return TraceData.builder().sample(true).spanId(span).build();
            }
        }
    }
    return TraceData.builder().build();
}
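This method reads the relevant values from the request headers and builds a TraceData. Finally, information such as the spanId and annotations is bound to the serverTracer.
In handle, if sample is false (do not sample), no span is created. If the SpanId is not null, the request belongs to an existing trace with an upstream span, so handle executes serverTracer.setStateCurrentTrace(spanId.traceId, spanId.spanId, spanId.nullableParentId(), adapter.getSpanName());
That method is as follows: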
public void setStateCurrentTrace(long traceId, long spanId, @Nullable Long parentSpanId, @Nullable String name) {
    checkNotBlank(name, "Null or blank span name");
    spanAndEndpoint().state().setCurrentServerSpan(ServerSpan.create(traceId, spanId, parentSpanId, name));
}
If the spanId is null, there is no upstream span, so handle executes serverTracer.setStateUnknown(adapter.getSpanName());
That method is as follows:
public void setStateUnknown(String spanName) {
    checkNotBlank(spanName, "Null or blank span name");
    long newTraceId = randomGenerator().nextLong();
    if (!traceSampler().isSampled(newTraceId)) {
        spanAndEndpoint().state().setCurrentServerSpan(ServerSpan.NOT_SAMPLED);
        return;
    }
    spanAndEndpoint().state().setCurrentServerSpan(
            ServerSpan.create(newTraceId, newTraceId, null, spanName));
}
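The three outcomes of handle (do not trace, continue an existing trace, start a new trace) can be summarized in a small standalone sketch. It is a simplification rather than brave's code: the headers are modelled as a plain case-sensitive Map, the `Decision` enum and the class name are invented for illustration, and the header names and lower-hex id encoding follow the B3 convention that BraveHttpHeaders is assumed to use.

```java
import java.util.HashMap;
import java.util.Map;

final class B3HeaderSketch {

    /** The three possible outcomes for an incoming request. */
    enum Decision { DO_NOT_TRACE, CONTINUE_TRACE, START_NEW_TRACE }

    static Decision decide(Map<String, String> headers) {
        String sampled = headers.get("X-B3-Sampled");
        if (sampled == null) {
            return Decision.START_NEW_TRACE; // no trace context at all
        }
        if (sampled.equals("0") || sampled.equalsIgnoreCase("false")) {
            return Decision.DO_NOT_TRACE; // the caller decided not to sample
        }
        boolean hasIds = headers.get("X-B3-TraceId") != null
                && headers.get("X-B3-SpanId") != null;
        // Sampled but without ids: treated like a request with no trace context.
        return hasIds ? Decision.CONTINUE_TRACE : Decision.START_NEW_TRACE;
    }

    /** Parses a lower-hex id; parseUnsignedLong keeps the full 64-bit range. */
    static long parseId(String hex) {
        return Long.parseUnsignedLong(hex, 16);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        System.out.println(decide(headers));

        headers.put("X-B3-Sampled", "1");
        headers.put("X-B3-TraceId", "463ac35c9f6413ad");
        headers.put("X-B3-SpanId", "a2fb4a1d1a96d312");
        System.out.println(decide(headers) + " traceId=" + parseId(headers.get("X-B3-TraceId")));

        headers.put("X-B3-Sampled", "0");
        System.out.println(decide(headers));
    }
}
```

Real HTTP header names are case-insensitive, so a production implementation would look them up through the servlet request rather than a plain Map.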
ServerSpan is created as follows:
static ServerSpan create(long traceId, long spanId, @Nullable Long parentSpanId, String name) {
    Span span = new Span();
    span.setTrace_id(traceId);
    span.setId(spanId);
    if (parentSpanId != null) {
        span.setParent_id(parentSpanId);
    }
    span.setName(name);
    return create(span, true);
}
ServerResponseInterceptor, for its part, handles the response to be returned. Its handle method is as follows:
public void handle(ServerResponseAdapter adapter) {
    // We can submit this in any case. When server state is not set or
    // we should not trace this request nothing will happen.
    LOGGER.fine("Sending server send.");
    try {
        for(KeyValueAnnotation annotation : adapter.responseAnnotations())
        {
            serverTracer.submitBinaryAnnotation(annotation.getKey(), annotation.getValue());
        }
        serverTracer.setServerSend();
    } finally {
        serverTracer.clearCurrentSpan();
    }
}
After submitting the annotations, it calls setServerSend.
public void setServerSend() {
    if (submitEndAnnotation(zipkinCoreConstants.SERVER_SEND, spanCollector())) {
        spanAndEndpoint().state().setCurrentServerSpan(null);
    }
}
submitEndAnnotation adds the closing annotation (here ss) to the Span and hands the Span to the SpanCollector.
boolean submitEndAnnotation(String annotationName, SpanCollector spanCollector) {
    Span span = spanAndEndpoint().span();
    if (span == null) {
        return false;
    }
    Annotation annotation = Annotation.create(
            currentTimeMicroseconds(),
            annotationName,
            spanAndEndpoint().endpoint()
    );
    span.addToAnnotations(annotation);
    if (span.getTimestamp() != null) {
        span.setDuration(annotation.timestamp - span.getTimestamp());
    }
    spanCollector.collect(span);
    return true;
}
Here the final pieces of information, including the duration, are added to the span, and collect is called to submit it. collect is implemented by FlushingSpanCollector, mentioned earlier.
@Override
public void collect(Span span) {
    metrics.incrementAcceptedSpans(1);
    if (!pending.offer(span)) {
        metrics.incrementDroppedSpans(1);
    }
}
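Taken together, collect and flush form a simple producer/consumer pattern: request threads offer spans to a bounded queue without blocking, and a background thread drains the queue in batches. The sketch below reproduces that pattern in isolation. `FlushingQueueSketch` and its members are hypothetical names, and the `Consumer` stands in for reportSpans; it is an illustration of the idea, not brave's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class FlushingQueueSketch<T> {
    private final BlockingQueue<T> pending;
    private final Consumer<List<T>> reporter; // stands in for reportSpans
    private final AtomicInteger dropped = new AtomicInteger();

    FlushingQueueSketch(int capacity, Consumer<List<T>> reporter) {
        this.pending = new LinkedBlockingQueue<>(capacity);
        this.reporter = reporter;
    }

    /** Never blocks the caller: when the queue is full the item is dropped and counted. */
    boolean collect(T item) {
        boolean accepted = pending.offer(item);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    /** Drains everything queued so far and reports it as one batch. */
    void flush() {
        List<T> drained = new ArrayList<>(pending.size());
        pending.drainTo(drained);
        if (!drained.isEmpty()) {
            reporter.accept(drained);
        }
    }

    /** Calls flush() on one background thread, waiting intervalSeconds between runs. */
    ScheduledExecutorService start(int intervalSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(this::flush, 0, intervalSeconds, TimeUnit.SECONDS);
        return scheduler;
    }

    int droppedCount() {
        return dropped.get();
    }

    public static void main(String[] args) {
        FlushingQueueSketch<String> collector =
                new FlushingQueueSketch<>(2, batch -> System.out.println("sending " + batch));
        collector.collect("span-1");
        collector.collect("span-2");
        collector.collect("span-3"); // queue is full, so this one is dropped
        collector.flush();           // flushed by hand here instead of calling start()
        System.out.println("dropped: " + collector.droppedCount());
    }
}
```

The design favours the application over the trace data: when the zipkin server is slow or unreachable, spans are lost once the queue fills up, but request threads are never held up.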
Finally, the third parameter of BraveServletFilter, SpanNameProvider, is an interface that supplies the name of a span.
public interface SpanNameProvider {
    String spanName(HttpRequest request);
}
Its default implementation is DefaultSpanNameProvider.
public class DefaultSpanNameProvider implements SpanNameProvider {
    @Override
    public String spanName(HttpRequest request) {
        return request.getHttpMethod();
    }
}
By default the span name is the HTTP method, such as get or post.
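Because SpanNameProvider is an interface, a more descriptive name can be supplied. The sketch below shows one possible provider that combines the method and the request path. Both interfaces are re-declared locally so the snippet compiles on its own; the `getUri()` accessor is an assumption about the request type, and `MethodAndPathSpanNameProvider` is a hypothetical class, not part of brave.

```java
import java.net.URI;
import java.util.Locale;

final class SpanNameSketch {

    /** Local stand-in for the request type; getUri() is an assumed accessor. */
    interface HttpRequest {
        String getHttpMethod();
        URI getUri();
    }

    /** Local copy of the interface shown above. */
    interface SpanNameProvider {
        String spanName(HttpRequest request);
    }

    /** Hypothetical provider that names the span "method path", e.g. "get /service1". */
    static final class MethodAndPathSpanNameProvider implements SpanNameProvider {
        @Override
        public String spanName(HttpRequest request) {
            return request.getHttpMethod().toLowerCase(Locale.ROOT)
                    + " " + request.getUri().getPath();
        }
    }

    /** Builds a fixed request for the demo. */
    static HttpRequest request(final String method, final String url) {
        return new HttpRequest() {
            @Override
            public String getHttpMethod() {
                return method;
            }

            @Override
            public URI getUri() {
                return URI.create(url);
            }
        };
    }

    public static void main(String[] args) {
        SpanNameProvider provider = new MethodAndPathSpanNameProvider();
        System.out.println(provider.spanName(request("GET", "http://localhost:8080/service1")));
    }
}
```

Including the path makes spans easier to tell apart on the query page, but paths that embed ids (for example /users/42) would produce a very large number of distinct span names, so a real provider would probably normalize them first.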
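Summary
This article gave a rough overview of zipkin's architecture and usage, demonstrated it with a simple example, and briefly analyzed span collection over HTTP. Only the server-side interceptors were covered; the client side works similarly. In both cases the request is intercepted to record the initial information and create a span. After the business logic completes, the response is intercepted, the duration and other details are computed, and the finished span is put on the SpanCollector's blocking queue. A background thread in the SpanCollector sends the queued spans to the zipkin server at a fixed interval. The zipkin server uses spanId, traceId, and parentId to assemble the spans into call chains and makes them available for querying and display.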