Skip to main content

Spark Notes

info

Spark Notes and Getting Started

1. Spark Overview

Deployment Diagram 部署图

从部署图中可以看到

  • 整个集群分为 Master 节点和 Worker 节点,相当于 Hadoop 的 Master 和 Slave 节点。
  • Master 节点上常驻 Master 守护进程,负责管理全部的 Worker 节点。
  • Worker 节点上常驻 Worker 守护进程,负责与 Master 节点通信并管理 executors。
  • Driver 官方解释是 “The process running the main() function of the application and creating the SparkContext”。Application 就是用户自己写的 Spark 程序(driver program),比如 WordCount.scala。如果 driver program 在 Master 上运行,比如在 Master 上运行

A simple Spark application

The example here is the GroupByTest application under the examples package in Spark. We assume that the application is launched on the Master node, with the following command:

/* Usage: GroupByTest [numMappers] [numKVPairs] [valSize] [numReducers] */

bin/run-example GroupByTest 100 10000 1000 36

GroupByTest 具体代码如下:

package org.apache.spark.examples

import java.util.Random
import org.apache.spark.{SparkConf, SparkContext}

/**
* Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers]
*/
object GroupByTest {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("GroupBy Test")
var numMappers = if (args.length > 0) args(0).toInt else 2
var numKVPairs = if (args.length > 1) args(1).toInt else 1000
var valSize = if (args.length > 2) args(2).toInt else 1000
var numReducers = if (args.length > 3) args(3).toInt else numMappers

val sc = new SparkContext(sparkConf)

val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
for (i <- 0 until numKVPairs) {
val byteArr = new Array[Byte](valSize)
ranGen.nextBytes(byteArr)
arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
}
arr1
}.cache()
// Enforce that everything has been calculated and in cache
pairs1.count()

println(pairs1.groupByKey(numReducers).count())

sc.stop()
}
}

The code runs like the following figure in our brains:

Since this is a simple application, let's estimate the runtime data size in each step:

  1. 初始化 SparkConf()。
  2. 初始化 numMappers=100, numKVPairs=10,000, valSize=1000, numReducers= 36
  3. 初始化 SparkContext。这一步很重要,是要确立 driver 的地位,里面包含创建 driver 所需的各种 actors 和 objects。
  4. 每个 mapper 生成一个 arr1: Array[(Int, Byte[])],length 为 numKVPairs。每一个 Byte[] 的 length 为 valSize,Int 为随机生成的整数。Size(arr1) = numKVPairs * (4 + valSize) = 10MB,所以 Size(pairs1) = numMappers * Size(arr1) =1000MB 。这里的数值计算结果都是约等于。
  5. 每个 mapper 将产生的 arr1 数组 cache 到内存。
  6. 然后执行一个 action 操作 count(),来统计所有 mapper 中 arr1 中的元素个数,执行结果是 numMappers * numKVPairs = 1,000,000。这一步主要是为了将每个 mapper 产生的 arr1 数组 cache 到内存。
  7. 在已经被 cache 的 paris1 上执行 groupByKey 操作,groupByKey 产生的 reducer (也就是 partition) 个数为 numReducers。理论上,如果 hash(Key) 比较平均的话,每个 reducer 收到的 <Int, Array[Byte]> record 个数为 numMappers * numKVPairs / numReducer = 27,777,大小为 Size(pairs1) / numReducer = 27MB
  8. reducer 将收到的 <Int, Byte[]> records 中拥有相同 Int 的 records 聚在一起,得到 <Int, list(Byte[], Byte[], ..., Byte[])>
  9. 最后 count 将所有 reducer 中 records 个数进行加和,最后结果实际就是 pairs1 中不同的 Int 总个数。

Job Logical Plan 逻辑执行图

Job 的实际执行流程比用户头脑中的要复杂,需要先建立逻辑执行图(或者叫数据依赖图),然后划分逻辑执行图生成 DAG 型的物理执行图,然后生成具体 task 执行。分析一下这个 job 的逻辑执行图:

The function call of RDD.toDebugString can return the logical plan:

  MapPartitionsRDD[3] at groupByKey at GroupByTest.scala:51 (36 partitions)
ShuffledRDD[2] at groupByKey at GroupByTest.scala:51 (36 partitions)
FlatMappedRDD[1] at flatMap at GroupByTest.scala:38 (100 partitions)
ParallelCollectionRDD[0] at parallelize at GroupByTest.scala:38 (100 partitions)

需要注意的是 data in the partition 展示的是每个 partition 应该得到的计算结果,并不意味着这些结果都同时存在于内存中。

根据上面的分析可知:

  • 用户首先 init 了一个 0-99 的数组: 0 until numMappers
  • parallelize() 产生最初的 ParrallelCollectionRDD,每个 partition 包含一个整数 i。
  • 执行 RDD 上的 transformation 操作(这里是 flatMap)以后,生成 FlatMappedRDD,其中每个 partition 包含一个 Array[(Int, Array[Byte])]。
  • 第一个 count() 执行时,先在每个 partition 上执行 count,然后执行结果被发送到 driver,最后在 driver 端进行 sum。
  • 由于 FlatMappedRDD 被 cache 到内存,因此这里将里面的 partition 都换了一种颜色表示。 groupByKey 产生了后面两个 RDD,为什么产生这两个在后面章节讨论。
  • 如果 job 需要 shuffle,一般会产生 ShuffledRDD。该 RDD 与前面的 RDD 的关系类似于 Hadoop 中 mapper 输出数据与 reducer 输入数据之间的关系。
  • MapPartitionsRDD 里包含 groupByKey() 的结果。
  • 最后将 MapPartitionsRDD 中的 每个 value(也就是Array[Byte])都转换成 Iterable 类型。
  • 最后的 count 与上一个 count 的执行方式类似。

The logical plan represents the application dataflow, including the data transformations , the intermediate RDDs, and the data dependency between these RDDs. 可以看到逻辑执行图描述的是 Job 的数据流: Job 会经过哪些 transformation(),中间生成哪些 RDD 及 RDD 之间的依赖关系。

Job Physical Plan 物理执行图

逻辑执行图表示的是数据上的依赖关系,不是 task 的执行图。在 Hadoop 中,用户直接面对 task,mapper 和 reducer 的职责分明: 一个进行分块处理,一个进行 aggregate。Hadoop 中 整个数据流是固定的,只需要填充 map()reduce() 函数即可。

Spark 面对的是更复杂的数据处理流程,数据依赖更加灵活,很难将数据流和物理 task 简单地统一在一起。因此 Spark 将数据流和具体 task 的执行流程分开,并设计算法将逻辑执行图转换成 task 物理执行图,转换算法后面的章节讨论。

针对这个 job,我们先画出它的物理执行 DAG 图如下:

可以看到 GroupByTest 这个 application 产生了两个 job,第一个 job 由第一个 action(也就是 pairs1.count)触发产生,分析一下第一个 job:

  • 整个 job 只包含 1 个 stage(不明白什么是 stage 没关系,后面章节会解释,这里只需知道有这样一个概念)。
  • Stage 0 包含 100 个 ResultTask。
  • 每个 task 先计算 flatMap,产生 FlatMappedRDD,然后执行 action() 也就是 count(),统计每个 partition 里 records 的个数,比如 partition 99 里面只含有 9 个 records。
  • 由于 pairs1 被声明要进行 cache,因此在 task 计算得到 FlatMappedRDD 后会将其包含的 partitions 都 cache 到 executor 的内存。
  • task 执行完后,driver 收集每个 task 的执行结果,然后进行 sum()。
  • job 0 结束。

第二个 job 由 pairs1.groupByKey(numReducers).count 触发产生。分析一下该 job:

  • 整个 job 包含 2 个 stage。
  • Stage 1 包含 100 个 ShuffleMapTask,每个 task 负责从 cache 中读取 pairs1 的一部分数据并将其进行类似 Hadoop 中 mapper 所做的 partition,最后将 partition 结果写入本地磁盘。
  • Stage 0 包含 36 个 ResultTask,每个 task 首先 shuffle 自己要处理的数据,边 fetch 数据边进行 aggregate 以及后续的 mapPartitions() 操作,最后进行 count() 计算得到 result。
  • task 执行完后,driver 收集每个 task 的执行结果,然后进行 sum()。
  • job 1 结束。

2. RDD Overview

2.1 RDD Intro

RDD 全称为 Resilient Distributed Datasets,是 Spark 最基本的数据抽象,它是只读的、分区记录的集合,支持并行操作,可以由外部数据集或其他 RDD 转换而来,它具有以下特性:

  • 一个 RDD 由一个或者多个分区 (Partitions)组成。对于 RDD 来说,每个分区会被一个计算任务所处理,用户可以在创建 RDD 时指定其分区个数,如果没有指定,则默认采用程序所分配到的 CPU 的核心数;
  • RDD 拥有一个用于计算分区的函数 compute;
  • RDD 会保存彼此间的依赖关系,RDD 的每次转换都会生成一个新的依赖关系,这种 RDD 之间的依赖关系就像流水线一样。在部分分区数据丢失后,可以通过这种依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算;
  • Key-Value 型的 RDD 还拥有 Partitioner(分区器),用于决定数据被存储在哪个分区中,目前 Spark 中支持 HashPartitioner(按照哈希分区) 和 RangeParationer(按照范围进行分区);
  • 一个优先位置列表 (可选),用于存储每个分区的优先位置 (prefered location)。对于一个 HDFS 文件来说,这个列表保存的就是每个分区所在的块的位置,按照“移动数据不如移动计算“的理念,Spark 在进行任务调度的时候,会尽可能的将计算任务分配到其所要处理数据块的存储位置。

RDD[T] 抽象类的部分相关代码如下:

// 由子类实现以计算给定分区
def compute(split: Partition, context: TaskContext): Iterator[T]

// 获取所有分区
protected def getPartitions: Array[Partition]

// 获取所有依赖关系
protected def getDependencies: Seq[Dependency[_]] = deps

// 获取优先位置列表
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

// 分区器 由子类重写以指定它们的分区方式
@transient val partitioner: Option[Partitioner] = None

2.2 RDD Creation

Created from a collection

object CreateRDDFromCollection {

def main(args: Array[String]): Unit = {
// 初始化 SparkSession
val spark = SparkSession.builder()
.appName("Create RDD from Collection")
.master("local[*]") // 本地运行模式,使用所有可用的核心
.getOrCreate()

// 获取 SparkContext
val sc = spark.sparkContext

// 创建一个 Scala 列表
val data = List(1, 2, 3, 4, 5)

// 使用 SparkContext 的 parallelize 方法从集合创建 RDD
val rdd = sc.parallelize(data)

// 对 RDD 进行操作
val rddSquared = rdd.map(x => x * x)

// 打印结果
println("Squared numbers:")
rddSquared.collect().foreach(println)

// 关闭 SparkSession
spark.stop()
}
}

Output:
1
4
9
16
25

Create from external storage systems

val fileRDD = sc.textFile("/usr/file/emp.txt")
// Get the first line of text
fileRDD.take(1)

使用外部存储系统时需要注意以下两点:

  • 如果在集群环境下从本地文件系统读取数据,则要求该文件必须在集群中所有机器上都存在,且路径相同;
  • 支持目录路径,支持压缩文件,支持使用通配符。

textFile & wholeTextFiles

两者都可以用来读取外部文件,但是返回格式是不同的:

textFile: 其返回格式是 RDD[String] ,返回的是就是文件内容,RDD 中每一个元素对应一行数据; wholeTextFiles: 其返回格式是 RDD[(String, String)],元组中第一个参数是文件路径,第二个参数是文件内容;

两者都提供第二个参数来控制最小分区数; 从 HDFS 上读取文件时,Spark 会为每个块创建一个分区。

def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {...}
def wholeTextFiles(path: String,minPartitions: Int = defaultMinPartitions): RDD[(String, String)]={..}

2.3 RDD Transformation

RDD 支持两种类型的操作: transformations(转换,从现有数据集创建新数据集)和 actions(在数据集上运行计算后将值返回到驱动程序)。RDD 中的所有转换操作都是惰性的,它们只是记住这些转换操作,但不会立即执行,只有遇到 action 操作后才会真正的进行计算,这类似于函数式编程中的惰性求值。

val list = List(1, 2, 3)
// map 是一个 transformations 操作,而 foreach 是一个 actions 操作
sc.parallelize(list).map(_ * 10).foreach(println)
// 输出: 10 20 30

3. RDD Cache

3.1 Cache Level

Spark 速度非常快的一个原因是 RDD 支持缓存。成功缓存后,如果之后的操作使用到了该数据集,则直接从缓存中获取。虽然缓存也有丢失的风险,但是由于 RDD 之间的依赖关系,如果某个分区的缓存数据丢失,只需要重新计算该分区即可。

Spark 支持多种缓存级别 :

Storage Level
存储级别
Meaning
MEMORY_ONLY默认的缓存级别,将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中。如果内存空间不够,则部分分区数据将不再缓存。
MEMORY_AND_DISK将 RDD 以反序列化的 Java 对象的形式存储 JVM 中。如果内存空间不够,将未缓存的分区数据存储到磁盘,在需要使用这些分区时从磁盘读取。
MEMORY_ONLY_SER
将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式比反序列化对象节省存储空间,但在读取时会增加 CPU 的计算负担。仅支持 Java 和 Scala 。
MEMORY_AND_DISK_SER
类似于 MEMORY_ONLY_SER,但是溢出的分区数据会存储到磁盘,而不是在用到它们时重新计算。仅支持 Java 和 Scala。
DISK_ONLY只在磁盘上缓存 RDD
MEMORY_ONLY_2,
MEMORY_AND_DISK_2, etc
与上面的对应级别功能相同,但是会为每个分区在集群中的两个节点上建立副本。
OFF_HEAPMEMORY_ONLY_SER 类似,但将数据存储在堆外内存中。这需要启用堆外内存。

启动堆外内存需要配置两个参数:

  • spark.memory.offHeap.enabled : 是否开启堆外内存,默认值为 false,需要设置为 true;
  • spark.memory.offHeap.size : 堆外内存空间的大小,默认值为 0,需要设置为正值。

3.2 To use cache

缓存数据的方法有两个: persistcachecache 内部调用的也是 persist,它是 persist 的特殊化形式,等价于 persist(StorageLevel.MEMORY_ONLY)。示例如下:

// 所有存储级别均定义在 StorageLevel 对象中
fileRDD.persist(StorageLevel.MEMORY_AND_DISK)
fileRDD.cache()

3.3 Remove cache

Spark 会自动监视每个节点上的缓存使用情况,并按照最近最少使用(LRU)的规则删除旧数据分区。当然,你也可以使用 RDD.unpersist() 方法进行手动删除。

4. Shuffle Overview

4.1 Shuffle Intro

在 Spark 中,一个任务对应一个分区,通常不会跨分区操作数据。但如果遇到 reduceByKey 等操作,Spark 必须从所有分区读取数据,并查找所有键的所有值,然后汇总在一起以计算每个键的最终结果 ,这称为 Shuffle

4.2 Impact of Shuffle 影响

Shuffle 是一项昂贵的操作,因为它通常会跨节点操作数据,这会涉及磁盘 I/O,网络 I/O,和数据序列化。某些 Shuffle 操作还会消耗大量的堆内存,因为它们使用堆内存来临时存储需要网络传输的数据。Shuffle 还会在磁盘上生成大量中间文件,从 Spark 1.3 开始,这些文件将被保留,直到相应的 RDD 不再使用并进行垃圾回收,这样做是为了避免在计算时重复创建 Shuffle 文件。如果应用程序长期保留对这些 RDD 的引用,则垃圾回收可能在很长一段时间后才会发生,这意味着长时间运行的 Spark 作业可能会占用大量磁盘空间,通常可以使用 spark.local.dir 参数来指定这些临时文件的存储目录。

4.3 导致 Shuffle 的操作

由于 Shuffle 操作对性能的影响比较大,所以需要特别注意使用,以下操作都会导致 Shuffle:

  • 涉及到重新分区操作repartitioncoalesce
  • 所有涉及到 ByKey 的操作: 如 groupByKeyreduceByKey,但 countByKey 除外;
  • 联结操作: 如 cogroupjoin

5. Spark Accumulators and Broadcast 累加器与广播

5.1 Intro

在 Spark 中,提供了两种类型的共享变量: 累加器 (accumulator) 与广播变量 (broadcast variable):

  • 累加器: 用来对信息进行聚合,主要用于累计计数等场景;
  • 广播变量: 主要用于在节点间高效分发大对象。

5.2 Accumulators 累加器

这里先看一个具体的场景,对于正常的累计求和,如果在集群模式中使用下面的代码进行计算,会发现执行结果并非预期:

var counter = 0
val data = Array(1, 2, 3, 4, 5)
sc.parallelize(data).foreach(x => counter += x)
println(counter)

counter 最后的结果是 0,导致这个问题的主要原因是闭包。

5.2 理解闭包

1. Scala 中闭包的概念

这里先介绍一下 Scala 中关于闭包的概念:

var more = 10
val addMore = (x: Int) => x + more

如上函数 addMore 中有两个变量 x 和 more:

  • x : 是一个绑定变量 (bound variable),因为其是该函数的入参,在函数的上下文中有明确的定义;
  • more : 是一个自由变量 (free variable),因为函数字面量本生并没有给 more 赋予任何含义。

按照定义: 在创建函数时,如果需要捕获自由变量,那么包含指向被捕获变量的引用的函数就被称为闭包函数。

2. Spark 中的闭包

在实际计算时,Spark 会将对 RDD 操作分解为 Task,Task 运行在 Worker Node 上。在执行之前,Spark 会对任务进行闭包,如果闭包内涉及到自由变量,则程序会进行拷贝,并将副本变量放在闭包中,之后闭包被序列化并发送给每个执行者。因此,当在 foreach 函数中引用 counter 时,它将不再是 Driver 节点上的 counter,而是闭包中的副本 counter,默认情况下,副本 counter 更新后的值不会回传到 Driver,所以 counter 的最终值仍然为零。

需要注意的是: 在 Local 模式下,有可能执行 foreach 的 Worker Node 与 Diver 处在相同的 JVM,并引用相同的原始 counter,这时候更新可能是正确的,但是在集群模式下一定不正确。所以在遇到此类问题时应优先使用累加器。

累加器的原理实际上很简单: 就是将每个副本变量的最终值传回 Driver,由 Driver 聚合后得到最终值,并更新原始变量。

5.3 Use Accumulators 使用累加器

SparkContext 中定义了所有创建累加器的方法,需要注意的是: 部分的累加器方法在 Spark 2.0.0 之后被标识为废弃。

使用示例和执行结果分别如下:

val data = Array(1, 2, 3, 4, 5)
// 定义累加器
val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(data).foreach(x => accum.add(x))
// 获取累加器的值
accum.value

5.4 Broadcast Variable 广播变量

在上面介绍中闭包的过程中我们说道每个 Task 任务的闭包都会持有自由变量的副本,如果变量很大且 Task 任务很多的情况下,这必然会对网络 IO 造成压力,为了解决这个情况,Spark 提供了广播变量。

广播变量的做法很简单: 就是不把副本变量分发到每个 Task 中,而是将其分发到每个 Executor,Executor 中的所有 Task 共享一个副本变量。

// 把一个数组定义为一个广播变量
val broadcastVar = sc.broadcast(Array(1, 2, 3, 4, 5))
// 之后用到该数组时应优先使用广播变量,而不是原值
sc.parallelize(broadcastVar.value).map(_ * 10).collect()

Reference RDD Programming Guide

6. Architecture deep dive

面试 Interview Questions

1. What is Spark driver and Spark executor?

2. How one Spark job is executed internally?

What is Spark driver and Spark executor?

Real-world Case

Offline data batch processing with Hive and Spark 离线批处理

数据收集: 当用户与广告互动(如点击、浏览等)时,这些互动事件会被记录为日志数据。

数据存储: 这些日志数据首先被存储到 HDFS。原因有以下几点:

  • 可扩展性: HDFS 是一个高度可扩展的系统,可以轻松地增加更多的机器来存储更多的数据。
  • 容错性: HDFS 会对数据进行多份副本存储,确保单点故障不会导致数据丢失。
  • 高吞吐量: HDFS 设计用于批处理而非实时处理,适合大量数据的写入和读取。

数据处理与查询: 虽然数据是存储在 HDFS 中,但为了方便地查询和分析这些数据,我们通常会使用 Hive。Hive 的表其实是一个元数据层,它知道数据在 HDFS 中的哪个位置,以及如何读取数据。所以当我们在 Hive 中创建表并加载数据时,数据实际上还是存储在 HDFS 中,Hive 只是为这些数据提供了一个 SQL 查询的界面。

1. Data Preparation 数据准备

User-ad interaction data is collected and stored in raw log files on HDFS after the event occurs. 用户与广告的互动数据在事件发生后被收集并存储到 HDFS 中的原始日志文件里。

2. Hive Preprocessing 数据预处理

  • Run Hive tasks daily, e.g., at 2 AM.
  • Clean data with Hive SQL, removing bad entries. 利用 Hive SQL 进行初步的数据清洗
  • Save cleaned data in a new Hive table for Spark. 清洗后的数据保存到新的 Hive 表中作为离线数据集,为后续 Spark 分析做准备

3. Spark for Offline Data Analysis 离线数据分析

Spark SQL -> Logical Plan -> Physical Plan -> Task -> RDD

  • DAG scheduler (e.g., Apache Airflow) triggers Spark tasks after Hive tasks. DAG 调度器定时触发 Spark 任务
  • Spark reads offline data from the preprocessed table in Hive. 从 Hive 的预处理表中读取离线数据
  • Use Spark for data aggregation and computations, such as calculating ad click-through rates, user conversion rates, etc. 进行数据的聚合和计算,如统计每个广告的点击率、用户的转化率等。
  • Utilize Spark's parallel processing capabilities for efficient large data processing. 并行处理大数据集

4. Data Landing and Updating 数据落地和更新

  • Spark structures the processed data into DataFrame format.
  • The DataFrame data is written to a new Hive table that stores daily aggregated ad metrics. 数据被写入一个新的 Hive 表中,这个表保存每日聚合后的广告指标。

5. Monitoring and Management 监控和管理

  • The DAG scheduler continuously monitors the execution status of Hive and Spark tasks. 持续监控 Hive 和 Spark 的任务执行状态。
  • In case of task errors or failures, the scheduler can log errors, auto-retry, or send alerts to ensure stable data processing. 若任务出错或失败,调度器可以进行错误记录、自动重试或发出报警,确保数据处理的稳定性。
  • Backup all processed data after completion to ensure data integrity and security. 任务完成后备份所有处理后的数据。

Common Problems and Solutions 生产事故

1. Event count drops sharply than previous day 事件计数结果大幅下降

  • Check task logs first to verify job status. 先检查任务日志,确认任务状态。
  • Check the reduction in data volume.
    • If it drops around 20%, it might be a normal case.
    • If it drops around 90%, review recent code/system updates and the status of data sources. 审查最近的代码/系统更新和数据源状态。

2. Job failed 任务失败

  • Insufficient resources like memory, CPU, or storage. 资源不足,例如内存或 CPU。
  • Machine or system issues. 机器或系统问题。
  • Data issues, such as incorrect format or corrupted data. 数据问题,如格式错误或数据损坏。

3. How to ensure that Spark Job result is correct? 任务结果的正确性

  • We have one SQL statement as input
  • We use a SQL parser to parse the SQL onto two engines, Hive and Spark
  • Run the SQL Job on both engines, and compare the results. 在两个引擎上运行 SQL 任务,并比较结果。

Data Skewness 数据倾斜

Definition

Data skewness refers to a scenario in big data processing where a particular task or process is assigned a disproportionately large amount of data. This causes the process to run for an extended period or even fail, leading to prolonged or failed overall computation. In simple terms, data skewness means uneven data distribution among tasks. 数据倾斜是指在大数据处理中,某一任务或进程被分配了大量的数据,导致这个进程运行时间过长或失败,进而使整个计算任务运行时间过长或者失败。简单来说,数据倾斜就是任务处理的数据不均匀。

Types

  • 读倾斜 Read Skew: A task takes too long to read data, typically due to overly large data blocks or issues with the data.某个任务读取数据时花费过多时间,通常是因为数据块太大或数据有问题。

  • 计算倾斜 Calculation Skew: During operations that require sorting or aggregation, processing a particular key takes too long. 在需要排序或聚合的操作中,处理某个键的时间过长。

  • 写倾斜 Write Skew: An operation outputs a vast amount of data, such as a sharp increase after data association, or situations where only one task can operate. 某个操作输出大量数据,如关联后数据急剧增长,或只有一个任务能操作的情况。

Causes of Data Skewness 数据倾斜产生的原因

In distributed big data computation, tasks and data are distributed to various nodes in the cluster based on certain rules. However, these distribution rules might not always be optimal. 大数据分布式计算中,任务和数据会按一定规则分发到集群的各个节点上。但这个分发规则可能并不总是最优的。

  • Unpredictable Data Distribution 数据的分布是不可预测的: The system cannot foresee the exact data distribution before computation. 系统不能在计算前预知数据的具体分布情况。
  • Uncertain Computation Results 计算结果的数量是不确定的: For instance, the number of rows after joining two tables or the result quantity after certain data operations is unpredictable. 例如,两表关联后的数据行数或者某些数据操作后的结果数量都是不可预测的。
  • Single-node Tasks 有些任务只能在一个节点上执行: For operations like global sorting, Limit, etc., they are usually performed on a single node. 例如全局排序、Limit 操作等,这些通常都是在单个节点上完成的。

Solutions

Practice for Write Skewness 写倾斜实践

  • Use collect_set/collect_list functions to aggregate data, reducing row counts while preserving data relationships. 聚合数据,将多行数据转换为一行列表既减少行数又保证数据关系。
  • Utilize explode+lateral view to expand aggregated rows back into detailed rows to meet user requirements. 将聚合的行重新展开为明细行。

Practice for Avoiding sorting 避免排序

  • In big data processing, sorting can lead to disk overflow and memory shortages. 在大数据处理中,排序可能导致磁盘溢出和内存不足。
  • Full sorting is often unnecessary because business usually focuses on result rather than internal processing. An effective method to avoid sorting is by using the max function instead of sorting. 避免排序的一个有效方法是使用 max 函数代替排序。

Practice for Filtering invalid values 过滤非法值

  • Filter out invalid values and nulls. 过滤掉非法值和空值。
  • Prevent memory overflow and excessive computation in certain scenarios. 避免在某些场景下内存溢出和过多计算。