乐虎游戏|乐虎国际登录|欢迎你

OkHttp源码深入解读

日期:2020-03-01编辑作者:计算机资讯

简介

目前在HTTP协议请求库中,OKHttp应当是非常火的,使用也非常的简单。网上有很多文章写了关于OkHttp的特点以及使用,而本章主要带领大家阅读OkHttp的源码,让大家对OKhttp的工作原理有所了解,当然源码的代码量是非常大的,这里我只是抓住主线和重点部分,至于细节部分,大家随着我抛出的线来跟进基本是没什么问题的。这篇文章要干嘛,引用一句话:

read the fucking source code

<a href=";

OkHttp优点

OkHttp是一个高效的Http客户端,有如下的特点:

  • 支持HTTP2/SPDY黑科技

  • socket自动选择最好路线,并支持自动重连

  • 拥有自动维护的socket连接池,减少握手次数

  • 拥有队列线程池,轻松写并发

  • 拥有Interceptors轻松处理请求与响应(比如透明GZIP压缩,LOGGING)

  • 基于Headers的缓存策略(不仅可以缓存数据,就连响应头都给缓存了)


目录:
  • OkHttp介绍
  • 粗绘请求流程
  • RealCall方法execute
  • getResponseWithInterceptorChain调用链
  • RetryAndFollowUpInterceptor
  • ConnectInterceptor获取连接
  • CallServerInterceptor网络请求
  • RealConnection
  • StreamAllocation
  • HttpCodec(Http1Codec)
  • 同步/异步请求

本文基于compile 'com.squareup.okhttp3:okhttp:3.6.0'分析

源码涉及的主要几个对象
  • Call:对请求的封装,有异步请求和同步请求。

  • Dispatcher:任务调度器

  • Connection:是RealConnection的父类接口,表示对JDK中的物理socket进行了引用计数封装,用来控制socket连接

  • HttpCodec:对Http请求进行编码,对Http响应进行解码,由于Http协议有基于HTTP1.0和Http2.0的两种情况,Http1Code代表基于Http1.0协议的方式,Http2Code代表基于Http2.0协议的方式。

  • StreamAllocation: 用来控制Connections/Streams的资源分配与释

  • RouteDatabase:用来保存连接的错误路径,以便能提升连接的效率。

  • RetryAndFollowUpInterceptor 负责失败重试以及重定向的拦截器

  • BridgeInterceptor: 负责把用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转为用户友好的响应的

  • CacheInterceptor: 负责读取缓存直接返回、更新缓存

  • ConnectInterceptor: 负责和服务器建立连接的

  • CallServerInterceptor:负责向服务器发送请求数据、从服务器读取响应数据

OkHttp介绍:

特点:

  • 支持连接同一地址的连接共享同一个socket
  • 支持Socket连接池,减少请求延迟
  • 使用拦截器模式,将流程拆分
  • 透明的GZIP压缩

0.写在前面

以前觉得会使用一个库就行了,知其然不知其所以然,在这里,记录一下学习的过程

源码解析

源码开始之前我先贴一段OkHttp请求网络的实例

   OkHttpClient mOkHttpClient = new OkHttpClient();

        final Request request = new Request.Builder()
                .url("https://www.jianshu.com/u/b4e69e85aef6")
                .addHeader("user_agent","22222")
                .build();
          Call call = mOkHttpClient.newCall(request);
          call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {

            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                if(response != null )
                Log.i(TAG, "返回服务端数据:"+ String.valueOf(response.body().string()));
            }
        });

(1)OkHttp网络请求流程

Call call = mOkHttpClient.newCall(request);

首先会new一个Call对象出来,但其实真正new出来的对象是NewCall对象

 static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
  }

然后会执行call的enqueue方法

 @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

该方法中首先判断请求有没有被执行,如果请求已经执行,那么直接抛出异常,如果请求没有执行,就会执行Dispatcher对象的enqueue方法,Dispatcher的enqueue方法的源码如下所示:

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

如果正在运行的异步请求数量小于最大的并发数,且正在运行的客户端实际数量请求小于规定的每个主机最大请求数量,那么就把该请求放进正在运行的异步请求队列中,否则就把该请求放进将要执行的异步请求队列中。


(2) Dispatcher任务调度

Dispatcher的各个参数的说明如下:

  //支持的最大并发请求数量
  private int maxRequests = 64;
 //每个主机的最大请求数量
  private int maxRequestsPerHost = 5;

  //请求线程池
  private @Nullable ExecutorService executorService;

  //将要运行的异步请求队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  //正在运行的异步请求队列
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  //正在运行的同步请求队列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
  • maxRequests :OkHttp支持的最大并发请求数量

  • maxRequestsPerHost :每个主机的最大请求数量

  • readyAsyncCalls :将要运行的异步请求队列

  • runningAsyncCalls :正在运行的异步请求队列

  • runningSyncCalls :正在运行的同步请求队列

继续看看Dispatcher的executorService方法,如下:

 public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

Dispatcher初始化了一个线程池,核心线程的数量为0 ,最大的线程数量为Integer.MAX_VALUE,空闲线程存在的最大时间为60秒,这个线程类似于CacheThreadPool,比较适合执行大量的耗时比较少的任务。同时我们Dispatcher也可以来设置自己线程池。

Dispatcher我们大概了解之后,回到之前说的,call的enqueue方法其实执行的使Dispatcher的enqueue方法,Dispatcher之后会把call放进请求队列中,最终执行由线程池来执行请求任务。下面来看看RealCall里究竟执行了什么任务。

  @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
       ...
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

RealCall通过执行getResponseWithInterceptorChain()返回Response,如果请求被取消则在进行OnFailue回调,如果请求成功则进行onResponse的回调。
这里要注意两点:

  • 请求如果被取消,其回调实在onFailue中进行回调的

  • enqueue方法的回调是在子线程中完成的


(3) 拦截器

那么RealCall 的getResponseWithInterceptorChain方法中究竟干了些什么呢,它是如何返回Response的呢?

 Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());//1
    interceptors.add(retryAndFollowUpInterceptor);//2
    interceptors.add(new BridgeInterceptor(client.cookieJar()));//3
    interceptors.add(new CacheInterceptor(client.internalCache()));//4
    interceptors.add(new ConnectInterceptor(client));//5
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());//6 
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));//7

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);//8
  }
  1. 在配置 OkHttpClient 时设置的 interceptors ()

  2. 负责失败重试以及重定向的RetryAndFollowUpInterceptor

  3. 负责把用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转为用户友好的响应的 BridgeInterceptor

  4. 负责读取缓存直接返回、更新缓存的 CacheInterceptor

  5. 负责和服务器建立连接的 ConnectInterceptor

  6. 配置 OkHttpClient 时设置的 networkInterceptors

  7. 负责向服务器发送请求数据、从服务器读取响应数据的 CallServerInterceptor

  8. 在 return chain.proceed(originalRequest),中开启链式调用

RealInterceptorChain的proceed方法源码如下:

  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }

理解了这段代码其实整个OkHttp核心流程你就基本掌握了,开始看的时候大家可能头都大了,可是当你debug一下你就豁然开朗了。这段代码核心在下面这部分:

 // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(ne
xt);

首先来了解一下拦截器吧,拦截器是一种能够监控、重写,重试调用的机制。通常情况下,拦截器用来添加、移除、转换请求和响应的头部信息。比如将域名替换为IP地址,在请求头中移除添加host属性;也可以添加我们应用中的一些公共参数,比如设备id、版本号,等等。

拦截器的基本代码结构如下:

public interface Interceptor {
  Response intercept(Chain chain) throws IOException;

  interface Chain {
    Request request();

    Response proceed(Request request) throws IOException;

}
}
  • 拦截器的intercept方法中持有一个Chain对象,上面的RealInterceptorChain其实就是一个Chain的实现类,然后chain对象的request方法可以拿到Request对象,proceed方法可以拿到Response对象,也就是说我们可以通过实现Interceptor,定义一个拦截器对象,然后拿到请求和Response对象,对Request和Response进行修改。

  • 事实上OkHttp就是通过定义许多拦截器一步一步地对Request进行拦截处理(从头至尾),直到请求返回网络数据,后面又倒过来,一步一步地对Response进行拦截处理,最后拦截的结果就是回调的最终Response。(从尾至头)

  • 回头再看RealInterceptorChain的proceed方法,通过顺序地传入一个拦截器的集合,创建一个RealInterceptorChain,然后拿到之前OkHttp创建的各种拦截器,并调用其interrupt方法,并返回Response对象。其调用顺序如下:

图片 1

okhttp拦截器.png

再来看看各个拦截器的源码:

  1. 在配置 OkHttpClient 时设置的 interceptors ()

  2. 负责失败重试以及重定向的RetryAndFollowUpInterceptor

  3. 负责把用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转为用户友好的响应的 BridgeInterceptor

  4. 负责读取缓存直接返回、更新缓存的 CacheInterceptor

  5. 负责和服务器建立连接的 ConnectInterceptor

  6. 配置 OkHttpClient 时设置的 networkInterceptors

  7. 负责向服务器发送请求数据、从服务器读取响应数据的 CallServerInterceptor


  • RetryAndFollowUpInterceptor:进行连接失败重新连接,以及重定向
 @Override public Response intercept(Chain chain) throws IOException {
  Request request = chain.request();
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Call call = realChain.call();
...

  followUpCount = 0;
  Response priorResponse = null;
 while (true) {
    if (canceled) {
      streamAllocation.release();
 throw new IOException("Canceled");
  }

    Response response;
 boolean releaseConnection = true;
 try {
      response = realChain.proceed(request, streamAllocation, null, null);
 ...

   Request followUp = followUpRequest(response);   
if (followUp == null) {
      if (!forWebSocket) {
        streamAllocation.release();
  }
      return response;
  }

...

 if (++followUpCount > MAX_FOLLOW_UPS) {
  streamAllocation.release();
 throw new ProtocolException("Too many follow-up requests: " + followUpCount); }
...

   request = followUp;
   priorResponse = response;
  }
}

整段代码就是在一个死循环

  • 可以看出重连接的次数最多为20次

  • 重定向功能的逻辑在followUpRequest方法中,这个方法会根据响应头中的location字段获取重定向的url,并通过requestBuilder重新new一个Request对象,并改变request的response的值,然后重新进行拦截。

  • BridgeInterceptor:对请求头和响应头进行修改
  @Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }

    Response networkResponse = chain.proceed(requestBuilder.build());

    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
  }
  • CacheInterceptor:读取缓存和更新缓存的操作

拦截request并读取缓存,该操作在proceed方法之前执行,也就是在请求的时候进行缓存判断。

 @Override public Response intercept(Chain chain) throws IOException {
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();

    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
      cache.trackResponse(strategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // If we don't need the network, we're done.
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

    Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

判断是否应该更新缓存

 // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }

    return response;
  }
  • ConnectInterceptor:与服务器进行连接
 @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

实际上建立连接就是创建了一个 HttpCodec 对象,它将在后面的步骤中被使用,那它又是何方神圣呢?它是对 HTTP 协议操作的抽象,有两个实现:Http1Codec和 Http2Codec,顾名思义,它们分别对应 HTTP/1.1 和 HTTP/2 版本的实现。

在 Http1Codec中,它利用 Okio 对 Socket 的读写操作进行封装,它对 java.io和 java.nio 进行了封装,让我们更便捷高效的进行 IO 操作。

而创建 HttpCodec 对象的过程涉及到 StreamAllocation、RealConnection代码较长,这个过程概括来说,就是找到一个可用的 RealConnection,再利用 RealConnection 的输入输出(BufferedSource 和 BufferedSink)创建 HttpCodec 对象,供后续步骤使用。

  • CallServerInterceptor:发送请求和接收数据
 @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();

    realChain.eventListener().requestHeadersStart(realChain.call());
    httpCodec.writeRequestHeaders(request);
    realChain.eventListener().requestHeadersEnd(realChain.call(), request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        realChain.eventListener().responseHeadersStart(realChain.call());
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();

    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    realChain.eventListener()
        .responseHeadersEnd(realChain.call(), response);

    int code = response.code();
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }
  1. 向服务器发送 request header;

  2. 如果有 request body,就向服务器发送;

  3. 读取 response header,先构造一个 Response对象;

  4. 如果有 response body,就在 3 的基础上加上 body 构造一个新的 Response对象;

这里我们可以看到,核心工作都由 HttpCodec 对象完成,而 HttpCodec 实际上利用的是 Okio,而 Okio 实际上还是用的 Socket,所以没什么神秘的,只不过一层套一层,层数有点多。

其实 Interceptor的设计也是一种分层的思想,每个 Interceptor 就是一层。为什么要套这么多层呢?分层的思想在 TCP/IP 协议中就体现得淋漓尽致,分层简化了每一层的逻辑,每层只需要关注自己的责任(单一原则思想也在此体现),而各层之间通过约定的接口/协议进行合作(面向接口编程思想),共同完成复杂的任务,这是典型的责任链设计模式

责任链模式是一种对象的行为模式。在责任链模式里,很多对象由每一个对象对其下家的引用而连接起来形成一条链。请求在这个链上传递,直到链上的某一个对象决定处理此请求。发出这个请求的客户端并不知道链上的哪一个对象最终处理这个请求,这使得系统可以在不影响客户端的情况下动态地重新组织和分配责任。

  • OkHttp的整个运行流程图

图片 2

okhttp_full_process.png


(4)OkHttp的复用连接池

Http有一种叫做keepalive connections的机制,而okHttp支持5个并发socket连接,默认keepalive时间为5分钟,接下来我们学习okHttp是如何复用连接的。

  • 主要变量与构造方法
    连接池的类位于okHttp.ConnectionPool,它的主要变量如下:
  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  /** The maximum number of idle connections for each address. */
  private final int maxIdleConnections;
  private final long keepAliveDurationNs;
  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

  private final Deque<RealConnection> connections = new ArrayDeque<>();
  final RouteDatabase routeDatabase = new RouteDatabase();
  boolean cleanupRunning;

主要变量说明一下:

  • executor线程池:类似于CachedThreadPool,需要注意的是这种线程池的工作队列采用了没有容量的SynchronousQueue。

  • Deque 双向队列:双端队列同时具有队列和栈的性质,经常在缓存中被使用,里面维护了RealConnection也就是Socket物理连接的包装。

  • RouteDatabase :它用来记录连接失败的路线名单,当连接失败时就会把失败的路线加进去。

  • ConnectionPool的构造方法如下所示:

 public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }

通过构造方法可以看出CollectionPool默认空闲的socket最大连接数为5个,socket的keepalive时间为5分钟。CollectionPool实在OkHttpClient实例化的时候创建的

  OkHttpClient(Builder builder) {
    this.dispatcher = builder.dispatcher;

    ...
    this.connectionPool = builder.connectionPool;
    ...

  }
  • 缓存操作

ConnectionPool提供对Deque<RealConnection>进行操作的方法分别为put,get,connectionBecameIdle和evictAll
这几个操作,分别对应放入连接,获取连接,移除连接和移除所有连接操作。这里我们只举例说明put和get操作。

  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

再添加到Deque之前首先要清理空闲线程,这个后面会讲到。再来看看get操作:

  @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
  }

遍历connections缓存列表。当某个连接计数小于限制的大小,并且request的地址和缓存列表中此连接的地址完全匹配时,则直接复用缓存列表中的connection作为request的连接。

  • 自动回收连接
    OkHttp时根据StreamAllocation引用计数是否为0来实现自动回收连接的。我们在put操作前首先要调用executor.execute(cleanupRunnable)来清理闲置的线程。我们来查看cleanupRunnable到底做了什么?
private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

线程不断地调用clearup方法进行清理,并返回下次需要清理的间隔时间,然后调用wait方法进行等待以释放锁与时间片。当等待时间到了后,再次进行清理,并返回下次需要清理的间隔时间,如此循环下去。接下来看看clearup方法,如下所示:

  long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;


    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();


        if (pruneAndGetAllocationCount(connection, now) > 0) {//注释<1>
          inUseConnectionCount++;
          continue;
        }

        idleConnectionCount++;


        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {//注释<2>

        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {

        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {

        return keepAliveDurationNs;
      } else {
        // No connections, idle or in use.
        cleanupRunning = false;
        return -1;//注释<3>
      }
    }

    closeQuietly(longestIdleConnection.socket());
    return 0;
  }

clearup方法所做的事情非常简单总结就是,根据连接中的引用计数来计算空闲连接数和活跃连接数,然后标记空闲的连接。

  • 注释<2>:如果空闲连接keepAlive时间超过5分钟,或者空闲连接数超过5个,则从Deque中移除此连接。接下来更具空闲连接或者活跃连接来返回下次需要清理的时间数:
    如果空闲连接大于0,则返回此连接即将到期的时间;
    如果都是活跃连接且大于0,则返回默认的keepAlive时间5分钟;

  • 注释<3>:如果没有任何连接,则跳出循环并返回-1;

  • 注释<1>:通过pruneAndGetAllocationCount方法来判断连接是否闲置。如果pruneAndGetAllocationCount方法的返回值大于0则是活跃连接,否则就是空闲连接。接下来查看pruneAndGetAllocationCount方法,如下所示:

 private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);

      if (reference.get() != null) {
        i++;
        continue;
      }

      // We've discovered a leaked allocation. This is an application bug.
      StreamAllocation.StreamAllocationReference streamAllocRef =
          (StreamAllocation.StreamAllocationReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);

      references.remove(i);
      connection.noNewStreams = true;

      // If this was the last allocation, the connection is eligible for immediate eviction.
      if (references.isEmpty()) {//注释<1>
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }

    return references.size();
  }

pruneAndGetAllocationCount方法首先遍历传进来的RealConnection的StreamAllocation;如果StreamAllocation未被使用,则接下来遍历下一个StreamAllocation;如果StreamAllocation未被使用,则从列表中移除。在上面代码注释1处,如果列表为空,则说明此连接没有引用了,这时返回0,表示此连接时空闲连接;否则就返回非0的数,表示此连接时活跃连接。那么StreamAllocation是什么?怎么才能判断StreamAllocation使用与否?接着往下看。

  • 引用计数
    在OkHttp的高层代码调用中,使用了类似于引用计数的方式跟踪socket流的调用。这里的计数对象是StreamAllocation,它被反复执行acquire和release操作,这两个方法其实是在改变RealConnection中 List<Reference<StreamAllocation>>的大小。acquire方法和release方法,如下所示:
 public void acquire(RealConnection connection, boolean reportedAcquired) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();

    this.connection = connection;
    this.reportedAcquired = reportedAcquired;
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
  }

RealConnection是socket物理连接的包装,它里面维护了
List<Reference<StreamAllocation>>的引用。List中StreamAllocation的数量也是socket被引用的计数。如果计数为0,则说明此连接没有被复用,也就是空闲的,需要通过下文的算法实现回收;如果计数不为0,则表示上层代码仍然在引用,就无需关闭连接。

可以看出此连接池复用的核心就是用Deque<RealConnection>来存储连接,通过put,getconnectionBecameIdle和evictAll几个操作来对Deque进行操作,另外通过判断连接中的计数对象StreamAllocation来进行自动回收连接。

创作不易,如果本文对您有用的话,记得点一个赞哦


(1)参考文章:https://blog.piasy.com/2016/07/11/Understand-OkHttp/
(2)《Android进阶之光》

粗绘请求流程

注意:这里我选择OkHttp源码版本是 3.8.0。为了方便大家能够和文章同步,最好保持版本一致,我看过老版本和新的版本还是有点不同的。

官网给出的示例

OkHttpClient client = new OkHttpClient();String run(String url) throws IOException { Request request = new Request.Builder() .url .build(); Response response = client.newCall.execute(); return response.body().string();}

我们就从这里入口,来一步一步的跟进。

  1. 首先是创建一个OkHttpClient,Http请求工厂,也就是只要需要发Http请求,那都得找他。内部当然后很多的成员变量和方法,这里我们先不做介绍,等用到时再解释。
  2. 我们继续看client.newCall。找到源码
 @Override public Call newCall(Request request) { return new RealCall(this, request, false /* for web socket */); }

很简单,创建了一个RealCall,这里我就称为一个请求。Request不说大家能理解,里面封装了各种请求的信息。创建过程也很简单,做一些成员变量赋值和初始化。

 RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { final EventListener.Factory eventListenerFactory = client.eventListenerFactory(); this.client = client; this.originalRequest = originalRequest; this.forWebSocket = forWebSocket; this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket); // TODO: this is unsafe publication and not threadsafe. this.eventListener = eventListenerFactory.create; }

这里注意retryAndFollowUpInterceptor;变量,后面会用到。

  1. 调用了RealCall的execute()方法并返回Response结果。

1.基本用法

下面是发送一个异步请求:

OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
RequestBody requestBody = new FormBody.Builder()
        .add("type", "top")
        .add("key", "52880cdc1ede6a63d0be1f1816f136a7")
        .build();
Request request = new Request.Builder()
        .url("http://v.juhe.cn/toutiao/index")
        .post(requestBody)
        .build();
okHttpClient.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
        Log.e(TAG, "onFailure: " + e.getMessage());
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {
        if (response.isSuccessful()) {
            Log.e(TAG, "onResponse: " + response.body().string());
        } else {
            throw new IOException("Unexpected code " + response);
        }
    }
});

RealCall方法execute

前面我们知道了大致的请求流程,下面我们重点看

 @Override public Response execute() throws IOException { synchronized  { if  throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); try { client.dispatcher().executed; Response result = getResponseWithInterceptorChain(); if (result == null) throw new IOException("Canceled"); return result; } finally { client.dispatcher().finished; } }
  1. 首先我们发现try的前后调用了Dispatcher的方法:
client.dispatcher().executed;client.dispatcher().finished;

分别是将Call加入到Dispatcher中的同步队列中,结束后,移除队列。

  1. 调用getResponseWithInterceptorChain获取Response。

接下来我们就重点看getResponseWithInterceptorChain方法

2.okhttp的特性

1.支持HTTP2/SPDY黑科技
2.socket自动选择最好路线,并支持自动重连
3.拥有自动维护的socket连接池,减少握手次数
4.拥有队列线程池,轻松写并发
5.拥有Interceptors轻松处理请求与响应(比如透明GZIP压缩,LOGGING)
6.基于Headers的缓存策略

getResponseWithInterceptorChain调用链

图片 3okhttp.png

 Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. List<Interceptor> interceptors = new ArrayList<>(); interceptors.addAll(client.interceptors; interceptors.add(retryAndFollowUpInterceptor); interceptors.add(new BridgeInterceptor(client.cookieJar; interceptors.add(new CacheInterceptor(client.internalCache; interceptors.add(new ConnectInterceptor; if (!forWebSocket) { interceptors.addAll(client.networkInterceptors; } interceptors.add(new CallServerInterceptor(forWebSocket)); Interceptor.Chain chain = new RealInterceptorChain( interceptors, null, null, null, 0, originalRequest); return chain.proceed(originalRequest); }

这里的代码就很关键了,设计的很巧妙。可能有点绕,这里我讲一下几个关键的类。

3.源码分析

1.建造者模式创建OkhttpClient

public OkHttpClient build() {
      return new OkHttpClient(this);
}

OkHttpClient(Builder builder) {
    this.dispatcher = builder.dispatcher;
    this.proxy = builder.proxy;
    this.protocols = builder.protocols;
    this.connectionSpecs = builder.connectionSpecs;
    this.interceptors = Util.immutableList(builder.interceptors);
    this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
    this.proxySelector = builder.proxySelector;
    this.cookieJar = builder.cookieJar;
    this.cache = builder.cache;
    this.internalCache = builder.internalCache;
    this.socketFactory = builder.socketFactory;

    boolean isTLS = false;
    for (ConnectionSpec spec : connectionSpecs) {
      isTLS = isTLS || spec.isTls();
    }

    if (builder.sslSocketFactory != null || !isTLS) {
      this.sslSocketFactory = builder.sslSocketFactory;
      this.certificateChainCleaner = builder.certificateChainCleaner;
    } else {
      X509TrustManager trustManager = systemDefaultTrustManager();
      this.sslSocketFactory = systemDefaultSslSocketFactory(trustManager);
      this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
    }

    this.hostnameVerifier = builder.hostnameVerifier;
    this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
        certificateChainCleaner);
    this.proxyAuthenticator = builder.proxyAuthenticator;
    this.authenticator = builder.authenticator;
    this.connectionPool = builder.connectionPool;
    this.dns = builder.dns;
    this.followSslRedirects = builder.followSslRedirects;
    this.followRedirects = builder.followRedirects;
    this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
    this.connectTimeout = builder.connectTimeout;
    this.readTimeout = builder.readTimeout;
    this.writeTimeout = builder.writeTimeout;
    this.pingInterval = builder.pingInterval;
}

2.创建Request

public Request build() {
      if (url == null) throw new IllegalStateException("url == null");
      return new Request(this);
 }

Request(Builder builder) {
    this.url = builder.url;
    this.method = builder.method;
    this.headers = builder.headers.build();
    this.body = builder.body;
    this.tag = builder.tag != null ? builder.tag : this;
}

3.执行异步请求时通过newCall()将Request转化为call

@Override public Call newCall(Request request) {
    return new RealCall(this, request, false /* for web socket */);
}

RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}

4.加入请求队列,Call的enqueue(Callback responseCallback)是抽象方法,唯一实现类是RealCall,看下enqueue方法

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
 }

5.又调用了Dispatcher的enqueue方法

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
}

如果当前还能执行一个并发请求,就加入到runningAsyncCalls队列并且立即执行,否则加入 readyAsyncCalls 队列,而正在执行的请求执行完毕之后,会调用 promoteCalls() 函数,来把 readyAsyncCalls 队列中的 AsyncCall “提升”为 runningAsyncCalls,并开始执行。

再看下executorService()方法,创建了一个线程池

public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
}

这里的AsyncCall是RealCall的内部类,它继承了NamedRunnable,而NamedRunnable实现了Runnable接口,所以最后会执行到AsyncCall的execute方法

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

在run方法里调用了execute这个抽象方法

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
 }

6.调用getResponseWithInterceptorChain方法获取Response

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    // 配置 OkHttpClient 时设置的 interceptors
    interceptors.addAll(client.interceptors());
    // 负责失败重试以及重定向的 RetryAndFollowUpInterceptor
    interceptors.add(retryAndFollowUpInterceptor);
    // 负责把用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转换为        
    // 用户友好的响应的 BridgeInterceptor
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    // 负责读取缓存直接返回、更新缓存的 CacheInterceptor
    interceptors.add(new CacheInterceptor(client.internalCache()));
    // 负责和服务器建立连接的 ConnectInterceptor
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      // 配置 OkHttpClient 时设置的 networkInterceptors
      interceptors.addAll(client.networkInterceptors());
    }
    // 负责向服务器发送请求数据、从服务器读取响应数据的 CallServerInterceptor
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    // 开启链式调用
    return chain.proceed(originalRequest);
  }

7.看下RealInterceptorChain的proceed方法

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      Connection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    // 如果我们已经有一个stream,确定即将到来的request会使用它
    if (this.httpCodec != null && !sameConnection(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    // 如果我们已经有一个stream, 确定chain.proceed()唯一的call
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    // 调用链中下一个拦截器
    // 实例化下一个拦截器对应的RealIterceptorChain对象,这个对象会在传递给当前/  
    // 的拦截器
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, streamAllocation, httpCodec, connection, index + 1, request);
    // 得到当前的拦截器
    Interceptor interceptor = interceptors.get(index);
    // 调用当前拦截器的intercept()方法,并将下一个拦截器的RealIterceptorChain对  
    // 象传递下去
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    return response;
}

8.除了我们在OkhttpClient中设置的interceptor,比如LoggingInterceptor,第一个调用的就是RetryAndFollowUpInterceptor,主要负责失败重试及重定向

@Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();

    streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()), callStackTrace);

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response = null;
      boolean releaseConnection = true;
      try {
        // 这里又调用了RealInterceptorChain的proceed方法,调用了下一个拦截器
        response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), false, request)) {
          throw e.getLastConnectException();
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
        // We're throwing an unchecked exception. Release any resources.
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }

      // Attach the prior response if it exists. Such responses never have a body.
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }

      Request followUp = followUpRequest(response);

      if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }

      closeQuietly(response.body());

      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      if (followUp.body() instanceof UnrepeatableRequestBody) {
        streamAllocation.release();
        throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
      }

      if (!sameConnection(response, followUp.url())) {
        streamAllocation.release();
        streamAllocation = new StreamAllocation(
            client.connectionPool(), createAddress(followUp.url()), callStackTrace);
      } else if (streamAllocation.codec() != null) {
        throw new IllegalStateException("Closing the body of " + response
            + " didn't close its backing stream. Bad interceptor?");
      }

      request = followUp;
      priorResponse = response;
    }
}

9.接着看BridgeInterceptor的intercept方法,主要负责把用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转换为用户友好的响应

@Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();
    // 处理请求头
    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    // 透明Gzip压缩
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }
    // 调用下一个Interceptor
    Response networkResponse = chain.proceed(requestBuilder.build());

    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
}

10.CacheInterceptor的intercept方法

@Override public Response intercept(Chain chain) throws IOException {
    // 通过request获取缓存
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();
    // 根据request得到缓存策略
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
      cache.trackResponse(strategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    // 如果没有网络也没有缓存,请求失败
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // If we don't need the network, we're done.
    // 如果没有网络直接返回缓存
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

    Response networkResponse = null;
    try {
      // 调用下一个拦截器
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        // 更新缓存
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();
    // 缓存新的response
    if (HttpHeaders.hasBody(response)) {
      CacheRequest cacheRequest = maybeCache(response, networkResponse.request(), cache);
      response = cacheWritingResponse(cacheRequest, response);
    }

    return response;
}

11.ConnectInterceptor的intercept方法,主要负责和服务器建立连接

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

实际上建立连接就是创建了一个HttpCodec对象,它是对 HTTP协议操作的抽象,有两个实现:Http1Codec和Http2Codec,分别对应 HTTP/1.1 和 HTTP/2版本的实现。
在Http1Codec中,它利用 Okio 对Socket的读写操作进行封装。创建HttpCodec对象的过程涉及到StreamAllocation、RealConnection,这个过程就是找到一个可用的RealConnection,再利用RealConnection的输入输出(BufferedSource和BufferedSink)创建HttpCodec对象,供后续步骤使用。

12.NetworkInterceptors,就是配置OkHttpClient时设置的 NetworkInterceptors

13.最后看CallServerInterceptor的intercept方法,主要负责发送和接收数据

@Override public Response intercept(Chain chain) throws IOException {
    HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
    StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
    Request request = chain.request();

    long sentRequestMillis = System.currentTimeMillis();
    httpCodec.writeRequestHeaders(request);

    Response.Builder responseBuilder = null;
    // 检查请求方法,通过HttpCodec处理request
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return what
      // we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      // Write the request body, unless an "Expect: 100-continue" expectation failed.
      if (responseBuilder == null) {
        Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
      }
    }

    httpCodec.finishRequest();

    if (responseBuilder == null) {
      responseBuilder = httpCodec.readResponseHeaders(false);
    }
    // 获取响应
    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
}
Chain
  1. 获取Request
  2. 执行proceed

总结

Interceptor 是 OkHttp 最核心的一个东西,它把实际的网络请求、缓存、透明压缩等功能都统一了起来,每一个功能都只是一个 Interceptor,它们再连接成一个 Interceptor.Chain,环环相扣,最终圆满完成一次网络请求,这是典型的责任链模式。

RealInterceptorChain

Chain的实现

  1. 包含了完成请求需要的类,包括StreamAllocation、HttpCodec、RealConnection、Request等。这里必要重要的就是可以实现了Chain的request()来获取Request。
  2. 控制Interceptor的调用,调用Interceptor的拦截方法intercept后,就封装下一个RealInterceptorChain并指定index。声明下一个将要被调用的Interceptor。这部分逻辑主要在proceed方法中。我们看核心代码
 // Call the next interceptor in the chain. RealInterceptorChain next = new RealInterceptorChain( interceptors, streamAllocation, httpCodec, connection, index + 1, request); Interceptor interceptor = interceptors.get; Response response = interceptor.intercept;

首先会获取当前index的Interceptor。然后执行对应的intercept

方法。同时出入的参数是新创建的RealInterceptorChain。而新创建的RealInterceptorChain对应的index+1。如果执行新创建的RealInterceptorChain的proceed方法,那么interceptors的第index+1个Interceptor的intercept会被执行。依次循环下去。

总结: RealInterceptorChain就是对请求中个中重要对象的封装,执行Interceptor的intercept

的调用,确定下一个RealInterceptorChain。保证所有的Interceptor依次执行intercept

参考

<a href=" OkHttp</a>
<a href=";

Interceptor

前面讲到了RealInterceptorChain会执行Interceptor的intercept方法,同时传入下一个RealInterceptorChain。那么intercept方法究竟做了什么事呢,因为Interceptor的实现很多,这里我们挑一个系统的实现类看看,比如:BridgeInterceptor,这个代码虽然长,但逻辑想对简单

 @Override public Response intercept(Chain chain) throws IOException { Request userRequest = chain.request(); Request.Builder requestBuilder = userRequest.newBuilder(); RequestBody body = userRequest.body(); if (body != null) { MediaType contentType = body.contentType(); if (contentType != null) { requestBuilder.header("Content-Type", contentType.toString; } long contentLength = body.contentLength(); if (contentLength != -1) { requestBuilder.header("Content-Length", Long.toString(contentLength)); requestBuilder.removeHeader("Transfer-Encoding"); } else { requestBuilder.header("Transfer-Encoding", "chunked"); requestBuilder.removeHeader("Content-Length"); } } if (userRequest.header == null) { requestBuilder.header("Host", hostHeader(userRequest.url); } if (userRequest.header("Connection") == null) { requestBuilder.header("Connection", "Keep-Alive"); } // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing // the transfer stream. boolean transparentGzip = false; if (userRequest.header("Accept-Encoding") == null && userRequest.header == null) { transparentGzip = true; requestBuilder.header("Accept-Encoding", "gzip"); } List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url; if (!cookies.isEmpty { requestBuilder.header("Cookie", cookieHeader; } if (userRequest.header("User-Agent") == null) { requestBuilder.header("User-Agent", Version.userAgent; } Response networkResponse = chain.proceed(requestBuilder.build; HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers; Response.Builder responseBuilder = networkResponse.newBuilder() .request(userRequest); if (transparentGzip && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding")) && HttpHeaders.hasBody(networkResponse)) { GzipSource responseBody = new GzipSource(networkResponse.body().source; Headers strippedHeaders = networkResponse.headers().newBuilder() .removeAll("Content-Encoding") .removeAll("Content-Length") .build(); responseBuilder.headers(strippedHeaders); responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody))); } return responseBuilder.build(); }

里面逻辑我们现在可能还看不懂,我们看中间最核心的一句话,。Response networkResponse = chain.proceed(requestBuilder.build;有没有觉得顿时豁然开朗。realChain是传入的参数,而执行proceed方法,就又回到了前面我们讲RealInterceptorChain的流程。那前后RealInterceptorChain有什么区别呢?那就是index在不断的增加,同时对应的Interceptor也就不同。

那么Interceptor有什么用呢?

我们刚才只关注了中间的chain.proceed(requestBuilder.build;。而在此前后我们可以做很多的逻辑操作了,比如:

对Request进行一些请求头的判断,处理和完善。对Response进行一些处理,如在有gzip的情况下数据的处理等。

总结:Interceptor这里我称之为拦截器。Okhttp将请求的流程,从封装请求头,获取连接,发请求数据,读请求数据等等。拆分成一个个Interceptor。每一个Interceptor有着自己单一的功能,而下层的Interceptor为上层的Interceptor服务,有没有觉得有点像我们的网络TCP/IP的模型,哈哈。其实这种思想让我们的请求变的更加清晰,并且扩展性很好。每一层也就是Interceptor可以有自己的实现。同时我们可以定义自己的Interceptor。 而Interceptor的顺序执行就由RealInterceptorChain完成。

到这里我们就讲了整个请求的大体执行框架和模式。这部分一定要好好的理解,方便后面的学习。

RetryAndFollowUpInterceptor

这个拦截器用来做重连接和重定向的。其中逻辑有以下:

创建StreamAllocation
 @Override public Response intercept(Chain chain) throws IOException { Request request = chain.request(); streamAllocation = new StreamAllocation( client.connectionPool(), createAddress(request.url, callStackTrace); int followUpCount = 0; Response priorResponse = null; while  { if  { streamAllocation.release(); throw new IOException("Canceled"); } Response response = null; boolean releaseConnection = true; try { response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null); ...省略剩余代码

看到了吧new StreamAllocation了吧。这里第一个疑惑解决,StreamAllocation的创建地方。这里还要多讲一个地方就是构造参数ConnectionPool。我们看到是从OkHttpClient传了的。而在OkHttpclient创建时候创建了ConnectionPool。

public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory { ...省略 public static final class Builder { public Builder() { ...省略connectionPool = new ConnectionPool(); ...省略

后面用到ConnectionPool大家就别再疑惑了。

创建StreamAllocation在这里,那当然释放也是在这类里:

失败重连接
 while  { if  { streamAllocation.release(); throw new IOException("Canceled"); } Response response = null; boolean releaseConnection = true; try { response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null); releaseConnection = false; } catch (RouteException e) { // The attempt to connect via a route failed. The request will not have been sent. if (!recover(e.getLastConnectException(), false, request)) { throw e.getLastConnectException(); } releaseConnection = false; continue; } catch (IOException e) { // An attempt to communicate with a server failed. The request may have been sent. boolean requestSendStarted = !(e instanceof ConnectionShutdownException); if (!recover(e, requestSendStarted, request)) throw e; releaseConnection = false; continue; } finally { // We're throwing an unchecked exception. Release any resources. if (releaseConnection) { streamAllocation.streamFailed; streamAllocation.release(); } }

我们看到response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);是在一个无限循环中,如果出现异常,并且满足重连接,就会再次调用。

本文由乐虎游戏发布于计算机资讯,转载请注明出处:OkHttp源码深入解读

关键词:

RXJava的使用<二>

vankeservice 前言 RxJava和Retrofit也火了一段时间了,不过最近一直在学习ReactNative和Node相关的姿势,一直没有时间研究这...

详细>>

Retrofit--使用Retrofit时怎样去设置OKHttp

builder.cookieJar(new CookieJar() { private final HashMapHttpUrl, ListCookie cookieStore = new HashMap(); @Override public void saveFromResponse(HttpUr...

详细>>

先练哪个字帖才能快速入门?

文/晨风暮溪 不晓得我们在练字的经过中有未有蒙受那么些主题材料 问:先练哪个字帖手艺赶快入门? 图表源于互联...

详细>>

Mac & iOS 如何优雅的使用exchange邮箱

虽说小编笔者不是混工具圈的,不过人在江湖混,哪能不说大话用个 Gmail 呢。 本人如今职场小白。二零一八年在Un...

详细>>