kettle 运行资源库中的作业
在项目中,如需获取作业运行状态、结束作业或停止作业,需要保存 Job 对象(例如通过 HashMap 缓存,或序列化到数据表中;后者尚未开发)。
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.ObjectId;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.trans.Trans;
public class KettleRunJob {
    public static void main(String[] args) throws Exception {
        runJob();
    }

    /**
     * Runs a job stored in the Kettle database repository.
     *
     * <p>Looks up the job named {@code "JobName"} under the repository root
     * directory, loads its metadata, and executes it synchronously.
     *
     * @throws KettleException if the repository lookup or job load fails
     */
    public static void runJob() throws KettleException {
        KettleDatabaseRepository kettleDatabaseRepository = ResourceLibrary.repositoryCon();
        // Locate the repository root directory ("/").
        RepositoryDirectoryInterface directory =
                kettleDatabaseRepository.loadRepositoryDirectoryTree().findDirectory("/");
        // Resolve the job id by name within that directory.
        ObjectId id = kettleDatabaseRepository.getJobId("JobName", directory);
        // Load the job metadata (null revision — presumably the latest; verify).
        JobMeta jobMeta = kettleDatabaseRepository.loadJob(id, null);
        Job job = new Job(kettleDatabaseRepository, jobMeta);
        // FIX: set the log level BEFORE executing. The original called
        // setLogLevel() after run() had already returned, so it had no effect.
        job.setLogLevel(LogLevel.ROWLEVEL);
        // Execute the job on the current thread.
        job.run();
        // Wait until the job has fully finished.
        job.waitUntilFinished();
    }

    /**
     * Returns the job's current status string, mapping any status that
     * mentions errors to {@code "STOP_FAILED"}.
     *
     * @param job the job to inspect; may be {@code null}
     * @return the (possibly remapped) status, or {@link Trans#STRING_STOPPED}
     *         when {@code job} is {@code null} (consistent with killJob/stopJob)
     */
    public static String getJobStatus(Job job) {
        // FIX: null guard, consistent with the sibling killJob/stopJob methods.
        if (job == null) {
            return Trans.STRING_STOPPED;
        }
        String status = job.getStatus();
        if (status.contains("errors")) {
            status = "STOP_FAILED";
        }
        return status;
    }

    /**
     * Forcibly terminates a job, interrupting its thread first and falling
     * back to {@link Job#stop()} if the interrupt did not terminate it.
     *
     * @param job the job to kill; may be {@code null}
     * @return the final job status
     * @throws Exception if joining the job thread is interrupted
     */
    @SuppressWarnings("deprecation")
    public static String killJob(Job job) throws Exception {
        if (job == null) {
            return Trans.STRING_STOPPED;
        }
        // Use a thread interrupt to break out of a stuck job thread.
        job.interrupt();
        job.join();
        // If the interrupt was ineffective, stop the job directly.
        if (!job.getState().equals(Thread.State.TERMINATED)) {
            job.stop();
        }
        System.out.println("作业结束完成:" + job.getJobname());
        return getJobStatus(job);
    }

    /**
     * Requests a cooperative stop of the job and all of its entries.
     *
     * @param job the job to stop; may be {@code null}
     * @return the job status after the stop request
     * @throws Exception declared for interface stability; not thrown here
     */
    public static String stopJob(Job job) throws Exception {
        if (job == null) {
            return Trans.STRING_STOPPED;
        }
        job.stopAll();
        System.out.println("作业停止完成:" + job.getJobname());
        return getJobStatus(job);
    }
}
后续添加:以下示例演示不经过资源库,直接运行本地的转换文件(.ktr)与作业文件(.kjb)。
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
public class Demo3 {
    public static void main(String[] args) {
        String[] params = {"1", "content", "d:\\test1.txt"};
        runTransfer(params, "D:\\kettle\\test.ktr");
    }

    /**
     * Runs a Kettle transformation file (.ktr) from the local filesystem.
     *
     * @param params  positional argument values handed to the transformation
     * @param ktrPath path of the transformation file (suffix .ktr)
     */
    public static void runTransfer(String[] params, String ktrPath) {
        try {
            // Initialize the Kettle environment (plugin registry etc.).
            KettleEnvironment.init();
            EnvUtil.environmentInit();
            // Load the transformation metadata from the file.
            TransMeta transMeta = new TransMeta(ktrPath);
            // FIX: declare the Trans local inside the try; the original kept a
            // needless nullable reference outside the block that was never
            // used after the catch.
            Trans trans = new Trans(transMeta);
            // Start executing with the given arguments.
            trans.execute(params);
            // Block until the transformation has finished.
            trans.waitUntilFinished();
            if (trans.getErrors() > 0) {
                throw new Exception(
                        "There are errors during transformation exception!(传输过程中发生异常)");
            }
        } catch (Exception e) {
            // NOTE(review): failures are only printed, never propagated, so
            // callers cannot detect them. Kept for interface compatibility.
            e.printStackTrace();
        }
    }

    /**
     * Runs a Kettle job file from Java, passing three named variables that the
     * job script reads as ${id}, ${content} and ${file}.
     *
     * @param params  expected order: {id, content, file}; must hold >= 3 values
     * @param jobPath path of the job file, e.g. "D:\\kettle\\informix_to_am_4.ktr"
     */
    public static void runJob(String[] params, String jobPath) {
        try {
            KettleEnvironment.init();
            // jobPath is the path and name of the job script.
            JobMeta jobMeta = new JobMeta(jobPath, null);
            Job job = new Job(null, jobMeta);
            // FIX: validate params before indexing; the original threw a raw
            // NullPointerException / ArrayIndexOutOfBoundsException on short
            // or missing input.
            if (params == null || params.length < 3) {
                throw new IllegalArgumentException(
                        "params must contain at least 3 values: {id, content, file}");
            }
            // Pass variables to the job script; read there as ${name}.
            job.setVariable("id", params[0]);
            job.setVariable("content", params[1]);
            job.setVariable("file", params[2]);
            job.start();
            job.waitUntilFinished();
            if (job.getErrors() > 0) {
                throw new Exception(
                        "There are errors during job exception!(执行job发生异常)");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
版权声明:本文为qq_41982570原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。