okhttp|你想要的系列(网络请求框架OkHttp3全解系列 - (四)拦截器详解2:连接、请求服务(重点))

Okhttp系列文章:
你想要的系列:网络请求框架OkHttp3全解系列 - (一)OkHttp的基本使用
你想要的系列:网络请求框架OkHttp3全解系列 - (二)OkHttp的工作流程分析
你想要的系列:网络请求框架OkHttp3全解系列 - (三)拦截器详解1:重试重定向、桥、缓存(重点)
你想要的系列:网络请求框架OkHttp3全解系列 - (四)拦截器详解2:连接、请求服务(重点)
在本系列的上一篇文章你想要的系列:网络请求框架OkHttp3全解系列 - (三)拦截器详解1:重试重定向、桥、缓存(重点)中,我们分析了OkHttp拦截器链中的前三个拦截器:RetryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor,它们在请求建立连接之前做了一些预处理。
如果请求经过这三个拦截器后,要继续往下传递,说明是需要进行网络请求的(缓存不能直接满足),也就是今天要分析的内容——剩下的两个拦截器:ConnectInterceptor、CallServerInterceptor,分别负责 连接建立、请求服务读写。
背景 - HTTP协议发展 在讲解拦截器之前,我们有必要了解http协议相关背景知识,因为okhttp的网络连接正是基于此实现的。HTTP协议经历了以下三个版本阶段。
HTTP1.0 【okhttp|你想要的系列(网络请求框架OkHttp3全解系列 - (四)拦截器详解2:连接、请求服务(重点))】在HTTP1.0中,一次请求 会建立一个TCP连接,请求完成后主动断开连接。这种方法的好处是简单,各个请求互不干扰。
但每次请求都会经历 3次握手、2次或4次挥手的连接建立和断开过程——极大影响网络效率和系统开销。
okhttp|你想要的系列(网络请求框架OkHttp3全解系列 - (四)拦截器详解2:连接、请求服务(重点))
文章图片

HTTP1.1 在HTTP1.1中,解决了HTTP1.0中连接不能复用的问题,支持持久连接——使用keep-alive机制:一次HTTP请求结束后不会立即断开TCP连接,如果此时有新的HTTP请求,且其请求的Host同上次请求相同,那么会直接复用TCP连接。这样就减少了建立和关闭连接的消耗和延迟。keep-alive机制在HTTP1.1中是默认打开的——即在请求头添加:connection:keep-alive。(keep-alive不会永久保持连接,它有一个保持时间,可在不同的服务器软件(如Apache)中设定这个时间)
okhttp|你想要的系列(网络请求框架OkHttp3全解系列 - (四)拦截器详解2:连接、请求服务(重点))
文章图片

HTTP2.0 HTTP1.1中,连接的复用是串行的:一个请求建立了TCP连接,请求完成后,下一个相同host的请求继续使用这个连接。 但客户端想 同时 发起多个并行请求,那么必须建立多个TCP连接。将会产生网络延迟、增大网路开销。
并且HTTP1.1不会压缩请求和响应报头,导致了不必要的网络流量;HTTP1.1不支持资源优先级导致底层TCP连接利用率低下。在HTTP2.0中,这些问题都会得到解决,HTTP2.0主要有以下特性:
  • 新的二进制格式(Binary Format):http/1.x使用的是明文协议,其协议格式由三部分组成:request line,header,body,其协议解析是基于文本,但是这种方式存在天然缺陷,文本的表现形式有多样性,要做到健壮性考虑的场景必然很多,二进制则不同,只认0和1的组合;基于这种考虑,http/2.0的协议解析决定采用二进制格式,实现方便且健壮
  • 多路复用(MultiPlexing):即连接共享,使用streamId用来区分请求,一个request对应一个stream并分配一个id,这样一个TCP连接上可以有多个stream,每个stream的frame可以随机的混杂在一起,接收方可以根据stream id将frame再归属到各自不同的request里面
  • 优先级和依赖(Priority、Dependency):每个stream都可以设置优先级和依赖,优先级高的stream会被server优先处理和返回给客户端,stream还可以依赖其它的sub streams;优先级和依赖都是可以动态调整的,比如在APP上浏览商品列表,用户快速滑动到底部,但是前面的请求已经发出,如果不把后面的优先级设高,那么当前浏览的图片将会在最后展示出来,显然不利于用户体验
  • header压缩:http2.0使用encoder来减少需要传输的header大小,通讯双方各自cache一份header fields表,既避免了重复header的传输,又减小了需要传输的大小
  • 重置连接:很多APP里都有停止下载图片的需求,对于http1.x来说,是直接断开连接,导致下次再发请求必须重新建立连接;http2.0引入RST_STREAM类型的frame,可以在不断开连接的前提下取消某个request的stream
其中涉及了两个新的概念:
  • 数据流-stream:基于TCP连接之上的逻辑双向字节流,用于承载双向消息,对应一个请求及其响应。客户端每发起一个请求就建立一个数据流,后续该请求及其响应的所有数据都通过该数据流传输。每个数据流都有一个唯一的标识符和可选的优先级信息。
  • 帧-frame:HTTP/2的最小数据切片单位,承载着特定类型的数据,例如 HTTP 标头、消息负载,等等。 来自不同数据流的帧可以交错发送,然后再根据每个帧头的数据流标识符重新组装,从而在宏观上实现了多个请求或响应并行传输的效果。
    okhttp|你想要的系列(网络请求框架OkHttp3全解系列 - (四)拦截器详解2:连接、请求服务(重点))
    文章图片
这里的 多路复用机制 就实现了 在同一个TCP连接上 多个请求 并行执行。
无论是HTTP1.1的Keep-Alive机制还是HTTP2.0的多路复用机制,在实现上都需要引入连接池来维护网络连接。下面就开始分析 OkHttp中的连接池实现——连接拦截器ConnectInterceptor。
ConnectInterceptor 连接拦截器ConnectInterceptor代码如下:
//打开到目标服务的连接、处理下一个拦截器 public final class ConnectInterceptor implements Interceptor { public final OkHttpClient client; public ConnectInterceptor(OkHttpClient client) { this.client = client; }@Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); Transmitter transmitter = realChain.transmitter(); // We need the network to satisfy this request. Possibly for validating a conditional GET. boolean doExtensiveHealthChecks = !request.method().equals("GET"); Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks); return realChain.proceed(request, transmitter, exchange); } }

代码很少,主要是使用transmitter.newExchange获取Exchange实例,并作为参数调用拦截器链的proceed的方法。注意到前面分析过的拦截器调用的proceed方法是一个参数的,而这里是三个参数的。这是因为下一个拦截器(如果没有配置网络拦截器的话,就是CallServerInterceptor,也是最后一个)需要进行真正的网络IO操作,而 Exchange(意为交换)主要作用就是真正的IO操作:写入请求、读取响应(会在下一个拦截器做介绍)。
实际上获取Exchange实例的逻辑处理都封装在Transmitter中了。前面的文章提到过Transmitter,它是“发射器”,是把 请求 从应用端 发射到 网络层,它持有请求的 连接、请求、响应 和 流,一个请求对应一个Transmitter实例,一个数据流。下面就看下它的newExchange方法:
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) { synchronized (connectionPool) { if (noMoreExchanges) { throw new IllegalStateException("released"); } if (exchange != null) { throw new IllegalStateException("cannot make a new request because the previous response " + "is still open: please call response.close()"); } }ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks); Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec); synchronized (connectionPool) { this.exchange = result; this.exchangeRequestDone = false; this.exchangeResponseDone = false; return result; } }

若是第一次请求,前面两个if是没走进去的。接着看到使用exchangeFinder的find方法获取到了ExchangeCodec实例,然后作为参数构建了Exchange实例,并返回。嗯,看起来也很简单的样子。 注意到这个方法里涉及了连接池RealConnectionPool、交换寻找器ExchangeFinder、交换编解码ExchangeCodec、交换管理Exchange这几个类(翻译成这样尽力了,意会吧)。
  • RealConnectionPool,连接池,负责管理请求的连接,包括新建、复用、关闭,理解上类似线程池。
  • ExchangeCodec,接口类,负责真正的IO操作—写请求、读响应,实现类有Http1ExchangeCodec、Http2ExchangeCodec,分别对应HTTP1.1协议、HTTP2.0协议。
  • Exchange,管理IO操作,可以理解为 数据流,是ExchangeCodec的包装,增加了事件回调;一个请求对应一个Exchange实例。传给下个拦截器CallServerInterceptor使用。
  • ExchangeFinder,(从连接池中)寻找可用TCP连接,然后通过连接得到ExchangeCodec。
ExchangeFinder ExchangeFinder的作用从名字就可以看出——Exchange寻找者,本质是为请求寻找一个TCP连接。如果已有可用连接就直接使用,没有则打开一个新的连接。 一个网络请求的执行,需要先有一个指向目标服务的TCP连接,然后再进行写请求、读响应的IO操作。ExchangeFinder是怎么寻找的呢?继续往下看~
我们先看exchangeFinder初始化的地方:
public void prepareToConnect(Request request) { ... this.exchangeFinder = new ExchangeFinder(this, connectionPool, createAddress(request.url()), call, eventListener); }

看到这里应该会想起上一篇文章中分析RetryAndFollowUpInterceptor时提到过,prepareToConnect这个方法作用是连接准备,就是创建了ExchangeFinder实例。主要到传入的参数有connectionPool、createAddress方法返回的Address、call、eventListener。connectionPool是连接池,稍后分析,先看下createAddress方法:
private Address createAddress(HttpUrl url) { SSLSocketFactory sslSocketFactory = null; HostnameVerifier hostnameVerifier = null; CertificatePinner certificatePinner = null; if (url.isHttps()) { sslSocketFactory = client.sslSocketFactory(); hostnameVerifier = client.hostnameVerifier(); certificatePinner = client.certificatePinner(); }return new Address(url.host(), url.port(), client.dns(), client.socketFactory(), sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(), client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector()); }

使用url和client配置 创建一个Address实例。Address意思是指向服务的连接的地址,可以理解为请求地址及其配置。Address有一个重要作用:相同Address的HTTP请求 共享 相同的连接。这可以作为前面提到的 HTTP1.1和HTTP2.0 复用连接 的请求的判断。
回头看exchangeFinder的find方法
public ExchangeCodec find( OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) { int connectTimeout = chain.connectTimeoutMillis(); int readTimeout = chain.readTimeoutMillis(); int writeTimeout = chain.writeTimeoutMillis(); int pingIntervalMillis = client.pingIntervalMillis(); boolean connectionRetryEnabled = client.retryOnConnectionFailure(); try { //找到一个健康的连接 RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks); //利用连接实例化ExchangeCodec对象,如果是HTTP/2返回Http2ExchangeCodec,否则返回Http1ExchangeCodec return resultConnection.newCodec(client, chain); } catch (RouteException e) { trackFailure(); throw e; } catch (IOException e) { trackFailure(); throw new RouteException(e); } }

主要就是通过findHealthyConnection方法获取连接RealConnection实例,然后用RealConnection的newCodec方法获取了ExchangeCodec实例,如果是HTTP/2返回Http2ExchangeCodec,否则返回Http1ExchangeCodec,然后返回。
findHealthyConnection方法名透露着 就是去寻找可用TCP连接的,而我们猜测这个方法内部肯定和连接池ConnectionPool有紧密的关系。接着跟进findHealthyConnection方法:
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks) throws IOException { while (true) { //找连接 RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled); // 是新连接 且不是HTTP2.0 就不用体检 synchronized (connectionPool) { if (candidate.successCount == 0 && !candidate.isMultiplexed()) { return candidate; } } // 体检不健康,继续找 if (!candidate.isHealthy(doExtensiveHealthChecks)) { //标记不可用 candidate.noNewExchanges(); continue; }return candidate; } }

循环寻找连接:如果是不健康的连接,标记不可用(标记后会移除,后面讲连接池会讲到),然后继续找。健康是指连接可以承载新的数据流,socket是连接状态。我们跟进findConnection方法,看看到底是怎么 找连接 的:
//为承载新的数据流 寻找 连接。寻找顺序是 已分配的连接、连接池、新建连接 private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException { boolean foundPooledConnection = false; RealConnection result = null; Route selectedRoute = null; RealConnection releasedConnection; Socket toClose; synchronized (connectionPool) { //请求已被取消(Call的cancel方法->transmitter的cancel方法),抛异常 if (transmitter.isCanceled()) throw new IOException("Canceled"); hasStreamFailure = false; // 尝试使用 已给数据流分配的连接.(例如重定向请求时,可以复用上次请求的连接) releasedConnection = transmitter.connection; //有已分配的连接,但已经被限制承载新的数据流,就尝试释放掉(如果连接上已没有数据流),并返回待关闭的socket。 toClose = transmitter.connection != null && transmitter.connection.noNewExchanges ? transmitter.releaseConnectionNoEvents() : null; if (transmitter.connection != null) { // 不为空,说明上面没有释放掉,那么此连接可用 result = transmitter.connection; releasedConnection = null; }if (result == null) { // 没有已分配的可用连接,就尝试从连接池获取。(连接池稍后详细讲解) if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) { foundPooledConnection = true; result = transmitter.connection; } else if (nextRouteToTry != null) { selectedRoute = nextRouteToTry; //有可尝试的路由 nextRouteToTry = null; } else if (retryCurrentRoute()) { selectedRoute = transmitter.connection.route(); } } } closeQuietly(toClose); //(如果有)关闭待关闭的socketif (releasedConnection != null) { eventListener.connectionReleased(call, releasedConnection); //(如果有)回调连接释放事件 } if (foundPooledConnection) { eventListener.connectionAcquired(call, result); //(如果有)回调(从连接池)获取连接事件 } if (result != null) { // 如果有已分配可用连接 或 从连接池获取到连接,结束!没有 就走下面的新建连接过程。 return result; }// 如果需要路由信息,就获取。是阻塞操作 boolean newRouteSelection = false; if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) { newRouteSelection = true; routeSelection = routeSelector.next(); }List routes = null; synchronized (connectionPool) { if (transmitter.isCanceled()) throw new IOException("Canceled"); if (newRouteSelection) { //现在有了IP地址,再次尝试从连接池获取。可能会因为连接合并而匹配。(这里传入了routes,上面的传的null) routes = routeSelection.getAll(); if (connectionPool.transmitterAcquirePooledConnection( address, transmitter, routes, false)) { foundPooledConnection = true; result = transmitter.connection; } } //第二次连接池也没找到,就新建连接 if (!foundPooledConnection) { if (selectedRoute == null) { selectedRoute = routeSelection.next(); }// Create a connection and assign it to this allocation immediately. This makes it possible // for an asynchronous cancel() to interrupt the handshake we're about to do. result = new RealConnection(connectionPool, selectedRoute); connectingConnection = result; } }// 如果第二次从连接池的尝试成功了,结束,因为连接池中的连接是已经和服务器建立连接的 if (foundPooledConnection) { eventListener.connectionAcquired(call, result); return result; }// 第二次没成功,就把新建的连接,进行TCP + TLS 握手,与服务端建立连接. 是阻塞操作 result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener); connectionPool.routeDatabase.connected(result.route()); //从失败名单中移除Socket socket = null; synchronized (connectionPool) { connectingConnection = null; // 最后一次尝试从连接池获取,注意最后一个参数为true,即要求 多路复用(http2.0) //意思是,如果本次是http2.0,那么为了保证 多路复用性,(因为上面的握手操作不是线程安全)会再次确认连接池中此时是否已有同样连接 if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) { // 如果获取到,就关闭我们创建里的连接,返回获取的连接 result.noNewExchanges = true; socket = result.socket(); result = transmitter.connection; // 那么这个刚刚连接成功的路由 就可以 用作下次 尝试的路由 nextRouteToTry = selectedRoute; } else { //最后一次尝试也没有的话,就把刚刚新建的连接存入连接池 connectionPool.put(result); transmitter.acquireConnectionNoEvents(result); //把连接赋给transmitter } } closeQuietly(socket); //如果刚刚建立的连接没用到,就关闭eventListener.connectionAcquired(call, result); return result; }

代码看着很长,已经加了注释,方法目的就是 为 承载新的数据流 寻找 连接。寻找顺序是 已分配的连接、连接池、新建连接。梳理如下:
  1. 首先会尝试使用 已给数据流分配的连接。(已分配连接的情况例如重定向时的再次请求,说明上次已经有了连接)
  2. 若没有 已分配的可用连接,就尝试从连接池中 匹配获取。因为此时没有路由信息,所以匹配条件:address一致——host、port、代理等一致,且 匹配的连接可以接受新的数据流。
  3. 若从连接池没有获取到,则取下一个代理的路由信息(多个Route,即多个IP地址),再次尝试从连接池获取,此时可能因为连接合并而匹配到。
  4. 若第二次也没有获取到,就创建RealConnection实例,进行TCP + TLS 握手,与服务端建立连接。
  5. 此时为了确保Http2.0连接的多路复用性,会第三次从连接池匹配。因为新建立的连接的握手过程是非线程安全的,所以此时可能连接池新存入了相同的连接。
  6. 第三次若匹配到,就使用已有连接,释放刚刚新建的连接;若未匹配到,则把新连接存入连接池并返回。
流程图如下:
okhttp|你想要的系列(网络请求框架OkHttp3全解系列 - (四)拦截器详解2:连接、请求服务(重点))
文章图片

看到这里,小盆友,你是否有很多问号?
  • 开始若有了已分配的连接,但已经被限制承载新的数据流,是如何释放的呢?
  • 代理路由信息是如何获取的呢?
  • 如何从连接池获取连接的?三次有什么不同?
没关系,慢慢来,我们先看第二个问号,代理路由信息的获取。
RouteSelector 先来看下Route类:
public final class Route { final Address address; final Proxy proxy; //代理 final InetSocketAddress inetSocketAddress; //连接目标地址public Route(Address address, Proxy proxy, InetSocketAddress inetSocketAddress) { ... this.address = address; this.proxy = proxy; this.inetSocketAddress = inetSocketAddress; }

Route,通过代理服务器信息 proxy、连接目标地址 InetSocketAddress 来描述一条 连接服务器的具体路由。
  • proxy代理:可以为客户端显式配置代理服务器。否则,将使用ProxySelector代理选择器。可能会返回多个代理。
  • IP地址:无论是直连还是代理,打开socket连接都需要IP地址。 DNS服务可能返回多个IP地址尝试。
上面分析的findConnection方法中是使用routeSelection.getAll()获取Route集合routes,而routeSelection是通过routeSelector.next()获取,routeSelector是在ExchangeFinder的构造方法内创建的,也就是说routeSelector在RetryAndFollowUpInterceptor中就创建了,那么我们看下RouteSelector:
RouteSelector(Address address, RouteDatabase routeDatabase, Call call, EventListener eventListener) { this.address = address; this.routeDatabase = routeDatabase; //连接池中的路由黑名单(连接失败的路由) this.call = call; this.eventListener = eventListener; resetNextProxy(address.url(), address.proxy()); } //收集代理服务器 private void resetNextProxy(HttpUrl url, Proxy proxy) { if (proxy != null) { // 若指定了代理,那么就这一个。(就是初始化OkhttpClient时配置的) proxies = Collections.singletonList(proxy); } else { //没配置就使用ProxySelector获取代理(若初始化OkhttpClient时没有配置ProxySelector,会使用系统默认的) List proxiesOrNull = address.proxySelector().select(url.uri()); proxies = proxiesOrNull != null && !proxiesOrNull.isEmpty() ? Util.immutableList(proxiesOrNull) : Util.immutableList(Proxy.NO_PROXY); } nextProxyIndex = 0; }

注意到RouteSelector的构造方法中传入了routeDatabase,是连接失败的路由黑名单(后面连接池也会讲到),并使用resetNextProxy方法获取代理服务器列表:若没有指定proxy就是用ProxySelector获取proxy列表(若没有配置ProxySelector会使用系统默认)。接着看next方法:
//收集代理的路由信息 public Selection next() throws IOException { if (!hasNext()) {//还有下一个代理 throw new NoSuchElementException(); }List routes = new ArrayList<>(); while (hasNextProxy()) { Proxy proxy = nextProxy(); //遍历proxy经DNS后的所有IP地址,组装成Route for (int i = 0, size = inetSocketAddresses.size(); i < size; i++) { Route route = new Route(address, proxy, inetSocketAddresses.get(i)); if (routeDatabase.shouldPostpone(route)) {//此路由在黑名单中,存起来最后尝试 postponedRoutes.add(route); } else { routes.add(route); } }if (!routes.isEmpty()) { break; } }if (routes.isEmpty()) { // 若没有拿到路由,就尝试上面存的黑名单的路由 routes.addAll(postponedRoutes); postponedRoutes.clear(); } //routes包装成Selection返回 return new Selection(routes); }

next方法主要就是获取下一个代理Proxy的代理信息,即多个路由。具体是在resetNextInetSocketAddress方法中实现,主要是对代理服务地址进行DNS解析获取多个IP地址,这里就不展开了,具体可以参考OkHttp中的代理和路由。
好了,到这里 就解决了第二个问号。其他两个问号涉及 连接池RealConnectionPool、连接RealConnection,下面就来瞅瞅。
ConnectionPool ConnectionPool,即连接池,用于管理http1.1/http2.0连接重用,以减少网络延迟。相同Address的http请求可以共享一个连接,ConnectionPool就是实现了连接的复用。
public final class ConnectionPool { final RealConnectionPool delegate; //最大空闲连接数5,最大空闲时间5分钟 public ConnectionPool() { this(5, 5, TimeUnit.MINUTES); }public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) { this.delegate = new RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit); } //返回空闲连接数 public int idleConnectionCount() { return delegate.idleConnectionCount(); } //返回池子中的连接数 public int connectionCount() { return delegate.connectionCount(); } //关闭并移除所以空闲连接 public void evictAll() { delegate.evictAll(); } }

ConnectionPool看起来比较好理解,默认配置是最大空闲连接数5,最大空闲时间5分钟(即一个连接空闲时间超过5分钟就移除),我们也可以在初始化okhttpClient时进行不同的配置。需要注意的是ConnectionPool是用于应用层,实际管理者是RealConnectionPool。RealConnectionPool是okhttp内部真实管理连接的地方。
连接池对连接的管理无非是 存、取、删,上面的两个问号分别对应 删、取,跟进RealConnectionPool我们一个个看:

private final Deque connections = new ArrayDeque<>(); private final Runnable cleanupRunnable = () -> { //循环清理 while (true) { //清理 long waitNanos = cleanup(System.nanoTime()); if (waitNanos == -1) return; if (waitNanos > 0) { long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); synchronized (RealConnectionPool.this) { try { //下一次清理之前的等待 RealConnectionPool.this.wait(waitMillis, (int) waitNanos); } catch (InterruptedException ignored) { } } } } }; //存 void put(RealConnection connection) { assert (Thread.holdsLock(this)); if (!cleanupRunning) { cleanupRunning = true; executor.execute(cleanupRunnable); } connections.add(connection); }

connections是用于存连接的队列Deque。看到在add之前 使用线程池executor执行了cleanupRunnable,意思是清理连接,为啥要清理呢?上面提到过 连接池有 最大空闲连接数、最大空闲时间的限制,所以不满足时是要进行清理的。并且注意到清理是一个循环,并且下一次清理前要等待waitNanos时间,啥意思呢?我们看下cleanup方法:
long cleanup(long now) { int inUseConnectionCount = 0; //正在使用的连接数 int idleConnectionCount = 0; //空闲连接数 RealConnection longestIdleConnection = null; //空闲时间最长的连接 long longestIdleDurationNs = Long.MIN_VALUE; //最长的空闲时间//遍历连接:找到待清理的连接, 找到下一次要清理的时间(还未到最大空闲时间) synchronized (this) { for (Iterator i = connections.iterator(); i.hasNext(); ) { RealConnection connection = i.next(); //若连接正在使用,continue,正在使用连接数+1 if (pruneAndGetAllocationCount(connection, now) > 0) { inUseConnectionCount++; continue; } //空闲连接数+1 idleConnectionCount++; // 赋值最长的空闲时间和对应连接 long idleDurationNs = now - connection.idleAtNanos; if (idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs; longestIdleConnection = connection; } } //若最长的空闲时间大于5分钟 或 空闲数 大于5,就移除并关闭这个连接 if (longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections) { connections.remove(longestIdleConnection); } else if (idleConnectionCount > 0) { // else,就返回 还剩多久到达5分钟,然后wait这个时间再来清理 return keepAliveDurationNs - longestIdleDurationNs; } else if (inUseConnectionCount > 0) { //连接没有空闲的,就5分钟后再尝试清理. return keepAliveDurationNs; } else { // 没有连接,不清理 cleanupRunning = false; return -1; } } //关闭移除的连接 closeQuietly(longestIdleConnection.socket()); //关闭移除后 立刻 进行下一次的 尝试清理 return 0; }

思路还是很清晰的:
  • 有空闲连接的话,如果最长的空闲时间大于5分钟 或 空闲数 大于5,就移除关闭这个最长空闲连接;如果 空闲数 不大于5 且 最长的空闲时间不大于5分钟,就返回到5分钟的剩余时间,然后等待这个时间再来清理。
  • 没有空闲连接就等5分钟后再尝试清理。
  • 没有连接不清理。
其中判断连接正在使用的方法pruneAndGetAllocationCount我们来看下:
private int pruneAndGetAllocationCount(RealConnection connection, long now) { //连接上的数据流,弱引用列表 List> references = connection.transmitters; for (int i = 0; i < references.size(); ) { Reference reference = references.get(i); if (reference.get() != null) { i++; continue; }// 到这里,transmitter是泄漏的,要移除,且此连接不能再承载新的数据流(泄漏的原因就是下面的message) TransmitterReference transmitterRef = (TransmitterReference) reference; String message = "A connection to " + connection.route().address().url() + " was leaked. Did you forget to close a response body?"; Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace); references.remove(i); connection.noNewExchanges = true; //连接因为泄漏没有数据流了,那么可以立即移除了。所以设置 开始空闲时间 是5分钟前(厉害厉害!) if (references.isEmpty()) { connection.idleAtNanos = now - keepAliveDurationNs; return 0; } } //返回连接上的数据流数量,大于0说明正在使用。 return references.size(); }

逻辑注释已经标明了,很好理解。其中connection.transmitters,表示在此连接上的数据流,transmitters size大于1即表示多个请求复用此连接。
另外,在findConnection中,使用connectionPool.put(result)存连接后,又调用transmitter.acquireConnectionNoEvents方法,瞅下:
void acquireConnectionNoEvents(RealConnection connection) { assert (Thread.holdsLock(connectionPool)); if (this.connection != null) throw new IllegalStateException(); this.connection = connection; connection.transmitters.add(new TransmitterReference(this, callStackTrace)); }

先把连接赋给transmitter,表示数据流transmitter依附在这个connection上;然后connection.transmitters add 这个transmitter的弱引用,connection.transmitters表示这个连接承载的所有数据流,即承载的所有请求。
好了,存 讲完了,主要就是把连接存入队列,同时开始循环尝试清理过期连接。

//为transmitter 从连接池 获取 对应address的连接。若果routes不为空,可能会因为 连接合并(复用) 而获取到HTTP/2连接。 boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter, @Nullable List routes, boolean requireMultiplexed) { assert (Thread.holdsLock(this)); for (RealConnection connection : connections) { if (requireMultiplexed && !connection.isMultiplexed()) continue; if (!connection.isEligible(address, routes)) continue; transmitter.acquireConnectionNoEvents(connection); return true; } return false; }

存的方法名是put,但你发现 取 的方法名却不是get,transmitterAcquirePooledConnection意思是 为transmitter 从连接池 获取连接,实际上transmitter就代表一个数据流,也就是一个http请求。注意到,在遍历中 经过判断后也是transmitter的acquireConnectionNoEvents方法,即把匹配到的connection赋给transmitter。所以方法名还是很生动的。
继续看是如何匹配的:如果requireMultiplexed为false,即不是多路复用(不是http/2),那么就要看Connection的isEligible方法了,isEligible方法返回true,就代表匹配成功:
//用于判断 连接 是否 可以承载指向address的数据流 boolean isEligible(Address address, @Nullable List routes) { //连接不再接受新的数据流,false if (transmitters.size() >= allocationLimit || noNewExchanges) return false; //匹配address中非host的部分 if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false; //匹配address的host,到这里也匹配的话,就return true if (address.url().host().equals(this.route().address().url().host())) { return true; // This connection is a perfect match. }//到这里hostname是没匹配的,但是还是有机会返回true:连接合并 // 1. 连接须是 HTTP/2. if (http2Connection == null) return false; // 2. IP 地址匹配 if (routes == null || !routeMatchesAny(routes)) return false; // 3. 证书匹配 if (address.hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false; if (!supportsUrl(address.url())) return false; // 4. 证书 pinning 匹配. try { address.certificatePinner().check(address.url().host(), handshake().peerCertificates()); } catch (SSLPeerUnverifiedException e) { return false; }return true; // The caller's address can be carried by this connection. }private boolean routeMatchesAny(List candidates) { for (int i = 0, size = candidates.size(); i < size; i++) { Route candidate = candidates.get(i); if (candidate.proxy().type() == Proxy.Type.DIRECT && route.proxy().type() == Proxy.Type.DIRECT && route.socketAddress().equals(candidate.socketAddress())) { return true; } } return false; }

取的过程就是 遍历连接池,进行地址等一系列匹配,到这里第三个问号也解决了。

//移除关闭空闲连接 public void evictAll() { List evictedConnections = new ArrayList<>(); synchronized (this) { for (Iterator i = connections.iterator(); i.hasNext(); ) { RealConnection connection = i.next(); if (connection.transmitters.isEmpty()) { connection.noNewExchanges = true; evictedConnections.add(connection); i.remove(); } } }for (RealConnection connection : evictedConnections) { closeQuietly(connection.socket()); } }

遍历连接池,如果连接上的数据流是空,那么就从连接池移除并且关闭。
我们回过头看下Transmitter的releaseConnectionNoEvents方法,也就第一个问号,如果连接不再接受新的数据流,就会调用这个方法:
//从连接上移除transmitter. @Nullable Socket releaseConnectionNoEvents() { assert (Thread.holdsLock(connectionPool)); int index = -1; //遍历 此数据流依附的连接 上的所有数据流,找到index for (int i = 0, size = this.connection.transmitters.size(); i < size; i++) { Reference reference = this.connection.transmitters.get(i); if (reference.get() == this) { index = i; break; } }if (index == -1) throw new IllegalStateException(); //transmitters移除此数据流 RealConnection released = this.connection; released.transmitters.remove(index); this.connection = null; //如果连接上没有有数据流了,就置为空闲(等待清理),并返回待关闭的socket if (released.transmitters.isEmpty()) { released.idleAtNanos = System.nanoTime(); if (connectionPool.connectionBecameIdle(released)) { return released.socket(); } }return null; }

主要就是尝试释放连接,连接上没有数据流就关闭socket等待被清理。
好了,到这里连接池的管理就分析完了。
从连接的查找 到 连接池的管理,就是ConnectInterceptor的内容了。
CallServerInterceptor 哎呀,终于到最后一个拦截器了!
请求服务拦截器,也就是真正地去进行网络IO读写了——写入http请求的header和body数据、读取响应的header和body。
上面ConnectInterceptor主要介绍了如何 寻找连接 以及 连接池如何管理连接。在获取到连接后,调用了RealConnection的newCodec方法ExchangeCodec实例,然后使用ExchangeCodec实例创建了Exchange实例传入CallServerInterceptor了。上面提到过ExchangeCodec负责请求和响应的IO读写,我们先来看看ExchangeCodec创建过程——RealConnection的newCodec方法:
ExchangeCodec newCodec(OkHttpClient client, Interceptor.Chain chain) throws SocketException { if (http2Connection != null) { return new Http2ExchangeCodec(client, this, chain, http2Connection); } else { socket.setSoTimeout(chain.readTimeoutMillis()); source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS); sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS); return new Http1ExchangeCodec(client, this, source, sink); } }

http2Connection不为空就创建Http2ExchangeCodec,否则是Http1ExchangeCodec。而http2Connection的创建是连接进行TCP、TLS握手的时候,即在RealConnection的connect方法中,具体就是connect方法中调用的establishProtocol方法:
private void establishProtocol(ConnectionSpecSelector connectionSpecSelector, int pingIntervalMillis, Call call, EventListener eventListener) throws IOException { //针对http请求,如果配置的协议包含Protocol.H2_PRIOR_KNOWLEDGE,则开启Http2连接 if (route.address().sslSocketFactory() == null) { if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) { socket = rawSocket; protocol = Protocol.H2_PRIOR_KNOWLEDGE; startHttp2(pingIntervalMillis); return; }socket = rawSocket; protocol = Protocol.HTTP_1_1; return; } //针对https请求,会在TLS握手后,根据平台获取协议(),如果协议是Protocol.HTTP_2,则开启Http2连接 eventListener.secureConnectStart(call); connectTls(connectionSpecSelector); eventListener.secureConnectEnd(call, handshake); if (protocol == Protocol.HTTP_2) { startHttp2(pingIntervalMillis); } }private void startHttp2(int pingIntervalMillis) throws IOException { socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream. http2Connection = new Http2Connection.Builder(true) .socket(socket, route.address().url().host(), source, sink) .listener(this) .pingIntervalMillis(pingIntervalMillis) .build(); http2Connection.start(); }

好了,到这里不再深入了,继续了解可以参考HTTP 2.0与OkHttp。那么到这里,ExchangeCodec已经创建了,然后又包装成Exchange,最后传入了CallServerInterceptor。
下面就来看看这最后一个拦截器:
public final class CallServerInterceptor implements Interceptor { private final boolean forWebSocket; public CallServerInterceptor(boolean forWebSocket) { this.forWebSocket = forWebSocket; }@Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Exchange exchange = realChain.exchange(); //上个拦截器传入的exchange Request request = realChain.request(); long sentRequestMillis = System.currentTimeMillis(); //写请求头 exchange.writeRequestHeaders(request); boolean responseHeadersStarted = false; Response.Builder responseBuilder = null; //含body的请求 if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) { // 若请求头包含 "Expect: 100-continue" , 就会等服务端返回含有 "HTTP/1.1 100 Continue"的响应,然后再发送请求body. //如果没有收到这个响应(例如收到的响应是4xx),那就不发送body了。 if ("100-continue".equalsIgnoreCase(request.header("Expect"))) { exchange.flushRequest(); responseHeadersStarted = true; exchange.responseHeadersStart(); responseBuilder = exchange.readResponseHeaders(true); } //responseBuilder为null说明服务端返回了100,也就是可以继续发送body了 if (responseBuilder == null) { if (request.body().isDuplex()) {//默认是false不会进入 // Prepare a duplex body so that the application can send a request body later. exchange.flushRequest(); BufferedSink bufferedRequestBody = Okio.buffer( exchange.createRequestBody(request, true)); request.body().writeTo(bufferedRequestBody); } else { // 满足了 "Expect: 100-continue" ,写请求body BufferedSink bufferedRequestBody = Okio.buffer( exchange.createRequestBody(request, false)); request.body().writeTo(bufferedRequestBody); bufferedRequestBody.close(); } } else { //没有满足 "Expect: 100-continue" ,请求发送结束 exchange.noRequestBody(); if (!exchange.connection().isMultiplexed()) { // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection // from being reused. Otherwise we're still obligated to transmit the request body to // leave the connection in a consistent state. exchange.noNewExchangesOnConnection(); } } } else { //没有body,请求发送结束 exchange.noRequestBody(); } //请求发送结束 if (request.body() == null || !request.body().isDuplex()) { exchange.finishRequest(); } //回调 读响应头开始事件(如果上面没有) if (!responseHeadersStarted) { exchange.responseHeadersStart(); } //读响应头(如果上面没有) if (responseBuilder == null) { responseBuilder = exchange.readResponseHeaders(false); } //构建response Response response = responseBuilder .request(request) .handshake(exchange.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); int code = response.code(); if (code == 100) { //这里服务端又返回了个100,就再尝试获取真正的响应() response = exchange.readResponseHeaders(false) .request(request) .handshake(exchange.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); code = response.code(); } //回调读响应头结束 exchange.responseHeadersEnd(response); //这里就是获取响应body了 if (forWebSocket && code == 101) { // Connection is upgrading, but we need to ensure interceptors see a non-null response body. response = response.newBuilder() .body(Util.EMPTY_RESPONSE) .build(); } else { response = response.newBuilder() .body(exchange.openResponseBody(response)) .build(); } //请求头中Connection是close,表示请求完成后要关闭连接 if ("close".equalsIgnoreCase(response.request().header("Connection")) || "close".equalsIgnoreCase(response.header("Connection"))) { exchange.noNewExchangesOnConnection(); } //204(无内容)、205(充值内容),body应该是空 if ((code == 204 || code == 205) && response.body().contentLength() > 0) { throw new ProtocolException( "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength()); } return response; } }

你会发现,整个内容就是前面说的一句话:写入http请求的header和body、读取响应的header和body。这里就不再解释了。
这里我们可以看到,无论写请求还是读响应,都是使用Exchange对应的方法。上面也提到过Exchange理解上是对ExchangeCodec的包装,这写方法内部除了事件回调和一些参数获取外,核心工作都由 ExchangeCodec 对象完成,而 ExchangeCodec实际上利用的是 Okio,而 Okio 实际上还是用的 Socket。
ExchangeCodec的实现类 有对应Http1.1的Http1ExchangeCodec 和 对应Http2.0的Http2ExchangeCodec。其中Http2ExchangeCodec是使用Http2.0中 数据帧 的概念完成请求响应的读写。关于Http1ExchangeCodec、Http2ExchangeCodec具体实现原理涉及okio这不再展开。
最后一点,CallServerInterceptor的intercept方法中没有调用连接器链Chain的proceed方法,因为这是最后一个拦截器啦!
好了,到这里最后一个拦截器也分析完啦!
总结 本篇分析了ConnectInterceptor、CallServerInterceptor两个拦截器的作用和原理。ConnectInterceptor负责连接的获取,其中涉及到连接池的概念;CallServerInterceptor是真正的网络IO读写。ConnectInterceptor涉及的内容较多,它是Okhttp的核心。
结合上一篇,我们已经分析完了Okhttp内部所有的拦截器,最后给出Okhttp的整体架构图(图片来源):
okhttp|你想要的系列(网络请求框架OkHttp3全解系列 - (四)拦截器详解2:连接、请求服务(重点))
文章图片

到这里,Okhttp源码解析部分就真的结束了,可真是一个漫长的过程!
从使用方式到工作流程,再到具体拦截器,掌握了这四篇文章的内容,应该说得上对Okhttp是比较熟悉了。这里还计划会出第五篇终章,来介绍一些Okhttp的常见问题和高级使用方式,敬请期待!
最后的最后,欢迎留言讨论,如果你喜欢这一系列,或者觉得写得还不错,请帮忙 点赞、收藏和转发,感谢!
.
感谢与参考:
okhttp源码解析
通过ConnectInterceptor源码掌握OKHttp3网络连接原理 呕心沥血第十弹【十】
OkHttp源码深度解析
OkHttp源码解析 (三)——代理和路由
OkHttp 源码学习笔记(三) 数据交换的流 HTTPCodec
欢迎关注我的 公 众 号:

    推荐阅读