评论

收藏

[NoSQL] Spark学习--SparkSQL02

数据库 数据库 发布于:2021-07-21 13:41 | 阅读数:255 | 评论:0

数据读写
DataFrameReader
组件解释schema
结构信息, 因为 Dataset 是有结构的, 所以在读取数据的时候, 就需要有 Schema 信息, 有可能是从外部数据源获取的, 也有可能是指定的
option
连接外部数据源的参数, 例如 JDBC 的 URL, 或者读取 CSV 文件是否引入 Header 等
format
外部数据源的格式, 例如 csv, jdbc, json 等
读取文件的方式
def reader(): Unit = {
  // 1. 创建 SparkSession
  val spark = SparkSession.builder()
    .master("local[6]")
    .appName("reader")
    .getOrCreate()
  // 2. 第一种形式
  spark.read
    .format("csv")
    .option("header", value = true)
    .option("inferSchema", value = true)
    .load("XXX.csv")
    .show(10)
  // 3. 第二种形式
  spark.read
    .option("header", value = true)
    .option("inferSchema", value = true)
    .csv("XXX.csv")
    .show()
  }
DataFrameWriter
组件解释source
写入目标, 文件格式等, 通过 format 方法设定
mode
写入模式, 例如一张表已经存在, 如果通过 DataFrameWriter 向这张表中写入数据, 是覆盖表呢, 还是向表中追加呢? 通过 mode 方法设定
extraOptions
外部参数, 例如 JDBC 的 URL, 通过 options, option 设定
partitioningColumns
类似 Hive 的分区, 保存表的时候使用, 这个地方的分区不是 RDD 的分区, 而是文件的分区, 或者表的分区, 通过 partitionBy 设定
bucketColumnNames
类似 Hive 的分桶, 保存表的时候使用, 通过 bucketBy 设定
sortColumnNames
用于排序的列, 通过 sortBy 设定
mode 指定了写入模式, 例如覆盖原数据集, 或者向原数据集合中尾部添加等
Scala 对象表示字符串表示解释SaveMode.ErrorIfExists
"error"
将 DataFrame 保存到 source 时, 如果目标已经存在, 则报错
SaveMode.Append
"append"
将 DataFrame 保存到 source 时, 如果目标已经存在, 则添加到文件或者 Table 中
SaveMode.Overwrite
"overwrite"
将 DataFrame 保存到 source 时, 如果目标已经存在, 则使用 DataFrame 中的数据完全覆盖目标
SaveMode.Ignore
"ignore"
将 DataFrame 保存到 source 时, 如果目标已经存在, 则不会保存 DataFrame 数据, 并且也不修改目标数据集, 类似于 CREATE TABLE IF NOT EXISTS
写入文件的方式
def writer(): Unit = {
  // 1. 创建 SparkSession
  val spark = SparkSession.builder()
    .master("local[6]")
    .appName("write")
    .getOrCreate()
  // 2. 读取数据集
  val df = spark.read.option("header", value = true).csv("dataset/BeijingPM20100101_20151231.csv")
  // 3. 写入数据集
  df.write.json("dataset/beijing_pm.json")
  df.write.format("json").save("dataset/beijing_pm2.json")
  }

读写 Parquet 格式文件
def parquetReadWritePartitions(): Unit = {
   //1. 读取数据
  val df = spark.read
    .option("header", value = true)
    .csv("dataset/BeijingPM20100101_20151231.csv")
   //2. 写文件, 表分区
  df.write
    .partitionBy("year", "month")
    .save("dataset/beijing_pm")
  //3. 读文件, 自动发现分区
  // 写分区表的时候, 分区列不会包含在生成的文件中
  // 直接通过文件来进行读取的话, 分区信息会丢失
  //spark sql 会进行自动的分区发现
  spark.read
    .parquet("dataset/beijing_pm4")
    .printSchema()
  }
DSC0000.png

 总结

  • Spark 不指定 format 的时候默认就是按照 Parquet 的格式解析文件
  • Spark 在读取 Parquet 文件的时候会自动的发现 Parquet 的分区和分区字段
  • Spark 在写入 Parquet 文件的时候如果设置了分区字段, 会自动的按照分区存储

读写 JSON 格式文件
def json(): Unit = {
  val df = spark.read
    .option("header", value = true)
    .csv("xxx.csv")
  //直接转换
  df.toJSON.show()
  
  df.write
    .json("xxx.json")
  spark.read
    .json("xxx.json")
    .show()
  }
Spark访问Hive
Hive的MetaStore
概念
Hive 的 MetaStore 是一个 Hive 的组件, 一个 Hive 提供的程序, 用以保存和访问表的元数据
配置hive-site.xml
<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>/user/hive/warehouse</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://node01:3306/hive?createDatabaseIfNotExist=true</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>username</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>password</value>
</property>
<property>
  <name>hive.metastore.local</name>
  <value>false</value>
</property>
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://node01:9083</value>  //当前服务器
</property>
启动
nohup /export/servers/hive/bin/hive --service metastore 2>&1 >> /var/log.log &
Hive操作
shell操作
CREATE EXTERNAL TABLE student
(
  name  STRING,
  age   INT,
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'  //行分隔符
  LINES TERMINATED BY '\n'   //列分隔符
STORED AS TEXTFILE
LOCATION '/spark/hive';
LOAD DATA INPATH '/data/spark/studenttab10k' OVERWRITE INTO TABLE student; //文件存储位置
API操作
def main(args: Array[String]): Unit = {
  // 1. 创建 SparkSession
  //  1. 开启 Hive 支持
  //  2. 指定 Metastore 的位置
  //  3. 指定 Warehouse 的位置
  val spark = SparkSession.builder()
    .appName("hive access")
    .enableHiveSupport()
    .config("hive.metastore.uris", "thrift://node01:9083")
    .config("spark.sql.warehouse.dir", "/spark/hive")
    .getOrCreate()
  import spark.implicits._
  // 2. 读取数据
  //  1. 上传 HDFS, 因为要在集群中执行, 没办法保证程序在哪个机器中执行
  //    所以, 要把文件上传到所有的机器中, 才能读取本地文件
  //    上传到 HDFS 中就可以解决这个问题, 所有的机器都可以读取 HDFS 中的文件
  //    它是一个外部系统
  //  2. 使用 DF 读取数据
  val schema = StructType(
    List(
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("gpa", FloatType)
    )
  )
  val dataframe = spark.read
    .option("delimiter", "\t")
    .schema(schema)
    .csv("hdfs:///saprk/data/studenttab10k")
  val resultDF = dataframe.where('age > 50)
  // 3. 写入数据, 使用写入表的 API, saveAsTable
  resultDF.write.mode(SaveMode.Overwrite).saveAsTable("spark03.student")
  }
Spark访问MySQL
def main(args: Array[String]): Unit = {
  // 1. 创建 SparkSession 对象
  val spark = SparkSession.builder()
    .master("local[6]")
    .appName("mysql write")
    .getOrCreate()
  // 2. 读取数据创建 DataFrame
  //  1. 拷贝文件
  //  2. 读取
  val schema = StructType(
    List(
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("gpa", FloatType)
    )
  )
  val df = spark.read
    .schema(schema)
    .option("delimiter", "\t")
    .csv("dataset/studenttab10k")
  // 3. 处理数据
  val resultDF = df.where("age < 30")
  // 4. 落地数据
  resultDF.write
    .format("jdbc")
    .option("url", "jdbc:mysql://node01:3306/spark02")
    .option("dbtable", "student")
    .option("user", "spark03")
    .option("password", "Spark03!")
    .mode(SaveMode.Overwrite)
    .save()
  }


关注下面的标签,发现更多相似文章