Flink SQL connector writing to Doris on Zeppelin: practice notes

Goal: support reading from and writing to Doris from Flink code, i.e. Flink SQL on Doris, with Flink SQL as the compute engine.

1. Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sourceSinks.html#full-stack-example

The overall workflow:

2. Developing it ourselves:

1) create a factory that parses and validates the connector options,

2) implement the table connector,

3) implement and discover a custom format,

4) and use the provided utilities, such as the data structure converters and FactoryUtil.

 

Implementing the Doris sink

 

1. First, implement the table factory (the class below is named DorisDynamicTableSourceFactory, but it implements DynamicTableSinkFactory):

package org.apache.flink.connector.doris.table;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;

import java.time.Duration;
import java.util.HashSet;
import java.util.Set;

/**
 * @program: flink-neiwang-dev
 * @description: 1
 * @author: Mr.Wang
 * @create: 2020-11-12 14:04
 **/
public class DorisDynamicTableSourceFactory implements DynamicTableSinkFactory {
    //todo the connector identifier is "doris"
    public static final String IDENTIFIER = "doris";

    public static final ConfigOption<String> URL = ConfigOptions
            .key("url")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc database url.");
    public static final ConfigOption<String> TABLE_NAME = ConfigOptions
            .key("table-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc table name.");
    public static final ConfigOption<String> USERNAME = ConfigOptions
            .key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc user name.");
    public static final ConfigOption<String> PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc password.");
    private static final ConfigOption<String> DRIVER = ConfigOptions
            .key("driver")
            .stringType()
            .noDefaultValue()
            .withDescription("the class name of the JDBC driver to use to connect to this URL. " +
                    "If not set, it will automatically be derived from the URL.");
    // read config options
    private static final ConfigOption<String> SCAN_PARTITION_COLUMN = ConfigOptions
            .key("scan.partition.column")
            .stringType()
            .noDefaultValue()
            .withDescription("the column name used for partitioning the input.");
    private static final ConfigOption<Integer> SCAN_PARTITION_NUM = ConfigOptions
            .key("scan.partition.num")
            .intType()
            .noDefaultValue()
            .withDescription("the number of partitions.");
    private static final ConfigOption<Long> SCAN_PARTITION_LOWER_BOUND = ConfigOptions
            .key("scan.partition.lower-bound")
            .longType()
            .noDefaultValue()
            .withDescription("the smallest value of the first partition.");
    private static final ConfigOption<Long> SCAN_PARTITION_UPPER_BOUND = ConfigOptions
            .key("scan.partition.upper-bound")
            .longType()
            .noDefaultValue()
            .withDescription("the largest value of the last partition.");
    private static final ConfigOption<Integer> SCAN_FETCH_SIZE = ConfigOptions
            .key("scan.fetch-size")
            .intType()
            .defaultValue(0)
            .withDescription("gives the reader a hint as to the number of rows that should be fetched, from" +
                    " the database when reading per round trip. If the value specified is zero, then the hint is ignored. The" +
                    " default value is zero.");

    // look up config options
    private static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = ConfigOptions
            .key("lookup.cache.max-rows")
            .longType()
            .defaultValue(-1L)
            .withDescription("the max number of rows of lookup cache, over this value, the oldest rows will " +
                    "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is " +
                    "specified. Cache is not enabled as default.");
    private static final ConfigOption<Duration> LOOKUP_CACHE_TTL = ConfigOptions
            .key("lookup.cache.ttl")
            .durationType()
            .defaultValue(Duration.ofSeconds(10))
            .withDescription("the cache time to live.");
    private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions
            .key("lookup.max-retries")
            .intType()
            .defaultValue(3)
            .withDescription("the max retry times if lookup database failed.");

    // write config options
    private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions
            .key("sink.buffer-flush.max-rows")
            .intType()
            .defaultValue(100)
            .withDescription("the flush max size (includes all append, upsert and delete records), over this number" +
                    " of records, will flush data. The default value is 100.");
    private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
            .key("sink.buffer-flush.interval")
            .durationType()
            .defaultValue(Duration.ofSeconds(1))
            .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
                    "default value is 1s.");
    private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
            .key("sink.max-retries")
            .intType()
            .defaultValue(3)
            .withDescription("the max retry times if writing records to database failed.");
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig options = helper.getOptions();
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

        //todo construct DorisDynamicTableSink, passing in the validated options and the physical schema
        return new DorisDynamicTableSink(options,physicalSchema);

    }

    @Override
    public String factoryIdentifier() {
        //todo the connector identifier used in the WITH clause
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        /**
         "WITH (\n" +
         " 'connector' = 'jdbc',\n"+
         " 'url' = 'jdbc:mysql://192.168.x.xx:3306/doris_test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC',\n"+
         "   'driver' = 'com.mysql.jdbc.Driver',\n"+
         "   'table-name' = 'flink_insert',\n"+
         "   'username' = 'root',\n"+
         "   'password' = '123456'\n"+
         ")";
         */

        //todo required options
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(URL);
        requiredOptions.add(TABLE_NAME);
        requiredOptions.add(USERNAME);
        requiredOptions.add(PASSWORD);
        requiredOptions.add(DRIVER);
        return requiredOptions;

    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        //todo optional options
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
        optionalOptions.add(DRIVER);
        optionalOptions.add(USERNAME);
        optionalOptions.add(PASSWORD);
        optionalOptions.add(SCAN_PARTITION_COLUMN);
        optionalOptions.add(SCAN_PARTITION_LOWER_BOUND);
        optionalOptions.add(SCAN_PARTITION_UPPER_BOUND);
        optionalOptions.add(SCAN_PARTITION_NUM);
        optionalOptions.add(SCAN_FETCH_SIZE);
        optionalOptions.add(LOOKUP_CACHE_MAX_ROWS);
        optionalOptions.add(LOOKUP_CACHE_TTL);
        optionalOptions.add(LOOKUP_MAX_RETRIES);
        optionalOptions.add(SINK_BUFFER_FLUSH_MAX_ROWS);
        optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL);
        optionalOptions.add(SINK_MAX_RETRIES);
        return optionalOptions;
    }
}
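
For reference, a DDL that satisfies the options declared above would look roughly like this (a sketch: host, database, table and credentials are placeholders; 'doris' is the value returned by factoryIdentifier()):

CREATE TABLE doris_sink (
  id STRING,
  name STRING
) WITH (
  'connector'  = 'doris',
  'url'        = 'jdbc:mysql://fe-host:9030/example_db',
  'driver'     = 'com.mysql.jdbc.Driver',
  'table-name' = 'flink_insert',
  'username'   = 'root',
  'password'   = '123456'
)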

 

2. Then implement DorisDynamicTableSink:

package org.apache.flink.connector.doris.table;

import lombok.SneakyThrows;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;

/**
 * @program: flink-neiwang-dev
 * @description:
 * @author: Mr.Wang
 * @create: 2020-11-12 12:59
 **/
public class DorisDynamicTableSink implements DynamicTableSink {

    private ReadableConfig options;

    private  TableSchema tableSchema;

    public DorisDynamicTableSink(ReadableConfig options,TableSchema schema) {
        this.options = options;

        this.tableSchema = schema;
    }


    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        //todo the JDBC connector validates the primary key here
//        validatePrimaryKey(requestedMode);
        //todo changelog mode accepted by this sink: insert-only for now
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
//                .addContainedKind(RowKind.DELETE)
//                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();
    }

    @SneakyThrows
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {

        // read the connection options from the DDL
        String URL = options.get(DorisDynamicTableSourceFactory.URL);
        String TABLE_NAME = options.get(DorisDynamicTableSourceFactory.TABLE_NAME);


        final TypeInformation<RowData> rowDataTypeInformation = (TypeInformation<RowData>) context
                .createTypeInformation(tableSchema.toRowDataType());

        DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();

        SinkFunction dorisSinkFunction = new DorisSinkFunction2("", "", fieldDataTypes, tableSchema.getFieldNames());

        return SinkFunctionProvider.of(dorisSinkFunction);


    }

    @Override
    public DynamicTableSink copy() {

        return new DorisDynamicTableSink(this.options,this.tableSchema);
    }

    @Override
    public String asSummaryString() {
        //todo in the JDBC connector this is "JDBC:" + dialectName
        return "doris_sink";
    }
}
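
Note that getSinkRuntimeProvider() reads the 'url' and 'table-name' options but never forwards them: the sink function is constructed with empty strings, and the target database/table end up hard-coded in the stream load call further down. A minimal way to wire them through, assuming DorisSinkFunction2 were given a constructor that accepts the connection URL and table name (hypothetical signature, not the one used above):

        // hypothetical: DorisSinkFunction2(String url, String tableName, DataType[] types, String[] names)
        String url = options.get(DorisDynamicTableSourceFactory.URL);
        String tableName = options.get(DorisDynamicTableSourceFactory.TABLE_NAME);
        SinkFunction<RowData> sink = new DorisSinkFunction2(url, tableName, fieldDataTypes, tableSchema.getFieldNames());
        return SinkFunctionProvider.of(sink);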


3. Doris does support the JDBC protocol, but in actual testing, writing to Doris over JDBC turned out to be far too slow, so the insert path is switched to Doris Stream Load. The remaining question is whether to write single-threaded or concurrently; since that is not settled yet, a queue is used so that a single thread drains it in batches and keeps the writes ordered. See the sink function below.

package org.apache.flink.connector.doris.table;

/**
 * @program: flink-neiwang-dev
 * @description:
 * @author: Mr.Wang
 * @create: 2020-11-12 15:25
 **/

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import doris.DorisStreamLoad;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;


public class DorisSinkFunction2 implements SinkFunction<RowData> {
    private final String job;
    private final String metrics;
    private final String[] fieldNames;
    private final DataType[] fieldDataTypes;

    private ReentrantLock reentrantLock = new ReentrantLock();

    //todo queue that buffers rows until the next flush
    public LinkedBlockingQueue<JSONObject> dorisQueue = new LinkedBlockingQueue<JSONObject>();

    // staging list used when draining the queue for a flush
    private List<JSONObject> jsonObjects = new ArrayList<>();
    // note: static state is shared by every sink subtask running in the same TaskManager JVM
    public static Long preTime = 0L;
    public static Boolean firstInput = true;
    private DorisStreamLoad dorisStreamLoad;

    public DorisSinkFunction2(String job, String metrics, DataType[] fieldDataTypes, String[] fieldNames) {
        this.job = job;
        this.metrics = metrics;
        this.fieldDataTypes = fieldDataTypes;
        this.fieldNames = fieldNames;
        dorisStreamLoad = new DorisStreamLoad();
    }

    //todo still to be handled: what to do when a batch fails to load

    @Override
    public void invoke(RowData value, Context context) throws Exception {

        //todo initialize the flush timer on the first record
        if (firstInput){
            preTime = System.currentTimeMillis();
            firstInput = false;
        }
        //todo row kind: only INSERT is produced here, so it is not acted on
        final RowKind rowKind = value.getRowKind();


        //todo this is the actual send logic that bridges to the external system
//         RowData rowData = (RowData) value;

        //todo manually convert the row fields into JSON
//        System.out.println("value = " + value);
//        System.out.println("fieldNames = " + fieldNames);
//        System.out.println("fieldDataTypes = " + fieldDataTypes);
        JSONObject json = new JSONObject();
        // map each physical field into the JSON object by its declared data type
        for (int i = 0; i < fieldNames.length; i++) {
            String dataType = fieldDataTypes[i].toString();
            String fieldName = fieldNames[i];
            //todo replace this string comparison with an enum / type-based dispatch

            if ("INT".equals(dataType)) {
                json.put(fieldName, value.getInt(i));
            } else if ("STRING".equals(dataType)) {
                //todo note the .toString(): getString(i) returns StringData, not String -- this tripped me up for a long time
                json.put(fieldName, value.getString(i).toString());
            }

        }
        //todo enqueue the row
        dorisQueue.add(json);
        long timeMillis = System.currentTimeMillis();
        long time = timeMillis - preTime;


        // flush when more than 1000 rows are buffered or more than 1000 ms have passed since the last flush
        if (dorisQueue.size() > 1000 || time > 1000) {
            preTime = System.currentTimeMillis();
            reentrantLock.lock();
            try {
                dorisQueue.drainTo(jsonObjects);
                JSONObject[] v = new JSONObject[jsonObjects.size()];
                jsonObjects.toArray(v);
                dorisStreamLoad.sendData(JSON.toJSONString(v), "example_db", "join_test");
            } catch (Exception ex) {
                //todo a failed batch is currently dropped; retry/failure handling still needs to be designed
                ex.printStackTrace();
            } finally {
                // clear the staging list so the next flush does not resend rows that were already loaded
                jsonObjects.clear();
                reentrantLock.unlock();
            }

        }


    }


}
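
For reference, with the (id INT, name STRING) shape handled by the type mapping above, a flushed batch of two rows is serialized into a JSON array like the following (values made up for illustration), which is why the Stream Load request in the next section sets format=json and strip_outer_array=true:

[{"id":1,"name":"aa"},{"id":2,"name":"bb"}]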

 

4. The Doris Stream Load implementation

package doris;

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class DorisStreamLoad {
    private final static String DORIS_HOST = "dev-hadoop-ct7-6-143";
    private final static String DORIS_DB = "stream_load";
//    private final static String DORIS_TABLE = "join_test";
    private final static String DORIS_USER = "root";
    private final static String DORIS_PASSWORD = "root";
    private final static int DORIS_HTTP_PORT = 8030;

    public void sendData(String content,String  db ,String table) throws Exception {
        final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
                DORIS_HOST,
                DORIS_HTTP_PORT,
                db,
                table);
//                DORIS_TABLE);

        final HttpClientBuilder httpClientBuilder = HttpClients
                .custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                });

        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            StringEntity entity = new StringEntity(content, "UTF-8");
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(DORIS_USER, DORIS_PASSWORD));
            put.setHeader("strip_outer_array", "true");
            // the label header is optional, not necessary
            // use label header can ensure at most once semantics
//            put.setHeader("label", "39c25a5c-7000-496e-a98e-348a264c81de");
            put.setHeader("format", "json");

            put.setEntity(entity);

            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }
                final int statusCode = response.getStatusLine().getStatusCode();
                // statusCode 200 just indicates that doris be service is ok, not stream load
                // you should see the output content to find whether stream load is success
                if (statusCode != 200) {
                    throw new IOException(
                            String.format("Stream load failed, statusCode=%s load result=%s", statusCode, loadResult));
                }

                System.out.println(loadResult);
            }
        }
    }

    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }

    public static void main(String[] args) throws Exception {
        int id1 = 1;
        int id2 = 10;
        String id3 = "ddddd";
        int rowNumber = 100;
        String oneRow = id1 + "\t" + id2 + "\t" + id3 + "\n";

        StringBuilder stringBuilder = new StringBuilder();
        for (int i = 0; i < rowNumber; i++) {
            stringBuilder.append(oneRow);
        }

        //in doris 0.9 version, you need to comment this line
        //refer to https://github.com/apache/incubator-doris/issues/783
        stringBuilder.deleteCharAt(stringBuilder.length() - 1);

        String loadData = stringBuilder.toString();
        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad();
        // note: sendData() above sets format=json, so this tab-separated loadData would first have to be
        // converted to a JSON array; the method now also takes the target database and table:
//        dorisStreamLoad.sendData(loadData, DORIS_DB, "join_test");
    }
}
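
A minimal standalone use of the class above, assuming the FE host and credentials in the constants are reachable, and reusing the database/table targeted by the sink function:

        DorisStreamLoad load = new DorisStreamLoad();
        load.sendData("[{\"id\":1,\"name\":\"aa\"},{\"id\":2,\"name\":\"bb\"}]", "example_db", "join_test");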

5. Consolidate the code and clean up the formatting.

I didn't feel like doing this in the end...

One thing to watch out for later: the factory must be registered through a Java SPI services file, otherwise Flink cannot discover the connector:
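
Concretely, the connector jar needs the SPI file below so that FactoryUtil can find a factory for the 'doris' identifier; it contains a single line with the factory class from step 1:

src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory:

org.apache.flink.connector.doris.table.DorisDynamicTableSourceFactory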

6. Testing

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DorisSqlTest2 {
    // note: these WITH options reflect a later iteration of the connector than the factory shown in
    // step 1, whose required options are 'url', 'driver', 'table-name', 'username' and 'password'.
    private static final String DORIS_SQL = "CREATE TABLE doris_test (\n" +
            " id STRING,\n" +
            " name STRING\n"+
            " ) WITH (\n"+
            "   'connector' = 'doris',\n"+
            "   'doris.host' = '192.168.6.143',\n"+
            "   'doris.port' = '8030',\n"+
            "   'database-name' = 'example_db',\n"+
            "   'table-name' = 'test',\n"+
            "   'username' = 'root',\n"+
            "   'password' = 'root',\n"+
            "   'window' = '10000',\n"+
            "   'max.batch' = '500'\n"+
            " )";



    private static final String DATA_GEN =  "CREATE TABLE datagen (\n" +
            " id STRING,\n" +
            " name STRING,\n" +
            " user_age INT,\n" +
            " user_other STRING,\n" +
            " ts AS localtimestamp\n" +
            ") WITH (\n" +
            " 'connector' = 'datagen',\n" +
            " 'rows-per-second'='500',\n" +
            " 'fields.id.length'='7',\n" +
            " 'fields.user_age.min'='1',\n" +
            " 'fields.user_age.max'='100',\n" +
            " 'fields.name.length'='2',\n" +
            " 'fields.user_other.length'='10'\n" +
            ")";



    private  static final String KAFKA_SQL ="CREATE TABLE kafkaTable (" +
            " id STRING,\n" +
            " name STRING\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = 'test1',\n" +
            " 'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',\n" +
            " 'properties.group.id' = 'testGroup',\n" +
            " 'format' = 'json',\n" +
            " 'scan.startup.mode' = 'earliest-offset'\n" +
            ")";

    public static void main(String[] args) {

//        System.out.println("DORIS_SQL = " + DORIS_SQL);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // bsSettings is not actually passed below; in Flink 1.11 StreamTableEnvironment.create(env) already uses the Blink planner
//		StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env);
        bsTableEnv.executeSql(DORIS_SQL);
        bsTableEnv.executeSql(DATA_GEN);
        bsTableEnv.executeSql(KAFKA_SQL);

        bsTableEnv.executeSql("insert into doris_test select id,name from datagen");
//        bsTableEnv.executeSql("insert into kafkaTable select id,name from datagen");
    }
}

 

The Doris table:

 

 

Running on the cluster:

 

 

Submitting the job in Zeppelin:
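
For reference, in Zeppelin the same SQL can be submitted through the Flink interpreter's streaming SQL paragraphs (a sketch; it assumes the connector jar has been made available to the interpreter, e.g. via the flink.execution.jars property):

%flink.ssql

-- run the DORIS_SQL and DATA_GEN CREATE TABLE statements from step 6 first, then:
insert into doris_test select id, name from datagen;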

 

Notes:

If everything runs fine locally but the job fails when submitted to the cluster, the cause is usually how the fat jar was packaged; add the relevant build configuration to the pom file:

 

The full pom can be copied from:

https://github.com/lonelyGhostisdog/flinksql/blob/master/pom.xml
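
The part that usually matters for this kind of failure is the maven-shade-plugin's ServicesResourceTransformer, which merges the META-INF/services entries of all bundled connectors; without it the 'doris' factory registered above is no longer discoverable inside the fat jar. A sketch (version numbers are illustrative):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.4</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- merge META-INF/services files so table factories stay discoverable -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>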

 

Finally, thanks to this blog:

https://blog.csdn.net/u010034713

 


Copyright notice: this is an original article by qq_31866793, released under the CC 4.0 BY-SA license; please include a link to the original source and this notice when reposting.