Background
Our project uses Tencent COS to store and retrieve files, but it had no multipart-upload optimization, so I decided to add multipart upload logic for large files.
Steps
The Tencent COS documentation lists seven APIs for multipart upload:
Initiate Multipart Upload
Initializes a multipart upload. On success, the response returns an UploadId, which is used in subsequent Upload Part requests.
Upload Part
Uploads the object to COS in parts. Up to 10,000 parts are supported; each part must be 1 MB - 5 GB, except the last part, which may be smaller than 1 MB.
Upload Part - Copy
Copies a part's content from a source object to a destination. The source object is specified with x-cos-copy-source and the byte range with x-cos-copy-source-range (the range must be 1 MB - 5 GB).
Complete Multipart Upload
Completes the whole multipart upload. After all parts have been uploaded with Upload Part, this API must be called to finish the upload, and the request body must give each part's PartNumber and ETag so COS can verify the parts. Because merging the parts takes several minutes, COS immediately returns a 200 status with a Transfer-Encoding: chunked response header once merging starts; during the merge it periodically sends whitespace chunks to keep the connection alive, and the final chunk contains the information of the completed object.
Abort Multipart Upload
Aborts a multipart upload and deletes the parts already uploaded. If Upload Part requests under the same UploadId are still in flight when Abort Multipart Upload is called, those Upload Part requests fail. If the UploadId does not exist, 404 NoSuchUpload is returned.
List Multipart Uploads
Lists in-progress multipart uploads, at most 1,000 per request.
List Parts
Lists the parts already uploaded successfully under a given UploadId.
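The limits above (at most 10,000 parts, each 1 MB - 5 GB) imply a minimum workable part size for any given file. A small standalone helper to pick one (hypothetical, not part of the COS SDK):

```java
// Hypothetical helper: choose a part size that satisfies the COS limits of
// at most 10,000 parts and 1 MB - 5 GB per part.
public class PartSizer {
    static final long MIN_PART = 1024L * 1024;             // 1 MB
    static final long MAX_PART = 5L * 1024 * 1024 * 1024;  // 5 GB
    static final long MAX_PARTS = 10_000;

    // Returns a part size (bytes) valid for a file of totalSize bytes.
    public static long pickPartSize(long totalSize) {
        // Smallest part size that keeps the part count within 10,000.
        long size = Math.max(MIN_PART, (totalSize + MAX_PARTS - 1) / MAX_PARTS);
        if (size > MAX_PART) {
            throw new IllegalArgumentException("file too large for multipart upload");
        }
        return size;
    }

    public static long partCount(long totalSize, long partSize) {
        return (totalSize + partSize - 1) / partSize;
    }

    public static void main(String[] args) {
        long total = 100L * 1024 * 1024 * 1024; // e.g. a 100 GB file
        long part = pickPartSize(total);
        System.out.println(part + " bytes/part, " + partCount(total, part) + " parts");
    }
}
```

Anything under about 10 GB can simply use the 1 MB minimum; beyond that, the part size has to grow so the count stays under 10,000.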
Pitfall 1
I called Upload Part directly and found it required an UploadId I did not have. Only after reading through all seven APIs did I understand that they form one combined workflow.
Pitfall 2
I called Initiate Multipart Upload, Upload Part, and Complete Multipart Upload in order. Uploading a single part succeeded, but the multipart upload failed with Read timed out. The while loop in my uploadPart code ran far too long, and I guessed my byte array was too small, so I enlarged it from byte[] buffer = new byte[1024] to byte[] buffer = new byte[1024*512]. That shortened the run time somewhat, but execution still threw Read timed out at UploadPartResult uploadPartResult = cosClient.uploadPart(uploadPartRequest);. Searching online turned up nothing, so I asked a colleague.
InitiateMultipartUploadRequest initiateMultipartUploadRequest = new InitiateMultipartUploadRequest(cosConfig.getBucketName(), key);
COSClient cosClient = getClient();
//Initialize the multipart upload
InitiateMultipartUploadResult initiateMultipartUploadResult = cosClient.initiateMultipartUpload(initiateMultipartUploadRequest);
List<PartETag> partETagList = new ArrayList<>();
//1. Create the stream
InputStream in = inputStream;
//2. Read the data
byte[] buffer = new byte[1024];
int len ;
List<Integer> sizeList = new ArrayList<>();
while ((len = in.read(buffer))!=-1) {
sizeList.add(len);
in.read(buffer,0,len);
//Upload the object in parts: up to 10,000 parts, each 1 MB - 5 GB; the last part may be smaller than 1 MB
UploadPartRequest uploadPartRequest = new UploadPartRequest();
uploadPartRequest.setUploadId(initiateMultipartUploadResult.getUploadId());
uploadPartRequest.setInputStream(in);
uploadPartRequest.setKey(key);
uploadPartRequest.setPartSize(len);
uploadPartRequest.setBucketName(cosConfig.getBucketName());
uploadPartRequest.setPartNumber(sizeList.size());
UploadPartResult uploadPartResult = cosClient.uploadPart(uploadPartRequest);
PartETag partETag = new PartETag(1, uploadPartResult.getETag());
partETagList.add(partETag);
}
//3. Close the stream
in.close();
//Complete the multipart upload
CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest(cosConfig.getBucketName(), key, initiateMultipartUploadResult.getUploadId(), partETagList);
CompleteMultipartUploadResult completeMultipartUploadResult = cosClient.completeMultipartUpload(completeMultipartUploadRequest);
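Part of why the tiny buffer hurt: each read() returns at most the buffer length, so treating every read as a part makes the part count explode, far past the 10,000-part limit, with every part well under the 1 MB minimum. A quick standalone calculation (hypothetical file size, independent of COS):

```java
// How many "parts" a read-loop produces when each read() becomes one part.
public class TinyBufferParts {
    public static long parts(long fileSize, int bufferSize) {
        // Ceiling division: number of full reads needed to consume the file
        return (fileSize + bufferSize - 1) / bufferSize;
    }

    public static void main(String[] args) {
        long fileSize = 100L * 1024 * 1024;              // e.g. a 100 MB file
        System.out.println(parts(fileSize, 1024));       // 1 KB buffer
        System.out.println(parts(fileSize, 1024 * 512)); // 512 KB buffer
    }
}
```

With a 1 KB buffer, a 100 MB file turns into 102,400 uploads, which explains why enlarging the buffer helped but could not fix the underlying problem.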
Pitfall 3
After I asked a colleague, he pointed out my problems:
1. In uploadPartRequest.setInputStream(in); the `in` is still the stream of the entire file, so each part request uploads the whole file.
2. Inside the while loop I was uploading the same stream on every iteration.
3. in.read(buffer, 0, len) reads len bytes into the buffer array; it is that buffer (wrapped in its own stream) that should be uploaded. After fixing the code, the test finally passed!
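The corrected read loop can be sketched in isolation (COS calls omitted so it runs standalone): each iteration copies only the bytes just read, and that slice, not the original stream, is what would be handed to uploadPartRequest.setInputStream(...).

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkReadDemo {
    // Reads the stream in fixed-size chunks; each chunk is an independent
    // byte[] that could be wrapped in its own ByteArrayInputStream for upload.
    public static List<byte[]> readChunks(InputStream in, int chunkSize) throws IOException {
        List<byte[]> chunks = new ArrayList<>();
        byte[] buffer = new byte[chunkSize];
        int len;
        while ((len = in.read(buffer)) != -1) {
            // Copy only the bytes actually read this iteration
            chunks.add(Arrays.copyOf(buffer, len));
        }
        return chunks;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[2500];
        List<byte[]> chunks = readChunks(new ByteArrayInputStream(data), 1000);
        System.out.println(chunks.size()); // 3 chunks: 1000, 1000, 500 bytes
    }
}
```

The key difference from the broken version: the upload would receive `new ByteArrayInputStream(chunk)` per part, never `in` itself.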
Pitfall 4
For large files the single-threaded upload was slow, so for performance I added multithreading (code below). But the program then failed with a 400 error saying the XML was not validated and did not conform to the COS spec. Debugging locally, I found it was a threading problem: the completion logic ran before the part-upload threads had finished. So I added a CountDownLatch to make sure all worker threads complete before the main thread continues.
After adding threads:
public String uploadPart(String key, MultipartFile file) throws InterruptedException, IOException {
COSClient cosClient = null;
Long batch = null;
cosClient = getClient();
byte[] fileByte = file.getBytes();
// Total file size
long totalSize = fileByte.length;
// Part size: 10 MB
int batchSize = 1024 * 1024 * 10;
// Number of parts
batch = totalSize / batchSize + (totalSize % batchSize > 0 ? 1 : 0);
try {
//Initialize the multipart upload
InitiateMultipartUploadRequest initiateMultipartUploadRequest = new InitiateMultipartUploadRequest(cosConfig.getBucketName(), key);
InitiateMultipartUploadResult initiateMultipartUploadResult = cosClient.initiateMultipartUpload(initiateMultipartUploadRequest);
//Split the file into parts
List<PartETag> partETagList = new ArrayList<>();
Map<Integer, InputStream> uploadPart = new HashMap<>();
for (int i = 0; i < batch; i++) {
// The last part may be smaller than batchSize
long partSize = batchSize;
if (i == batch - 1) {
partSize = totalSize - i * batchSize;
}
int from=i * batchSize;
int to=(int) partSize + (i * batchSize);
//Slice out this part
byte[] partByte=Arrays.copyOfRange(fileByte,from, to);
InputStream input = new ByteArrayInputStream(partByte);
uploadPart.put(i + 1, input);
}
Long finalBatch = batch;
//Upload the parts on worker threads
uploadPart.forEach((k, v)->{
pool.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
PartETag partETag = uploadPartFile(initiateMultipartUploadResult.getUploadId(), key, v, k, finalBatch.intValue());
partETagList.add(partETag);
return true;
}
});
});
// Nothing here waits for the worker threads, so the main thread races ahead
System.out.println("Main thread continuing!");
//Complete the multipart upload
CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest(cosConfig.getBucketName(), key, initiateMultipartUploadResult.getUploadId(), partETagList);
cosClient.completeMultipartUpload(completeMultipartUploadRequest);
String url = getUrl(key);
log.info("key:{},url:{}", key, url);
return url;
} finally {
if (cosClient != null) {
cosClient.shutdown();
}
}
}
public PartETag uploadPartFile(String uploadId, String key, InputStream input, int partNumber,int batch) throws IOException {
System.out.println("Uploading part " + partNumber + " of " + batch);
if (partNumber > 10000) {
throw new CosClientException("Part count exceeds the 10,000-part limit: " + partNumber);
}
COSClient cosClient = getClient();
//Upload the object in parts: up to 10,000 parts, each 1 MB - 5 GB; the last part may be smaller than 1 MB
UploadPartRequest uploadPartRequest = new UploadPartRequest();
uploadPartRequest.setUploadId(uploadId);
uploadPartRequest.setInputStream(input);
uploadPartRequest.setKey(key);
uploadPartRequest.setPartSize(input.available());
uploadPartRequest.setBucketName(cosConfig.getBucketName());
uploadPartRequest.setPartNumber(partNumber);
UploadPartResult uploadPartResult = cosClient.uploadPart(uploadPartRequest);
PartETag partETag = new PartETag(partNumber, uploadPartResult.getETag());
return partETag;
}
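The from/to arithmetic in the splitting loop is easy to get wrong off by one, so it is worth checking in isolation. This standalone class mirrors the loop above and verifies that reassembling the Arrays.copyOfRange slices reproduces the original bytes (sizes here are made up for the test):

```java
import java.util.Arrays;
import java.util.Random;

public class SplitCheck {
    // Same slicing logic as the upload loop: fixed-size parts, shorter last part.
    public static byte[][] split(byte[] fileByte, int batchSize) {
        long totalSize = fileByte.length;
        int batch = (int) (totalSize / batchSize + (totalSize % batchSize > 0 ? 1 : 0));
        byte[][] parts = new byte[batch][];
        for (int i = 0; i < batch; i++) {
            long partSize = batchSize;
            if (i == batch - 1) {
                partSize = totalSize - (long) i * batchSize;
            }
            int from = i * batchSize;
            int to = (int) partSize + i * batchSize;
            parts[i] = Arrays.copyOfRange(fileByte, from, to);
        }
        return parts;
    }

    public static void main(String[] args) {
        byte[] data = new byte[2_500_001];
        new Random(42).nextBytes(data);
        byte[][] parts = split(data, 1_000_000);
        // Reassemble and compare with the original
        byte[] joined = new byte[data.length];
        int pos = 0;
        for (byte[] p : parts) {
            System.arraycopy(p, 0, joined, pos, p.length);
            pos += p.length;
        }
        System.out.println(parts.length + " parts, roundtrip ok: " + Arrays.equals(joined, data));
    }
}
```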
After adding CountDownLatch:
public String uploadPart(String key, MultipartFile file) throws InterruptedException, IOException {
COSClient cosClient = null;
Long batch = null;
cosClient = getClient();
byte[] fileByte = file.getBytes();
// Total file size
long totalSize = fileByte.length;
// Part size: 10 MB
int batchSize = 1024 * 1024 * 10;
// Number of parts
batch = totalSize / batchSize + (totalSize % batchSize > 0 ? 1 : 0);
try {
//Initialize the multipart upload
InitiateMultipartUploadRequest initiateMultipartUploadRequest = new InitiateMultipartUploadRequest(cosConfig.getBucketName(), key);
InitiateMultipartUploadResult initiateMultipartUploadResult = cosClient.initiateMultipartUpload(initiateMultipartUploadRequest);
//Split the file into parts
// Thread-safe list: worker threads add() to it concurrently
List<PartETag> partETagList = Collections.synchronizedList(new ArrayList<>());
Map<Integer, InputStream> uploadPart = new HashMap<>();
for (int i = 0; i < batch; i++) {
// The last part may be smaller than batchSize
long partSize = batchSize;
if (i == batch - 1) {
partSize = totalSize - i * batchSize;
}
int from=i * batchSize;
int to=(int) partSize + (i * batchSize);
//Slice out this part
byte[] partByte=Arrays.copyOfRange(fileByte,from, to);
InputStream input = new ByteArrayInputStream(partByte);
uploadPart.put(i + 1, input);
}
Long finalBatch = batch;
final CountDownLatch latch = new CountDownLatch(batch.intValue()); // from java.util.concurrent
//Upload the parts on worker threads
uploadPart.forEach((k, v)->{
pool.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
PartETag partETag = uploadPartFile(initiateMultipartUploadResult.getUploadId(), key, v, k, finalBatch.intValue());
partETagList.add(partETag);
System.out.println("Worker thread finished!");
latch.countDown(); // decrement the latch
return true;
}
});
});
//Main thread
latch.await(); // block until the latch count reaches zero
System.out.println("All parts uploaded; main thread continuing!");
//Complete the multipart upload; COS requires the parts listed in ascending PartNumber order
partETagList.sort(Comparator.comparingInt(PartETag::getPartNumber));
CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest(cosConfig.getBucketName(), key, initiateMultipartUploadResult.getUploadId(), partETagList);
cosClient.completeMultipartUpload(completeMultipartUploadRequest);
String url = getUrl(key);
log.info("key:{},url:{}", key, url);
return url;
} finally {
if (cosClient != null) {
cosClient.shutdown();
}
}
}
public PartETag uploadPartFile(String uploadId, String key, InputStream input, int partNumber,int batch) throws IOException {
System.out.println("Uploading part " + partNumber + " of " + batch);
if (partNumber > 10000) {
throw new CosClientException("Part count exceeds the 10,000-part limit: " + partNumber);
}
COSClient cosClient = getClient();
//Upload the object in parts: up to 10,000 parts, each 1 MB - 5 GB; the last part may be smaller than 1 MB
UploadPartRequest uploadPartRequest = new UploadPartRequest();
uploadPartRequest.setUploadId(uploadId);
uploadPartRequest.setInputStream(input);
uploadPartRequest.setKey(key);
uploadPartRequest.setPartSize(input.available());
uploadPartRequest.setBucketName(cosConfig.getBucketName());
uploadPartRequest.setPartNumber(partNumber);
UploadPartResult uploadPartResult = cosClient.uploadPart(uploadPartRequest);
PartETag partETag = new PartETag(partNumber, uploadPartResult.getETag());
return partETag;
}
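The synchronization fix can be reduced to a minimal CountDownLatch sketch, independent of COS: each worker task counts the latch down when its "part" is done, and the main thread awaits the latch before running the completion step. (The COS upload is stood in for by a list add; the thread-safe list mirrors the fix above.)

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LatchDemo {
    public static List<Integer> runParts(int parts) throws InterruptedException {
        // Synchronized list: a plain ArrayList is not safe for concurrent add()
        List<Integer> done = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(parts);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 1; i <= parts; i++) {
            final int partNumber = i;
            pool.submit(() -> {
                done.add(partNumber); // stands in for uploading one part
                latch.countDown();    // signal this part is finished
            });
        }
        latch.await();                // main thread blocks until all parts are done
        pool.shutdown();
        return done;                  // only now is it safe to "complete" the upload
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runParts(8).size()); // 8
    }
}
```

Without the await, the completion request would be sent with a partial (or empty) ETag list, which is exactly the 400 "XML not validated" failure from Pitfall 4.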
Conclusion
Even with multithreading, a local test still takes quite long, about 7 seconds, so I will keep optimizing. I hope my pitfall journey ends soon.