SparkSQL
发展过程
解决的问题
- Spark SQL 使用 Hive 解析 SQL 生成 AST 语法树, 将其后的逻辑计划生成, 优化, 物理计划都自己完成, 而不依赖 Hive
- 执行计划和优化交给优化器 Catalyst
- 内建了一套简单的 SQL 解析器, 可以不使用 HQL, 此外, 还引入和 DataFrame 这样的 DSL API, 完全可以不依赖任何 Hive 的组件
- Shark 只能查询文件, Spark SQL 可以直接降查询作用于 RDD, 这一点是一个大进步
适用场景
定义特点举例结构化数据
有固定的 Schema
有预定义的 Schema
关系型数据库的表
半结构化数据
没有固定的 Schema, 但是有结构
没有固定的 Schema, 有结构信息, 数据一般是自描述的
指一些有结构的文件格式, 例如 JSON
非结构化数据
没有固定 Schema, 也没有结构
没有固定 Schema, 也没有结构
指文档图片之类的格式
- Spark 的 RDD 主要用于处理 非结构化数据 和 半结构化数据
- SparkSQL 主要用于处理 结构化数据
SparkSession
SparkContext在读取文件的时候,读取出来的是 RDD, 不包含 Schema(结构化)信息。所以出现了SparkSession作为SparkSQL的入口点,包括了 SQLContext, HiveContext, SparkContext 等组件的功能val spark: SparkSession = new sql.SparkSession.Builder()
.appName("hello")
.master("local[6]")
.getOrCreate()
Catalyst优化器
SparkSQL 大部分情况用于处理结构化数据和半结构化数据, 所以 SparkSQL 可以获知数据的 Schema, 从而根据其 Schema 来进行优化
SparkSQL整体架构
- API 层简单的说就是 Spark 会通过一些 API 接受 SQL 语句
- 收到 SQL 语句以后, 将其交给 Catalyst, Catalyst 负责解析 SQL, 生成执行计划等
- Catalyst 的输出应该是 RDD 的执行计划
- 最终交由集群运行
简单优化过程
Catalyst 的主要运作原理是分为三步, 先对 SQL 或者 Dataset 的代码解析, 生成逻辑计划, 后对逻辑计划进行优化, 再生成物理计划, 最后生成代码到集群中以 RDD 的形式运行
Dataset
- Dataset 是一个新的 Spark 组件, 其底层还是 RDD
- Dataset 提供了访问对象中某个特定字段的能力, 不用像 RDD 一样每次都要针对整个对象做操作
- Dataset 和 RDD 不同, 如果想把 Dataset[T] 转为 RDD[T], 则需要对 Dataset 底层的 InternalRow 做转换, 是一个比价重量级的操作
DataFrame
- DataFrame 是一个类似于关系型数据库表的函数式组件
- DataFrame 一般处理结构化数据和半结构化数据
- DataFrame 具有数据对象的 Schema 信息
- 可以使用命令式的 API 操作 DataFrame, 同时也可以使用 SQL 操作 DataFrame
- DataFrame 可以由一个已经存在的集合直接创建, 也可以读取外部的数据源来创建
Dataset 和 DataFrame 的异同
- DataFrame 是 Dataset 的一种特殊情况, 也就是说 DataFrame 是 Dataset[Row] 的别名。
- DataFrame 和 Dataset 所表达的语义不同
- DataFrame 表达的含义是一个支持函数式操作的 表, 而 Dataset 表达是是一个类似 RDD 的东西, Dataset 可以处理任何对象
- DataFrame 中所存放的是 Row 对象, 而 Dataset 中可以存放任何类型的对象
- DataFrame 的操作方式和 Dataset 是一样的, 但是对于强类型操作而言, 它们处理的类型不同
- DataFrame 只能做到运行时类型检查, Dataset 能做到编译和运行时都有类型检查
代码解析:
def dataframe4(): Unit = {
val spark = SparkSession.builder()
.appName("dataframe1")
.master("local[6]")
.getOrCreate()
import spark.implicits._
val personList = Seq(Person("zhangsan", 15), Person("lisi", 20))
// DataFrame 是弱类型的
val df: DataFrame = personList.toDF()
df.map( (row: Row) => Row(row.get(0), row.getAs[Int](1) * 2) )(RowEncoder.apply(df.schema))
.show()
// DataFrame 所代表的弱类型操作是编译时不安全
// df.groupBy("name, school")
// Dataset 是强类型的
val ds: Dataset[Person] = personList.toDS()
ds.map( (person: Person) => Person(person.name, person.age * 2) )
.show()
// Dataset 所代表的操作, 是类型安全的, 编译时安全的
// ds.filter( person => person.school )
}
|