Lecture 3 Code
Copy the jar package (df_connect_finance-0.1-jar-with-dependencies.jar) into /opt/lib/
--------------------------------------------------------->>>>
POST http://localhost:8083/connectors/
body:
--------------------
{
  "name": "finace_source_connector_01",
  "config": {
    "cuid": "finace_source_connector_01",
    "connector.class": "com.datafibers.flink.stream.FinanceSourceConnector",
    "tasks.max": "1",
    "topic": "stock_source",
    "portfolio": "Top 10 IT Service",
    "interval": "5",
    "spoof": "RAND",
    "schema.registry.uri": "http://localhost:8081"
  }
}
---------------------------
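The POST above can also be issued from the command line instead of Postman. A minimal sketch, assuming the Connect REST endpoint at localhost:8083 as above; the python3 JSON sanity check is an addition, not part of the original steps:

```shell
# Write the connector config shown above to a temp file.
cat > /tmp/finance_source.json <<'EOF'
{
  "name": "finace_source_connector_01",
  "config": {
    "cuid": "finace_source_connector_01",
    "connector.class": "com.datafibers.flink.stream.FinanceSourceConnector",
    "tasks.max": "1",
    "topic": "stock_source",
    "portfolio": "Top 10 IT Service",
    "interval": "5",
    "spoof": "RAND",
    "schema.registry.uri": "http://localhost:8081"
  }
}
EOF
# Sanity-check the JSON locally before posting.
python3 -m json.tool /tmp/finance_source.json > /dev/null && echo "JSON OK"
# Post it to Kafka Connect ("|| true" keeps the script going if Connect is not up yet).
curl -s -X POST -H "Content-Type: application/json" \
  --data @/tmp/finance_source.json http://localhost:8083/connectors/ || true
```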
GET http://localhost:8002/subjects
GET http://localhost:8002/subjects/stock_source/versions
GET http://localhost:8002/schemas/ids/1
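The three GETs above can be scripted with curl. Note that the registry port here (8002) differs from the schema.registry.uri in the connector config (8081); this sketch follows the GETs above, so adjust the base URL if your registry listens elsewhere:

```shell
REG="http://localhost:8002"   # registry base URL as used in the GETs above
for path in subjects subjects/stock_source/versions schemas/ids/1; do
  echo "GET $REG/$path"
  curl -s "$REG/$path" || true   # tolerate the registry being unreachable
  echo
done
```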
In the first terminal window, run:
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic stock_source
// data will then stream in continuously
In a second window, open the MySQL client: mysql -u root -pmypassword
show databases; -- check whether the simple_stream database exists
If it exists: use simple_stream;
If not, create it first: create database simple_stream; then use simple_stream;
Inside it, check whether a stock_source table already exists (show tables;)
If it does, drop it: drop table stock_source;
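The database checks above can be collected into one scripted pass. A sketch, with the credentials from above; the mysql invocation is guarded so the SQL file can still be inspected when no server is running:

```shell
# SQL for the manual steps above: ensure the database exists and the old table is gone.
cat > /tmp/prepare_simple_stream.sql <<'EOF'
CREATE DATABASE IF NOT EXISTS simple_stream;
USE simple_stream;
DROP TABLE IF EXISTS stock_source;
EOF
# Run it only if the mysql client is on PATH.
command -v mysql >/dev/null \
  && mysql -u root -pmypassword < /tmp/prepare_simple_stream.sql \
  || echo "mysql client not found; run the SQL manually"
```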
Then, back in Postman:
POST http://localhost:8083/connectors/
body:
---------------
{
  "name": "jdbc-sink-mysql-01",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "stock_source",
    "connection.url": "jdbc:mysql://localhost:3306/simple_stream",
    "connection.user": "root",
    "connection.password": "mypassword",
    "auto.create": "true"
  }
}
-----------------
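As with the source connector, the sink can be registered with curl. A sketch using the endpoints above, which also polls the connector's status afterwards (the status GET is a standard Connect REST endpoint, not shown in the original steps):

```shell
# Write the sink config shown above to a temp file.
cat > /tmp/jdbc_sink.json <<'EOF'
{
  "name": "jdbc-sink-mysql-01",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "stock_source",
    "connection.url": "jdbc:mysql://localhost:3306/simple_stream",
    "connection.user": "root",
    "connection.password": "mypassword",
    "auto.create": "true"
  }
}
EOF
python3 -m json.tool /tmp/jdbc_sink.json > /dev/null && echo "JSON OK"
curl -s -X POST -H "Content-Type: application/json" \
  --data @/tmp/jdbc_sink.json http://localhost:8083/connectors/ || true
# Check the connector state (RUNNING/FAILED) once registered.
curl -s http://localhost:8083/connectors/jdbc-sink-mysql-01/status || true
```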
Back in MySQL:
show tables;
select count(*) from stock_source; // check the row count in the table
select * from stock_source limit 0,10; // view the first ten rows
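The two checks above can be run non-interactively as well. A sketch; note that with auto.create the sink names the table after the topic (stock_source) inside the simple_stream database:

```shell
# Verification queries for the sink output table.
cat > /tmp/verify_stock_source.sql <<'EOF'
SELECT COUNT(*) FROM stock_source;   -- total rows ingested so far
SELECT * FROM stock_source LIMIT 10; -- first ten rows
EOF
command -v mysql >/dev/null \
  && mysql -u root -pmypassword simple_stream < /tmp/verify_stock_source.sql \
  || echo "mysql client not found; run the SQL manually"
```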
Copyright notice: this is an original article by qq_42612645, licensed under the CC 4.0 BY-SA license; reproduction must include a link to the original source and this notice.