如何用 Java 模拟 Kettle 转换任务中的“输入—转换—输出”功能点?此类问题在网上解答较少,需要研究 Kettle 源码才能完成。下面我将逐一说明。
输入
mysql
/**
 * Builds the input step of the transformation: a table-input step.
 *
 * @param transMeta transformation in which the connection named
 *                  {@code "fromDbName"} is looked up
 * @return a step named {@code "step1name"} wrapping the configured table input
 */
private static StepMeta createStep1(TransMeta transMeta) {
    TableInputMeta sourceQuery = new TableInputMeta();
    // Read from the connection registered under "fromDbName".
    sourceQuery.setDatabaseMeta(transMeta.findDatabase("fromDbName"));
    // Query whose rows feed the rest of the transformation.
    sourceQuery.setSQL("SELECT id ,name FROM user");
    return new StepMeta("step1name", sourceQuery);
}
CSV
/**
 * Builds the input step of the transformation: a CSV file input step.
 *
 * @param transMeta the transformation being assembled (not read by this method)
 * @return a step named {@code "step1name"} wrapping the configured CSV input
 */
private static StepMeta createCsv1(TransMeta transMeta) {
    CsvInputMeta csvInputMeta = new CsvInputMeta();
    // Apply the step defaults first so that they can never overwrite the
    // explicit settings made below (same order as createCsvStep2).
    csvInputMeta.setDefault();
    csvInputMeta.setFilename("C:/Users/LENOVO/Desktop/kettle/test.csv");
    csvInputMeta.setDelimiter(",");
    // The first line of the file is a header row.
    csvInputMeta.setHeaderPresent(true);
    // Fields to read; position and length are passed as -1.
    TextFileInputField[] inputFields = {
        new TextFileInputField("id", -1, -1),
        new TextFileInputField("name", -1, -1)
    };
    csvInputMeta.setInputFields(inputFields);
    return new StepMeta("step1name", csvInputMeta);
}
输出
mysql
/**
 * Builds the output step of the transformation: an insert/update step.
 *
 * <p>Rows are matched on {@code id} against table {@code userB} through the
 * connection named {@code "toDbName"}.
 *
 * @param transMeta transformation in which the connection named
 *                  {@code "toDbName"} is looked up
 * @return a step named {@code "step2name"} wrapping the configured insert/update
 */
private static StepMeta createStep2(TransMeta transMeta) {
    InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();
    // Target connection and table.
    insertUpdateMeta.setDatabaseMeta(transMeta.findDatabase("toDbName"));
    insertUpdateMeta.setTableName("userB");

    // Lookup key: table column "id" = stream field "id".
    insertUpdateMeta.setKeyLookup(new String[] {"id"});
    insertUpdateMeta.setKeyCondition(new String[] {"="});
    insertUpdateMeta.setKeyStream(new String[] {"id"});
    // The original author marks this call as mandatory; presumably the array
    // has to match the other key arrays in length - confirm against Kettle.
    insertUpdateMeta.setKeyStream2(new String[] {""});

    // Table columns, the stream fields that feed them, and the per-field update flags.
    String[] updateLookup = {"id", "name"};
    String[] updateStream = {"id", "name"};
    Boolean[] updateOrNot = {false, true};
    insertUpdateMeta.setUpdateLookup(updateLookup);
    insertUpdateMeta.setUpdateStream(updateStream);
    // Fixed: the receiver used to read "nsertUpdateMeta" because its leading
    // "i" had slipped into the preceding comment, so the code did not compile.
    insertUpdateMeta.setUpdate(updateOrNot);

    return new StepMeta("step2name", insertUpdateMeta);
}
csv
/**
 * Builds the output step of the transformation: a text-file output step
 * configured to write a comma-separated file with a header.
 *
 * @param transMeta the transformation being assembled (not read by this method)
 * @return a step named {@code "step2name"} wrapping the configured file output
 */
private static StepMeta createCsvStep2(TransMeta transMeta) {
    TextFileOutputMeta csvOutput = new TextFileOutputMeta();
    // Start from the step defaults, then override what this output needs.
    csvOutput.setDefault();
    csvOutput.setHeaderEnabled(true);
    csvOutput.setSeparator(",");
    // File name and extension are configured separately.
    csvOutput.setFileName("C:/Users/LENOVO/Desktop/kettle/test2");
    csvOutput.setExtension("csv");

    StepMeta outputStep = new StepMeta("step2name", csvOutput);
    return outputStep;
}
创建转换
/**
 * Creates an empty transformation named {@code "cgmTransName"} and registers
 * the two database connections that the steps look up by name:
 * {@code "fromDbName"} and {@code "toDbName"}.
 *
 * @return the new transformation with both connections added
 */
private static TransMeta createTrans() {
    // NOTE(review): confirm that Kettle resolves the access-type string
    // "Native(JDBC)" as intended; it is reproduced unchanged here.
    DatabaseMeta sourceDb = new DatabaseMeta(
        "fromDbName", "mysql", "Native(JDBC)",
        "192.168.17.99", "tree_wf?useSSL=false", "3306", "root", "123456");
    DatabaseMeta targetDb = new DatabaseMeta(
        "toDbName", "mysql", "Native(JDBC)",
        "192.168.17.66", "tree_wf?useSSL=false", "3306", "root", "123456");

    TransMeta transMeta = new TransMeta();
    transMeta.setName("cgmTransName");
    transMeta.addDatabase(sourceDb);
    transMeta.addDatabase(targetDb);
    return transMeta;
}
创建节点连接
/**
 * Creates the hop that links two steps.
 *
 * @param step1 the step the hop starts from
 * @param step2 the step the hop leads to
 * @return a hop from {@code step1} to {@code step2}
 */
private static TransHopMeta createHop(StepMeta step1, StepMeta step2) {
    return new TransHopMeta(step1, step2);
}
定义一个转换
/**
 * Assembles the complete transformation in memory: the CSV input step, the
 * insert/update output step, and the hop connecting them. Nothing is saved
 * to a repository here.
 *
 * @return the assembled transformation
 * @throws KettleException declared for callers; see the invoked builders
 */
private static TransMeta generateTrans() throws KettleException {
    TransMeta transMeta = createTrans();

    // Build both endpoints, then register them in source-to-target order.
    StepMeta source = createCsv1(transMeta);
    StepMeta target = createStep2(transMeta);
    transMeta.addStep(source);
    transMeta.addStep(target);

    // Wire the source step to the target step.
    TransHopMeta hop = createHop(source, target);
    transMeta.addTransHop(hop);
    return transMeta;
}
运行转换
/**
 * Attaches channel, transformation and step log tables to the transformation,
 * all on the connection named {@code "toDbName"}.
 *
 * <p>Despite its name, this method does not execute the transformation at
 * present: the execution statements are disabled (see the end of the body).
 *
 * @param transMeta the transformation to configure
 * @throws KettleException declared for callers
 */
private static void runTrans(TransMeta transMeta) throws KettleException {
    VariableSpace variables = new Variables();
    variables.setVariable("test", "toDbName");
    variables.initializeVariablesFrom(null);

    // Channel log table.
    ChannelLogTable channelLog = ChannelLogTable.getDefault(variables, transMeta);
    channelLog.setConnectionName("toDbName");
    channelLog.setTableName("TEST");
    transMeta.setChannelLogTable(channelLog);

    // Transformation log table.
    TransLogTable transLog =
        TransLogTable.getDefault(variables, transMeta, transMeta.getSteps());
    transLog.setConnectionName("toDbName");
    transLog.setTableName("t_lzfx_data_log");
    transMeta.setTransLogTable(transLog);

    // Step log table.
    StepLogTable stepLog = StepLogTable.getDefault(variables, transMeta);
    stepLog.setTableName("KETTLE_STEP_LOG_TABLE");
    stepLog.setConnectionName("toDbName");
    transMeta.setStepLogTable(stepLog);

    // Execution is left disabled, exactly as in the original snippet:
    //   Trans trans = new Trans(transMeta); System.out.println(transMeta.getXML());
    //   trans.execute(null);          // run the transformation
    //   trans.waitUntilFinished();    // block until it has finished
}
运行
/**
 * Entry point: initialises the Kettle environment, assembles the
 * transformation and passes it to {@code runTrans}.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if initialisation or assembly fails
 */
public static void main(String[] args) throws Exception {
    // The Kettle environment is initialised before any metadata is built.
    KettleEnvironment.init();
    runTrans(generateTrans());
}
pom.xml
<!-- Lombok (optional). No version is given here, so it presumably comes from a parent POM/BOM - TODO confirm. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Pentaho Kettle artifacts, all pinned to 8.2.0.0-342. -->
<!-- NOTE(review): confirm that a repository hosting the pentaho-kettle / pentaho group ids is configured for this build. -->
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-core</artifactId>
<version>8.2.0.0-342</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-dbdialog</artifactId>
<version>8.2.0.0-342</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-engine</artifactId>
<version>8.2.0.0-342</version>
</dependency><dependency>
<!-- Pentaho metastore, pinned to the same version as the Kettle artifacts. -->
<groupId>pentaho</groupId>
<artifactId>metastore</artifactId>
<version>8.2.0.0-342</version>
</dependency>
版权声明:本文为zhanglu1236789原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。