Flink SQL编程套路(学习笔记三)

Flink 版本:1.8.0。

DataStream

  1. 获取执行环境;
  2. 创建一个TableEnvironment;
  3. 读取数据源;
  4. 数据转换;
  5. DataStream转成Table,定义字段名称;
  6. 注册table,定义table名称;
  7. SQL计算查询;
  8. table转成DataStream;
  9. 提交执行。
package FlinkDemo.flink_table;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * Flink 1.8 demo: reads "uid,name" text lines into a DataStream, exposes them as a table,
 * queries it with SQL and prints the {@code uid} column.
 *
 * <p>Usage: {@code DataStreamTableApiTest01 [inputPath]}. When no path is given, the original
 * sample file path is used.
 */
public class DataStreamTableApiTest01 {

    /** Sample input used when no path is passed on the command line. */
    private static final String DEFAULT_INPUT_PATH = "D:\\scrtdownload\\tabletest.txt";

    public static void main(String[] args) throws Exception {

        // 1. Create the streaming execution environment; run every operator with one instance.
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        sEnv.setParallelism(1);

        // 2. Create the TableEnvironment bound to the streaming environment.
        StreamTableEnvironment stEnv = StreamTableEnvironment.create(sEnv);

        // 3. Read the data source (one record per line); path from args[0] or the default sample.
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        DataStream<String> source = sEnv.readTextFile(inputPath);

        // 4. Transform each "uid,name" line into a Tuple2 (fields after the second are ignored).
        //    Lines with fewer than two comma-separated fields (e.g. blank lines) are dropped;
        //    otherwise fields[1] would throw ArrayIndexOutOfBoundsException and fail the job.
        SingleOutputStreamOperator<Tuple2<String, String>> mapSource = source
                .filter(line -> line.split(",").length >= 2)
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        // Split once per record instead of once per field.
                        String[] fields = s.split(",");
                        return new Tuple2<>(fields[0], fields[1]);
                    }
                });

        // 5. Convert the DataStream into a Table, naming the tuple fields "uid" and "name".
        Table table = stEnv.fromDataStream(mapSource, "uid,name");

        // 6. Register the table under the name "ac" so SQL queries can refer to it.
        stEnv.registerTable("ac", table);

        // 7. Query: SQL selects all columns, then the Table API projects only "uid".
        Table uid = stEnv.sqlQuery("select * from ac").select("uid");

        // 8. Convert the single-column (String) result table back into an append-only DataStream.
        DataStream<String> dataStream = stEnv.toAppendStream(uid, String.class);
        dataStream.print();

        // 9. Submit the job; the streaming pipeline only runs once execute() is called.
        sEnv.execute();
    }
}

DataSet

  1. 获取执行环境;
  2. 创建一个TableEnvironment;
  3. 读取数据源;
  4. 数据转换;
  5. 注册表,定义表名和字段名称;
  6. SQL计算查询;
  7. table转成DataSet;
  8. 提交执行。
package FlinkDemo.flink_table;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

/**
 * Flink 1.8 demo: reads "id,name" text lines into a DataSet, registers it as a table,
 * queries it with SQL and prints the {@code name} column.
 *
 * <p>Usage: {@code DataSetTableApiTest01 [inputPath]}. When no path is given, the original
 * sample file path is used.
 */
public class DataSetTableApiTest01 {

    /** Sample input used when no path is passed on the command line. */
    private static final String DEFAULT_INPUT_PATH = "D:\\scrtdownload\\tabletest.txt";

    public static void main(String[] args) throws Exception {

        // 1. Create the batch execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Create the BatchTableEnvironment bound to the batch environment.
        BatchTableEnvironment batchEnv = BatchTableEnvironment.create(env);

        // 3. Read the data source (one record per line); path from args[0] or the default sample.
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        DataSet<String> source = env.readTextFile(inputPath);

        // 4. Transform each "id,name" line into a Tuple2 (fields after the second are ignored).
        //    Lines with fewer than two comma-separated fields (e.g. blank lines) are dropped;
        //    otherwise fields[1] would throw ArrayIndexOutOfBoundsException and fail the job.
        DataSet<Tuple2<String, String>> map = source
                .filter(line -> line.split(",").length >= 2)
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        // Split once per record instead of once per field.
                        String[] fields = s.split(",");
                        return new Tuple2<>(fields[0], fields[1]);
                    }
                });

        // 5. Register the DataSet as table "wc" with the fields "id" and "name".
        batchEnv.registerDataSet("wc", map, "id,name");

        // 6. Query only the "name" column.
        Table name = batchEnv.sqlQuery("select name from wc");

        // 7. Convert the single-column (String) result table back into a DataSet.
        DataSet<String> stringDataSet = batchEnv.toDataSet(name, String.class);

        // 8. Trigger execution: in the DataSet API, print() itself executes the program,
        //    so no separate env.execute() call follows.
        stringDataSet.print();
    }
}

DataSet 转换成 Table 还有另外一种方式:

        // Alternative: convert the DataSet into a Table with fromDataSet, naming its fields.
        // Unlike registerDataSet, no table name is assigned here, so SQL cannot refer to this
        // table by name unless it is registered separately.
        Table table = batchEnv.fromDataSet(map, "id,name");

        // Call the Table API's select/where directly; they mirror SQL's SELECT and WHERE.
        // NOTE(review): if "id" is a String field (as with the Tuple2<String, String> DataSet in
        // the example above), "id > '100'" is a lexicographic string comparison, not a numeric
        // one — confirm that is intended.
        Table name = table.select("id").where("id > '100'");

总结:DataStream 和 DataSet 转换成表的方式略有差异(上例中 DataStream 先用 fromDataStream 转成 Table 再用 registerTable 注册,DataSet 则直接用 registerDataSet 注册),但使用方法是相同的。


版权声明:本文为qq_42185491原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。