成都网站建设设计

将想法与焦点和您一起共享

使用flink将mysql数据入湖delta-创新互联

使用flink将mysql数据入湖delta 1.简介

Delta数据湖原来是强绑定于Spark引擎,而近期社区实现了使用Flink引擎将数据入湖,简单写个demo使用以下。

让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:主机域名雅安服务器托管、营销软件、网站建设、连云网站维护、网站推广。
  • Flink 1.13.0
  • delta 1.0.0
  • flink-mysql-cdc 2.1.0
2.Mysql入湖代码 2.1 Flink运行环境

设置下checkpoint的时间大小

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
2.2 构建MysqlSouce

使用flink-cdc-mysql依赖中的方法,输入ip,表名等直接构建

MySqlSourcesource = MySqlSource
    .builder()
    .hostname("ip")
    .port(3306)
    .databaseList("database")
    .tableList("database.table")
    .username("username")
    .password("password")
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();
2.3 Mysql表的Schema转变成Flink-RowType

使用flink将数据入湖时,需要将source的Schema转换成Flink的RowType

通过RowType.RowField实现,这里定义三个字段的RowType

public static RowType getMysqlRowType(){return new RowType(Arrays.asList(
        new RowType.RowField("id", new BigIntType()),
        new RowType.RowField("name", new VarCharType(VarCharType.MAX_LENGTH)),
        new RowType.RowField("dept_id",new IntType())
    ));
}
2.4 构建Sink

使用delta-flink依赖中的DeltaSink
.forRowData()方法,指定lakePath,hadoop-conf,rowType,生成Sink

public static org.apache.hadoop.conf.Configuration getHadoopConf() {org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
    conf.set("parquet.compression", "SNAPPY");
    return conf;
}

public static DeltaSinkcreateDeltaSink(String deltaTablePath, RowType rowType) {return DeltaSink
        .forRowData(
        new Path(deltaTablePath),
        getHadoopConf(),
        rowType).build();
}
2.5 String转为RowData

Source端使用String类型,Sink端使用RowData类型,所以需要使用Map函数进行一次转换。

使用fastJson获取每个字段的值,然后变成Flink row类型,最后使用convertor转换为RowData

//存在于flink-table-runtime-blink_2.12依赖中 
public static final DataFormatConverters.DataFormatConverterMYSQL_CONVERTER =
            DataFormatConverters.getConverterForDataType(
                    TypeConversions.fromLogicalToDataType(getMysqlRowType())
            );

public static RowData mysqlJsonToRowData(String line){String body = JSON.parseObject(line).getString("after");
    Long id = JSON.parseObject(body).getLong("id");
    String name = JSON.parseObject(body).getString("name");
    Integer deptId = JSON.parseObject(body).getInteger("dept_id");
    Row row = Row.of(id,name,deptId);
    return MYSQL_CONVERTER.toInternal(row);
}
2.6 执行

依次将source,sink放入env中执行即可

env.fromSource(source, WatermarkStrategy.noWatermarks(),"demo-mysql-cdc")
    .setParallelism(2)
    //将json数据转为FlinkRowData
    .map(FlinkDeltaUtil::mysqlJsonToRowData)
    .sinkTo(deltaSink)
    .setParallelism(1);

env.execute("flink-cdc-to-delta");
3. 源码

仓库地址 (https://gitee.com/zhiling-chen/demo-mysql-flink-delta)

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


分享题目:使用flink将mysql数据入湖delta-创新互联
URL网址:http://chengdu.cdxwcx.cn/article/ddieeg.html