阅读 171

OkHttp源码剖析(四) 报文读写工具ExchangeCodec

概念明确

codec

codec是网络报文交互的读写工具。

用途:encode HTTP 请求,decode HTTP 响应 。

codec通过健康连接创立,它不仅包含编码接码算法,也包含可用的健康连接,只需要往codec写东西就可以发请求,向codec读东西可以拿到返回的报文信息。

Exchange

Exchange 是网络报文交互的读写管理员,内部调用 codec去进行读写操作。

传送一个 HTTP 请求和响应对,该层连接管理和 ExchangeCodec(实际处理 I/O) 的事件。

源码分析

获取codec

通过前面几篇文章的分析我们知道,OkHttp会在ConnectInterceptor中会进行网络的连接,如下图所示。连接步骤最终会调用到ExchangeFinder中的RealConnection.newCodec()方法

  fun find(     client: OkHttpClient,     chain: RealInterceptorChain   ): ExchangeCodec {     try {       val resultConnection = findHealthyConnection(           ...       )       return resultConnection.newCodec(client, chain)     } catch (e: RouteException) {            }   } 复制代码

RealConnection.newCodec()通过之前拿到的健康连接,建立了一个codec,下面我们具体看一看codec建立方法:

internal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {     val socket = this.socket!!     val source = this.source!!     val sink = this.sink!!     val http2Connection = this.http2Connection     return if (http2Connection != null) {       // 如果是http2,       // 返回http2编码解码器       Http2ExchangeCodec(client, this, chain, http2Connection)     } else {        // 如果不是http2,       // 返回http1编码解码器       socket.soTimeout = chain.readTimeoutMillis()       source.timeout().timeout(chain.readTimeoutMillis.toLong(), MILLISECONDS)       sink.timeout().timeout(chain.writeTimeoutMillis.toLong(), MILLISECONDS)       Http1ExchangeCodec(client, this, source, sink)     }   } 复制代码

如代码所示,该方法根据不同的http协议类型建立http2或者http1的编码解码器。

Http1ExchangeCodec

进入 Http1ExchangeCodec(client, this, source, sink)该方法分析下何为编码解码器。以Http1ExchangeCodec中的writeRequest方法为例,可以看到该方法实际上是在拼接Http协议的请求规则,将我们的请求写进数据。

  /** Returns bytes of a request header for sending on an HTTP transport. */   fun writeRequest(headers: Headers, requestLine: String) {     check(state == STATE_IDLE) { "state: $state" }     sink.writeUtf8(requestLine).writeUtf8("\r\n")     for (i in 0 until headers.size) {       sink.writeUtf8(headers.name(i))           .writeUtf8(": ")           .writeUtf8(headers.value(i))           .writeUtf8("\r\n")     }     sink.writeUtf8("\r\n")     state = STATE_OPEN_REQUEST_BODY   } 复制代码

Http2ExchangeCodec

同样以Http2ExchangeCodec中的writeRequestHeaders方法为例,可以看到该方法通过Stream流进行读写,Http2不同于http1的一请求一回应,而是分为多个stream流,每个stream可以有多个请求,多个响应(Server Push)。

override fun writeRequestHeaders(request: Request) {     if (stream != null) return     val hasRequestBody = request.body != null     val requestHeaders = http2HeadersList(request)     stream = http2Connection.newStream(requestHeaders, hasRequestBody)     // We may have been asked to cancel while creating the new stream and sending the request     // headers, but there was still no stream to close.     if (canceled) {       stream!!.closeLater(ErrorCode.CANCEL)       throw IOException("Canceled")     }     stream!!.readTimeout().timeout(chain.readTimeoutMillis.toLong(), TimeUnit.MILLISECONDS)     stream!!.writeTimeout().timeout(chain.writeTimeoutMillis.toLong(), TimeUnit.MILLISECONDS)   } 复制代码

获取Exchange

在获得codec之后,再会看下上面的图,回到val exchange = realChain.call.initExchange(chain)这一行,不要忘了我们一开始出发的地方,我们为什么要获取codec

我们需要将codec塞进Exchange里生成一个Exchange

 internal fun initExchange(chain: RealInterceptorChain): Exchange {     ...     val exchangeFinder = this.exchangeFinder!!     val codec = exchangeFinder.find(client, chain)     val result = Exchange(this, eventListener, exchangeFinder, codec)     ...     return result   } 复制代码

在把codec装在Exchange里 (外观模式)获得Exchange后 ,就可以通过Exchange进行具体的写head,写body等操作。

codec是真正的读写工具,Exchange是管理员,是网络的报文交互,请求响应的来回称为exchange。

报文读写

如下所示,查看Exchange类,可以看到,finishRequest()readResponseHeaders()等方法实际上调用的是ExchangeCodec类的finishRequest()readResponseHeaders()等方法。

 class Exchange(   internal val call: RealCall,   internal val eventListener: EventListener,   internal val finder: ExchangeFinder,   private val codec: ExchangeCodec ) {   ...   @Throws(IOException::class)   fun writeRequestHeaders(request: Request) {     try {       eventListener.requestHeadersStart(call)       codec.writeRequestHeaders(request)       eventListener.requestHeadersEnd(call, request)     } catch (e: IOException) {       ...     }   }   @Throws(IOException::class)   fun createRequestBody(request: Request, duplex: Boolean): Sink {     this.isDuplex = duplex     val contentLength = request.body!!.contentLength()     eventListener.requestBodyStart(call)     val rawRequestBody = codec.createRequestBody(request, contentLength)     return RequestBodySink(rawRequestBody, contentLength)   }   @Throws(IOException::class)   fun flushRequest() {     try {       codec.flushRequest()     } catch (e: IOException) {       ...     }   }   @Throws(IOException::class)   fun finishRequest() {     try {       codec.finishRequest()     } catch (e: IOException) {      ...     }   }   @Throws(IOException::class)   fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {     try {       val result = codec.readResponseHeaders(expectContinue)       result?.initExchange(this)       return result     } catch (e: IOException) {       ...     }   }   fun responseHeadersEnd(response: Response) {     eventListener.responseHeadersEnd(call, response)   }   @Throws(IOException::class)   fun openResponseBody(response: Response): ResponseBody {     try {       val contentType = response.header("Content-Type")       val contentLength = codec.reportedContentLength(response)       val rawSource = codec.openResponseBodySource(response)       val source = ResponseBodySource(rawSource, contentLength)       return RealResponseBody(contentType, contentLength, source.buffer())     } catch (e: IOException) {       ...     }   }   @Throws(SocketException::class)   fun newWebSocketStreams(): RealWebSocket.Streams {     call.timeoutEarlyExit()     return codec.connection.newWebSocketStreams(this)   }   fun noNewExchangesOnConnection() {     codec.connection.noNewExchanges()   }   fun cancel() {     codec.cancel()   } ... } 复制代码

真实用武之地

我们在ConnectInterceptor获取到了Exchange,之前讲到过的责任链模式会继续往下推执行到CallServerInterceptor,该拦截器作为最后责任链最后一环最终会向服务器发送请求接受响应,如下源码所示,可以看到ConnectInterceptor的网络请求最终也都是调用的exchange去执行的。

class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {   @Throws(IOException::class)   override fun intercept(chain: Interceptor.Chain): Response {     val realChain = chain as RealInterceptorChain     val exchange = realChain.exchange!!     val request = realChain.request     val requestBody = request.body     val sentRequestMillis = System.currentTimeMillis()     exchange.writeRequestHeaders(request)     var invokeStartEvent = true     var responseBuilder: Response.Builder? = null     if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {       // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100       // Continue" response before transmitting the request body. If we don't get that, return       // what we did get (such as a 4xx response) without ever transmitting the request body.       if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {         exchange.flushRequest()         responseBuilder = exchange.readResponseHeaders(expectContinue = true)         exchange.responseHeadersStart()         invokeStartEvent = false       }       if (responseBuilder == null) {         if (requestBody.isDuplex()) {           // Prepare a duplex body so that the application can send a request body later.           exchange.flushRequest()           val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()           requestBody.writeTo(bufferedRequestBody)         } else {           // Write the request body if the "Expect: 100-continue" expectation was met.           val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()           requestBody.writeTo(bufferedRequestBody)           bufferedRequestBody.close()         }       } else {         exchange.noRequestBody()         if (!exchange.connection.isMultiplexed) {           // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection           // from being reused. Otherwise we're still obligated to transmit the request body to           // leave the connection in a consistent state.           exchange.noNewExchangesOnConnection()         }       }     } else {       exchange.noRequestBody()     }     if (requestBody == null || !requestBody.isDuplex()) {       exchange.finishRequest()     }     if (responseBuilder == null) {       responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!       if (invokeStartEvent) {         exchange.responseHeadersStart()         invokeStartEvent = false       }     }     var response = responseBuilder         .request(request)         .handshake(exchange.connection.handshake())         .sentRequestAtMillis(sentRequestMillis)         .receivedResponseAtMillis(System.currentTimeMillis())         .build()     var code = response.code     if (code == 100) {       // Server sent a 100-continue even though we did not request one. Try again to read the actual       // response status.       responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!       if (invokeStartEvent) {         exchange.responseHeadersStart()       }       response = responseBuilder           .request(request)           .handshake(exchange.connection.handshake())           .sentRequestAtMillis(sentRequestMillis)           .receivedResponseAtMillis(System.currentTimeMillis())           .build()       code = response.code     }     exchange.responseHeadersEnd(response)     response = if (forWebSocket && code == 101) {       // Connection is upgrading, but we need to ensure interceptors see a non-null response body.       response.newBuilder()           .body(EMPTY_RESPONSE)           .build()     } else {       response.newBuilder()           .body(exchange.openResponseBody(response))           .build()     }     if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||         "close".equals(response.header("Connection"), ignoreCase = true)) {       exchange.noNewExchangesOnConnection()     }     if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {       throw ProtocolException(           "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")     }     return response   } }


作者:许进进
链接:https://juejin.cn/post/7020348509056401415


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐