java ftp 断点下载,异常重连

package com.yang.task.utils;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.yang.common.hq.constant.RedisKey;
import com.yang.common.ushq.constant.UsRedisKey;
import com.yang.task.utils.IISFtpClient.DownloadStatus;

import redis.clients.jedis.JedisCluster;

@Component
public class UsIISFtpClient {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private FTPClient ftpClient = new FTPClient(); 
    @Value("${us.iis.ftp.url}") 
    private String url;
    @Value("${us.iis.ftp.username}")
    private String username;
    @Value("${us.iis.ftp.password}")
    private String password;
       // 枚举类DownloadStatus代码
    public enum DownloadStatus {
        Remote_File_Noexist, // 远程文件不存在
        Download_Success, // 下载文件成功
        Download_Failed // 下载文件失败
    }
    
    @Resource(name = "jedisClusterClient")
    private JedisCluster jedisCluster;
    public boolean retrieveFile(String srcPath,String desPath){
        try {
            jedisCluster.hset(UsRedisKey.REDIS_NOTICE_DOWNLOAD_LIST,srcPath,desPath);//增加元素
            logger.info("下载");
            final ExecutorService exec = Executors.newFixedThreadPool(1);
            Callable<String> call = new Callable<String>() {
                public String call() throws Exception {
                    DownloadStatus status = download(srcPath,desPath);
                    if(status.equals(DownloadStatus.Download_Failed)){
                        logger.info("重连下载");
                        retrieveFile(srcPath,desPath);//如果没有下载成功重新下载 
                    }else{//下载成功,清除redis
                        jedisCluster.hdel(UsRedisKey.REDIS_NOTICE_DOWNLOAD_LIST,srcPath);//清除元素
                    }
                    return "线程执行完成.";
                }
            };

            try {
                Future<String> future = exec.submit(call);
                String obj = future.get(120 * 1000, TimeUnit.MILLISECONDS); // 任务处理超时时间设置
                logger.info("任务成功返回:" + obj);
            } catch (TimeoutException ex) {
                logger.error("处理超时啦....");
                ex.printStackTrace();
                throw new TimeoutException();
            }finally{
                // 关闭线程池
                exec.shutdown();
            }
            logger.info("完毕");
            return true;
        } catch (Exception e) {
            logger.error("iis-ftp下载文件失败,文件路径:{}",srcPath,e);
            try {
                TimeUnit.SECONDS.sleep(5);
                disconnect();
                retrieveFile(srcPath,desPath);//如果没有下载成功重新下载 
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
        return false;     
    }
    
    public DownloadStatus download(String remote, String local)
            throws Exception {
        checkAndReconnect();
        DownloadStatus result = DownloadStatus.Download_Failed;
        logger.info("检查文件开始:"+remote);
        // 检查远程文件是否存在
        FTPFile[] files = null;
        try {
            files = ftpClient.listFiles(new String(
                    remote.getBytes("GBK"), "iso-8859-1"));
        } catch (Exception e1) {
            e1.printStackTrace();
            throw new Exception(e1);
        } 
        if (files.length != 1) {
            logger.info("远程文件不存在:"+remote);
            return DownloadStatus.Remote_File_Noexist;
        }
        logger.info("检查文件结束:"+remote);
        long remoteSize = files[0].getSize();
        File f = new File(local);
        // 本地存在文件,进行断点下载
        if (!f.getParentFile().exists()) {  
            f.getParentFile().mkdirs();// 目录不存在的情况下,创建目录。  
        } 
        OutputStream out = null;
        InputStream in = null;
        byte[] bytes = new byte[1024];
        long step = remoteSize / 100;
        long process = 0;
        long localSize = 0L;
        int c;
        if (f.exists()) {
            localSize = f.length();
            // 判断本地文件大小是否大于远程文件大小
            if (localSize >= remoteSize) {
                //logger.info("本地文件大于等于远程文件,下载中止");
                return DownloadStatus.Download_Success;
            }
            logger.info("进行断点续传:"+remote);
            out = new FileOutputStream(f, true);
            ftpClient.setRestartOffset(localSize);
            process = localSize / step;
            step = remoteSize / 100;
        } else{
            logger.info("正常下载:"+remote);
            out = new FileOutputStream(f);
        }
        try {
            in = ftpClient.retrieveFileStream(new String(remote
                    .getBytes("GBK"), "iso-8859-1"));
            
            while ((c = in.read(bytes)) != -1) {
                out.write(bytes, 0, c);
                localSize += c;
                long nowProcess = localSize / step;
                if (nowProcess > process) {
                    process = nowProcess;
                    if (process % 10 == 0)
                         logger.info("下载"+remote+":进度:" + process+"%");
                    // TODO 更新文件下载进度,值存放在process变量中
                }
            }
            in.close();
            out.close();
            boolean upNewStatus = ftpClient.completePendingCommand();
            if (upNewStatus&&process==100) {
                result = DownloadStatus.Download_Success;
            } else {
                result = DownloadStatus.Download_Failed;
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new IOException(e);
        }finally {
            if(in != null){
                in.close();
            }
            if(out != null){
                out.close();
            }
        }
        return result;
    }
    
    public void checkAndReconnect() throws IOException{
        int count=0;
        while(count < 10){
            if(ftpClient.isConnected()){
                break;
            }
            ftpClient.connect(url);
            ftpClient.setBufferSize(1024); 
            ftpClient.enterLocalPassiveMode();
            ftpClient.setDefaultTimeout(60*1000);
            ftpClient.setConnectTimeout(60 * 1000);
            ftpClient.setDataTimeout(7200);
            ftpClient.login(username, password);
            ftpClient.setFileType(FTPClient.BINARY_FILE_TYPE);
            count++;
            logger.info("重连ftp,第 {}次",count);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } 
    }

   public void disconnect(){
        if (ftpClient.isConnected()) {
            try {
                ftpClient.disconnect();
            } catch (IOException e) {
                logger.error("ftp断开异常",e);
            }
        }
    }
}


1、使用redis缓存每次下载的文件路径名,下载完成后,讲key移除,也可以使用数据库。

2、核心,使用递归方式,没下载完的不会跳出递归

3、考虑到ftpclient有listfile会卡死,采用future自定义超时。

有问题留言


版权声明:本文为yangxujia原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。