Motivation

  1. Hadoop是一个比较基础的语言,需要一个高级语言
  2. Hadoop每次map/map+reduce之后需要将数据存储到hdfs中,io开销过大
  3. 此外,spark还支持更多复杂的、多步骤的数据处理(如机器学习)以及实时流数据分析

RDD

RDD,全称为Resilient Distributed Dataset(弹性分布式数据集),是Spark中的一个非常重要的概念。RDD具有以下几个关键特性:

  1. 不可变性(Immutability):RDD是不可变的,一旦创建便不能更改。
  2. 分区(Partitioned):RDD是分区的,数据存储在多个分区中以实现并行处理。
  3. 延迟计算(Lazy Evaluation):RDD的转换操作是延迟执行的,只有当执行Action操作时,才会触发实际的计算。
  4. 依赖关系(Lineage):Spark通过记录RDD的依赖关系的方式来跟踪其生成过程,当数据丢失时可以通过血统重新计算丢失的分区。
image-20250421173238679

RDD是

  • Set of Partitions(分区集合)
  • List of Dependencies on Parent RDDs:RDD 是通过转换操作(如 map、filter 等)从其他 RDD 生成的,每个 RDD 记录了它从哪些父 RDD(Parent RDDs)派生而来,这种关系称为依赖关系(Dependencies)
  • Function to Compute a Partition Given Its Parents:每个 RDD 都定义了一个函数,用于计算其分区的数据。这个函数通过依赖关系从父 RDD 的分区中获取数据,并进行操作(如 map、filter 等)
  • ** (Optional) Partitioner**
  • (Optional) Preferred Locations for Each Partition:RDD 可以指定每个分区的优选位置(Preferred Locations),即分区的最佳计算节点。这通常用于数据本地化(Data Locality)。

RDD的操作可以分为两类:

  • 转换操作(Transformations):如mapflatMapreduceByKey等,这些操作定义了RDD的转换逻辑,但不会立即执行。
  • 行动操作(Actions):如reducesaveAsTextFile等,这些操作会触发实际的计算。

operation可以分为以下几类:

  • Map-like operations
  • Reduce-like operations
  • Sort operations
  • Join-like operations
  • Set-ish operations

Spark结构

与MapReduce类似,spark需要一个driver与cluster manager(类似NameNode)进行通信,每个机器上有一个spark executor用于执行程序。

Spark Application是一个基于Spark API构建的用户程序,用于在Spark集群或环境中运行

Spark Driver是Spark Application的核心组件,提供接口和API以供应用程序使用

Spark Session是与Spark互动的入口点,它提供了一个统一的接口来访问所有底层的Spark功能

image-20250421174502741

Spark lazy evaluation

Transformation不会进行计算,直到action

如果有多个都是narrow dependencies,那么可以把这些合并成一个phase,pipeline方式运行,直到有wide dependencies.

image-20250422011535700

schedule optimization

image-20250517200633800

根据wide dependency划分stage,narrow dependency部分可以使用pipline进行加速。

hash shuffle

image-20250517202230307

每个mapper会根据hash生成R(reducer)数量个output file

Lineage

word RDD:

1
2
text_file = sc.textFile(“hdfs://...”)
words = text_file.flatMap(lambda line: line.split(" "))

dependencies: one-to-one

join RDD

1
2
3
rdd1 = sc.parallelize([(“foo”, 1), (“bar”, 2), (“baz”, 3)])
rdd2 = sc.parallelize([(“foo”, 4), (“bar”, 5), (“baz”, 6)])
joinedRDD = rdd1.join(rdd2)

dependencies: all-to-all

pySpark

初始化实例

1
2
3
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("<you_app_name>").setMaster("local[*]")
sc = SparkContext(conf=conf)
  • SparkConf:用于配置Spark应用程序的参数(如应用名称、运行模式等)。
  • SparkContext:Spark功能的入口点,负责与集群通信、创建RDD(弹性分布式数据集)等。
  • .setAppName(""):设置应用程序的名称(在Spark UI和日志中显示)。
  • .setMaster("local[*]"):指定运行模式为本地模式(local),[*]表示使用所有可用的CPU核心

读取文件

sc.textFile用于从文件系统(如本地文件系统、HDFS、S3等)读取文本文件,并将其转换为一个RDD

查看rdd相关信息

rdd.count():返回 RDD 中元素的总数

rdd.first():返回 RDD 的第一个元素

rdd.collect():返回一个包含所有元素的列表

rdd.getNumPartitions():返回 RDD 的分区数量

rdd.glom():将每个分区的数据以列表的形式返回

tranformation

map like

rdd.filter(f):对 RDD 的每个元素应用函数 f,保留函数返回 True 的元素,过滤掉返回 False 的元素

1
2
3
rdd = sc.parallelize([1, 2, 3, 4, 5])
result = rdd.filter(lambda x: x % 2 == 0) # 过滤偶数
# 输出:[2, 4]

rdd.map(f):对 RDD 的每个元素应用函数 f,将函数返回值作为新 RDD 的元素。

1
2
3
rdd = sc.parallelize([1, 2, 3, 4, 5])
result = rdd.map(lambda x: x * 2) # 每个元素乘以 2
# 输出:[2, 4, 6, 8, 10]

rdd.flatMap(f)

image-20250519102306909

map是one-to-one mapping

flatMap是 one-to-many mapping

rdd.distinct():去除重复的元素

1
2
3
rdd = sc.parallelize([1, 2, 2, 3, 4, 4, 5])
result = rdd.distinct() # 去除重复元素
# 输出:[1, 2, 3, 4, 5]

rdd1.union(rdd2):返回两个 RDD 的并集,将两个 RDD 的所有元素组合到一个 RDD 中

1
2
3
4
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
result = rdd1.union(rdd2)
# 输出:[1, 2, 3, 3, 4, 5]

rdd1.cartesian(rdd2): 返回两个 RDD 的笛卡尔积

1
2
3
4
rdd1 = sc.parallelize([1, 2])
rdd2 = sc.parallelize(["a", "b"])
result = rdd1.cartesian(rdd2)
# 输出:[(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]

rdd1.zip(rdd2):将两个 RDD 按位置进行配对将两个 RDD 按位置进行配对(要求两个 RDD 的长度相同,否则会报错)

1
2
3
4
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize(["a", "b", "c"])
result = rdd1.zip(rdd2)
# 输出:[(1, 'a'), (2, 'b'), (3, 'c')]

rdd.zipWithIndex():将 RDD 的每个元素与其索引(从 0 开始)配对,返回一个新的 RDD,每个元素是 (element, index) 的二元组

1
2
3
rdd = sc.parallelize(["a", "b", "c"])
result = rdd.zipWithIndex()
# 输出:[('a', 0), ('b', 1), ('c', 2)]

rdd.zipWithUniqueId():将 RDD 的每个元素与一个唯一 ID 配对,返回一个新的 RDD,每个元素是 (element, unique_id) 的二元组。(Items in the kth partition will get ids k, n+k, 2*n+k, …, where n is the number of partitions.)

1
2
3
4
rdd = sc.parallelize(["a", "b", "c", "d", "e"], numSlices=2)
# [["a", "b", "c"],["d", "e"]]
result = repartitioned_rdd.zipWithUniqueId()
# [[("a",0), ("b",2), ("c",4)],[("d",1), ("e",3)]]

reduce like

rdd.reduceByKey(func):对元素为 (key, value) 形式的 RDD,根据 key 进行分组,并用函数 func 对每组的 value 进行聚合,返回一个新的 (key, reduced_value) 形式的 RDD。

1
2
3
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4)])
result = rdd.reduceByKey(lambda x, y: x + y) # 对每个 key 的 value 求和
# 输出:[('a', 4), ('b', 6)]

rdd.sortByKey([ascending=True]):对元素为 (key, value) 形式的 RDD,按照 key 进行排序,返回排序后的新 RDD。

1
2
3
rdd = sc.parallelize([('b', 2), ('a', 1), ('c', 3)])
result = rdd.sortByKey() # 默认按 key 升序排序
# 输出:[('a', 1), ('b', 2), ('c', 3)]

rdd.groupByKey():对元素为 (key, value) 形式的 RDD,根据 key 进行分组,把相同 key 的所有 value 收集到一起,返回新的 RDD,每个元素形式为 (key, Iterable[value])

1
2
3
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4)])
result = rdd.groupByKey()
# 输出:[('a', [1, 3]), ('b', [2, 4])]

rdd1.intersection(rdd2):返回两个 RDD 的交集

1
2
3
4
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
result = rdd1.intersection(rdd2)
# 输出:[3]

rdd1.substract(rdd2):返回一个新的 RDD,该 RDD 包含 rdd1 中所有不在 rdd2 中的元素(即 rdd1 减去 rdd2

1
2
3
4
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([3, 4, 6])
result = rdd1.subtract(rdd2) # 返回 rdd1 减去 rdd2 的元素
# 输出:[1, 2, 5]

Actions

reduce 是 Spark RDD 的一种 聚合(Aggregation)操作,用于将整个 RDD 的所有元素使用给定的函数聚合成一个最终结果

1
2
3
rdd = sc.parallelize([1, 2, 3, 4, 5])
result = rdd.reduce(lambda a, b: a + b) # 求和
# result = 15

aggregate(zeroValue, seqOp, combOp)zeroValue:初始值。seqOp:分区内的聚合函数。combOp:各分区间合并结果的聚合函数。

1
2
3
4
5
6
7
rdd = sc.parallelize([1, 2, 3, 2, 4, 1, 5], numSlices=3)
result = rdd.aggregate(
set(), # 初始值是空集合
lambda acc, x: acc | {x}, # seqOp:分区内,把元素加入集合
lambda acc1, acc2: acc1 | acc2 # combOp:分区间,把集合合并
)
# {1, 2, 3, 4, 5}

fold(zeroValue, func)aggregate 的简化版,两步合并使用同样的函数

1
2
rdd = sc.parallelize([1, 2, 3])
rdd.fold(0, lambda x, y: x + y) # 结果 6

broadcast

Accumulator 是 Spark 中的一种共享变量,主要用于聚合信息,如计数、求和等。它们只支持添加操作,在 worker 端进行累加,driver 端可以读取最终结果。常见于采集调试信息、统计日志条数、总数等。任务失败后可能会重复累加,所以 accumulator 一般只用于监控、调试,不建议用作业务结果。

1
2
3
4
5
6
7
8
9
acc = sc.accumulator(0)
rdd = sc.parallelize([1, 2, 3, 4])
def f(x):
acc.add(x)
return x * 2
rdd = rdd.map(f)
print(acc.value) # 输出0
result = rdd.collect()
print(acc.value) # 输出10

Broadcast 用于在分布式计算中高效地向所有 worker 节点分发只读数据(如大字典、配置参数、查找表等),避免每个 task 都拷贝一份,节省内存、网络和带宽。

1
2
3
4
5
6
7
8
9
broadcasts = sc.broadcast([1, 2, 3])
def f(x):
if x in broadcasts.value:
return True
else:
return False
rdd = sc.parallelize([2, 5, 7, 1])
result = rdd.filter(f).collect()
print(result) # 输出[2, 1]

Example: find prime numbers

使用的方法是减去合数的方法。对于比如说要找[1,100]之间的所有质数,先求出这区间所有的合数,然后再用所有数的集合和合数的集合相减,得到质数。

草稿纸.jpeg
1
2
3
4
5
6
7
8
9
from pyspark import SparkContext

sc = SparkContext("local", "Find Prime Numbers")

n = 100
all_numbers = sc.parallelize(range(2, n + 1), 8).cache()
composite_numbers = all_numbers.flatMap(lambda x: [i for i in range(2 * x, n + 1, x)])
composite_numbers = composite_numbers.repartition(8)
prime_numbers = all_numbers.subtract(composite_numbers)

without partition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Partition 0: 203 : [4, 6, 8, ..., 100,
6, 9, ..., 99,
8, 12, ..., 100,
10, 15, ..., 100,
12, 18, ..., 96,
14, 21, ..., 98,
16, 24, ..., 96,
18, 27, ..., 99,
20, 30, ..., 100,
22, 33, ..., 99,
24, 36, ..., 96,
26, 39, ..., 91]
Partition 1: 47 : [28, 42, ..., 100]
Partition 2: 21 : [52, ..., 76]
Partition 3: 12 : [78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100]
Partition 4: 0 : []
Partition 5: 0 : []
Partition 6: 0 : []
Partition 7: 0 : []

with partition:

1
2
3
4
5
6
7
8
Partition 0: 30 : [39, ..., 90]
Partition 1: 30 : [69, ..., 74]
Partition 2: 41 : [4, ..., 76]
Partition 3: 40 : [24, ..., 85]
Partition 4: 40 : [44, ..., 60]
Partition 5: 40 : [64, ..., 69]
Partition 6: 40 : [84, ..., 96]
Partition 7: 22 : [9, ..., 100]

result

1
[[], [17, 97], [2], [3, 19, 67, 83], [], [5, 37, 53], [], [7, 23, 71], [], [41, 73, 89], [], [11, 43, 59], [], [13, 29, 61], [], [31, 47, 79]]

这里结果分区是按照x mod num_partition来分区的。

  • all_numbers.subtract(composite_numbers)默认使用defaultPartitioner决定结果RDD的分区数。

  • defaultPartitioner的规则为:取spark.default.parallelism值与父RDD最大分区数中的较大者。

  • 本机有16核, 所以分成了16个partition。

  • 如果将spark.default.parallelism设置为更小或更大的数,比如12或20可以观察到分区的变化

    1
    2
    3
    [[], [13, 37, 61, 73, 97], [2], [3], [], [5, 17, 29, 41, 53, 89], [], [7, 19, 31, 43, 67, 79], [], [], [], [11, 23, 47, 59, 71, 83]]

    [[], [41, 61], [2], [3, 23, 43, 83], [], [5], [], [7, 47, 67], [], [29, 89], [], [11, 31, 71], [], [13, 53, 73], [], [], [], [17, 37, 97], [], [19, 59, 79]]

Partition

properties of partition

  • Partition不会跨机器,一个Partition只会在一台机子上
  • 每台机器可能包含一个或多个Partition(比如有多核)
  • 用多少Partition是可以配置的,默认等于核心数量

如果一个transformation操作是在key-value pair上完成的且是窄依赖(mapValues,flatMapValues,filter,filterByKey),如果parent RDD有partitioner,那么一般会自动继承。但是像mapflatmap之类的不会,因为这些会改变key value pairs.

对于reduce like operator,会生成新的partitioner,例如sortByKey一般默认使用RangePartitionergroupByKey默认使用HashPartitioner(这主要是因为sortByKey的结果是有序的)。

Example: Hash Partitioner and Range Partitioner

首先我们创建一组数据,并观察其partitionerpartition

1
2
3
4
5
6
7
8
9
sc = SparkContext("local", "Partitioner Example")
data = [8, 8, 1, 96, 240, 400, 1, 800, 4, 12]
rdd = sc.parallelize(zip(data, data), 4)
print(rdd.collect())
>>[(8, 8), (8, 8), (1, 1), (96, 96), (240, 240), (400, 400), (1, 1), (800, 800), (4, 4), (12, 12)]
print(rdd.partitioner)
>>None
print(rdd.glom().collect())
>>[[(8, 8), (8, 8)], [(1, 1), (96, 96)], [(240, 240), (400, 400)], [(1, 1), (800, 800), (4, 4), (12, 12)]]

partitioner不是必须的。partitioner 只对 Key-Value 形式(Pair RDD)起作用,不适用于普通的仅值(value)RDD。

在一开始的这个数据中,没有key value pair,所以没有partitioner,但是partition是有的,这个partition是Spark框架自动分配的以保证work balance

然后我们进行一次map操作,再观察partitionerpartition

1
2
3
4
5
rdd = rdd.map(lambda t: (t[0], t[1] + 1))
print(rdd.partitioner)
>> None
print(rdd.glom().collect())
>>[[(8, 9), (8, 9)], [(1, 2), (96, 97)], [(240, 241), (400, 401)], [(1, 2), (800, 801), (4, 5), (12, 13)]]

原因同上。最后我们进行一次reduceByKey

1
2
3
4
5
6
7
rdd = rdd.reduceByKey(lambda x, y: x + y)
print(rdd.glom().collect())
>>[[(8, 18), (96, 97), (240, 241), (400, 401), (800, 801), (4, 5), (12, 13)], [(1, 4)], [], []]
print(rdd.partitioner)
>><pyspark.rdd.Partitioner object at 0x000002E55BE72A90>
print(rdd.partitioner.partitionFunc)
>><function portable_hash at 0x000002E55B660220>

可以看到,进行reduceByKey后,出现了partitioner,默认使用的是hash function作为partitioner

Range Partitioner

1
2
3
4
5
rdd1 = rdd.map(lambda x: (x[0], x[1] + 1))
print(rdd1.glom().collect())
>>[[(8, 19), (96, 98), (240, 242), (400, 402), (800, 802), (4, 6), (12, 14)], [(1, 5)], [], []]
print(rdd1.partitioner)
>>None

注意到经过map之后partitioner就无了

1
2
3
rdd2 = rdd.mapValues(lambda x: x + 1)
print(rdd2.partitioner.partitionFunc)
>><function portable_hash at 0x000002E55B660220>

但是如果是mapValues的话就还在,并且根据地址,是其parent的partition function

1
2
3
4
5
rdd = rdd.sortByKey()
print(rdd.glom().collect())
>>[[(1, 4), (4, 5), (8, 18)], [(12, 13), (96, 97)], [(240, 241), (400, 401)], [(800, 801)]]
print(rdd.partitioner.partitionFunc)
>><function RDD.sortByKey.<locals>.rangePartitioner at 0x000002E55BE17100>

可以看到经过RangePartition之后是按顺序排列的

1
2
3
rdd3 = rdd.mapValues(lambda x: x + 1)
print(rdd3.partitioner.partitionFunc)
>><function RDD.sortByKey.<locals>.rangePartitioner at 0x000002E55BE17100>

再次经过mapValues,发现partitionFunc依然继承了parent rdd(RangePartitioner)

Example: custom partitioner

1
2
3
4
data1 = list(range(0, n, 16)) + list(range(0, n, 16))
>> [0,16,32,...,0,16,32,...]
data2 = range(0, n, 8)
>> [0,8,16,...]
1
2
#这个函数的作用是生成每个partition中元素的数量
def partitionsize(it): yield len(list(it))
1
2
3
4
5
rdd1 = sc.parallelize(zip(data1, data2), 16)
print(rdd1.mapPartitions(partitionsize).collect())
>>[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 5]
print(rdd1.glom().collect())
[[(0, 0), (16, 8), (32, 16)], ..., [(320, 360), (336, 368), (352, 376), (368, 384), (384, 392)]]
1
2
3
4
5
6
7
8
9
10
def f(x):
return x % 17

rdd2 = rdd1.partitionBy(16, f)
print(rdd2.mapPartitions(partitionsize).collect())
>>[8, 2, 2, 2, 2, 2, 2, 2, 2, 2, 4, 4, 4, 4, 4, 4]
print(rdd2.glom().collect())
>>[[(0, 0), (16, 8), (272, 136), (288, 144), (0, 200), (16, 208), (272, 336), (288, 344)], ...,[(32, 16), (304, 152), (32, 216), (304, 352)]]
print(rdd2.partitioner.partitionFunc)
>><function f at 0x00000256815A7600>

可以看到,mod 17 余数为0的和余数为16的元素共享了第一个partition,而最后一个partition 是mod 17 余数为15的。

Example: narrow depencency

一些narrw dependency 的例子:

map操作不会改变partition

1
2
3
4
5
6
7
8
9
10
11
12
# -*- coding: utf-8 -*-
from pyspark import SparkConf, SparkContext

sc = SparkContext("local", "Narrow Dependency Operation")

# MAP
data = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8],8)
mapped_rdd = data.map(lambda x: x * 2)
print(data.glom().collect())
print(mapped_rdd.glom().collect())
>>[[1], [2], [3], [4], [5], [6], [7], [8]]
>>[[2], [4], [6], [8], [10], [12], [14], [16]]

union不会改变partition

1
2
3
4
5
6
7
8
9
10
# UNION
rdd1 = sc.parallelize([1, 2, 3, 4], 4)
rdd2 = sc.parallelize([4, 5, 6, 8], 4)
union_rdd = rdd1.union(rdd2)
print(rdd1.glom().collect())
print(rdd2.glom().collect())
print(union_rdd.glom().collect())
>>[[1], [2], [3], [4]]
>>[[4], [5], [6], [8]]
>>[[1], [2], [3], [4], [4], [5], [6], [8]]

使用partitionby预先分区,就可以在join的时候变成窄依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
rdd1 = sc.parallelize([(1, "A"), (2, "B"), (3, "C")], 3)
rdd2 = sc.parallelize([(1, "X"), (2, "Y"), (3, "Z")], 3)
# Ensure both RDDs are hash-partitioned with the same partitioner
partitioned_rdd1 = rdd1.partitionBy(3)
partitioned_rdd2 = rdd2.partitionBy(3)
# Perform join (inputs are co-partitioned)
joined_rdd = partitioned_rdd1.join(partitioned_rdd2)
# Collect and print the results
print(rdd1.glom().collect())
print(rdd2.glom().collect())
print(partitioned_rdd1.glom().collect())
print(partitioned_rdd2.glom().collect())
print("Join with inputs co-partitioned:")
print(joined_rdd.glom().collect())
>>[[(1, 'A')], [(2, 'B')], [(3, 'C')]]
>>[[(1, 'X')], [(2, 'Y')], [(3, 'Z')]]
>>[[(3, 'C')], [(1, 'A')], [(2, 'B')]]
>>[[(3, 'Z')], [(1, 'X')], [(2, 'Y')]]
>>[[(3, ('C', 'Z'))], [(1, ('A', 'X'))], [(2, ('B', 'Y'))]]

Graph Algorithm

Page Rank

image-20250519120135997

BFS

image-20250519120104422

image-20250519120048297