Flink Learning (5): DataStream API

Programming Model

A Flink program is written in three main steps (a minimal sketch follows the list):

  1. data source
    Obtain the data from a source, e.g. a collection, a file, a socket, or MySQL
  2. transformations
    Apply the required computations to the acquired data
  3. sink
    Output or store the processed data
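A minimal sketch of the three-step structure, using only APIs covered below (the class name and sample data are illustrative):

package datastream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class skeleton {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. data source
        DataStream<String> source = env.fromElements("hello", "world");
        // 2. transformation
        DataStream<String> upper = source.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                return s.toUpperCase();
            }
        });
        // 3. sink
        upper.print();
        env.execute();
    }
}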

Source

Collection-based Sources

  • env.fromElements(): create a stream from individual elements
  • env.fromCollection(): create a stream from a Java collection
  • env.generateSequence(): generate a sequence of numbers (deprecated)
  • env.fromSequence(): create a stream from a number sequence

Full code

package datastream;

/**
 * @author 公羽
 * date 2021/7/3
 */
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class source_set {
    public static void main(String[] args) throws Exception {
//        1. set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//        choose the execution mode automatically
//        2. source
//        env.fromElements(): from elements
        DataStream<String> ds1 = env.fromElements("test1","test2","test3");
//        env.fromCollection(): from a collection
        DataStream<String> ds2 = env.fromCollection(Arrays.asList("test4","test5","test6"));
//        env.generateSequence(): generate a sequence (deprecated)
        DataStream<Long> ds3 = env.generateSequence(1,10);
//        env.fromSequence(): from a sequence
        DataStream<Long> ds4 =  env.fromSequence(1,10);
//        3.transformation
//        4.sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();
//        5. execute
        env.execute();
    }
}

File-based Sources

  • env.readTextFile(): read a local or HDFS file, a directory, or a compressed archive

Add the required dependency to pom.xml, as sketched below.
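A sketch of the dependency, assuming the Hadoop client libraries are what the original screenshot showed; the artifact choice and version are illustrative and should match your cluster:

<!-- assumed: Hadoop client libraries needed for hdfs:// paths -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.5</version>
</dependency>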

To read data on HDFS, download the Hadoop configuration files hdfs-site.xml and core-site.xml and copy them into the project's resources directory.

  • Local file
  • Local directory
  • HDFS file
  • Compressed archive

Full code

package datastream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author 公羽
 * date 2021/7/3
 */
public class source_file {
    public static void main(String[] args) throws Exception {
//        1. set up the environment
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//        choose the execution mode automatically
//        2. source
//        read a single file
        DataStream<String> ds1 =  env.readTextFile("E:\\input\\test.txt");
//        read a directory
        DataStream<String> ds2 =  env.readTextFile("E:\\input");
//        read a single file on HDFS
        DataStream<String> ds3 =  env.readTextFile("hdfs://spark01:9000/test/test.txt");
//        read a compressed archive
        DataStream<String> ds4 = env.readTextFile("hdfs://spark01:9000/test/test.gz");
//        3.transformation
//        4.sink
//        ds1.print();
//        ds2.print();
//        ds3.print();
        ds4.print();
//         5. execute
        env.execute();
    }
}

Socket-based Source

Listen on port 9999.
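For testing, something must be serving text on spark01:9999; a common way (an assumption here, not shown in the original) is netcat: run nc -lk 9999 on spark01, then type lines of space-separated words into that terminal.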

Full code

package datastream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author 公羽
 * date 2021/7/3
 */
public class source_socket {
    public static void main(String[] args) throws Exception {
//        1. set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//        choose the execution mode automatically
//        2. source
        DataStream<String> ds1 = env.socketTextStream("spark01",9999);
//        3.transformation
        DataStream<String> ds2 = ds1.flatMap(new FlatMapFunction<String, String>() {
            @Override
//            override the flatMap method
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] words = s.split(" ");// split each line on " "
                for (String word : words){
                    collector.collect(word);
                }
            }
        });
         DataStream<Tuple2<String, Integer>> ds3 = ds2.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
//            override the map method
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s,1);// form a (word, 1) tuple
            }
        });
//         group by key and aggregate
         KeyedStream<Tuple2<String,Integer>, String > ds4 = ds3.keyBy(t -> t.f0);
         DataStream<Tuple2<String,Integer>> ds5 = ds4.sum(1);
//         4.sink
        ds5.print();
//        5.execute
        env.execute();
    }
}

Custom Source: MySQL

Install the Lombok plugin and add the dependencies to pom.xml, as sketched below.
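A sketch of the two dependencies, assuming Lombok and the MySQL JDBC driver are what the screenshots showed (versions are illustrative):

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.20</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.25</version>
</dependency>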

Data in the database
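The table layout can be inferred from the columns the source below reads (id, name, age); a sketch, with sample rows that are only illustrative:

CREATE TABLE test_flink (
    id   INT PRIMARY KEY,
    name VARCHAR(50),
    age  INT
);
INSERT INTO test_flink VALUES (1, 'tom', 18), (2, 'jack', 20);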
Create the student entity class

package datastream;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author 公羽
 * date 2021/7/3
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class student {
    private Integer id;
    private String name;
    private Integer age;
}

Create the custom source class

package datastream;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

public class source_mysql extends RichParallelSourceFunction<student> {
    private Connection connection = null;
    private PreparedStatement preparedStatement = null;
    private boolean flag = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // establish the JDBC connection and prepare the query
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useSSL=false&characterEncoding=utf-8&serverTimezone=UTC","root","123456");
        String sql = "select * from test_flink";
        preparedStatement = connection.prepareStatement(sql);
    }

    @Override
    public void run(SourceContext<student> sourceContext) throws Exception {
        // poll the table every 5 seconds until cancel() is called
        while (flag) {
            ResultSet rs = preparedStatement.executeQuery();
            while (rs.next()) {
                int id = rs.getInt("id");
                String name = rs.getString("name");
                int age = rs.getInt("age");
                sourceContext.collect(new student(id,name,age));
            }
            TimeUnit.SECONDS.sleep(5);
        }
    }
    @Override
    public void cancel() {
        flag = false;
    }
    @Override
    public void close() throws Exception {
        super.close();
        preparedStatement.close();
        connection.close();
    }
}

Write the driver class

package datastream;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author 公羽
 * date 2021/7/3
 */

public class driver_mysql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<student> ds = env.addSource(new source_mysql()).setParallelism(1);// parallelism 1 so the table is read by a single instance and rows are not duplicated
        ds.print();
        env.execute();
    }
}


Transformation

Basic Operations

  • flatMap
    Turns each element into zero or more elements and returns the flattened results
  • map
    Applies a function to every element and returns the results
  • keyBy
    Partitions the stream by the given key; note that stream processing has no groupBy, only keyBy
  • filter
    Filters the elements by the given predicate and keeps only those that satisfy it
  • sum
    Sums the elements by the given field
  • reduce
    Aggregates the elements of the stream

Example

package datastream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author 公羽
 * date 2021/7/4
 */
public class trans_basic {
    public static void main(String[] args) throws Exception {
        //1. set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// choose the execution mode automatically
        //2. prepare the data - source
        DataStream<String> lineDS = env.fromElements("spark heihei sqoop hadoop","spark flink","hadoop flink heihei spark");
        //3. process the data - transformation
        //3.1 split each line into individual words
        DataStream<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                // s is one line of input; split it into individual words
                String[] words = s.split(" ");
                for (String word : words) {
                    // collect each word and emit it downstream
                    collector.collect(word);
                }
            }
        });
        //3.1.5 filter out banned words
        DataStream<String>  filterDS = wordsDS.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return !s.equals("heihei");
            }
        });
        //3.2 pair every word with a count of 1
        DataStream<Tuple2<String,Integer>> wordAndOnesDS = filterDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                // s is a single incoming word; combine it with 1 into a tuple
                return Tuple2.of(s,1);
            }
        });
        //3.3 group the data by key
        KeyedStream<Tuple2<String,Integer>,String> groupedDS = wordAndOnesDS.keyBy(t->t.f0);
        //3.4 aggregate the values within each group, i.e. sum them
        DataStream<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);
        //3.5 aggregate the groups with reduce (equivalent to the sum above)
        DataStream<Tuple2<String,Integer>> redResult = groupedDS.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t2, Tuple2<String, Integer> t1) throws Exception {
                // t2 is the accumulated value, t1 the incoming element; add their counts
                return Tuple2.of(t2.f0,t2.f1 + t1.f1);
            }
        });
        //4. output the results - sink
        aggResult.print();
        redResult.print();
//        5. trigger execution
        //note: with print(), a DataSet does not need execute(), but a DataStream does
        env.execute();
    }
}


Merging and Splitting

Merging

  • union and connect

union merges multiple data streams of the same type into a single new stream of that type; connect joins two data streams that may be of different types.

package datastream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Arrays;

/**
 * @author 公羽
 * date 2021/7/4
 */
public class trans_con {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<String> dataStreamSource = env.fromElements("test1", "test2", "test3");
        DataStream<String> stringDataStreamSource = env.fromCollection(Arrays.asList("test4", "test5", "test6"));
        DataStream<Long> longDataStreamSource = env.fromSequence(1, 10);
        // union: both streams must have the same type
        DataStream<String> union = dataStreamSource.union(stringDataStreamSource);
        // connect: the two streams may have different types
        ConnectedStreams<String, Long> connect = dataStreamSource.connect(longDataStreamSource);
        DataStream<String> map = connect.map(new CoMapFunction<String, Long, String>() {
            @Override
            public String map1(String s) throws Exception {
                return "String -> String" +" "+ s;
            }
            @Override
            public String map2(Long aLong) throws Exception {
                return "Long -> String" + " " + aLong.toString();
            }
        });
//        union.print();
        map.print();
        env.execute();
    }
}

Splitting

  • Side Outputs

With side outputs you process the stream with a process function and, depending on the result, route each element to a different OutputTag.

package datastream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @author 公羽
 * date 2021/7/4
 */
public class trans_split {
    public static void main(String[] args) throws Exception {
//        set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//        source
        DataStream<Integer> integerDataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
//        transformation
//        split the stream
//        define the output tags
        OutputTag<Integer> tag1 = new OutputTag<Integer>("even", TypeInformation.of(Integer.class));
        OutputTag<Integer> tag2 = new OutputTag<>("odd", TypeInformation.of(Integer.class));
        SingleOutputStreamOperator<Integer> process = integerDataStreamSource.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer integer, Context context, Collector<Integer> collector) throws Exception {
                if (integer % 2 == 0){
                    context.output(tag1,integer);// even numbers go to tag1
                }else {
                    context.output(tag2,integer);// odd numbers go to tag2
                }
            }
        });
//        retrieve the side outputs by tag
        DataStream<Integer> sideOutput = process.getSideOutput(tag1);
        DataStream<Integer> sideOutput1 = process.getSideOutput(tag2);
//        sink
        sideOutput.print();
        sideOutput1.print();
//        execute
        env.execute();
    }
}


Partitioning

  • rebalance: rebalanced repartitioning

rebalance mitigates data skew, i.e. the situation where most of the data is concentrated on one node while the other nodes are lightly loaded.

package datastream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author 公羽
 * date 2021/7/4
 */
public class trans_para {
    public static void main(String[] args) throws Exception {
//        set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//        source
        DataStream<Long> longDataStreamSource = env.fromSequence(1, 10000);
//        transformation
//        the filter leaves the data unevenly distributed, which can cause skew
        DataStream<Long> filter = longDataStreamSource.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return aLong > 10;
            }
        });
//        process directly: data skew may occur
        DataStream<Tuple2<Integer, Integer>> map = filter.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Long aLong) throws Exception {
                int id = getRuntimeContext().getIndexOfThisSubtask();// index of the subtask handling this element
                return Tuple2.of(id,1);
            }
        }).keyBy(t -> t.f0).sum(1);
        //rebalance the partitions before processing to resolve the skew
        DataStream<Tuple2<Integer, Integer>> map1 = filter.rebalance().map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Long aLong) throws Exception {
                int id = getRuntimeContext().getIndexOfThisSubtask();
                return Tuple2.of(id,1);
            }
        }).keyBy(t -> t.f0).sum(1);
//        sink
        map.print();
        map1.print();
//        execute
        env.execute();
    }
}


Sink

Predefined Sinks

  • ds.print(): print directly to the console
  • ds.printToErr(): print to the console on stderr (shown in red)
  • ds.writeAsText("local/HDFS path", WriteMode.OVERWRITE).setParallelism(n): write to a local path or to HDFS; if n = 1 the output is a single file, if n > 1 it is a directory of files
Full code

package datastream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author 公羽
 * date 2021/7/4
 */
public class sink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<String> ds = env.fromElements("test1", "test2", "test3");
//        sink
//        ds.print();
//        ds.printToErr();
//        ds.writeAsText("f:/test/test1").setParallelism(1);// parallelism 1: output is a single file
        ds.writeAsText("f:/test/test2").setParallelism(2);// parallelism 2: output is a directory with one file per subtask
        env.execute();
    }
}

Custom Sink

Store the data into MySQL.
Define the sink class

package datastream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
 * @author 公羽
 * date 2021/7/4
 */
public class sink_mysql extends RichSinkFunction<student> {
    private Connection connection = null;
    private PreparedStatement preparedStatement = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        // call the parent implementation; optional
        super.open(parameters);
        // load the MySQL driver and establish the connection
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useSSL=false&characterEncoding=utf-8&serverTimezone=UTC","root","123456");
        String sql = "insert into test_flink(name,id,age) values(?,?,?)";
        // create the PreparedStatement
        preparedStatement = connection.prepareStatement(sql);
    }
    }
    @Override
    public void invoke(student value, Context context) throws Exception {
        // bind concrete values to the ? placeholders
        preparedStatement.setString(1,value.getName());// name
        preparedStatement.setInt(2,value.getId());// id
        preparedStatement.setInt(3,value.getAge());// age
        // execute the SQL
        preparedStatement.executeUpdate();
    }
    @Override
    public void close() throws Exception {
        super.close();
        preparedStatement.close();
        connection.close();
    }
}

Define the sink driver class

package datastream;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author 公羽
 * date 2021/7/4
 */
public class sink_driver {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<student> test1 = env.fromElements(new student(10,"test1",100));
        test1.addSink(new sink_mysql());
        env.execute();
    }
}



Copyright notice: This is an original article by qq_43659234, released under the CC 4.0 BY-SA license. Please include the original source link and this notice when reposting.