Zeppelin client执行flink sql on hive流程备份

主要目的:防止自己忘记,重新记录一下

1,Flink 1.11.2版本 集群配置
查看官网地址:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/hive/

找到对应hive版本:

 下载这几个依赖包,放入到Flink 集群lib下:

2, zeppelin的flink和hive配置 整合

地址:

https://www.yuque.com/jeffzhangjianfeng/gldg8w/agf94n

 如下图:

 

3,使用Zeppelin

 简单的创建一个查询,执行成功

4,使用Zeppelin client代码执行

我们先写一个简单的paragraph:

 

 这个任务目前是没有运行状态。

我们运行本地代码执行Zeppelin client:

public class ZeppelinClientExec_test {

 public static void main(String[] args) throws Exception {
   ClientConfig clientConfig = new ClientConfig("http://bi-526:18080");
   ZeppelinClient zClient = new ZeppelinClient(clientConfig);

   String zeppelinVersion = zClient.getVersion();
   System.out.println("Zeppelin version: " + zeppelinVersion);

   //todo 创建 note 包名字


   try {

        //todo 已经存在的noteId
       String noteId = "2GAM7NK2G";
     //todo 我们这里的 noteId =  2GDTRU4U1
//       NoteResult noteResult = zClient.executeNote(noteId);
//       System.out.println("Execute note and the note result: " + noteResult);

       String paragraphId = "paragraph_1626839477148_1814189847";


       /*
        //todo   executeParagraph 和 submitParagraph的区别是 :
            executeParagraph 一直等待,submitParagraph不是一直等待
        */
       ParagraphResult paragraphResult = zClient.executeParagraph(noteId, paragraphId);
       List<Result> results = paragraphResult.getResults();
       System.out.println("results = " + results);
       String resultInText = paragraphResult.getResultInText();
       System.out.println("resultInText = " + resultInText);
       System.out.println("Paragraph result: " + paragraphResult);
       Status status = paragraphResult.getStatus();
       System.out.println("status = " + status);
       if ("FINISHED".equals(status)){

           System.out.println("任务结束.............");
       }

       //todo 提交之后没有返回结果
//       ParagraphResult submitParagraph =  zClient.submitParagraph(noteId, paragraphId);
//       zClient.waitUtilParagraphRunning(noteId, paragraphId);
//       System.out.println("Paragraph result: " + submitParagraph);
       //todo 打印输出为:Paragraph result: ParagraphResult{paragraphId='paragraph_1626782724461_1312182058', status=PENDING, results=, progress=0}

   } finally {
//     if (noteId != null) {
//       zClient.deleteNote(noteId);
//       System.out.println("Note " + noteId + " is deleted");
//     }
   }
 }
}

注意:

noteId  就是:

 

paragraphId就是:

 

执行中:

web端Zeppelin状态自动开始执行了 :

 

 

控制台执行结果:

Zeppelin version: 0.9.1-SNAPSHOT
results = [Result{type='TABLE', data='EXPR$0
18823
'}]
resultInText = EXPR$0
18823


Paragraph result: ParagraphResult{paragraphId='paragraph_1626839477148_1814189847', status=FINISHED, results=Result{type='TABLE', data='EXPR$0
18823
'}, progress=0}
status = FINISHED
 

参考:

https://www.yuque.com/jeffzhangjianfeng/gldg8w/pz2xoh


版权声明:本文为qq_31866793原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。