// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to // the same host. if (!call.get().forWebSocket) { AsyncCallexistingCall= findExistingCallWithHost(call.host()); if (existingCall != null) call.reuseCallsPerHostFrom(existingCall); } } promoteAndExecute(); }
// Walk the queue of ready calls and decide, one by one, whether each may start now.
// (The loop body continues beyond this excerpt, so the loop is left open here.)
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
  AsyncCall asyncCall = i.next();

  // Stop as soon as the number of running async calls has reached maxRequests.
  if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
  // Skip this call while its host already has maxRequestsPerHost calls counted against it.
  if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
// Attach the prior response if it exists. Such responses never have a body. if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build(); }
finalStringmethod= userResponse.request().method(); switch (responseCode) { // 407 需要进行代理认证 case HTTP_PROXY_AUTH: ProxyselectedProxy= route != null ? route.proxy() : client.proxy(); if (selectedProxy.type() != Proxy.Type.HTTP) { thrownewProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy"); } return client.proxyAuthenticator().authenticate(route, userResponse); // 401 未经认证 case HTTP_UNAUTHORIZED: return client.authenticator().authenticate(route, userResponse); // 301 永久重定向 302 临时重定向 只有GET、HEAD请求方法才有效 case HTTP_PERM_REDIRECT: case HTTP_TEMP_REDIRECT: // "If the 307 or 308 status code is received in response to a request other than GET // or HEAD, the user agent MUST NOT automatically redirect the request" if (!method.equals("GET") && !method.equals("HEAD")) { returnnull; } // fall-through
case HTTP_MULT_CHOICE:// 300 多个重定向地址 case HTTP_MOVED_PERM:// 301 永久移除 指向了新的位置 case HTTP_MOVED_TEMP://302 临时移除 case HTTP_SEE_OTHER://303 查看其他位置 // 开发者是否允许重定向 if (!client.followRedirects()) returnnull; //重定向后的实际地址 Stringlocation= userResponse.header("Location"); if (location == null) returnnull; HttpUrlurl= userResponse.request().url().resolve(location);
// Don't follow redirects to unsupported protocols. if (url == null) returnnull;
// If configured, don't follow redirects between SSL and non-SSL. booleansameScheme= url.scheme().equals(userResponse.request().url().scheme()); if (!sameScheme && !client.followSslRedirects()) returnnull;
// Most redirects don't include a request body. Request.BuilderrequestBuilder= userResponse.request().newBuilder(); if (HttpMethod.permitsRequestBody(method)) { finalbooleanmaintainBody= HttpMethod.redirectsWithBody(method); if (HttpMethod.redirectsToGet(method)) { requestBuilder.method("GET", null); } else { RequestBodyrequestBody= maintainBody ? userResponse.request().body() : null; requestBuilder.method(method, requestBody); } if (!maintainBody) { requestBuilder.removeHeader("Transfer-Encoding"); requestBuilder.removeHeader("Content-Length"); requestBuilder.removeHeader("Content-Type"); } }
// When redirecting across hosts, drop all authentication headers. This // is potentially annoying to the application layer since they have no // way to retain them. if (!sameConnection(userResponse.request().url(), url)) { requestBuilder.removeHeader("Authorization"); }
return requestBuilder.url(url).build(); //408 超时 case HTTP_CLIENT_TIMEOUT: // 408's are rare in practice, but some servers like HAProxy use this response code. The // spec says that we may repeat the request without modifications. Modern browsers also // repeat the request (even non-idempotent ones.) if (!client.retryOnConnectionFailure()) { // The application layer has directed us not to retry the request. returnnull; }
if (userResponse.priorResponse() != null && userResponse.priorResponse().code() == HTTP_CLIENT_TIMEOUT) { // We attempted to retry and got another timeout. Give up. returnnull; }
if (retryAfter(userResponse, 0) > 0) { returnnull; }
return userResponse.request(); //503 服务端不可用 case HTTP_UNAVAILABLE: if (userResponse.priorResponse() != null && userResponse.priorResponse().code() == HTTP_UNAVAILABLE) { // We attempted to retry and got another timeout. Give up. returnnull; }
if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) { // specifically received an instruction to retry without delay return userResponse.request(); }
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing // the transfer stream. booleantransparentGzip=false; //默认使用Gzip压缩 if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) { transparentGzip = true; requestBuilder.header("Accept-Encoding", "gzip"); } //设置 Cookie信息 List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url()); if (!cookies.isEmpty()) { requestBuilder.header("Cookie", cookieHeader(cookies)); } //设置UA if (userRequest.header("User-Agent") == null) { requestBuilder.header("User-Agent", Version.userAgent()); } //传递至下一个拦截器处理 ResponsenetworkResponse= chain.proceed(requestBuilder.build());
// We need the network to satisfy this request. Possibly for validating a conditional GET. booleandoExtensiveHealthChecks= !request.method().equals("GET"); //建立连接 Exchangeexchange= transmitter.newExchange(chain, doExtensiveHealthChecks); //继续请求下一个拦截器 return realChain.proceed(request, transmitter, exchange); } }
booleanresponseHeadersStarted=false; Response.BuilderresponseBuilder=null; //判断当前是否有 请求体body if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) { // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100 // Continue" response before transmitting the request body. If we don't get that, return // what we did get (such as a 4xx response) without ever transmitting the request body. //如果是1XX的话 表示当前需要等服务端响应 if ("100-continue".equalsIgnoreCase(request.header("Expect"))) { exchange.flushRequest(); responseHeadersStarted = true; exchange.responseHeadersStart(); responseBuilder = exchange.readResponseHeaders(true); }
if (responseBuilder == null) { //写入请求体 if (request.body().isDuplex()) { // Prepare a duplex body so that the application can send a request body later. exchange.flushRequest(); BufferedSinkbufferedRequestBody= Okio.buffer( exchange.createRequestBody(request, true)); request.body().writeTo(bufferedRequestBody); } else { // Write the request body if the "Expect: 100-continue" expectation was met. BufferedSinkbufferedRequestBody= Okio.buffer( exchange.createRequestBody(request, false)); request.body().writeTo(bufferedRequestBody); bufferedRequestBody.close(); } } else { exchange.noRequestBody(); if (!exchange.connection().isMultiplexed()) { // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection // from being reused. Otherwise we're still obligated to transmit the request body to // leave the connection in a consistent state. exchange.noNewExchangesOnConnection(); } } } else { exchange.noRequestBody(); }
if (request.body() == null || !request.body().isDuplex()) { //结束请求 exchange.finishRequest(); }
if (!responseHeadersStarted) { exchange.responseHeadersStart(); } //得到响应头 if (responseBuilder == null) { responseBuilder = exchange.readResponseHeaders(false); }
Responseresponse= responseBuilder .request(request) .handshake(exchange.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); //读取响应体内容 intcode= response.code(); if (code == 100) { // server sent a 100-continue even though we did not request one. // try again to read the actual response response = exchange.readResponseHeaders(false) .request(request) .handshake(exchange.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build();
code = response.code(); }
exchange.responseHeadersEnd(response); //forWebSocket 表示为socket连接方式 if (forWebSocket && code == 101) { // Connection is upgrading, but we need to ensure interceptors see a non-null response body. response = response.newBuilder() .body(Util.EMPTY_RESPONSE) .build(); } else { response = response.newBuilder() .body(exchange.openResponseBody(response)) .build(); } // close表示关闭连接 if ("close".equalsIgnoreCase(response.request().header("Connection")) || "close".equalsIgnoreCase(response.header("Connection"))) { exchange.noNewExchangesOnConnection(); }
/**
 * Validates the state of the interceptor chain around a call to the next interceptor.
 *
 * <p>Only the validation steps are visible in this excerpt: the code that defines
 * {@code next}, {@code interceptor} and {@code response}, and the end of the method, are
 * elided and left that way.
 *
 * @param request the request being passed along the chain
 * @param transmitter forwarded along the chain
 * @param exchange the exchange supplied by the caller, or null
 * @throws IOException declared by the signature
 */
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
    throws IOException {
  if (index >= interceptors.size()) throw new AssertionError();

  calls++;

  // If an exchange already exists, the incoming request must target a URL its connection
  // supports.
  if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must retain the same host and port");
  }

  // If we already have a stream, confirm that this is the only call to chain.proceed().
  if (this.exchange != null && calls > 1) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must call proceed() exactly once");
  }

  // Confirm that the next interceptor made its required call to chain.proceed().
  if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
    throw new IllegalStateException("network interceptor " + interceptor
        + " must call proceed() exactly once");
  }

  // Confirm that the intercepted response isn't null.
  if (response == null) {
    throw new NullPointerException("interceptor " + interceptor + " returned null");
  }

  if (response.body() == null) {
    throw new IllegalStateException(
        "interceptor " + interceptor + " returned a response with no body");
  }
// Find a condition to add to the request. If the condition is satisfied, the response body // will not be transmitted. String conditionName; String conditionValue; if (etag != null) { conditionName = "If-None-Match"; conditionValue = etag; } elseif (lastModified != null) { conditionName = "If-Modified-Since"; conditionValue = lastModifiedString; } elseif (servedDate != null) { conditionName = "If-Modified-Since"; conditionValue = servedDateString; } else { returnnewCacheStrategy(request, null); // No condition! Make a regular request. } //交由服务端去进行判断 Headers.BuilderconditionalRequestHeaders= request.headers().newBuilder(); Internal.instance.addLenient(conditionalRequestHeaders, conditionName, conditionValue);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
  if (candidate.successCount == 0) {
    return candidate;
  }
}

// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
  candidate.noNewExchanges();
  continue;
}

return candidate;
}
}
/**
 * Finds a connection for the transmitter: the connection it already holds is preferred, then
 * a pooled connection, and finally a newly created one.
 *
 * <p>The end of the method is elided in this excerpt and is left that way.
 *
 * @param connectTimeout passed through to {@code connect}
 * @param readTimeout passed through to {@code connect}
 * @param writeTimeout passed through to {@code connect}
 * @param pingIntervalMillis passed through to {@code connect}
 * @param connectionRetryEnabled passed through to {@code connect}
 * @throws IOException with message "Canceled" when the transmitter has been canceled
 */
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
    int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
  boolean foundPooledConnection = false;
  RealConnection result = null;
  Route selectedRoute = null;
  RealConnection releasedConnection;
  Socket toClose;
  synchronized (connectionPool) {
    if (transmitter.isCanceled()) throw new IOException("Canceled");
    hasStreamFailure = false; // This is a fresh attempt.

    // Attempt to use an already-allocated connection. We need to be careful here because our
    // already-allocated connection may have been restricted from creating new exchanges.
    releasedConnection = transmitter.connection;
    toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
        ? transmitter.releaseConnectionNoEvents()
        : null;

    if (transmitter.connection != null) {
      // We had an already-allocated connection and it's good.
      result = transmitter.connection;
      releasedConnection = null;
    }

    if (result == null) {
      // Attempt to get a connection from the pool.
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
        foundPooledConnection = true;
        result = transmitter.connection;
      } else {
        selectedRoute = previousRoute;
      }
    }
  }
  // Close the released socket outside the synchronized block.
  closeQuietly(toClose);

  if (releasedConnection != null) {
    eventListener.connectionReleased(call, releasedConnection);
  }
  if (foundPooledConnection) {
    eventListener.connectionAcquired(call, result);
  }
  if (result != null) {
    // If we found an already-allocated or pooled connection, we're done.
    return result;
  }

  // If we need a route selection, make one. This is a blocking operation.
  boolean newRouteSelection = false;
  if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
    newRouteSelection = true;
    routeSelection = routeSelector.next();
  }

  List<Route> routes = null;
  synchronized (connectionPool) {
    if (transmitter.isCanceled()) throw new IOException("Canceled");

    if (newRouteSelection) {
      // Now that we have a set of IP addresses, make another attempt at getting a connection from
      // the pool. This could match due to connection coalescing.
      routes = routeSelection.getAll();
      if (connectionPool.transmitterAcquirePooledConnection(
          address, transmitter, routes, false)) {
        foundPooledConnection = true;
        result = transmitter.connection;
      }
    }

    // Nothing usable came from the pool, so a new connection has to be created.
    if (!foundPooledConnection) {
      if (selectedRoute == null) {
        selectedRoute = routeSelection.next();
      }

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
      result = new RealConnection(connectionPool, selectedRoute);
      connectingConnection = result;
    }
  }

  // If we found a pooled connection on the 2nd time around, we're done.
  if (foundPooledConnection) {
    eventListener.connectionAcquired(call, result);
    return result;
  }

  // Do TCP + TLS handshakes. This is a blocking operation.
  result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
      connectionRetryEnabled, call, eventListener);
  connectionPool.routeDatabase.connected(result.route());

  Socket socket = null;
  synchronized (connectionPool) {
    connectingConnection = null;
    // Last attempt at connection coalescing, which only occurs if we attempted multiple
    // concurrent connections to the same host.
    if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
      // We lost the race! Close the connection we created and return the pooled connection.
      result.noNewExchanges = true;
      socket = result.socket();
      result = transmitter.connection;
    } else {
      // Add the new connection to the pool so that it can be reused.
      connectionPool.put(result);
      transmitter.acquireConnectionNoEvents(result);
    }
  }
  closeQuietly(socket);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop. if (keepAliveDuration <= 0) { thrownewIllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration); } } }
// We need the network to satisfy this request. Possibly for validating a conditional GET. booleandoExtensiveHealthChecks= !request.method().equals("GET"); HttpCodechttpCodec= streamAllocation.newStream(client, chain, doExtensiveHealthChecks);//建立新连接 RealConnectionconnection= streamAllocation.connection();
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)throws IOException { while (true) { RealConnectioncandidate= findConnection(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled); ...
return candidate; } }
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled)throws IOException { ... synchronized (connectionPool) { if (released) thrownewIllegalStateException("released"); if (codec != null) thrownewIllegalStateException("codec != null"); if (canceled) thrownewIOException("Canceled");
// Attempt to use an already-allocated connection. We need to be careful here because our // already-allocated connection may have been restricted from creating new streams. releasedConnection = this.connection; toClose = releaseIfNoNewStreams(); ... if (result == null) { // Attempt to get a connection from the pool. Internal.instance.get(connectionPool, address, this, null);//根据address从连接池中获取对应连接 if (connection != null) { foundPooledConnection = true; result = connection; } else { selectedRoute = route; } } } ... if (result != null) { // If we found an already-allocated or pooled connection, we're done. return result; }
...
synchronized (connectionPool) { if (canceled) thrownewIOException("Canceled");
if (newRouteSelection) { // Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. This could match due to connection coalescing. List<Route> routes = routeSelection.getAll(); for (inti=0, size = routes.size(); i < size; i++) { Routeroute= routes.get(i); Internal.instance.get(connectionPool, address, this, route); if (connection != null) { foundPooledConnection = true; result = connection; this.route = route; break; } } }
if (!foundPooledConnection) { if (selectedRoute == null) { selectedRoute = routeSelection.next(); }
// Create a connection and assign it to this allocation immediately. This makes it possible // for an asynchronous cancel() to interrupt the handshake we're about to do. route = selectedRoute; refusedStreamCount = 0; result = newRealConnection(connectionPool, selectedRoute);//连接池未找到对应连接,建立新连接 acquire(result, false); } }
// Pool the connection. Internal.instance.put(connectionPool, result);//将新建的连接加入到连接池内
// If another multiplexed connection to the same address was created concurrently, then // release this connection and acquire that one. if (result.isMultiplexed()) { socket = Internal.instance.deduplicate(connectionPool, address, this); result = connection; } }
/** * Background threads are used to cleanup expired connections. There will be at most a single * thread running per connection pool. The thread pool executor permits the pool itself to be * garbage collected. * 清理过期的连接,且保证最多只能运行一个清理线程。 */ privatestaticfinalExecutorexecutor=newThreadPoolExecutor(0/* corePoolSize */, Integer.MAX_VALUE /* maximumPoolSize */, 60L/* keepAliveTime */, TimeUnit.SECONDS, newSynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
// Find either a connection to evict, or the time that the next eviction is due. synchronized (this) { for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {//遍历连接储存队列 RealConnectionconnection= i.next();
// If the connection is in use, keep searching. if (pruneAndGetAllocationCount(connection, now) > 0) {//标记正在使用的活跃连接 inUseConnectionCount++; continue; }
idleConnectionCount++;//非活跃标记为空闲连接
// If the connection is ready to be evicted, we're done. longidleDurationNs= now - connection.idleAtNanos; if (idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs; longestIdleConnection = connection;//得到最长空闲时间的连接 } }
if (longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections) { // We've found a connection to evict. Remove it from the list, then close it below (outside // of the synchronized block). // 空闲连接超过`maxIdleConnections`个或者空闲时间超过`keepAliveDurationNs`,需要清理该连接 connections.remove(longestIdleConnection); } elseif (idleConnectionCount > 0) { // A connection will be ready to evict soon. // 返回最大空闲连接的到期时间,等待到达时间后进行清理 return keepAliveDurationNs - longestIdleDurationNs; } elseif (inUseConnectionCount > 0) { // All connections are in use. It'll be at least the keep alive duration 'til we run again. // 所有都是活跃连接,返回最大空闲连接时间,等待到达时间后清理 return keepAliveDurationNs; } else { // No connections, idle or in use. // 当前不存在连接,直接返回 -1,不进行清理任务 cleanupRunning = false; return -1; } } // 立即关闭过期连接 closeQuietly(longestIdleConnection.socket());
privateintpruneAndGetAllocationCount(RealConnection connection, long now) { //连接弱引用列表 List<Reference<StreamAllocation>> references = connection.allocations; for (inti=0; i < references.size(); ) { //获取引用连接 Reference<StreamAllocation> reference = references.get(i); //不为null ,表示当前连接尚未回收 if (reference.get() != null) { i++; continue; }
// We've discovered a leaked allocation. This is an application bug. StreamAllocation.StreamAllocationReferencestreamAllocRef= (StreamAllocation.StreamAllocationReference) reference; Stringmessage="A connection to " + connection.route().address().url() + " was leaked. Did you forget to close a response body?"; Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
// If this was the last allocation, the connection is eligible for immediate eviction. //所有引用都被移除,表示当前连接处于空闲 if (references.isEmpty()) { connection.idleAtNanos = now - keepAliveDurationNs; return0; } }
// Try each address for best behavior in mixed IPv4/IPv6 environments. List<InetAddress> addresses = address.dns().lookup(socketHost);//通过配置的DNS去解析对应域名的IP列表 if (addresses.isEmpty()) { thrownewUnknownHostException(address.dns() + " returned no addresses for " + socketHost); } //dns解析结束监听 eventListener.dnsEnd(call, socketHost, addresses);
for (inti=0, size = addresses.size(); i < size; i++) { InetAddressinetAddress= addresses.get(i); inetSocketAddresses.add(newInetSocketAddress(inetAddress, socketPort)); } } }