ogg for bigdata 写入 kafka 写入hbase

ogg for bigdata 写入 kafka 写入hbase

首先是写入kafka的模板,ogg的相关操作在之前的博客里有写

OGG日常操作

这里不做过多描述

REPLICAT xxx
TARGETDB LIBFILE libggjava.so SET property=dirprm/xxx.props
SOURCECHARSET GBK
sourcedefs ./dirdef/xxx.def
REPORTCOUNT EVERY 1 MINUTES, RATE
HANDLECOLLISIONS
GROUPTRANSOPS 2000
MAXTRANSOPS 500
--IGNOREDELETES  忽略删除操作
--IGNOREUPDATES  忽略更新操作

map xxx.xxx,target xxx.xxx;
--可以写多个

xxx.props的内容如下:

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=xxx.properties
gg.handler.kafkahandler.TopicName =xxx  -- topic名字
gg.handler.kafkahandler.format =delimitedtext
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false

gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.pkUpdateHandling=delete-insert  
gg.handler.kafkahandler.format.fieldDelimiter=xxx   -- 在kafka里的分割符
gg.handler.kafkahandler.format.nullValueRepresentation= CDATA[]
gg.handler.kafkahandler.format.encoding=UTF-8
gg.handler.kafkahandler.mode =tx   -- tx表示一个事务放入一个offset,设置为op为一个sql放一个
gg.contentreplaceregex=CDATA[\n]
gg.contentreplacestring=CDATA[]
gg.handler.kafkahandler.format.lineDelimiter=CDATA[\n]

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec

gg.classpath=dirprm/:/xxx/kafkalib/*:   -- 把kafka下面的lib都拷贝过来
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=xxx/ggjava.jar

xxx.properties的配置如下:

bootstrap.servers=xxx:xxx
acks=1
compression.type=gzip
reconnect.backoff.ms=1000

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000

max.request.size = 5024000
send.buffer.bytes = 5024000

ogg写入hbase的配置文件如下

REPLICAT xxx
TARGETDB LIBFILE ./libggjava.so SET property=dirprm/xxx.props
SOURCECHARSET PASSTHRU
sourcedefs ./dirdef/xxx.def
REPORTCOUNT EVERY 1 MINUTES, RATE
HANDLECOLLISIONS
GROUPTRANSOPS 2000
getupdatebefores
MAXTRANSOPS 2000
REPLACEBADCHAR SKIP

MAP xxx.xxx,TARGET xxx.xxx;

其中xxx.props的内容如下:

gg.handlerlist=hbase
gg.handler.hbase.type=hbase
gg.handler.hbase.hBase98Compatible=true
gg.handler.hbase.hBaseColumnFamilyName=xxx --hbase的列簇名
gg.handler.hbase.keyValueDelimiter=CDATA[=]
gg.handler.hbase.keyValuePairDelimiter=CDATA[,]
gg.handler.hbase.encoding=UTF-8
gg.handler.hbase.pkUpdateHandling=delete-insert
gg.handler.hbase.nullValueRepresentation=CDATA[]
gg.handler.hbase.authType=none
gg.handler.hbase.includeTokens=false
gg.handler.hbase.rowkeyDelimiter=CDATA[_]

gg.handler.hbase.hbaseautoflush=true
gg.handler.hbase.mode=tx

goldengate.userexit.timestamp=utc+8
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO

gg.report.time=30sec

gg.classpath=xxx/hbase/lib/*:xxx/hbase/conf  -- hbase的配合和lib目录

javawriter.bootoptions=-Xmx1024m -Xms512m -Djava.class.path=xxx/ggjava.jar

版权声明:本文为qq_38151907原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。