一、前言
OkHttp是目前较为常用的网络框架,了解其原理能更方便我们的使用。这里根据对其进行简单记录。
二、简单使用
OkHttp是一个网络访问框架。支持GET、POST等请求。支持同步、异步的请求方式,以及其它功能。这里从最简单的使用方式开始着手。
添加依赖:
implementation 'com.squareup.okhttp3:okhttp:4.9.3'
fun startNet(){
val httpUrl = "http://www.baidu.com"
val client = OkHttpClient()
val request = Request.Builder()
.url(httpUrl)
.build()
// val response = client.newCall(request).execute()//同步
val response = client.newCall(request).enqueue(object : Callback{
override fun onFailure(call: Call, e: IOException) {
}
override fun onResponse(call: Call, response: Response) {
}
})
val result = response.body?.string()
Log.e("YM--->","获取的网络内容:${result}")
}
配置上网络权限后就可以进行使用了。
三、整体结构
OkHttp整体由OkHttpClient、Request、ResPonse、Call、Dispatch、Interator组成的。这几个的含义大概意思如下:
OkHttpClient: 一个okhttp的整个对外访问的类。Request: 处理关于请求方面的操作Response: 处理关于响应方面的操作Call: 一个接口,主要是用来接受Request和返回Response。并做些其他相关操作Dispatch: 一个分发器。由于OkHttp是支持并发的网络框架,所以里面用到了线程池和队列的逻辑,这歌分发器决定何时处理请求,以及将请求分发到哪一部分。Interator: 拦截器,在OkHttp中,主要是通过拦截器做网络处理,最终底层通过Socket请求完后将数据逐层返回,其结构采用责任链模式。
四、请求执行流程
这里主要侧重于异步网络请求的过程。首先会通过建造者模式创建OkHttpClient(默认的构造方式创建,其内部也是建造者模式),在创建的时候会创建默认的Dispatch。然后通过调用newCall(request: Request)创建一个Call对象。其创建方式由自类RealCall创建。
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
...
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
...
}
返回Call对象后调用同步函数execute()或者异步函数enqueue(responseCallback: Callback)执行。同步函数会返回Response对象,异步函数则会由回调返回Response对象。由于Call的示例对象是RealCall。所以其调用的enqueue(responseCallback: Callback)函数也是RealCall中的。其代码如下:
override fun enqueue(responseCallback: Callback) {
...
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
可以看到其最终是调用的OkHttpClient中的Dispatch的enqueue(call: AsyncCall)函数。其定义如下:
class Dispatcher constructor() {
...
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
...
}
在这里将AsyncCall的对象添加进readyAsyncCalls队列中。其AsyncCall定义如下:
class RealCall(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
...
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
...
}
...
}
可以看到这个类实现的是Runnable并不是Call接口。这里再看存储的队列,定义如下:
class Dispatcher constructor() {
...
/** Ready async calls in the order they'll be run. */
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private val runningSyncCalls = ArrayDeque<RealCall>()
...
}
可以看到有三个队列readyAsyncCalls、runningAsyncCalls、runningSyncCalls。他们的含义如下:
readyAsyncCalls: 准备进行的异步请求队列,存储还没有开始请求的队列runningAsyncCalls:存储正在进行的异步请求队列runningSyncCalls:存储正在进行的同步请求队列。
添加完队列后执行promoteAndExecute()函数,定义如下:
class Dispatcher constructor() {
@get:Synchronized var maxRequests = 64
...
@get:Synchronized var maxRequestsPerHost = 5
...
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
}
可以看到这里对readyAsyncCalls进行遍历。并判断其是否满足需求,如果满足的话就将该请求从readyAsyncCalls中移除并添加进executableCalls和runningAsyncCalls。然后遍历executableCalls,将其添加进线程池中,其定义如下:
class Dispatcher constructor() {
...
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
...
}
可以知道该线程池是一个最高并发的缓存池(其代码和Java的Executors.newCachedThreadPool()构造线程池的方式几乎一模一样,不了解的话可以百度这个)。添加进线程池的方式定义如下:
class RealCall(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
...
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
@Volatile var callsPerHost = AtomicInteger(0)
private set
fun reuseCallsPerHostFrom(other: AsyncCall) {
this.callsPerHost = other.callsPerHost
}
val host: String
get() = originalRequest.url.host
val request: Request
get() = originalRequest
val call: RealCall
get() = this@RealCall
/**
* Attempt to enqueue this async call on [executorService]. This will attempt to clean up
* if the executor has been shut down by reporting the call as failed.
*/
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
}
...
}
可以看到在AsyncCall::executeOn(executorService: ExecutorService)中通过executorService.execute(this)添加进线程池。当该线程运行时候就会执行run()函数。在run()函数中通过调用getResponseWithInterceptorChain()获取到返回值Response。然后将返回值传递给当初传进来的CallBack。在run()函数的最后执行client.dispatcher.finished(this)进行结束。这里看下代码定义:
class Dispatcher constructor() {
...
internal fun finished(call: AsyncCall) {
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}
/** Used by [Call.execute] to signal completion. */
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
...
}
可以看到最终在这里请求结束的任务从runningSyncCalls中移除,并再此调用promoteAndExecute()执行新的请求。
五、Interceptor
上文说到在RealCall中通过拦截器val response = getResponseWithInterceptorChain()获取到Response。这里对其进行一个详细的了解。这里代码定义如下:
class RealCall(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
...
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
...
}
可以看到这里将所有拦截器进行添加最终返回Response。其采用责任链模式进行处理,其简要实现参考以下链接:https://blog.csdn.net/Mr_Tony/article/details/123168997 .由于责任链模式的特性,在前面全部对其处理完后用CallServerInterceptor进行Socket连接。当然其链接过程并不是写在这个类里面,而是通过Exchange及其底层进行处理。这里暂时不再进行深究。
六、其余问题
通过上述过程对其有了简单的了解。这里对其它没涉及到的问题进行记录。不过并未深究,可能存在问题
1、缓存问题
OkHttp缓存是通过CacheInterceptor进行缓存的。其最终是通过Header的Cache-Control关键字进行控制。
2、高并发问题
OkHttp是如何实现高并发的。这里是通过无限线程池进行实现高并发,又通过请求队列对其数量进行控制防止OOM。
3、Socket缓存池复用问题
上述对线程池有两个判断,一个是最大线程数,一个是ip数进行限制,当超过默认的5个后不添加到运行队列里面。其根本在于每次请求需要经过TCP/IP的三次握手,四次挥手。为了提高效率,所以这里使用Socket将同一个ip的多次请求进行了合并,如果一个ip的网络请求完后,后面还有同样ip的请求,就先不断开链接,继续使用,直到没有了,或者触发其他问题了再进行关掉。这样比每次请求都建立链接,再关闭链接会提高很多性能。