
一、连接机制 连接的创建是在StreamAllocation对象统筹下完成的,我们前面面也说过它早在RetryAndFollowUpInterceptor就被创建了,StreamAllocation对象,主要用来管理两个关键角色:

  • RealConnection:真正建立连接的对象,利用Socket建立连接。
  • ConnectionPool:连接池,用来管理和复用连接。
/** Opens a connection to the target server and proceeds to the next interceptor. */ public final class ConnectInterceptor implements Interceptor { public final OkHttpClient client; public ConnectInterceptor(OkHttpClient client) { this.client = client; }@Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); StreamAllocation streamAllocation = realChain.streamAllocation(); // We need the network to satisfy this request. Possibly for validating a conditional GET. //我们需要网络来满足这个request。可能用于验证条件GET。 boolean doExtensiveHealthChecks = !request.method().equals("GET"); //创建输出流 HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks); //建立连接 RealConnection connection = streamAllocation.connection(); return realChain.proceed(request, streamAllocation, httpCodec, connection); } }

  • HttpCodec:用来编码HTTP requests和解码HTTP responses
  • RealConnection:连接对象,负责发起与服务器的连接。
三、创建连接 在上面的ConnectInterceptor类中我们可以知道,ConnectInterceptor用来完成连接,而真正的连接是在RealConnection中实现的,连接由连接池ConnectionPool管理,连接池最多保存5个地址连接的keep-alive,每个keep-alive时长为5分钟,并有异步线程清理无效的连接。
  • HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
  • RealConnection connection = streamAllocation.connection();
streamAllocation.newStream(client, chain, doExtensiveHealthChecks)方法最终调用findConnection(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled)方法来建立连接。
/** * Returns a connection to host a new stream. This prefers the existing connection if it exists, * then the pool, finally building a new connection. */ private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException { boolean foundPooledConnection = false; RealConnection result = null; Route selectedRoute = null; Connection releasedConnection; Socket toClose; synchronized (connectionPool) { if (released) throw new IllegalStateException("released"); if (codec != null) throw new IllegalStateException("codec != null"); if (canceled) throw new IOException("Canceled"); // Attempt to use an already-allocated connection. We need to be careful here because our // already-allocated connection may have been restricted from creating new streams. //1、查看是否有完好的连接 releasedConnection = this.connection; toClose = releaseIfNoNewStreams(); if (this.connection != null) { // We had an already-allocated connection and it's good. result = this.connection; releasedConnection = null; } if (!reportedAcquired) { // If the connection was never reported acquired, don't report it as released! releasedConnection = null; }if (result == null) { // Attempt to get a connection from the pool.尝试从池中获取连接 //2、连接池中是否用可用的连接,有则使用 Internal.instance.get(connectionPool, address, this, null); if (connection != null) { foundPooledConnection = true; result = connection; } else { selectedRoute = route; } } } closeQuietly(toClose); if (releasedConnection != null) { eventListener.connectionReleased(call, releasedConnection); } if (foundPooledConnection) { eventListener.connectionAcquired(call, result); } if (result != null) { //如果找到一个已经分配或池连接,就完成了。 // If we found an already-allocated or pooled connection, we're done. return result; }// If we need a route selection, make one. This is a blocking operation. //如果我们需要选择一条路线,就选一条。这是一个阻塞操作。 boolean newRouteSelection = false; //线程的选择,多IP操作 if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) { newRouteSelection = true; routeSelection = routeSelector.next(); } //3、如果没有可用连接,则自己创建一个 synchronized (connectionPool) { if (canceled) throw new IOException("Canceled"); if (newRouteSelection) { // Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. This could match due to connection coalescing. List routes = routeSelection.getAll(); for (int i = 0, size = routes.size(); i < size; i++) { Route route = routes.get(i); Internal.instance.get(connectionPool, address, this, route); if (connection != null) { foundPooledConnection = true; result = connection; this.route = route; break; } } }if (!foundPooledConnection) { if (selectedRoute == null) { selectedRoute = routeSelection.next(); }// Create a connection and assign it to this allocation immediately. This makes it possible // for an asynchronous cancel() to interrupt the handshake we're about to do. route = selectedRoute; refusedStreamCount = 0; result = new RealConnection(connectionPool, selectedRoute); acquire(result, false); } }// If we found a pooled connection on the 2nd time around, we're done. if (foundPooledConnection) { eventListener.connectionAcquired(call, result); return result; }// Do TCP + TLS handshakes. This is a blocking operation.进行TCP + TLS握手。这是一个阻塞操作。 //4、开始TCP以及TLS握手操作 result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener); routeDatabase().connected(result.route()); //5、将新创建的连接,放在连接池中 Socket socket = null; synchronized (connectionPool) { reportedAcquired = true; // Pool the connection. Internal.instance.put(connectionPool, result); // If another multiplexed connection to the same address was created concurrently, then // release this connection and acquire that one. if (result.isMultiplexed()) { socket = Internal.instance.deduplicate(connectionPool, address, this); result = connection; } } closeQuietly(socket); eventListener.connectionAcquired(call, result); return result; }

  • Socket没有关闭
  • 输入流没有关闭
  • 输出流没有关闭
  • Http2连接没有关闭

public final class RealConnection extends Http2Connection.Listener implements Connection { public void connect(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, Call call, EventListener eventListener) { if (protocol != null) throw new IllegalStateException("already connected"); //线路选择 RouteException routeException = null; List connectionSpecs = route.address().connectionSpecs(); ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs); if (route.address().sslSocketFactory() == null) { if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) { throw new RouteException(new UnknownServiceException( "CLEARTEXT communication not enabled for client")); } String host = route.address().url().host(); if (!Platform.get().isCleartextTrafficPermitted(host)) { throw new RouteException(new UnknownServiceException( "CLEARTEXT communication to " + host + " not permitted by network security policy")); } } else { if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) { throw new RouteException(new UnknownServiceException( "H2_PRIOR_KNOWLEDGE cannot be used with HTTPS")); } } //开始连接 while (true) { try { //如果是通道模式,则建立通道连接 if (route.requiresTunnel()) { connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener); if (rawSocket == null) { // We were unable to connect the tunnel but properly closed down our resources. break; } } else { //否则进行Socket连接,一般都是属于这种情况 connectSocket(connectTimeout, readTimeout, call, eventListener); } //建立https连接 establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener); eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol); break; } catch (IOException e) { closeQuietly(socket); closeQuietly(rawSocket); socket = null; rawSocket = null; source = null; sink = null; handshake = null; protocol = null; http2Connection = null; eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e); if (routeException == null) { routeException = new RouteException(e); } else { routeException.addConnectException(e); }if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) { throw routeException; } } }if (route.requiresTunnel() && rawSocket == null) { ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: " + MAX_TUNNEL_ATTEMPTS); throw new RouteException(exception); }if (http2Connection != null) { synchronized (connectionPool) { allocationLimit = http2Connection.maxConcurrentStreams(); } } }

/** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */ //在原始套接字上构建完整的HTTP或HTTPS连接所需的所有工作都完成了吗 private void connectSocket(int connectTimeout, int readTimeout, Call call, EventListener eventListener) throws IOException { Proxy proxy = route.proxy(); Address address = route.address(); //根据代理类型的不同处理Socket rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP ? address.socketFactory().createSocket() : new Socket(proxy); eventListener.connectStart(call, route.socketAddress(), proxy); rawSocket.setSoTimeout(readTimeout); try { //建立Socket连接 Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout); } catch (ConnectException e) { ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress()); ce.initCause(e); throw ce; }// The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0 // More details: // https://github.com/square/okhttp/issues/3245 // https://android-review.googlesource.com/#/c/271775/ try { //获取输入/输出流 source = Okio.buffer(Okio.source(rawSocket)); sink = Okio.buffer(Okio.sink(rawSocket)); } catch (NullPointerException npe) { if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) { throw new IOException(npe); } } } }

Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout) throws IOException { socket.connect(address, connectTimeout); }

  • 我们知道在负责的网络环境下,频繁的进行建立Sokcet连接(TCP三次握手)和断开Socket(TCP四次分手)是非常消耗网络资源和浪费时间的,HTTP中的keepalive连接对于降低延迟和提升速度有非常重要的作用。
  • 复用连接就需要对连接进行管理,这里就引入了连接池的概念。
  • Okhttp支持5个并发KeepAlive,默认链路生命为5分钟(链路空闲后,保持存活的时间),连接池有ConectionPool实现,对连接进行回收和管理。
public final class ConnectionPool { /** * Background threads are used to cleanup expired connections. There will be at most a single * thread running per connection pool. The thread pool executor permits the pool itself to be * garbage collected. */ private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */, Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS, new SynchronousQueue(), Util.threadFactory("OkHttp ConnectionPool", true)); /** * The maximum number of idle connections for each address. */ private final int maxIdleConnections; private final long keepAliveDurationNs; //清理连接,在线程池executor里调用。 private final Runnable cleanupRunnable = new Runnable() { @Override public void run() { while (true) { //执行清理,并返回下次需要清理的时间。 long waitNanos = cleanup(System.nanoTime()); if (waitNanos == -1) return; if (waitNanos > 0) { long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); synchronized (ConnectionPool.this) { try { //在timeout时间内释放锁 ConnectionPool.this.wait(waitMillis, (int) waitNanos); } catch (InterruptedException ignored) { } } } } } }; }

ConectionPool在内部维护了一个线程池,来清理连,清理任务由cleanup()方法完成,它是一个阻塞操作,首先执行清理,并返回下次需要清理的间隔时间,调用调用wait() 方法释放锁。等时间到了以后,再次进行清理,并返回下一次需要清理的时间,循环往复。接下来我们来看下cleanup()方法的具体实现。
public final class ConnectionPool { /** * Performs maintenance on this pool, evicting the connection that has been idle the longest if * either it has exceeded the keep alive limit or the idle connections limit. * * Returns the duration in nanos to sleep until the next scheduled call to this method. Returns * -1 if no further cleanups are required. */ long cleanup(long now) { int inUseConnectionCount = 0; int idleConnectionCount = 0; RealConnection longestIdleConnection = null; long longestIdleDurationNs = Long.MIN_VALUE; // Find either a connection to evict, or the time that the next eviction is due. synchronized (this) { //遍历所有的连接,标记处不活跃的连接。 for (Iterator i = connections.iterator(); i.hasNext(); ) { RealConnection connection = i.next(); // If the connection is in use, keep searching.如果正在使用连接,请继续搜索。 //1、查询此连接内部的StreanAllocation的引用数量。 if (pruneAndGetAllocationCount(connection, now) > 0) { inUseConnectionCount++; continue; }idleConnectionCount++; // If the connection is ready to be evicted, we're done.如果连接准备好被清除,我们就完成了 //2、标记空闲连接。 long idleDurationNs = now - connection.idleAtNanos; if (idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs; longestIdleConnection = connection; } }if (longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections) { // We've found a connection to evict. Remove it from the list, then close it below (outside // of the synchronized block). //我们找到了与驱逐的联系。 将其从列表中删除,然后将其关闭(在同步块之外)。 //3、如果空闲连接超过5个或者keepalive时间大于5分钟,则将该连接清理掉。 connections.remove(longestIdleConnection); } else if (idleConnectionCount > 0) { // A connection will be ready to evict soon.一个连接将随时准备退出。 //4、返回此连接的到期时间,供下次进行清理。 return keepAliveDurationNs - longestIdleDurationNs; } else if (inUseConnectionCount > 0) { // All connections are in use. It'll be at least the keep alive duration 'til we run again. //所有连接都在使用中。至少在我们再次运行之前,它会一直存在。 //5、全部都是活跃连接,5分钟时候再进行清理。 return keepAliveDurationNs; } else { // No connections, idle or in use.没有连接,空闲或正在使用。 //6、没有任何连接,跳出循环。 cleanupRunning = false; return -1; } } //7、关闭连接,返回时间0,立即再次进行清理。 closeQuietly(longestIdleConnection.socket()); // Cleanup again immediately. return 0; } }

在RealConnection里有个StreamAllocation虚引用列表,每创建一个StreamAllocation,就会把它添加进该列表中,如果留关闭以后就将StreamAllocation 对象从该列表中移除,正是利用利用这种引用计数的方式判定一个连接是否为空闲连接,
/** Current streams carried by this connection. */ public final List allocations = new ArrayList<>();

4.3、pruneAndGetAllocationCount(RealConnection connection, long now)方法
public final class ConnectionPool { /** * Prunes any leaked allocations and then returns the number of remaining live allocations on * {@code connection}. Allocations are leaked if the connection is tracking them but the * application code has abandoned them. Leak detection is imprecise and relies on garbage * collection. */ private int pruneAndGetAllocationCount(RealConnection connection, long now) { //虚引用列表 List references = connection.allocations; //遍历虚引用列表 for (int i = 0; i < references.size(); ) { Reference reference = references.get(i); //如果虚引用StreamAllocation正在被使用,则跳过进行下一次循环, if (reference.get() != null) { //引用计数 i++; continue; }// We've discovered a leaked allocation. This is an application bug. StreamAllocation.StreamAllocationReference streamAllocRef = (StreamAllocation.StreamAllocationReference) reference; String message = "A connection to " + connection.route().address().url() + " was leaked. Did you forget to close a response body?"; Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace); //否则移除该StreamAllocation引用 references.remove(i); connection.noNewStreams = true; // If this was the last allocation, the connection is eligible for immediate eviction. // 如果所有的StreamAllocation引用都没有了,返回引用计数0 if (references.isEmpty()) { connection.idleAtNanos = now - keepAliveDurationNs; return 0; } } //返回引用列表的大小,作为引用计数 return references.size(); } }

  • 构造方法中设置了每个地址的最大空闲连接数maxIdleConnections以及默认每个连接的存活时间keepAliveDurationNs

public final class ConnectionPool { //最大的空闲连接数--每个地址的最大空闲连接数 private final int maxIdleConnections; //连接持续时间 private final long keepAliveDurationNs; /** * Create a new connection pool with tuning parameters appropriate for a single-user application. * The tuning parameters in this pool are subject to change in future OkHttp releases. Currently * this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity. */ //使用适合单用户应用程序的调整参数创建新的连接池。此池中的调整参数在将来的OkHttp版本中可能会发生变化。 // 目前,该池最多可容纳个空闲连接,这些连接将在5分钟不活动后被驱逐。 //默认每个地址的最大连接数是5个 //默认每个连接的存活时间为5分钟 public ConnectionPool() { this(5, 5, TimeUnit.MINUTES); }public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) { this.maxIdleConnections = maxIdleConnections; this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration); // Put a floor on the keep alive duration, otherwise cleanup will spin loop. if (keepAliveDuration <= 0) { throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration); } } }

public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory { static final List DEFAULT_PROTOCOLS = Util.immutableList( Protocol.HTTP_2, Protocol.HTTP_1_1); public Builder() { ... //连接池 管理HTTP和HTTP / 2连接的重用以减少网络延迟。 //默认每个地址的最大连接数是5个 //默认每个连接的存活时间为5分钟 connectionPool = new ConnectionPool(); } }

public final class ConnectionPool { //连接池中维护了一个双端队列Deque来存储连接 private final Deque connections = new ArrayDeque<>(); //将连接加入到双端队列 void put(RealConnection connection) { assert (Thread.holdsLock(this)); //没有任何连接时,cleanupRunning = false; // 即没有任何链接时才会去执行executor.execute(cleanupRunnable); // 从而保证每个连接池最多只能运行一个线程。 if (!cleanupRunning) { cleanupRunning = true; executor.execute(cleanupRunnable); } connections.add(connection); } }

public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory { static { Internal.instance = new Internal() { ... @Override public void put(ConnectionPool pool, RealConnection connection) { pool.put(connection); } ... }; } }

  • put方法在ConnectInterceptor----》intercept----》streamAllocation.newStream----》findHealthyConnection----》创建新链接后调用。

