android多线程上传大文件,android大文件上传

一、实现方式选型

需求:1. 支持多文件上传。2. 大文件上传

目前主流的做法有四种:

线程池

后台Service

前台Service

WorkManager

方案

执行速度

生命周期

特点

线程池

马上执行

很短

用来执行耗时操作,这是比较普遍的做法,但是它比较容易在后台被系统杀死。

后台Service

马上执行

后台服务实际上也要用到线程,只是它是一个独立的组件比较容易管理,但是同样也容易被高版本的系统杀死。

前台Service

马上执行

在通知栏通知用户在做什么操作,和应用生命周期绑定,比后台Service优先级高,但是不适合后台任务的场景。

WorkManager

延迟执行

很长

优势在于即使应用退出后依然可以执行后台任务,但任务可能会延迟执行。

WorkManager 的特性其实比较诱人,即使杀了应用依然能把后台任务进行到底,但对于可能上传很大文件的应用来说有些流氓了。。而且可能会延迟执行,不适合我们的场景。

前台Service不适合我们后台上传的场景。

线程池生命周期太短,对于这种持续执行I/O操作的场景还是需要更稳定的方式。

后台Service相对来说是比较好的方案,但同样有缺点,就是在8.0以上的系统很容易被杀死,而且系统内存不足的情况下也会优先被杀死。所以需要做好重启Service,有必要的话还可以重启被中断的任务。

选完核心的组件后需要确定如何实现大文件上传,这方面其实并不需要纠结,基本就是切片上传或长连接上传,这里采取的是切片上传的方案。

最后对于多文件上传的处理:

并发上传。

串行上传。

这里选择了串行上传,一个文件上传完后再上传下一个文件。不选择并发上传的原因是即使多线程上传,线程大部分时间还是在处理I/O问题,每个文件分到的带宽会减少,和单一的按顺序上传文件应该相差不大。而且多文件可能也会给服务端带来压力。最后多线程并发也会增加上传完成后回调处理的复杂度。

最后确定的方案是 后台Service+切片上传+串行上传。

二、串行任务队列

由于需要串行上传,因此需要实现串行队列,其实 java 库已经有类似的类 LinkedBlockingQueue,不过我依然需要一些定制的 api 来实现一些功能,所以直接自己简单实现了一个任务队列,先明确下这个队列的特点:

当一个任务在执行时,下一个任务在队列中等待。

当队列中没任务时,子线程一直在等待,直到队列内添加了新任务。

不能重复添加同一个任务。

实现以上功能需要用到锁相关的知识,这里用的是 ReentrantLock 可重入锁:

private val thread: Thread

private var queue = CopyOnWriteArrayList()

private val lock: ReentrantLock = ReentrantLock()

private val condition: Condition

private var isDone = false

init {

condition = lock.newCondition()

thread = Thread(Runnable {

while (!isDone) {

lock.lock()

try {

// 队列为空则线程进入等待状态,等待其他线程唤醒

while (queue != null && queue.isEmpty()) {

condition.await()

}

} finally {

lock.unlock()

}

// 按入队顺序执行,一次只能执行一个

val task = queue[0]

task.runnable.run()

queue.remove(task)

}

})

thread.start()

}

复制代码

在初始化的时候就会开启一个线程并无限循环保证线程存活,若队列为空则线程进入等待状态,等待此线程被唤醒后便从队列内取出第一个 task 并执行,执行完成后移除任务。

接着看下添加 task 的方法:

fun add(task: Task) {

if(get(task.id) == null){

queue.add(task)

}

lock.lock()

try {

condition.signal()

}finally {

lock.unlock()

}

}

复制代码

在添加完任务之后需要调用 condition.signal() 唤醒正在等待的线程。

在 add 方法内会先判断任务是否已经存在于队列中,接着进入 get 方法看下:

fun get(id: Int): Task?{

lock.lock()

try {

for (task in queue) {

if(task.id == id){

return task

}

}

}finally {

lock.unlock()

}

return null

}

复制代码

遍历队列内是否有相同的id来判断重复任务,因此我们会自定义一个 Task 类来给任务编号:

class Task(val id: Int, val runnable: Runnable)

复制代码

Task 实例是由上层代码实例化并添加到队列的,其实这个 Task 还可以优化,若任务只希望能执行就好,不用管是否重复,那么这个 id 也就不需要上层设置了,可以让 Task 自动生成一个随机 id 即可。

最后给出完整的代码:

package com.myhexin.recorder.util.upload

import java.util.concurrent.CopyOnWriteArrayList

import java.util.concurrent.locks.Condition

import java.util.concurrent.locks.ReentrantLock

/**

* desc: 场景:在子线程中根据任务队列添加顺序串行执行任务

* @author sunxianglei@myhexin.com

* @date 2020/1/8.

*/

class SerialWorker {

private val thread: Thread

private var queue = CopyOnWriteArrayList()

private val lock: ReentrantLock = ReentrantLock()

private val condition: Condition

private var isDone = false

init {

condition = lock.newCondition()

thread = Thread(Runnable {

while (!isDone) {

lock.lock()

try {

while (queue != null && queue.isEmpty()) {

condition.await()

}

} finally {

lock.unlock()

}

// 按入队顺序执行,一次只能执行一个

val task = queue[0]

task.runnable.run()

queue.remove(task)

}

})

thread.start()

}

/**

* 添加任务到队列

*/

fun add(task: Task) {

if(get(task.id) == null){

queue.add(task)

}

lock.lock()

try {

condition.signal()

}finally {

lock.unlock()

}

}

/**

* 根据id获取任务

*/

fun get(id: Int): Task?{

lock.lock()

try {

for (task in queue) {

if(task.id == id){

return task

}

}

}finally {

lock.unlock()

}

return null

}

fun remove(id: Int){

queue.remove(get(id))

}

fun size(): Int{

return queue.size

}

fun shutdown(){

queue.clear()

isDone = true

}

class Task(val id: Int, val runnable: Runnable)

}

复制代码

三、切片上传

之前说到大文件推荐用切片上传或长连接。切片上传本质上就是把文件转成输入流,然后输入流每次读取一定长度的字节数组,再调用后台接口上传这段字节数组即可,如此循环知道上传完文件。

下面是对切片文件的计算:

val file = File(filePath)

// 每个切片文件默认大小为 sliceSize, 切片文件数量sliceNum

val sliceNum = file.length / sliceSize

// 最后一片切片的大小

val lastSliceSize = file.length() - sliceSize * sliceNum

if(lastSliceSize != 0){

sliceNum++

}

复制代码

切片上传会有一个校验的过程,若上次上传到一半就中断了的话,下次找到这个文件位置继续上传。

fun checkChunkFile(chunkNumber: Int) {

if(chunkNumber > sliceNum){

// 整个文件校验失败代表文件块都已经上传, 再调一次merge,让服务端合并所有文件块

mergeFile()

return

}

// 这里进行校验的网络请求并获取结果,校验成功表示文件块未上传,失败表示已上传。

// ......

if(成功){

// 跳过校验成功的字节数

val inputStream = skipUploadedBytes(chunkNumber)

uploadChunkFile(inputStream,chunkNumber)

}else(失败){

// 递归校验直到校验成功或所有校验完成

checkChunkFile(chunkNumber + 1)

}

}

private fun skipUploadedBytes(chunkNumber: Int): InputStream{

val inputStream = BufferedInputStream(file.inputStream())

if(chunkNumber <= 1){

return inputStream

}

val skipLen = (chunkNumber - 1) * sliceSize

val bytes = ByteArray(skipLen)

// 跳过已经上传的字节

inputStream.read(bytes, 0, skipLen)

return inputStream

}

复制代码

接着看上传的伪代码:

fun uploadChunkFile(inputStream: InputStream, chunkNumber: Int){

if(chunkNumber > sliceNum){

// 所有文件上传完成,通知服务端合并文件块

mergeFile()

return

}

var len = sliceSize

if(chunkNumber == sliceNum){

len = lastSliceSize

}

val bytes = ByteArray(len)

inputStream.read(bytes, 0, len)

// 这里执行上传的网络请求,把 bytes 上传

// ......

if(成功){

// 这一块上传成功,继续上传下一块

uploadChunkFile(inputStream, chunkNumber + 1)

}

}

复制代码

mergeFile 就只是发个请求通知下服务端合并文件。整体的上传逻辑完成。

四、Service后台服务

class UploadService : Service() {

override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {

LogUtils.d("启动UploadService")

if(uploadManager == null){

uploadManager = UploadManager()

}

return START_STICKY

}

}

复制代码

UploadManager 就是刚刚切片上传的管理类,Service 只持有这个管理类实例,具体实现让管理类做,包括对 SerialWorker 的调用,判断任务是否重复的逻辑等。 Service 比较关键的是在 onStartCommand 方法内返回 START_STICKY 表示若服务被系统杀死则自动重启。

五、总结

以上就是整个大文件上传的流程:

用串行阻塞队列让上传文件的任务排队,一次只能执行一个任务,先入队的先执行。

大文件进行切片上传,切片的时候检验文件并跳过已经上传的文件部分。

用 Service 作为容器,被杀死时自重启。

其实还有需要优化的点,例如在 Service 重启后是否需要重新上传未上传的文件?如果需要思路就是在 Service 的 onDestroy 方法内保存队列内目前的存在的任务,等 Service 重启后重新入队。当然如果应用被杀死了的话任务是无论如何也恢复不了的。这种情况还想上传就需要用到 WorkManager 这种官方推荐的后台任务了。