clickhouse从MySQL同步数据

MySQL引擎

MySQL引擎用于将远程的MySQL服务器中的表映射到clickhouse中,并允许对表进行insert和select操作,以方便在clickhouse与MySQL之间进行数据交换.
MySQL数据库引擎会将对其的查询转换为MySQL语法并发送到MySQL服务器中,因此以执行诸如SHOW TABLES 或 SHOW CREATE TABLE之类的操作.
但无法对其执行以下操作:

  • ATTACH/DETACH
  • DROP
  • RENAME
  • CREATE TABLE
  • ALTER

CREATE DATABASE:

CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster] ENGINE = MySQL('host:port','database','user','password')

PS:该database就是MySQL的database的映射,MySQL中的数据发生变化,clickhouse中的数据也会跟着变化.

单次单表同步

此为将数据从MySQL的某表中全量同步到clickhouse中,clickhouse中的数据不会随MySQL中数据的变化而变化.

CREATE TABLE tmp ENGINE = MergeTree ORDER BY id AS SELECT * FROM mysql('hostip:3306','db','table','user','passwd');

性能:

数据量时间
500万60S

使用CSV文件导入(麻烦且慢)

  1. 从MySQL中导出数据到csv文件
//查询MySQL导出文件地址
show variables like '%secure';
select * from mysql_table into outfile '/var/lib/mysql-files/test.csv' 
fields terminated by ',' optionally enclosed by '"' escaped by '"'
lines terminated by '\r\n';
  1. clickhouse导入csv文件
//创建表
echo 'create table test(col1 colType, col2 colType ...) ENGINE = MergeTree;' | clickhouse-client

导入数据
cat test.csv | clickhouse_client --query='insert into test FORMAT CSV'

定时增量同步

参考博主 打卤 的博客

import pymysql
from clickhouse_driver import Client

click_cllient = Client("host","port","db","user","passwd")
mysql_client = pymysql.connect(host='***',port='***',user='***',passwd='***',db='***',charset='utf8')

def get_id():
	click_sql = """select id from test order by id desc limit 1"""
	try:
		list = click_client.execute(click_sql,types_check = True)
		for i in list:
			id = i[0]
			return id
	except Exception as e:
		print(e)

def get_data():
	log_id = get_id()
	cursor = mysql_cliet.cursor()
	sql = """select * from test where id > '%s'"""%log_id
	cursor.execute(sql)
	results = cursor.fetchall()
	mysql_client.close()
	return results

def insert_data(data):
	try:
		click_client.execute('insert into test values',[data],types_check=True)
		return "success!!!"
	except Exception e:
		return(e)

def main():
	print(get_id())
	list = get_data()
	for data in list:
		print(data)
		print(insert_data(data))

if __name__ == '__main__':
	main()

版权声明:本文为weixin_43929380原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。