Case Study 篇:介绍大型企业如何使用 StarRocks 在数据湖上实时且灵活的洞察数据的价值,从而帮助业务进行更好的决策,帮助读者进一步理解理论是如何在实际场景落地的。
什么是数据湖
什么是数据湖,根据 Wikipedia 的定义,“A data lake is a system or repository of data stored in its natural/raw format, usually object blobs or files”。通俗来说可以将数据湖理解为在廉价的对象存储或分布式文件系统之上包了一层,使这些存储系统中离散的 object 或者 file 结合在一起对外展现出一个统一的语义,例如关系型数据库常见的“表”语义等。
Rule Based Optimization (RBO) 是传统分析引擎常用的优化策略。RBO 的本质是核心是基于关系代数的等价变换,通过一套预先制定好的规则来变换查询,从而获得代价更低的执行计划。常见的 RBO 规则谓词下推、Limit 下推、常量折叠等。在 RBO 中,有着一套严格的使用规则,只要你按照规则去写查询语句,无论数据表中的内容怎样,生成的执行计划都是固定的。但是在实际的业务环境中,数据的量级会严重影响查询的性能,而 RBO 是没法通过这些信息来获取更优的执行计划。
为了解决 RBO 的局限性,Cost Based Optimization (CBO) 的优化策略应运而生。CBO 通过收集数据的统计信息来估算执行计划的代价,这些统计信息包括数据集的大小,列的数量和列的基数等信息。举个例子,假设我们现在有三张表 A,B 和 C,在进行 A join B join C 的查询时如果没有对应的统计信息我们是无法判断不同 join 的执行顺序代价上的差异。如果我们收集到这三张表的统计信息,发现 A 表和 B 表的数据量都是 1M 行,但是 C 表的 数据量仅为 10 行,那么通过先执行 B join C 可以大大减少中间结果的数据量,这在没有统计信息的情况下基本不可能判断。
随着查询复杂度的增加,执行计划的状态空间会变的非常巨大。刷过算法题的小伙伴都知道,一旦状态空间非常大,通过暴力搜索的方式是不可能 AC 的,这时候一个好的搜索算法格外重要。通常 CBO 使用动态规划算法来得到最优解,并且减少重复计算子空间的代价。当状态空间达到一定程度之后,我们只能选择贪心算法或者其他一些启发式算法来得到局部最优。本质上搜索算法是一种在搜索时间和结果质量做 trade-off 的方法。
next:
for:
row = source.next()
if filterExpr.Eval(row):
// return a new row containing just column o
returnedRow row
for col in selectedCols:
returnedRow.append(row[col])
return returnedRow
根据 DBMSs On A Modern Processor: Where Does Time Go? 的评估,这种执行方式存在大量的 L2 data stalls 和 L1 I-cache stalls、分支预测的效率低等问题。
随着磁盘等硬件技术的蓬勃发展,各种通过 CPU 换 IO 的压缩算法、Encoding 算法和存储技术的广泛使用,CPU 的性能逐渐成为成为分析引擎的瓶颈。为了解决 Row Oriented 执行所存在的问题,学术界开始思考解决方案,Block oriented processing of Relational Database operations in modern Computer Architectures 这篇论文提出使用按 block 的方式在 operator 之间传递数据,能够平摊条件检查和分支预测的工作的耗时,MonetDB/X100: Hyper-Pipelining Query Execution 在此基础上更进一步,提出将通过将数据从原来的 Row Oriented,改变成 Column Oriented,进一步提升 CPU Cache 的效率,也更有利于编译器进行优化。在 Column Oriented 的模型中,执行计划的执行过程可以用如下伪码表示:
// first create an n + 1 result, for all values in the n column
projPlusIntIntConst.Next():
batch = source.Next()
for i < batch.n:
outCol[i] = intCol[i] + constArg
return batch
// then, compare the new column to the m column, putting the result into
// a selection vector: a list of the selected indexes in the column batch
selectLTIntInt.Next():
batch = source.Next()
for i < batch.n:
if int1Col < int2Col:
selectionVector.append(i)
return batch with selectionVector
// finally, we materialize the batch, returning actual rows to the user,
// containing just the columns requested:
materialize.Next():
batch = source.Next()
for s < batch.n:
i = selectionVector[i]
returnedRow row
for col in selectedCols:
returnedRow.append(cols[col][i])
yield returnedRow
可以看到,Column Oriented 拥有更好的数据局部性和指令局部性,有利于提高 CPU Cache 的命中率,并且编译器更容易执行 SIMD 优化等。
Pull Based vs Push Based
数据库系统中,通常是将输入的 SQL 语句转化为一系列的算子,然后生成物理执行计划用于实际的计算并返回结果。在生成的物理执行计划中,通常会对算子进行 pipeline。常见的 pipeline 方式通常有两种:
基于数据驱动的 Push Based 模式,上游算子推送数据到下游算子
基于需求的 Pull Based 模式,下游算子主动从上游算子拉取数据。经典的火山模型就是 Pull Based 模式。
Push Based 的执行模式提高了缓存效率,能够更好地提升查询性能。
参考:Push vs. Pull-Based Loop Fusion in Query Engines
如上图所示,StarRocks 的架构非常简洁,整个系统的核心只有 Frontend (FE)、Backend (BE) 两类进程,不依赖任何外部组件,方便部署与维护。其中 FE 主要负责解析查询语句(SQL),优化查询以及查询的调度,而 BE 则主要负责从数据湖中读取数据,并完成一系列的 Filter 和 Aggregate 等操作。 Frontend
FE 的主要作用将 SQL 语句通过一系列转化和优化,最终转换成 BE 能够认识的一个个 Fragment。一个不那么准确但易于理解的比喻,如果把 BE 集群当成一个分布式的线程池的话,那么 Fragment 就是线程池中的 Task。从 SQL 文本到 Fragment,FE 的主要工作包含以下几个步骤: