打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
基于flume

flume是一个日志收集器,更多详细的介绍可以参照官网:http://flume.apache.org/

 

在apache flume的官网上没有找到sql数据源数据抓取的source,

可以利用github上的plugin插件:https://github.com/keedio/flume-ng-sql-source,1.4.3版本基于hibernate实现,已可以适配所有的关系型数据库。

目前的实验环境是在windows下,所以kafka在windows下相关的配置使用,参考了http://blog.csdn.net/linsongbin1/article/details/48022941

文章写的很到位,推荐给大家。

 

 

下面主要说一下flume-ng在windows下的启动,及具体的配置

启动:

“F:\Java\jdk1.8.0_101\bin\java.exe” -Xmx512m -Dlog4j.configuration=file:///E:\apache-flume-1.6.0-bin\conf\log4j.properties -cp "E:\apache-flume-1.6.0-bin\lib\*;E:\apache-flume-1.6.0-bin\plugins.d\sql-source\lib\flume-ng-sql-source-1.4.3-SNAPSHOT.jar;E:\apache-flume-1.6.0-bin\plugins.d\sql-source\libext\mysql-connector-java-5.1.35-bin.jar" org.apache.flume.node.Application -f E:\apache-flume-1.6.0-bin\conf\sql-kafka-conf.properties -n a1

 

主要的sql-kafka-conf.properties

a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1

###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource

a1.sources.src-1.hibernate.connection.url = jdbc:mysql://127.0.0.1/test

# Hibernate Database connection properties
a1.sources.src-1.hibernate.connection.user = root
a1.sources.src-1.hibernate.connection.password = password
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.src-1.run.query.delay=10000
a1.sources.src-1.status.file.path = E://apache-flume-1.6.0-bin
a1.sources.src-1.status.file.name = sqlSource.status

# Custom query
a1.sources.src-1.start.from = 0
a1.sources.src-1.custom.query = select id,name from test_user where id > $@$ order by id asc

a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000

a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10

##############################

a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactionCapacity = 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000


a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = hellotest
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1


a1.sinks.k1.channel = ch-1
a1.sources.src-1.channels=ch-1

 

红色部分是需要注意的,有s


至此就完成了利用flume将mysql数据实时导入到kafka中

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Flume NG 简介及配置实战【转】
Flume 中文入门手冊
Flume
flume的负载均衡load balancer
大数据IMF传奇行动绝密课程第87课:Flume推送数据到Spark Streaming案例实战和内幕源码解密
Flume多Sink方案修正
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服