原标题:使用 SQL 操作 Delta Lake
Delta Lake 0.7.0 是随着 Apache Spark 3.0 版本发布之后发布的,这个版本比较重要的特性就是支持使用 SQL 来操作 Delta 表,包括 DDL 和 DML 操作。本文将详细介绍如何使用 SQL 来操作 Delta Lake 表,关于 Delta Lake 0.7.0 版本的详细 Release Note 可以参见这里。
使用 SQL 在 Hive Metastore 中创建表
Delta Lake 0.7.0 支持在 Hive Metastore 中定义 Delta 表,而且这些操作支持使用 SQL 进行,包括创建表和修改表,如下:
—Createtable inthe metastoreCREATE TABLE events (date DATE,eventId STRING,eventType STRING,data STRING)USING DELTAPARTITIONED BY (date)LOCATION ‘/delta/events’—Ifa table withthe same name already exists,the table isreplaced withthe newconfiguration,elseit iscreatedCREATE OR REPLACE TABLE events (date DATE,eventId STRING,eventType STRING,data STRING)USING DELTAPARTITIONED BY (date)
修改表:
—Altertable andschemaALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment][FIRST|AFTER colA_name],…)
当然,我们也可以使用 Scala/Java/Python APIs:
•DataFrame.saveAsTable(tableName) 和 DataFrameWriterV2 APIs (#307). •使用 DeltaTable.forName(tableName) API 来创建 io.delta.tables.DeltaTable 实例,这个实例在 Scala/Java/Python 中运行 Update/Delete/Merge 操作是非常有用的。
支持使用 SQL 进行 Insert, Delete, Update 和 Merge 操作
在 Delta Lake 0.7.0 之前,被问的最多的问题就是支不支持使用 SQL 对 Delta Lake 表进行 delete, update 和 merge 操作。下面就是这些操作的例子
—Usingappend mode,you can atomically add newdata to an existing DeltatableINSERT INTO events SELECT *FROM newEvents—Toatomically replace all of the data ina table,you can useoverwrite modeINSERT OVERWRITE events SELECT *FROM newEvents—DeleteeventsDELETE FROM events WHERE date <‘2017-01-01’—UpdateeventsUPDATE events SET eventType =‘click’WHERE eventType =‘click’—Upsertdata to a target Delta—table usingmergeMERGE INTO eventsUSING updatesON events.eventId =updates.eventIdWHEN MATCHED THEN UPDATESET events.data =updates.dataWHEN NOT MATCHED THEN INSERT(date,eventId,data)VALUES (date,eventId,data)
值得注意的是,Delta Lake 中的 MERGE 操作比标准 ANSI SQL 语法支持更高级的语法。例如,Delta Lake 的 merge 支持:
•Delete 操作 – 当数据与输入行匹配时删除当前行的数据。比如 “… WHEN MATCHED THEN DELETE …” •具有子条件的多个匹配操作 – 目标行和源行匹配时可以进行更多灵活的操作。例如:
WHEN MATCHED AND events.shouldDelete THEN DELETEWHEN MATCHED THEN UPDATE SET events.data =updates.data
星形语法 – 将数据匹配的源列值设置目标列值的简写。例如:
WHEN MATCHED THEN SET *WHEN NOT MATCHED THEN INSERT *—equivalent to updating/inserting withevent.date =updates.date,events.eventId =updates.eventId,event.data =updates.data
自动或增量的形式生成 Presto/Athena manifest 文件
Delta Lake 0.5.0 新增通过 manifest 文件的方式支持在 Presto/Athena 中查询 Delta Lake 的表,manifest 文件里面包含了生成 manifest 文件时刻的最新版本文件。为了支持在 Presto/Athena 中读取 Delta Lake 表需要做以下的操作:
生成 Delta Lake Manifest 文件;配置 Presto 或 Athena 以支持读取生成的 manifests 文件;手动再生成 Manifest 文件。现在在 Delta Lake 0.7.0 里面支持使用以下命令来自动生成 manifest 文件:
ALTER TABLE delta.`pathToDeltaTable`SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)
通过表属性来对表进行配置
通过使用 ALTER TABLE SET TBLPROPERTIES 来为表设置表属性。我们可以启用、禁用或配置 Delta 的许多特性,比如自动清单生成。可以使用表属性 delta.appendOnly=true 来阻止 Delta 表中的删除和更新。
我们还可以通过以下属性轻松控制 Delta Lake 表保留的历史:
•delta.logRetentionDuration: 控制表的历史记录(比如交易记录)保留多长时间。默认情况下,会保留三十天的历史记录,但是我们可能需要根据自己的要求(例如 GDPR 历史背景)更改此值; •delta.deletedFileRetentionDuration:控制在成为VACUUM候选者之前必须删除文件的时间。默认情况下,超过7天的数据文件将被删除。在 Delta Lake 0.7.0 版本, 我们可以使用 ALTER TABLE SET TBLPROPERTIES 来配置这些属性
ALTER TABLE delta.`pathToDeltaTable`SET TBLPROPERTIES(delta.logRetentionDuration =“interval “delta.deletedFileRetentionDuration =“interval “)
支持在 Delta 表 commit 文件中添加用户定义的元数据
我们可以使用 DataFrameWriter 选项 userMetadata 或在 SparkSession 中配置 spark.databricks.delta.commitInfo.userMetadata 来支持在 Delta 表的 commit 文件中保存用户定义的字符串作为提交中的元数据。
在以下示例中,我们将根据每个用户请求从数据湖中删除一个用户(1xsdf1)。为确保我们将用户的请求与删除相关联,我们还将 DELETE 请求 ID 添加到userMetadata 中。
SET spark.databricks.delta.commitInfo.userMetadata={“GDPR”:“DELETE Request 1x891jb23”};DELETE FROM user_table WHERE user_id =‘1xsdf1’

本文参考:https://databricks.com/blog/2020/08/27/enabling-spark-sql-ddl-and-dml-in-delta-lake-on-apache-spark-3-0.html
猜你喜欢
责任编辑: