OkHttp Source Code Analysis

I. Using OkHttp

// Create the client
OkHttpClient client = new OkHttpClient();
Request.Builder requestBuilder = new Request.Builder()
        .url("http://www.baidu.com")
        .method("GET", null);
Request request = requestBuilder.build();

// Create a Call
Call call = client.newCall(request);

// Asynchronous request
call.enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
        // do something
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {
        // do something
    }
});

// Synchronous request. A Call can only be executed once, so create a new Call
// instead of reusing the one that was enqueued above.
try {
    Response r = client.newCall(request).execute();
    if (r.isSuccessful()) {
        // do something
    }
} catch (IOException e) {
    e.printStackTrace();
}

II. Call Flow

1. Creating the OkHttpClient
The client is configured through the Builder pattern. newBuilder() copies every setting of an existing client into a fresh Builder:
public Builder newBuilder() {
    return new Builder(this);
}

Builder(OkHttpClient okHttpClient) {
    this.dispatcher = okHttpClient.dispatcher;
    this.proxy = okHttpClient.proxy;
    this.protocols = okHttpClient.protocols;
    this.connectionSpecs = okHttpClient.connectionSpecs;
    this.interceptors.addAll(okHttpClient.interceptors);
    this.networkInterceptors.addAll(okHttpClient.networkInterceptors);
    this.proxySelector = okHttpClient.proxySelector;
    this.cookieJar = okHttpClient.cookieJar;
    this.internalCache = okHttpClient.internalCache;
    this.cache = okHttpClient.cache;
    this.socketFactory = okHttpClient.socketFactory;
    this.sslSocketFactory = okHttpClient.sslSocketFactory;
    this.certificateChainCleaner = okHttpClient.certificateChainCleaner;
    this.hostnameVerifier = okHttpClient.hostnameVerifier;
    this.certificatePinner = okHttpClient.certificatePinner;
    this.proxyAuthenticator = okHttpClient.proxyAuthenticator;
    this.authenticator = okHttpClient.authenticator;
    this.connectionPool = okHttpClient.connectionPool;
    this.dns = okHttpClient.dns;
    this.followSslRedirects = okHttpClient.followSslRedirects;
    this.followRedirects = okHttpClient.followRedirects;
    this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure;
    this.connectTimeout = okHttpClient.connectTimeout;
    this.readTimeout = okHttpClient.readTimeout;
    this.writeTimeout = okHttpClient.writeTimeout;
    this.pingInterval = okHttpClient.pingInterval;
}
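The copy constructor above is what makes "derive a client with one option changed" cheap. The following is a minimal sketch of that pattern using only the JDK. MiniClient, its two fields and the 10000 ms default are hypothetical stand-ins chosen for illustration, not OkHttp's real classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for OkHttpClient; NOT the real class.
final class MiniClient {
    final int connectTimeoutMillis;
    final List<String> interceptors; // immutable snapshot taken at build time

    private MiniClient(Builder b) {
        this.connectTimeoutMillis = b.connectTimeoutMillis;
        this.interceptors = Collections.unmodifiableList(new ArrayList<>(b.interceptors));
    }

    // Copies this client's settings into a new Builder, like newBuilder() above.
    Builder newBuilder() {
        return new Builder(this);
    }

    static final class Builder {
        int connectTimeoutMillis = 10_000; // assumed default for the sketch
        final List<String> interceptors = new ArrayList<>();

        Builder() {}

        Builder(MiniClient client) {
            this.connectTimeoutMillis = client.connectTimeoutMillis;
            this.interceptors.addAll(client.interceptors);
        }

        Builder connectTimeoutMillis(int millis) {
            this.connectTimeoutMillis = millis;
            return this;
        }

        Builder addInterceptor(String name) {
            interceptors.add(name);
            return this;
        }

        MiniClient build() {
            return new MiniClient(this);
        }
    }
}
```

Because the builder copies the list contents rather than the list reference, changes made on the derived builder do not leak back into the original client.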

2. Creating the Call
As the usage example shows, a Call is created through OkHttpClient's newCall() method.
@Override
public Call newCall(Request request) {
    return new RealCall(this, request, false /* for web socket */);
}

Call is an interface; the implementation used here is RealCall:
RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}

3. Synchronous or asynchronous request
  • Synchronous request
@Override
public Response execute() throws IOException {
    synchronized (this) {
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
    }
    captureCallStackTrace();
    try {
        client.dispatcher().executed(this);
        Response result = getResponseWithInterceptorChain();
        if (result == null) throw new IOException("Canceled");
        return result;
    } finally {
        client.dispatcher().finished(this);
    }
}

A synchronous request goes straight to the core method, getResponseWithInterceptorChain(). Now for the asynchronous request.
  • Asynchronous request
@Override
public void enqueue(Callback responseCallback) {
    synchronized (this) {
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

An asynchronous request wraps the caller's responseCallback in an AsyncCall object and hands it to the Dispatcher. AsyncCall is an inner class of RealCall. Here is how the Dispatcher handles it:
synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
        runningAsyncCalls.add(call);
        executorService().execute(call);
    } else {
        readyAsyncCalls.add(call);
    }
}

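The Dispatcher manages several request queues. If the running asynchronous queue is below maxRequests and the number of running calls to the same host is below maxRequestsPerHost, the call is added to the running queue and submitted to the thread pool. Otherwise it is added to the ready (waiting) queue.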
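The queueing rule can be sketched with plain JDK collections. MiniDispatcher below is a hypothetical model that tracks calls by host name only. The promotion step inside finished() is not shown in this article and is included as an assumption so that the ready queue can drain.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical model of the two async queues; calls are represented by their host.
final class MiniDispatcher {
    private final int maxRequests;
    private final int maxRequestsPerHost;
    private final Deque<String> running = new ArrayDeque<>();
    private final Deque<String> ready = new ArrayDeque<>();

    MiniDispatcher(int maxRequests, int maxRequestsPerHost) {
        this.maxRequests = maxRequests;
        this.maxRequestsPerHost = maxRequestsPerHost;
    }

    // Returns true if the call may start now, false if it was parked in the ready queue.
    synchronized boolean enqueue(String host) {
        if (running.size() < maxRequests && runningCallsForHost(host) < maxRequestsPerHost) {
            running.add(host);
            return true;
        }
        ready.add(host);
        return false;
    }

    // Marks one running call to this host as finished, then promotes waiting calls.
    synchronized void finished(String host) {
        running.remove(host); // removes the first matching entry only
        for (Iterator<String> it = ready.iterator();
             it.hasNext() && running.size() < maxRequests; ) {
            String candidate = it.next();
            if (runningCallsForHost(candidate) < maxRequestsPerHost) {
                it.remove();
                running.add(candidate);
            }
        }
    }

    private int runningCallsForHost(String host) {
        int count = 0;
        for (String h : running) {
            if (h.equals(host)) count++;
        }
        return count;
    }

    synchronized int runningCount() { return running.size(); }

    synchronized int readyCount() { return ready.size(); }
}
```

With limits of 2 total and 1 per host, a second call to the same host waits even though a global slot is free, which is the effect of the second condition in enqueue().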
AsyncCall is in fact a subclass of Runnable (through NamedRunnable), which is why an AsyncCall object can be passed directly to the thread pool.
// Inner class of RealCall
final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
        super("OkHttp %s", redactedUrl());
        this.responseCallback = responseCallback;
    }

    String host() {
        return originalRequest.url().host();
    }

    Request request() {
        return originalRequest;
    }

    RealCall get() {
        return RealCall.this;
    }

    @Override
    protected void execute() {
        boolean signalledCallback = false;
        try {
            Response response = getResponseWithInterceptorChain();
            if (retryAndFollowUpInterceptor.isCanceled()) {
                signalledCallback = true;
                responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
            } else {
                signalledCallback = true;
                responseCallback.onResponse(RealCall.this, response);
            }
        } catch (IOException e) {
            if (signalledCallback) {
                // Do not signal the callback twice!
                Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
            } else {
                responseCallback.onFailure(RealCall.this, e);
            }
        } finally {
            client.dispatcher().finished(this);
        }
    }
}

// NamedRunnable.java
public abstract class NamedRunnable implements Runnable {
    protected final String name;

    public NamedRunnable(String format, Object... args) {
        this.name = Util.format(format, args);
    }

    @Override
    public final void run() {
        String oldName = Thread.currentThread().getName();
        Thread.currentThread().setName(name);
        try {
            execute();
        } finally {
            Thread.currentThread().setName(oldName);
        }
    }

    protected abstract void execute();
}

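The run() method of AsyncCall's parent class calls the abstract method execute(). So when the Dispatcher's thread pool runs an AsyncCall, execute() is invoked, and it calls the same core network method, getResponseWithInterceptorChain().
execute() also invokes the caller's responseCallback. Note that the callback runs directly on the worker thread, not on the main thread, so on Android the UI must not be updated directly inside it.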
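The threading point can be illustrated without Android. In the sketch below a single-thread executor named "main-standin" plays the role of the UI thread. That executor, the class name and the thread name string are assumptions for the demo; on Android the hand-off would go through the platform's own main-thread mechanism instead.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: the callback runs on a worker thread and hands the result
// over to another executor that stands in for the main thread.
final class CallbackThreadDemo {
    // Returns {name of the thread that ran the callback, name of the thread that ran the UI update}.
    static String[] run() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        ExecutorService mainStandIn =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-standin"));
        String[] seen = new String[2];
        CountDownLatch done = new CountDownLatch(1);

        worker.execute(() -> {
            // Same rename/restore pattern as NamedRunnable.run().
            String oldName = Thread.currentThread().getName();
            Thread.currentThread().setName("OkHttp http://example.com/...");
            try {
                seen[0] = Thread.currentThread().getName(); // "callback" happens here
                mainStandIn.execute(() -> {                 // hand off before touching UI state
                    seen[1] = Thread.currentThread().getName();
                    done.countDown();
                });
            } finally {
                Thread.currentThread().setName(oldName);
            }
        });

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the hand-off");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
            mainStandIn.shutdown();
        }
        return seen;
    }
}
```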
So both synchronous and asynchronous requests end up calling getResponseWithInterceptorChain() to send the network request. The asynchronous path only adds thread pool work: queue management and scheduling of requests.
4. Calling getResponseWithInterceptorChain()
// RealCall.java
Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
        interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(
            interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
}

This method assembles the list of interceptors and starts an interceptor chain. The interceptors call one another recursively and the chain finally returns the Response. The layering idea is borrowed from the layered model of network protocols: the request travels from the top layer down to the bottom layer, the response travels from the bottom layer back up to the top, and each layer handles only its own task, modifying just the part of the request or response it is responsible for.
// RealInterceptorChain.java
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
        Connection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !sameConnection(request.url())) {
        throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
                + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
        throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
                + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(
            interceptors, streamAllocation, httpCodec, connection, index + 1, request);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
        throw new IllegalStateException("network interceptor " + interceptor
                + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
        throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    return response;
}

III. Interceptors
1. Layered structure

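Each call to RealInterceptorChain.proceed() creates a new RealInterceptorChain object positioned at index + 1 and passes it to intercept() on the interceptor at the current index. That interceptor uses the new chain to reach the next layer.
Every interceptor's intercept() method has the following structure:
@Override
public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    // 1. Work this interceptor is responsible for in the request phase

    // 2. Call proceed() on the chain (RealInterceptorChain.proceed() underneath),
    //    which recursively invokes intercept() on the next interceptor
    Response response = chain.proceed(request);

    // 3. Work this interceptor is responsible for in the response phase,
    //    then return to the response phase of the interceptor above
    return response;
}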

The bottom layer in OkHttp is CallServerInterceptor. OkHttp also supports custom interceptors.
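The recursion is easier to follow in a stripped-down model. The sketch below uses strings in place of Request and Response. Interceptor, Chain and the tagging() helper are hypothetical simplifications, not OkHttp's types. The last interceptor plays the role of the bottom layer and does not call proceed().

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical string-based model of the interceptor chain.
final class MiniChainDemo {
    interface Interceptor {
        String intercept(Chain chain);
    }

    static final class Chain {
        private final List<Interceptor> interceptors;
        private final int index;
        private final String request;

        Chain(List<Interceptor> interceptors, int index, String request) {
            this.interceptors = interceptors;
            this.index = index;
            this.request = request;
        }

        String request() {
            return request;
        }

        // Builds a chain positioned at index + 1 and hands it to the interceptor at index.
        String proceed(String request) {
            if (index >= interceptors.size()) throw new AssertionError("no interceptor left");
            Chain next = new Chain(interceptors, index + 1, request);
            return interceptors.get(index).intercept(next);
        }
    }

    // An interceptor that marks the request on the way down and the response on the way up.
    static Interceptor tagging(String name, List<String> log) {
        return chain -> {
            log.add(name + " request");
            String response = chain.proceed(chain.request() + ">" + name);
            log.add(name + " response");
            return response + "<" + name;
        };
    }

    static String run(List<String> log) {
        // Bottom layer: produces the response and never calls proceed().
        Interceptor terminal = chain -> "[" + chain.request() + "]";
        List<Interceptor> interceptors =
                Arrays.asList(tagging("app", log), tagging("bridge", log), terminal);
        return new Chain(interceptors, 0, "GET").proceed("GET");
    }
}
```

Running run() with an empty log records the request phases top-down and the response phases bottom-up, which is the layered behaviour described above.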
2. A few of the interceptors
  • BridgeInterceptor
@Override
public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    // Request phase
    RequestBody body = userRequest.body();
    if (body != null) {
        MediaType contentType = body.contentType();
        if (contentType != null) {
            requestBuilder.header("Content-Type", contentType.toString());
        }

        long contentLength = body.contentLength();
        if (contentLength != -1) {
            requestBuilder.header("Content-Length", Long.toString(contentLength));
            requestBuilder.removeHeader("Transfer-Encoding");
        } else {
            requestBuilder.header("Transfer-Encoding", "chunked");
            requestBuilder.removeHeader("Content-Length");
        }
    }

    if (userRequest.header("Host") == null) {
        requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
        requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
        transparentGzip = true;
        requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
        requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
        requestBuilder.header("User-Agent", Version.userAgent());
    }

    Response networkResponse = chain.proceed(requestBuilder.build());

    // Response phase
    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
            .request(userRequest);

    if (transparentGzip
            && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse)) {
        GzipSource responseBody = new GzipSource(networkResponse.body().source());
        Headers strippedHeaders = networkResponse.headers().newBuilder()
                .removeAll("Content-Encoding")
                .removeAll("Content-Length")
                .build();
        responseBuilder.headers(strippedHeaders);
        responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
}

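In the request phase, BridgeInterceptor fills in the missing configuration: it rebuilds the Request object and adds request headers such as Content-Type, Host, Connection, Accept-Encoding, Cookie and User-Agent. In the response phase it stores received cookies and, if it added the gzip header itself, decompresses the gzip body.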
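The transparent gzip handling can be sketched with the JDK's java.util.zip classes. BridgeSketch and its methods are hypothetical. Headers are modelled as a plain case-sensitive map, which is a simplification of real HTTP header handling, and the real interceptor streams the body rather than buffering it as done here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical model of the request and response phases of a bridge-style interceptor.
final class BridgeSketch {
    // Request phase: add headers the caller did not set.
    // Returns true if "Accept-Encoding: gzip" was added on the caller's behalf.
    static boolean fillRequestHeaders(Map<String, String> headers, String host) {
        headers.putIfAbsent("Host", host);
        headers.putIfAbsent("Connection", "Keep-Alive");
        boolean transparentGzip = false;
        if (!headers.containsKey("Accept-Encoding") && !headers.containsKey("Range")) {
            transparentGzip = true;
            headers.put("Accept-Encoding", "gzip");
        }
        return transparentGzip;
    }

    // Response phase: decompress only if gzip was requested transparently and the server used it.
    static byte[] decodeBody(boolean transparentGzip, Map<String, String> responseHeaders,
            byte[] body) {
        if (!transparentGzip
                || !"gzip".equalsIgnoreCase(responseHeaders.get("Content-Encoding"))) {
            return body;
        }
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(body))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            // The caller now sees plain content, so these headers no longer apply.
            responseHeaders.remove("Content-Encoding");
            responseHeaders.remove("Content-Length");
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Test helper that plays the server: gzip-compresses a UTF-8 string.
    static byte[] gzip(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

If the caller sets Accept-Encoding or Range itself, the flag stays false and the body is returned untouched, mirroring the transparentGzip check in the source above.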
  • CacheInterceptor
@Override
public Response intercept(Chain chain) throws IOException {
    Response cacheCandidate = cache != null
            ? cache.get(chain.request())
            : null;

    long now = System.currentTimeMillis();

    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
        cache.trackResponse(strategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {
        closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
        return new Response.Builder()
                .request(chain.request())
                .protocol(Protocol.HTTP_1_1)
                .code(504)
                .message("Unsatisfiable Request (only-if-cached)")
                .body(Util.EMPTY_RESPONSE)
                .sentRequestAtMillis(-1L)
                .receivedResponseAtMillis(System.currentTimeMillis())
                .build();
    }

    // If we don't need the network, we're done.
    if (networkRequest == null) {
        return cacheResponse.newBuilder()
                .cacheResponse(stripBody(cacheResponse))
                .build();
    }

    Response networkResponse = null;
    try {
        networkResponse = chain.proceed(networkRequest);
    } finally {
        // If we're crashing on I/O or otherwise, don't leak the cache body.
        if (networkResponse == null && cacheCandidate != null) {
            closeQuietly(cacheCandidate.body());
        }
    }

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
        if (networkResponse.code() == HTTP_NOT_MODIFIED) {
            Response response = cacheResponse.newBuilder()
                    .headers(combine(cacheResponse.headers(), networkResponse.headers()))
                    .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
                    .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
                    .cacheResponse(stripBody(cacheResponse))
                    .networkResponse(stripBody(networkResponse))
                    .build();
            networkResponse.body().close();

            // Update the cache after combining headers but before stripping the
            // Content-Encoding header (as performed by initContentStream()).
            cache.trackConditionalCacheHit();
            cache.update(cacheResponse, response);
            return response;
        } else {
            closeQuietly(cacheResponse.body());
        }
    }

    Response response = networkResponse.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();

    if (HttpHeaders.hasBody(response)) {
        CacheRequest cacheRequest = maybeCache(response, networkResponse.request(), cache);
        response = cacheWritingResponse(cacheRequest, response);
    }

    return response;
}

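In the request phase, CacheInterceptor checks whether the request has a cached response and whether the network must be used. If the network is not needed, it returns the cached response without calling the next layer. If neither the network nor a usable cache entry is available, it fails with a 504 response. In the response phase it handles a 304 by refreshing the cached response, and otherwise writes the next layer's Response to the cache.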
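The branching in intercept() reduces to a small decision function. The sketch below is a hypothetical condensation: the enum names and the three inputs are chosen for illustration, with needNetwork standing for "strategy.networkRequest != null" and haveCache for "strategy.cacheResponse != null".

```java
// Hypothetical condensation of the branches in CacheInterceptor.intercept().
final class CacheDecisionSketch {
    enum Outcome { FAIL_504, USE_CACHE, CONDITIONAL_HIT, USE_NETWORK }

    // networkCode is only consulted when the network is actually used.
    static Outcome decide(boolean needNetwork, boolean haveCache, int networkCode) {
        if (!needNetwork && !haveCache) {
            return Outcome.FAIL_504;        // only-if-cached but nothing usable in the cache
        }
        if (!needNetwork) {
            return Outcome.USE_CACHE;       // served from cache, next layer is never called
        }
        if (haveCache && networkCode == 304) {
            return Outcome.CONDITIONAL_HIT; // conditional GET: keep cached body, merge headers
        }
        return Outcome.USE_NETWORK;         // fresh network response, possibly written to cache
    }
}
```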
  • ConnectInterceptor
    @Override
    public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
        StreamAllocation streamAllocation = realChain.streamAllocation();

        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
        RealConnection connection = streamAllocation.connection();

        return realChain.proceed(request, streamAllocation, httpCodec, connection);
    }

    ConnectInterceptor only does work in the request phase, where it establishes the connection. In the response phase it passes the next layer's Response straight back to the layer above. Here is how the connection is established:
    // StreamAllocation.java
    public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
        int connectTimeout = client.connectTimeoutMillis();
        int readTimeout = client.readTimeoutMillis();
        int writeTimeout = client.writeTimeoutMillis();
        boolean connectionRetryEnabled = client.retryOnConnectionFailure();

        // Find a healthy connection
        try {
            RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
                    writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
            HttpCodec resultCodec = resultConnection.newCodec(client, this);

            synchronized (connectionPool) {
                codec = resultCodec;
                return resultCodec;
            }
        } catch (IOException e) {
            throw new RouteException(e);
        }
    }

    private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
            int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
            throws IOException {
        while (true) {
            RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
                    connectionRetryEnabled);

            // If this is a brand new connection, we can skip the extensive health checks.
            synchronized (connectionPool) {
                if (candidate.successCount == 0) {
                    return candidate;
                }
            }

            // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
            // isn't, take it out of the pool and start again.
            if (!candidate.isHealthy(doExtensiveHealthChecks)) {
                noNewStreams();
                continue;
            }

            return candidate;
        }
    }

    private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
            boolean connectionRetryEnabled) throws IOException {
        Route selectedRoute;
        synchronized (connectionPool) {
            if (released) throw new IllegalStateException("released");
            if (codec != null) throw new IllegalStateException("codec != null");
            if (canceled) throw new IOException("Canceled");

            // Attempt to use an already-allocated connection.
            RealConnection allocatedConnection = this.connection;
            if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
                return allocatedConnection;
            }

            // Attempt to get a connection from the pool.
            Internal.instance.get(connectionPool, address, this);
            if (connection != null) {
                return connection;
            }

            selectedRoute = route;
        }

        // If we need a route, make one. This is a blocking operation.
        if (selectedRoute == null) {
            selectedRoute = routeSelector.next();
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible for
        // an asynchronous cancel() to interrupt the handshake we're about to do.
        RealConnection result;
        synchronized (connectionPool) {
            route = selectedRoute;
            refusedStreamCount = 0;
            result = new RealConnection(connectionPool, selectedRoute);
            acquire(result);
            if (canceled) throw new IOException("Canceled");
        }

        // Do TCP + TLS handshakes. This is a blocking operation.
        result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
        routeDatabase().connected(result.route());

        Socket socket = null;
        synchronized (connectionPool) {
            // Pool the connection.
            Internal.instance.put(connectionPool, result);

            // If another multiplexed connection to the same address was created concurrently, then
            // release this connection and acquire that one.
            if (result.isMultiplexed()) {
                socket = Internal.instance.deduplicate(connectionPool, address, this);
                result = connection;
            }
        }
        closeQuietly(socket);

        return result;
    }

    // ConnectionPool.java
    public ConnectionPool() {
        this(5, 5, TimeUnit.MINUTES);
    }

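    In short, the code first looks for an existing connection: the one already allocated to this call, then one from the connection pool. If one is found it is reused, which saves the TCP three-way handshake (and the TLS handshake for HTTPS). If none is found, a RealConnection object is created, the handshakes are performed, and the new connection is put into the pool. The internals of connect() are not covered here. By default ConnectionPool keeps at most 5 idle connections, each kept alive for up to 5 minutes of inactivity, and a background thread periodically evicts connections that are no longer valid.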
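    The reuse and eviction rules can be sketched as follows. MiniConnectionPool is a hypothetical model: connections are identified only by an address string, the current time is passed in explicitly so the behaviour is deterministic, and cleanup() evicts everything eligible in one pass, which is a simplification and may differ from how the real pool schedules its cleanup.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical model of an idle-connection pool with a size cap and a keep-alive limit.
final class MiniConnectionPool {
    static final class Conn {
        final String address;
        long idleAtNanos;

        Conn(String address) {
            this.address = address;
        }
    }

    private final int maxIdleConnections;
    private final long keepAliveNanos;
    private final Deque<Conn> idle = new ArrayDeque<>(); // oldest idle connection first

    MiniConnectionPool(int maxIdleConnections, long keepAliveNanos) {
        this.maxIdleConnections = maxIdleConnections;
        this.keepAliveNanos = keepAliveNanos;
    }

    // Takes an idle connection to the address out of the pool, or returns null if there is none.
    synchronized Conn get(String address) {
        for (Iterator<Conn> it = idle.iterator(); it.hasNext(); ) {
            Conn c = it.next();
            if (c.address.equals(address)) {
                it.remove();
                return c;
            }
        }
        return null;
    }

    // Returns a connection to the pool; callers are assumed to pass non-decreasing timestamps.
    synchronized void put(Conn c, long nowNanos) {
        c.idleAtNanos = nowNanos;
        idle.addLast(c);
    }

    // Evicts connections idle for at least keepAliveNanos, then the oldest ones while over the cap.
    synchronized int cleanup(long nowNanos) {
        int evicted = 0;
        for (Iterator<Conn> it = idle.iterator(); it.hasNext(); ) {
            if (nowNanos - it.next().idleAtNanos >= keepAliveNanos) {
                it.remove();
                evicted++;
            }
        }
        while (idle.size() > maxIdleConnections) {
            idle.removeFirst();
            evicted++;
        }
        return evicted;
    }

    synchronized int idleCount() {
        return idle.size();
    }
}
```

    The defaults described above would correspond to new MiniConnectionPool(5, TimeUnit.MINUTES.toNanos(5)) in this model.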
