Spark从入门到放弃—Spark SQL
简介
Spark SQL是Spark中用于结构化数据处理的一个模块。与Spark RDD API不同,Spark SQL相关接口提供了关于数据结构以及计算执行过程的更多信息。Spark SQL在内部会根据这些信息去执行额外的优化操作。Spark SQL将Spark的函数式编程API与SQL查询集成在一起,它支持通过SQL或者Hive语言来查询数据,同时提供了一种叫做DataFrame和Dataset的结构来对结构化数据进行抽象,并且具有相应的DataFrame API和Datasets API来与Spark SQL进行交互。Spark SQL特点
Spark SQL具有以下几个特点:
- 易整合
无缝地整合了SQL查询和Spark编程。 - 统一数据访问
使用相同的方式(DataFrames)来访问不同的数据源,包括Hive、Avro、Parquet、JSON、JDBC等。 - 兼容Hive
在已有的Hive仓库上直接运行SQL或者HiveQL - 标准数据库连接
通过JDBC或者ODBC来连接Spark SQL - 可扩展性
Spark SQL对于交互式查询和长查询均使用同一套引擎。 - 性能优化
Spark SQL中,查询优化引擎会将会每一条SQL语句都转换为逻辑计划,随后将其转化为物理执行计划。在执行阶段,它会选择最优的计划来执行,确保可以得到比Hive查询更快的执行速度。
DataFrame
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统关系数据库中的二维表格。DataFrame可以通过多种数据源构造得到,比如:结构性数据、Hive表格、外部数据库、或者已有的RDD。DataFrame API支持多种编程语言,包括Scala、Java、Python以及R。DataFrame与RDD的主要区别在于,前者带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对隐藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。
DataFrame的特点如下:
- 能够在单节点以及集群上处理KB到PB量级的数据
- 支持不同的数据格式(Avro、csv、elastic等)和不同的存储系统(HDFS、HIVE表、MySQL等)。
- 通过Spark SQL的Catalyst优化器来进行代码优化和生成
- 可以通过Spark Core轻松地集成到所有的大数据工具和框架中
- 提供多种语言的API接口,包括Python、Java、Scala等。
DataFrame实践
在Spark Core中,如果想要执行应用程序,需要首先构建上下文环境对象Spark Context,Spark SQL 其实可以理解为对 Spark Core的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。
DataFrame代码实践
从数据源创建DataFrame
spark安装包目录下默认为我们准备了相关数据,以其中的people.json为例,路径为“/spark-x.x.x-bin-hadoop2.7/examples/src/main/resources/people.json”,内容为:使用下面的代码来加载json文件:
scala> val df = spark.read.json("./examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
打印出DataFrame的结构信息,代码如下:
scala> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
可以看到DataFrame每行数据的第一个属性“age”是long类型,第二个属性“name”是string类型。
DataFrame操作
我们可以使用两种风格的语法来对数据进行操作,分别是SQL语句和DSL(domain-specific language)语法,使用DSL语法风格的话,就不必去创建临时视图了,下面分别介绍。
DSL语法
Select算子
比如我们想选择指定的"age"列,可以使用select方法,代码如下:
scala> df.select("age").show()
+----+
| age|
+----+
|null|
| 30|
| 19|
+----+
可以看到只有”age“那一列被输出来了。
Filter算子
使用filter操作来对数据进行筛选,比如我们需要筛选出年龄大于27岁的人,代码如下:
scala> df.filter(df("age") > 27).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala> df.filter($"age" > 27).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala> df.filter('age > 27).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
上述代码展示了3种写法,结果是相同的。
groupBy算子
使用groupBy算子来根据某一列对数据进行分组,并且查看条数,代码如下:
scala> df.groupBy("name").count.show()
+-------+-----+
| name|count|
+-------+-----+
|Michael| 1|
| Andy| 1|
| Justin| 1|
+-------+-----+
上述代码根据属性“name”对数据进行了分组,并统计了每一个分组的个数。
SQL语法
SQL语法风格是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助,即SQL是针对视图的,而不是针对DataFrame的。
还是以上面创建的DataFrame为例,为了使用SQL语法来对数据进行查询,我们首先创建一个临时表。代码如下:
scala> df.createOrReplaceTempView("people")
接下来我们就可以在这个视图上进行SQL查询了。首先全选表格再打印出来,代码如下:
scala> val sqldf = spark.sql("SELECT * FROM people")
sqldf: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> sqldf.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
筛选出指定的“name”列:
scala> val sqldf = spark.sql("SELECT name FROM people").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
对数据进行过滤,选出年龄大于27岁的数据:
scala> val sqldf = spark.sql("SELECT * FROM people WHERE age >= 27").show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
根据name对数据进行分组,然后统计每个组的个数:
scala> val sqldf = spark.sql("SELECT name, COUNT(*) FROM people GROUP BY name").show()
+-------+--------+
| name|count(1)|
+-------+--------+
|Michael| 1|
| Andy| 1|
| Justin| 1|
+-------+--------+
注意:
普通临时表是SparkSession范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如global_temp.people。
全局表的创建如下:
df.createGlobalTempView("people1")
scala> spark.sql("SELECT * FROM global_temp.people1").show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> spark.newSession().sql("SELECT * FROM global_temp.people1").show() //创建新的Session来进行SQL查询
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
如果使用第二种方式去访问之前创建的临时表“people”,则会报错,如下:
scala> spark.newSession().sql("SELECT * FROM people").show()
org.apache.spark.sql.AnalysisException: Table or view not found: people; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [people]
错误显示没有找到对应的表。
DataSet
DataSet是一个分布式数据集合,它是 Spark 1.6 版本中新增的一个接口, 它结合了 RDD(强类型,可以使用强大的 lambda 表达式函数) 和 Spark SQL 的优化执行引擎的好处。Dataset 可以从 JVM 对象构造得到,随后可以使用函数式的变换(map,flatMap,filter 等)进行操作。DataSet是具有强类型的数据集合,在构造的时候需要提供对应的类型信息。
DataSet创建
在Scala中创建Dataset,要定义Scala的case类,case类是具有以下特征的类:
- 不可变
- 通过模式匹配可分解,来获取类属性
- 允许基于结构的比较而不是基于引用的比较
- 易于使用和操作
- 不变性使你无需跟踪对象变化和时间和位置
- “按值比较”允许直接比较实例的值,就像它们是基本类型值(primitive v a l u e)一样,这样就避免了混淆类实例基于值比较或是基于引用比较所带来的不确定性。
- 模式匹配简化了分支逻辑,从而引入更少的bug和带来可读性更好的代码。
要创建DataSet,首先我们需要为数据集定义一个case class,以上文的people.json文件为例,定义case class如下:
scala> case class employee(name: String, salary: Long)
defined class employee
定义好case class之后,它表示数据中的单个记录,我们可以通过样例类的属性来直接获取对应的值。下面是从文件中创建Dataset的代码:
//注意我们默认读出来的数据是DataFrame,通过as方法转换为指定的行类型。
scala> val employeeDS = spark.read.json("./examples/src/main/resources/employees.json").as[employee]
employeeDS: org.apache.spark.sql.Dataset[employee] = [name: string, salary: bigint]
scala> employeeDS.show()
+-------+------+
| name|salary|
+-------+------+
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
+-------+------+
DataSet操作
对数据进行过滤,筛选出薪水大于4000的数据:
scala> employeeDS.filter(line => line.salary > 4000).show
+----+------+
|name|salary|
+----+------+
|Andy| 4500|
+----+------+
对数据进行映射,只取每一行数据的name字段,如下:
scala> employeeDS.map(line => line.name).show
+-------+
| value|
+-------+
|Michael|
| Andy|
| Justin|
| Berta|
+-------+
对数据按照名称进行分组聚合:
scala> val fc = employeeDS.groupBy("name").count()
fc: org.apache.spark.sql.DataFrame = [name: string, count: bigint]
scala> fc.show
+-------+-----+
| name|count|
+-------+-----+
|Michael| 1|
| Andy| 1|
| Berta| 1|
| Justin| 1|
+-------+-----+
根据每行数据对应的key来进行分组:
scala> val ec = employeeDS.groupByKey(x => x.name).count()
ec: org.apache.spark.sql.Dataset[(String, Long)] = [key: string, count(1): bigint]
scala> ec.show
+-------+--------+
| key|count(1)|
+-------+--------+
|Michael| 1|
| Andy| 1|
| Berta| 1|
| Justin| 1|
+-------+--------+
对两个不同的DataSet进行连接操作,整合成一个。以上面形成的DataSet为例,我们构造一个假的DataSet,然后使用join方法来进行连接操作。
// 构造样例类
scala> case class EmployeeMetadata(name: String, number: BigInt)
defined class EmployeeMetadata
scala> val nameList = List("Michael","Andy","Berta","Justin")
nameList: List[String] = List(Michael, Andy, Berta, Justin)
scala> val metaData = spark.range(4).map(x => (nameList(x.toInt), x)).withColumnRenamed("_1", "name").withColumnRenamed("_2", "number").as[EmployeeMetadata]
metaData: org.apache.spark.sql.Dataset[EmployeeMetadata] = [name: string, number: bigint]
scala> metaData.join(fc, metaData.col("name") === fc.col("name")).show
+-------+------+-------+-----+
| name|number| name|count|
+-------+------+-------+-----+
|Michael| 0|Michael| 1|
| Andy| 1| Andy| 1|
| Berta| 2| Berta| 1|
| Justin| 3| Justin| 1|
+-------+------+-------+-----+
DataFrame、RDD、DataSet相互转换
实际开发程序的时候,我们经常需要在RDD、DataFrame、DataSet之间相互操作,此时需要引入:
import spark.implicits._
但是如果我们使用spark-shell的话,默认它已经帮我们引入了,所以就无需再单独添加这行代码了。这里的 spark 不是 Scala 中的包名,而是创建的 sparkSession 对象的变量名称,所以必须先创建 SparkSession 对象再导入。这里的 spark 对象不能使用 var 声明,因为 Scala 只支持 val 修饰的对象的引入。
以上文的employee.json为例,我们首先创建一个DataFrame,如下:
scala> val df = spark.read.json("./examples/src/main/resources/employees.json")
rdd: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]
scala> df.show()
+-------+------+
| name|salary|
+-------+------+
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
+-------+------+
使用rdd方法将其转化为RDD:
scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[544] at rdd at <console>:25
scala> val array = rdd.collect
array: Array[org.apache.spark.sql.Row] = Array([Michael,3000], [Andy,4500], [Justin,3500], [Berta,4000])
scala> array(0)
res157: org.apache.spark.sql.Row = [Michael,3000]
scala> array(0)(0)
res158: Any = Michael
scala> array(0)(1)
res159: Any = 3000
定义样例类,使用as方法将DataFrame转化为DataSet:
scala> case class Employee(name:String, salary:BigInt)
defined class Employee
scala> val ds = df.as[Employee]
ds: org.apache.spark.sql.Dataset[Employee] = [name: string, salary: bigint]
scala> ds.show
+-------+------+
| name|salary|
+-------+------+
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
+-------+------+
反之,使用toDF()方法可以将DataSet转换回DataFrame:
scala> val ddf = ds.toDF()
ddf: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]
scala> ddf.show
+-------+------+
| name|salary|
+-------+------+
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
+-------+------+
使用rdd方法将DataSet转换为RDD:
scala> val rddd = ds.rdd
rddd: org.apache.spark.rdd.RDD[Employee] = MapPartitionsRDD[557] at rdd at <console>:25
scala> rddd.first()
res165: Employee = Employee(Michael,3000)
我们再来定义一个RDD,并将其转换为DataFrame:
scala> val rdd = sc.makeRDD(List(("halo", 27), ("ice",18)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[561] at makeRDD at <console>:24
scala> rdd.collect
res169: Array[(String, Int)] = Array((halo,27), (ice,18))
使用toDF()方法,将其转换为DataFrame,注意要传入对应的列名,否则默认的列名是以"_"加编号构成,如下:
scala> rdd.toDF().show()
+----+---+
| _1| _2|
+----+---+
|halo| 27|
| ice| 18|
+----+---+
scala> rdd.toDF("name", "age").show
+----+---+
|name|age|
+----+---+
|halo| 27|
| ice| 18|
+----+---+
SparkSQL能够自动将包含有 case 类的 RDD 转换成 DataSet,case 类定义了 table 的结构,case类属性通过反射变成了表的列名。Case类可以包含诸如Seq或者Array等复杂的结构。
我们首先定义case类,然后使用toDS方法,将RDD转换为DataSet:
case class User(name:String, age:Int)
进行转换:
scala> val ds = rdd.map(t=>User(t._1, t._2)).toDS()
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
scala> ds.show
+----+---+
|name|age|
+----+---+
|halo| 27|
| ice| 18|
+----+---+
用一张图来说明这三者之间的转换关系:DataFrame、RDD、DataSet三者的关系
在开始这三者的比较之前,首先回顾一下Spark中关于这三种数据类型的定义:
- Spark RDD APIs
RDD也叫做弹性分布式数据集。它是关于数据的只读分区集合。RDD是Spark的最基础的数据结构。它允许程序员以容错的方式在大型集群上执行内存计算操作。更多资料可以参考链接。 - Spark DataFrame APIs
与RDD不同,DataFrame的数据根据命名的列组织起来,类似于关系数据库中的表格。它也是不可变的分布式数据集合。Spark中的DataFrame允许程序员将结构追加到分布式数据集合上,从而可以进行更高级别的抽象。 - Spark DataSet APIs
DataSets是DataFrame APIs的一个扩展,它提供了类型安全,面向对象的编程接口。DataSets充分利用了Spark的Catalyst优化器。更多可以参考链接。
下面从这15个方面来对这三者进行全方位的对比总结:
1. 发布时间
- RDD
Spark 1.0发布 - DataFrames
Spark 1.3发布 - DataSet
Spark 1.6发布
2. 数据表示
- RDD
RDD是一个分布式的数据元素集合,它分布在计算机集群中的多台机器上。 - DataFrame
DataFrame也是一个分布式的数据集合,不过它将数据通过命名的列组织起来。它与关系数据库中的表类似。 - DataSet
它是DataFrame的一个扩展,具有强类型、面向对象等特点,并且充分利用了Catalyst优化器和堆外存储机制的优势。
3. 数据格式
- RDD
RDD可以轻松地处理有结构和无结构的数据。但是RDD并没有给数据指定具体的结构,需要用户显式地指定。 - DataFrame
DataFrame仅适用于结构化或者半结构化的数据。它以命名的列的方式来组织数据,同时允许Spark来管理数据的Schema。 - DataSet
DataSet也可以有效地处理结构化和非结构化数据。它以JVM行对象或者行对象集合的形式来表示数据。
4. 数据源API
- RDD
RDD可以从任何数据源中读取数据,比如text文件、通过JDBC连接的数据库等,而且可以轻松地处理这些数据,而无需指定预定义好的结构。 - DataFrame
DataFrame APIs允许程序员处理不同格式的数据,包括(ARVO、CSV、JSON),以及不同的存储系统,包括HDFS、HIVE表格、MySQL等。DataFrame可以从这些数据源中读写数据。 - DataSet
DataSet APIs也支持从不同的数据源中读取数据。
5. 不变性和可操作性
- RDD
RDD包含的数据集是已经被划分好分区的。RDD中中体现并行性的基本单元称为分区。每个分区是对数据的一个逻辑划分,该分区是是不可变的,并且是通过对现有分区进行某种Transformation变换之后创建得到的。不变性有利于实现计算的一致性。我们可以使用toDF()方法将一个RDD转换为DataFrame对象;反之,也可以通过rdd方法转换回来。 - DataFrame
转换为DataFrame对象之后,无法重新生成域对象。举个例子,如果你从testRDD中转换得到testDF,那么你将无法恢复得到原先的RDD对象。 - DataSet
DataSet克服了DataFrame的局限性,可以从Dataframe中重新生成RDD。 DataSet使你可以将现有的RDD和DataFrame转换为DataSets。
6. 编译期类型安全
- RDD
RDD提供了熟悉的面向对象的编程风格以及编译时类型的安全性。 - DataFrame
如果你尝试访问table中不存在的列,那么DataFrame无法在编译期间产生错误,它只有在运行期间才能察觉到这个错误。 - DataSet
它提供了编译期间类型安全功能
7. 优化
- RDD
RDD中并没有可用的内置优化引擎。当处理结构化数据的时候,RDD无法利用spark内置的高级优化器。比如Catalyst优化器和Tungsten执行引擎。开发人员可以根据每个RDD的属性来自行优化。 -
DataFrame
DataFrame可以使用Catalyst树转化框架来优化,优化分为四步进行:a)根据引用分析逻辑计划。b)逻辑计划优化。c)物理执行计划。d)代码生成,将Query的一部分生成Java字节码。整体优化过程如下图:
- DataSet
与DataFrame类似,也提供了Catalyst优化功能。
8. 序列化
- RDD
无论什么时候Spark需要在集群中分发数据或者需要将数据写入到磁盘的时候,它使用了Java序列化。序列化单个Java和Scala对象的开销非常高,并且需要在节点之间同时发送数据和结构信息。 - DataFrame
Spark DataFrame可以将数据以二进制的格式序列化到堆外存储空间中,然后直接在堆外内存上执行Transformation操作,因为Spark可以读懂这些格式。这样就不需要通过Java序列化来对数据进行编码了。 - DataSet
在序列化数据的时候,DataSet APIs具有编码器的概念,该编码器可以处理JVM对象和表格表示之间的转换。它使用了Spark内部的Tungsten二进制格式来存储表格数据。DataSet可以在序列化后的数据上直接执行操作以提高内存使用效率。它同时也允许按需访问单个属性,而无需对整个对象进行反序列化。
9. 垃圾回收
- RDD
创建和销毁单个RDD对象会导致GC的开销。 - DataFrame
对DataSet中的每一行创建独立对象的时候,这样避免GC的开销。 - DataSet
DataSet使用了堆外数据序列化,因此在对象销毁的时候也无需进行GC操作。
10. 效率/内存使用
- RDD
当对单个Java和Scala对象进行序列化的时候,这会花费大量时间,从而降低了效率。 - DataFrame
使用堆外内存来进行序列化减少了GC的负荷。它可以动态产生字节码,以便对该序列化数据执行许多操作,对于一些小型的操作而言,无需反序列化就可以执行。 - DataSet
它允许在序列化数据上执行操作,从而提高了内存的使用效率。
11. 惰性机制
-
RDD
RDD是惰性执行的,意味着它们并不是立马就被计算。相反,它首先记住在原始RDD上各种Transformation操作,只有当一个Action操作要求从driver节点中获取结果的时候,Spark才会开始计算过程。具体示意图如下:
- DataFrame
Spark对DataFrame也是采用惰性评估的方法,即只有当一个Action操作发生的时候,Spark才会开始真正的计算过程。 - DataSet
与RDD和DataFrame一致。
12. 编程语言支持
- RDD
RDD APIs支持Java、Scala、Python以及R语言。 - DataFrame
与RDD一致 - DataSet
DataSet API目前只支持Java和Scala。
13. 聚合操作
- RDD
RDD API执行简单的Group和aggregation操作会相对较慢。 - DataFrame
DataFrame API非常易用。它在大型数据集上速度更快。 - DataSet
在大型数据集上的聚合操作较快。
14. 应用领域
- RDD
1)当在你的数据集上执行一些低级Transformation和Action的时候,可以使用RDD。
2)在需要高级抽象的时候,也可以使用RDD。 - DataFrame和DataSet
1)当我们需要更高阶的抽象的时候,可以使用DataFrame和DataSet。
2)对于无结构的数据,比如媒体流和文本流
3)当需要使用特定领域API,可以使用DataFrame和DataSet
4)当需要使用高级表达式,比如filter、map、aggregation、SQL时,可以考虑使用DataFrame和DataSet
5) 如果需要更高级别的编译期类型安全检查,可以使用DataFrame和DataSet
参考
- https://www.bilibili.com/video/BV11A411L7CK?p=186&spm_id_from=pageDriver
- https://spark.apache.org/docs/2.4.3/sql-distributed-sql-engine.html
- https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/
- https://www.tutorialspoint.com/spark_sql/spark_sql_data_sources.htm
- https://www.edureka.co/blog/spark-sql-tutorial/
- https://spark-reference-doc-cn.readthedocs.io/zh_CN/latest/programming-guide/sql-guide.html
- http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/index.html
作者:HaloZhang
原文链接:https://www.jianshu.com/p/ccabdb4d88e0