如何采用cluster模式提交spark-sql
问题产生的背景
在
spark on yarn
模式的客户端,想直接通过spark-submit
或spark-sql
提交HQL脚本或SQL语句spark-sql
不支持cluter模式
,需要在本地启动driver
,占用内存较大若是在一个作业调度系统中,需要减少本地内存的使用
解决思路
如果有spark coding能力,可以实现一个jar包专门接收HQL/SQL参数,采用
spark-submit
提交到集群
自己实现的jar包,处理起sql可能功能有限,支持变量,session管理等诸多问题,那我们可不可以既用spark-sql
又用cluster模式
呢?答案是可以
,下面是分析过程。
为何官方提供spark-sql
不可以采用cluster模式
提交
先来看一下
spark-sql
脚本内容
#!/usr/bin/env bash if [ -z "${SPARK_HOME}" ]; then source "$(dirname "$0")"/find-spark-home fi export _SPARK_CMD_USAGE="Usage: ./bin/spark-sql [options] [cli option]" exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"复制代码
到头来还是采用spark-submit
提交,只不过是官方实现了一个jar包,main-class: SparkSQLCLIDriver
再来看下
spark-submit
的参数:
./bin/spark-submit \ --class <main-class> \ --master <master-url> \ --deploy-mode <deploy-mode> \ --conf <key>=<value> \ ... # other options <application-jar> \ [application-arguments]复制代码
--deploy-mode
正是指定运行模式的,所以这个client
还是cluster
的判断应该是在submit相关处理逻辑中
查看
org.apache.spark.deploy.SparkSubmit
这个类的源码,prepareSubmitEnvironment
函数中有这样一段:
// Fail fast, the following modes are not supported or applicable (clusterManager, deployMode) match { case (STANDALONE, CLUSTER) if args.isPython => error("Cluster deploy mode is currently not supported for python " + "applications on standalone clusters.") case (STANDALONE, CLUSTER) if args.isR => error("Cluster deploy mode is currently not supported for R " + "applications on standalone clusters.") case (LOCAL, CLUSTER) => error("Cluster deploy mode is not compatible with master "local"") case (_, CLUSTER) if isShell(args.primaryResource) => error("Cluster deploy mode is not applicable to Spark shells.") case (_, CLUSTER) if isSqlShell(args.mainClass) => error("Cluster deploy mode is not applicable to Spark SQL shell.") case (_, CLUSTER) if isThriftServer(args.mainClass) => error("Cluster deploy mode is not applicable to Spark Thrift server.") case _ => } ...... /** * Return whether the given main class represents a sql shell. */ private[deploy] def isSqlShell(mainClass: String): Boolean = { mainClass == "org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver" }复制代码
从上面逻辑error("Cluster deploy mode is not applicable to Spark SQL shell.")
可以看出原生的spark-sql
不支持cluster
提交。
深层原因
对于更加深层次的原因,我猜测可能是因为spark-shell
还提供了交互模式,这种模式下driver
必须运行在本地才能持续不断接收用户的sql。但是官方既然有了-e, -f等提交参数的判断,至此多一步是否需要进入交互模式的判断,再来判断是否可以用cluster模式执行也没有问题,但是官方并未提供进一步判断,直接返回不支持的提示。有知道的老铁可以在评论区贴出具体原因。
那我们如何实现 spark-sql
cluster模式
提交呢
如果我们能isSqlShell
这个判断去除不就好了吗,改动 spark-submit
这个类代价风险有点大,你需要替换集群上所有相关jar包,想想还是算了吧。
如果我们不能去除那个判断,那我们把org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
这个类名替换掉不就可以了吗,args.mainClass
这个参数用户是可以指定的,然后我们只需要写个jar包,采用如下方式提交就可以了:
spark-submit \ --name test-of-spark-sql-cluter-mode \ --class <your-class-name> \ --master yarn \ --deploy-mode cluster \ --executor-cores 2 \ --executor-memory 4G \ --num-executors 8 \ --conf spark.executor.memoryOverhead=1G \ --conf spark.submit.deployMode=cluster \ <your-jar-name>.jar \ --hivevar <key>=<2022-01-16> \ -f hdfs://path/to/xxxx.sql复制代码
那如何写这个jar包呢,他需要满足什么条件呢?
其实只要参考下SparkSQLCLIDriver
这个object(scala 代码)你就知道如何编写了,需要满足的条件
支持sparks-ql所有支持的参数
不影响 spark-submit 的参数
最好的方法就是把 SparkSQLCLIDriver
这个类拷贝出来,改个名字,可惜 SparkSQLCLIDriver
中很多成员变量及函数调用访问级别都是private。所以把它拷贝出来的同时需要注意在coding的时候需要把你的包名写成org.apache.spark.sql.hive.<your-package>
,继承org.apache.spark.sql.hive
这个包就可以了,需要注意以下问题:
main-class 包名继承
org.apache.spark.sql.hive
去除交互部分的代码
稍加改造 System.exit,防止程序过早退出
if (sessionState.execString != null) { System.exit(cli.processLine(sessionState.execString)) } try { if (sessionState.fileName != null) { System.exit(cli.processFile(sessionState.fileName)) } } catch { case e: FileNotFoundException => logError(s"Could not open input file for reading. (${e.getMessage})") System.exit(3) }复制代码
执行 -e 或 -f 命令时,以上代码会导致程序过早退出,当交给spark-submit接管时,导致shutdown hook 在最后状态接收前执行,spark会判断程序执行失败。
最终实现(由于sbt构建是在太慢,采用了maven)
代码地址:github.com/zhfk/spark-…
提交执行
比如jar包名称为:my-spark-sql-cluster.jar
spark-submit \ --name test-of-spark-sql-cluter-mode \ --class org.apache.spark.sql.hive.my.MySparkSQLCLIDriver \ --master yarn \ --deploy-mode cluster \ --executor-cores 2 \ --executor-memory 4G \ --num-executors 8 \ --conf spark.executor.memoryOverhead=1G \ --conf spark.submit.deployMode=cluster \ my-spark-sql-cluster.jar.jar \ --hivevar <key>=<2022-01-16> \ #spark-sql参数放jar包后面 -f hdfs://path/to/xxxx.sql #spark-sql参数放jar包后面复制代码
cluster模式提交spark-sql执行后无法拿到日志
运行过程中,sql的输出你当然无法看到,因为driver是运行在yarn上的。不过,还是可以拿到执行过程的日志,里面包含了执行状态和applcationId
, 我们只需要稍稍修改一下spark-conf下的log4j.properties文件,将日志级别提高到INFO。(也可以自己指定log4j.properties,指定方法请参考spark官网)
sed -i "s/log4j.rootCategory=WARN, console/log4j.rootCategory=INFO, console/g" $SPARK_CONF_DIR/log4j.properties
作者:zhfk
链接:https://juejin.cn/post/7056334152408236046