OkHttp原理

一、前言

OkHttp是目前较为常用的网络框架,了解其原理能更方便我们的使用。这里根据对其进行简单记录。

二、简单使用

OkHttp是一个网络访问框架。支持GETPOST等请求。支持同步、异步的请求方式,以及其它功能。这里从最简单的使用方式开始着手。

添加依赖:

implementation 'com.squareup.okhttp3:okhttp:4.9.3'
    fun startNet(){
        val httpUrl = "http://www.baidu.com"
        val client = OkHttpClient()
        val request = Request.Builder()
            .url(httpUrl)
            .build()
//        val response = client.newCall(request).execute()//同步
        val response = client.newCall(request).enqueue(object : Callback{
            override fun onFailure(call: Call, e: IOException) {
            }

            override fun onResponse(call: Call, response: Response) {
            }
        })
        val result = response.body?.string()
        Log.e("YM--->","获取的网络内容:${result}")
    }

配置上网络权限后就可以进行使用了。

三、整体结构

OkHttp整体由OkHttpClientRequestResPonseCallDispatchInterator组成的。这几个的含义大概意思如下:

  • OkHttpClient: 一个okhttp的整个对外访问的类。

  • Request: 处理关于请求方面的操作

  • Response: 处理关于响应方面的操作

  • Call: 一个接口,主要是用来接受Request和返回Response。并做些其他相关操作

  • Dispatch: 一个分发器。由于OkHttp是支持并发的网络框架,所以里面用到了线程池和队列的逻辑,这歌分发器决定何时处理请求,以及将请求分发到哪一部分。

  • Interator: 拦截器,在OkHttp中,主要是通过拦截器做网络处理,最终底层通过Socket请求完后将数据逐层返回,其结构采用责任链模式。

四、请求执行流程

​ 这里主要侧重于异步网络请求的过程。首先会通过建造者模式创建OkHttpClient(默认的构造方式创建,其内部也是建造者模式),在创建的时候会创建默认的Dispatch。然后通过调用newCall(request: Request)创建一个Call对象。其创建方式由自类RealCall创建。

open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
  ...
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
  ...
}

返回Call对象后调用同步函数execute()或者异步函数enqueue(responseCallback: Callback)执行。同步函数会返回Response对象,异步函数则会由回调返回Response对象。由于Call的示例对象是RealCall。所以其调用的enqueue(responseCallback: Callback)函数也是RealCall中的。其代码如下:

  override fun enqueue(responseCallback: Callback) {
    ...
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

可以看到其最终是调用的OkHttpClient中的Dispatchenqueue(call: AsyncCall)函数。其定义如下:

class Dispatcher constructor() {
  ...
internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }
	...
}

在这里将AsyncCall的对象添加进readyAsyncCalls队列中。其AsyncCall定义如下:

class RealCall(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  ...
internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {
	...
}
  ...
}

可以看到这个类实现的是Runnable并不是Call接口。这里再看存储的队列,定义如下:

class Dispatcher constructor() {
	...
  /** Ready async calls in the order they'll be run. */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningSyncCalls = ArrayDeque<RealCall>()
  ...
}

可以看到有三个队列readyAsyncCallsrunningAsyncCallsrunningSyncCalls。他们的含义如下:

  • readyAsyncCalls: 准备进行的异步请求队列,存储还没有开始请求的队列
  • runningAsyncCalls:存储正在进行的异步请求队列
  • runningSyncCalls:存储正在进行的同步请求队列。

添加完队列后执行promoteAndExecute()函数,定义如下:

class Dispatcher constructor() {
  	@get:Synchronized var maxRequests = 64
  ...
  
  @get:Synchronized var maxRequestsPerHost = 5
  ...
  private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }
}

可以看到这里对readyAsyncCalls进行遍历。并判断其是否满足需求,如果满足的话就将该请求从readyAsyncCalls中移除并添加进executableCallsrunningAsyncCalls。然后遍历executableCalls,将其添加进线程池中,其定义如下:

class Dispatcher constructor() {
  ...
@get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }
  ...
}

可以知道该线程池是一个最高并发的缓存池(其代码和JavaExecutors.newCachedThreadPool()构造线程池的方式几乎一模一样,不了解的话可以百度这个)。添加进线程池的方式定义如下:

class RealCall(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  ...
internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {
	 @Volatile var callsPerHost = AtomicInteger(0)
      private set

    fun reuseCallsPerHostFrom(other: AsyncCall) {
      this.callsPerHost = other.callsPerHost
    }

    val host: String
      get() = originalRequest.url.host

    val request: Request
        get() = originalRequest

    val call: RealCall
        get() = this@RealCall

    /**
     * Attempt to enqueue this async call on [executorService]. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }

    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          client.dispatcher.finished(this)
        }
      }
    }
}
  ...
}

可以看到在AsyncCall::executeOn(executorService: ExecutorService)中通过executorService.execute(this)添加进线程池。当该线程运行时候就会执行run()函数。在run()函数中通过调用getResponseWithInterceptorChain()获取到返回值Response。然后将返回值传递给当初传进来的CallBack。在run()函数的最后执行client.dispatcher.finished(this)进行结束。这里看下代码定义:

class Dispatcher constructor() {
  ...
internal fun finished(call: AsyncCall) {
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
  }

  /** Used by [Call.execute] to signal completion. */
  internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
  }

  private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }

    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
      idleCallback.run()
    }
  }
  ...
}

可以看到最终在这里请求结束的任务从runningSyncCalls中移除,并再此调用promoteAndExecute()执行新的请求。

五、Interceptor

​ 上文说到在RealCall中通过拦截器val response = getResponseWithInterceptorChain()获取到Response。这里对其进行一个详细的了解。这里代码定义如下:

class RealCall(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  ...
@Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }
  ...
}

可以看到这里将所有拦截器进行添加最终返回Response。其采用责任链模式进行处理,其简要实现参考以下链接:https://blog.csdn.net/Mr_Tony/article/details/123168997 .由于责任链模式的特性,在前面全部对其处理完后用CallServerInterceptor进行Socket连接。当然其链接过程并不是写在这个类里面,而是通过Exchange及其底层进行处理。这里暂时不再进行深究。

六、其余问题

通过上述过程对其有了简单的了解。这里对其它没涉及到的问题进行记录。不过并未深究,可能存在问题

1、缓存问题

​ OkHttp缓存是通过CacheInterceptor进行缓存的。其最终是通过Header的Cache-Control关键字进行控制。

2、高并发问题

​ OkHttp是如何实现高并发的。这里是通过无限线程池进行实现高并发,又通过请求队列对其数量进行控制防止OOM。

3、Socket缓存池复用问题

​ 上述对线程池有两个判断,一个是最大线程数,一个是ip数进行限制,当超过默认的5个后不添加到运行队列里面。其根本在于每次请求需要经过TCP/IP的三次握手,四次挥手。为了提高效率,所以这里使用Socket将同一个ip的多次请求进行了合并,如果一个ip的网络请求完后,后面还有同样ip的请求,就先不断开链接,继续使用,直到没有了,或者触发其他问题了再进行关掉。这样比每次请求都建立链接,再关闭链接会提高很多性能。

七、参考链接

  1. OkHttp
  2. 设计模式-责任链模式

版权声明:本文为Mr_Tony原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。