Spark SQL explaind中的统计信息-深入了解CBO优化
本文翻译自Statistic in Spark SQl explained
Spark SQL 优化器使用两种优化方式:基于规则的和基于代价的。前者依赖于启发式规则,而后者依赖于数据的统计属性。在这篇文章里,我们解释一下在底层这些统计信息是怎么被用到,以及哪些场景下是有用的,并且怎么来使用这些统计信息。
大部分基于启发式的优化规则都没有考虑到被处理的数据属性。比如:基于启发式的PredicatePushDown规则就是基于先过滤再计算的假设。
然而有些场景spark能够通过数据的统计信息来得出更好的计划,这通常被称作基于代价的优化或者CBO,在这篇文章里,我们来探讨一下细节。
怎么看到统计信息
为了能够看到一个表的统计信息首先我们需要通过运行sql语句来计算(所有的SQL语句可以通过使用sql()函数来指定,spark.sql(需要指定的sql字符串)):
ANALYZE TABLE table_name COMPUTE STATISTICS
运行完这个以后,表级别的统计信息就会统计出来并且被存储在元数据中,我们可以通过以下语句来查看:
DESCRIBE EXTENDED table_name
这将会展现一些表属性以及表级别的统计信息。这有两种维度信息:rowCount和sizeBytes:
除了表级别的统计信息,这也有列级别的统计信息,我们可以通过一下语句去计算和查看:
ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS col_name
DESCRIBE EXTENDED table_name column_name
这将展示给我们类似一下的表(在这个例子中我们使用的列是user_id):
就像你看到的,这里有各种各样的列维度信息,像最大最大最小值,null值的数量,去重的值的数量 (近似值)等。
从Spark 3.0以来,这里有更多的选项去展示这些信息,能够展示的不仅仅是表也包括了实际的查询语句。可以通过explain的的mode参数来实现:
spark.table(table_name).explain(mode='cost')
这个将会给我们展示两种查询计划,物理计划和优化的逻辑计划,该逻辑计划将会展示一些统计信息,就像以下图片展示的:
这个重要的一点是你能看到计划的每个操作的的统计信息,所以在各种各样的转变之后你能看到统计信息的估算。这些统计信息首先通过Relation操作也就是所谓的叶子节点计算出来的,并且每个叶子节点都负责计算,后续经过一些规则通过逻辑计划进行传播。
接下来,我们将会了解叶子节点是这么计算统计信息,以及怎么传播的。
统计信息怎么被计算的
叶子节点计算统计信息有三种方式:第一种也是最好的一种是从元数据中获取的统计信息。第二种是spark将会使用InMemoryFileIndex,他将会调用底层的 Hadoop API去收集数据源中的每个文件的的大小并且求和得到总值sizeInBytes(这里只有sizeInBytes度量会被计算到),最后一种方式是spark将会使用默认的sizeInBytes维度值,该值由spark.sql.defaultSizeInBytes配置 并且该默认值为8EIB,所以基本上spark对于Relation sizeInBytes将会尽可能的进行重新计算覆盖。(这也是只有sizeInBytes这种度量用到),这三种方式可以通过一下图表进行描述:
这个图标是一个树形,每个节点是一条件,假如条件为真,我们将转到T,否则转到F。叶子将会代表统计信息将会计算的实际方式。例如:InMemoryFI 意味着只有sizeInBytes将调用Hadoop API进行计算。另一方面,Stats From M 意味着统计信息将会从元数据获得,然而在左边的数 所有统计信息将会从元数据获取,而右边只有metricsInBytes维度将会被获取。叶子节点CatalogFileIndex 代表着最后一种方法-默认值为8EIB的sizeInBytes将会被使用到。
在图表中,我们有四种条件,第一种决定了统计信息怎么被获取:假如我们读取数据作为一个表df=spark.table(table_name),那我们就进入到左边,否则进入到右边,下一个条件是 是否基于代价的优化(CBO)是否开启,这个通过spark.sql.cbo.enabled配置,默认值是false(到spark 3.0.0).第三个条件是在元数据的统计信息是否通过analyzed table command(ATC)计算出来的,最后一个是表是否分区。
最好的情况是 我们读取数据作为一个表,CBO是开启的,而且已经运行了ATC,这种情况下,所有的统计信息将会从元数据中获取(除了从rowCount计算的sizeInBytes),另一个方面,最坏的情况是,我们读取数据作为一个表,但是ATC没有运行,而且表是分区的,这种情况下默认的sizeInBytes将会从配置中读取,并且计算是很不精确的,注意最坏的情况跟CBO是否开启是无关的。注意一点:假如表不是分区的,spark将会使用Hadoop API计算sizeInBytes,所以表是否分区直接影响了叶子节点的统计信息被计算的方式。
统计信息怎么通过计划被传播的
一旦叶子节点的统计信息被计算出来,该统计信息会被传播到其他节点。这里有两种传播方式:第一种(我们称之为老方式)是非常基本的而且只有一种维度sizeInBytes被传播,并且在各种操作中该维度被调整的的方式也是很基本的。例如,Filter操作并不调整sizeInBytes的值,如下所示:
(
spark.table(table_name)
.filter(col("user_id") < 0)
).explain(mode="cost")
在这个查询中,我们过滤除了所有user_id是负数的记录,实际上是没有该记录的,但是spark并不能获取这种信息,因为这种需要列级别的统计信息,这再老方式中不会被使用到。所以从这查询计划中可以看到,只有sizeInBytes被传播,并且在两个操作中该值保持不变.
第二种统计信息传播的方式更加成熟,从spark 2.2开始但是它要求开启CBO,而且要求通过ATC让元数据储存统计信息。这个时候所有的信息都会被传播,加入我们提供了列级别的维度,spark将会将会计算filter操作,并且计算出一个更好的统计信息:
如你所见,在fiter操作的统计信息被改变了,rowCount非零,sizeInBytes 是1B,是最小值,从这个user_id列级别的统计信息,spark能够知道负user_id的记录是存在的,这个在查询计划中可以反映出来。
在这种新方式中,为了计算sizeInBytes,spark首先根据每个数据类型计算出单行的大小,之后再乘以rowCount去得到最终的sizeInBytes。假如rowCount是零,则sizeInBytes将会设置为1去避免在其他统计算的除零问题。这也适用于project操作(spark知道哪些列将会被投影,所以需要提前计算出单行的大小)
统计信息怎么被使用
此时我们已经知道了统计信息怎么被计算的以及怎么通过逻辑计划传播的,现在让我们来看一下在查询计划中怎么被使用以获取更优的计划。
这有两个地方统计信息会被使用:第一个是JoinSelection策略,这里spark将会决定使用哪种算法进行join两个DataFrame(更多的细节参考 。基本的逻辑就是假如一个df小于某个阈值,spark将会使用BraodcastHashJoin(BHJ),因为假如被广播的df如果很小的话,这将是一个非常有效的方式。这个阈值通过spark.sql.autoBroadcastJoinThreshold 配置,默认是10MB,所以对于df的大小有个很好的预估的话,能够帮助我们选择一个更好的join优化短发。
第二个地方也是跟join相关,即joinRecorder规则,使用这个规则 spark将会找到join操作最优化的顺序(如果你join多于两个表的话)。这个规则默认是关闭的,假如你想开启的话,通过如下配置:
spark.conf.set("spark.sql.cbo.joinReorder.enabled",True)
我们可以通过一下属性控制df的最大数量:
spark.conf.set("spark.sql.cbo.joinReorder.dp.threshold",n)
n的默认值是12。
什么时候使用 ANALYZE TABLE command(ATC)?
我们已经知道假如一个表是分区的,并且我们没有运行ATC,spark将会使用默认的值 8EIB,这是很大的。所以在我们join很多表并且这些表是分区且十分小的情况下,他们是可以进行BHJ的,并且运行ATC是有意义的。当然我们必须清楚,加入一个表的数据被追加或者被覆盖了,之前的统计信息就会被删除,所以我们必须重新运行ATC。在某些情况下,更新元数据的统计信息是比较复杂的。一个解决方法是利用自适应查询-spark 3.0的新特性。
自适应查询
在spark 3.0 自适应查询(AQE)这个新特性被开发,它将会以一种更加高级的方式使用统计信息。假如开启了AQE(默认不开启),在每个stage执行完后,统计信息会被重新计算。这就可以获取更加精确的统计信息,以便能够决定是否使用BHJ,AQE自身是一个很大的主题,我们分几篇文章来介绍它。
结论
在这篇文章中,我们可以知道数据的统计信息如何被使用,以便实现更优化的执行计划。我们也知道了有三种计算叶子节点统计信息的方法,以及这些信息怎么被传播。我们指出了好几种跟分区表相关的案例,以便进行广播join。假如我们没有运行ATC命令,spark将会使用默认的sizeInBytes值,这种情况下广播连接是不可能的。我们也展示了通过开启CBO来更好的传播统计信息,特别是在filter过滤条件中用到的列统计信息。
作者:鸿乃江边鸟
原文链接:https://www.jianshu.com/p/9b8c1e83293d