运行kettle 任务/转换

kettle 运行资源库中的作业
在项目中如要获取作业运行状态,结束作业,停止作业需要保存JOB对象(通过HashMap或序列化到表中,未开发)


import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.ObjectId;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.trans.Trans;

public class KettleRunJob {
    public static void main(String[] args) throws Exception {
        runJob();
    }

    /**
     * 运行资源库中的作业
     *
     * @throws KettleException
     */
    public static void runJob() throws KettleException {
        KettleDatabaseRepository kettleDatabaseRepository = ResourceLibrary.repositoryCon();
        // 获取目录
        RepositoryDirectoryInterface directory =
                kettleDatabaseRepository.loadRepositoryDirectoryTree().findDirectory("/");
        // 根据作业名称获取作业id
        ObjectId id = kettleDatabaseRepository.getJobId("JobName", directory);
        // 加载作业
        JobMeta jobMeta = kettleDatabaseRepository.loadJob(id, null);
        Job job = new Job(kettleDatabaseRepository, jobMeta);
        // 执行作业
        job.run();
        job.setLogLevel(LogLevel.ROWLEVEL);
        // 等待作业执行完毕
        job.waitUntilFinished();
    }

    /**
     * 获取作业运行状态
     * @return
     */
    public static String getJobStatus(Job job) {
        String status = job.getStatus();
        if(status.indexOf("errors")>-1){
            status = "STOP_FAILED";
        }
        return status;
    }

    /**
     * 结束作业
     * @return
     * @throws Exception
     */
    @SuppressWarnings("deprecation")
    public static String killJob(Job job) throws Exception {
        if(job == null){
            return Trans.STRING_STOPPED;
        }
        //采用线程中断结束卡住的线程
        job.interrupt();
        job.join();
        //中断无效时,直接结束
        if(!job.getState().equals(Thread.State.TERMINATED)){
            job.stop();
        }
        System.out.println("作业结束完成:"+job.getJobname());
        String status = getJobStatus(job);
        return status;
    }

    /**
     * 停止作业
     * @return
     * @throws Exception
     */
    public static String stopJob(Job job ) throws Exception {
        if(job == null){
            return Trans.STRING_STOPPED;
        }
        job.stopAll();
        System.out.println("作业停止完成:"+job.getJobname());
        String status = getJobStatus(job);
        return status;
    }

}


后添加


import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;

public class Demo3 {

    public static void main(String[] args) {

        String[] params = {"1", "content", "d:\\test1.txt"};
        runTransfer(params, "D:\\kettle\\test.ktr");
    }

    /**
     * 运行转换文件方法
     *
     * @param params  多个参数变量值
     * @param ktrPath 转换文件的路径,后缀ktr
     */
    public static void runTransfer(String[] params, String ktrPath) {
        Trans trans = null;
        try {
            // // 初始化
            // 转换元对象
            KettleEnvironment.init();// 初始化
            EnvUtil.environmentInit();
            TransMeta transMeta = new TransMeta(ktrPath);
            // 转换
            trans = new Trans(transMeta);

            // 执行转换
            trans.execute(params);
            // 等待转换执行结束
            trans.waitUntilFinished();
            // 抛出异常
            if (trans.getErrors() > 0) {
                throw new Exception(
                        "There are errors during transformation exception!(传输过程中发生异常)");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * java 调用 kettle 的job
     *
     * @param jobPath 如: String fName= "D:\\kettle\\informix_to_am_4.ktr";
     */
    public static void runJob(String[] params, String jobPath) {
        try {
            KettleEnvironment.init();
            // jobname 是Job脚本的路径及名称
            JobMeta jobMeta = new JobMeta(jobPath, null);
            Job job = new Job(null, jobMeta);
            // 向Job 脚本传递参数,脚本中获取参数值:${参数名}
            // job.setVariable(paraname, paravalue);
            job.setVariable("id", params[0]);
            job.setVariable("content", params[1]);
            job.setVariable("file", params[2]);
            job.start();
            job.waitUntilFinished();
            if (job.getErrors() > 0) {
                throw new Exception(
                        "There are errors during job exception!(执行job发生异常)");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


}

    

版权声明:本文为qq_41982570原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。