OkHttp源码详解之二完结篇

1. 请大家思考几个问题

在开始本文之前,请大家思考如下几个问题。并请大家带着这几个问题,去本文寻找答案。如果你对下面几个问题的答案了如指掌那本文可以略过不看

  1. 在浏览器中输入一个网址,按回车后发生了什么?
  2. Okhttp的TCP连接建立发生在什么时候?
  3. Okhttp的Request请求发生在什么时候?
  4. Okio与Okhttp是在什么时候发生关联的?
  5. Okhttp的Interceptor和Chain是怎么串联起来的?
  6. Okhttp同步请求和异步请求分别是怎么实现的。异步请求有限制吗?

本文将围绕以上6个问题对Okhttp做一个简单的讲解,如有遗漏的知识点,欢迎在评论区指出,一起探讨成长。

2. Http请求的流程

首先来回答第一个问题“在浏览器中输入一个网址,按回车后发生了什么?”。Http权威指南一书给出的答案是发生了7件事情

  1. 浏览器从url中解析出主机名
  2. 浏览器将服务器的主机名转换成服务器的IP地址
  3. 浏览器将端口号从url中解析出来
  4. 浏览器建立一条与Web服务器的TCP连接
  5. 浏览器向服务器发送一条Http请求报文
  6. 服务器向浏览器回送一条Http响应报文
  7. 关闭连接,浏览器显示文档

以上七步是每一个Http请求必须要做的事情,Okhttp库要实现Http请求也不例外。这七个步骤的每一步,在Okhttp中都有体现。

  1. HttpUrl类负责步骤1和步骤3的主机名和端口解析
  2. Dns接口的具体实现负责步骤2的实现
  3. RealConnection类就是步骤4中的那条TCP连接
  4. CallServerInterceptor拦截器的intercept方法负责步骤5和步骤6的发送报文和接收报文
  5. ConnectPool连接池中提供了关闭TCP连接的方法
  6. Okio在哪操作?当然是往Socket的OutputStream写请求报文和从Socket的InputStram读取响应报文了

至此文章开头提出的6个问题,前5个都已经有了一个简单的回答。至于第六个问题,异步在Okhttp中用的缓存线程池,理论上缓存线程池,是当有任务到来,就会从线程池中取一个空闲的线程或者新建线程来处理任务,而且缓存线程池的线程数是Integer.MAX_VALUE。理论上是没有限制的。但是Dispatcher类在线程池的基础上,做了强制限制,最多可以同时处理的网络请求数64个,对于同一个主机名,最多可以同时处理5个网络请求。接下来我带大家从源码的角度来寻找这几个问题的答案

3.初识Okhttp

首先我们来写两个简单的例子来使用Okhttp完成get和post请求。

同步get请求

OkHttpClient client = new OkHttpClient();
String run(String url) throws IOException {
  Request request = new Request.Builder()
      .url(url)
      .build();

  Response response = client.newCall(request).execute();
  return response.body().string();
}

同步post请求

public static final MediaType JSON
    = MediaType.parse("application/json; charset=utf-8");

OkHttpClient client = new OkHttpClient();

String post(String url, String json) throws IOException {
  RequestBody body = RequestBody.create(JSON, json);
  Request request = new Request.Builder()
      .url(url)
      .post(body)
      .build();
  Response response = client.newCall(request).execute();
  return response.body().string();
}

异步get请求

OkHttpClient client = new OkHttpClient();

void run(String url) throws IOException {
    Request request = new Request.Builder()
            .url(url)
            .build();

    client.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {

        }

        @Override
        public void onResponse(Call call, Response response) throws IOException {
            System.out.println(response.body().string());
        }
    });
}

异步post请求

public static final MediaType JSON
            = MediaType.parse("application/json; charset=utf-8");

    OkHttpClient client = new OkHttpClient();

    void post(String url, String json) throws IOException {
        RequestBody body = RequestBody.create(JSON, json);
        Request request = new Request.Builder()
                .url(url)
                .post(body)
                .build();
        client.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {

            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {

            }
        });
    }

Okhttp完成一次网络请求,过程如下

  1. 创建一个OkHttpClient对象client
  2. 创建一个Request对象request,并设置url
  3. 调用client.newCall(request)生成一个Call对象call
  4. 调用call.execute()[同步调用]或者call.enqueue(callback)[异步调用]完成网络请求

4. 同步和异步网络请求过程

同步调用过程如下

OkHttpClient.newCall(Request r)  => RealCall.execute() => RealCall.getResponseWithInterceptorChain()

异步调用过程如下

OkHttpClient.newCall(Request r) => RealCall.enqueue(Callback) => Dispatcher.enqueue(AsyncCall)
    => 线程池执行AsyncCall =>AsyncCall.execute() => RealCall.getResponseWithInterceptorChain()

从上图可以看出异步调用比同步调用步骤更长,也更复杂。在这里我把整个调用过程分成两个阶段,RealCall.getResponseWithInterceptorChain()称作“网络请求执行阶段”,其余部分称作“网络请求准备阶段”。由于“网络请求执行阶段”涉及到链式(Chain)调用以及各种拦截器的执行比“网络请求准备阶段”复杂太多。所以我们先来看“网络请求准备阶段”,这个阶段也需要分成同步和异步两种方式来讲解

首先我们来看下准备阶段的公共调用部分OkHttpClient.newCall(Request r)

 @Override public Call newCall(Request request) {
    return new RealCall(this, request, false /* for web socket */);
  }

Call是什么,在我看来Call可以理解成是对网络请求封装,它可以执行网络请求,也可以取消网络请求。我们看下Call的类定义

public interface Call extends Cloneable {

  Request request();

  Response execute() throws IOException;


  void enqueue(Callback responseCallback);


  void cancel();


  boolean isExecuted();

  boolean isCanceled();


  Call clone();

  interface Factory {
    Call newCall(Request request);
  }
}
  1. request()返回的Request对象
  2. execute()同步执行网络操作
  3. enqueue(Callback responseCallback)异步执行网络操作
  4. cancel()取消网络操作

RealCall是Call的实现类。我们重点来看下execute()和enqueue(Callback responseCallback)

execute()方法

@Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    try {
      //这里只是把call存到Dispatcher的列表中
      client.dispatcher().executed(this);
      //真正执行网络操作
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
  }

enqueue(Callback callback)

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

我们注意到AsyncCall类,首先它是RealCall类的非静态内部类,它持有RealCall的对象,它可以做任何RealCall能做的事情。同时它继承自NamedRunnable。NamedRunnable是Ruannable的子类。它主要有两个功能,1.可以被线程池执行 2.修改当前执行线程的线程名(方便debug)。由于这几个类都比较简单,而且篇幅有限,这里就不贴代码了。请自行查阅

当然到这里异步执行的准备阶段并没有结束,它是怎么被子线程执行的呢。这里我们就需要在Dispatcher类中寻找答案了。Dispatcher是干嘛用的,它的功能就是负责分发用户创建的网络请求,以及控制网络请求的个数,以及上一个网络请求结束后,要不要执行等待中的网络请求。下面我们来看下Dispatcher都有哪些成员变量,以及这些成员变量的作用

public final class Dispatcher {
  //最多可以同时请求数量
  private int maxRequests = 64;
  //每个host最大同时请求数量
  private int maxRequestsPerHost = 5;
  //当没有任务执行的回调 比如说执行10个任务,10个任务都执行完了会回调该接口
  private Runnable idleCallback;

  /** Executes calls. Created lazily. */
  //线程池 用的是缓存线程池(提高吞吐量,并且能及时回收无用的线程)
  private ExecutorService executorService;

  /** Ready async calls in the order they'll be run. */
  //等待的异步任务队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  //正在执行的异步任务队列 这个放的是Runnable
  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  //正在执行的同步任务队列 这个存储的是RealCall
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher() {
  }

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

 }

总结下几个比较重要的概念

  1. executorService 线程池并且是缓存线程池,最最大程度提高吞吐量,并且能回收空闲的线程
  2. Deque readyAsyncCalls异步请求等待队列,当异步请求数超过64个,或者单个主机名请求超过5个,网络任务(AsyncCall)会放到该队列里。当任务执行完毕,会调用promoteCalls()把满足条件的网络任务(AsyncCall)放到线程池中执行
  3. Deque runningAsyncCalls 正在执行的异步任务队列
  4. Deque runningSyncCalls 正在执行的同步任务队列

紧接着我们来看下Dispatcher的enqueue(AsyncCall call)

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
     //如果请求没有超过限制 用线程池执行网络请求
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      //如果超过了限制,把请求放到异步等待队列中去,什么时候被唤醒?finished后
      readyAsyncCalls.add(call);
    }
  }

通过线程池执行AsyncCall 最终调用的是AsyncCall的run(),而run()中调用的是AsyncCall的execute()

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }

在execute()中调用了getResponseWithInterceptorChain(),也就是我们前面说的“网络请求执行阶段”。至此,同步和异步网络请求的“网络请求准备阶段”就讲解完了,接下来我们讲解“网络请求执行阶段”

5. Interceptor和Chain

Response getResponseWithInterceptorChain() throws IOException {
    List<Interceptor> interceptors = new ArrayList<>();
    //用户自定义的拦截器
    interceptors.addAll(client.interceptors());
    //重试和重定向拦截器
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //缓存拦截器
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //连接拦截器
    interceptors.add(new ConnectInterceptor(client));
    //用户自定义的网络拦截器
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    //真正请求服务器的拦截器
    interceptors.add(new CallServerInterceptor(forWebSocket));

    //用Chain把List中的interceptors串起来执行
    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }

要理解上面的代码。我们需要搞清楚两个概念 Interceptor和Chain。我们先来看下它们的定义

public interface Interceptor {
  Response intercept(Chain chain) throws IOException;

  interface Chain {
    Request request();

    Response proceed(Request request) throws IOException;

    Connection connection();
  }
}

可以举一个比较形象的例子来解释它们的关系。就以我们IT行业来讲。老板说想要一个APP,然后把需求转化成任务流转给产品经理,产品经理构思了下,然后把任务流转给设计师,设计师一通设计之后,把设计图流转给码农,码农接到任务后就开始加班加点的编码,最终写出了APP,然后把APP交给设计师,让设计师查看界面是否美观,设计师再把APP流转给产品经理,产品经理觉得很满意,最终把APP交付给老板。老板看了很满意,说大家伙晚上加个鸡腿。虽然实际生产中并不是这样的一个流程,想了很久觉得还是这样讲,更好理解。对应到这个例子中,产品经理、设计师、程序员他们都是真正干活的家伙,他们对应的是Interceptor。Chain是什么,是老板吗?不是!!Chain只是一套规则,对应的就是例子里的流转流程。

image

image

对照图片,“网络请求执行阶段”会依次执行Interceptors里的Interceptor。Interceptor执行分成三步。第一步:处理请求 第二步:执行下个Interceptor 第三步:处理响应

 List<Interceptor> interceptors = new ArrayList<>();
    //用户自定义的拦截器
    interceptors.addAll(client.interceptors());
    //重试和重定向拦截器
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //缓存拦截器
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //连接拦截器
    interceptors.add(new ConnectInterceptor(client));
    //用户自定义的网络拦截器
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    //真正请求服务器的拦截器
    interceptors.add(new CallServerInterceptor(forWebSocket));

首先执行的就是retryAndFollowUpInterceptor

RetryAndFollowUpInterceptor

//第一部分 根据url解析出主机名 解析端口
Request request = chain.request();

    streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()), callStackTrace);

....省略

while(true){
    ....
    //第二部分处理下一个Interceptor
    response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
    releaseConnection = false;
    ....
    //第三部分 处理响应,如果是重定向,用while循环,重新处理下一个Interceptor
     if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }

      closeQuietly(response.body());

      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }



}

BridgeInterceptor和CacheInterceptor请读者自行分析

ConnectInterceptor中会解析主机DNS并且建立TCP连接,Socket的输入输出流通过Source和Sink处理

public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();


    boolean doExtensiveHealthChecks = !request.method().equals("GET");

    //这里会解析DNS并且建立TCP连接,Socket的输入输出流会和Okio的Source和Sink关联
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
}

CallServerInterceptor 通过httpCodec的sink向socket发送请求,并且通过httpCodec的openResponseBody把socket的输入流写入到Response中

@Override public Response intercept(Chain chain) throws IOException {
    HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
    StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
    Request request = chain.request();

    long sentRequestMillis = System.currentTimeMillis();
    httpCodec.writeRequestHeaders(request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return what
      // we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      // Write the request body, unless an "Expect: 100-continue" expectation failed.
      if (responseBuilder == null) {
        Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
      }
    }

    httpCodec.finishRequest();

    if (responseBuilder == null) {
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))//真正把body写进去
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }

6. 结束语

OkHttp库还是挺庞大的,涉及到很多Http的基础知识。这里只是讲解了OkHttp的一小部分。很多细节的东西也没有深入讲解。比如说Socket的建立,连接池的管理,HttpCodec如何解析输入输出流,路由的细节。希望有机会可以再深入的讲解一番

原文链接:加载失败,请重新获取