网络请求|OkHttp源码分析 (一)

OkHttp 源码阅读 一 老生常谈,项目中用到最多的网络请求框架应该是ok了吧,那么对于ok的内部实现流程也要有充分的了解才行,这样才方便解决一些遇到的问题。其实在网上,随便一搜okhttp的原理,会有很多文章,但是看过后,要么太简短要么太复杂,有的看起来甚至是一头雾水,这样虽然自己看过,但是真正记到脑中的知识海了吗?所以想试试以自己的想法,自己的思维来理解一遍实现方式,来理解一下为什么短短的几行调用方式就可以实现一个网络请求。就写一下笔记记录下自己的理解。
要理解一个东西,知道它是什么----->知道它怎么用----->它是怎么实现的
它是什么 【网络请求|OkHttp源码分析 (一)】okhttp的GitHub地址
框架介绍
HTTP是现代应用程序网络的方式。这就是我们交换数据和媒体的方式。有效地执行HTTP可以使您的内容加载更快,并节省带宽。
OkHttp是默认情况下有效的HTTP客户端:
优点

  1. HTTP / 2支持允许对同一主机的所有请求共享一个套接字。
  2. 连接池可减少请求延迟(如果HTTP / 2不可用)。
  3. 透明的GZIP缩小了下载大小。
  4. 响应缓存可以完全避免网络重复请求。
这里不做详细概述,过。
它怎么用 请求方式同步请求和异步请求
同步请求
private fun okRequest1() {//创建client实例(两种方式,一种是默认,一种建造者模式构建) val client = OkHttpClient() //val client = OkHttpClient.Builder() //.connectTimeout(30,TimeUnit.SECONDS) //.build(); //创建请求信息 val request = Request.Builder() .url("https://www.wanandroid.com/friend/json") .build() try { //新开一个子线程,网络请求必须放在子线程中请求 GlobalScope.launch { //发起请求,并接收返回信息 val response = client.newCall(request).execute() println(TAG + response.body!!.string()) } } catch (e: IOException) { e.printStackTrace() }}

2020-07-29 09:50:22.285 20476-20526/com.example.yuanma I/System.out: Kr----->{"data":[{"icon":"","id":17,"link":"http://www.wanandroid.com/article/list/0?cid=176","name":"国内大牛博客集合","order":1,"visible":1}}

异步请求
private fun okRequest2() { //创建client实例(两种方式,一种是默认,一种建造者模式构建) val client = OkHttpClient() //val client = OkHttpClient.Builder() //.connectTimeout(30,TimeUnit.SECONDS) //.build(); //创建请求信息 val request = Request.Builder() .url("https://www.wanandroid.com/article/list/0/json") .build() try { //新开一个子线程,网络请求必须放在子线程中请求 GlobalScope.launch { //发起请求,并接收返回信息 client.newCall(request).enqueue(object : Callback { override fun onFailure(call: Call, e: IOException) { println(TAG + "onFailure : " + e.message) }override fun onResponse(call: Call, response: Response) { println(TAG + "onResponse : " + response.body!!.string()) } }) } } catch (e: IOException) { e.printStackTrace() }}

2020-07-29 10:12:53.527 21141-21199/com.example.yuanma I/System.out: Kr----->onResponse : {"data":{"curPage":1,"datas":[{"apkLink":"","audit":1,"author":"","canEdit":false,"chapterId":502,"chapterName":"自助","collect":false,"courseId":13,"desc":"","descMd":"","envelopePic":"","fresh":true,"id":14538,"link":"https://mp.weixin.qq.com/s/uiVYTTFZhYPsC2ykjQ6Tfw","niceDate":"48分钟前","niceShareDate":"48分钟前","origin":"","prefix":"","projectLink":"","publishTime":1595985848000,"realSuperChapterId":493,"selfVisible":0,"shareDate":1595985848000,"shareUser":"飞洋","superChapterId":494,"superChapterName":"广场Tab","tags":[],"title":"你想要的Android性能优化系列:内存优化!","type":0,"userId":31360,"visible":1,"zan":0}}

ok使用方式就先到这,下面看内部实现
okhttp内部实现 通过上面两种请求方式的比对,发现除了最后请求方式,上面的流程都是一样,都是先创建一个client实例,然后构建request实例,然后构建newCall 实例,然后通过call对象进行调用不同的方法,同步请求调用execute()方法,异步请求调用enqueue()方法。
okhttpClient实例的创建 那么就从第一步client实例构建开始看,我们进去这个类,不知道有没有看注释的习惯,建议看类的源码,首先从类注释看,这样我们可以得到一些有用的信息。
网络请求|OkHttp源码分析 (一)
文章图片

通过看这段注释,咦,是不是发现了什么关键字,对,就是每个okhttpClient都维护了自己的连接池和线程池,关键字 连接池和线程池,那么它们有什么用呢,下一句话讲了—> 可重用连接和线程可以减少等待时间和节省内存。这个先放着。
我们先看okhttpClient的构建,噢,典型的建造者模式
class Builder constructor() { internal var dispatcher: Dispatcher = Dispatcher()//调度器 internal var connectionPool: ConnectionPool = ConnectionPool()//连接池 internal val interceptors: MutableList = mutableListOf()//拦截器 internal val networkInterceptors: MutableList = mutableListOf()//网络拦截器 internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory() internal var retryOnConnectionFailure = true internal var authenticator: Authenticator = Authenticator.NONE internal var followRedirects = true internal var followSslRedirects = true internal var cookieJar: CookieJar = CookieJar.NO_COOKIES internal var cache: Cache? = null internal var dns: Dns = Dns.SYSTEM internal var proxy: Proxy? = null internal var proxySelector: ProxySelector? = null internal var proxyAuthenticator: Authenticator = Authenticator.NONE internal var socketFactory: SocketFactory = SocketFactory.getDefault() internal var sslSocketFactoryOrNull: SSLSocketFactory? = null//socket工厂 internal var x509TrustManagerOrNull: X509TrustManager? = null internal var connectionSpecs: List = DEFAULT_CONNECTION_SPECS //连接配置 internal var protocols: List = DEFAULT_PROTOCOLS internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT internal var certificateChainCleaner: CertificateChainCleaner? = null internal var callTimeout = 0 internal var connectTimeout = 10_000 internal var readTimeout = 10_000 internal var writeTimeout = 10_000 internal var pingInterval = 0 internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE internal var routeDatabase: RouteDatabase? = nullinternal constructor(okHttpClient: OkHttpClient) : this() { this.dispatcher = okHttpClient.dispatcher this.connectionPool = okHttpClient.connectionPool this.interceptors += okHttpClient.interceptors this.networkInterceptors += okHttpClient.networkInterceptors this.eventListenerFactory = okHttpClient.eventListenerFactory this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure this.authenticator = okHttpClient.authenticator this.followRedirects = okHttpClient.followRedirects this.followSslRedirects = okHttpClient.followSslRedirects this.cookieJar = okHttpClient.cookieJar this.cache = okHttpClient.cache this.dns = okHttpClient.dns this.proxy = okHttpClient.proxy this.proxySelector = okHttpClient.proxySelector this.proxyAuthenticator = okHttpClient.proxyAuthenticator this.socketFactory = okHttpClient.socketFactory this.sslSocketFactoryOrNull = okHttpClient.sslSocketFactoryOrNull this.x509TrustManagerOrNull = okHttpClient.x509TrustManager this.connectionSpecs = okHttpClient.connectionSpecs this.protocols = okHttpClient.protocols this.hostnameVerifier = okHttpClient.hostnameVerifier this.certificatePinner = okHttpClient.certificatePinner this.certificateChainCleaner = okHttpClient.certificateChainCleaner this.callTimeout = okHttpClient.callTimeoutMillis this.connectTimeout = okHttpClient.connectTimeoutMillis this.readTimeout = okHttpClient.readTimeoutMillis this.writeTimeout = okHttpClient.writeTimeoutMillis this.pingInterval = okHttpClient.pingIntervalMillis this.minWebSocketMessageToCompress = okHttpClient.minWebSocketMessageToCompress this.routeDatabase = okHttpClient.routeDatabase }

调用OkhttpClient.Builder().build()
Requst的实例的创建 也是建造者模式的构建,默认GET请求
open class Builder { internal var url: HttpUrl? = null internal var method: String internal var headers: Headers.Builder internal var body: RequestBody? = null/** A mutable map of tags, or an immutable empty map if we don't have any. */ internal var tags: MutableMap, Any> = mutableMapOf()constructor() { this.method = "GET" this.headers = Headers.Builder() }internal constructor(request: Request) { this.url = request.url this.method = request.method this.body = request.body this.tags = if (request.tags.isEmpty()) { mutableMapOf() } else { request.tags.toMutableMap() } this.headers = request.headers.newBuilder() }open fun url(url: HttpUrl): Builder = apply { this.url = url }

总结一下:用构建者模式创建client和request的实例,里面封装了一些使用中必要的属性。
下面看call对象的构建
val call=client.newCall(request)/** Prepares the [request] to be executed at some point in the future. */ override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

点进去看到,这里call对象是由实现类RealCall定义的
网络请求|OkHttp源码分析 (一)
文章图片

下面看同步请求的excute()方法
override fun execute(): Response { //检查请求是否已执行,如果已执行,则抛出异常。反之,将executed设置为true。 check(executed.compareAndSet(false, true)) { "Already Executed" } //设置超时时间开始计时 timeout.enter() callStart() try { //在这步中将请求添加到同步请求队列中 client.dispatcher.executed(this) //调用拦截器,在这里才是真正的请求触发,请求发起获取响应都在这。 return getResponseWithInterceptorChain() } finally { //不管请求成功还是异常,都关闭当前请求任务 client.dispatcher.finished(this) } } private fun callStart() { this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()") //给这个回调,请求开始 eventListener.callStart(this) }

下面进到dispatcher.excuted()中看下
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */ private val runningSyncCalls = ArrayDeque() /** Used by [Call.execute] to signal it is in-flight. */ @Synchronized internal fun executed(call: RealCall) { //将当前请求任务添加到请求队列中。 runningSyncCalls.add(call) }

这里先暂时知道是在dispatcher中将请求任务添加到请求队列中就行,dispatcher 后面单独介绍。
下面调用getResponseWithInterceptorChain(),首先通过字面意思,我们可以知道这个方法意思是通过拦截器链获取响应。
进到getResponseWithInterceptorChain()中看下
@Throws(IOException::class) internal fun getResponseWithInterceptorChain(): Response { // Build a full stack of interceptors.构建一个拦截器堆栈,定义,添加,添加添加……注意这里是有序的,顺序不能乱 val interceptors = mutableListOf() //client.interceptors就是我们定义okhttpClient时候调用的addIntercepter interceptors += client.interceptors //okhttp自己实现的拦截器 interceptors += RetryAndFollowUpInterceptor(client) interceptors += BridgeInterceptor(client.cookieJar) interceptors += CacheInterceptor(client.cache) interceptors += ConnectInterceptor if (!forWebSocket) { interceptors += client.networkInterceptors } interceptors += CallServerInterceptor(forWebSocket) //然后构造一个RealInterceptorChain对象 val chain = RealInterceptorChain( call = this, interceptors = interceptors, index = 0, exchange = null, request = originalRequest, connectTimeoutMillis = client.connectTimeoutMillis, readTimeoutMillis = client.readTimeoutMillis, writeTimeoutMillis = client.writeTimeoutMillis )var calledNoMoreExchanges = false try { //然后最后返回的response是在这里调用后返回 val response = chain.proceed(originalRequest) if (isCanceled()) { response.closeQuietly() throw IOException("Canceled") } return response } catch (e: IOException) { calledNoMoreExchanges = true throw noMoreExchanges(e) as Throwable } finally { if (!calledNoMoreExchanges) { noMoreExchanges(null) } } }

看下chain.proceed()方法
@Throws(IOException::class) override fun proceed(request: Request): Response { check(index < interceptors.size)calls++if (exchange != null) { check(exchange.finder.sameHostAndPort(request.url)) { "network interceptor ${interceptors[index - 1]} must retain the same host and port" } check(calls == 1) { "network interceptor ${interceptors[index - 1]} must call proceed() exactly once" } }// Call the next interceptor in the chain. val next = copy(index = index + 1, request = request) val interceptor = interceptors[index]@Suppress("USELESS_ELVIS") val response = interceptor.intercept(next) ?: throw NullPointerException( "interceptor $interceptor returned null")if (exchange != null) { check(index + 1 >= interceptors.size || next.calls == 1) { "network interceptor $interceptor must call proceed() exactly once" } }check(response.body != null) { "interceptor $interceptor returned a response with no body" }return response }

这个方法里会构造一个index=index+1的RealInterceptorChain next,并调用interceptors[index].intercept(next),Interceptor中又会调用RealInterceptorChain.proceed()(除了最后一个interceptor,最后一个interceptor是CallServerInterceptor,后面单独讲解),这样一直链式调用,直到调用到最后一个interceptor的intercept()方法。。
这个就是典型的 责任链设计模式
因为最后一个是callServerInterceptor,所以这个就是最后发起请求接收响应的最后一步调用,等会再看。
下面看异步请求的enqueue()方法
override fun enqueue(responseCallback: Callback) { check(executed.compareAndSet(false, true)) { "Already Executed" }callStart() client.dispatcher.enqueue(AsyncCall(responseCallback)) }

和同步一样,都是先做个是否当前有请求的判断。
然后调用dispatcher的enqueue方法,首先看下 client.dispatcher.enqueue(AsyncCall(responseCallback))这里,异步方法调用中传入的是一个AsycCall,看下实现
internal inner class AsyncCall( private val responseCallback: Callback ) : Runnable { @Volatile var callsPerHost = AtomicInteger(0) private setfun reuseCallsPerHostFrom(other: AsyncCall) { this.callsPerHost = other.callsPerHost }val host: String get() = originalRequest.url.hostval request: Request get() = originalRequestval call: RealCall get() = this@RealCall/** * Attempt to enqueue this async call on [executorService]. This will attempt to clean up * if the executor has been shut down by reporting the call as failed. */ fun executeOn(executorService: ExecutorService) { client.dispatcher.assertThreadDoesntHoldLock()var success = false try { executorService.execute(this) success = true } catch (e: RejectedExecutionException) { val ioException = InterruptedIOException("executor rejected") ioException.initCause(e) noMoreExchanges(ioException) responseCallback.onFailure(this@RealCall, ioException) } finally { if (!success) { client.dispatcher.finished(this) // This call is no longer running! } } }override fun run() { threadName("OkHttp ${redactedUrl()}") { var signalledCallback = false timeout.enter() try { val response = getResponseWithInterceptorChain() signalledCallback = true responseCallback.onResponse(this@RealCall, response) } catch (e: IOException) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e) } else { responseCallback.onFailure(this@RealCall, e) } } catch (t: Throwable) { cancel() if (!signalledCallback) { val canceledException = IOException("canceled due to $t") canceledException.addSuppressed(t) responseCallback.onFailure(this@RealCall, canceledException) } throw t } finally { client.dispatcher.finished(this) } } } }internal class CallReference( referent: RealCall, /** * Captures the stack trace at the time the Call is executed or enqueued. This is helpful for * identifying the origin of connection leaks. */ val callStackTrace: Any? ) : WeakReference(referent) }

看玩这个之后呢,再返回,我们进到enqueue方法中看下
internal fun enqueue(call: AsyncCall) { synchronized(this) { //首先将要执行的异步任务加入到队列中 readyAsyncCalls.add(call)// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to // the same host. if (!call.call.forWebSocket) { val existingCall = findExistingCallWithHost(call.host) if (existingCall != null) call.reuseCallsPerHostFrom(existingCall) } } promoteAndExecute() }

做个判断然后调用promoteAndExecute()方法,看下这个方法
/** * Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the * executor service. Must not be called with synchronization because executing calls can call * into user code. * * @return true if the dispatcher is currently running calls. */ private fun promoteAndExecute(): Boolean { this.assertThreadDoesntHoldLock()val executableCalls = mutableListOf() val isRunning: Boolean synchronized(this) { val i = readyAsyncCalls.iterator() while (i.hasNext()) { val asyncCall = i.next() //判断当前请求数是不是大于最大请求数 64 if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity. //判断当前连接主机数是不是超过最大值 5个 if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity. //从当前需要准备请求的队列中移除当前请求 i.remove() asyncCall.callsPerHost.incrementAndGet() executableCalls.add(asyncCall) //将当前需要准备执行的异步任务加入到正在运行异步任务的队列中 runningAsyncCalls.add(asyncCall) } isRunning = runningCallsCount() > 0 }//遍历当前需要执行的异步任务,拿到异步任务执行。 for (i in 0 until executableCalls.size) { val asyncCall = executableCalls[i] asyncCall.executeOn(executorService) }return isRunning }

这块呢,就是从队列中取请求,判断当前请求数是不是大于最大请求数 64,//判断当前连接主机数是不是超过最大值 5个,//从当前需要准备请求的队列中移除当前请求,//将当前需要准备执行的异步任务加入到正在运行异步任务的队列中。最后执行的步骤就是asyncCall.executeOn(executorService)
executorService 这个东西就是一个线程池
@get:Synchronized @get:JvmName("executorService") val executorService: ExecutorService get() { if (executorServiceOrNull == null) { executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS, SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false)) } return executorServiceOrNull!! }

核心线程数量为 0 ,如果空闲的话是 60 s 的存活期,第二个参数传入了 Integer 的最大值,即线程池所能容纳的最大线程数为 Integer.MAX_VALUE ,虽然这里设置了很大的值,但是实际情况下并非会达到最大值,因为上面 enqueue() 方法中有做了判断。
那么它既然是放在子线程中处理的,那么就找下run()方法,诶,还真有,看下
override fun run() { threadName("OkHttp ${redactedUrl()}") { var signalledCallback = false //计时 timeout.enter() try { //最后又调用了getResponseWithInterceptorChain() val response = getResponseWithInterceptorChain() signalledCallback = true responseCallback.onResponse(this@RealCall, response) } catch (e: IOException) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e) } else { responseCallback.onFailure(this@RealCall, e) } } catch (t: Throwable) { cancel() if (!signalledCallback) { val canceledException = IOException("canceled due to $t") canceledException.addSuppressed(t) responseCallback.onFailure(this@RealCall, canceledException) } throw t } finally { client.dispatcher.finished(this) } } }

开启计时,然后通过回调把响应数据返回给调用层,最后 finished,判断是否有需要执行的任务,然后继续删除准备执行的异步任务,开始执行删除的异步任务。
到了这里 同步执行、异步执行 最后都调用getResponseWithInterceptorChain()函数
getResponseWithInterceptorChain()可以看下上面的同步。
由上面可以知道最后调用的拦截器都是CallServerInterceptor,看下
//首先通过注释我们就可以看到,这是拦截器的链式调用的最后一个,它负责向服务端发起一个网络请求 /** This is the last interceptor in the chain. It makes a network call to the server. */ class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {@Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain val exchange = realChain.exchange!! val request = realChain.request val requestBody = request.body val sentRequestMillis = System.currentTimeMillis() exchange.writeRequestHeaders(request)var invokeStartEvent = true var responseBuilder: Response.Builder? = null if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) { // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100 // Continue" response before transmitting the request body. If we don't get that, return // what we did get (such as a 4xx response) without ever transmitting the request body. if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) { exchange.flushRequest() responseBuilder = exchange.readResponseHeaders(expectContinue = true) exchange.responseHeadersStart() invokeStartEvent = false } if (responseBuilder == null) { if (requestBody.isDuplex()) { // Prepare a duplex body so that the application can send a request body later. exchange.flushRequest() val bufferedRequestBody = exchange.createRequestBody(request, true).buffer() requestBody.writeTo(bufferedRequestBody) } else { // Write the request body if the "Expect: 100-continue" expectation was met. val bufferedRequestBody = exchange.createRequestBody(request, false).buffer() requestBody.writeTo(bufferedRequestBody) bufferedRequestBody.close() } } else { exchange.noRequestBody() if (!exchange.connection.isMultiplexed) { // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection // from being reused. Otherwise we're still obligated to transmit the request body to // leave the connection in a consistent state. exchange.noNewExchangesOnConnection() } } } else { exchange.noRequestBody() }if (requestBody == null || !requestBody.isDuplex()) { exchange.finishRequest() } if (responseBuilder == null) { responseBuilder = exchange.readResponseHeaders(expectContinue = false)!! if (invokeStartEvent) { exchange.responseHeadersStart() invokeStartEvent = false } } var response = responseBuilder .request(request) .handshake(exchange.connection.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() var code = response.code if (code == 100) { // Server sent a 100-continue even though we did not request one. Try again to read the actual // response status. responseBuilder = exchange.readResponseHeaders(expectContinue = false)!! if (invokeStartEvent) { exchange.responseHeadersStart() } response = responseBuilder .request(request) .handshake(exchange.connection.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() code = response.code }exchange.responseHeadersEnd(response)response = if (forWebSocket && code == 101) { // Connection is upgrading, but we need to ensure interceptors see a non-null response body. response.newBuilder() .body(EMPTY_RESPONSE) .build() } else { response.newBuilder() .body(exchange.openResponseBody(response)) .build() } if ("close".equals(response.request.header("Connection"), ignoreCase = true) || "close".equals(response.header("Connection"), ignoreCase = true)) { exchange.noNewExchangesOnConnection() } if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) { throw ProtocolException( "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}") } return response } }

在当前拦截器中我们把请求 head /body 通过 okio 写入了服务端,然后根据服务端的响应数据构建响应头、响应体等一些响应数据。然后再return response , 就回到了realCall的excute()方法那里。这里,有兴趣的可以搜一下深入了解一下连接过程,我看的脑袋疼。
最后,看一下Dispatcher
Dispatcher
/** * Policy on when async requests are executed. * * Each dispatcher uses an [ExecutorService] to run calls internally. If you supply your own * executor, it should be able to run [the configured maximum][maxRequests] number of calls * concurrently. */ class Dispatcher constructor() { /** * The maximum number of requests to execute concurrently. Above this requests queue in memory, * waiting for the running calls to complete. * * If more than [maxRequests] requests are in flight when this is invoked, those requests will * remain in flight. */ @get:Synchronized var maxRequests = 64 set(maxRequests) { require(maxRequests >= 1) { "max < 1: $maxRequests" } synchronized(this) { field = maxRequests } promoteAndExecute() }/** * The maximum number of requests for each host to execute concurrently. This limits requests by * the URL's host name. Note that concurrent requests to a single IP address may still exceed this * limit: multiple hostnames may share an IP address or be routed through the same HTTP proxy. * * If more than [maxRequestsPerHost] requests are in flight when this is invoked, those requests * will remain in flight. * * WebSocket connections to hosts **do not** count against this limit. */ @get:Synchronized var maxRequestsPerHost = 5 set(maxRequestsPerHost) { require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" } synchronized(this) { field = maxRequestsPerHost } promoteAndExecute() }/** * A callback to be invoked each time the dispatcher becomes idle (when the number of running * calls returns to zero). * * Note: The time at which a [call][Call] is considered idle is different depending on whether it * was run [asynchronously][Call.enqueue] or [synchronously][Call.execute]. Asynchronous calls * become idle after the [onResponse][Callback.onResponse] or [onFailure][Callback.onFailure] * callback has returned. Synchronous calls become idle once [execute()][Call.execute] returns. * This means that if you are doing synchronous calls the network layer will not truly be idle * until every returned [Response] has been closed. */ @set:Synchronized @get:Synchronized var idleCallback: Runnable? = nullprivate var executorServiceOrNull: ExecutorService? = null@get:Synchronized @get:JvmName("executorService") val executorService: ExecutorService get() { if (executorServiceOrNull == null) { executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS, SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false)) } return executorServiceOrNull!! }/** Ready async calls in the order they'll be run. */ private val readyAsyncCalls = ArrayDeque()/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */ private val runningAsyncCalls = ArrayDeque()/** Running synchronous calls. Includes canceled calls that haven't finished yet. */ private val runningSyncCalls = ArrayDeque()constructor(executorService: ExecutorService) : this() { this.executorServiceOrNull = executorService } 省略

dispatcher这里面,先看类注释
每个调度使用的ExecutorService的在内部运行的call。 若你提供你自己的线程池,它应该能够运行配置的最大并发呼叫数量
看dispatcher的源码可知,dispatcher就相当于一个中转点,这里面维护了三个ArrayDeque,用来保存请求
readyAsyncCalls:准备需要执行的异步任务
runningAsyncCalls:正在执行的异步任务
runningSyncCalls:正在执行的同步任务
dispatcher默认支持的最大并发请求是64个,单个host最多执行5个并发请求,如果超过的话,call会被先放到readyAsyncCalls中,当出现空闲线程的时候,再将readyAsyncCalls中的线程移入runningAsynCalls中,执行请求。
总结
okhttp它的执行流程
同步请求 1.首先通过建造者模式构建okhttpClient和request,即分别配置它们的属性
2.构建realCall对象
3.调用realCall的excute()方法
4.调用dispatcher的excute()方法将请求call添加到同步请求执行队列中,runningSyncCalls中
5.通过调用getResponseWithInterceptorChain()方法,在其中构建拦截器,重新定义一个realInterceptChain对象,通过这个对象调用chain.proceed()方法进行链式处理拦截器,即调用拦截器的intercept()方法
6.最后在CallServerInterceptor()中向服务端具体发起请求
7.最后调用dispatcher.finished(),把 call 实例从队列中移除,并执行下一次任务
异步请求 1.首先通过建造者模式构建okhttpClient和request,即分别配置它们的属性
2.构建realCall对象
3.生成一个AsyncCall(responseCallback),这个实例(实现了Runnable)
4.调用realCall的enquene()方法
5.AsyncCall实例放入了Dispatcher.enqueue()中,并判断 maxRequests (最大请求数 64)maxRequestsPerHost(最大host请求数 5)是否满足条件,如果满足就把AsyncCall添加到runningAsyncCalls中,并放入线程池中执行;如果条件不满足,就添加到等待就绪的异步队列,当那些满足的条件的执行时 ,在Dispatcher.finifshed(this)中的promoteCalls(); 方法中 对等待就绪的异步队列进行遍历,生成对应的AsyncCall实例,并添加到runningAsyncCalls中,最后放入到线程池中执行,就回到对应的run()方法中
6.最后也调用到了getResponseWithInterceptorChain()方法同同步请求中的一样。

    推荐阅读