Flink readFile 方法按时间范围递归读取文件的工具类

import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.util.Date;

public class InputUtil {
public static TextInputFormat myInputFormat(String dir, String startDate, String endDate){
Configuration configuration = new Configuration();
configuration.setBoolean(“recursive.file.enumeration”, true);
Path path = new Path(dir);
TextInputFormat textInputFormat = new TextInputFormat(path);
textInputFormat.configure(configuration);
textInputFormat.supportsMultiPaths();

    textInputFormat.setFilesFilter(new FilePathFilter() {
        @Override
        public boolean filterPath(Path filePath) {

            String filepath = filePath.toString();
            // 截取时间字符串用于转换为timestamp类型
            String[] splits = filepath.split("dt=");
            int length = splits.length;
            String date = splits[length - 1];
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");



            Date parseDate = null;
            Date parseStartDate = null;
            Date parseEndDate = null;
            try {
                parseDate = simpleDateFormat.parse(date);
                parseStartDate = simpleDateFormat.parse(startDate);
                parseEndDate = simpleDateFormat.parse(endDate);
            } catch (ParseException e) {
                e.printStackTrace();
            }

            long dateTime = parseDate.getTime();
            long startDateTime = parseStartDate.getTime();
            long endDateTime = parseEndDate.getTime();

            boolean s = dateTime >= startDateTime;
            boolean e = dateTime <= endDateTime;
            System.out.println(filePath.toString());

            return !(s && e);

        }
    });




    return textInputFormat;
}

}

注意:最后一级文件夹的名称必须是 dt=yyyy-MM-dd 这种格式(例如 dt=2020-01-01),否则无法从路径中解析出日期。


版权声明:本文为DogTips原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。