打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
JR 精品文章 - KETTLE?JAVA?API?实战记录
为什么要用Kettle和KETTLE JAVA API?
Kettle是什么?kettle:是一个开源ETL工具。kettle提供了基于java的图形化界面,使用很方便,kettle的ETL工具集合也比较多,常用的ETL工具都包含了。

为什么使用KETTLE JAVA API:就像kettle文档所说:KETTLE JAVA API : Program your own Kettle transformation,kettle提供了基于JAVA的脚步编写功能,可以灵活地自定义ETL过程,使自行定制、批量处理等成为可能,这才是一个程序员需要做的工作,而不仅是象使用word一样操作kettle用户界面。

KETTLE JAVA API 实战操作记录:

、         搭建环境 :到http://www.kettle.be网站下载kettle的源码包,加压缩,例如解压缩到d:\kettle目录

、         打开eclipse,新建一个项目,要使用jdk1.5.0,因为kettle的要使用System.getenv(),只有在jdk1.5.0才被支持。提起getenv(),好像有一段几起几落的记录,曾一度被抛弃,现在又被jdk1.5支持了.

、         建一个class : TransBuilder.java,可以把d:\kettle\ extra\TransBuilder.java的内容原样拷贝到你的TransBuilder.java里。

、         根据需要编辑源码。并需要对原程序进行如下修改,在头部增加:

import org.eclipse.swt.dnd.Transfer;

//这个包被遗漏了,原始位置kettle根目录\libswt\win32\swt.jar

//add by chq(www.chq.name) on  2006.07.20

(后来发现,不必加这个引用,因为编译时不需要)

、         编译准备,在eclipse中增加jar包,主要包括(主要依据extra\TransBuilder.bat):

\lib\kettle.jar
\libext\CacheDB.jar
\libext\SQLBaseJDBC.jar
\libext\activation.jar
\libext\db2jcc.jar
\libext\db2jcc_license_c.jar
\libext\edtftpj-1.4.5.jar
\libext\firebirdsql-full.jar
\libext\firebirdsql.jar
\libext\gis-shape.jar
\libext\hsqldb.jar
\libext\ifxjdbc.jar
\libext\javadbf.jar
\libext\jconn2.jar
\libext\js.jar
\libext\jt400.jar
\libext\jtds-1.1.jar
\libext\jxl.jar
\libext\ktable.jar
\libext\log4j-1.2.8.jar
\libext\mail.jar
\libext\mysql-connector-java-3.1.7-bin.jar
\libext\ojdbc14.jar
\libext\orai18n.jar
\libext\pg74.215.jdbc3.jar
\libext\edbc.jar

(注意 :下面这个包被遗漏了,要加上。原始位置kettle根目录\libswt\win32\swt.jar)
\libswt\win32\swt.jar 

、         编译成功后,准备运行

为使程序不必登陆就可以运行,需要设置环境署文件:kettle.properties,位置在用户目录里,一般在 \Documents and Settings\用户\.kettle\,主要内容如下:

KETTLE_REPOSITORY=kettle@m80

KETTLE_USER=admin

KETTLE_PASSWORD=passwd

、         好了,现在可以运行一下了,看看数据是不是已经拷贝到目标表了。

以下是运行时的控制台信息输出:



下面是自动生成的Transformation :



以下为修改后的程序源码:


--------------------------------------------------------------------------------
  1. package name.chq.test;
  2.  
  3. import java.io.DataOutputStream;
  4. import java.io.File;
  5. import java.io.FileOutputStream;
  6.  
  7. import be.ibridge.kettle.core.Const;
  8. import be.ibridge.kettle.core.LogWriter;
  9. import be.ibridge.kettle.core.NotePadMeta;
  10. import be.ibridge.kettle.core.database.Database;
  11. import be.ibridge.kettle.core.database.DatabaseMeta;
  12. import be.ibridge.kettle.core.exception.KettleException;
  13. import be.ibridge.kettle.core.util.EnvUtil;
  14. import be.ibridge.kettle.trans.StepLoader;
  15. import be.ibridge.kettle.trans.Trans;
  16. import be.ibridge.kettle.trans.TransHopMeta;
  17. import be.ibridge.kettle.trans.TransMeta;
  18. import be.ibridge.kettle.trans.step.StepMeta;
  19. import be.ibridge.kettle.trans.step.StepMetaInterface;
  20. import be.ibridge.kettle.trans.step.selectvalues.SelectValuesMeta;
  21. import be.ibridge.kettle.trans.step.tableinput.TableInputMeta;
  22. import be.ibridge.kettle.trans.step.tableoutput.TableOutputMeta;
  23.  
  24.  
  25. //这个包被遗漏了,原始位置kettle根目录\libswt\win32\swt.jar
  26. //add by chq([link=http://www.chq.name]www.chq.name[/link]) on  2006.07.20
  27. //import org.eclipse.swt.dnd.Transfer; 
  28.  
  29. /**
  30.  * Class created to demonstrate the creation of transformations on-the-fly.
  31.  * 
  32.  * @author Matt
  33.  * 
  34.  */
  35. public class TransBuilder
  36. {
  37.     public static final String[] databasesXML = {
  38.         "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
  39.         "<connection>" +
  40.           "<name>target</name>" +
  41.           "<server>192.168.17.35</server>" +
  42.           "<type>ORACLE</type>" +
  43.                      "<access>Native</access>" +
  44.                      "<database>test1</database>" +
  45.                      "<port>1521</port>" +
  46.                      "<username>testuser</username>" +
  47.                      "<password>pwd</password>" +
  48.                      "<servername/>" +
  49.                      "<data_tablespace/>" +
  50.                      "<index_tablespace/>" +
  51.                      "<attributes>" +
  52.                        "<attribute><code>EXTRA_OPTION_MYSQL.defaultFetchSize</code><attribute>500</attribute></attribute>" +
  53.                        "<attribute><code>EXTRA_OPTION_MYSQL.useCursorFetch</code><attribute>true</attribute></attribute>" +
  54.                           "<attribute><code>PORT_NUMBER</code><attribute>1521</attribute></attribute>" +
  55.                             "</attributes>" +
  56.                        "</connection>" ,
  57.          
  58.         "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
  59.                          "<connection>" +
  60.                                 "<name>source</name>" +
  61.                                 "<server>192.168.16.12</server>" +
  62.                                 "<type>ORACLE</type>" +
  63.                                 "<access>Native</access>" +
  64.                                 "<database>test2</database>" +
  65.                                 "<port>1521</port>" +
  66.                                 "<username>testuser</username>" +
  67.                                 "<password>pwd2</password>" +
  68.                                 "<servername/>" +
  69.                                 "<data_tablespace/>" +
  70.                                 "<index_tablespace/>" +
  71.                                 "<attributes>" +
  72.                                     "<attribute><code>EXTRA_OPTION_MYSQL.defaultFetchSize</code><attribute>500</attribute></attribute>" +
  73.                                     "<attribute><code>EXTRA_OPTION_MYSQL.useCursorFetch</code><attribute>true</attribute></attribute>" +
  74.                                        "<attribute><code>PORT_NUMBER</code><attribute>1521</attribute></attribute>" +
  75.                                 "</attributes>" +
  76.                          "</connection>" 
  77.     };
  78.  
  79.     /**
  80.      * Creates a new Transformation using input parameters such as the tablename to read from.
  81.      * @param transformationName The name of the transformation
  82.      * @param sourceDatabaseName The name of the database to read from
  83.      * @param sourceTableName The name of the table to read from
  84.      * @param sourceFields The field names we want to read from the source table
  85.      * @param targetDatabaseName The name of the target database
  86.      * @param targetTableName The name of the target table we want to write to
  87.      * @param targetFields The names of the fields in the target table (same number of fields as sourceFields)
  88.      * @return A new transformation
  89.      * @throws KettleException In the rare case something goes wrong
  90.      */
  91.     public static final TransMeta buildCopyTable(
  92.        String transformationName,String sourceDatabaseName, String sourceTableName, 
  93.        String[] sourceFields, String targetDatabaseName, String targetTableName, 
  94.        String[] targetFields)
  95.       throws KettleException
  96.     {
  97.         LogWriter log = LogWriter.getInstance();
  98.         EnvUtil.environmentInit();
  99.         try
  100.         {
  101.             //
  102.             // Create a new transformation...
  103.             //
  104.             TransMeta transMeta = new TransMeta();
  105.             transMeta.setName(transformationName);
  106.             
  107.             // Add the database connections
  108.  
  109.             for (int i=0;i<databasesXML.length;i++)
  110.             {
  111.                 DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);
  112.                 transMeta.addDatabase(databaseMeta);
  113.             }
  114.             
  115.             DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);
  116.             DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);
  117.  
  118.             
  119.             //
  120.             // Add a note
  121.             //
  122.             String note = "Reads information from table [" + sourceTableName+ "] on database [" 
  123.                             + sourceDBInfo + "]" + Const.CR;
  124.             note += "After that, it writes the information to table [" + targetTableName + "] on database [" 
  125.                             + targetDBInfo + "]";
  126.             NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);
  127.             transMeta.addNote(ni);
  128.  
  129.             // 
  130.             // create the source step...
  131.             //
  132.             String fromstepname = "read from [" + sourceTableName + "]";
  133.             TableInputMeta tii = new TableInputMeta();
  134.             tii.setDatabaseMeta(sourceDBInfo);
  135.             String selectSQL = "SELECT "+Const.CR;
  136.             for (int i=0;i<sourceFields.length;i++)
  137.             {
  138.             /* modi by chq(www.chq.name): use * to replace the fields,经分析,以下语句可以处理‘*‘ */
  139.                 if (i>0) 
  140.                      selectSQL+=", "
  141.                 else selectSQL+="  ";
  142.                
  143.                 selectSQL+=sourceFields[i]+Const.CR;
  144.             }
  145.             selectSQL+="FROM "+sourceTableName;
  146.             tii.setSQL(selectSQL);
  147.  
  148.             StepLoader steploader = StepLoader.getInstance();
  149.  
  150.             String fromstepid = steploader.getStepPluginID(tii);
  151.             StepMeta fromstep = new StepMeta(log, fromstepid, fromstepname, (StepMetaInterface) tii);
  152.             fromstep.setLocation(150, 100);
  153.             fromstep.setDraw(true);
  154.             fromstep.setDescription("Reads information from table [" + sourceTableName 
  155.                                      + "] on database [" + sourceDBInfo + "]");
  156.             transMeta.addStep(fromstep);
  157.  
  158.             //
  159.             // add logic to rename fields
  160.             // Use metadata logic in SelectValues, use SelectValueInfo...
  161.             //
  162.             /* 不必改名或映射 add by chq(www.chq.name) on 2006.07.20
  163.             SelectValuesMeta svi = new SelectValuesMeta();
  164.             svi.allocate(0, 0, sourceFields.length);
  165.             for (int i = 0; i < sourceFields.length; i++)
  166.             {
  167.                 svi.getMetaName()[i] = sourceFields[i];
  168.                 svi.getMetaRename()[i] = targetFields[i];
  169.             }
  170.  
  171.             String selstepname = "Rename field names";
  172.             String selstepid = steploader.getStepPluginID(svi);
  173.             StepMeta selstep = new StepMeta(log, selstepid, selstepname, (StepMetaInterface) svi);
  174.             selstep.setLocation(350, 100);
  175.             selstep.setDraw(true);
  176.             selstep.setDescription("Rename field names");
  177.             transMeta.addStep(selstep);
  178.  
  179.             TransHopMeta shi = new TransHopMeta(fromstep, selstep);
  180.             transMeta.addTransHop(shi);
  181.             fromstep = selstep; //设定了新的起点 by chq([link=http://www.chq.name]www.chq.name[/link]) on 2006.07.20
  182.             */
  183.             // 
  184.             // Create the target step...
  185.             //
  186.             //
  187.             // Add the TableOutputMeta step...
  188.             //
  189.             String tostepname = "write to [" + targetTableName + "]";
  190.             TableOutputMeta toi = new TableOutputMeta();
  191.             toi.setDatabase(targetDBInfo);
  192.             toi.setTablename(targetTableName);
  193.             toi.setCommitSize(200);
  194.             toi.setTruncateTable(true);
  195.  
  196.             String tostepid = steploader.getStepPluginID(toi);
  197.             StepMeta tostep = new StepMeta(log, tostepid, tostepname, (StepMetaInterface) toi);
  198.             tostep.setLocation(550, 100);
  199.             tostep.setDraw(true);
  200.             tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");
  201.             transMeta.addStep(tostep);
  202.  
  203.             //
  204.             // Add a hop between the two steps...
  205.             //
  206.             TransHopMeta hi = new TransHopMeta(fromstep, tostep);
  207.             transMeta.addTransHop(hi);
  208.  
  209.             // OK, if we're still here: overwrite the current transformation...
  210.             return transMeta;
  211.         }
  212.         catch (Exception e)
  213.         {
  214.             throw new KettleException("An unexpected error occurred creating the new transformation", e);
  215.         }
  216.     }
  217.  
  218.     /**
  219.      * 1) create a new transformation
  220.      * 2) save the transformation as XML file
  221.      * 3) generate the SQL for the target table
  222.      * 4) Execute the transformation
  223.      * 5) drop the target table to make this program repeatable
  224.      * 
  225.      * @param args
  226.      */
  227.     public static void main(String[] args) throws Exception
  228.     {
  229.        EnvUtil.environmentInit();
  230.         // Init the logging...
  231.         LogWriter log = LogWriter.getInstance("TransBuilder.log"true, LogWriter.LOG_LEVEL_DETAILED);
  232.         
  233.         // Load the Kettle steps & plugins 
  234.         StepLoader stloader = StepLoader.getInstance();
  235.         if (!stloader.read())
  236.         {
  237.             log.logError("TransBuilder",  "Error loading Kettle steps & plugins... stopping now!");
  238.             return;
  239.         }
  240.         
  241.         // The parameters we want, optionally this can be 
  242.         String fileName = "NewTrans.xml";
  243.         String transformationName = "Test Transformation";
  244.         String sourceDatabaseName = "source";
  245.         String sourceTableName = "testuser.source_table";
  246.         String sourceFields[] = { 
  247.                "*" 
  248.                };
  249.  
  250.         String targetDatabaseName = "target";
  251.         String targetTableName = "testuser.target_table";
  252.         String targetFields[] = { 
  253.                "*"
  254.                };
  255.  
  256.         
  257.         // Generate the transformation.
  258.         TransMeta transMeta = TransBuilder.buildCopyTable(
  259.                 transformationName,
  260.                 sourceDatabaseName,
  261.                 sourceTableName,
  262.                 sourceFields,
  263.                 targetDatabaseName,
  264.                 targetTableName,
  265.                 targetFields
  266.                 );
  267.         
  268.         // Save it as a file:
  269.         String xml = transMeta.getXML();
  270.         DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName)));
  271.         dos.write(xml.getBytes("UTF-8"));
  272.         dos.close();
  273.         System.out.println("Saved transformation to file: "+fileName);
  274.  
  275.         // OK, What's the SQL we need to execute to generate the target table?
  276.         String sql = transMeta.getSQLStatementsString();
  277.         
  278.         // Execute the SQL on the target table:
  279.         Database targetDatabase = new Database(transMeta.findDatabase(targetDatabaseName));
  280.         targetDatabase.connect();
  281.         targetDatabase.execStatements(sql);
  282.         
  283.         // Now execute the transformation...
  284.         Trans trans = new Trans(log, transMeta);
  285.         trans.execute(null);
  286.         trans.waitUntilFinished();
  287.         
  288.         // For testing/repeatability, we drop the target table again
  289.         /* modi by chq([link=http://www.chq.name]www.chq.name[/link]) on  2006.07.20 不必删表
  290.         //targetDatabase.execStatement("drop table "+targetTableName);
  291.         targetDatabase.disconnect();
  292.     }
  293.  
  294.  
  295. }
  296.  

 

相关文档:

 

[翻译]KETTLE JAVA API :编程定制自己的Kettle转换(transformation)
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
ETL工具kettle与java结合使用程序生成一个简单的转化文件
Java调用Kettle执行任务或转换
Morphia一个nosql的ORM框架
Python一键转Jar包,Java调用Python新姿势!
kafka-eagle在阿里云上邮件无法发送
Kubernetes官方java客户端之四:内部应用
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服