It's almost the end of the year and I haven't updated this blog in a while. Things have been quiet lately, so I took the chance to learn the big-data framework Flink. It's similar to Spark and easy to pick up: two or three days of use is enough to start writing code, and in some areas it is stronger than Spark, stream processing in particular. Below, Flink's DataStream API is used to read data from Kafka and write it into MySQL. Enough talk, on to the code.
First, the configuration file:
```properties
#mysql
mysql.driver=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://localhost:3306/mybatis
mysql.user=root
#kafka
kafka.topic=mytopic
kafka.hosts=localhost:9092
kafka.zookeeper=localhost:2181
kafka.group=group
```
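The code below references a `com.wanda.common.Config` helper that the post never shows. Here is a minimal sketch, assuming it simply loads a properties file from the classpath (the file name `config.properties` is my assumption):

```java
package com.wanda.common;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class Config {
    private static final Properties PROPS = new Properties();

    static {
        // File name is an assumption; the original post does not show it.
        try (InputStream in = Config.class.getClassLoader()
                .getResourceAsStream("config.properties")) {
            if (in == null) {
                throw new IllegalStateException("config.properties not found on classpath");
            }
            PROPS.load(in);
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static String getString(String key) {
        return PROPS.getProperty(key);
    }
}
```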
Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.1.3</version>
</dependency>
<!-- Use this dependency if you are using the DataSet API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.10</artifactId>
    <version>1.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
    <version>1.1.3</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.17</version>
</dependency>
```
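Note that all Flink artifacts carry the `_2.10` Scala-version suffix and must agree on it, and that the connector targets the Kafka 0.8 client line, which matches the old `kafka.javaapi` producer API used in the next section.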
The class that sends data to Kafka:

```java
package com.wanda.flink;

import java.util.Properties;
import java.util.Random;

import org.apache.commons.lang3.RandomStringUtils;

import com.wanda.common.Config;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {

    public static void main(String[] args) throws InterruptedException {
        String broker = Config.getString("kafka.hosts");
        System.out.println("broker:" + broker);
        String topic = Config.getString("kafka.topic");
        int count = 10;
        Random random = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", broker);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig pConfig = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(pConfig);

        // Send "orderId:orderNo:orderPrice" messages, one per second.
        for (int i = 0; i < count; ++i) {
            String message = random.nextInt(10) + ":"
                    + RandomStringUtils.randomAlphabetic(3) + ":"
                    + random.nextInt(1000);
            producer.send(new KeyedMessage<String, String>(topic, message));
            Thread.sleep(1000);
            System.out.println("Message " + i + " sent");
        }
        producer.close();
    }
}
```
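Each message is a plain colon-separated string of the form `orderId:orderNo:orderPrice`, for example `3:abc:527`; the streaming job below splits on `:` to rebuild the three fields.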
Next, extend RichSinkFunction to write the records into MySQL. Rather than opening a connection per record, the connection and prepared statement are created once in open() and released in close():
```java
package com.wanda.flink;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import com.wanda.common.Config;

public class MySQLSink extends
        RichSinkFunction<Tuple3<Integer, String, Integer>> {

    private static final long serialVersionUID = 1L;

    private Connection connection;
    private PreparedStatement preparedStatement;

    private final String username = Config.getString("mysql.user");
    private final String password = ""; // no mysql.password entry in the config file
    private final String drivername = Config.getString("mysql.driver");
    private final String dburl = Config.getString("mysql.url");

    // Open the connection once per task instead of once per record.
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName(drivername);
        connection = DriverManager.getConnection(dburl, username, password);
        // REPLACE INTO overwrites a row with the same primary/unique key,
        // so records replayed after a restart do not create duplicates.
        String sql = "replace into orders(order_id,order_no,order_price) values(?,?,?)";
        preparedStatement = connection.prepareStatement(sql);
    }

    @Override
    public void invoke(Tuple3<Integer, String, Integer> value) throws Exception {
        preparedStatement.setInt(1, value.f0);
        preparedStatement.setString(2, value.f1);
        preparedStatement.setInt(3, value.f2);
        preparedStatement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}
```
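The post never shows the `orders` table that the REPLACE INTO statement targets. Here is a minimal schema sketch it would work against (column names come from the SQL above; the types are my assumption). Note that REPLACE INTO only overwrites instead of appending when `order_id` is a PRIMARY KEY or UNIQUE:

```sql
-- Hypothetical schema; the original post does not show the actual table.
CREATE TABLE orders (
    order_id    INT         NOT NULL,
    order_no    VARCHAR(16) NOT NULL,
    order_price INT         NOT NULL,
    PRIMARY KEY (order_id)
);
```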
Finally, the class that starts the Flink streaming job, consuming from Kafka, parsing each message, and writing the result into MySQL:
```java
package com.wanda.flink;

import java.util.Properties;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import com.wanda.common.Config;

public class KafkaToDB {

    public static void main(String[] args) throws Exception {
        Properties pro = new Properties();
        pro.put("bootstrap.servers", Config.getString("kafka.hosts"));
        pro.put("zookeeper.connect", Config.getString("kafka.zookeeper"));
        pro.put("group.id", Config.getString("kafka.group"));

        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();
        // Suppress Flink's periodic progress output on stdout.
        env.getConfig().disableSysoutLogging();
        // On failure, retry the job 4 times with a 10 s delay between attempts.
        env.getConfig().setRestartStrategy(
                RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(5000);

        DataStream<String> sourceStream = env
                .addSource(new FlinkKafkaConsumer08<String>(
                        Config.getString("kafka.topic"),
                        new SimpleStringSchema(), pro));

        // Drop blank messages, then split "orderId:orderNo:orderPrice" into a Tuple3.
        DataStream<Tuple3<Integer, String, Integer>> sourceStreamTra = sourceStream
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return StringUtils.isNotBlank(value);
                    }
                })
                .map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple3<Integer, String, Integer> map(String value)
                            throws Exception {
                        String[] args = value.split(":");
                        return new Tuple3<Integer, String, Integer>(
                                Integer.valueOf(args[0]), args[1],
                                Integer.valueOf(args[2]));
                    }
                });

        sourceStreamTra.addSink(new MySQLSink());
        env.execute("data to mysql start");
    }
}
```
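To try the pipeline end to end: with ZooKeeper, Kafka, and MySQL running locally, start KafkaToDB first so the consumer is attached to the topic, then run KafkaProducer. After the ten messages go out, `select * from orders` in the mybatis database should show up to ten rows; it can be fewer, since REPLACE INTO overwrites rows whose random order_id collides.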