数据湖—Delta Lake -之基础操作
1.数据湖的简单介绍:
1.1 官网
https://delta.io/
看一张官网的图
1.2 特点:
1.不限格式,来之不拒,均可流入
2.集中存储、到处可访问。
3.高性能分析能力 -- 借助于Spark、MR、SparkSQL等高性能分析计算引擎,可以对海量的数据进行分析。
4.原始数据存储
5.数据湖是一个存储企业的各种各样原始数据的大型仓库,其中的数据可供存取、处理、分析及传输。
1.3 数据湖,数据仓库, 数据集市 的对比
比较 | 数据仓库 | 数据集市 | 数据湖 |
---|---|---|---|
应用范围 | 全公司 | 部门或者小组 | 全公司 |
数据类型 | 结构化数据处理 | 结构化数据处理 | 任意格式数据处理 |
存储规模 | 大量 | 中等规模(小型数仓) | 海量 |
数据应用 | 维度建模,指标分析 | 小范围的数据分析 | 海量任意格式分析,不限应用类型 |
新应用开发周期 | 长 | 长 | 短 |
1.3 写时的模式
数据在写入之前,就需要定义好数据的schema,数据按照schema的定义写入
1.4 读时模式
数据在写入的时候,不需要定义Schema,在需要使用的时候在使用Schema定义它写时模式和读时模式是两种截然不同的数据处理方法。
数据湖就是一种读时模式思想的具体体现
1.相比较写时模式而言,读时模式因为是数据在使用到的时候再定义模型结构(Schema),因此能够提高数据模型定义的灵活性,可以满足不同上层业务的高效率分析需求。
2.因为,对于写时模式而言,如果想要事后更改Schema是有很高的成本的。
3.而读时模式可以在用的时候再定义Schema就很灵活了,同一套数据可以用不同的Schema来定义,来获取不同的效。
1.5 特点:
1.轻松的收集数据(读时模式):数据湖与数据仓库的一大区别就是,Schema On Read,即在使用数据时才需要Schema信息;而数据仓库是Schema On Write,即在存储数据时就需要设计好Schema。这样,由于对数据写入没有限制,数据湖可以更容易的收集数据。
2.不需要关心数据结构:存储数据无限制,任意格式数据均可存储,只要你能分析就能存。
3.全部数据都是共享的(集中存储),多个业务单元或者研究人员可以使用全部的数据,以前由于一些数据分布于不同的系统上,聚合汇总数据是很麻烦的。
4.从数据中发掘更多价值(分析能力):数据仓库和数据市场由于只使用数据中的部分属性,所以只能回答一些事先定义好的问题;而数据湖存储所有最原始、最细节的数据,所以可以回答更多的问题。并且数据湖允许组织中的各种角色通过自助分析工具(MR、Spark、SparkSQL等),对数据进行分析,以及利用AI、机器学习的技术,从数据中发掘更多的价值。
5.具有更好的扩展性和敏捷性:数据湖可以利用分布式文件系统来存储数据,因此具有很高的扩展能力。开源技术的使用还降低了存储成本。数据湖的结构没那么严格,因此天生具有更高的灵活性,从而提高了敏捷性。
1.6 数据湖的要求
1.安全:数据集中存储,就对数据安全有了更高的要求,对权限的管控要求更加严格。
2.可拓展的:随着业务扩张、数据增多,要求数据湖体系可以随需求扩展其能力。
3.可靠的:作为一个集中存储的数据中心,可靠性也很重要,三天两头坏掉那是不可以的。
4.吞吐量:数据湖作为海量数据的存储,对数据的吞吐量要求就必须很高。
5.原有格式存储:数据湖我们定义为 所有数据的原始数据集中存储库,那么存储进入数据湖的数据就是未经修饰的、原始的数据
6.支持多种数据源的输入:不限制数据类型,任意数据可以写入
7.多分析框架的支持:因为数据格式各种各样,并不全是结构化数据,所以,要求支持多种分析框架对数据湖中的数据进行提取、分析。包括但不限于:批处理的、实时的、流的、机器学习的、图形计算的等等。
1.7数据湖的原则
1. 分离数据 和 业务
2.存储和计算的分离(可选,比较适用云平台)
3.Lambda架构 VS Kappa架构 VS IOTA架构 -
4.管理服务的重要性和选择合适的工具
4.1安全 (Kerberos)
4.2权限(Ranger)
2.Data Lake 的基本操作
2.1 Data Lake 的特点
1. ACID 事务控制 :Delta Lake将ACID事务带入您的数据湖。它提供了可序列化性,最强的隔离级别。
2. 可伸缩的元数据处理: Delta Lake可以轻松处理具有数十亿个分区和文件的PB级表
4. 数据版本控制 : Delta Lake提供了数据快照,使开发人员可以访问和还原到较早版本的数据以进行审核,回滚或重现实验。
5. 开放的数据格式 :Delta Lake中的所有数据均以Apache Parquet格式存储,从而使Delta Lake能够利用Parquet固有的高效压缩和编码方案。
6. 统一的批处理和流处理的source 和 sink : Delta Lake中的表既是批处理表,又是流计算的source 和 sink。
7. Schema执行: Delta Lake提供了指定和执行模式的功能。这有助于确保数据类型正确并且存在必需的列,从而防止不良数据导致数据损坏.
8. Schema演化: 大数据在不断变化。 Delta Lake使您可以更改可自动应用的表模式,而无需繁琐的DDL
9. 审核历史记录 :Delta Lake事务日志记录有关数据所做的每项更改的详细信息,从而提供对更改的完整审核跟踪
10. 更新和删除 : Delta Lake支持Scala / Java API进行合并,更新和删除数据集。
10.100%和 Apache Spark 的API兼容 : 和spark 完全兼容。
2.2 Data lake
的操作: Spark Scala Shell -- 要求只是使用的Spark
版本:>=2.4.2
bin/spark-shell --packages io.delta:delta-core_2.11:0.5.0
操作如图:
[root@master01 spark-2.4.7-bin-hadoop2.7]# bin/spark-shell --packages io.delta:delta-core_2.11:0.5.0
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/opt/module/spark-2.4.7-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
io.delta#delta-core_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-811cb329-3b4b-4a62-ab7a-d2287a1901dc;1.0
confs: [default]
found io.delta#delta-core_2.11;0.5.0 in central
found org.antlr#antlr4;4.7 in central
found org.antlr#antlr4-runtime;4.7 in central
found org.antlr#antlr-runtime;3.5.2 in central
found org.antlr#ST4;4.0.8 in central
found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
found org.glassfish#javax.json;1.0.4 in central
found com.ibm.icu#icu4j;58.2 in central
:: resolution report :: resolve 376ms :: artifacts dl 6ms
:: modules in use:
com.ibm.icu#icu4j;58.2 from central in [default]
io.delta#delta-core_2.11;0.5.0 from central in [default]
org.abego.treelayout#org.abego.treelayout.core;1.0.3 from central in [default]
org.antlr#ST4;4.0.8 from central in [default]
org.antlr#antlr-runtime;3.5.2 from central in [default]
org.antlr#antlr4;4.7 from central in [default]
org.antlr#antlr4-runtime;4.7 from central in [default]
org.glassfish#javax.json;1.0.4 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 8 | 0 | 0 | 0 || 8 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-811cb329-3b4b-4a62-ab7a-d2287a1901dc
confs: [default]
0 artifacts copied, 8 already retrieved (0kB/9ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/06/09 23:27:26 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://master01.pxx.com:4041
Spark context available as 'sc' (master = local[*], app id = local-1623252446880).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.7
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_251)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val data = spark.range(0, 5)
2.2 官网命令:
bin/spark-shell --packages io.delta:delta-core_2.12:1.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
其实可以自行bin/spark-shell --packages io.delta:delta-core_2.12:1.0.0
2.3 按照官网命令走
1.创建表, 并且读取表
scala> val data = spark.range(0, 5)
data: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> data.write.format("delta").save("/tmp/delta-table02")
scala> spark.read.format("delta").load("/tmp/delta-table02").toDF.show()
+---+
| id|
+---+
| 2|
| 0|
| 4|
| 3|
| 1|
+---+
scala>
- 更新操作
scala> val data01 = spark.range(5,10)
data01: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> data01.write.format("delta").mode("overwrite").save("/tmp/delta-table02")
scala> spark.read.format("delta").load("/tmp/delta-table02").toDF.show()
+---+
| id|
+---+
| 8|
| 7|
| 5|
| 6|
| 9|
+---+
scala>
3.Delta Lake提供了编程api,用于有条件地更新、删除和合并(upsert)数据到表中
scala> import io.delta.tables._
import io.delta.tables._
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> val deltaTable = DeltaTable.forPath("/tmp/delta-table02")
deltaTable: io.delta.tables.DeltaTable = io.delta.tables.DeltaTable@5e88e2e7
// 通过将每个偶数值加100来更新每个偶数值
scala> deltaTable.update(condition=expr("id % 2 ==0"), set = Map("id"->expr("id+100")))
scala> spark.read.format("delta").load("/tmp/delta-table02").toDF.show()
+---+
| id|
+---+
|106|
| 7|
| 5|
|108|
| 9|
+---+
// 删除偶数
scala> deltaTable.delete(condition = expr("id % 2 ==0"))
scala> spark.read.format("delta").load("/tmp/delta-table02").toDF.show()
+---+
| id|
+---+
| 7|
| 5|
| 9|
+---+
scala> val newData = spark.range(0,20).toDF
newData: org.apache.spark.sql.DataFrame = [id: bigint]
// 合并新数据
deltaTable.as("oldData").merge(newData.as("newData"),"oldData.id=newData.id").whenMatched.update(Map("id" -> col("newData.id"))).whenNotMatched.insert(Map("id" ->col("newData.id"))).excute()
^
scala> deltaTable.as("oldData").merge(newData.as("newData"),"oldData.id=newData.id").whenMatched.update(Map("id" -> col("newData.id"))).whenNotMatched.insert(Map("id" ->col("newData.id"))).execute()
[Stage 86:===================================> (135 + 51) / 200]21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 75.08% for 9 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 67.58% for 10 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 61.43% for 11 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 56.31% for 12 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 51.98% for 13 writers
[Stage 86:===============================================> (180 + 20) / 200]21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 48.27% for 14 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 45.05% for 15 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 42.24% for 16 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 39.75% for 17 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 37.54% for 18 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 35.57% for 19 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 33.79% for 20 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 35.57% for 19 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 37.54% for 18 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 39.75% for 17 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 42.24% for 16 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 45.05% for 15 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 48.27% for 14 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 51.98% for 13 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 56.31% for 12 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 61.43% for 11 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 67.58% for 10 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 75.08% for 9 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
scala> deltaTable.toDF.show()
+---+
| id|
+---+
| 0|
| 2|
| 6|
| 1|
| 10|
| 11|
| 15|
| 12|
| 4|
| 19|
| 14|
| 5|
| 9|
| 13|
| 8|
| 18|
| 16|
| 7|
| 3|
| 17|
+---+
scala>
作者:wudl
原文链接:https://www.jianshu.com/p/9011524df66d