OkHttp 源码阅读 一 老生常谈,项目中用到最多的网络请求框架应该是ok了吧,那么对于ok的内部实现流程也要有充分的了解才行,这样才方便解决一些遇到的问题。其实在网上,随便一搜okhttp的原理,会有很多文章,但是看过后,要么太简短要么太复杂,有的看起来甚至是一头雾水,这样虽然自己看过,但是真正记到脑中的知识海了吗?所以想试试以自己的想法,自己的思维来理解一遍实现方式,来理解一下为什么短短的几行调用方式就可以实现一个网络请求。就写一下笔记记录下自己的理解。
要理解一个东西,知道它是什么----->知道它怎么用----->它是怎么实现的
它是什么 【网络请求|OkHttp源码分析 (一)】okhttp的GitHub地址
框架介绍
HTTP是现代应用程序网络的方式。这就是我们交换数据和媒体的方式。有效地执行HTTP可以使您的内容加载更快,并节省带宽。
OkHttp是默认情况下有效的HTTP客户端:
优点
- HTTP / 2支持允许对同一主机的所有请求共享一个套接字。
- 连接池可减少请求延迟(如果HTTP / 2不可用)。
- 透明的GZIP缩小了下载大小。
- 响应缓存可以完全避免网络重复请求。
它怎么用 请求方式同步请求和异步请求
同步请求
private fun okRequest1() {//创建client实例(两种方式,一种是默认,一种建造者模式构建)
val client = OkHttpClient()
//val client = OkHttpClient.Builder()
//.connectTimeout(30,TimeUnit.SECONDS)
//.build();
//创建请求信息
val request = Request.Builder()
.url("https://www.wanandroid.com/friend/json")
.build()
try {
//新开一个子线程,网络请求必须放在子线程中请求
GlobalScope.launch {
//发起请求,并接收返回信息
val response = client.newCall(request).execute()
println(TAG + response.body!!.string())
}
} catch (e: IOException) {
e.printStackTrace()
}}
2020-07-29 09:50:22.285 20476-20526/com.example.yuanma I/System.out: Kr----->{"data":[{"icon":"","id":17,"link":"http://www.wanandroid.com/article/list/0?cid=176","name":"国内大牛博客集合","order":1,"visible":1}}
异步请求
private fun okRequest2() {
//创建client实例(两种方式,一种是默认,一种建造者模式构建)
val client = OkHttpClient()
//val client = OkHttpClient.Builder()
//.connectTimeout(30,TimeUnit.SECONDS)
//.build();
//创建请求信息
val request = Request.Builder()
.url("https://www.wanandroid.com/article/list/0/json")
.build()
try {
//新开一个子线程,网络请求必须放在子线程中请求
GlobalScope.launch {
//发起请求,并接收返回信息
client.newCall(request).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
println(TAG + "onFailure : " + e.message)
}override fun onResponse(call: Call, response: Response) {
println(TAG + "onResponse : " + response.body!!.string())
}
})
}
} catch (e: IOException) {
e.printStackTrace()
}}
2020-07-29 10:12:53.527 21141-21199/com.example.yuanma I/System.out: Kr----->onResponse : {"data":{"curPage":1,"datas":[{"apkLink":"","audit":1,"author":"","canEdit":false,"chapterId":502,"chapterName":"自助","collect":false,"courseId":13,"desc":"","descMd":"","envelopePic":"","fresh":true,"id":14538,"link":"https://mp.weixin.qq.com/s/uiVYTTFZhYPsC2ykjQ6Tfw","niceDate":"48分钟前","niceShareDate":"48分钟前","origin":"","prefix":"","projectLink":"","publishTime":1595985848000,"realSuperChapterId":493,"selfVisible":0,"shareDate":1595985848000,"shareUser":"飞洋","superChapterId":494,"superChapterName":"广场Tab","tags":[],"title":"你想要的Android性能优化系列:内存优化!","type":0,"userId":31360,"visible":1,"zan":0}}
ok使用方式就先到这,下面看内部实现
okhttp内部实现 通过上面两种请求方式的比对,发现除了最后请求方式,上面的流程都是一样,都是先创建一个client实例,然后构建request实例,然后构建newCall 实例,然后通过call对象进行调用不同的方法,同步请求调用execute()方法,异步请求调用enqueue()方法。
okhttpClient实例的创建 那么就从第一步client实例构建开始看,我们进去这个类,不知道有没有看注释的习惯,建议看类的源码,首先从类注释看,这样我们可以得到一些有用的信息。
文章图片
通过看这段注释,咦,是不是发现了什么关键字,对,就是每个okhttpClient都维护了自己的连接池和线程池,关键字 连接池和线程池,那么它们有什么用呢,下一句话讲了—> 可重用连接和线程可以减少等待时间和节省内存。这个先放着。
我们先看okhttpClient的构建,噢,典型的建造者模式
class Builder constructor() {
internal var dispatcher: Dispatcher = Dispatcher()//调度器
internal var connectionPool: ConnectionPool = ConnectionPool()//连接池
internal val interceptors: MutableList = mutableListOf()//拦截器
internal val networkInterceptors: MutableList = mutableListOf()//网络拦截器
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
internal var retryOnConnectionFailure = true
internal var authenticator: Authenticator = Authenticator.NONE
internal var followRedirects = true
internal var followSslRedirects = true
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
internal var cache: Cache? = null
internal var dns: Dns = Dns.SYSTEM
internal var proxy: Proxy? = null
internal var proxySelector: ProxySelector? = null
internal var proxyAuthenticator: Authenticator = Authenticator.NONE
internal var socketFactory: SocketFactory = SocketFactory.getDefault()
internal var sslSocketFactoryOrNull: SSLSocketFactory? = null//socket工厂
internal var x509TrustManagerOrNull: X509TrustManager? = null
internal var connectionSpecs: List = DEFAULT_CONNECTION_SPECS //连接配置
internal var protocols: List = DEFAULT_PROTOCOLS
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
internal var certificateChainCleaner: CertificateChainCleaner? = null
internal var callTimeout = 0
internal var connectTimeout = 10_000
internal var readTimeout = 10_000
internal var writeTimeout = 10_000
internal var pingInterval = 0
internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
internal var routeDatabase: RouteDatabase? = nullinternal constructor(okHttpClient: OkHttpClient) : this() {
this.dispatcher = okHttpClient.dispatcher
this.connectionPool = okHttpClient.connectionPool
this.interceptors += okHttpClient.interceptors
this.networkInterceptors += okHttpClient.networkInterceptors
this.eventListenerFactory = okHttpClient.eventListenerFactory
this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure
this.authenticator = okHttpClient.authenticator
this.followRedirects = okHttpClient.followRedirects
this.followSslRedirects = okHttpClient.followSslRedirects
this.cookieJar = okHttpClient.cookieJar
this.cache = okHttpClient.cache
this.dns = okHttpClient.dns
this.proxy = okHttpClient.proxy
this.proxySelector = okHttpClient.proxySelector
this.proxyAuthenticator = okHttpClient.proxyAuthenticator
this.socketFactory = okHttpClient.socketFactory
this.sslSocketFactoryOrNull = okHttpClient.sslSocketFactoryOrNull
this.x509TrustManagerOrNull = okHttpClient.x509TrustManager
this.connectionSpecs = okHttpClient.connectionSpecs
this.protocols = okHttpClient.protocols
this.hostnameVerifier = okHttpClient.hostnameVerifier
this.certificatePinner = okHttpClient.certificatePinner
this.certificateChainCleaner = okHttpClient.certificateChainCleaner
this.callTimeout = okHttpClient.callTimeoutMillis
this.connectTimeout = okHttpClient.connectTimeoutMillis
this.readTimeout = okHttpClient.readTimeoutMillis
this.writeTimeout = okHttpClient.writeTimeoutMillis
this.pingInterval = okHttpClient.pingIntervalMillis
this.minWebSocketMessageToCompress = okHttpClient.minWebSocketMessageToCompress
this.routeDatabase = okHttpClient.routeDatabase
}
调用OkhttpClient.Builder().build()
Requst的实例的创建 也是建造者模式的构建,默认GET请求
open class Builder {
internal var url: HttpUrl? = null
internal var method: String
internal var headers: Headers.Builder
internal var body: RequestBody? = null/** A mutable map of tags, or an immutable empty map if we don't have any. */
internal var tags: MutableMap, Any> = mutableMapOf()constructor() {
this.method = "GET"
this.headers = Headers.Builder()
}internal constructor(request: Request) {
this.url = request.url
this.method = request.method
this.body = request.body
this.tags = if (request.tags.isEmpty()) {
mutableMapOf()
} else {
request.tags.toMutableMap()
}
this.headers = request.headers.newBuilder()
}open fun url(url: HttpUrl): Builder = apply {
this.url = url
}
总结一下:用构建者模式创建client和request的实例,里面封装了一些使用中必要的属性。
下面看call对象的构建
val call=client.newCall(request)/** Prepares the [request] to be executed at some point in the future. */
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
点进去看到,这里call对象是由实现类RealCall定义的
文章图片
下面看同步请求的excute()方法
override fun execute(): Response {
//检查请求是否已执行,如果已执行,则抛出异常。反之,将executed设置为true。
check(executed.compareAndSet(false, true)) { "Already Executed" }
//设置超时时间开始计时
timeout.enter()
callStart()
try {
//在这步中将请求添加到同步请求队列中
client.dispatcher.executed(this)
//调用拦截器,在这里才是真正的请求触发,请求发起获取响应都在这。
return getResponseWithInterceptorChain()
} finally {
//不管请求成功还是异常,都关闭当前请求任务
client.dispatcher.finished(this)
}
}
private fun callStart() {
this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
//给这个回调,请求开始
eventListener.callStart(this)
}
下面进到dispatcher.excuted()中看下
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private val runningSyncCalls = ArrayDeque() /** Used by [Call.execute] to signal it is in-flight. */
@Synchronized internal fun executed(call: RealCall) {
//将当前请求任务添加到请求队列中。
runningSyncCalls.add(call)
}
这里先暂时知道是在dispatcher中将请求任务添加到请求队列中就行,dispatcher 后面单独介绍。
下面调用getResponseWithInterceptorChain(),首先通过字面意思,我们可以知道这个方法意思是通过拦截器链获取响应。
进到getResponseWithInterceptorChain()中看下
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.构建一个拦截器堆栈,定义,添加,添加添加……注意这里是有序的,顺序不能乱
val interceptors = mutableListOf()
//client.interceptors就是我们定义okhttpClient时候调用的addIntercepter
interceptors += client.interceptors
//okhttp自己实现的拦截器
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket) //然后构造一个RealInterceptorChain对象
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)var calledNoMoreExchanges = false
try {
//然后最后返回的response是在这里调用后返回
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
看下chain.proceed()方法
@Throws(IOException::class)
override fun proceed(request: Request): Response {
check(index < interceptors.size)calls++if (exchange != null) {
check(exchange.finder.sameHostAndPort(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}// Call the next interceptor in the chain.
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")if (exchange != null) {
check(index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
}check(response.body != null) { "interceptor $interceptor returned a response with no body" }return response
}
这个方法里会构造一个index=index+1的RealInterceptorChain next,并调用interceptors[index].intercept(next),Interceptor中又会调用RealInterceptorChain.proceed()(除了最后一个interceptor,最后一个interceptor是CallServerInterceptor,后面单独讲解),这样一直链式调用,直到调用到最后一个interceptor的intercept()方法。。
这个就是典型的 责任链设计模式
因为最后一个是callServerInterceptor,所以这个就是最后发起请求接收响应的最后一步调用,等会再看。
下面看异步请求的enqueue()方法
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
和同步一样,都是先做个是否当前有请求的判断。
然后调用dispatcher的enqueue方法,首先看下 client.dispatcher.enqueue(AsyncCall(responseCallback))这里,异步方法调用中传入的是一个AsycCall,看下实现
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
@Volatile var callsPerHost = AtomicInteger(0)
private setfun reuseCallsPerHostFrom(other: AsyncCall) {
this.callsPerHost = other.callsPerHost
}val host: String
get() = originalRequest.url.hostval request: Request
get() = originalRequestval call: RealCall
get() = this@RealCall/**
* Attempt to enqueue this async call on [executorService]. This will attempt to clean up
* if the executor has been shut down by reporting the call as failed.
*/
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
}internal class CallReference(
referent: RealCall,
/**
* Captures the stack trace at the time the Call is executed or enqueued. This is helpful for
* identifying the origin of connection leaks.
*/
val callStackTrace: Any?
) : WeakReference(referent)
}
看玩这个之后呢,再返回,我们进到enqueue方法中看下
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
//首先将要执行的异步任务加入到队列中
readyAsyncCalls.add(call)// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
做个判断然后调用promoteAndExecute()方法,看下这个方法
/**
* Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the
* executor service. Must not be called with synchronization because executing calls can call
* into user code.
*
* @return true if the dispatcher is currently running calls.
*/
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()val executableCalls = mutableListOf()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
//判断当前请求数是不是大于最大请求数 64
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
//判断当前连接主机数是不是超过最大值 5个
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
//从当前需要准备请求的队列中移除当前请求
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
//将当前需要准备执行的异步任务加入到正在运行异步任务的队列中
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}//遍历当前需要执行的异步任务,拿到异步任务执行。
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}return isRunning
}
这块呢,就是从队列中取请求,判断当前请求数是不是大于最大请求数 64,//判断当前连接主机数是不是超过最大值 5个,//从当前需要准备请求的队列中移除当前请求,//将当前需要准备执行的异步任务加入到正在运行异步任务的队列中。最后执行的步骤就是asyncCall.executeOn(executorService)
executorService 这个东西就是一个线程池
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
核心线程数量为 0 ,如果空闲的话是 60 s 的存活期,第二个参数传入了 Integer 的最大值,即线程池所能容纳的最大线程数为 Integer.MAX_VALUE ,虽然这里设置了很大的值,但是实际情况下并非会达到最大值,因为上面 enqueue() 方法中有做了判断。
那么它既然是放在子线程中处理的,那么就找下run()方法,诶,还真有,看下
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
//计时
timeout.enter()
try {
//最后又调用了getResponseWithInterceptorChain()
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
开启计时,然后通过回调把响应数据返回给调用层,最后 finished,判断是否有需要执行的任务,然后继续删除准备执行的异步任务,开始执行删除的异步任务。
到了这里 同步执行、异步执行 最后都调用getResponseWithInterceptorChain()函数
getResponseWithInterceptorChain()可以看下上面的同步。
由上面可以知道最后调用的拦截器都是CallServerInterceptor,看下
//首先通过注释我们就可以看到,这是拦截器的链式调用的最后一个,它负责向服务端发起一个网络请求
/** This is the last interceptor in the chain. It makes a network call to the server. */
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
exchange.writeRequestHeaders(request)var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
// Server sent a 100-continue even though we did not request one. Try again to read the actual
// response status.
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
}
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}exchange.responseHeadersEnd(response)response = if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
}
return response
}
}
在当前拦截器中我们把请求 head /body 通过 okio 写入了服务端,然后根据服务端的响应数据构建响应头、响应体等一些响应数据。然后再return response , 就回到了realCall的excute()方法那里。这里,有兴趣的可以搜一下深入了解一下连接过程,我看的脑袋疼。
最后,看一下Dispatcher
Dispatcher
/**
* Policy on when async requests are executed.
*
* Each dispatcher uses an [ExecutorService] to run calls internally. If you supply your own
* executor, it should be able to run [the configured maximum][maxRequests] number of calls
* concurrently.
*/
class Dispatcher constructor() {
/**
* The maximum number of requests to execute concurrently. Above this requests queue in memory,
* waiting for the running calls to complete.
*
* If more than [maxRequests] requests are in flight when this is invoked, those requests will
* remain in flight.
*/
@get:Synchronized var maxRequests = 64
set(maxRequests) {
require(maxRequests >= 1) { "max < 1: $maxRequests" }
synchronized(this) {
field = maxRequests
}
promoteAndExecute()
}/**
* The maximum number of requests for each host to execute concurrently. This limits requests by
* the URL's host name. Note that concurrent requests to a single IP address may still exceed this
* limit: multiple hostnames may share an IP address or be routed through the same HTTP proxy.
*
* If more than [maxRequestsPerHost] requests are in flight when this is invoked, those requests
* will remain in flight.
*
* WebSocket connections to hosts **do not** count against this limit.
*/
@get:Synchronized var maxRequestsPerHost = 5
set(maxRequestsPerHost) {
require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
synchronized(this) {
field = maxRequestsPerHost
}
promoteAndExecute()
}/**
* A callback to be invoked each time the dispatcher becomes idle (when the number of running
* calls returns to zero).
*
* Note: The time at which a [call][Call] is considered idle is different depending on whether it
* was run [asynchronously][Call.enqueue] or [synchronously][Call.execute]. Asynchronous calls
* become idle after the [onResponse][Callback.onResponse] or [onFailure][Callback.onFailure]
* callback has returned. Synchronous calls become idle once [execute()][Call.execute] returns.
* This means that if you are doing synchronous calls the network layer will not truly be idle
* until every returned [Response] has been closed.
*/
@set:Synchronized
@get:Synchronized
var idleCallback: Runnable? = nullprivate var executorServiceOrNull: ExecutorService? = null@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}/** Ready async calls in the order they'll be run. */
private val readyAsyncCalls = ArrayDeque()/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private val runningAsyncCalls = ArrayDeque()/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private val runningSyncCalls = ArrayDeque()constructor(executorService: ExecutorService) : this() {
this.executorServiceOrNull = executorService
}
省略
dispatcher这里面,先看类注释
每个调度使用的ExecutorService的在内部运行的call。 若你提供你自己的线程池,它应该能够运行配置的最大并发呼叫数量
看dispatcher的源码可知,dispatcher就相当于一个中转点,这里面维护了三个ArrayDeque,用来保存请求
readyAsyncCalls:准备需要执行的异步任务
runningAsyncCalls:正在执行的异步任务
runningSyncCalls:正在执行的同步任务
dispatcher默认支持的最大并发请求是64个,单个host最多执行5个并发请求,如果超过的话,call会被先放到readyAsyncCalls中,当出现空闲线程的时候,再将readyAsyncCalls中的线程移入runningAsynCalls中,执行请求。
总结
okhttp它的执行流程
同步请求 1.首先通过建造者模式构建okhttpClient和request,即分别配置它们的属性
2.构建realCall对象
3.调用realCall的excute()方法
4.调用dispatcher的excute()方法将请求call添加到同步请求执行队列中,runningSyncCalls中
5.通过调用getResponseWithInterceptorChain()方法,在其中构建拦截器,重新定义一个realInterceptChain对象,通过这个对象调用chain.proceed()方法进行链式处理拦截器,即调用拦截器的intercept()方法
6.最后在CallServerInterceptor()中向服务端具体发起请求
7.最后调用dispatcher.finished(),把 call 实例从队列中移除,并执行下一次任务
异步请求 1.首先通过建造者模式构建okhttpClient和request,即分别配置它们的属性
2.构建realCall对象
3.生成一个AsyncCall(responseCallback),这个实例(实现了Runnable)
4.调用realCall的enquene()方法
5.AsyncCall实例放入了Dispatcher.enqueue()中,并判断 maxRequests (最大请求数 64)maxRequestsPerHost(最大host请求数 5)是否满足条件,如果满足就把AsyncCall添加到runningAsyncCalls中,并放入线程池中执行;如果条件不满足,就添加到等待就绪的异步队列,当那些满足的条件的执行时 ,在Dispatcher.finifshed(this)中的promoteCalls(); 方法中 对等待就绪的异步队列进行遍历,生成对应的AsyncCall实例,并添加到runningAsyncCalls中,最后放入到线程池中执行,就回到对应的run()方法中
6.最后也调用到了getResponseWithInterceptorChain()方法同同步请求中的一样。
推荐阅读
- okhttp|浅析OkHttp使用-请求分发流程
- Android源码|okhttp源码分析(一)——基本流程(超详细)
- Android笔记|Retrofit2.5是如何解析在接口类中定义的方法()
- 【android|Android Https相关完全解析 当OkHttp遇到Https
- 【android|Android OkHttp完全解析 是时候来了解OkHttp了
- OKHTTP系列(七)--拦截器之ConnectInterceptor连接拦截器
- OkHttp拦截器 (四) ConnectInterceptor拦截器分析
- OkHttp|【Android】OkHttp系列(五)(连接拦截器ConnectInterceptor)
- 上网/游戏/看剧太慢了(瞧瞧是不是运营商干的)