模拟kettle组件转换任务中输入-转换-输出功能点

Java如何模拟kettle组件转换任务中输入-转换-输出功能点,此类问题在网上解答较少,需要研究kettle源码完成,在下面的叙述中,我将一一解答

输入

mysql

/**  
* 创建步骤:(输入:表输入)     
*      
* @param transMeta     
* @return    
*/     
private static StepMeta createStep1(TransMeta transMeta) {       
    // 新建一个表输入步骤(TableInputMeta)      
    TableInputMeta tableInputMeta = new TableInputMeta();      
    // 设置步骤1的数据库连接  
    tableInputMeta.setDatabaseMeta(transMeta.findDatabase("fromDbName"));       
    // 设置步骤1中的sql       
    tableInputMeta.setSQL("SELECT id ,name FROM user");      
    // 设置步骤名称       
    StepMeta step1 = new StepMeta("step1name", tableInputMeta);       
    return step1;    
}

CSV

private static StepMeta createCsv1(TransMeta transMeta){    
CsvInputMeta csvInputMeta=new CsvInputMeta();                  csvInputMeta.setFilename("C:/Users/LENOVO/Desktop/kettle/test.csv");    csvInputMeta.setDelimiter(",");    
csvInputMeta.setDefault();    
csvInputMeta.setHeaderPresent(true);    
TextFileInputField[] textFileInputField={new TextFileInputField("id",-1,-1),new TextFileInputField("name",-1,-1)};    
csvInputMeta.setInputFields(textFileInputField);    
StepMeta stepMeta1=new StepMeta("step1name",csvInputMeta);    
return stepMeta1;
}

输出

mysql

 /**      
 * 创建步骤:(输出:插入/更新)      
 *       
 * @param transMeta      
 * @return      
 */     
 private static StepMeta createStep2(TransMeta transMeta) {       
 // 新建一个插入/更新步骤        
 InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();       
 // 设置步骤2的数据库连接   
 insertUpdateMeta.setDatabaseMeta(transMeta.findDatabase("toDbName"));        
 // 设置目标表       
 insertUpdateMeta.setTableName("userB");       
 // 设置用来查询的关键字        
insertUpdateMeta.setKeyLookup(new String[] {"id"});              insertUpdateMeta.setKeyCondition(new String[] {"="});        insertUpdateMeta.setKeyStream(new String[] {"id"});        insertUpdateMeta.setKeyStream2(new String[] {""});
// 一定要加上       
// 设置要更新的字段        
String[] updatelookup = {"id","name"};        
String[] updateStream = {"id","name"};        
Boolean[] updateOrNot = {false,true};        
// 设置表字段        
insertUpdateMeta.setUpdateLookup(updatelookup);        
// 设置流字段        
insertUpdateMeta.setUpdateStream(updateStream);       
// 设置是否更新        i
nsertUpdateMeta.setUpdate(updateOrNot);        
// 设置步骤2的名称        
StepMeta step2 = new StepMeta("step2name", insertUpdateMeta);        
return step2;
}

csv


private static StepMeta createCsvStep2(TransMeta transMeta){    
TextFileOutputMeta textFileOutputMeta=new TextFileOutputMeta();    textFileOutputMeta.setDefault();    
textFileOutputMeta.setHeaderEnabled(true);    textFileOutputMeta.setSeparator(",");    textFileOutputMeta.setFileName("C:/Users/LENOVO/Desktop/kettle/test2");    textFileOutputMeta.setExtension("csv");   
return new StepMeta("step2name",textFileOutputMeta);
}

创建转换

  /**     
 * 创建转换     
 *     
 * @return     
 */     
 private static TransMeta createTrans() {       
 TransMeta transMeta = new TransMeta();       
 // 设置转化的名称       
 transMeta.setName("cgmTransName");       
 // 添加转换的数据库连接      
 transMeta.addDatabase(new DatabaseMeta("fromDbName", "mysql", 
"Native(JDBC)", "192.168.17.99",               "tree_wf?useSSL=false", "3306", "root", "123456"));       
transMeta.addDatabase(new DatabaseMeta("toDbName", "mysql", "Native(JDBC)", 
"192.168.17.66", "tree_wf?useSSL=false",               "3306", "root", "123456"));       return transMeta;  
}

创建节点连接

 /**     
 * 创建节点连接     
 *      
 * @param step1     
 * @param step2     
 * @return     
 */     
 private static TransHopMeta createHop(StepMeta step1, StepMeta step2) {       
 // 设置起始步骤和目标步骤,把两个步骤关联起来       
 TransHopMeta transHopMeta = new TransHopMeta(step1, step2);       
 return transHopMeta;     
 }

定义一个转换

/**     
* 定义一个转换,但是还没有保存到资源库     
*      
* @return    
* @throws KettleException     
*/     
private static TransMeta generateTrans() throws KettleException {       
TransMeta transMeta = createTrans();      
// 创建步骤1并添加到转换中       
StepMeta step1 = createCsv1(transMeta);      
transMeta.addStep(step1);      
// 创建步骤2并添加到转换中       
StepMeta step2 = createStep2(transMeta);       
transMeta.addStep(step2);      
// 创建hop连接并添加hop       
TransHopMeta TransHopMeta = createHop(step1, step2);       transMeta.addTransHop(TransHopMeta);       
return transMeta;     
}

运行转换

/**     
* 运行转换     
*      
* @param transMeta     
* @throws KettleException     
*/    
private static void runTrans(TransMeta transMeta) throws KettleException {         VariableSpace space=new Variables();         space.setVariable("test","toDbName");        
space.initializeVariablesFrom(null);        
ChannelLogTable channelLogTable= 
ChannelLogTable.getDefault(space,transMeta);         channelLogTable.setConnectionName("toDbName");         channelLogTable.setTableName("TEST");         transMeta.setChannelLogTable(channelLogTable);         
TransLogTable 
transLogTable=TransLogTable.getDefault(space,transMeta,transMeta.getSteps());       transLogTable.setConnectionName("toDbName");         transLogTable.setTableName("t_lzfx_data_log");         transMeta.setTransLogTable(transLogTable);         
StepLogTable stepLogTable=StepLogTable.getDefault(space,transMeta);         stepLogTable.setTableName("KETTLE_STEP_LOG_TABLE");         stepLogTable.setConnectionName("toDbName");         transMeta.setStepLogTable(stepLogTable);         
//Trans trans = new Trans(transMeta);         System.out.println(transMeta.getXML());        
//trans.execute(null);// 执行转换         
//trans.waitUntilFinished(); // 等待转换执行结束    
}

运行

public static void main(String[] args) throws Exception {    
KettleEnvironment.init();    
TransMeta transMeta = generateTrans();    
runTrans(transMeta);}

pom.xml

<dependency>      
<groupId>org.projectlombok</groupId>       
<artifactId>lombok</artifactId>       
<optional>true</optional>
</dependency>
<dependency>       
<groupId>pentaho-kettle</groupId>       
<artifactId>kettle-core</artifactId>
<version>8.2.0.0-342</version>
</dependency>
<dependency>       
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-dbdialog</artifactId>       
<version>8.2.0.0-342</version>
</dependency>
<dependency>       
<groupId>pentaho-kettle</groupId>       
<artifactId>kettle-engine</artifactId>       
<version>8.2.0.0-342</version>
</dependency><dependency>       
<groupId>pentaho</groupId>       
<artifactId>metastore</artifactId>      
<version>8.2.0.0-342</version>
</dependency>

版权声明:本文为zhanglu1236789原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。