OkHttp拦截器 (四) ConnectInterceptor拦截器分析

一:intercept()方法: 先看源码:

@Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); //建立Http网络请求所有需要的网络组建,在RetryAndFollowUpInterceptor中创建的StreamAllocation,此处从拦截器链中获取 StreamAllocation streamAllocation = realChain.streamAllocation(); // We need the network to satisfy this request. Possibly for validating a conditional GET. // 我们需要网络来满足这个要求。可能用于验证条件GET。 boolean doExtensiveHealthChecks = !request.method().equals("GET"); // 创建输出流 // HttpCodec:用来编码HTTP requests和解码HTTP responses HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks); // 建立连接 // RealConnection:连接对象,负责发起与服务器的连接。这个RealConnection对象是用来进行实际的网络IO传输的。 RealConnection connection = streamAllocation.connection(); //执行下一个拦截器,返回response return realChain.proceed(request, streamAllocation, httpCodec, connection); } }

可用看到intercept中的处理主要有以下几步骤:
1,从拦截器中获取StreamAlloction对象,在第一个拦截器RetryAndFollowUpInterceptor中就已经创建
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(request.url()), call, eventListener, callStackTrace);


2,执行StreamAlloction对象的newStream()方法创建HttpCodec,用于处理编码Request和解码Response;

3,接着通过调用StreamAllocation对象的 connection() 方法获取到RealConnection对象,这个RealConnection对象是用来进行实际的网络IO传输的。

4,调用拦截器链的proceed方法,执行下一个拦截器返回response对象。

从以上intercept()方法的流程看,主要逻辑是在StreamAlloction对象中。
二,StreamAlloction 分析 先看newStream:
/** * 主要创建了RealConnection对象(用于进行实际的网络IO传输)和HttpCodec对象(用于处理编码Request和解码Response), * 并将HttpCodec对象返回。 * @param client * @param chain * @param doExtensiveHealthChecks * @return */ public HttpCodec newStream( OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) { int connectTimeout = chain.connectTimeoutMillis(); //设置的连接超时时间 int readTimeout = chain.readTimeoutMillis(); //读取超时 int writeTimeout = chain.writeTimeoutMillis(); //写入超时//Web socket ping 间隔 (毫秒) 定时通知服务器,为心跳连接做准备, // 如果pingIntervalMillis 设置为0的时候 心跳executor是不会执行的 int pingIntervalMillis = client.pingIntervalMillis(); // 连接失败是否重试 boolean connectionRetryEnabled = client.retryOnConnectionFailure(); try { // 创建RealConnection对象 生成实际的网络连接类,RealConnection利用Socket建立连接 RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks); // 创建HttpCodec对象 通过网络连接的实际类生成网络请求和网络响应的编码类 HttpCodec resultCodec = resultConnection.newCodec(client, chain, this); synchronized (connectionPool) { codec = resultCodec; // 返回HttpCodec对象 return resultCodec; } } catch (IOException e) { throw new RouteException(e); } }

主要创建了RealConnection对象(用于进行实际的网络IO传输)和HttpCodec对象(用于处理编码Request和解码Response),并将HttpCodec对象返回。

findHealthyConnection()方法用于创建RealConnection对象:
/** * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated * until a healthy connection is found. * 找到一个连接,如果它是正常的,则返回它。如果不正常,则重复此过程,直到找到正常连接 * * findHealthyConnection主要做的事情: * 1. 开启一个while循环,通过调用findConnection()方法获取RealConnection对象赋值给candidate; * 2. 如果candidate 的successCount 为0,直接返回candidate,while循环结束; * 3 .调用candidate的isHealthy()方法,进行“健康检查”,如果candidate是一个不“健康”的对象, * 其中不“健康”指的是Socket没有关闭、或者它的输入输出流没有关闭,则对调用noNewStreams()方法进行销毁处理,接着继续循环。 * * */ private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks) throws IOException { /** * 第一步: 开启while循环,通过调用findConnection方法获取RealConnection对象赋值给cadidate */ while (true) { // 获取RealConnection对象 RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled); /** * 第二步: 如果candidate的successCount==0,直接返回candidate,while循环结束; */ // If this is a brand new connection, we can skip the extensive health checks. // 同步代码块判断RealConnection对象的successCount是否为0 // synchronized (connectionPool) { if (candidate.successCount == 0) {//等于0的时候表示整个网络请求已经结束了 //如果为0则返回 return candidate; } }/** * 第三步: * 调用candidate的isHealthy()方法,进行“健康检查”,如果candidate是一个不“健康”的对象, * 其中不“健康”指的是Socket没有关闭、或者它的输入输出流没有关闭, * 则对调用noNewStreams()方法进行销毁处理,接着继续循环。 */ // Do a (potentially slow) check to confirm that the pooled connection is still good. If it // isn't, take it out of the pool and start again. // 对连接池中不健康的链接做销毁处理 if (!candidate.isHealthy(doExtensiveHealthChecks)) { noNewStreams(); //回收网络请求资源 continue; //跳出这次循环,接着下一次循环 }return candidate; } }

在findHealthyConnection方法中获取RealConnection对象是通过findConnection方法获取。
下面在分析findConnection方法
/** * Returns a connection to host a new stream. This prefers the existing connection if it exists, * then the pool, finally building a new connection. * findConnection方法做了一下操作 * 1.StreamAllocation的connection如果可以复用则复用; * 2.如果connection不能复用,则从连接池中获取RealConnection对象,获取成功则返回; * 3.如果连接池里没有,则new一个RealConnection对象; * 4.调用RealConnection的connect()方法发起请求; *connect-->connectSocket()进行socket连接-->Platform.get().connectSocket()-->socket.connect(address, connectTimeout); *(此时进行了三次握手),握手完成后调用establishProtocol()。 * * 5.将RealConnection对象存进连接池中,以便下次复用; * 6.返回RealConnection对象 */ private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException { boolean foundPooledConnection = false; RealConnection result = null; Route selectedRoute = null; Connection releasedConnection; Socket toClose; synchronized (connectionPool) { if (released) throw new IllegalStateException("released"); if (codec != null) throw new IllegalStateException("codec != null"); if (canceled) throw new IOException("Canceled"); // Attempt to use an already-allocated connection. We need to be careful here because our // already-allocated connection may have been restricted from creating new streams. // // 翻译上面的注释:尝试使用已分配的连接。 我们在这里需要小心,因为我们已经分配的连接可能已经被限制在创建新的流中 releasedConnection = this.connection; toClose = releaseIfNoNewStreams(); // 如果connection 不为null,也就是有完好的连接(RealConnection),则复用,赋值给result if (this.connection != null) { // We had an already-allocated connection and it's good. result = this.connection; releasedConnection = null; } if (!reportedAcquired) { // If the connection was never reported acquired, don't report it as released! releasedConnection = null; }/** * 如果result==null,也就是说this.connection==null,说明上面找不到可用复用的从连接池中获取可用的连接 */ if (result == null) { // Attempt to get a connection from the pool. // 从连接池中获取,调用其get()方法 Internal.instance.get(connectionPool, address, this, null); // 找到对应的RealConnection对象 if (connection != null) { //更改标志位,赋值给result foundPooledConnection = true; result = connection; } else { selectedRoute = route; } } } closeQuietly(toClose); if (releasedConnection != null) { eventListener.connectionReleased(call, releasedConnection); }if (foundPooledConnection) { eventListener.connectionAcquired(call, result); } if (result != null) { // 已经找到RealConnection对象,直接返回 // If we found an already-allocated or pooled connection, we're done. return result; }// If we need a route selection, make one. This is a blocking operation. // 如果我们需要选择一条路线,就选一条。这是一个阻塞操作。 boolean newRouteSelection = false; // 线路的选择,多IP操作 if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) { newRouteSelection = true; routeSelection = routeSelector.next(); } //如果没有可用连接,则自己创建一个 synchronized (connectionPool) { if (canceled) throw new IOException("Canceled"); if (newRouteSelection) { // Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. This could match due to connection coalescing. // 获取路由表中的信息,并且缓存起来 // 现在我们有一组IP地址,再次尝试从池中获取连接。这可能由连接合并而匹配 List routes = routeSelection.getAll(); for (int i = 0, size = routes.size(); i < size; i++) { Route route = routes.get(i); Internal.instance.get(connectionPool, address, this, route); // 第二次获取到连接 if (connection != null) { foundPooledConnection = true; result = connection; this.route = route; break; } } } //没有获取到 if (!foundPooledConnection) { if (selectedRoute == null) { selectedRoute = routeSelection.next(); }// Create a connection and assign it to this allocation immediately. This makes it possible // for an asynchronous cancel() to interrupt the handshake we're about to do. // 建一个连接并立即将其分配给该分配。 这使得异步cancel()可以中断我们即将进行的握手 route = selectedRoute; refusedStreamCount = 0; // 如果从缓存中没有找到连接,则new一个新的连接 result = new RealConnection(connectionPool, selectedRoute); acquire(result, false); } }// If we found a pooled connection on the 2nd time around, we're done. // 如果我们第二次从连接池发现一个连接,我们就完成了 if (foundPooledConnection) { eventListener.connectionAcquired(call, result); return result; }// Do TCP + TLS handshakes. This is a blocking operation. 进行实际的网络连接 // 连接具体方法 开始TCP以及TLS握手操作,这是阻塞操作http的三次握手操作 result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener); routeDatabase().connected(result.route()); // 将新创建的连接,放入连接池中 Socket socket = null; synchronized (connectionPool) { reportedAcquired = true; // Pool the connection. // 将RealConnection 连接放入连接池 Internal.instance.put(connectionPool, result); // If another multiplexed connection to the same address was created concurrently, then // release this connection and acquire that one. // 如果同时创建了到同一地址的另一个多路复用连接,则释放此连接并获取该连接 if (result.isMultiplexed()) { socket = Internal.instance.deduplicate(connectionPool, address, this); result = connection; } } closeQuietly(socket); eventListener.connectionAcquired(call, result); return result; }

以上代码主要做的事情有:
  1. StreamAllocation的connection如果可以复用则复用;
  2. 如果connection不能复用,则从连接池中获取RealConnection对象,获取成功则返回;
  3. 如果连接池里没有,则new一个RealConnection对象;
  4. 调用RealConnection的connect()方法发起请求;
  5. 将RealConnection对象存进连接池中,以便下次复用;
  6. 返回RealConnection对象。

三:ConnectionPool连接池介绍 在findConnection方法中从连接池中获取RealConnection对象是通过Internal的get()方法,存进去的时候调用了put()方法
//第一次获取 Internal.instance.get(connectionPool, address, this, null); //第二次获取 Internal.instance.get(connectionPool, address, this, route);

//存入的时候调用put方法 Internal.instance.put(connectionPool, result);

Internal是一个抽象类,里面定义了一个静态变量instance
public abstract class Internal { ... public static Internal instance; ... }

instance的实例化在OkHttpClient的静态代码块中:
public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory { ......static { Internal.instance = new Internal() {......@Override public RealConnection get(ConnectionPool pool, Address address, StreamAllocation streamAllocation, Route route) { return pool.get(address, streamAllocation, route); }......@Override public void put(ConnectionPool pool, RealConnection connection) { pool.put(connection); }......}; }

这里我们可以看到实际上Internal的get()方法和put()方法是调用了ConnectionPool的get()方法和put()方法
1:连接池ConnectionPool的get方法
ConnectionPool的get方法
/** * Returns a recycled connection to {@code address}, or null if no such connection exists. The * route is null if the address has not yet been routed. * 通过遍历connections(用于存放用于存放RealConnection的ArrayDeque队列), * 调用RealConnection的isEligible()方法判断其是否可用,如果可用就会调用streamAllocation的acquire()方法, * 并返回connection。 */ @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) { assert (Thread.holdsLock(this)); for (RealConnection connection : connections) { /** * isEligible方法判断是否可用 */ if (connection.isEligible(address, route)) { //调用acquire streamAllocation.acquire(connection, true); return connection; } } return null; }

StreamAllocation 的acquire方法:
/** * Use this allocation to hold {@code connection}. Each call to this must be paired with a call to * {@link #release} on the same connection. */ public void acquire(RealConnection connection, boolean reportedAcquired) { assert (Thread.holdsLock(connectionPool)); if (this.connection != null) throw new IllegalStateException(); //赋值给全局变量 this.connection = connection; this.reportedAcquired = reportedAcquired; //创建StreamAllocationReference对象并添加到allocations集合中 connection.allocations.add(new StreamAllocationReference(this, callStackTrace)); }

连接池get方法总结:
  • 1.先是从连接池中获取的RealConnection对象赋值给StreamAllocation的成员变量connection;
  • 2.创建StreamAllocationReference对象(StreamAllocation对象的弱引用),
    并添加到RealConnection的allocations集合中,到时可以通过allocations集合的大小来判断网络连接次数是否超过OkHttp指定的连接次数。
2 连接池ConnectionPool的put方法
put方法将连接RealConnection添加到连接池。在添加之前会先执行清理任务。
/** * put()方法在将连接添加到连接池之前,会先执行清理任务,通过判断cleanupRunning是否在执行, * 如果当前清理任务没有执行,则更改cleanupRunning标识,并执行清理任务cleanupRunnable。 * * @param connection */ void put(RealConnection connection) { assert (Thread.holdsLock(this)); /** * cleanupRunning 清理是否在执行标志 */ if (!cleanupRunning) { cleanupRunning = true; // executor是线程池,此处异步执行清理任务cleanupRunnable executor.execute(cleanupRunnable); } //将连接添加到连接池 connections.add(connection); }

下面是cleanupRunnable
private final Runnable cleanupRunnable = new Runnable() { @Override public void run() { while (true) { //cleanup方法对连接池进行清理,返回进行下次清理的间隔时间 long waitNanos = cleanup(System.nanoTime()); //如果返回的数据间隔为-1,则会接受循环 if (waitNanos == -1) return; //如果大于1会调用wait进行等待 if (waitNanos > 0) { long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); synchronized (ConnectionPool.this) { try { //进行等待 ConnectionPool.this.wait(waitMillis, (int) waitNanos); } catch (InterruptedException ignored) { } } } } } };

cleanupRunnable方法主要调用cleanup对连接进行清理,
cleanup()方法通过for循环遍历connections队列,记录最大空闲时间和空闲时间最长的连接;如果存在超过空闲保活时间或空闲连接数超过最大空闲连接数限制的连接,则从connections中移除,最后执行关闭该连接的操作。
四:RealConnection的连接 在上面的findConnection方法中新建RealConnection实例后会调用RealConnection的connect方法建立连接
RealConnection的connect方法:
public void connect(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, Call call, EventListener eventListener) { if (protocol != null) throw new IllegalStateException("already connected"); // 路线选择 RouteException routeException = null; List connectionSpecs = route.address().connectionSpecs(); ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs); if (route.address().sslSocketFactory() == null) { if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) { throw new RouteException(new UnknownServiceException( "CLEARTEXT communication not enabled for client")); } String host = route.address().url().host(); if (!Platform.get().isCleartextTrafficPermitted(host)) { throw new RouteException(new UnknownServiceException( "CLEARTEXT communication to " + host + " not permitted by network security policy")); } } else { if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) { throw new RouteException(new UnknownServiceException( "H2_PRIOR_KNOWLEDGE cannot be used with HTTPS")); } } // 开始连接 while (true) { try { //建立隧道连接 if (route.requiresTunnel()) { connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener); if (rawSocket == null) { // We were unable to connect the tunnel but properly closed down our resources. // 我们无法连接隧道,但正确关闭了我们的资源 break; } } else { // 建立普通连接 connectSocket(connectTimeout, readTimeout, call, eventListener); } //建立协议 // 不管是建立隧道连接,还是建立普通连接,都少不了建立协议这一步。 // 这一步是在建立好了TCP连接之后,而在TCP能被拿来收发数据之前执行的。 // 它主要为数据的加密传输做一些初始化,比如TLS握手,HTTP/2的协议协商等 establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener); eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol); //完成连接 break; } catch (IOException e) { closeQuietly(socket); closeQuietly(rawSocket); socket = null; rawSocket = null; source = null; sink = null; handshake = null; protocol = null; http2Connection = null; eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e); if (routeException == null) { routeException = new RouteException(e); } else { routeException.addConnectException(e); }if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) { throw routeException; } } }if (route.requiresTunnel() && rawSocket == null) { ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: " + MAX_TUNNEL_ATTEMPTS); throw new RouteException(exception); }if (http2Connection != null) { synchronized (connectionPool) { allocationLimit = http2Connection.maxConcurrentStreams(); } } }

连接分为两种,一种隧道连接,一种普通连接

普通连接:
先来看普通连接:调用connectSocket(connectTimeout, readTimeout, call, eventListener);
1: connectSocket(connectTimeout, readTimeout, call, eventListener)源码如下:
/** * 完成在原始套接字上构建完整的HTTP或HTTPS连接所需的所有工作。 */ /** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */ private void connectSocket(int connectTimeout, int readTimeout, Call call, EventListener eventListener) throws IOException { Proxy proxy = route.proxy(); Address address = route.address(); //根据代理类型的不同处理Socket rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP ? address.socketFactory().createSocket() : new Socket(proxy); eventListener.connectStart(call, route.socketAddress(), proxy); //设置超时时间 rawSocket.setSoTimeout(readTimeout); try { //获取指定的平台去进行连接 Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout); } catch (ConnectException e) { ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress()); ce.initCause(e); throw ce; }// The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0 // More details: // https://github.com/square/okhttp/issues/3245 // https://android-review.googlesource.com/#/c/271775/ try { //得到连接器的输入流对象 source = Okio.buffer(Okio.source(rawSocket)); //得到连接器的输出流对象 sink = Okio.buffer(Okio.sink(rawSocket)); } catch (NullPointerException npe) { if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) { throw new IOException(npe); } } }

注意Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout)调用Platform的connectSocket
如下所示Platform的connectSocket源码:
public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout) throws IOException { 最终调用java的connect socket.connect(address, connectTimeout); }

最终调用了java的Socket的connect方法建立连接。

隧道连接:
在上面的connect方法中调用connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
connectTunnel()隧道链接
/** * Does all the work to build an HTTPS connection over a proxy tunnel. The catch here is that a * proxy server can issue an auth challenge and then close the connection. * 是否通过代理隧道建立HTTPS连接的所有工作。这里的问题是代理服务器可以发出一个验证质询,然后关闭连接。 */ private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout, Call call, EventListener eventListener) throws IOException { //构造一个 建立隧道连接请求 Request tunnelRequest = createTunnelRequest(); HttpUrl url = tunnelRequest.url(); for (int i = 0; i < MAX_TUNNEL_ATTEMPTS; i++) { // 与HTTP代理服务器建立TCP连接 connectSocket(connectTimeout, readTimeout, call, eventListener); // 创建隧道。这主要是将 建立隧道连接 请求发送给HTTP代理服务器,并处理它的响应 tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url); if (tunnelRequest == null) break; // Tunnel successfully created.// The proxy decided to close the connection after an auth challenge. We need to create a new // connection, but this time with the auth credentials. closeQuietly(rawSocket); rawSocket = null; sink = null; source = null; eventListener.connectEnd(call, route.socketAddress(), route.proxy(), null); } }

隧道创建方法createTunnel(),返回一个Request对象:
/** * To make an HTTPS connection over an HTTP proxy, send an unencrypted CONNECT request to create * the proxy connection. This may need to be retried if the proxy requires authorization. * 要通过HTTP代理建立HTTPS连接,请发送未加密的CONNECT请求以创建代理连接。 如果代理需要授权,则可能需要重试。 */ private Request createTunnel(int readTimeout, int writeTimeout, Request tunnelRequest, HttpUrl url) throws IOException { // Make an SSL Tunnel on the first message pair of each SSL + proxy connection. // 在每个SSL + 代理连接的第一个消息对上创建一个SSL隧道。 String requestLine = "CONNECT " + Util.hostHeader(url, true) + " HTTP/1.1"; while (true) { Http1Codec tunnelConnection = new Http1Codec(null, null, source, sink); source.timeout().timeout(readTimeout, MILLISECONDS); sink.timeout().timeout(writeTimeout, MILLISECONDS); tunnelConnection.writeRequest(tunnelRequest.headers(), requestLine); tunnelConnection.finishRequest(); Response response = tunnelConnection.readResponseHeaders(false) .request(tunnelRequest) .build(); // The response body from a CONNECT should be empty, but if it is not then we should consume // it before proceeding. long contentLength = HttpHeaders.contentLength(response); if (contentLength == -1L) { contentLength = 0L; } Source body = tunnelConnection.newFixedLengthSource(contentLength); Util.skipAll(body, Integer.MAX_VALUE, TimeUnit.MILLISECONDS); body.close(); switch (response.code()) { case HTTP_OK: // Assume the server won't send a TLS ServerHello until we send a TLS ClientHello. If // that happens, then we will have buffered bytes that are needed by the SSLSocket! // This check is imperfect: it doesn't tell us whether a handshake will succeed, just // that it will almost certainly fail because the proxy has sent unexpected data. if (!source.buffer().exhausted() || !sink.buffer().exhausted()) { throw new IOException("TLS tunnel buffered too many bytes!"); } return null; case HTTP_PROXY_AUTH: tunnelRequest = route.address().proxyAuthenticator().authenticate(route, response); if (tunnelRequest == null) throw new IOException("Failed to authenticate with proxy"); if ("close".equalsIgnoreCase(response.header("Connection"))) { return tunnelRequest; } break; default: throw new IOException( "Unexpected response code for CONNECT: " + response.code()); } } }

【OkHttp拦截器 (四) ConnectInterceptor拦截器分析】

    推荐阅读