// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to // the same host. if (!call.get().forWebSocket) { AsyncCall existingCall = findExistingCallWithHost(call.host()); if (existingCall != null) call.reuseCallsPerHostFrom(existingCall); } } promoteAndExecute(); }
// Attach the prior response if it exists. Such responses never have a body. if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build(); }
private Request followUpRequest(Response userResponse, @Nullable Route route)throws IOException { if (userResponse == null) thrownew IllegalStateException(); int responseCode = userResponse.code();
final String method = userResponse.request().method(); switch (responseCode) { // 407 需要进行代理认证 case HTTP_PROXY_AUTH: Proxy selectedProxy = route != null ? route.proxy() : client.proxy(); if (selectedProxy.type() != Proxy.Type.HTTP) { thrownew ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy"); } return client.proxyAuthenticator().authenticate(route, userResponse); // 401 未经认证 case HTTP_UNAUTHORIZED: return client.authenticator().authenticate(route, userResponse); // 301 永久重定向 302 临时重定向 只有GET、HEAD请求方法才有效 case HTTP_PERM_REDIRECT: case HTTP_TEMP_REDIRECT: // "If the 307 or 308 status code is received in response to a request other than GET // or HEAD, the user agent MUST NOT automatically redirect the request" if (!method.equals("GET") && !method.equals("HEAD")) { returnnull; } // fall-through
case HTTP_MULT_CHOICE:// 300 多个重定向地址 case HTTP_MOVED_PERM:// 301 永久移除 指向了新的位置 case HTTP_MOVED_TEMP://302 临时移除 case HTTP_SEE_OTHER://303 查看其他位置 // 开发者是否允许重定向 if (!client.followRedirects()) returnnull; //重定向后的实际地址 String location = userResponse.header("Location"); if (location == null) returnnull; HttpUrl url = userResponse.request().url().resolve(location);
// Don't follow redirects to unsupported protocols. if (url == null) returnnull;
// If configured, don't follow redirects between SSL and non-SSL. boolean sameScheme = url.scheme().equals(userResponse.request().url().scheme()); if (!sameScheme && !client.followSslRedirects()) returnnull;
// Most redirects don't include a request body. Request.Builder requestBuilder = userResponse.request().newBuilder(); if (HttpMethod.permitsRequestBody(method)) { finalboolean maintainBody = HttpMethod.redirectsWithBody(method); if (HttpMethod.redirectsToGet(method)) { requestBuilder.method("GET", null); } else { RequestBody requestBody = maintainBody ? userResponse.request().body() : null; requestBuilder.method(method, requestBody); } if (!maintainBody) { requestBuilder.removeHeader("Transfer-Encoding"); requestBuilder.removeHeader("Content-Length"); requestBuilder.removeHeader("Content-Type"); } }
// When redirecting across hosts, drop all authentication headers. This // is potentially annoying to the application layer since they have no // way to retain them. if (!sameConnection(userResponse.request().url(), url)) { requestBuilder.removeHeader("Authorization"); }
return requestBuilder.url(url).build(); //408 超时 case HTTP_CLIENT_TIMEOUT: // 408's are rare in practice, but some servers like HAProxy use this response code. The // spec says that we may repeat the request without modifications. Modern browsers also // repeat the request (even non-idempotent ones.) if (!client.retryOnConnectionFailure()) { // The application layer has directed us not to retry the request. returnnull; }
if (userResponse.priorResponse() != null && userResponse.priorResponse().code() == HTTP_CLIENT_TIMEOUT) { // We attempted to retry and got another timeout. Give up. returnnull; }
if (retryAfter(userResponse, 0) > 0) { returnnull; }
return userResponse.request(); //503 服务端不可用 case HTTP_UNAVAILABLE: if (userResponse.priorResponse() != null && userResponse.priorResponse().code() == HTTP_UNAVAILABLE) { // We attempted to retry and got another timeout. Give up. returnnull; }
if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) { // specifically received an instruction to retry without delay return userResponse.request(); }
// We need the network to satisfy this request. Possibly for validating a conditional GET. boolean doExtensiveHealthChecks = !request.method().equals("GET"); //建立连接 Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks); //继续请求下一个拦截器 return realChain.proceed(request, transmitter, exchange); } }
long sentRequestMillis = System.currentTimeMillis(); //写入请求头 exchange.writeRequestHeaders(request);
boolean responseHeadersStarted = false; Response.Builder responseBuilder = null; //判断当前是否有 请求体body if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) { // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100 // Continue" response before transmitting the request body. If we don't get that, return // what we did get (such as a 4xx response) without ever transmitting the request body. //如果是1XX的话 表示当前需要等服务端响应 if ("100-continue".equalsIgnoreCase(request.header("Expect"))) { exchange.flushRequest(); responseHeadersStarted = true; exchange.responseHeadersStart(); responseBuilder = exchange.readResponseHeaders(true); }
if (responseBuilder == null) { //写入请求体 if (request.body().isDuplex()) { // Prepare a duplex body so that the application can send a request body later. exchange.flushRequest(); BufferedSink bufferedRequestBody = Okio.buffer( exchange.createRequestBody(request, true)); request.body().writeTo(bufferedRequestBody); } else { // Write the request body if the "Expect: 100-continue" expectation was met. BufferedSink bufferedRequestBody = Okio.buffer( exchange.createRequestBody(request, false)); request.body().writeTo(bufferedRequestBody); bufferedRequestBody.close(); } } else { exchange.noRequestBody(); if (!exchange.connection().isMultiplexed()) { // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection // from being reused. Otherwise we're still obligated to transmit the request body to // leave the connection in a consistent state. exchange.noNewExchangesOnConnection(); } } } else { exchange.noRequestBody(); }
if (request.body() == null || !request.body().isDuplex()) { //结束请求 exchange.finishRequest(); }
if (!responseHeadersStarted) { exchange.responseHeadersStart(); } //得到响应头 if (responseBuilder == null) { responseBuilder = exchange.readResponseHeaders(false); }
Response response = responseBuilder .request(request) .handshake(exchange.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); //读取响应体内容 int code = response.code(); if (code == 100) { // server sent a 100-continue even though we did not request one. // try again to read the actual response response = exchange.readResponseHeaders(false) .request(request) .handshake(exchange.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build();
code = response.code(); }
exchange.responseHeadersEnd(response); //forWebSocket 表示为socket连接方式 if (forWebSocket && code == 101) { // Connection is upgrading, but we need to ensure interceptors see a non-null response body. response = response.newBuilder() .body(Util.EMPTY_RESPONSE) .build(); } else { response = response.newBuilder() .body(exchange.openResponseBody(response)) .build(); } // close表示关闭连接 if ("close".equalsIgnoreCase(response.request().header("Connection")) || "close".equalsIgnoreCase(response.header("Connection"))) { exchange.noNewExchangesOnConnection(); }
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange) throws IOException { if (index >= interceptors.size()) thrownew AssertionError();
calls++;
// 存在已经在使用的流,直接进行复用 if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) { thrownew IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must retain the same host and port"); }
// If we already have a stream, confirm that this is the only call to chain.proceed(). if (this.exchange != null && calls > 1) { thrownew IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must call proceed() exactly once"); }
// 调用该链中的下一个拦截器 实质为 用户自定义的拦截器,不存在则为 RetryAndFollowUpInterceptor RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange, index + 1, request, call, connectTimeout, readTimeout, writeTimeout); Interceptor interceptor = interceptors.get(index); Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed(). if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) { thrownew IllegalStateException("network interceptor " + interceptor + " must call proceed() exactly once"); }
// Confirm that the intercepted response isn't null. if (response == null) { thrownew NullPointerException("interceptor " + interceptor + " returned null"); }
if (response.body() == null) { thrownew IllegalStateException( "interceptor " + interceptor + " returned a response with no body"); }
// If this is a brand new connection, we can skip the extensive health checks. synchronized (connectionPool) { if (candidate.successCount == 0) { return candidate; } }
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it // isn't, take it out of the pool and start again. if (!candidate.isHealthy(doExtensiveHealthChecks)) { candidate.noNewExchanges(); continue; }
return candidate; } }
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled)throws IOException { boolean foundPooledConnection = false; RealConnection result = null; Route selectedRoute = null; RealConnection releasedConnection; Socket toClose; synchronized (connectionPool) { if (transmitter.isCanceled()) thrownew IOException("Canceled"); hasStreamFailure = false; // This is a fresh attempt.
// Attempt to use an already-allocated connection. We need to be careful here because our // already-allocated connection may have been restricted from creating new exchanges. releasedConnection = transmitter.connection; toClose = transmitter.connection != null && transmitter.connection.noNewExchanges ? transmitter.releaseConnectionNoEvents() : null;
if (transmitter.connection != null) { // We had an already-allocated connection and it's good. result = transmitter.connection; releasedConnection = null; }
if (result == null) { // Attempt to get a connection from the pool. if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) { foundPooledConnection = true; result = transmitter.connection; } else { selectedRoute = previousRoute; } } } closeQuietly(toClose);
if (releasedConnection != null) { eventListener.connectionReleased(call, releasedConnection); } if (foundPooledConnection) { eventListener.connectionAcquired(call, result); } if (result != null) { // If we found an already-allocated or pooled connection, we're done. return result; }
// If we need a route selection, make one. This is a blocking operation. boolean newRouteSelection = false; if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) { newRouteSelection = true; routeSelection = routeSelector.next(); }
if (newRouteSelection) { // Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. This could match due to connection coalescing. routes = routeSelection.getAll(); if (connectionPool.transmitterAcquirePooledConnection( address, transmitter, routes, false)) { foundPooledConnection = true; result = transmitter.connection; } } //没有从连接池中获取到连接需要重新建立 if (!foundPooledConnection) { if (selectedRoute == null) { selectedRoute = routeSelection.next(); }
// Create a connection and assign it to this allocation immediately. This makes it possible // for an asynchronous cancel() to interrupt the handshake we're about to do. result = new RealConnection(connectionPool, selectedRoute); connectingConnection = result; } }
// If we found a pooled connection on the 2nd time around, we're done. if (foundPooledConnection) { eventListener.connectionAcquired(call, result); return result; }
// Do TCP + TLS handshakes. This is a blocking operation. // 开始TCP三次握手以及TLS操作,为阻塞操作 result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener); connectionPool.routeDatabase.connected(result.route());
Socket socket = null; synchronized (connectionPool) { connectingConnection = null; // Last attempt at connection coalescing, which only occurs if we attempted multiple // concurrent connections to the same host. if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) { // We lost the race! Close the connection we created and return the pooled connection. result.noNewExchanges = true; socket = result.socket(); result = transmitter.connection; } else { //加入连接池 等待复用 connectionPool.put(result); transmitter.acquireConnectionNoEvents(result); } } closeQuietly(socket);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop. if (keepAliveDuration <= 0) { thrownew IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration); } } }
ConnectionPool支持配置以下变量
maxIdleConnections:最大空闲连接数,默认5
keepAliveDurationNs:最大连接保持时间,默认5min
Connection连接池中的Connection任一超出以上配置,就需要执行清理。
可以通过以下方法配置连接池
1 2
OkHttpClient.Builder builder = new OkHttpClient.Builder() .connectionPool(new ConnectionPool()); //配置连接池
// We need the network to satisfy this request. Possibly for validating a conditional GET. boolean doExtensiveHealthChecks = !request.method().equals("GET"); HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);//建立新连接 RealConnection connection = streamAllocation.connection();
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)throws IOException { while (true) { RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled); ...
return candidate; } }
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled)throws IOException { ... synchronized (connectionPool) { if (released) thrownew IllegalStateException("released"); if (codec != null) thrownew IllegalStateException("codec != null"); if (canceled) thrownew IOException("Canceled");
// Attempt to use an already-allocated connection. We need to be careful here because our // already-allocated connection may have been restricted from creating new streams. releasedConnection = this.connection; toClose = releaseIfNoNewStreams(); ... if (result == null) { // Attempt to get a connection from the pool. Internal.instance.get(connectionPool, address, this, null);//根据address从连接池中获取对应连接 if (connection != null) { foundPooledConnection = true; result = connection; } else { selectedRoute = route; } } } ... if (result != null) { // If we found an already-allocated or pooled connection, we're done. return result; }
...
synchronized (connectionPool) { if (canceled) thrownew IOException("Canceled");
if (newRouteSelection) { // Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. This could match due to connection coalescing. List<Route> routes = routeSelection.getAll(); for (int i = 0, size = routes.size(); i < size; i++) { Route route = routes.get(i); Internal.instance.get(connectionPool, address, this, route); if (connection != null) { foundPooledConnection = true; result = connection; this.route = route; break; } } }
if (!foundPooledConnection) { if (selectedRoute == null) { selectedRoute = routeSelection.next(); }
// Create a connection and assign it to this allocation immediately. This makes it possible // for an asynchronous cancel() to interrupt the handshake we're about to do. route = selectedRoute; refusedStreamCount = 0; result = new RealConnection(connectionPool, selectedRoute);//连接池未找到对应连接,建立新连接 acquire(result, false); } }
// Pool the connection. Internal.instance.put(connectionPool, result);//将新建的连接加入到连接池内
// If another multiplexed connection to the same address was created concurrently, then // release this connection and acquire that one. if (result.isMultiplexed()) { socket = Internal.instance.deduplicate(connectionPool, address, this); result = connection; } }
/** * Background threads are used to cleanup expired connections. There will be at most a single * thread running per connection pool. The thread pool executor permits the pool itself to be * garbage collected. * 清理过期的连接,且保证最多只能运行一个清理线程。 */ privatestaticfinal Executor executor = new ThreadPoolExecutor(0/* corePoolSize */, Integer.MAX_VALUE /* maximumPoolSize */, 60L/* keepAliveTime */, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
//清理过期连接任务 privatefinal Runnable cleanupRunnable = new Runnable() { @Overridepublicvoidrun(){ while (true) { long waitNanos = cleanup(System.nanoTime());//返回下次需要清理连接的时间 if (waitNanos == -1) return; if (waitNanos > 0) { long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); synchronized (ConnectionPool.this) { try { ConnectionPool.this.wait(waitMillis, (int) waitNanos);//阻塞等待 } catch (InterruptedException ignored) { } } } } } };
longcleanup(long now){ int inUseConnectionCount = 0; int idleConnectionCount = 0; RealConnection longestIdleConnection = null; long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due. synchronized (this) { for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {//遍历连接储存队列 RealConnection connection = i.next();
// If the connection is in use, keep searching. if (pruneAndGetAllocationCount(connection, now) > 0) {//标记正在使用的活跃连接 inUseConnectionCount++; continue; }
idleConnectionCount++;//非活跃标记为空闲连接
// If the connection is ready to be evicted, we're done. long idleDurationNs = now - connection.idleAtNanos; if (idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs; longestIdleConnection = connection;//得到最长空闲时间的连接 } }
if (longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections) { // We've found a connection to evict. Remove it from the list, then close it below (outside // of the synchronized block). // 空闲连接超过`maxIdleConnections`个或者空闲时间超过`keepAliveDurationNs`,需要清理该连接 connections.remove(longestIdleConnection); } elseif (idleConnectionCount > 0) { // A connection will be ready to evict soon. // 返回最大空闲连接的到期时间,等待到达时间后进行清理 return keepAliveDurationNs - longestIdleDurationNs; } elseif (inUseConnectionCount > 0) { // All connections are in use. It'll be at least the keep alive duration 'til we run again. // 所有都是活跃连接,返回最大空闲连接时间,等待到达时间后清理 return keepAliveDurationNs; } else { // No connections, idle or in use. // 当前不存在连接,直接返回 -1,不进行清理任务 cleanupRunning = false; return -1; } } // 立即关闭过期连接 closeQuietly(longestIdleConnection.socket());
privateintpruneAndGetAllocationCount(RealConnection connection, long now){ //连接弱引用列表 List<Reference<StreamAllocation>> references = connection.allocations; for (int i = 0; i < references.size(); ) { //获取引用连接 Reference<StreamAllocation> reference = references.get(i); //不为null ,表示当前连接尚未回收 if (reference.get() != null) { i++; continue; }
// We've discovered a leaked allocation. This is an application bug. StreamAllocation.StreamAllocationReference streamAllocRef = (StreamAllocation.StreamAllocationReference) reference; String message = "A connection to " + connection.route().address().url() + " was leaked. Did you forget to close a response body?"; Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
// If this was the last allocation, the connection is eligible for immediate eviction. //所有引用都被移除,表示当前连接处于空闲 if (references.isEmpty()) { connection.idleAtNanos = now - keepAliveDurationNs; return0; } }
for (int i = 0, size = cookieStrings.size(); i < size; i++) { Cookie cookie = Cookie.parse(url, cookieStrings.get(i)); if (cookie == null) continue; if (cookies == null) cookies = new ArrayList<>(); cookies.add(cookie); }
return cookies != null ? Collections.unmodifiableList(cookies) : Collections.<Cookie>emptyList(); }
// Try each address for best behavior in mixed IPv4/IPv6 environments. List<InetAddress> addresses = address.dns().lookup(socketHost);//通过配置的DNS去解析对应域名的IP列表 if (addresses.isEmpty()) { thrownew UnknownHostException(address.dns() + " returned no addresses for " + socketHost); } //dns解析结束监听 eventListener.dnsEnd(call, socketHost, addresses);
for (int i = 0, size = addresses.size(); i < size; i++) { InetAddress inetAddress = addresses.get(i); inetSocketAddresses.add(new InetSocketAddress(inetAddress, socketPort)); } } }