企业宣传,产品推广,广告招商,广告投放联系seowdb

在小米的运行 数据湖 Iceberg

Iceberg 是具备 SQL 行为的表的放开式规范,此定义由 Ryan Blue 提出。这个定义中包括了两点:

第一点,Iceberg 有 SQL 行为,象征着 Iceberg 是针关于结构化数据的,具备结构化数据的特性,如 Schema 等。

第二点,Iceberg 是一个放开性的规范,放开性规范体如今两方面。第一方面体如今设计上,Iceberg 允许多种文件格局,在存储介质上可以选用各种散布式存储或许云存储(如私有云),在下层运行上允许了 Flink、Spark、Hive 和 Trino 等多种查问引擎。第二方面则体如今社区上,目前曾经有多家公司介入设计和树立。

接上去引见 Iceberg 的几个特点。

1、Iceberg 可以防止异常出现

Iceberg 表可以安心经常使用,无需思考太多不欢快的事件出现。

对表的任何操作都是原子性操作,同时经常使用多快照提供了读写分别的特性。

可以对 Iceberg 表启动 Schema 修正,比如字段类型优化、加出列、删除列、重命名列、调整列顺序等。这里须要说明的是,字段类型不是可以轻易更改的,Iceberg 只允许字段类型优化。例如,int 改成 long,float 改成 double,或许精度增大等。

2、Iceberg 允许隐式分区

Iceberg 有多种分区函数供选用,如下图所示。当咱们须要依据某个 timestamp 类型的字段提取出的年、月、日或许小时启动分区时,可以间接经常使用 Iceberg 提供的分区函数。Iceberg 还允许多级分区,在分区选用上具备更高的灵敏性。

与 Hive 启动对比,隐式分区体如今:

(1)Iceberg 写入时,不须要像 Hive 一样指定分区,写入哪个分区是由 Iceberg 智能治理的。这样的好处在于,可以保障数据分区是正确的,防止用户失误造成数据分区失误。

(2)用户查问时,不须要思考分区的物理结构。假设一张表经常使用 date 字段做了分区,用户查问时不须要思考这个字段是启动了月的分区,还是天的分区,只要要依照这个字段启动查问即可,Iceberg 会智能生成查问方案,如下图所示。

(3)在目录结构上,Iceberg 具备元数据层,经过记载分区和文件地址之间的相关,成功了物理结构和逻辑结构的分别。这样,可以十分繁难地启动 partition evolution 操作。

3、Iceberg 的行级更新的才干

Fomat version 2 中提供了行级更新的才干,在 Iceberg 中经常使用了两类文件启动标志删除。第一类是 position delete file,这类文件可以指定文件和行号启动删除。第二类是 equality delete file,这类文件记载了被删除记载的惟一键启动删除。Iceberg 只是规则了可以经常使用这两类文件启动删除,但详细由哪一类文件或两类文件独特经常使用以到达删除目的,是由引擎层来选择的。下图中是 Flink 引擎成功行级删除的形式,对事务写入的文件经常使用 position delete file,而关于之前的事务写入的文件会经常使用 equality delete file 启动删除。查问时,经常使用 Merge On Read 形式,可以失掉曾经删除成功的结果。

本节引见 Iceberg 数据湖在小米的几个运行。

1、日志入湖场景

小米原有的日志入湖的数据链路如下图所示,用户会在 Client 端经常使用 MQ 的 SDK,将数据发送到 MQ 中。小米经常使用 Talos 作为 MQ,对标于业界的 Kafka,MQ 中没有 Schema。之后经常使用 Spark streaming 将文件间接 flush 到 HDFS 上,而后经常使用 add partition 挂载到 Hive 上。

(1)经常使用了旧版本的 Spark streaming,成功的是 at least once 语义,数据或许会出现重复。

(2)由于 MQ 当中没有 Schema,只能经常使用上报的期间启动分区。这样,会在清晨的时刻出现分区漂移的疑问。

(3)间接 flush 文件到 Hive 上时,Hive 的 schema 与文件 schema 或许不婚配,造成历史数据读取时或许会出现疑问。

针对以上疑问,咱们经常使用 Iceberg 对日志入湖的流程从新启动了设计,修正后的数据链路如下图所示。在 MQ 上性能 Schema,经常使用 Flink SQL 启动解析,而后写入到 Iceberg 中。

这个数据链路有以下几个特性:

(1)经常使用 Flink SQL 的 exactly once,保障数据的不丢不重。

(2)经常使用了 Iceberg 的隐式分区特性,保障数据分区的正确性,防止了分区漂移疑问。

(3)Schema On Write 以及 schema evolution 特性,保障数据在 schema 演化环节中也永远是正确的。

链路在实践落地中,或许会出现数据失落的疑问。数据失落的基本要素是链路上的数据不规范。Talos 经常使用的 Schema On Read 形式,用户将 Schema 附加到 MQ 上,在 MQ 到 Iceberg 的环节中,有一个 Schema 同步的环节。但由于 MQ 中的 Schema 人为性能或许提前,会造成 MQ 的 SDK 发送的数据与 MQ 中 Schema 不分歧,使得 FlinkSQL 解析的时刻或许会丢掉一些列。最终用户角度看到的就是数据失落。想要处置这个疑问,要在流程中启动规范,首先定义 Schema,而后发送数据。

2、Flink+ Iceberg 构建的近实时数仓

小米有很多的 IOT 设备,在这些设备上打点有两个痛点疑问:

(1)设备打点数据提前上报疑问十分重大。 假定一台设备的一批数据没有上报,而后关机,过了一个月数据才上报,那么数据开发工程师须要将过去一个月的数据启动从新计算和存储。由于 Hive 不允许事务性,那么在启动从新计算而后笼罩过去一个月的数据的环节中,或许会造成下游读取的异常。

(2)由于 Spark 离线义务通常都是 T+1 的,所以清晨时会启动很多的 Spark 作业做目的拆分,将 ODS 的数据拆分到 DWD 层,这会造成集群的资源缓和,数据产出的提前危险十分大。

针对这些疑问,咱们经常使用了 Flink+Iceberg 对链路启动重构,重构后的数据链路如下图。

这个链路具备以下特点:

(1)首先在入湖侧,Iceberg 的隐式分区可以保障打点提前的数据能够正确分区,以刚才的例子,一个月之前的数据不须要笼罩写入,只要要将下游的数据启动回溯即可。

(2)联合 Iceberg 的灵敏分区,经常使用 date+event_name 启动了二级分区。这样,下游启动目的拆分时,只要要指定二级分区就可以启动消费,这样可以大大缩小数据的扫描量,进而节俭计算资源。

(3)整个链路中经常使用 Flink 来交流 Spark,这对用户来说十分关键,由于它象征着清晨的计算量可以平摊到全天,这样产出提前的危险可以大大降落。摊派到全天并不象征着危险变高了,相反,Flink 的 checkpoint 只要十几分钟到半个小时。这样,即使作业失败,复原的代价也会比拟小。

3、离线场景下遇到的一些疑问

Iceberg 的离线场景是比拟完善的。但是,若须要数据链路稳固,依然须要一些致力。

分区完备性校验,即如何感知到抢先的 T-1 数据曾经写入成功,从而开启下游作业。这里分红两个场景。

之前 Hive 表的校验逻辑是校验 success 文件。但是 Iceberg 写入并没有 success 文件。同时 Iceberg 表的分区散落在各元数据文件当中,而 list partition 操作十分耗时。针对这一疑问,咱们经常使用了义务依赖,不是经常使用数据依赖来依赖分区的检测,而是依赖于抢先的义务。当抢先义务写入成功之后,下游义务就可以启动调度。

Iceberg 表分区在写入第一条数据时就曾经生成,这样也不可校验分区。并且,在实时场景下,经常会有数据延早退达的疑问。针对这个疑问,咱们参考了 Flink 的 watermark 机制,经常使用了 Iceberg 的 watermark,依据用户提供的期间列来生成一个期间戳,如下图所示,咱们会在快照里参与一个期间戳,有一个独自的审核作业来对比分区和 watermark,当 watermark 超越火区时,即象征着分区写入成功,业界也称这种方式为流转批。

① 试图将 z-order 运行于 ETL,在通常中,z-order 在整个分区中口头的代价很高。而且,关于 ETL 底层的一些表(如 ODS,DWD),查问的次数比拟少,z-order 带来的收益不大。因此,倡导用户经常使用 local sort 启动排序写入的方式。

② 咱们在外部成功了 parquet 的 page column index,相比 parquet 之前的谓词下推的方式时 row group 级别的,一个 row group 是 128M 或 256M,而 parquet 最小的可读单位其实是一个 page,大略是 2MB 左右,page column index 会对 page 树立一个 min-max 索引,查问时可以应用查问谓词和 page 的 min-max 索引来对数据启动有效过滤,最终读入更大批的 page 启动计算,如下图所示。

在小米外部 benchmark 场景中,成果还是不错的。最好的状况下,可以过滤 80% 的数据。但若查问的是非排序列,比如下图的 Q7 到 Q9,基本上没有什么改善成果。

(3)隐式分区在离线场景的疑问

当咱们将 Iceberg 引入到离线场景之后,由 Iceberg 自带的隐式分区和 dynamic overwrite 带来的结果与用户希冀有所不同。例如,假定表结构中含有四个字段(如下图所示),咱们经常使用 date 按蠢才区之后再经常使用 hour 按小时分区。

当咱们经常使用语句 insert overwrite catalog.db.table_test values(1,‘a’,20230101,1),(2,‘b’,20230101,2) 启动笼罩写入后,会发现查问结果只笼罩了date=20230101/hour=1和date=20230101/hour=2分区,没有笼罩date=20230101/hour=3 的分区。这象征着 dynamic overwrite 对隐式分区操作时,不会笼罩一切的二级分区。此时,用户宿愿回归到 Hive 的经常使用方式,处置方法是经常使用 static overwrite 来指定分区启动笼罩。将笼罩语句修正为:

Iceberg 类型和多引擎类型的对齐上存在一些疑问。如 Iceberg 当中的 timestamp 类型有两类,第一种是带有时区的 timestamptz,第二种是无时区的 timestamp。

而 Spark 的 timestamp 类型只要一类,即有时区的 timestamp 类型。这样就带来一个疑问,如何经常使用 Spark DDL 来创立出 Iceberg 的无时区的 timestamp 类型呢?这时须要性能一个参数:

当经常使用Spark来读取Iceberg timestamp类型时,则须要性能另一个参数:

这时 Spark 会把无时区的当成有时区的启动处置,也就是说当期间戳是 UTC 的 0 点,那么 Spark 读进去的就曾经加了 8 个小时了(这里假定系统时区为 UTC+8)。这样用起来仿佛也没什么疑问,但是与 Trino 比拟起来就有疑问了。当咱们在平台上同时提供了 Spark 和 Trino 两种 adhoc 的查问方式,会发现结果是不同的。这个疑问在 Spark 3.4 之后应该会有所改善,由于设计中会引入一个新的无时区的期间戳类型。

4、实时集成入湖

咱们将 MySQL、TiDB、Oracle 等相关型数据库的 binlog 日志采集到 MQ 当中,再经常使用 Flink 写入到 Iceberg 的 format v2 上,如下图所示。

这种数据链路的特点包括:

(1)整个链路借助于 Flink 的 Exactly Once 和 Iceberg 的事务性,可以抵达一个端到端的 exactly once 的语义。

(2)Iceberg 对实时允许可以到达分钟级别。

(3)Iceberg 自身的 merge on read 设计,须要后盾定时口头 compaction 义务。Iceberg 的 compaction 是一种插件式设计,到目前还未实如今 Flink 当中。目前,当须要经常使用 Flink 启动相似于 HBase 的限流或写停等操作时,尚需自己开发。假设 Compaction 义务异常中断,写链路是感知不到的。会形成写入时没有疑问,但是查问时速度很慢的现象。

此外,咱们在 v2 中发现更多 Iceberg 存在的疑问:

Iceberg 自身并没有明白说明在表中可以性能一个主键,而是将这个权益交给引擎层去处置。这张表能否可以保障惟一主键,齐全取决于引擎及经常使用方式。即使经常使用了允许申明主键的引擎,也很难保障申明的主键的惟一性。除非自动开启 Upsert 方式,但这种方式代价比拟高。

Iceberg 的文件组织成功方式的 Upsert 的代价比拟高。由于 Iceberg 在设计时,宿愿数据尽或许入湖且没有索引,所以不会去校验这条数据能否曾经存在了。Upsert 的成功方式为 delete+insert 方式,即写入两条记载,一条删除一条新增。当数据量比拟大时,会造成 equality delete file 文件过多。处置方法有两种,一是参与 compaction 频次,二是经过 bloom filter 来过滤掉一些无用的 delete。

实时写入时,compaction 和写入会出现并发抵触,这往往是由于 compaction 环节中,有一条 position delete 数据写入了。这种方式下,Flink 是比拟友好的,由于 position delete 只会指向一个新增的文件,不会对历史的文件启动援用。因此在校验时,可以对 position delete file 在快照中打标志,从而疏忽由 position delete 带来的抵触进而造成 compaction 失败疑问。

Iceberg 与 Hudi 或 Paimon 不同,没有专门的 changelog 供 Flink 间接消费。咱们须要从文件组织中将 changelog 自行解析进去,这样的解析代价很高,并且或许出现由于 Upsert 操作而带来的 changelog 不准确。小米外部成功了单事务中解析出删除的数据和拔出的数据,而后以顺序的方式提供应下游消费。但是若单个快照中,先删后写的操作过多时,会造成下游动摇。Changelog 不准确(尤其在非主键聚合的场景下),是经过性能 changelog CDC 去重来处置的,依赖于 Flink 外部的state 撤回的机制来处置,性能语句为:set table.exec.source.cdc-event-duplicate=true。

5、列级数据加密

Iceberg 由于元数据层的设计,可以在 Iceberg 表上成功数据加密。列级数据加密关键是应用了 parquet 1.12.2 高版本的加密才干。之前,小米外部的数据加密是依赖于隐衷集群,独自的 IDC 机房的隔离会形成运维老本高,以及数据孤岛的疑问。因此咱们参照社区在 Iceberg 上成功了一个数据加密,这个方案称为单层数据加密。

与间接数据加密方式不同,间接数据加密的每条数据的写入都会调用一次性 KeyCenter 启动加密,而后写入。单层数据加密会在 Iceberg 表中保留加密之后的一个密钥,当写入程序写入时,会调用一次性 KeyCenter,对加密的密钥启动一次性解密以失掉明文密钥 DEK,而后对数据启动加密写入。读取环节与写入环节相似,读取时会对 Iceberg 元数据中保留的加密密钥启动解密,进而 对数据启动解密处置。这里会触及两个密钥,一个是 Iceberg 表自身保留的 DEK,另一个是对这个 DEK 加密的 KeyCenter 中的密钥。单层包裹的加密方案的优势是:

(1)parquet 列级数据加密,不须要对一切的列启动加密,用户可以选用须要加密的列。

(2)对 KeyCenter 压力较小,写入和读取时只要要对 KeyCenter 访问有限次数。

这个方案在小米外部成功的是简化版本,咱们会对一个 Iceberg 表保养一个 DEK 密钥。而社区的方案中,密钥粒度比拟细,可以是分区粒度的密钥,也可以是文件级别的密钥。

6、Hive 更新 Iceberg 的调研

可以经常使用社区提供的 migrate 原地更新的方案启动更新。社区提供了 Spark 的 procedure 语法,经常使用 CALL migrate 语法可以间接将 Hive 表更新为 Iceberg 表。上方的例子中,将 Spark_catalog.db.sample 表更新成了 Iceberg 表,同时将新增属性 foo 为 bar。

但这种方式在实践落地中存在一些疑问:

① Iceberg 允许的文件只要 parquet/orc/avro 这三种格局,不允许 text、sequenceFile 等文件格局。造成一些 Hive 表不可允许更新为 Iceberg 表。

② 表下游消费离线作业的 Spark 必定是 2.4 以上的版本。而小米外部存在一些低版本的 HiveSQL 和低版本的 Spark 作业,因此这局部表是不可经常使用这个方案启动更新的。

出于缩小下游作业的改变的目的,咱们宿愿能够复用 Hive 的 location。写入的时刻写入到 Iceberg 表,让 Iceberg 表和 Hive 表的存储地址相反。这样咱们只要要更新抢先作业,下游表在 catalog 层依然存在,这样下游作业不须要改变,如下图所示。

这个想法是比拟好的,但是成功环节有些取巧,由于 Iceberg 是多快照的,因此一个分区下,或许会有多个正本,而 Hive 是经过 list 目录来读取数据的。这样,Hive 在读取时,或许会读取到反双数据。若想要让 Hive 读取单快照,那只能及时清算 Iceberg 快照和残留文件。但是这样又使得 Iceberg 失去了事务性,而且受限于 Hive 下游消费作业,Iceberg 的一些特性(如 schema evolution)也都遭到了限度。若是 Hive 的 parquet 版本和 Iceberg 的 parquet 版本不分歧,那么改变会十分大。最终这个方案被丢弃。

这是业界经常使用最多的方案,这个方案的思绪是:创立一张相反的 Iceberg 表,将 Hive 的历史数据回溯到 Iceberg 当中,而后更新抢先作业,随后测实验证和更新 Hive 的一切下游作业,让其消费 Iceberg。

为什么这个方案比拟费事,但是用户情愿迁徙呢?关键有两个要素:

① 咱们在 Iceberg 上经常使用了 ZSTD 的紧缩算法,得益于 ZSTD 更高的紧缩率,使得存储老本可以降落 30%。

② 在回溯历史数据的时刻,咱们对大字符串启动了排序,这样可以提高数据的相似度,进一步优化紧缩率。对一张表来说,存量数据在存储中占有更大的比例。若是能够对历史数据的存储空间缩小 30%,用户还是可以接受变革的。

7、Iceberg 在小米的运行现状

目前有 1 万 4 千多张表,日新增曾经超越了 Hive,总的数据量曾经到达 30PB。

首先,咱们将跟进物化视图的性能。在 OLAP 场景且没有谓词下推的状况下,咱们希冀经过估量算的方式来提高 Iceberg 的查问才干。

其次,咱们将跟进 Iceberg 在 Spark3.3 上的 changelog view。这特性能使得 Spark 可以失掉到 Iceberg 的 changelog,咱们宿愿在离线场景下也可以启动增量读取和更新。

最后,小米会在海外集群上探求数据上云。小米外部都是 EBS 挂载,EBS 自身比拟贵,而 HDFS 自身有 3 个正本,相比间接经常使用私有云老本较高。

Q1:为什么要 Spark streaming 切换为 Flink SQL,关键出于什么思考?

A1:关键是外部架构思考。第一是,Spark Streaming 的 2.3 版本的 At least once 语义会造成数据重复。第二是,引入 Flink 之后,开局踊跃向 Flink 方向聚拢,不再去保养 Spark streaming 的方向,在交流为 Flink SQL 之后,对整个数据链路启动了迭代。

Q2:watermark 是 Iceberg 曾经存在的,还是业务自己加的?

A2:这个须要业务自己性能经常使用什么字段来作为 watermark 的生成字段,须要用户自己性能。而后 Flink 在写入时,会在快照中生成 watermark。

Q3:小米在强实时场景中用到了 Hudi 吗?

A3:没有,小米在强实时场景走的 MQ 那套数据链路。

Q4:选型上为什么是 Iceberg 而不是 Hudi?

A4:最后为使得 kappa 架构和 lambda 架构失掉一致而调研了数据湖的组件,选用 Iceberg 的关键要素是 Iceberg 的放开性和多引擎允许。2021 年 4 月份,Iceberg 最先允许了 Flink。而过后,Hudi 和 Spark 还未解耦。咱们出于经常使用 Flink 的角度而选用了 Iceberg。通常中,Iceberg 在实时数据的处置中,尤其在 CDC 处置方面,或许没有 Hudi 那么易用。咱们也对 Iceberg 启动了二次开发,才把数据链路运转得稳固一些。

Q5:历史的离线作业仓库,数仓作业为 Hive 作业,假设切换到实时链路 Iceberg,如何做到无感知切换?比如说,SparkSQL 语法与 FlinkSQL 语法不同,以及 UDF 成功不同。

A5:目前没有方法做到无感知切换,SparkSQL 和 FlinkSQL 语义上就不大一样。若是切换到 Flink batch 还有或许,但若是想要离线切到实时,基本上要把整个逻辑的成功一遍。

Q6:目前实时数仓当中,append 形式和 Upsert 形式的数据延时可以做到几分钟?尽或许防止数据延早退达。

A6:这两种形式,目前最低都是 1 分钟。咱们解放了用户性能的 checkpoint 时长,最低不能低于 1 分钟。

Q7:如何经常使用 local sort 启动多列查问?

A7:这个可以写入时在算法上经常使用 z-order 排序交流自动的排序算法来成功。

Q8:切换 Iceberg 带来的切换老本是怎么的,业务需求能否很剧烈?

A8:Iceberg 带来的事务性、隐式分区、多引擎允许的特性可以实际处置用户的疑问。即使切换环节中有很大的老本,当数据湖方案确实可以处置用户的痛点时,用户也会想用这个新架构去交流。

© 版权声明
评论 抢沙发
加载中~
每日一言
不怕万人阻挡,只怕自己投降
Not afraid of people blocking, I'm afraid their surrender