评论

收藏

[NoSQL] Spark学习--Structured Streaming

数据库 数据库 发布于:2021-07-21 16:02 | 阅读数:448 | 评论:0

Structured StreamingStructured Streaming 是 Spark Streaming 的进化版

Spark 编程模型的进化过程
DSC0000.png

总结
RDD 的优点

  • 面向对象的操作方式
  • 可以处理任何类型的数据
RDD 的缺点

  • 运行速度比较慢, 执行过程没有优化
  • API 比较僵硬, 对结构化数据的访问和操作没有优化
DataFrame 的优点

  • 针对结构化数据高度优化, 可以通过列名访问和转换数据
  • 增加 Catalyst 优化器, 执行过程是优化的, 避免了因为开发者的原因影响效率
DataFrame 的缺点

  • 只能操作结构化数据
  • 只有无类型的 API, 也就是只能针对列和 SQL 操作数据, API 依然僵硬
Dataset 的优点

  • 结合了 RDD 和 DataFrame 的 API, 既可以操作结构化数据, 也可以操作非结构化数据
  • 既有有类型的 API 也有无类型的 API, 灵活选择

Spark 的 序列化 的进化过程
总结

  • 当需要将对象缓存下来的时候, 或者在网络中传输的时候, 要把对象转成二进制, 在使用的时候再将二进制转为对象, 这个过程叫做序列化和反序列化
  • 在 Spark 中有很多场景需要存储对象, 或者在网络中传输对象

    • Task 分发的时候, 需要将任务序列化, 分发到不同的 Executor 中执行
    • 缓存 RDD 的时候, 需要保存 RDD 中的数据
    • 广播变量的时候, 需要将变量序列化, 在集群中广播
    • RDD 的 Shuffle 过程中 Map 和 Reducer 之间需要交换数据
    • 算子中如果引入了外部的变量, 这个外部的变量也需要被序列化

  • RDD 因为不保留数据的元信息, 所以必须要序列化整个对象, 常见的方式是 Java 的序列化器, 和 Kyro 序列化器
  • Dataset 和 DataFrame 中保留数据的元信息, 所以可以不再使用 Java 的序列化器和 Kyro 序列化器, 使用 Spark 特有的序列化协议, 生成 UnsafeInternalRow 用以保存数据, 这样不仅能减少数据量, 也能减少序列化和反序列化的开销, 其速度大概能达到 RDD 的序列化的 20 倍左右

Spark Streaming和Structured Streaming区别

  • Structured Streaming 相比于 Spark Streaming 的进步就类似于 Dataset 相比于 RDD 的进步
  • Structured Streaming 已经支持了连续流模型, 也就是类似于 Flink 那样的实时流, 而不是小批量, 但在使用的时候仍然有限制, 大部分情况还是应该采用小批量模式

Structured Streaming 案例
需求
DSC0001.png

  • 编写一个流式计算的应用, 不断的接收外部系统的消息
  • 对消息中的单词进行词频统计
  • 统计全局的结果
具体实现
1、开启Socket server并运行:nc -lk 9999 然后输入数据 
DSC0002.png
2、运行代码
package sparkStreaming
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object SocketWordCount {
  def main(args: Array[String]): Unit = {
  // 1. 创建 SparkSession
  val spark = SparkSession.builder()
    .master("local[6]")
    .appName("socket_structured")
    .getOrCreate()
  spark.sparkContext.setLogLevel("WARN")
  import spark.implicits._
  // 2. 数据集的生成, 数据读取
  val source: DataFrame = spark.readStream
    .format("socket")
    .option("host", "192.168.47.100")
    .option("port", 9999)
    .load()
  val sourceDS: Dataset[String] = source.as[String]
  // 3. 数据的处理
  val words = sourceDS.flatMap(_.split(" "))
    .map((_, 1))
    .groupByKey(_._1)
    .count()
  // 4. 结果集的生成和输出
  words.writeStream
    .outputMode(OutputMode.Complete())
    .format("console")
    .start()
    .awaitTermination()
  }
}
3、结果集
DSC0003.png
从结果集中可以观察到以下内容

  • Structured Streaming 依然是小批量的流处理
  • Structured Streaming 的输出是类似 DataFrame 的, 也具有 Schema, 所以也是针对结构化数据进行优化的
  • 从输出的时间特点上来看, 是一个批次先开始, 然后收集数据, 再进行展示, 这一点和 Spark Streaming 不太一样
 总结

  • Structured Streaming 中的编程步骤依然是先读, 后处理, 最后落地
  • Structured Streaming 中的编程模型依然是 DataFrame 和 Dataset
  • Structured Streaming 中依然是有外部数据源读写框架的, 叫做 readStream 和 writeStream
  • Structured Streaming 和 SparkSQL 几乎没有区别, 唯一的区别是, readStream 读出来的是流, writeStream 是将流输出, 而 SparkSQL 中的批处理使用 read 和 write

Dataset和流式计算
可以理解为 Spark 中的 Dataset 有两种, 一种是处理静态批量数据的 Dataset, 一种是处理动态实时流的 Dataset, 这两种 Dataset 之间的区别如下

  • 流式的 Dataset 使用 readStream 读取外部数据源创建, 使用 writeStream 写入外部存储
  • 批式的 Dataset 使用 read 读取外部数据源创建, 使用 write 写入外部存储

从 HDFS 中读取数据
案例流程
DSC0004.png


  • 利用py产生文件源源不断向hdfs上传文件
  • 编写 Structured Streaming 程序处理数据
python代码以及spark处理流式代码
import os
for index in range(100):
  content = """
  {"name": "Michael"}
  {"name": "Andy", "age": 30}
  {"name": "Justin", "age": 19}
  """
  file_name = "/data/spark/test/text{0}.json".format(index)
  with open(file_name, "w") as file:
    file.write(content)
  os.system("/export/servers/hadoop-2.7.5/bin/hdfs dfs -mkdir -p /spark/dataset/")
  os.system("/export/servers/hadoop-2.7.5/bin/hdfs dfs -put {0} /spark/dataset/".format(file_name))
package sparkStreaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
object HDFSSource {
  def main(args: Array[String]): Unit = {
  // 1. 创建 SparkSession
  val spark = SparkSession.builder()
    .appName("hdfs_source")
    .master("local[6]")
    .getOrCreate()
  // 2. 数据读取, 目录只能是文件夹, 不能是某一个文件
  val schema = new StructType()
    .add("name", "string")
    .add("age", "integer")
  val source = spark.readStream
    .schema(schema)
    .json("hdfs://node01:8020/spark/dataset")
  // 3. 输出结果
  source.writeStream
    .outputMode(OutputMode.Append())
    .format("console")
    .start()
    .awaitTermination()
  }
}
总结

  • Python 生成文件到 HDFS, 这一步在真实环境下, 可能是由 Flume 和 Sqoop 收集并上传至 HDFS
  • Structured Streaming 从 HDFS 中读取数据并处理
  • Structured Streaming 讲结果表展示在控制台



关注下面的标签,发现更多相似文章