打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
利用Flink stream从kafka中写数据到mysql

眼看到年底了,许久也没更新博客,最近也比较清闲,顺带学习了下大数据框架Flink,  这框架跟Spark类似,上手比较容易,使用的话两三天就可以开始写代码,在有些方面比spark要强,比如说流处理,下面就用flink中的Stream从kafka中读取数据写入到mysql中,废话不多说,具体上代码吧:

首先看配置文件:

  1. #mysql  
  2. mysql.driver=com.mysql.jdbc.Driver  
  3. mysql.url=jdbc:mysql://localhost:3306/mybatis  
  4. mysql.user=root  
  5. #kafka  
  6. kafka.topic=mytopic  
  7. kafka.hosts=localhost:9092  
  8. kafka.zookper=localhost:2181  
  9. kafka.group=group  

maven依赖情况:
  1. <dependency>  
  2.             <groupId>org.apache.flink</groupId>  
  3.             <artifactId>flink-streaming-java_2.10</artifactId>  
  4.             <version>1.1.3</version>  
  5.         </dependency>  
  6.         <!-- Use this dependency if you are using the DataSet API -->  
  7.         <dependency>  
  8.             <groupId>org.apache.flink</groupId>  
  9.             <artifactId>flink-java</artifactId>  
  10.             <version>1.1.3</version>  
  11.         </dependency>  
  12.         <dependency>  
  13.             <groupId>org.apache.flink</groupId>  
  14.             <artifactId>flink-clients_2.10</artifactId>  
  15.             <version>1.1.3</version>  
  16.         </dependency>  
  17.         <dependency>  
  18.             <groupId>org.apache.flink</groupId>  
  19.             <artifactId>flink-table_2.10</artifactId>  
  20.             <version>1.1.3</version>  
  21.         </dependency>  
  22.         <!-- https://mvnrepository.com/artifact/org.nd4j/nd4j-api -->  
  23.         <dependency>  
  24.             <groupId>org.apache.flink</groupId>  
  25.             <artifactId>flink-connector-kafka-0.8_2.10</artifactId>  
  26.             <version>1.1.3</version>  
  27.         </dependency>  
  28.         <dependency>  
  29.             <groupId>mysql</groupId>  
  30.             <artifactId>mysql-connector-java</artifactId>  
  31.             <version>5.1.17</version>  
  32.         </dependency>  

发送数据到kafka的类:
  1. package com.wanda.flink;  
  2. import java.util.Properties;  
  3. import java.util.Random;  
  4. import org.apache.commons.lang3.RandomStringUtils;  
  5. import com.wanda.common.Config;  
  6. import kafka.javaapi.producer.Producer;  
  7. import kafka.producer.KeyedMessage;  
  8. import kafka.producer.ProducerConfig;  
  9. public class KafkaProducte {  
  10.     public static void main(String[] args) throws InterruptedException {  
  11.         String broker = Config.getString("kafka.hosts");  
  12.         System.out.println("broker:" + broker);  
  13.         String topic = Config.getString("kafka.topic");  
  14.         int count = 10;  
  15.         Random random=new Random();  
  16.         Properties props = new Properties();  
  17.         props.put("metadata.broker.list", broker);  
  18.         props.put("serializer.class""kafka.serializer.StringEncoder");  
  19.         props.put("request.required.acks""1");  
  20.         ProducerConfig pConfig = new ProducerConfig(props);  
  21.         Producer<String, String> producer = new Producer<String, String>(  
  22.                 pConfig);  
  23.         for (int i = 0; i < count; ++i) {  
  24.             String josn=random.nextInt(10)+":"+RandomStringUtils.randomAlphabetic(3)+":"+random.nextInt(1000);  
  25.             producer.send(new KeyedMessage<String, String>(topic, josn));  
  26.             Thread.sleep(1000);  
  27.             System.out.println("第"+i+"条数据已经发送");  
  28.         }  
  29.     }  
  30. }  



重写RichSinkFunction,用于写入到mysql中:


  1. package com.wanda.flink;  
  2.   
  3. import java.sql.DriverManager;  
  4. import java.sql.Connection;  
  5. import java.sql.PreparedStatement;  
  6. import org.apache.flink.api.java.tuple.Tuple3;  
  7. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;  
  8.   
  9. import com.wanda.common.Config;  
  10.   
  11. public class MySQLSink extends  
  12.         RichSinkFunction<Tuple3<Integer, String, Integer>> {  
  13.   
  14.     private static final long serialVersionUID = 1L;  
  15.     private Connection connection;  
  16.     private PreparedStatement preparedStatement;  
  17.     String username = Config.getString("mysql.user");  
  18.     String password = "";  
  19.     String drivername = Config.getString("mysql.driver");  
  20.     String dburl = Config.getString("mysql.url");  
  21.   
  22.     @Override  
  23.     public void invoke(Tuple3<Integer, String, Integer> value) throws Exception {  
  24.         Class.forName(drivername);  
  25.         connection = DriverManager.getConnection(dburl, username, password);  
  26.         String sql = "replace into orders(order_id,order_no,order_price) values(?,?,?)";  
  27.         preparedStatement = connection.prepareStatement(sql);  
  28.         preparedStatement.setInt(1, value.f0);  
  29.         preparedStatement.setString(2, value.f1);  
  30.         preparedStatement.setInt(3, value.f2);  
  31.         preparedStatement.executeUpdate();  
  32.         if (preparedStatement != null) {  
  33.             preparedStatement.close();  
  34.         }  
  35.         if (connection != null) {  
  36.             connection.close();  
  37.         }  
  38.   
  39.     }  
  40.   
  41. }  


最后的用启动Flink中流计算类,用于写入数据到mysql中去:

  1. package com.wanda.flink;  
  2.   
  3. import java.util.Properties;  
  4.   
  5. import org.apache.commons.lang3.StringUtils;  
  6. import org.apache.flink.api.common.functions.FilterFunction;  
  7. import org.apache.flink.api.common.functions.MapFunction;  
  8. import org.apache.flink.api.common.restartstrategy.RestartStrategies;  
  9. import org.apache.flink.api.java.tuple.Tuple3;  
  10. import org.apache.flink.streaming.api.datastream.DataStream;  
  11. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
  12. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;  
  13. import org.apache.flink.streaming.util.serialization.SimpleStringSchema;  
  14. import com.wanda.common.Config;  
  15. public class KafkaToDB {  
  16.     public static void main(String[] args) throws Exception {  
  17.         Properties pro = new Properties();  
  18.         pro.put("bootstrap.servers", Config.getString("kafka.hosts"));  
  19.         pro.put("zookeeper.connect", Config.getString("kafka.zookper"));  
  20.         pro.put("group.id", Config.getString("kafka.group"));  
  21.         StreamExecutionEnvironment env = StreamExecutionEnvironment  
  22.                 .getExecutionEnvironment();  
  23.         env.getConfig().disableSysoutLogging();  //设置此可以屏蔽掉日记打印情况  
  24.         env.getConfig().setRestartStrategy(  
  25.                 RestartStrategies.fixedDelayRestart(410000));  
  26.         env.enableCheckpointing(5000);  
  27.         DataStream<String> sourceStream = env  
  28.                 .addSource(new FlinkKafkaConsumer08<String>(Config  
  29.                         .getString("kafka.topic"), new SimpleStringSchema(),  
  30.                         pro));  
  31.   
  32.         DataStream<Tuple3<Integer, String, Integer>> sourceStreamTra = sourceStream.filter(new FilterFunction<String>() {   
  33.             @Override  
  34.             public boolean filter(String value) throws Exception {  
  35.                 return StringUtils.isNotBlank(value);  
  36.             }  
  37.         }).map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {  
  38.             private static final long serialVersionUID = 1L;  
  39.                     @Override  
  40.                     public Tuple3<Integer, String, Integer> map(String value)  
  41.                             throws Exception {  
  42.                         String[] args = value.split(":");  
  43.                         return new Tuple3<Integer, String, Integer>(Integer  
  44.                                 .valueOf(args[0]), args[1],Integer  
  45.                                 .valueOf(args[2]));  
  46.                     }  
  47.                 });  
  48.           
  49.         sourceStreamTra.addSink(new MySQLSink());  
  50.         env.execute("data to mysql start");  
  51.     }  
  52. }  




本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
从来没有一个人能把Flink讲的这么透彻,小编的出现算是一个意外
Flink DataStream API
Flink Java Demo(Windows)
一文彻底读懂高性能消息组件Apache Pulsar
简单之美 | Kafka+Spark Streaming+Redis实时计算整合实践
自定义Flume拦截器,并将收集的日志存储到Kafka中(案例)
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服