阅读 107

Spark 常用的读取数据 API

Spark读取数据API

spark.read.format("json").load(path)
spark.read.format("text").load(path)
spark.read.format("parquet").load(path)
spark.read.format("json").option("...","...").load(path)

实例

package com.imooc.bigdata.chapter05

import java.util.Properties

import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}

object DataSourceApp {

  /**
    * Entry point: builds a SparkSession, runs every data-source demo in turn
    * and stops the session afterwards.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    // NOTE(review): the master is hard-coded to "local" — confirm this is
    // intended if the job is ever submitted to a cluster.
    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .getOrCreate()

    // try/finally guarantees the session is stopped even when one of the demos
    // throws (missing input file, unreachable database, ...).
    try {
      text(spark)
      json(spark)
      common(spark)
      parquet(spark)

      convert(spark)

      jdbc(spark)
      jdbc2(spark)
    } finally {
      spark.stop()
    }
  }

  /**
    * Reads a database table through JDBC using settings loaded from the
    * Typesafe config (`db.default.*` keys) and prints the rows whose
    * `game_top` column is greater than 100.
    *
    * Meant to be packaged and submitted to a YARN or Standalone cluster; pay
    * attention to the JDBC driver in that setup, which is why the configured
    * driver class is passed explicitly in the connection properties.
    *
    * @param spark active session used to read the table
    */
  def jdbc2(spark: SparkSession): Unit = {
    import spark.implicits._

    val config = ConfigFactory.load()
    val url = config.getString("db.default.url")
    val user = config.getString("db.default.user")
    val password = config.getString("db.default.password")
    val driver = config.getString("db.default.driver")
    val database = config.getString("db.default.database")
    val table = config.getString("db.default.table")
    // Only used by the optional sink shown at the end of this method; still
    // read here so a missing key is reported, exactly as before.
    val sinkTable = config.getString("db.default.sink.table")

    val connectionProperties = new Properties()
    connectionProperties.setProperty("user", user)
    connectionProperties.setProperty("password", password)
    // Previously the driver was read from the config but never handed to
    // Spark; "driver" is the JDBC option naming the driver class to load.
    connectionProperties.setProperty("driver", driver)

    val jdbcDF: DataFrame = spark.read.jdbc(url, s"$database.$table", connectionProperties)

    jdbcDF.filter($"game_top" > 100).show()
    // To persist the result instead of printing it:
    //   jdbcDF.filter($"game_top" > 100)
    //     .write.jdbc(url, s"$database.$sinkTable", connectionProperties)
  }
  /**
    * Some data lives in MySQL, so processing it with Spark means reading it
    * from MySQL first; likewise, results computed from text/JSON sources
    * usually have to be written back to MySQL.
    *
    * Reads `spark.taptap`, keeps the rows whose `game_top` is greater than
    * 100 and writes them to `spark.taptaps`, replacing any previous content
    * of that table.
    *
    * @param spark active session used for the read and the write
    */
  def jdbc(spark: SparkSession): Unit = {
    import spark.implicits._

    // The same read expressed through the generic format/option API:
    //   val jdbcDF = spark.read
    //     .format("jdbc")
    //     .option("url", "jdbc:mysql://10.133.3.10:3306")
    //     .option("dbtable", "spark.browser_stat")
    //     .option("user", "root")
    //     .option("password", "root")
    //     .load()
    //   jdbcDF.filter($"cnt" > 100).show(100)

    // Rote approach: every connection setting is hard-coded here
    // (jdbc2 loads the same kind of settings from the config instead).
    val url = "jdbc:mysql://10.133.3.10:3306"
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "root")

    val jdbcDF: DataFrame = spark.read
      .jdbc(url, "spark.taptap", connectionProperties)

    // Without an explicit mode the writer defaults to ErrorIfExists, so every
    // run after the first one failed on the already existing sink table.
    // Overwrite matches the mode used by the other writers in this file.
    jdbcDF.filter($"game_top" > 100)
      .write
      .mode(SaveMode.Overwrite)
      .jdbc(url, "spark.taptaps", connectionProperties)
  }

  /**
    * Storage-format conversion: JSON ==> Parquet.
    *
    * Loads `people.json`, keeps the rows matching `age>20`, writes them as
    * Parquet (overwriting earlier output) and reads the written files back to
    * display them.
    *
    * @param spark active session used for the read and the write
    */
  def convert(spark: SparkSession): Unit = {
    val inputPath = "E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people.json"
    // One value for both the write and the read-back. The write used the
    // relative "out" while the read-back used an absolute ...\data\out, so the
    // check only worked when the working directory happened to be the data folder.
    val outputPath = "out"

    val jsonDF: DataFrame = spark.read.format("json").load(inputPath)

    jsonDF.filter("age>20")
      .write
      .format("parquet")
      .mode(SaveMode.Overwrite)
      .save(outputPath)

    // Read back exactly what was just written to verify the conversion.
    spark.read.parquet(outputPath).show()
  }

  /**
    * Parquet data source: loads `users.parquet`, prints its schema and shows
    * its rows.
    *
    * @param spark active session used to read the file
    */
  def parquet(spark: SparkSession): Unit = {
    val usersPath = "E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\users.parquet"
    val users: DataFrame = spark.read.parquet(usersPath)

    users.printSchema()
    users.show()

    // Optional follow-up kept for reference: write two columns back as
    // uncompressed Parquet and display the result.
    //   users.select("name", "favorite_numbers")
    //     .write.mode("overwrite")
    //     .option("compression", "none")
    //     .parquet("out")
    //   spark.read.parquet("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\out").show()
  }

  /**
    * Standard API style: `format(...).load(...)` for reading and
    * `format(...).mode(...).save(...)` for writing.
    *
    * Loads `people.json` and writes it back as JSON to `out`, overwriting any
    * earlier output.
    *
    * @param spark active session used for the read and the write
    */
  def common(spark: SparkSession): Unit = {
    // "In front of the source code there are no secrets."
    // A text source is loaded the same way, e.g.
    //   spark.read.format("text").load("file:///Users/rocky/IdeaProjects/imooc-workspace/sparksql-train/data/people.txt")
    val peoplePath = "E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people.json"
    val people: DataFrame = spark.read
      .format("json")
      .load(peoplePath)

    people.write
      .format("json")
      .mode("overwrite")
      .save("out")
  }

    // JSON
  /**
    * JSON data source: reads `people2.json`, lifts the nested `info.work` and
    * `info.home` fields into top-level `work` / `home` columns next to `name`
    * and `age`, and writes the result as JSON to `out` (overwriting).
    *
    * @param spark active session used for the read and the write
    */
  def json(spark: SparkSession): Unit = {
    import spark.implicits._

    // Simpler exercise kept for reference — only the rows with age > 20:
    //   spark.read.json("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people.json")
    //     .filter("age > 20").select("name")
    //     .write.mode(SaveMode.Overwrite).json("out")
    // It is intentionally not executed: reading JSON without a schema scans
    // the input to infer one, and the resulting DataFrame was never used.

    val nestedPeople: DataFrame = spark.read.json("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people2.json")

    nestedPeople
      .select($"name", $"age", $"info.work".as("work"), $"info.home".as("home"))
      .write
      .mode("overwrite")
      .json("out")
  }

  /**
    * Text data source: reads `people.txt` line by line, keeps the first
    * comma-separated field of every line (trimmed) and writes those values
    * as text to `out`, overwriting any earlier output.
    *
    * @param spark active session used for the read and the write
    */
  def text(spark: SparkSession): Unit = {
    import spark.implicits._ // supplies the Encoder[String] needed by map

    val textDF: DataFrame = spark.read.text("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people.txt")

    val result: Dataset[String] = textDF.map { row =>
      // limit = -1 keeps trailing empty fields, so index 0 exists even for a
      // line made only of commas; plain split(",") returns an empty array
      // there and fields(0) would throw.
      val fields: Array[String] = row.getString(0).split(",", -1)
      fields(0).trim
    }

    // SaveMode.Append keeps the previous result and adds to it; overwrite
    // deletes the previous result first and then writes the new one.
    // The text writer expects a single string column, so to output several
    // values per line, join them into one string before writing.
    result.write.mode("overwrite").text("out")
  }
}

  

原文:https://www.cnblogs.com/yoyo1216/p/13534566.html

文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐