数据团队依赖许多其余“第三方”发送数据的状况并不少见,他们经常在没有启动任何沟通或让数据团队知道太晚的状况下更改数据的形式。
每当出现这种状况时,数据管道就会受到破坏,数据团队须要修双数据湖。这是一个充溢惨重担务的人工环节。在通常状况下,数据团队或者会推 ,试图证实形式曾经扭转。
但是随着开展和提高,数据团队看法到,以智能继续集成(CI)/继续交付(CD)的方式繁难地阻止形式一同更改是更理智的。
形式更改和形式验证给数据团队带来了很多痛苦,但是市场上有一些处置方案可以协助处置这个疑问——幸运的是,其中一些是开源的。
一个墨守成规的教程,引见如何经常使用开源数据版本控制工具lakeFS处置形式验证疑问。
形式验证准许用户为数据湖创立验证规定,例如准许的数据类型和值范围。它保障保留在数据湖中的数据遵照已树立的形式,该形式形容了数据的结构、格局和限度。
这是一个须要处置的疑问——假设不极速采取执行,就会在数据处置环节中看到不分歧和失误。
为什么须要处置形式验证?
破费一些期间正确地控制形式是值得的,有以下四个要素:
处置数据湖中的形式并非善报多磨
在数据仓库中,用户处置的是严厉的数据模型和严厉的形式。数据湖与之相反。大少数状况下,它们最终蕴含宽泛的数据源。
为什么这很关键?由于在数据湖中,形式的定义可以在数据源之间出现变动,并且当参与新数据时,形式或者会随着期间的推移而变动。这使得在数据湖中的一切数据上实施一致的形式成为一个渺小的应战。假设不能处置这个疑问,将不得不处置数据处置疑问。
但这还不是所有。由于构建在数据湖之上的数据管道的复杂性不时参与,不可领有一个分歧的形式。数据管道可以包括多个流程和转换,每个流程和转换都须要一个惟一的形式定义。
形式或者随着数据的处置和修正而变动,因此很难确保跨整个管道启动形式验证。
这就是版本控制系统可以派上用场的中央。
在数据湖中成功形式验证的数据版本控制
lakeFS是一个开源工具,它可以将数据湖转换为相似Git的存储库,让用户像软件工程师控制代码一样控制它。这就是数据版本控制的意义所在。
与其余源代码控制系对抗样,lakeFS有一个称为hook的个性,它是定制的脚本或程序,lakeFS平台可以运转这些脚本或程序来照应指定的事情或操作。
这些事情可以包括提交更改、兼并分支、创立新分支、参与或删除标志等等。例如,当兼并出现时,在兼并成功之前,在源分支上运转一个预兼并挂钩。
它如何运行于形式验证呢? 用户可以创立一个预 来验证Parquet文件的形式与形式能否相反。
在这个场景中,将在一个摄取分支中创立一个delta表,并将其兼并到消费中。接上去将更改表的形式,并尝试再次兼并它,模拟将数据优化到消费的环节。
首先,将设置一些全局变量并装置将在本例中经常使用的包,这些包将在Python笔记本中运转。
在设置好lakeFS凭证后,可以开局创立一些蕴含存储库和分支称号的全局变量:
Pythonrepo = "schema-validation-example-repo" mainBranch = "main" ingestionBranch = "ingestion_branch"
每个lakeFS存储库都须要有自己的存储命名空间,所以也须要创立一个:
PythonstorageNamespace = 's3://' # e.g. "s3://username-lakefs-cloud/"
在本例中,经常使用AWS S3存储。为了使一切顺利启动,用户的存储须要性能为与lakeFS一同运转,lakeFS与AWS、Azure、Google Cloud或外部部署对象存储(如MinIO)一同上班。
假设在云中运转lakeFS,则可以经过复制示例存储库的存储称号空间并将字符串附加到其上,将其链接到存储。所以,假设lakeFS Cloud提供了这个
可以经过以下方式启动性能:
PythonstorageNamespace = 's3://lakefs-sample-us-east-1-production/AROA5OU4KHZHHFCX4PTOM:2ae87b7718e5bb16573c021e542dd0ec429b7ccc1a4f9d0e3f17d6ee99253655/my_random_string'
在笔记本中,将经常使用Python代码,因此也必定导入lakeFS Python客户端包:
Pythonimport lakefs_client from lakefs_client import models from lakefs_client.client import LakeFSClient import osfrom pyspark.sql.types import ByteType, IntegerType, LongType, StringType, StructType, StructField
Python%xmode Minimal if not 'client' in locals(): # lakeFS credentials and endpoint configuration = lakefs_client.Configuration() configuration.username = lakefsAccessKey configuration.password = lakefsSecretKey configuration.host = lakefsEndPoint client = LakeFSClient(configuration) print("Created lakeFS client.")
以下将在本例中创立delta表,因此须要包括以下包:
Pythonos.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.12:2.0.0 --conf "spark.sql.extensinotallow=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" pyspark-shell'
lakeFS地下了一个S3网关,它准许运行程序以与S3通讯的方式与lakeFS启动接口。要性能网关,并执行以下步骤:
Pythonfrom pyspark.context import SparkContext from pyspark.sql.session import SparkSession sc = SparkContext.getOrCreate() spark = SparkSession(sc) sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", lakefsAccessKey) sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", lakefsSecretKey) sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", lakefsEndPoint) sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
如今曾经预备好在笔记本中大规模经常使用lakeFS版本控制。
以下将经常使用Python客户端创立存储库:
Pythonclient.repositories.create_repository( repository_creatinotallow=models.RepositoryCreation( name=repo, storage_namespace=storageNamespace, default_branch=mainBranch))
在这种状况下,将经常使用预兼并挂钩来确保架构没有更改。操作文件应提交到lakeFS存储库,前缀为_lakeFS_actions/。未能剖析操作文件将造成运转失败。
将提交以下钩子性能操作文件,pre-merge-schema-validation.yaml:
Python#Parquet schema Validator #Args: # - locations (list of strings): locations to look for parquet files under # - sample (boolean): whether reading one new/changed file per directory is enough, or go through all of them #Example hook declaration: (_lakefs_actions/pre-merge-schema-validation.yaml): name: pre merge checks on main branch on:、 pre-merge: branches: - main hooks: - id: check_schema_changes type: lua properties: script_path: scripts/parquet_schema_change.lua # location of this script in the repository args: sample: false locations: - tables/customers/
中的子文件夹LuaHooks中。必定将文件提交到文件夹_lakeFS_actions下的lakeFS存储库:
Pythonhooks_config_yaml = "pre-merge-schema-validation.yaml" hooks_prefix = "_lakefs_actions" with open(f'./LuaHooks/{hooks_config_yaml}', 'rb') as f: client.objects.upload_object(repository=repo, branch=mainBranch, path=f'{hooks_prefix}/{hooks_config_yaml}', cnotallow=f )
只是设置了一个举措脚本,在兼并到main之前运转scripts/parquet_schema_che.lua。
而后将创立脚本自身(parquet_schema_che.lua)并将其上载到脚本目录中。正如人们所看到的,经常使用嵌入式LuaVM来运转钩子,而不依赖于其余组件。
此文件也位于ample-repo中的LuaHooks子文件夹中:
Python--[[Parquet schema validatorArgs: - locations (list of strings): locations to look for parquet files under - sample (boolean): whether reading one new/changed file per directory is enough, or go through all of them ]] lakefs = require("lakefs")strings = require("strings")parquet = require("encoding/parquet") regexp = require("regexp") path = require("path")visited_directories = {} for _, location in ipairs(args.locations) doafter = ""has_more = trueneed_more = trueprint("checking location: " .. location)while has_more doprint("running diff, location = " .. location .. " after = " .. after)local code, resp = lakefs.diff_refs(action.repository_id, action.branch_id, action.source_ref, after, location)if code ~= 200 thenerror("could not diff: " .. resp.message)endfor _, result in pairs(resp.results) dop = path.parse(result.path)print("checking: '" .. result.path .. "'")if not args.sample or (p.parent and not visited_directories[p.parent]) thenif result.path_type == "object" and result.type ~= "removed" thenif strings.has_suffix(p.base_name, ".parquet") then-- check it!code, content = lakefs.get_object(action.repository_id, action.source_ref, result.path)if code ~= 200 thenerror("could not fetch>