OkHttp简析

基于OkHttp 3.13版本进行分析

OkHttp简析

OkHttp定义

OkHttp现如今已成为主流的网络请求框架,连Android源码中都引入其作为基础网络库,可知它的重要性。

所以只会简单的使用是远远不够的,更要深入了解其原理,知道它的设计概念,这才是最重要的。

OkHttp支持SPDY协议,可以合并多个到同一个主机的请求,分享同一个Socket。如果SPDY不可用,会使用连接池的技术减少请求的延迟。

SPDY协议:Google提出的基于TCP的应用层协议,通过压缩、多路复用、优先级来缩短加载时间。

OkHttp使用示例

构造请求

GET请求

1
2
3
Request request = new Request.Builder()
.url(url)
.build();

POST请求

1
2
3
4
5
RequestBody body = RequestBody.create(JSON, json);
Request request = new Request.Builder()
.url(url)
.post(body)
.build();

同步请求

1
2
3
OkHttpClient client = new OkHttpClient();
Response response = client.newCall(request).execute();
return response.body().string();

异步请求

1
2
3
4
5
6
7
8
9
10
11
OkHttpClient client = new OkHttpClient();

client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}

@Override
public void onResponse(Call call, okhttp3.Response response) throws IOException {
}
});

OkHttp源码分析

构造OkHttpClient对象

构造OkHttpClient对象

需要先创建一个OkHttpClient用以执行后续请求。内部主要是相关参数配置。

主要功能:通信的客户端,用以统一发起请求与解析返回值。

OkHttpClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public OkHttpClient() {
this(new Builder());
}

OkHttpClient(Builder builder) {
//用于调用网络请求 本质为 线程池
this.dispatcher = builder.dispatcher;
//设置代理
this.proxy = builder.proxy;
//设置协议
this.protocols = builder.protocols;
this.connectionSpecs = builder.connectionSpecs;
//设置拦截器
this.interceptors = Util.immutableList(builder.interceptors);
//设置网络拦截器
this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
//回调监听
this.eventListenerFactory = builder.eventListenerFactory;
this.proxySelector = builder.proxySelector;
//Cookie
this.cookieJar = builder.cookieJar;
//缓存
this.cache = builder.cache;
this.internalCache = builder.internalCache;
this.socketFactory = builder.socketFactory;

boolean isTLS = false;
for (ConnectionSpec spec : connectionSpecs) {
isTLS = isTLS || spec.isTls();
}

//用于Https请求
if (builder.sslSocketFactory != null || !isTLS) {
this.sslSocketFactory = builder.sslSocketFactory;
this.certificateChainCleaner = builder.certificateChainCleaner;
} else {
X509TrustManager trustManager = Util.platformTrustManager();
this.sslSocketFactory = newSslSocketFactory(trustManager);
this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
}

if (sslSocketFactory != null) {
Platform.get().configureSslSocketFactory(sslSocketFactory);
}

this.hostnameVerifier = builder.hostnameVerifier;
this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
certificateChainCleaner);
this.proxyAuthenticator = builder.proxyAuthenticator;
this.authenticator = builder.authenticator;
this.connectionPool = builder.connectionPool;
this.dns = builder.dns;
this.followSslRedirects = builder.followSslRedirects;
this.followRedirects = builder.followRedirects;
//是否需要重试
this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
this.callTimeout = builder.callTimeout;
//链接超时时长
this.connectTimeout = builder.connectTimeout;
//读取超时时间
this.readTimeout = builder.readTimeout;
//写入超时时间
this.writeTimeout = builder.writeTimeout;
this.pingInterval = builder.pingInterval;

if (interceptors.contains(null)) {
throw new IllegalStateException("Null interceptor: " + interceptors);
}
if (networkInterceptors.contains(null)) {
throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
}
}

OkHttpClient是应用建造者模式,通过OkHttpClient.Builder来构造一个OkHttpClient对象,支持数十种参数配置。

构造Request请求对象

OkHttp-构造Request请求对象

创建一个Request对象用以包括请求的所有信息,内部包含了请求地址,请求头,请求内容

主要功能:封装请求的具体信息。

Request.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public final class Request {
Request(Builder builder) {
//请求地址
this.url = builder.url;
//请求方法 例如:GET、POST、PUT、DELETE等
this.method = builder.method;
//请求头信息
this.headers = builder.headers.build();
//请求内容构造体
this.body = builder.body;
//请求的标签 用于后续对指定标签可进行特殊处理
this.tags = Util.immutableMap(builder.tags);
}

public static class Builder {
@Nullable HttpUrl url;
String method;
Headers.Builder headers;
@Nullable RequestBody body;

/** A mutable map of tags, or an immutable empty map if we don't have any. */
Map<Class<?>, Object> tags = Collections.emptyMap();

//默认是GET方法,不带有请求体
public Builder() {
this.method = "GET";
this.headers = new Headers.Builder();
}

Builder(Request request) {
this.url = request.url;
this.method = request.method;
this.body = request.body;
this.tags = request.tags.isEmpty()
? Collections.emptyMap()
: new LinkedHashMap<>(request.tags);
this.headers = request.headers.newBuilder();
}
...
}
}

Request使用的也是建造者模式,通过Request.Builder去构造对应Request

请求体RequestBody

主要功能:用以提交流、表单等请求信息

FormBody

支持提交键值对类型。例如userId : 1

使用方法

1
2
3
4
5
6
FormBody.Builder formBodyBuilder = new FormBody.Builder();
Map<String, Object> map = new ConcurrentHashMap<>();
for (Map.Entry<String, Object> entry : map.entrySet()) {
formBodyBuilder.add(entry.getKey(), entry.getValue().toString());
}
RequestBody body = formBodyBuilder.build();
MultipartBody

除了支持键值对,还有提交文件功能。

使用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
MultipartBody.Builder multipartBuilder = new MultipartBody.Builder().setType(MultipartBody.FORM);
Map<String, Object> map = new ConcurrentHashMap<>();
for (Map.Entry<String, Object> entry : map.entrySet()) {
multipartBuilder.addFormDataPart(entry.getKey(), entry.getValue().toString());
}
//可以针对文件新起一个 参数来进行传递
for (Map.Entry<String, File> entry : message.getFiles().entrySet()) {
File f = entry.getValue();
if (f == null)
continue;
String name = f.getName();
String ext = name.substring(name.lastIndexOf('.'));
String imageFormat = "jpg";
if (".png".equalsIgnoreCase(ext)) {
imageFormat = "png";
}
multipartBuilder.addFormDataPart(
entry.getKey(),
entry.getValue().getName(),
RequestBody.create(MediaType.parse("image/" + imageFormat), entry.getValue())
);
}

RequestBody body = multipartBuilder.build();

发送Request请求

OkHttp-发送Request请求

通过OkHttpClient.newCall()发送出Request请求

OkHttpClient.java
1
2
3
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}

返回了一个Call对象,实现类为RealCall

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public interface Call extends Cloneable {
//获取当前请求
Request request();
//执行当前请求 并返回结果
Response execute() throws IOException;
//异步请求
void enqueue(Callback responseCallback);
//取消请求
/** Cancels the request, if possible. Requests that are already complete cannot be canceled. */
void cancel();
//当前请求是否正在执行
boolean isExecuted();
//请求是否已取消
boolean isCanceled();
//超时返回
Timeout timeout();
//克隆请求 用于重新调用
Call clone();

interface Factory {
Call newCall(Request request);
}
}
RealCall.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final class RealCall implements Call {
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
//前面先行创建的 OkHttpClient
this.client = client;
//创建的请求对象
this.originalRequest = originalRequest;
//用于建立长连接
this.forWebSocket = forWebSocket;
}

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
//为这次请求设置了事件监听器,包括请求开始、结束、异常等监听
call.transmitter = new Transmitter(client, call);
return call;
}
}

通过newCall()根据传递进来的Request创建一个RealCall实例去发送请求。

同步请求——execute()

直接执行并返回请求结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.timeoutEnter();
//请求开始
transmitter.callStart();
try {
//加入 runningSuncCalls 队列中
client.dispatcher().executed(this);
//返回响应结果
return getResponseWithInterceptorChain();
} finally {
//从队列中移除 避免重复执行
client.dispatcher().finished(this);
}
}

执行execute()时,监听到请求事件开始,就会加入到Dispatcher.runningSyncCalls中,里面记录的是当前正在进行同步请求的call,然后当call完成时或因异常结束时,再从Dispatcher.runningSyncCalls移除。

异步请求——enqueue(Callback callback)

构造一个异步执行队列,然后把请求加入队列中处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
  @Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.callStart();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
private volatile AtomicInteger callsPerHost = new AtomicInteger(0);

AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}

AtomicInteger callsPerHost() {
return callsPerHost;
}
...

void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
transmitter.noMoreExchanges(ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}

@Override protected void execute() {
boolean signalledCallback = false;
transmitter.timeoutEnter();
try {
Response response = getResponseWithInterceptorChain();
signalledCallback = true;
//回调请求结果
responseCallback.onResponse(RealCall.this, response);
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
//回调失败并返回异常
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//从队列中移除
client.dispatcher().finished(this);
}
}
}

enqueue()调用到Dispatcher.enqueue()传入的是一个AsyncCall对象,AsyncCall本质是一个Runnable对象,通过Dispatcher中的ExecutorService来执行AsyncCall

执行Request请求

execute()enqueue()发送请求时,最后都是需要有Dispatch去执行请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;

private @Nullable ExecutorService executorService;
//正在准备执行的异步请求队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//正在执行的异步请求队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//正在执行的同步请求队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

//用以执行异步请求
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}

void enqueue(AsyncCall call) {
synchronized (this) {
//加入正在执行的异步队列中
readyAsyncCalls.add(call);

// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.get().forWebSocket) {
AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
promoteAndExecute();
}

//加入正在执行的同步队列
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
//从政在执行的异步队列中移除
void finished(AsyncCall call) {
call.callsPerHost().decrementAndGet();
finished(runningAsyncCalls, call);
}
//执行完毕后 从正在执行的同步队列中移除
void finished(RealCall call) {
finished(runningSyncCalls, call);
}

private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
idleCallback = this.idleCallback;
}
boolean isRunning = promoteAndExecute();
if (!isRunning && idleCallback != null) {
idleCallback.run();
}
}

private boolean promoteAndExecute() {
//判定当前线程是否持有锁
assert (!Thread.holdsLock(this));

List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {

for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
//正在运行的异步请求不能超过 64个
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
//在同一个Host下的异步请求不能超过5个
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

i.remove();
//CAS
asyncCall.callsPerHost().incrementAndGet();
//添加至异步执行队列
executableCalls.add(asyncCall);
//添加至正在执行异步请求队列
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}

for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
//执行异步请求
asyncCall.executeOn(executorService());
}

return isRunning;
}
}

Dispatcher是一个任务调度器,内部维护了三个双端队列:

  • readyAsyncCalls:准备执行的异步请求。已经超过请求上限的异步请求就会放在该队列中。
  • runningAsyncCalls:正在执行的异步请求。不超过请求上限时,异步请求会加入到该队列中,超过时,依然放到readyAsyncCalls中。
  • runningSyncCalls:正在执行的同步请求。直接把同步请求添加到该队列中。

通过Dispatcher中的executorService去执行对应请求。

处理Request请求——通过拦截器

OkHttp-处理Request请求

通过Dispatcher执行完请求后,返回回调结果前,需要通过getResponseWithInterceptorChain()通过层层责任链的执行来获得最终的请求结果。

通过责任链模式将请求一层层的通过拦截器进行处理。

RealCall.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
//加入用户自定义的拦截器
interceptors.addAll(client.interceptors());
//重试和重定向拦截器
interceptors.add(new RetryAndFollowUpInterceptor(client));
//转化用户请求为网络请求
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//负责读取缓存以及更新缓存
interceptors.add(new CacheInterceptor(client.internalCache()));
//与服务器建立连接
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
//用户自定义的网络拦截器
interceptors.addAll(client.networkInterceptors());
}
//从服务器读取响应的数据
interceptors.add(new CallServerInterceptor(forWebSocket));

Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());

boolean calledNoMoreExchanges = false;
try {
//链式调用拦截器,最终返回 Response
Response response = chain.proceed(originalRequest);
if (transmitter.isCanceled()) {
closeQuietly(response);
throw new IOException("Canceled");
}
return response;
} catch (IOException e) {
calledNoMoreExchanges = true;
throw transmitter.noMoreExchanges(e);
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null);
}
}
}

在获得响应结果之前,需要对用户设置的原始请求转换为实际的网络请求,然后通过一系列拦截器,直到最终得到结果,采用链式调用保证这些拦截器的执行顺序。

OkHttp拦截器

所有的拦截器都实现了Interceptor接口,支持用户去自定义拦截器,只要实现Interceptor接口即可。

拦截器可以 用来监控、改写和重试HTTP访问

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public interface Interceptor {
//主要实现该接口,控制返回结果
Response intercept(Chain chain) throws IOException;

interface Chain {

Request request();

Response proceed(Request request) throws IOException;

//返回Request执行后的返回结果
@Nullable Connection connection();

Call call();

int connectTimeoutMillis();

Chain withConnectTimeout(int timeout, TimeUnit unit);

int readTimeoutMillis();

Chain withReadTimeout(int timeout, TimeUnit unit);

int writeTimeoutMillis();

Chain withWriteTimeout(int timeout, TimeUnit unit);
}
}
ApplicationInterceptor

OkHttp-ApplicationInterceptor

该拦截器会被第一个执行,此处得到的Request为最原始状态。但是最终得到的Response是最终的结果。

引用代码

1
2
3
OkHttpClient client = new OkHttpClient.Builder()
.addInterceptor(new LoggingInterceptor())
.build();

ApplicationInterceptor适用于在请求前统一添加一些公共参数,例如App的版本号,系统信息等。

也可用于对返回的Response进行加工。

ApplicationInterceptor有以下特定:

  • 不需要关心后续拦截器进行的操作,因为是会被第一个执行的,只要关心返回结果即可。
  • 只会被响应一次,即使强制缓存获取
  • 可以对后续的拦截器调用进行拦截或者进行多次调用——通过Chain.proceed()进行控制
RetryAndFollowUpInterceptor

OkHttp-RetryAndFollowUpInterceptor

负责失败重试和重定向的拦截器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
public final class RetryAndFollowUpInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
//从自定义拦截器 那里传递下来的请求
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
//获取事件监听器
Transmitter transmitter = realChain.transmitter();
//初始化 重定向次数
int followUpCount = 0;
Response priorResponse = null;
//开启死循环 进行重试操作
while (true) {
transmitter.prepareToConnect(request);
//请求取消
if (transmitter.isCanceled()) {
throw new IOException("Canceled");
}

Response response;
boolean success = false;
try {
//向下调用 下一个拦截器——BridgeInterceptor
response = realChain.proceed(request, transmitter, null);
success = true;
} catch (RouteException e) {
// 不需要重试 则抛出异常
if (!recover(e.getLastConnectException(), transmitter, false, request)) {
throw e.getFirstConnectException();
}
continue;
} catch (IOException e) {
// 无法与服务端建立连接
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, transmitter, requestSendStarted, request)) throw e;
continue;
} finally {
// The network call threw an exception. Release any resources.
if (!success) {
//释放资源
transmitter.exchangeDoneDueToException();
}
}

// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}

Exchange exchange = Internal.instance.exchange(response);
Route route = exchange != null ? exchange.connection().route() : null;
//根据返回的 response的Code 判断是否需要进行重定向
Request followUp = followUpRequest(response, route);

if (followUp == null) {
//释放资源
if (exchange != null && exchange.isDuplex()) {
transmitter.timeoutEarlyExit();
}
return response;
}

RequestBody followUpBody = followUp.body();
if (followUpBody != null && followUpBody.isOneShot()) {
return response;
}

closeQuietly(response.body());
if (transmitter.hasExchange()) {
exchange.detachWithViolence();
}

//超出重定向次数
if (++followUpCount > MAX_FOLLOW_UPS) {
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
//获取重定向结果 赋予 request继续向下请求
request = followUp;
priorResponse = response;
}
}

private boolean recover(IOException e, Transmitter transmitter,
boolean requestSendStarted, Request userRequest)
{
// 未开启重试 retryOnConnectionFailure(false)
if (!client.retryOnConnectionFailure()) return false;
// 只允许发送一次 isOneShot(){return true;}
if (requestSendStarted && requestIsOneShot(e, userRequest)) return false;
// 发生异常
if (!isRecoverable(e, requestSendStarted)) return false;
// 设置不允许重试
if (!transmitter.canRetry()) return false;
return true;
}
}
  1. 尝试执行下一个拦截器,即BridgeInterceptor
  2. 抛出异常,需要根据以下情况去判断是否需要重试:
    • 客户端是否开启 retryOnConnectionFailure
    • RequestBody.isOneShot()返回值
    • 判断异常类型,除了ConnectionShutdownException被中断情况外的IOException的子类,都不会进行重试
  3. 根据Response返回的响应码code进行处理
RetryAndFollowUpInterceptor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
private Request followUpRequest(Response userResponse, @Nullable Route route) throws IOException {
if (userResponse == null) throw new IllegalStateException();
int responseCode = userResponse.code();

final String method = userResponse.request().method();
switch (responseCode) {
// 407 需要进行代理认证
case HTTP_PROXY_AUTH:
Proxy selectedProxy = route != null
? route.proxy()
: client.proxy();
if (selectedProxy.type() != Proxy.Type.HTTP) {
throw new ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy");
}
return client.proxyAuthenticator().authenticate(route, userResponse);
// 401 未经认证
case HTTP_UNAUTHORIZED:
return client.authenticator().authenticate(route, userResponse);
// 301 永久重定向 302 临时重定向 只有GET、HEAD请求方法才有效
case HTTP_PERM_REDIRECT:
case HTTP_TEMP_REDIRECT:
// "If the 307 or 308 status code is received in response to a request other than GET
// or HEAD, the user agent MUST NOT automatically redirect the request"
if (!method.equals("GET") && !method.equals("HEAD")) {
return null;
}
// fall-through

case HTTP_MULT_CHOICE:// 300 多个重定向地址
case HTTP_MOVED_PERM:// 301 永久移除 指向了新的位置
case HTTP_MOVED_TEMP://302 临时移除
case HTTP_SEE_OTHER://303 查看其他位置
// 开发者是否允许重定向
if (!client.followRedirects()) return null;
//重定向后的实际地址
String location = userResponse.header("Location");
if (location == null) return null;
HttpUrl url = userResponse.request().url().resolve(location);

// Don't follow redirects to unsupported protocols.
if (url == null) return null;

// If configured, don't follow redirects between SSL and non-SSL.
boolean sameScheme = url.scheme().equals(userResponse.request().url().scheme());
if (!sameScheme && !client.followSslRedirects()) return null;

// Most redirects don't include a request body.
Request.Builder requestBuilder = userResponse.request().newBuilder();
if (HttpMethod.permitsRequestBody(method)) {
final boolean maintainBody = HttpMethod.redirectsWithBody(method);
if (HttpMethod.redirectsToGet(method)) {
requestBuilder.method("GET", null);
} else {
RequestBody requestBody = maintainBody ? userResponse.request().body() : null;
requestBuilder.method(method, requestBody);
}
if (!maintainBody) {
requestBuilder.removeHeader("Transfer-Encoding");
requestBuilder.removeHeader("Content-Length");
requestBuilder.removeHeader("Content-Type");
}
}

// When redirecting across hosts, drop all authentication headers. This
// is potentially annoying to the application layer since they have no
// way to retain them.
if (!sameConnection(userResponse.request().url(), url)) {
requestBuilder.removeHeader("Authorization");
}

return requestBuilder.url(url).build();
//408 超时
case HTTP_CLIENT_TIMEOUT:
// 408's are rare in practice, but some servers like HAProxy use this response code. The
// spec says that we may repeat the request without modifications. Modern browsers also
// repeat the request (even non-idempotent ones.)
if (!client.retryOnConnectionFailure()) {
// The application layer has directed us not to retry the request.
return null;
}

RequestBody requestBody = userResponse.request().body();
if (requestBody != null && requestBody.isOneShot()) {
return null;
}

if (userResponse.priorResponse() != null
&& userResponse.priorResponse().code() == HTTP_CLIENT_TIMEOUT) {
// We attempted to retry and got another timeout. Give up.
return null;
}

if (retryAfter(userResponse, 0) > 0) {
return null;
}

return userResponse.request();
//503 服务端不可用
case HTTP_UNAVAILABLE:
if (userResponse.priorResponse() != null
&& userResponse.priorResponse().code() == HTTP_UNAVAILABLE) {
// We attempted to retry and got another timeout. Give up.
return null;
}

if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
// specifically received an instruction to retry without delay
return userResponse.request();
}

return null;

default:
return null;
}
}

通过followUpRequest()Response返回的code进行对应操作,在触发到重定向相关的code3XX时,需要对应的转换Request使用获取到的重定向后地址进行请求。

由源码可知,可以重试的最大次数为20次,可以通过retryOnConnectionFailure(true)设置支持重试。但是不支持自定义重试次数,若需要自定义重试次数,需要自定义拦截器去实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class RetryInterceptor(var maxRetry: Int/*最大重试次数*/) : Interceptor {
//当前重试次数
private var retryNum = 0

override fun intercept(chain: Interceptor.Chain): Response {
val request = chain.request()
var response = chain.proceed(request)

while (!response.isSuccessful && retryNum < maxRetry) {
retryNum++
response = chain.proceed(request)
}
return response
}
}
BridgeInterceptor

OkHttp-BridgeInterceptor

用以将用户的请求转换为向服务器的请求,之后再把服务器返回的数据转换成用户直观的数据。主要是对Header进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public final class BridgeInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
//重构用户请求 为 服务器请求格式
RequestBody body = userRequest.body();
//如果存在Body
if (body != null) {
//对Header进行调整
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}

long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
//设置Header中的 host
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}

//设置 connection : Keep-Alive 保持长连接模式
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}

// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
//默认使用Gzip压缩
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
//设置 Cookie信息
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
//设置UA
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
//传递至下一个拦截器处理
Response networkResponse = chain.proceed(requestBuilder.build());

HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
//如果服务器支持Gzip压缩,需要进行解压操作
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}

return responseBuilder.build();
}
}

GZip:是一种压缩技术,可以改进Web应用的性能,将请求体明显的减少其大小,如果服务器也支持该格式,就会返回对应格式的内容,客户端需要进行解压操作,可以明显的减少流量消耗。

BridgeInterceptor主要执行了以下3步:

  • 用户请求转换为网络请求

    在原来Request上添加了很多Header,例如Content-Type(定义网络文件的类型和网页的编码)、Content-Length(请求体内容长度)、Transfer-Encoding(请求体的大小)与Content-Length互斥、Accept-Encoding(编码格式)

    未设置Accept-Encoding默认为gzip

  • 执行转换后的网络请求

    chain.proceed(requestBuilder.build())

  • 服务器返回的响应结果转换为用户响应结果

    根据上一步获得Response后,需要再次转化为用户直观格式。主要在于服务端返回的信息里是否设置了Accept-Encoding:gzip,设置了则需要进行解压过程,获取最终结果。

CacheInterceptor

OkHttp-CacheInterceptor

主要用于读取缓存以及更新缓存的为了节省流量和提高响应速度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public final class CacheInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
//根据请求的相关信息获取缓存
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;

long now = System.currentTimeMillis();
//创建缓存策略
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();①
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;

if (cache != null) {
cache.trackResponse(strategy);
}
//缓存无法使用,关闭获得的Response
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}

// 根据策略,不使用网络且没有缓存的直接报错,返回504
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}

// 直接返回缓存,不允许使用网络
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}

Response networkResponse = null;
try {
//请求向下传递
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}

// 接受到服务器返回数据,如果返回code为 304 直接使用缓存结果
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();

// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
//更新当前存储的缓存信息
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
//读取服务器返回结果
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
//对数据进行缓存
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}

if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}

return response;
}
}

CacheInterceptor的执行流程如下所示:

  1. 先行读取缓存数据
  2. 创建好对应的缓存策略:强制缓存对比缓存
  3. 根据缓存策略,不使用网络、也没有对应缓存,返回504
  4. 根据缓存策略,不使用网络,存在缓存则直接返回
  5. 前面都没有返回结果,继续向下执行请求:chain.proceed()
  6. 接受到对应网络结果,如果返回code为304,代表直接使用缓存并更新对应缓存信息
  7. 读取网络结果,对数据进行缓存
  8. 返回获取的网络结果

具体的缓存策略请参考缓存策略

ConnectInterceptor

OkHttp-ConnectInterceptor

真正与服务端建立连接,底层是通过Socket进行连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;

@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
Transmitter transmitter = realChain.transmitter();

// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//建立连接
Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
//继续请求下一个拦截器
return realChain.proceed(request, transmitter, exchange);
}
}

ConnectInterceptor主要功能是建立与服务器的连接关系,通过Transmitter.newExchange()建立连接,建立完成后继续向下执行请求。

具体的连接过程可以参考连接机制

NetworkInterceptor

OkHttp-NetworkInterceptor

用户自定义的网络拦截器,处于第6个拦截器,前面经过了RetryAndFolowUpInterceptor的重定向过程以及BridgeInterceptor的请求头处理,在此处可以获取到更多的连接信息。

引用代码

1
2
3
OkHttpClient client = new OkHttpClient.Builder()
.addNetworkInterceptor(new LoggingInterceptor())
.build();

NetworkInterceptor可以获取到最终请求的Request,以及获取到真正进行过网络请求的得到的Response,从而可以针对Response进行修改然后再回传到上层拦截器。

NetworkInterceptor主要有以下特点:

  • 可以操作经过重定向、重试得到的Response
  • 无法响应缓存数据的请求,因为CacheInterceptor执行在它之前
  • 得到最终进行请求的Request
  • 可以获得连接信息
CallServerInterceptor

OkHttp-CallServerInterceptor

数据的写入过程,也就是客户端和服务端进行交互的过程,客户端发送数据,服务端返回数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
public final class CallServerInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Exchange exchange = realChain.exchange();
Request request = realChain.request();

long sentRequestMillis = System.currentTimeMillis();
//写入请求头
exchange.writeRequestHeaders(request);

boolean responseHeadersStarted = false;
Response.Builder responseBuilder = null;
//判断当前是否有 请求体body
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
//如果是1XX的话 表示当前需要等服务端响应
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
exchange.flushRequest();
responseHeadersStarted = true;
exchange.responseHeadersStart();
responseBuilder = exchange.readResponseHeaders(true);
}


if (responseBuilder == null) {
//写入请求体
if (request.body().isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest();
BufferedSink bufferedRequestBody = Okio.buffer(
exchange.createRequestBody(request, true));
request.body().writeTo(bufferedRequestBody);
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
BufferedSink bufferedRequestBody = Okio.buffer(
exchange.createRequestBody(request, false));
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
} else {
exchange.noRequestBody();
if (!exchange.connection().isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection();
}
}
} else {
exchange.noRequestBody();
}

if (request.body() == null || !request.body().isDuplex()) {
//结束请求
exchange.finishRequest();
}

if (!responseHeadersStarted) {
exchange.responseHeadersStart();
}
//得到响应头
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(false);
}

Response response = responseBuilder
.request(request)
.handshake(exchange.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
//读取响应体内容
int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
response = exchange.readResponseHeaders(false)
.request(request)
.handshake(exchange.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();

code = response.code();
}

exchange.responseHeadersEnd(response);
//forWebSocket 表示为socket连接方式
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(exchange.openResponseBody(response))
.build();
}
// close表示关闭连接
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
exchange.noNewExchangesOnConnection();
}

if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}

return response;
}
}

CallServerInterceptor主要执行了以下过程:

  • 写入请求头
  • 写入请求体(如果存在)
  • 获取状态行及响应头
  • 获取响应体

CallServerInterceptor已经是最后一个拦截器了,接下来就是向上回溯并返回自己获得的Response

HTTP报文结构:

请求报文

请求行:声明请求方法、主机域名及协议版本

请求头:声明客户端的部分报文信息

请求体:存放客户端发送给服务器的数据

响应报文

状态行:声明HTTP协议版本、状态码及描述

响应头:声明服务端的部分报文信息

响应体:服务端返回客户端的数据

责任链模式串联

介绍完上述的拦截器后,接下来就是分析如何将这些拦截器进行串联调用。

RealCall.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
...
//构建责任链
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());

boolean calledNoMoreExchanges = false;
try {
//开始从头链式调用拦截器
Response response = chain.proceed(originalRequest);
if (transmitter.isCanceled()) {
closeQuietly(response);
throw new IOException("Canceled");
}
return response;
} catch (IOException e) {
calledNoMoreExchanges = true;
throw transmitter.noMoreExchanges(e);
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null);
}
}
}

实际执行链式调用的是RealInterceptorChain,由他负责责任链的执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public final class RealInterceptorChain implements Interceptor.Chain {
@Override public Response proceed(Request request) throws IOException {
return proceed(request, transmitter, exchange);
}

public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
throws IOException
{
if (index >= interceptors.size()) throw new AssertionError();

calls++;

// 存在已经在使用的流,直接进行复用
if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}

// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.exchange != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}

// 调用该链中的下一个拦截器 实质为 用户自定义的拦截器,不存在则为 RetryAndFollowUpInterceptor
RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);

// Confirm that the next interceptor made its required call to chain.proceed().
if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}

// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}

if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}

return response;
}
}

RealIntercrptor为链式调用的起点,调用proceed()之后,继续调用下一层的拦截器,直到得到最终的Response。

后续的拦截器也是按照这个规则向下执行,内部都会调用到chain.proceed()直到没有调用为止。

Request是按照定义的interceptor顺序向下执行,然后Response是逆向向上处理的。

获取请求结果Response

Response:返回HTTP请求响应结果,包含了状态码,响应正文等

CallServerInterceptor得到最初格式的Response

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//获得状态行及响应头  
public @Nullable Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
try {
Response.Builder result = codec.readResponseHeaders(expectContinue);
if (result != null) {
Internal.instance.initExchange(result, this);
}
return result;
} catch (IOException e) {
eventListener.responseFailed(call, e);
trackFailure(e);
throw e;
}
}
//获得响应正文
public ResponseBody openResponseBody(Response response) throws IOException {
try {
eventListener.responseBodyStart(call);
String contentType = response.header("Content-Type");
long contentLength = codec.reportedContentLength(response);
Source rawSource = codec.openResponseBodySource(response);
ResponseBodySource source = new ResponseBodySource(rawSource, contentLength);
return new RealResponseBody(contentType, contentLength, Okio.buffer(source));
} catch (IOException e) {
eventListener.responseFailed(call, e);
trackFailure(e);
throw e;
}
}

OkHttp执行流程

OkHttp缓存机制

Http缓存

OkHttp-HTTP缓存机制

Http缓存是web性能优化的重要手段,缓存机制是依赖于header中的参数实现的,这些参数指定了缓存需要从缓存中获取还是从服务端获取

Http缓存有多种规则,根据是否需要重新向服务器发起请求来进行分类:

强制缓存

当客户端第一次请求数据时,服务端在响应头会携带缓存规则信息,主要为两个字段:ExpiresCache-Control

当再次请求数据时,如果符合缓存规则,则直接使用缓存数据,无需与服务端重新交互。

强制缓存流程

强制缓存在缓存未失效的情况下,可以直接使用缓存数据,接下来介绍判断缓存数据是否失效
上文提到,强制缓存是根据两个Header字段进行判定的,这两个字段表示了失效规则

Expires

服务端返回的到期时间,即下一次请求时,请求时间小于服务端返回的到期时间,直接使用缓存数据。
这参数是HTTP1.0的东西了,现在主流的是HTTP1.1。
可能由于客户端时间没有与服务端时间同步而导致缓存命中的误差。

Cache-Control

在HTTP1.1中替代Expires,功能与其一致。
Cache-Control常见的取值有如下几种:

  • private:客户端可以进行缓存
  • public:客户端以及代理服务器都可以进行缓存
  • max-age= XX:缓存数据在 XX秒后失效
  • no-cache:需要使用到对比缓存
  • no-store:所有内容都不进行缓存
  • s-maxage = XX:限定缓存可以在代理服务器中存放多久

对比缓存

需要进行比较判断来确定是否使用缓存,当客户端第一次请求数据时,服务端会返回缓存标识以及数据给客户端,客户端对两者都要进行备份到缓存,当再次请求数据时,客户端会带上缓存标识发送给服务端,服务端对标识进行判断,返回code值。返回若为304,则继续使用缓存。

对比缓存流程

缓存标识未失效时,可以继续使用缓存数据,每次都需要与服务端进行交互去验证缓存标识
对比缓存也是依据两个Header字段进行判定的,这两个字段表示了缓存标识

Last-Modified/If-Modified-Since

Last-Modified:服务端返回给客户端,表示资源的最后修改时间。
If-Modified-Since:客户端发给服务端,表示服务端上次返回的资源最后修改时间。
服务端接收到If-Modified-Since后,与被请求资源的最后修改时间进行比对。

  • 若大于,返回最新资源并返回code为200,客户端需要重新进行缓存
  • 否则,说明资源无修改并返回code为304,客户端继续使用缓存数据
ETag/If-None-Match

ETag:服务端返回给客户端,表示当前资源在服务器的唯一标识。
If-None-Match:客户端发送给服务端,表示服务端上次返回的资源唯一标识。
服务端接收到If-None-Match后,与被请求资源的唯一标识进行比对

  • 标识不同,表示资源被改动过,返回最新资源及设置code为200,客户端需要重新进行缓存
  • 标识相同,表示资源未被改动,返回code为304,客户端继续使用缓存数据

其中ETag/If-None-Match的优先级是高于Last-Modified/If-Modified-Since

总结

  • 强制缓存的优先级是高于对比缓存的
  • 对于强制缓存,服务端会给予一个过期时间,在有效期内再次请求都只会使用缓存,不会请求服务端。
  • 超过有效期就使用对比缓存策略,将服务端返回的ETag/Last-Modified发还给服务端进行验证,有效则继续使用缓存数据(返回code为304),无效则重新获取并进行缓存(返回code为200)。

缓存存储

OkHttp缓存实现

介绍完毕Http的缓存机制后,接下来就是看OkHttp中的源码实现

CacheStrategy.java
1
2
3
4
CacheStrategy(Request networkRequest, Response cacheResponse) {
this.networkRequest = networkRequest;
this.cacheResponse = cacheResponse;
}

缓存策略主要通过CacheStrategy类实现,关键参数为networkRequest(网络请求)cacheResponse(缓存的响应结果)
CacheStrategy通过工厂模式进行构建的,最终通过调用getCandidate()来生成不同模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
private CacheStrategy getCandidate() {
// 没缓存直接进行重新请求
if (cacheResponse == null) {
return new CacheStrategy(request, null);
}

// 如果是HTTPs且握手信息丢失进行重新请求
if (request.isHttps() && cacheResponse.handshake() == null) {
return new CacheStrategy(request, null);
}
//判断缓存已经失效 重新进行请求
if (!isCacheable(cacheResponse, request)) {
return new CacheStrategy(request, null);
}
//
CacheControl requestCaching = request.cacheControl();
if (requestCaching.noCache() || hasConditions(request)) {
return new CacheStrategy(request, null);
}

CacheControl responseCaching = cacheResponse.cacheControl();
//
long ageMillis = cacheResponseAge();
long freshMillis = computeFreshnessLifetime();

if (requestCaching.maxAgeSeconds() != -1) {
freshMillis = Math.min(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds()));
}

long minFreshMillis = 0;
if (requestCaching.minFreshSeconds() != -1) {
minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds());
}

long maxStaleMillis = 0;
if (!responseCaching.mustRevalidate() && requestCaching.maxStaleSeconds() != -1) {
maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds());
}
//处于强制缓存状态,直接返回缓存数据
if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
Response.Builder builder = cacheResponse.newBuilder();
if (ageMillis + minFreshMillis >= freshMillis) {
builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"");
}
long oneDayMillis = 24 * 60 * 60 * 1000L;
if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"");
}
return new CacheStrategy(null, builder.build());
}

// Find a condition to add to the request. If the condition is satisfied, the response body
// will not be transmitted.
String conditionName;
String conditionValue;
if (etag != null) {
conditionName = "If-None-Match";
conditionValue = etag;
} else if (lastModified != null) {
conditionName = "If-Modified-Since";
conditionValue = lastModifiedString;
} else if (servedDate != null) {
conditionName = "If-Modified-Since";
conditionValue = servedDateString;
} else {
return new CacheStrategy(request, null); // No condition! Make a regular request.
}
//交由服务端去进行判断
Headers.Builder conditionalRequestHeaders = request.headers().newBuilder();
Internal.instance.addLenient(conditionalRequestHeaders, conditionName, conditionValue);

Request conditionalRequest = request.newBuilder()
.headers(conditionalRequestHeaders.build())
.build();
return new CacheStrategy(conditionalRequest, cacheResponse);
}

CacheStrategy根据之前的缓存结果以及要发送的request的header计算缓存策略

networkRequest cacheResponse CacheStrategy
null null 不进行网络请求且缓存不存在或过期
返回504错误
null not null 不进行网络请求但是存在缓存且有效
直接返回缓存数据
not null null 进行网络请求且缓存不存在或过期
直接进行网络请求获取数据
not null not null 进行网络请求,请求头包含ETag/Last-Modified且缓存存在
根据网络请求结果判断
返回304,使用缓存
返回200,使用请求数据且更新缓存

OkHttp连接机制

ConnectInterceptor中进行了与服务端的连接,通过Exchange类进行连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
synchronized (connectionPool) {
if (noMoreExchanges) throw new IllegalStateException("released");
if (exchange != null) throw new IllegalStateException("exchange != null");
}
//建立连接
ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);

synchronized (connectionPool) {
this.exchange = result;
this.exchangeRequestDone = false;
this.exchangeResponseDone = false;
return result;
}
}

通过Socket连接服务端

通过ExchangeCodec.find()来设置连接或者复用

ExchangeCodec.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
public ExchangeCodec find(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks)
{
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();

try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
return resultConnection.newCodec(client, chain);
} catch (RouteException e) {
trackFailure();
throw e;
} catch (IOException e) {
trackFailure();
throw new RouteException(e);
}
}
//返回一个健康的连接
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks)
throws IOException
{
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);

// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}

// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
candidate.noNewExchanges();
continue;
}

return candidate;
}
}

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled)
throws IOException
{
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
RealConnection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (transmitter.isCanceled()) throw new IOException("Canceled");
hasStreamFailure = false; // This is a fresh attempt.

Route previousRoute = retryCurrentRoute()
? transmitter.connection.route()
: null;

// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new exchanges.
releasedConnection = transmitter.connection;
toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
? transmitter.releaseConnectionNoEvents()
: null;

if (transmitter.connection != null) {
// We had an already-allocated connection and it's good.
result = transmitter.connection;
releasedConnection = null;
}

if (result == null) {
// Attempt to get a connection from the pool.
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
foundPooledConnection = true;
result = transmitter.connection;
} else {
selectedRoute = previousRoute;
}
}
}
closeQuietly(toClose);

if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result;
}

// If we need a route selection, make one. This is a blocking operation.
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}

List<Route> routes = null;
synchronized (connectionPool) {
if (transmitter.isCanceled()) throw new IOException("Canceled");

if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
routes = routeSelection.getAll();
if (connectionPool.transmitterAcquirePooledConnection(
address, transmitter, routes, false)) {
foundPooledConnection = true;
result = transmitter.connection;
}
}
//没有从连接池中获取到连接需要重新建立
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}

// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
result = new RealConnection(connectionPool, selectedRoute);
connectingConnection = result;
}
}

// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}

// Do TCP + TLS handshakes. This is a blocking operation.
// 开始TCP三次握手以及TLS操作,为阻塞操作
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
connectionPool.routeDatabase.connected(result.route());

Socket socket = null;
synchronized (connectionPool) {
connectingConnection = null;
// Last attempt at connection coalescing, which only occurs if we attempted multiple
// concurrent connections to the same host.
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
// We lost the race! Close the connection we created and return the pooled connection.
result.noNewExchanges = true;
socket = result.socket();
result = transmitter.connection;
} else {
//加入连接池 等待复用
connectionPool.put(result);
transmitter.acquireConnectionNoEvents(result);
}
}
closeQuietly(socket);

eventListener.connectionAcquired(call, result);
return result;
}

最终通过Socket.connect()进行连接。

连接池(ConnectionPool)

OkHttp-连接池 ConnectionPool

频繁的进行Socket连接(三次握手)和Socket断开(四次挥手)非常消耗网络资源以及时间。在HTTP1.1之后提供了keep-alive这个header,可以实现长连接,有效的降低了延迟并提升了处理速度。
连接池就是为了复用已存在连接,可以有效降低创建连接的开销。

连接池构造方法以及成员变量

ConnectionPool.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public final class ConnectionPool {
//后台清理线程
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
//最大的空闲连接数
private final int maxIdleConnections;
//连接最大持续时间
private final long keepAliveDurationNs;
//存储连接的双向队列
private final Deque<RealConnection> connections = new ArrayDeque<>();

public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}

public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
}

ConnectionPool支持配置以下变量

  • maxIdleConnections:最大空闲连接数,默认5
  • keepAliveDurationNs:最大连接保持时间,默认5min

Connection连接池中的Connection任一超出以上配置,就需要执行清理。

可以通过以下方法配置连接池

1
2
OkHttpClient.Builder builder = new OkHttpClient.Builder()
.connectionPool(new ConnectionPool()); //配置连接池

连接池加入连接

双端队列

通过connections存储Connection

1
2
3
4
5
6
7
8
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {//当前清理线程没有运行
cleanupRunning = true;
executor.execute(cleanupRunnable);//开启清理过程
}
connections.add(connection);//加入队列
}

在外部执行put()时,连接加入连接池,并且开启清理线程去清理那些超出配置的连接。

外部执行put()路径如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// ConnectInterceptor 连接拦截器,其中执行连接过程
public final class ConnectInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();

// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);//建立新连接
RealConnection connection = streamAllocation.connection();

return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

// StreamAllocation
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks)
{
...

try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);//寻找可用的连接
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks)
throws IOException
{
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
...

return candidate;
}
}

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled)
throws IOException
{
...
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");

// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new streams.
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
...
if (result == null) {
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);//根据address从连接池中获取对应连接
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
...
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result;
}

...

synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");

if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}

if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}

// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);//连接池未找到对应连接,建立新连接
acquire(result, false);
}
}

...

synchronized (connectionPool) {
reportedAcquired = true;

// Pool the connection.
Internal.instance.put(connectionPool, result);//将新建的连接加入到连接池内

// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}

}

//OkHttpClient
public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {
static {
Internal.instance = new Internal() {
...
@Override public RealConnection get(ConnectionPool pool, Address address,
StreamAllocation streamAllocation, Route route)
{
return pool.get(address, streamAllocation, route);//从连接池获取连接
}

@Override public void put(ConnectionPool pool, RealConnection connection) {
pool.put(connection);//向连接池添加连接
}
...
}
}

通过ConnectInterceptor.intercept()去建立连接,向下调用到StreamAllocation.newStream()

newStream()中继续执行到findConnection(),其中主要执行了两步

  1. ConnectionPool寻找是否存有当前address对应的连接,调用ConnectionPool.get(XXX),存在就返回对应连接。
  2. 不存在对应连接,执行new RealConnection()新建连接,并调用ConnectionPool.put()存储新连接

对应的get()、put()都是通过Internal.instance调用的,其中Internal是一个抽象类,具体实现类对应的就是OkHttpClient

连接池清理连接

在使用连接池时,初始化了一个Executor线程池,这个主要就是为了在清理无效连接时去开启清理线程用的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
  /**
* Background threads are used to cleanup expired connections. There will be at most a single
* thread running per connection pool. The thread pool executor permits the pool itself to be
* garbage collected.
* 清理过期的连接,且保证最多只能运行一个清理线程。
*/

private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

//清理过期连接任务
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
long waitNanos = cleanup(System.nanoTime());//返回下次需要清理连接的时间
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);//阻塞等待
} catch (InterruptedException ignored) {
}
}
}
}
}
};
cleanup()

内部主要执行的是标记空闲连接清理空闲连接返回下次清理时间这几步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;

// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {//遍历连接储存队列
RealConnection connection = i.next();

// If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0) {//标记正在使用的活跃连接
inUseConnectionCount++;
continue;
}

idleConnectionCount++;//非活跃标记为空闲连接

// If the connection is ready to be evicted, we're done.
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;//得到最长空闲时间的连接
}
}

if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
// 空闲连接超过`maxIdleConnections`个或者空闲时间超过`keepAliveDurationNs`,需要清理该连接
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
// 返回最大空闲连接的到期时间,等待到达时间后进行清理
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
// 所有都是活跃连接,返回最大空闲连接时间,等待到达时间后清理
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
// 当前不存在连接,直接返回 -1,不进行清理任务
cleanupRunning = false;
return -1;
}
}
// 立即关闭过期连接
closeQuietly(longestIdleConnection.socket());

// Cleanup again immediately.
return 0;
}

cleanup()执行流程如下:

  • 遍历ConnectionPoolconnections,通过pruneAndGetAllocationCount()判断connection是否空闲
  • 遍历完毕后,找到最长时间的空闲连接(longestIdleConnection)
  • 得到longestIdleConnection后,先是比较当前的idleConnectionCount是否大于maxIdleConnections或者longestIdleDurarionNs是否大于keepAliveDurationNs,两者满足其一,则清理掉longestIdleConnection
  • 不满足其上条件,继续判断idleConnectionCount > 0,表示当前存在空闲连接,就返回距离最大空闲连接时间差keepAliveDurationNs - longestIdleDurationNs,等待到时清理
  • 不满足其上条件,继续判断inUseConnectionCount > 0,表示当前都是活跃连接,返回keepAliveDurationNs,等待达到时间清理
  • 以上条件都不满足,表示当前没有连接,直接返回-1
  • 存在longestIdleConnection,即调用longestIdleConnection.socket().close()关闭连接即可
pruneAndGetAllocationCount(Connection)

判断当前连接是否正在活跃,采用了引用计数法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
//连接弱引用列表
List<Reference<StreamAllocation>> references = connection.allocations;
for (int i = 0; i < references.size(); ) {
//获取引用连接
Reference<StreamAllocation> reference = references.get(i);
//不为null ,表示当前连接尚未回收
if (reference.get() != null) {
i++;
continue;
}

// We've discovered a leaked allocation. This is an application bug.
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);

//移除引用
references.remove(i);
connection.noNewStreams = true;

// If this was the last allocation, the connection is eligible for immediate eviction.
//所有引用都被移除,表示当前连接处于空闲
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}

return references.size();
}

StreamAllocation引用是在StreamAllocation.acquire()时加入的

1
2
3
4
5
6
7
8
public void acquire(RealConnection connection, boolean reportedAcquired) {
assert (Thread.holdsLock(connectionPool));
if (this.connection != null) throw new IllegalStateException();

this.connection = connection;
this.reportedAcquired = reportedAcquired;
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}

通过这种引用计数法来判断当前是否为空闲连接

OkHttp-Cookie机制

OkHttp-Cookie机制

使用方法

1
2
3
4
5
6
7
8
9
10
11
OkHttpClient.Builder()
.//其他参数
.cookieJar(object : CookieJar {
override fun saveFromResponse(url:HttpUrl,cookies:MutableList<Cookie>){
//从Response 获取Cookie信息 并存储到本地
}

override fun loadForRequest(url:HttpUrl) : MutableList<Cookie> {
//根据url获取存储本地的Cookie信息 放到 Request请求中
}
})

Cookie一般都会在请求中进行使用,多用于请求头的Cookie字段,大致可以猜测相关的处理位于BridgeInterceptor中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//BridgeInterceptor.java
public final class BridgeInterceptor implements Interceptor{
//这个也就是在 构建Client时放入的CookieJar对象
private final CookieJar cookieJar;

@Override public Response intercept(Chain chain) throws IOException {
...
//从配置的cookiejar中获取信息
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
//在请求头中添加 Cookie请求头,携带Cookie参数
requestBuilder.header("Cookie", cookieHeader(cookies));
}
...
//请求最后得到的 response结果
Response networkResponse = chain.proceed(requestBuilder.build());
//从rensponse中解析Cookie并缓存到 CookieJar中
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

}
}

//HttpHeaders.java
public static void receiveHeaders(CookieJar cookieJar, HttpUrl url, Headers headers) {
if (cookieJar == CookieJar.NO_COOKIES) return;

List<Cookie> cookies = Cookie.parseAll(url, headers);
if (cookies.isEmpty()) return;
//获取到Cookie对象 缓存中 CookieJar中
cookieJar.saveFromResponse(url, cookies);
}

//Cookie.java
public static List<Cookie> parseAll(HttpUrl url, Headers headers) {
//从响应头中读取到 Set-Cookie字段,并转换成 Cookie对象
List<String> cookieStrings = headers.values("Set-Cookie");
List<Cookie> cookies = null;

for (int i = 0, size = cookieStrings.size(); i < size; i++) {
Cookie cookie = Cookie.parse(url, cookieStrings.get(i));
if (cookie == null) continue;
if (cookies == null) cookies = new ArrayList<>();
cookies.add(cookie);
}

return cookies != null
? Collections.unmodifiableList(cookies)
: Collections.<Cookie>emptyList();
}

Cookie原理简述:

服务端返回的Response通过Set-Cookie响应头返回对应的Cookie信息,以便在下次进行请求时在Request的请求头中添加Cookie字段以携带Cookie信息。

OkHttp通过setCookieJar对所有的Cookie文件进行管理。

  • saveFromResponse():可以获取对应域名的Cookie信息
  • loadForRequest():可以在请求域名时添加Cookie信息

可以通过一个全局的CookieJar类来实现应用内Cookie文件的管理。

OkHttp-DNS功能

DNS介绍

Domain Name System:根据域名查出IP地址,是HTTP协议的前提,只有将域名正确的进行解析,得到IP地址后,才可以继续进行网络连接。

DNS服务器结构如下:

  • 根DNS服务器:返回顶级DNS服务器的IP地址
  • 顶级域DNS服务器:返回权威DNS服务器的IP地址
  • 权威DNS服务器:返回对应主机的IP地址

img

LocalDNS

OkHttp-LocalDNS

运营商提供的DNS服务器,请求时优先查询LocalDNS 缓存,存在直接使用。不存在就需要从根域名服务器 -> 顶级域名服务器 -> 权威域名服务器往上查询可用的IP地址

img

缺陷
  1. 不稳定

    DNS劫持或者服务器故障,导致解析服务不可用

  2. 不准确

    LocalDNS调度不一定是就近原则。某些运营商会把解析请求转发到其他运营商的LocalDNS服务器。

    就会导致解析出的IP不是就近服务器,致使访问变慢甚至无法访问。

  3. 不及时

    运营商可能修改DNS的TTL(Time-To-Live,DNS缓存时间),导致DNS解析结构发生修改,但是在当前请求条件下尚未生效。

img

HttpDNS

OkHttp-HTTPDNS

HTTPDNS利用HTTP协议与DNS服务交互,绕开了运营商LocalDNS服务,有效防止了域名劫持以及提高了域名解析成功率。

原理

img

  1. 客户端直接访问HttpDNS接口,获取域名在HTTPDNS服务器上的最优IP(从容灾方面考虑,还需要保留LocalDNS请求)
  2. 客户端获取到IP后,直接向该IP发起HTTP请求
优势
  1. 降低了UnknownHostException异常发生
  2. 调度精准,根据用户IP,精准获取域名对应IP
  3. 扩展性强,可以自定义域名对应IP规则

OkHttp-HttpDNS实现

OkHttp提供了Dns接口,可以进行自定义拓展替代本身的LocalDNS解析方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//自定义DNS
class OptimizeLocalDNS : Dns{
override fun lookup(hostname: String): List<InetAddress> {
return DNSLookUpUtil.loadLocalDNS(hostname)
}
}

//设置LocalDNS超时取消
fun loadLocalDNS(hostname: String, timeout: Long = 10L): List<InetAddress> {
try {
val task = FutureTask<List<InetAddress>>(Callable<List<InetAddress>> {
//返回去重结果
InetAddress.getAllByName(hostname).toList().distinct()
})
Thread(task).start()
return task.get(timeout, TimeUnit.SECONDS)
} catch (e: Exception) {
}
return listOf()
}

//设置自定义DNS
mOkHttpClient = httpBuilder
.dns(OptimizeLocalDNS())
.build();

OkHttp-DNS原理

配置的dns()初始使用位于RetryAndFollowUpInterceptor.intercept()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final class RetryAndFollowUpInterceptor implements Interceptor {
...
@Override public Response intercept(Chain chain) throws IOException {
...
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
}

private Address createAddress(HttpUrl url) {
...
return new Address(url.host(), url.port(), client.dns(), client.socketFactory(),
sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(),
client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector());
}

...
}

构造出一个Address对象,里面包含了主机名(host)、端口(port)、DNS配置(DNS)、SSL配置(sslSocketFactory,certificatePinner)、代理设置

得到Address,通过StreamAllocation构造了RouteSelector对象

1
2
3
4
5
6
7
8
9
public StreamAllocation(ConnectionPool connectionPool, Address address, Call call,
EventListener eventListener, Object callStackTrace)
{
this.connectionPool = connectionPool;
this.address = address;
this.call = call;
this.eventListener = eventListener;
this.routeSelector = new RouteSelector(address, routeDatabase(), call, eventListener);
this.callStackTrace = callStackTrace;
}

RouteSelector主要为了Select Route(选择路由),返回一个可用的Route对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private void resetNextInetSocketAddress(Proxy proxy) throws IOException {
// Clear the addresses. Necessary if getAllByName() below throws!
inetSocketAddresses = new ArrayList<>();

String socketHost;
int socketPort;
if (proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.SOCKS) {//存在代理
socketHost = address.url().host();
socketPort = address.url().port();
} else {
SocketAddress proxyAddress = proxy.address();
if (!(proxyAddress instanceof InetSocketAddress)) {
throw new IllegalArgumentException(
"Proxy.address() is not an " + "InetSocketAddress: " + proxyAddress.getClass());
}
InetSocketAddress proxySocketAddress = (InetSocketAddress) proxyAddress;
socketHost = getHostString(proxySocketAddress);
socketPort = proxySocketAddress.getPort();
}

if (socketPort < 1 || socketPort > 65535) {
throw new SocketException("No route to " + socketHost + ":" + socketPort
+ "; port is out of range");
}

if (proxy.type() == Proxy.Type.SOCKS) {//解析的直接为代理地址
inetSocketAddresses.add(InetSocketAddress.createUnresolved(socketHost, socketPort));
} else {
//dns开始解析监听
eventListener.dnsStart(call, socketHost);

// Try each address for best behavior in mixed IPv4/IPv6 environments.
List<InetAddress> addresses = address.dns().lookup(socketHost);//通过配置的DNS去解析对应域名的IP列表
if (addresses.isEmpty()) {
throw new UnknownHostException(address.dns() + " returned no addresses for " + socketHost);
}
//dns解析结束监听
eventListener.dnsEnd(call, socketHost, addresses);

for (int i = 0, size = addresses.size(); i < size; i++) {
InetAddress inetAddress = addresses.get(i);
inetSocketAddresses.add(new InetSocketAddress(inetAddress, socketPort));
}
}
}

resetNextInetSocketAddress()返回List<InetSocketAddress>,区分了一下两种情况

  • 设置了proxies代理服务器,直接返回InetSocketAddress(socketHost,socketPort)代理服务器对应的地址和端口
  • 未设置代理服务器,通过设置的dns去解析对应域名(dns.lookup(host))得到List<InetAddress>对应的IP列表,在返回对应地址

OkHttp-HTTP2.0协议支持

OkHttp-HTTP2.0

基于二进制分帧首部压缩服务端推送进行分析

//TODO

OkHttp拓展

请求时间获取

img

EventListener是OkHttp提供的监听回调,可以通过实现这个抽象类监听到网络请求各阶段的时间点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public abstract class EventListener {
//请求相关回调
public void callStart(Call call) {}
public void callEnd(Call call) {}
public void callFailed(Call call, IOException ioe) {}

//dns解析回调
public void dnsStart(Call call, String domainName) {}
public void dnsEnd(Call call, String domainName, List<InetAddress> inetAddressList) {}

//请求连接相关回调
public void connectStart(Call call, InetSocketAddress inetSocketAddress, Proxy proxy){}
public void connectEnd(Call call, InetSocketAddress inetSocketAddress, Proxy proxy,@Nullable Protocol protocol) {}
public void connectFailed(Call call, InetSocketAddress inetSocketAddress, Proxy proxy,
@Nullable Protocol protocol, IOException ioe)
{}
public void connectionAcquired(Call call, Connection connection){}
public void connectionReleased(Call call, Connection connection){}

...
}

DNS解析耗时

只要监听dnsStart()dnsEnd()之间的时间差即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private long dnsStartTime;

private long dnsDuration = -1L;

@Override
public void dnsStart(Call call, String domainName) {
super.dnsStart(call, domainName);
recordEventLog("dnsStart");
dnsStartTime = System.nanoTime();
}

@Override
public void dnsEnd(Call call, String domainName, List<InetAddress> inetAddressList) {
super.dnsEnd(call, domainName, inetAddressList);
recordEventLog("dnsEnd");
dnsDuration = (System.nanoTime() - dnsStartTime) / 1000000;
}

public long getDnsDuration() {
return dnsDuration;
}

dnsDuration即为DNS解析耗时

请求连接耗时

初始连接耗时

使用Socket建立TCP连接,初始连接表示的就是Socket建立连接的过程

只要监听connectStart()connectEnd()之间的时间差。

复用连接耗时

OkHttp设置ConnectionPool,可以复用已存在的连接

需要监听connectAcquired()connectReleased()之间的时间差

IP直连问题

//TODO

内容引用

开源框架源码鉴赏:OkHttp
彻底弄懂HTTP缓存机制及原理

百度App网络深度优化系列《一》DNS优化

HTTP2.0相关

HPACK算法

HTTP/2首部压缩的OkHttp3实现


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!