绝代码农 发表于 2021-7-21 16:02:27

Spark学习--Structured Streaming

Structured StreamingStructured Streaming 是 Spark Streaming 的进化版

Spark 编程模型的进化过程

总结
RDD 的优点

[*]面向对象的操作方式
[*]可以处理任何类型的数据
RDD 的缺点

[*]运行速度比较慢, 执行过程没有优化
[*]API 比较僵硬, 对结构化数据的访问和操作没有优化
DataFrame 的优点

[*]针对结构化数据高度优化, 可以通过列名访问和转换数据
[*]增加 Catalyst 优化器, 执行过程是优化的, 避免了因为开发者的原因影响效率
DataFrame 的缺点

[*]只能操作结构化数据
[*]只有无类型的 API, 也就是只能针对列和 SQL 操作数据, API 依然僵硬
Dataset 的优点

[*]结合了 RDD 和 DataFrame 的 API, 既可以操作结构化数据, 也可以操作非结构化数据
[*]既有有类型的 API 也有无类型的 API, 灵活选择

Spark 的 序列化 的进化过程
总结

[*]当需要将对象缓存下来的时候, 或者在网络中传输的时候, 要把对象转成二进制, 在使用的时候再将二进制转为对象, 这个过程叫做序列化和反序列化
[*]在 Spark 中有很多场景需要存储对象, 或者在网络中传输对象

[*]Task 分发的时候, 需要将任务序列化, 分发到不同的 Executor 中执行
[*]缓存 RDD 的时候, 需要保存 RDD 中的数据
[*]广播变量的时候, 需要将变量序列化, 在集群中广播
[*]RDD 的 Shuffle 过程中 Map 和 Reducer 之间需要交换数据
[*]算子中如果引入了外部的变量, 这个外部的变量也需要被序列化

[*]RDD 因为不保留数据的元信息, 所以必须要序列化整个对象, 常见的方式是 Java 的序列化器, 和 Kyro 序列化器
[*]Dataset 和 DataFrame 中保留数据的元信息, 所以可以不再使用 Java 的序列化器和 Kyro 序列化器, 使用 Spark 特有的序列化协议, 生成 UnsafeInternalRow 用以保存数据, 这样不仅能减少数据量, 也能减少序列化和反序列化的开销, 其速度大概能达到 RDD 的序列化的 20 倍左右

Spark Streaming和Structured Streaming区别

[*]Structured Streaming 相比于 Spark Streaming 的进步就类似于 Dataset 相比于 RDD 的进步
[*]Structured Streaming 已经支持了连续流模型, 也就是类似于 Flink 那样的实时流, 而不是小批量, 但在使用的时候仍然有限制, 大部分情况还是应该采用小批量模式

Structured Streaming 案例
需求


[*]编写一个流式计算的应用, 不断的接收外部系统的消息
[*]对消息中的单词进行词频统计
[*]统计全局的结果
具体实现
1、开启Socket server并运行:nc -lk 9999 然后输入数据 

2、运行代码
package sparkStreaming

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object SocketWordCount {

def main(args: Array): Unit = {
    // 1. 创建 SparkSession
    val spark = SparkSession.builder()
      .master("local")
      .appName("socket_structured")
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    // 2. 数据集的生成, 数据读取
    val source: DataFrame = spark.readStream
      .format("socket")
      .option("host", "192.168.47.100")
      .option("port", 9999)
      .load()

    val sourceDS: Dataset = source.as

    // 3. 数据的处理
    val words = sourceDS.flatMap(_.split(" "))
      .map((_, 1))
      .groupByKey(_._1)
      .count()

    // 4. 结果集的生成和输出
    words.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .start()
      .awaitTermination()
}
}
3、结果集

从结果集中可以观察到以下内容

[*]Structured Streaming 依然是小批量的流处理
[*]Structured Streaming 的输出是类似 DataFrame 的, 也具有 Schema, 所以也是针对结构化数据进行优化的
[*]从输出的时间特点上来看, 是一个批次先开始, 然后收集数据, 再进行展示, 这一点和 Spark Streaming 不太一样
 总结

[*]Structured Streaming 中的编程步骤依然是先读, 后处理, 最后落地
[*]Structured Streaming 中的编程模型依然是 DataFrame 和 Dataset
[*]Structured Streaming 中依然是有外部数据源读写框架的, 叫做 readStream 和 writeStream
[*]Structured Streaming 和 SparkSQL 几乎没有区别, 唯一的区别是, readStream 读出来的是流, writeStream 是将流输出, 而 SparkSQL 中的批处理使用 read 和 write

Dataset和流式计算
可以理解为 Spark 中的 Dataset 有两种, 一种是处理静态批量数据的 Dataset, 一种是处理动态实时流的 Dataset, 这两种 Dataset 之间的区别如下

[*]流式的 Dataset 使用 readStream 读取外部数据源创建, 使用 writeStream 写入外部存储
[*]批式的 Dataset 使用 read 读取外部数据源创建, 使用 write 写入外部存储

从 HDFS 中读取数据
案例流程


[*]利用py产生文件源源不断向hdfs上传文件
[*]编写 Structured Streaming 程序处理数据
python代码以及spark处理流式代码
import os

for index in range(100):
    content = """
    {"name": "Michael"}
    {"name": "Andy", "age": 30}
    {"name": "Justin", "age": 19}
    """

    file_name = "/data/spark/test/text{0}.json".format(index)

    with open(file_name, "w") as file:
      file.write(content)

    os.system("/export/servers/hadoop-2.7.5/bin/hdfs dfs -mkdir -p /spark/dataset/")
    os.system("/export/servers/hadoop-2.7.5/bin/hdfs dfs -put {0} /spark/dataset/".format(file_name))package sparkStreaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

object HDFSSource {

def main(args: Array): Unit = {
    // 1. 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("hdfs_source")
      .master("local")
      .getOrCreate()

    // 2. 数据读取, 目录只能是文件夹, 不能是某一个文件
    val schema = new StructType()
      .add("name", "string")
      .add("age", "integer")

    val source = spark.readStream
      .schema(schema)
      .json("hdfs://node01:8020/spark/dataset")

    // 3. 输出结果
    source.writeStream
      .outputMode(OutputMode.Append())
      .format("console")
      .start()
      .awaitTermination()
}
}总结

[*]Python 生成文件到 HDFS, 这一步在真实环境下, 可能是由 Flume 和 Sqoop 收集并上传至 HDFS
[*]Structured Streaming 从 HDFS 中读取数据并处理
[*]Structured Streaming 讲结果表展示在控制台



文档来源:51CTO技术博客https://blog.51cto.com/u_15307704/3134927
页: [1]
查看完整版本: Spark学习--Structured Streaming