阅读 127

如何采用cluster模式提交spark-sql

问题产生的背景

  • spark on yarn模式的客户端,想直接通过spark-submitspark-sql提交HQL脚本或SQL语句

  • spark-sql不支持cluter模式,需要在本地启动driver,占用内存较大

  • 若是在一个作业调度系统中,需要减少本地内存的使用

解决思路

  • 如果有spark coding能力,可以实现一个jar包专门接收HQL/SQL参数,采用spark-submit提交到集群

自己实现的jar包,处理起sql可能功能有限,支持变量,session管理等诸多问题,那我们可不可以既用spark-sql又用cluster模式呢?答案是可以,下面是分析过程。

为何官方提供spark-sql不可以采用cluster模式提交

  • 先来看一下spark-sql脚本内容

#!/usr/bin/env bash

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

export _SPARK_CMD_USAGE="Usage: ./bin/spark-sql [options] [cli option]"
exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"复制代码

到头来还是采用spark-submit提交,只不过是官方实现了一个jar包,main-class: SparkSQLCLIDriver

  • 再来看下spark-submit的参数:

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]复制代码

--deploy-mode 正是指定运行模式的,所以这个client还是cluster的判断应该是在submit相关处理逻辑中

  • 查看 org.apache.spark.deploy.SparkSubmit 这个类的源码,prepareSubmitEnvironment 函数中有这样一段:

// Fail fast, the following modes are not supported or applicable
(clusterManager, deployMode) match {
  case (STANDALONE, CLUSTER) if args.isPython =>
    error("Cluster deploy mode is currently not supported for python " +
      "applications on standalone clusters.")
  case (STANDALONE, CLUSTER) if args.isR =>
    error("Cluster deploy mode is currently not supported for R " +
      "applications on standalone clusters.")
  case (LOCAL, CLUSTER) =>
    error("Cluster deploy mode is not compatible with master "local"")
  case (_, CLUSTER) if isShell(args.primaryResource) =>
    error("Cluster deploy mode is not applicable to Spark shells.")
  case (_, CLUSTER) if isSqlShell(args.mainClass) =>
    error("Cluster deploy mode is not applicable to Spark SQL shell.")
  case (_, CLUSTER) if isThriftServer(args.mainClass) =>
    error("Cluster deploy mode is not applicable to Spark Thrift server.")
  case _ =>
}

......

/**
 * Return whether the given main class represents a sql shell.
 */
private[deploy] def isSqlShell(mainClass: String): Boolean = {
  mainClass == "org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
}复制代码

从上面逻辑error("Cluster deploy mode is not applicable to Spark SQL shell.")可以看出原生的spark-sql不支持cluster提交。

  • 深层原因

对于更加深层次的原因,我猜测可能是因为spark-shell还提供了交互模式,这种模式下driver必须运行在本地才能持续不断接收用户的sql。但是官方既然有了-e, -f等提交参数的判断,至此多一步是否需要进入交互模式的判断,再来判断是否可以用cluster模式执行也没有问题,但是官方并未提供进一步判断,直接返回不支持的提示。有知道的老铁可以在评论区贴出具体原因。

那我们如何实现 spark-sql cluster模式 提交呢

如果我们能isSqlShell这个判断去除不就好了吗,改动 spark-submit 这个类代价风险有点大,你需要替换集群上所有相关jar包,想想还是算了吧。

如果我们不能去除那个判断,那我们把org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver这个类名替换掉不就可以了吗,args.mainClass 这个参数用户是可以指定的,然后我们只需要写个jar包,采用如下方式提交就可以了:

spark-submit \
  --name test-of-spark-sql-cluter-mode \
  --class <your-class-name> \
  --master yarn \
  --deploy-mode cluster \
  --executor-cores 2 \
  --executor-memory 4G \
  --num-executors 8 \
  --conf spark.executor.memoryOverhead=1G \
  --conf spark.submit.deployMode=cluster \
 <your-jar-name>.jar \
  --hivevar <key>=<2022-01-16> \
  -f hdfs://path/to/xxxx.sql复制代码

那如何写这个jar包呢,他需要满足什么条件呢? 其实只要参考下SparkSQLCLIDriver这个object(scala 代码)你就知道如何编写了,需要满足的条件

  1. 支持sparks-ql所有支持的参数

  2. 不影响 spark-submit 的参数

最好的方法就是把 SparkSQLCLIDriver 这个类拷贝出来,改个名字,可惜 SparkSQLCLIDriver中很多成员变量及函数调用访问级别都是private。所以把它拷贝出来的同时需要注意在coding的时候需要把你的包名写成org.apache.spark.sql.hive.<your-package>,继承org.apache.spark.sql.hive这个包就可以了,需要注意以下问题:

  1. main-class 包名继承org.apache.spark.sql.hive

  2. 去除交互部分的代码

  3. 稍加改造 System.exit,防止程序过早退出

if (sessionState.execString != null) {
  System.exit(cli.processLine(sessionState.execString))
}

try {
  if (sessionState.fileName != null) {
    System.exit(cli.processFile(sessionState.fileName))
  }
} catch {
  case e: FileNotFoundException =>
    logError(s"Could not open input file for reading. (${e.getMessage})")
    System.exit(3)
}复制代码

执行 -e 或 -f 命令时,以上代码会导致程序过早退出,当交给spark-submit接管时,导致shutdown hook 在最后状态接收前执行,spark会判断程序执行失败。

最终实现(由于sbt构建是在太慢,采用了maven)

代码地址:github.com/zhfk/spark-…

提交执行

比如jar包名称为:my-spark-sql-cluster.jar

spark-submit \
  --name test-of-spark-sql-cluter-mode \
  --class org.apache.spark.sql.hive.my.MySparkSQLCLIDriver \
  --master yarn \
  --deploy-mode cluster \
  --executor-cores 2 \
  --executor-memory 4G \
  --num-executors 8 \
  --conf spark.executor.memoryOverhead=1G \
  --conf spark.submit.deployMode=cluster \
  my-spark-sql-cluster.jar.jar \
  --hivevar <key>=<2022-01-16> \ #spark-sql参数放jar包后面
  -f hdfs://path/to/xxxx.sql     #spark-sql参数放jar包后面复制代码

cluster模式提交spark-sql执行后无法拿到日志

运行过程中,sql的输出你当然无法看到,因为driver是运行在yarn上的。不过,还是可以拿到执行过程的日志,里面包含了执行状态和applcationId, 我们只需要稍稍修改一下spark-conf下的log4j.properties文件,将日志级别提高到INFO。(也可以自己指定log4j.properties,指定方法请参考spark官网)

sed -i "s/log4j.rootCategory=WARN, console/log4j.rootCategory=INFO, console/g" $SPARK_CONF_DIR/log4j.properties


作者:zhfk
链接:https://juejin.cn/post/7056334152408236046

文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐