ogg for bigdata 写入 kafka 写入hbase
首先是写入kafka的模板,ogg的相关操作在之前的博客里有写
OGG日常操作
这里不做过多描述
REPLICAT xxx
TARGETDB LIBFILE libggjava.so SET property=dirprm/xxx.props
SOURCECHARSET GBK
sourcedefs ./dirdef/xxx.def
REPORTCOUNT EVERY 1 MINUTES, RATE
HANDLECOLLISIONS
GROUPTRANSOPS 2000
MAXTRANSOPS 500
--IGNOREDELETES 忽略删除操作
--IGNOREUPDATES 忽略更新操作
map xxx.xxx,target xxx.xxx;
--可以写多个
xxx.props的内容如下:
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=xxx.properties
gg.handler.kafkahandler.TopicName =xxx -- topic名字
gg.handler.kafkahandler.format =delimitedtext
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.pkUpdateHandling=delete-insert
gg.handler.kafkahandler.format.fieldDelimiter=xxx -- 在kafka里的分割符
gg.handler.kafkahandler.format.nullValueRepresentation= CDATA[]
gg.handler.kafkahandler.format.encoding=UTF-8
gg.handler.kafkahandler.mode =tx -- tx表示一个事务放入一个offset,设置为op为一个sql放一个
gg.contentreplaceregex=CDATA[\n]
gg.contentreplacestring=CDATA[]
gg.handler.kafkahandler.format.lineDelimiter=CDATA[\n]
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
gg.classpath=dirprm/:/xxx/kafkalib/*: -- 把kafka下面的lib都拷贝过来
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=xxx/ggjava.jar
xxx.properties的配置如下:
bootstrap.servers=xxx:xxx
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
max.request.size = 5024000
send.buffer.bytes = 5024000
ogg写入hbase的配置文件如下
REPLICAT xxx
TARGETDB LIBFILE ./libggjava.so SET property=dirprm/xxx.props
SOURCECHARSET PASSTHRU
sourcedefs ./dirdef/xxx.def
REPORTCOUNT EVERY 1 MINUTES, RATE
HANDLECOLLISIONS
GROUPTRANSOPS 2000
getupdatebefores
MAXTRANSOPS 2000
REPLACEBADCHAR SKIP
MAP xxx.xxx,TARGET xxx.xxx;
其中xxx.props的内容如下:
gg.handlerlist=hbase
gg.handler.hbase.type=hbase
gg.handler.hbase.hBase98Compatible=true
gg.handler.hbase.hBaseColumnFamilyName=xxx --hbase的列簇名
gg.handler.hbase.keyValueDelimiter=CDATA[=]
gg.handler.hbase.keyValuePairDelimiter=CDATA[,]
gg.handler.hbase.encoding=UTF-8
gg.handler.hbase.pkUpdateHandling=delete-insert
gg.handler.hbase.nullValueRepresentation=CDATA[]
gg.handler.hbase.authType=none
gg.handler.hbase.includeTokens=false
gg.handler.hbase.rowkeyDelimiter=CDATA[_]
gg.handler.hbase.hbaseautoflush=true
gg.handler.hbase.mode=tx
goldengate.userexit.timestamp=utc+8
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
gg.classpath=xxx/hbase/lib/*:xxx/hbase/conf -- hbase的配合和lib目录
javawriter.bootoptions=-Xmx1024m -Xms512m -Djava.class.path=xxx/ggjava.jar
版权声明:本文为qq_38151907原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。