okhttp3请求流程分析

1.简单的get异步请求

String url = "https://www.baidu.com/"; OkHttpClient okHttpClient = new OkHttpClient(); Request request = new Request.Builder().url(url).build(); Call call = okHttpClient.newCall(request); call.enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) {}@Override public void onResponse(Call call, Response response) throws IOException {Log.d(TAG, "onResponse: response="+response.toString()); } });

一:创建OkHttpClient对象
OkHttpClient okHttpClient = new OkHttpClient();

okhttp源码
public OkHttpClient() { this(new Builder()); }

创建的 OkHttpClient对象时并且默认构造builder对象进行初始化
二:初始化请求对象
Request request = new Request.Builder().url(url).build();

Request:每一次网络请求都是一个request,Request是对url,method,header,body的封装,也是对http协议中请求行、请求头和实体内容的封装。属于构建者模式。
三:发起请求
@Override public Call newCall(Request request) { return new RealCall(this, request, false /* for web socket */); }

RealCall是Call接口的实现
final class RealCall implements Call { RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { final EventListener.Factory eventListenerFactory = client.eventListenerFactory(); this.client = client; this.originalRequest = originalRequest; this.forWebSocket = forWebSocket; this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket); // TODO(jwilson): this is unsafe publication and not threadsafe. this.eventListener = eventListenerFactory.create(this); } }

异步请求的执行流程
@Override public void enqueue(Callback responseCallback) { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); client.dispatcher().enqueue(new AsyncCall(responseCallback)); }

由上可知:
1.检查这个 call 是否已经被执行了,每个 call 只能被执行一次
2.利用 client.dispatcher().enqueue(this) 来进行实际执行, client.dispatcher()如下
public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory { public Dispatcher dispatcher() { return dispatcher; } }

/* *维护请求状态 *维护线程池 */ public final class Dispatcher { private int maxRequests = 64; //线程最大并发数 private int maxRequestsPerHost = 5; //每个主机最大请求数 /** Ready async calls in the order they'll be run. 缓存等待的异步请求队列*/ private final Deque readyAsyncCalls = new ArrayDeque<>(); /** Running asynchronous calls. Includes canceled calls that haven't finished yet. 正在运行的异步请求队列*/ private final Deque runningAsyncCalls = new ArrayDeque<>(); /** Running synchronous calls. Includes canceled calls that haven't finished yet. 正在运行的同步请求队列*/ private final Deque runningSyncCalls = new ArrayDeque<>(); //ArrayDeque是没有容量限制的/** Returns the number of running calls that share a host with {@code call}.正在调度请求每个主机的数量 */ private int runningCallsForHost(AsyncCall call) { int result = 0; for (AsyncCall c : runningAsyncCalls) { if (c.host().equals(call.host())) result++; } return result; } synchronized void enqueue(AsyncCall call) { //如果正在运行的异步任务的数量小于64并且正在调度请求每个主机的数量小于5 if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call)

AsyncCall是RealCall的一个内部类并且继承NamedRunnable
final class AsyncCall extends NamedRunnable { private final Callback responseCallback; AsyncCall(Callback responseCallback) { super("OkHttp %s", redactedUrl()); this.responseCallback = responseCallback; }String host() { return originalRequest.url().host(); }Request request() { return originalRequest; }RealCall get() { return RealCall.this; }@Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } } }

NamedRunnable类如下:
public abstract class NamedRunnable implements Runnable { protected final String name; public NamedRunnable(String format, Object... args) { this.name = Util.format(format, args); }@Override public final void run() { String oldName = Thread.currentThread().getName(); Thread.currentThread().setName(name); try { execute(); } finally { Thread.currentThread().setName(oldName); } }protected abstract void execute(); }

由上可以看出NameRunnable 实现了Runnable接口并且是个抽象类,其抽象方法是execute(),其子类应该实现execute方法。
final class RealCall implements Call {final class AsyncCall extends NamedRunnable { //实现execute方法 @Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } } } }

由上可见,AsyncCall实现了execute方法,首先是调用getResponseWithInterceptorChain()方法获取响应,然后获取成功后,就调用回调的onReponse方法,如果失败,就调用回调的onFailure方法。最后,调用Dispatcher的finished方法。
getResponseWithInterceptorChain源码,构建拦截器
Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. List interceptors = new ArrayList<>(); interceptors.addAll(client.interceptors()); interceptors.add(retryAndFollowUpInterceptor); interceptors.add(new BridgeInterceptor(client.cookieJar())); interceptors.add(new CacheInterceptor(client.internalCache())); interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } interceptors.add(new CallServerInterceptor(forWebSocket)); Interceptor.Chain chain = new RealInterceptorChain( interceptors, null, null, null, 0, originalRequest); return chain.proceed(originalRequest); }

  • 创建一系列拦截器,并将其放入一个拦截器数组中。这部分拦截器即包括用户自定义的拦截器也包括框架内部拦截器
  • 创建一个拦截器链RealInterceptorChain,并执行拦截器链的proceed方法
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException { if (index >= interceptors.size()) throw new AssertionError(); calls++; // If we already have a stream, confirm that the incoming request will use it. if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) { throw new IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must retain the same host and port"); }// If we already have a stream, confirm that this is the only call to chain.proceed(). if (this.httpCodec != null && calls > 1) { throw new IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must call proceed() exactly once"); }// Call the next interceptor in the chain. RealInterceptorChain next = new RealInterceptorChain( interceptors, streamAllocation, httpCodec, connection, index + 1, request); Interceptor interceptor = interceptors.get(index); Response response = interceptor.intercept(next); // Confirm that the next interceptor made its required call to chain.proceed(). if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) { throw new IllegalStateException("network interceptor " + interceptor + " must call proceed() exactly once"); }// Confirm that the intercepted response isn't null. if (response == null) { throw new NullPointerException("interceptor " + interceptor + " returned null"); }return response; }

一个拦截器的intercept方法所执行的逻辑大致分为三部分:
1.在发起请求前对request进行处理
2.调用下一个拦截器,获取response
3.对response进行处理,返回给上一个拦截器
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);

可以看到当前拦截器的Response依赖于下一个拦截器的Intercept的Response。因此,就会沿着这条拦截器链依次调用每一个拦截器,当执行到最后一个拦截器之后,就会沿着相反的方向依次返回Response,最终得到我们需要的“终极版”Response。
RetryAndFollowUpInterceptor 在网络请求失败后进行重试
当服务器返回当前请求需要进行重定向时直接发起新的请求,并在条件允许情况下复用当前连接
BridgeInterceptor 设置内容长度,内容编码
设置gzip压缩,并在接收到内容后进行解压。省去了应用层处理数据解压的麻烦
添加cookie
设置其他报头,如User-Agent,Host,Keep-alive等。其中Keep-Alive是实现多路复用的必要步骤
CacheInterceptor 当网络请求有符合要求的Cache时直接返回Cache
当服务器返回内容有改变时更新当前cache
如果当前cache失效,删除
ConnectInterceptor 即为当前请求找到合适的连接,可能复用已有连接也可能是重新创建的连接,返回的连接由连接池负责决定
CallServerInterceptor 负责向服务器发起真正的访问请求,并在接收到服务器返回后读取响应返回
public final class Dispatcher { /** Ready async calls in the order they'll be run.准备将要执行的请求*/ private final Deque readyAsyncCalls = new ArrayDeque<>(); /** Running asynchronous calls. Includes canceled calls that haven't finished yet.正在执行的异步请求,包含已经取消但未执行完的请求 */ private final Deque runningAsyncCalls = new ArrayDeque<>(); //client.dispatcher().enqueue(new AsyncCall(responseCallback))源码 //如果当前还能执行一个并发请求,则加入 runningAsyncCalls ,立即执行,否则加入 readyAsyncCalls 队列 synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } }//client.dispatcher().finished(this)源码void finished(AsyncCall call) { finished(runningAsyncCalls, call, true); } //将正在运行的任务Call从队列runningAsyncCalls中移除 private void finished(Deque calls, T call, boolean promoteCalls) { int runningCallsCount; Runnable idleCallback; synchronized (this) { if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!"); if (promoteCalls) promoteCalls(); runningCallsCount = runningCallsCount(); idleCallback = this.idleCallback; }if (runningCallsCount == 0 && idleCallback != null) { idleCallback.run(); } } //构造了单例线程池 public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; } }

线程池构造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }

【okhttp3请求流程分析】corePoolSize:最小并发线程数,如果是0的话,空闲一段时间后所有线程将全部被销毁
maximumPoolSize:最大线程数,当任务进来时可以扩充的线程最大值,当大于了这个值就会根据丢弃处理机制来处理
keepAliveTime:当线程数大于corePoolSize时,多余的空闲线程的最大存活时间
unit:时间单位
workQueue:工作队列,先进先出
threadFactory:单个线程的工厂
由此可看出,在Okhttp中,构建了一个核心为[0, Integer.MAX_VALUE]的线程池,它不保留任何最小线程数,随时创建更多的线程数,当线程空闲时只能活60秒。

    推荐阅读