package com.yang.task.utils;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.yang.common.hq.constant.RedisKey;
import com.yang.common.ushq.constant.UsRedisKey;
import com.yang.task.utils.IISFtpClient.DownloadStatus;
import redis.clients.jedis.JedisCluster;
/**
 * FTP download client for the US IIS server.
 *
 * <p>Downloads a remote file to a local path, resuming a partial local file when one exists.
 * Every requested transfer is recorded in a Redis hash before it starts and removed once it
 * reaches a terminal state, so unfinished transfers remain visible in Redis.
 *
 * <p>The class wraps a single shared {@link FTPClient}; it does not synchronize access to it,
 * so callers should not run several transfers on the same instance concurrently.
 */
@Component
public class UsIISFtpClient {
    /** Upper bound for a single download attempt, in milliseconds. */
    private static final long ATTEMPT_TIMEOUT_MILLIS = 120 * 1000L;
    /** Pause between two download attempts, in seconds. */
    private static final long RETRY_DELAY_SECONDS = 5L;
    /** Maximum number of connection attempts made by {@link #checkAndReconnect()}. */
    private static final int MAX_CONNECT_ATTEMPTS = 10;
    /** Pause between two failed connection attempts, in milliseconds. */
    private static final long CONNECT_RETRY_DELAY_MILLIS = 3000L;
    /** Size of the copy buffer and of the FTP client's internal buffer, in bytes. */
    private static final int BUFFER_SIZE = 1024;

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final FTPClient ftpClient = new FTPClient();

    @Value("${us.iis.ftp.url}")
    private String url;
    @Value("${us.iis.ftp.username}")
    private String username;
    @Value("${us.iis.ftp.password}")
    private String password;

    /** Outcome of a single download attempt. */
    public enum DownloadStatus {
        Remote_File_Noexist, // the remote file does not exist
        Download_Success, // the file was downloaded completely
        Download_Failed // the transfer ended without producing the complete file
    }

    @Resource(name = "jedisClusterClient")
    private JedisCluster jedisCluster;

    /**
     * Downloads {@code srcPath} to {@code desPath}, retrying until the transfer reaches a
     * terminal state.
     *
     * <p>Retries are unbounded by design: a failed or timed-out attempt is followed by a short
     * pause and another attempt. The retry is a loop rather than recursion, so long outages
     * neither grow the stack nor nest thread pools.
     *
     * @param srcPath remote file path
     * @param desPath local file path
     * @return {@code true} once the file was downloaded or the server reported that it does not
     *         exist (in both cases the Redis entry is removed); {@code false} only if the calling
     *         thread was interrupted while waiting
     */
    public boolean retrieveFile(String srcPath, String desPath) {
        while (true) {
            try {
                // Record the pending transfer; hset is idempotent, so repeating it per attempt is harmless.
                jedisCluster.hset(UsRedisKey.REDIS_NOTICE_DOWNLOAD_LIST, srcPath, desPath);
                logger.info("下载");
                DownloadStatus status = downloadWithTimeout(srcPath, desPath);
                if (status != DownloadStatus.Download_Failed) {
                    // Terminal state (success or remote file missing): clear the Redis entry.
                    jedisCluster.hdel(UsRedisKey.REDIS_NOTICE_DOWNLOAD_LIST, srcPath);
                    logger.info("完毕");
                    return true;
                }
                logger.info("重连下载");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error("iis-ftp下载被中断,文件路径:{}", srcPath, e);
                return false;
            } catch (Exception e) {
                logger.error("iis-ftp下载文件失败,文件路径:{}", srcPath, e);
                // Drop the session before pausing: closing the socket also releases a worker
                // that is still blocked on it, so it is gone before the next attempt starts.
                disconnect();
            }
            try {
                TimeUnit.SECONDS.sleep(RETRY_DELAY_SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error("iis-ftp下载被中断,文件路径:{}", srcPath, e);
                return false;
            }
        }
    }

    /**
     * Runs one {@link #download(String, String)} attempt on a worker thread and waits at most
     * {@link #ATTEMPT_TIMEOUT_MILLIS} for it, because FTP calls such as listing files can hang.
     */
    private DownloadStatus downloadWithTimeout(final String srcPath, final String desPath)
            throws InterruptedException, java.util.concurrent.ExecutionException, TimeoutException {
        final ExecutorService exec = Executors.newFixedThreadPool(1);
        Callable<DownloadStatus> task = () -> download(srcPath, desPath);
        Future<DownloadStatus> future = exec.submit(task);
        try {
            DownloadStatus status = future.get(ATTEMPT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            logger.info("任务成功返回:{}", status);
            return status;
        } catch (TimeoutException ex) {
            logger.error("处理超时啦....", ex);
            throw ex;
        } finally {
            // cancel/shutdownNow interrupt the worker instead of merely refusing new tasks;
            // both are no-ops when the task already finished.
            future.cancel(true);
            exec.shutdownNow();
        }
    }

    /**
     * Performs a single download attempt, resuming from the end of an existing local file.
     *
     * <p>If this method throws while a transfer is in progress, the pending FTP reply has not
     * been consumed; call {@link #disconnect()} before reusing the connection.
     *
     * @param remote remote file path
     * @param local local file path
     * @return {@link DownloadStatus#Remote_File_Noexist} if the listing does not contain exactly
     *         one entry, {@link DownloadStatus#Download_Success} if the local file already is, or
     *         has become, at least as large as the remote one, otherwise
     *         {@link DownloadStatus#Download_Failed}
     * @throws Exception if connecting, listing or transferring fails
     */
    public DownloadStatus download(String remote, String local)
            throws Exception {
        checkAndReconnect();
        logger.info("检查文件开始:" + remote);
        final String serverPath = toServerPath(remote);
        // Check that the remote file exists.
        FTPFile[] files = ftpClient.listFiles(serverPath);
        if (files == null || files.length != 1) {
            logger.info("远程文件不存在:" + remote);
            return DownloadStatus.Remote_File_Noexist;
        }
        logger.info("检查文件结束:" + remote);
        final long remoteSize = files[0].getSize();

        File target = new File(local);
        File parent = target.getParentFile();
        // A bare file name has no parent; only create directories when there is one.
        if (parent != null && !parent.exists()) {
            parent.mkdirs();
        }

        long localSize = 0L;
        final boolean resume = target.exists();
        if (resume) {
            localSize = target.length();
            // Nothing to do when the local file is already at least as large as the remote one.
            if (localSize >= remoteSize) {
                return DownloadStatus.Download_Success;
            }
            logger.info("进行断点续传:" + remote);
            ftpClient.setRestartOffset(localSize);
        } else {
            logger.info("正常下载:" + remote);
        }

        InputStream remoteStream = ftpClient.retrieveFileStream(serverPath);
        if (remoteStream == null) {
            throw new IOException("无法打开远程文件数据流:" + remote
                    + ",服务器应答:" + ftpClient.getReplyString());
        }
        long percent = percentOf(localSize, remoteSize);
        try (InputStream in = remoteStream;
                OutputStream out = new FileOutputStream(target, resume)) {
            byte[] buffer = new byte[BUFFER_SIZE];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
                localSize += read;
                long current = percentOf(localSize, remoteSize);
                if (current > percent) {
                    percent = current;
                    if (percent % 10 == 0) {
                        logger.info("下载" + remote + ":进度:" + percent + "%");
                    }
                    // TODO publish the download progress held in percent
                }
            }
        }
        // Both streams are closed at this point, which completePendingCommand requires.
        boolean transferAcknowledged = ftpClient.completePendingCommand();
        // Judge completeness by byte count; a rounded percentage cannot tell 100% from almost 100%.
        if (transferAcknowledged && localSize >= remoteSize) {
            return DownloadStatus.Download_Success;
        }
        return DownloadStatus.Download_Failed;
    }

    /**
     * Re-encodes a path so that its GBK bytes are sent unchanged when the client writes the
     * command as ISO-8859-1.
     */
    private static String toServerPath(String path) throws IOException {
        return new String(path.getBytes("GBK"), "iso-8859-1");
    }

    /** Returns the completed percentage; an empty or unknown-size file counts as 100. */
    private static long percentOf(long done, long total) {
        return total <= 0 ? 100 : done * 100 / total;
    }

    /**
     * Ensures the FTP session is connected and logged in, making up to
     * {@link #MAX_CONNECT_ATTEMPTS} attempts with a pause between failed ones.
     *
     * @throws IOException if no session could be established, or the thread was interrupted
     *         while waiting between attempts
     */
    public void checkAndReconnect() throws IOException {
        if (ftpClient.isConnected()) {
            return;
        }
        // These timeouts only affect connections opened after they are set.
        ftpClient.setDefaultTimeout(60 * 1000);
        ftpClient.setConnectTimeout(60 * 1000);

        IOException lastFailure = null;
        for (int attempt = 1; attempt <= MAX_CONNECT_ATTEMPTS; attempt++) {
            logger.info("重连ftp,第 {}次", attempt);
            try {
                openSession();
                return;
            } catch (IOException e) {
                lastFailure = e;
                logger.error("ftp连接失败,第 {}次", attempt, e);
                // Do not leave a connected-but-unusable session behind for the next attempt.
                disconnect();
            }
            if (attempt < MAX_CONNECT_ATTEMPTS) {
                try {
                    Thread.sleep(CONNECT_RETRY_DELAY_MILLIS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("等待重连ftp时被中断", e);
                }
            }
        }
        throw new IOException("ftp连接失败,已重试 " + MAX_CONNECT_ATTEMPTS + " 次", lastFailure);
    }

    /** Connects, switches to passive binary mode and logs in; fails if the login is rejected. */
    private void openSession() throws IOException {
        ftpClient.connect(url);
        ftpClient.setBufferSize(BUFFER_SIZE);
        ftpClient.enterLocalPassiveMode();
        // NOTE(review): this overload takes milliseconds, i.e. 7.2 s; confirm 7200 s was not intended.
        ftpClient.setDataTimeout(7200);
        if (!ftpClient.login(username, password)) {
            throw new IOException("ftp登录失败:" + ftpClient.getReplyString());
        }
        ftpClient.setFileType(FTPClient.BINARY_FILE_TYPE);
    }

    /** Closes the FTP connection if it is open; a failure to close is logged, not thrown. */
    public void disconnect() {
        if (ftpClient.isConnected()) {
            try {
                ftpClient.disconnect();
            } catch (IOException e) {
                logger.error("ftp断开异常", e);
            }
        }
    }
}
1、使用redis缓存每次下载的文件路径名,下载完成后,将key移除,也可以使用数据库。
2、核心,使用递归方式,没下载完的不会跳出递归
3、考虑到FTPClient的listFiles可能会卡死,采用Future自定义超时。
有问题留言
版权声明:本文为yangxujia原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。