首先繁难引见一下爱奇艺 OLAP 的基本状况:
存储方面,OLAP 目前允许三类存储:
① 离线 HDFS: 用于离线剖析、批处置等场景;
② 实时 Kafka: 用于实时剖析、在线处置等场景;
③近实时 Iceberg: 分钟级提前,是当天要重点引见的数据湖产品。
存储之上是查问引擎,咱们驳回 SparkSQL 做 ETL 处置,驳回 Trino 做 Ad-hoc 即席查问,ClickHouse 用于查问减速的场景。咱们经过 Pilot 提供对外的一致查问,允许各类运行场景。
上方来引见一下爱奇艺数据湖的树立背景。
1、数据湖技术减速数据流通
为什么要有数据湖?数据湖其实就是为了减速数据流通。
Pingback 是爱奇艺外部对端上埋点的习气称号,每个公司都会有相似的服务。在经典的 Lambda 架构处置方案里,Pingback 数据在投递后,有离线和实时两个通路。
离线通路写到 HDFS 里,而后由离线开发平台构建离线数仓。离线数仓的优势是老本很低,允许的容量也很大。缺陷是提前大,或许要 1 小时或许 1 天。为了处置这个时效性疑问,往往会再构建一个实时数仓。通罕用 Kafka 作为存储,用 Flink 或许 Spark 这类的流计算义务处置 Kafka 数据,构建实时数仓。
实时数仓的提前十分低,能做到秒级的提前,但缺陷是老本很高,只能放最近几个小时的数据,要基于 Kafka 做明细查问也是比拟的艰巨的。
其实很多实时剖析场景并不须要秒级的提前,分钟级的提前就足够了。譬如说广告、会员的运营场景,或许监控大盘等。数据湖产品提供了性价比很高,容量很大的分钟级提前的处置方案。
爱奇艺的数据湖选型用的是 Iceberg,Iceberg 是一种新设计的开源表格局,用于大规模数据剖析。
① Iceberg 实质上不是存储,由于它底层存储复用了 HDFS,或许对象存储。在存储之上构建了 Iceberg 表级形象,对标 Hive 的表设计。
② 它也不是查问引擎或许流计算引擎,它允许各类计算引擎,比如 Hive、 Flink、 Spark,也允许各类的 SQL 查问引擎。
为什么有了 Hive 表格局还要引入 Iceberg 表格局?
一个经典的 Hive 表或许会有天级分区、小时级分区,或许进一步的子分区。其设计**是用目录树去组织数据,能够很好地做分区级过滤。
然而它也有着以下缺陷:
① 元数据一致存在 Metastore,通常底下是 MySQL,很容易成为瓶颈。
② 由于元消息是分区级别的,没有文件级别的消息,因此当动员一个查问时,制订口头方案须要拿到分区下的文件列表。拿到文件列表实质上是对每一个分区恳求 NameNode 做 List 恳求。举个例子,一天有 200 多个分区,查 7 天的数据,分区数就会十分多,会动员 O(N) 复杂度的 NameNode 的 List 恳求调用,这个元数据的枚举环节会十分的慢。
③ 由于它的最小单位是分区级别的,最大的原子操作就是分区级别的笼罩,其余一些原子操作是不允许的。
Iceberg 新定义的表结构有元数据层和数据层。数据层就是数据文件。元数据层是它很关键的设计点,可以复用 Hive 的 MetaStore,指向最新的快照。元数据外面分多层,记载了详细的文件列表。
每次有新的 Commit,就会创立出新的快照,读恳求可以访问旧的快照,写恳求写新的。在写的环节中,新创立的数据文件读是无法见的,只要在提交后把最新的版本指过去,新写入的文件才可见。做到了读写分别。同时修正操作是原子的,能够允许细粒度的分区外部的修正。
繁难比拟一下 Hive 和 Iceberg:两者底层都驳回 HDFS 或许对象存储,都是 PB 级的便宜存储方案。区别 Hive 元消息是分区级,Iceberg 是文件级。比如 Hive 分区原本有 100 个文件,加了 5 个文件,那么 Hive 下游义务就须要从新计算 Hive 分区下的所有数据。Iceberg 能够失掉到修正的 5 个文件,可以做增量的下游计算。
时效性是 Iceberg 很清楚的优势,能够做到近实时,比如 5 分钟级,假设每分钟提交一次性则可以做到分钟级。
制订口头方案时,Iceberg 是常数级的,它只读取固定的元数据文件就能够拿到文件列表。
Iceberg 还允许文件级别的过滤,比如基于统计消息或许字典做过滤。
为了繁难用户经常使用,爱奇艺在引入数据湖,首先要做平台化树立。
这是爱奇艺数据湖全体的产品架构图:
最底下是数据源,比如前面提到的 Pingback、用户 MySQL 的 Binlog 解析、日志和监控消息,会区分进到实时、离线和 Iceberg 通道。在 Iceberg 之上,经过 RCP 平台、Babel 平台区分做流式入湖和离线入湖。经常使用 Trino 和 Spark SQL 去做查问。同时咱们开发了数据湖平台去成功元数据治理、权限治理等等。
爱奇艺经过实时计算平台,能够做到很繁难的入湖。一个 Kafka 的数据只要要三步,就可以成功性能流义务:首先性能从哪个 Kafka 开局读;而后在外面做 Transform 逻辑,比如挑选、重命名,最后定义写到哪个 Iceberg。
入湖的下一步是查问,也就是出湖。目前 Iceberg 有两类文件格局,V1 格局允许 Append Only 数据,不允许行级修正。Iceberg 颁布的最新版本 V2 格局能允许行级更新。
目前 V1 格局是经过 Trino 引擎查问,V2 格局经过 SparkSQL 查问。前端是经过 Pilot,咱们的自研 SQL 引擎做散发,能够基于文件格局智能地选用引擎,允许各类用户场景。
上方引见一些性能优化的上班。
说到数据湖,无论哪个产品都绕不开的一个疑问就是小文件疑问。Hive 可以批量,比如每小时做一次性计算,可以写出很大的文件。在 Iceberg 中,由于须要做到近实时,每分钟或许每 5 分钟写文件,文件就比拟小,肯定会有小文件疑问。咱们关键经过两个方面去处置小文件疑问:
依据表的生命周期做处置。比如一张表或许只要要保管一年,或许保管 30 天,历史的数据可以删除。
目前平台会限度用户建表必需性能生命周期,经过数据湖平台智能地成功清算逻辑。
清算用的是 Iceberg 官网提供的处置方案,Spark 的 Procedure,先是 Drop 分区,而后 Expire 历史的 Snapshot,再删除孤儿文件,最后重写元数据文件。
这套流程间接跑,有些环节是存在性能疑问的,并不能够满足清算的效率:
① 第一: Spark 的经常使用形式,每次跑义务都须要提交一个 Spark 义务,须要先放开Yarn 资源,再启动 Application,跑完这个义务后这个 Application 就监禁掉了。这里可以驳回 Spark 的常驻形式,生命周期清算 SQL 可以跑得很快, 资源是不监禁的,防止了放开和启动的耗时。
② 第二: 天级的目录删除,Iceberg 官网的成功是比拟慢的。它用的是孤儿文件删除的战略,在文件数比拟多的时刻,扫描环节比拟慢。咱们做了改良,由于明白知道整个天级目录都不须要,可以间接删除整个目录。
③ 第三: 咱们增加了回收站的机制,生命周期误删除时能有复原的手腕。
做了这些优化,线上大略几千个表,都能够按时成功生命周期的清算。比如 Venus 库原先或许有 2 亿个 iNode,清算完稳固在 0 万的数量级。
另外一个处置小文件疑问的形式就是兼并。最繁难的就是性能一个定时兼并。
人工性能定时兼并比拟大的疑问是:定时战略比拟难性能。比如,什么机遇应该做兼并,这次兼并应该要兼并什么范畴的数据,假设让业务去配这些消息,每一个 Iceberg 用户就须要十分深化地去了解小文件发生的机理才干够比拟好地控制兼并的范畴。
为了处置这个疑问,咱们参考了 Netflix 的文章,做了智能兼并,它的**理想是:
不再由用户指定兼并行为,而是统计 Iceberg 表每个分区上方的文件数,计算均方差,再联合表的权重因子,算进去哪些表兼并成果是最好的,增加到待兼并的分区列表外面。而后由兼并义务依照优先级成功兼并环节,用户无需做性能。
有了智能兼并,还要处置兼并的性能优化疑问,咱们也不时追随社区的开展。在经常使用环节中,最后 Iceberg 在文件兼并这块做得还不是很好。最早的时刻,有个疑问,Delete File 在兼并并没有被真正地删除,目前曾经修复。举个例子,假设 Delete 马上有个 Rewrite> 还有一些大表兼并义务经常失败。这里咱们可以性能 Bucket 分区,将全表兼并改为每次兼并其中一个 Bucket 分区,缩小单次兼并的数据量。
还可以运行 Binpack 兼并战略去控制兼并选用的逻辑。运行 Bucket 分区和 Binpack兼并战略, 如右上示用意表现的是文件数的变动, 可以判别这个文件数不时在增长,这个小的降低是小时级分区兼并,到肯活期间做全表兼并,它的文件数据缩小得比拟多,存在周期性的震荡。
还有一个例子,咱们发如今做兼并的时刻经常会和写入义务抵触,会报一个失误,要兼并的这个文件有一个 Position Delete 在援用,其实是一个误判,由于在社区的自动的参数外面,去判别这个>
前文引见了当小文件曾经发生的时刻如何优化,但咱们更宿愿小文件最好不要发生,在写入的时刻就把文件数控制住。咱们须要去了解 Flink 义务写入的时刻是怎样控制文件数量的。
左上角示用意中这个 Flink 义务有 100 个并行度,在自动参数 Distribution-mode = None 时每一个并行度都会往分区下写文件,就会写入 100 个文件,一分钟写 100 个文件每个数据文件都很小。
假设性能 Distribution-mode = Hash,如左下角的图中,在写入的时刻会先做 Shuffle,基于 Partition Key Shuffle 到特定的 Sink,这个 Flink 义务会把数据都集中到一个 Sink,写到一个文件,就处置了小文件疑问。
但又会引入新的疑问,数据量比拟大的时刻,单个义务写文件的效率跟不上,就会形成 Flink 义务反压。这个时刻咱们用哈希战略联合 Bucket 分区。比如,可以控制 1 个 Hour 上方 10 个 Bucket,经过两者联合起来就可以很准确地去控制 1 个分区究竟要消费多少个文件。普通倡导写入文件大略在 100 MB 左右是比拟适合的。上图的表格中列出了各个参数性能下的文件数量。
处置了小文件疑问,接上去是查问的性能疑问。在最后做 Iceberg 性能验证的时刻,咱们发现它的批量 Scan 性能是十分好的,然而点查问的性能就比拟蹩脚。
举个例子,在订单表中,用特定 ID,如订单 ID 或许用户 ID 去查问明细,简化后的SQL 就是 order_id = ‘555’。自动的状况下,Iceberg 会基于 MinMax 做过滤,但数据依照期间戳排序,MinMax 过滤其实是不失效的,比如 File A 的 MinMax 范畴蕴含 555,File N MinMax 321 到 987 也蕴含 555,其实是过滤不掉的。因此点查问理想上就是全表扫描。
针对点查问场景,BloomFilter 是十分实用的。最后社区没有这特性能,Parquet 在 1.12 的时刻允许 BloomFilter,Iceberg 的自动存储格局也是 Parquet,所以咱们思考修正 Iceberg 引入这一性能。
先引见一下 BloomFilter 的作用,在这个架构图中,比如,针对 order_id 开启了 BloomFilter,为每一个数据文件构建 BloomFilter,将 order_id 启动哈希后映射到对应 bit,假设值存在就把对应的位设为 1,假设不存在对应的位自动是 0。在 Bloom Filter 外面,假设标记位为 1,这个值不肯定存在,但假设标记位为 0,这个值肯定不存在。经过致力,咱们在 Iceberg 的内核外面增加了相应的允许。在 Spark 读取 Iceberg 和 Trino 读取的时刻也增加了相应的才干。
BloomFilter 允许 Equals 和 In 过滤。假设标记位为 0 是肯定能过滤的。不允许 not equals、not in、比拟符等过滤条件。
示用意中 order_id = 555 这个条件,哈希后另外两个文件对应的标记位值都是 0,在查问的时刻就可以很快地把其余文件过滤掉了,能够准确命中订单所在的数据文件。
经过测试,在 Spark SQL 中的订单 ID 查问,原来全表扫描须要将近 1000 秒,开启 BloomFilter 后只要要 10 秒钟。Trino 开启 BF 后,可以过滤 98.5% 的查问,CPU 消耗只要以前的 5%。
BloomFilter 会带来额外的空间开支。经过繁难的测试,大略有 3% 的额外空间损耗。即 3% 的存储代价可以带来点查问 100 倍的优化。
查问优化另外一个上班是
缓存减速
,如经常使用 Alluxio 做缓存减速。
这是爱奇艺 Trino 查数据湖的架构图。业务经过 Pilot 引擎散发到 Trino 网关,智能地选用经常使用哪个 Trino 集群口头查问。原本 Trino Worker 上方的 SSD 存储是糜费的,咱们在之上混布了 Alluxio,复用了原本闲置的 SSD 存储,简直没有什么额外机器开支。
以前去查 HDFS 或许会有性能颤抖,比如,业务有一个大的批义务,造成 HDFS 性颤抖,查问性能会降得很凶猛,Alluxio 缓存能够很好地屏蔽这一点。经过测试 Venus 日志运行 Alluxio ,P90 从 18 秒可以降低到 1 秒。
在实践的经常使用环节中发现 Trino 查问有个意想不到的疑问,元数据读取性能远比咱们构想中的要慢。比如,读取一个 5 M 的元数据居然要 3 秒钟,前面查数据或许只要要 1 秒,元数据反而更慢。
经过分焰图和阿里的 Arthas 做定位,发现 Read 的方法被调用了百万次,文件总共 5 M,读取 100 多万次是十分不正当的。进一步跟踪,定位要素是父类外面一个 Read 方法的自动成功会一一 Byte 读取,Trino 这边没有笼罩这个方法的成功,就会升级到自动方法,每次读 1 个 Byte ,所以调用次数十分多,造成很慢,优化耗时缩短到了 0.5 秒。
最起初引见业务落地的状况,在运行了上述优化后,业务能取得什么样的成果。
第一个例子是广告的流批一体场景。原来的实时链路中,实时数据经过 Kafka 写到 Kudu,离线数据同步到 Hive,经过 Impala 来一致查问,基于离线笼罩的进展将查问散发到 Kudu 和 Hive。
经常使用 Iceberg ,实时和离线数据都更新 Iceberg,不须要进展治理,间接查问 Iceberg 表即可。Iceberg 成功了两方面的一致,一是存储一致,不须要有两个类型的存储,查问不须要做拆分。二是义务开发一致为 SQL,原先离线是 HiveSQL,实时是 Spark Jar 包,一致为 SQL 开发。数据入湖后联合散布式变革,广告智能出价全链路由 35
Venus 是爱奇艺外部的日志剖析平台。之前的架构中 Kafka 数据往 ElasticSearch 外面存储,假设业务流量较大就给它一个独立集群,小流量业务则用公共集群。这个方案存在一些疑问:一是流量调度很难做,当集群流量有瓶颈时,须要把流量拆分走;二是 ES 的存储老本十分高。
存储改用 Iceberg 方案后,一切业务的流量都写到一个 Iceberg 集群,不须要拆分流量。Venus 接入层经过日志查问平台,数据存储的切换对用户是透明的。Iceberg 带来的好处包括:
① 老本清楚降低。
不须要独立的 ES 集群了,Iceberg 和 Trino 都复用现有的资源,并没有什么额外的老本。
② 稳固性大幅优化。
由于 ES 的老本太贵,没有配正本,一旦单个磁盘或节点有疑问,都会引发用户的报障。用 Iceberg ,写入带宽十分大而且稳固性很好,报障缩小了 80% 以上。
接上去是爱奇艺外部的检查场景,检查场景须要对一些历史的行记载做修正。没有 Iceberg 以前,没有很好的技术方案允许行级更新。
原来处置方案里用 MongoDB 存全量的数据,做行级的更新,而后用 ES 构建二级索引,改用 Iceberg 两个存储都一致到 Iceberg 外面。对业务带来的好处是:
① 原本的监控诉警要活期查 ES 做聚合,用 MySQL 开发报表,如今不须要了,报表间接查 Iceberg 就可以,能够允许实时告警。
②数据湖大幅提高业务的效率。原本剖析义务开发十分复杂,要从 Mongo 外面导数十分不繁难。有了数据湖可以一致为 SQL 查问。
最后是 CDC 类数据入湖,此处以订单为例。基于 MySQL 数据做大数据剖析,有两类处置方案:
第一类
是每天导出一份到 Hive,缺陷是每次导出都是全量,提前很大,只能看一天以前的数据。另外全量导的性能也很差,对 MySQL 压力也比拟大。
第二类
是实时处置方案,增质变卦写在 Kudu 外面,Kudu 是一个老本很高的处置方案。假设 Kudu 写入带宽动摇,同步义务担任人须要去做运维操作。
经常使用数据湖方案,爱奇艺实时计算平台,经过 Flink CDC 技术很繁难地可以将 MySQL 数据入湖。
数据湖方案具有如下优势,
一是近实时,数据提前在分钟级,远优于之前的离线方案;二是老本低,相比于 Kudu 无需独立节点,大幅降低机器老本;三是省运维,Iceberg 写入带严惩且稳固,大幅降低运维代价。
最后引见一下未来布局。爱奇艺未来会在流批一体外面有更多的落地,包括广告的片面推行、Pingback 在 BI 场景的落地。另外,咱们方案把数据湖落地在特色消费,可以由以前离线或许批的特色消费,变成近实时,能够允许晚到数据,允许样本的行级的修正。
在技术方面会尝试把 Iceberg 的 Puffin 统计消息用于查问减速的场景。还会对社区在做的 Branch 和 Tag 启动调研,寻觅外部的落地场景。