成都网站建设设计

将想法与焦点和您一起共享

FlinkCDC怎么监听MySQL表

这篇文章主要讲解了“Flink CDC怎么监听MySQL表”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Flink CDC怎么监听MySQL表”吧!

创新互联是一家专注于成都网站设计、网站制作与策划设计,温江网站建设哪家好?创新互联做网站,专注于网站建设十年,网设计领域的专业建站公司;建站业务涵盖:温江等地区。温江做网站价格咨询:18982081108

// 前景提要:开启mysql binlog监控。(目录:C:\ProgramData\MySQL\MySQL Server 5.6\my.ini)ProgramData 为隐藏目录。注意:binlog_format=ROW
// 创建Blink Streaming的TableEnvironmentEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);// 创建表,connector使用mysql-cdcbsTableEnv.executeSql("CREATE TABLE mysql_binlog " +"(id STRING, " +"times STRING, " +"temp STRING) " +"WITH " +"('connector' = 'mysql-cdc', " +" 'hostname' = '127.0.0.1', " +" 'port' = '3306', " +" 'username' = 'root', " +" 'password' = '123456', " +" 'database-name' = 'test', " +" 'table-name' = 'sersor_temp'" +")");// 打印控制台bsTableEnv.executeSql("CREATE TABLE sink_table " +"(id STRING, " +"times STRING, " +"temp DOUBLE) " +"WITH " +"('connector' = 'print'" +")");// 将CDC数据源和下游数据表对接起来bsTableEnv.executeSql("INSERT INTO sink_table SELECT id, times, temp FROM mysql_binlog");bsTableEnv.executeSql("CREATE TABLE sink_kafka_table " +"(id STRING, " +"times STRING, " +"temp DOUBLE " +") WITH (" +" 'connector' = 'kafka'," +" 'topic' = 'test_mysql_binlog'," +" 'scan.startup.mode' = 'earliest-offset'," +" 'properties.group.id' = 'testGroup'," +" 'properties.bootstrap.servers' = 'node2:9092', " +" 'format' = 'canal-json' " +")");// 将CDC数据与 kafka表对接起来bsTableEnv.executeSql("INSERT INTO sink_kafka_table SELECT id, times, temp FROM mysql_binlog");bsTableEnv.executeSql("CREATE TABLE hTable (" +" id STRING," +" f ROW," +" PRIMARY KEY (id) NOT ENFORCED" +") WITH (" +" 'connector' = 'hbase-2.2'," +" 'table-name' = 'regional:binlog'," +" 'zookeeper.quorum' = 'node2:2181'" +")");// 将CDC数据存储到 Hbase中bsTableEnv.executeSql("INSERT INTO hTable SELECT id, ROW(times, temp) FROM mysql_binlog");

-- ----------------------------
-- Table structure for sersor_temp
-- ----------------------------
DROP TABLE IF EXISTS `sersor_temp`;
CREATE TABLE `sersor_temp`  (
  `id` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
  `temp` decimal(10, 2) NOT NULL,
  `times` varchar(10) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Records of sersor_temp
-- ----------------------------
INSERT INTO `sersor_temp` VALUES ('sensor_1', 22.20, '1547718527');
INSERT INTO `sersor_temp` VALUES ('sensor_2', 25.20, '1547718214');
INSERT INTO `sersor_temp` VALUES ('sensor_3', 46.40, '1547718520');
INSERT INTO `sersor_temp` VALUES ('sensor_5', 32.62, '1547718325');
 

注意:此处 表中 temp 字段为 decimal 类型,在SQL中使用  DECIMAL 、DOUBLE 类型 存储到hbase中都会出现乱码问题,遂 都换成 STRING

感谢各位的阅读,以上就是“Flink CDC怎么监听MySQL表”的内容了,经过本文的学习后,相信大家对Flink CDC怎么监听MySQL表这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!


文章名称:FlinkCDC怎么监听MySQL表
标题路径:http://chengdu.cdxwcx.cn/article/pscpie.html