Spark Core

Spark RDD详解

Posted by leone on 2018-08-09

Spark

Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。
Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。
尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对 Hadoop 的补充,可以在 Hadoop 文件系统中并行运行。通过名为 Mesos 的第三方集群框架可以支持此行为。Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。

Spark Core

RDD

RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有RDD 以及调用 RDD 操作进行求值。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD 可以包含 Python、Java、Scala 中任意类型的对象, 甚至可以包含用户自定义的对象。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。RDD支持两种操作:转化操作和行动操作。RDD 的转化操作是返回一个新的 RDD的操作,比如 map()和 filter(),而行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作。比如 count() 和 first()。 Spark采用惰性计算模式,RDD只有第一次在一个行动操作中用到时,才会真正计算。Spark可以优化整个计算过程。默认情况下,Spark 的 RDD 会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个 RDD,可以使用 RDD.persist() 让 Spark 把这个 RDD 缓存下来。

RDD的属性

  • 一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。

  • 一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

  • RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

  • 一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

  • 一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

RDD为什么会产生

RDD是Spark的基石,是实现Spark数据处理的核心抽象。那么RDD为什么会产生呢?
Hadoop的MapReduce是一种基于数据集的工作模式,面向数据,这种工作模式一般是从存储上加载数据集,然后操作数据集,最后写入物理存储设备。数据更多面临的是一次性处理。

MR的这种方式对数据领域两种常见的操作不是很高效。第一种是迭代式的算法。比如机器学习中ALS、凸优化梯度下降等。这些都需要基于数据集或者数据集的衍生数据反复查询反复操作。MR这种模式不太合适,即使多MR串行处理,性能和时间也是一个问题。数据的共享依赖于磁盘。另外一种是交互式数据挖掘,MR显然不擅长。

我们需要一个效率非常快,且能够支持迭代计算和有效数据共享的模型,Spark应运而生。RDD是基于工作集的工作模式,更多的是面向工作流。
但是无论是MR还是RDD都应该具有类似位置感知、容错和负载均衡等特性。

RDD弹性

  • 自动进行内存和磁盘数据存储的切换

    Spark优先把数据放到内存中,如果内存放不下,就会放到磁盘里面,程序进行自动的存储切换

  • 基于血统的高效容错机制

    在RDD进行转换和动作的时候,会形成RDD的Lineage依赖链,当某一个RDD失效的时候,可以通过重新计算上游的RDD来重新生成丢失的RDD数据。

  • Task如果失败会自动进行特定次数的重试

    RDD的计算任务如果运行失败,会自动进行任务的重新计算,默认次数是4次。

  • Stage如果失败会自动进行特定次数的重试

    如果Job的某个Stage阶段计算失败,框架也会自动进行任务的重新计算,默认次数也是4次。

  • Checkpoint和Persist可主动或被动触发

    RDD可以通过Persist持久化将RDD缓存到内存或者磁盘,当再次用到该RDD时直接读取就行。也可以将RDD进行检查点,检查点会将数据存储在HDFS中,该RDD的所有父RDD依赖都会被移除。

  • 数据调度弹性

    Spark把这个JOB执行模型抽象为通用的有向无环图DAG,可以将多Stage的任务串联或并行执行,调度引擎自动处理Stage的失败以及Task的失败。

  • 数据分片的高度弹性

    可以根据业务的特征,动态调整数据分片的个数,提升整体的应用执行效率。

RDD特点

RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。

分区

RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。

只读

RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。由一个RDD转换到另一个RDD,可以通过丰富的操作算子实现,不再像MapReduce那样只能写map和reduce了。

RDD的操作算子包括两类,一类叫做transformations,它是用来将RDD进行转化,构建RDD的血缘关系;另一类叫做actions,它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件系统中。下图是RDD所支持的操作算子列表。

依赖

RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应的,另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系。

缓存

如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。

Checkpoint

虽然RDD的血缘关系天然地可以实现容错,当RDD的某个分区数据失败或丢失,可以通过血缘关系重建。但是对于长时间迭代型应用来说,随着迭代的进行,RDDs之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。为此,RDD支持checkpoint将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs了,它可以从checkpoint处拿到数据。

给定一个RDD我们至少可以知道如下几点信息:1、分区数以及分区方式;2、由父RDDs衍生而来的相关依赖信息;3、计算每个分区的数据,计算步骤为a: 如果被缓存,则从缓存中取的分区的数据 b: 如果被checkpoint,则从checkpoint处恢复数据c: 根据血缘关系计算分区的数据。

RDD编程

编程模型

在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。经过一系列的transformations定义RDD之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。

要使用Spark,开发者需要编写一个Driver程序,它被提交到集群以调度运行Worker,如下图所示。Driver中定义了一个或多个RDD,并调用RDD上的action,Worker则执行RDD分区计算任务。

创建RDD

在Spark中创建RDD的创建方式大概可以分为两种:

1.从集合中创建RDD;

1
2
// 由一个已经存在的Scala集合创建,集合并行化。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

从集合中创建RDD,Spark主要提供了两种函数:parallelize和makeRDD。我们可以先看看这两个函数的声明:

1
2
3
4
5
6
7
8
9
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T]

def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T]

def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]

我们可以从上面看出makeRDD有两种实现,而且第一个makeRDD函数接收的参数和parallelize完全一致。其实第一种makeRDD函数实现是依赖了parallelize函数的实现。

2.从外部存储创建RDD;

1
2
// 由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等
val rdd2 = sc.textFile("hdfs://node-1:9000/input/user.log")

RDD编程

park 算子大致可以分为以下两类:

  • Transformation 变换/转换算子:

这种变换并不触发提交作业,完成作业中间过程处理。Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。

  • Action 行动算子:

这类算子会触发 SparkContext 提交 Job 作业。Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统。

Transformation算子详细介绍

RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

常用的Transformation:

转换 含义
map(func) 将原来 RDD 的每个数据项通过 map 中的用户自定义函数 f 映射转变为一个新的元素。源码中 map 算子相当于初始化一个 RDD, 新 RDD 叫做 MappedRDD(this, sc.clean(f))。
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 将原来 RDD 中的每个元素通过函数 f 转换为新的元素,并将生成的 RDD 的每个集合中的元素合并为一个集合,内部创建 FlatMappedRDD(this,sc.clean(f))。
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是 (Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 类似reduceByKey,对pairRDD中想用的key值进行聚合操作,使用初始值(seqOp中使用,而combOpenCL中未使用)对应返回值为pairRDD,而区于aggregate(返回值为非RDD)
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
cartesian(otherDataset) 求笛卡尔乘积。该操作不会执行shuffle操作。
pipe(command, [envVars]) 通过一个shell命令来对RDD各分区进行“管道化”。通过pipe变换将一些shell命令用于Spark中生成的新RDD
coalesce(numPartitions**)** 重新分区,减少RDD中分区的数量到numPartitions
repartition(numPartitions) repartition是coalesce接口中shuffle为true的简易实现,即Reshuffle RDD并随机分区,使各分区数据量尽可能平衡。若分区之后分区数远大于原分区数,则需要shuffle。
repartitionAndSortWithinPartitions(partitioner) 该方法根据partitioner对RDD进行分区,并且在每个结果分区中按key进行排序。

Action算子详细介绍

动作 含义
reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering]) 从指定的RDD中返回前k个(最小)元素的集合,底层排序,和top相反。
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
top(n) 底层调用的是takeOrdered,之后排序翻转
saveAsObjectFile(path) 用于将RDD中的元素序列化成对象,存储到文件中。
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func进行更新。
fold(num,func) 通过op函数聚合各分区中的元素及合并各分区的元素,op函数需要两个参数,在开始时第一个传入的参数为zeroValue,T为RDD数据集的数据类型,,其作用相当于SeqOp和comOp函数都相同的aggregate函数
sample(withReplacement,num,[seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子

RDD持久化

RDD的缓存

Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存个数据集。当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。如果一个有持久化数据的节点发生故障,Spark 会在需要用到缓存的数据时重算丢失的数据分区。如果 希望节点故障的情况不会拖累我们的执行速度,也可以把数据备份到多个节点上。

RDD缓存方式

RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空 间中。
但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。

缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
注意:使用 Tachyon可以实现堆外缓存

1
2
3
4
5
6

# 从HDFS创建rdd并缓存
val rdd = sc.textFile("hdfs://node-1:9000/spark-2.1.3/input1").cache

# 当执行action操作是才会触发缓存
rdd.count

RDD checkpoint 机制

Spark中对于数据的保存除了持久化操作之外,还提供了一种检查点的机制,检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。

cache 和 checkpoint 是有显著区别的, 缓存把 RDD 计算出来然后放在内存中,但是RDD 的依赖链(相当于数据库中的redo 日志), 也不能丢掉, 当某个点某个 executor 宕了,上面cache 的RDD就会丢掉, 需要通过 依赖链重放计算出来, 不同的是, checkpoint 是把 RDD 保存在 HDFS中, 是多副本可靠存储,所以依赖链就可以丢掉了,就斩断了依赖链, 是通过复制实现的高容错。

如果存在以下场景,则比较适合使用检查点机制:

  1. DAG中的Lineage过长,如果重算,则开销太大(如在PageRank中)。
  2. 在宽依赖上做Checkpoint获得的收益更大。

为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移出。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

  • scala代码checkpoint示例
1
2
3
4
5
6
7
8
9
10
11
12

# 从HDFS创建rdd并缓存
val rdd = sc.textFile("hdfs://node-1:9000/spark-2.1.3/input2").cache

# 设置rdd的checkpoint目录
sc.setCheckpointDir("hdfs://node-1:9000/spark-2.1.3/output3")

# checkpoint
val rdd1 = rdd.checkout

# 当执行action操作是才会真正的checkpoint
rdd.count
  • java代码checkpoint示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
* <p> spark rdd checkpoint 机制
*
* @author leone
* @since 2019-02-17
**/
public class JavaSparkRddCheckpoint {

public static void main(String[] args) {
try {
JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("spark-checkpoint").setMaster("local[*]"));
// 设置 checkpoint 目录
sparkContext.setCheckpointDir("file:///tmp/output/");

JavaRDD<String> rdd = sparkContext.textFile("file:///tmp/input/hello.txt");

// 注意: 现需要对RDD进行缓存
JavaPairRDD<String, Integer> javaPairRDD = rdd.flatMapToPair((PairFlatMapFunction<String, String, Integer>) s -> {
String[] arrays = s.split(" ");
List<Tuple2<String, Integer>> list = new ArrayList<>(arrays.length);
for (String arr : arrays) {
list.add(new Tuple2<>(arr, 1));
}
return list.iterator();
}).cache();

// 为pairRDD设置检查点
javaPairRDD.checkpoint();

System.err.println("isCheckpoint: " + javaPairRDD.isCheckpointed() + " -- checkpointFile: " + javaPairRDD.getCheckpointFile());

javaPairRDD.collect();

System.err.println("isCheckpoint: " + javaPairRDD.isCheckpointed() + " -- checkpoint: " + javaPairRDD.getCheckpointFile());

sparkContext.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

RDD的依赖关系

RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

窄依赖

窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用
总结:窄依赖我们形象的比喻为独生子女

宽依赖

宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引起shuffle
总结:宽依赖我们形象的比喻为超生

Spark 提交任务

参数名 参数说明
–master master 的地址,提交任务到哪里执行,例如 spark://host:port, yarn, local
–deploy-mode 在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client
–class 应用程序的主类,仅针对 java 或 scala 应用
–name 应用程序的名称
–jars 用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下
–packages 包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标
–exclude-packages 为了避免冲突 而指定不包含的 package
–repositories 远程 repository
–conf PROP=VALUE 指定 spark 配置属性的值, 例如 -conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m"
–properties-file 加载的配置文件,默认为 conf/spark-defaults.conf
–driver-memory Driver内存,默认 1G
–driver-java-options 传给 driver 的额外的 Java 选项
–driver-library-path 传给 driver 的额外的库路径
–driver-class-path 传给 driver 的额外的类路径
–driver-cores Driver 的核数,默认是1。在 yarn 或者 standalone 下使用
–executor-memory 每个 executor 的内存,默认是1G
–total-executor-cores 所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使用
–num-executors 启动的 executor 数量。默认为2。在 yarn 下使用
–executor-core 每个 executor 的核数。在yarn或者standalone下使用

Local 模式

1
2
3
4
5
# 在 Spark home 下执行,无需启动 Spark 集群在一个节点直接运行即可

bin/spark-submit --master local[2] \
--class com.leone.bigdata.spark.java.core.wc.JavaWordCount \
/root/jars/spark-wc.jar file:///root/data/words/ file:///root/data/output/

Standalone 模式

1
2
3
4
5
6
7
# 在 Spark home 下执行,需要启动 Spark 集群并指定 master

bin/spark-submit --master spark://node-1:7077 \
--class com.leone.bigdata.spark.java.core.wc.JavaWordCount \
/root/jars/spark-wc.jar hdfs://node-1:9000/data/common hdfs://node-1:9000/data/output \
–-executor-memory 1G \
–-total-executor-cores 2

yarn Client 模式

1
2
3
4
5
6
# 在 Spark home 下执行,需要启动 Spark 集群 hdfs 集群 yarn 集群

bin/spark-submit --master yarn \
--class com.leone.bigdata.spark.java.core.wc.JavaWordCount \
--deploy-mode client \
/root/jars/spark-wc.jar hdfs://node-1:9000/data/common hdfs://node-1:9000/data/output

yarn Cluster 模式

1
2
3
4
5
6
7
# 在 Spark home 下执行,需要启动 Spark 集群 hdfs 集群 yarn 集群

bin/spark-submit \
--class com.leone.bigdata.spark.java.core.wc.JavaWordCount \
--master yarn \
--deploy-mode cluster \
/root/jars/spark-wc.jar hdfs://node-1:9000/data/common hdfs://node-1:9000/data/output

Spark on YARN 之 Client 与 Cluster 模式区别

  • yarn-client

Driver运行在本地客户端,负责调度Application,会与yarn集群产生大量的网络通信。好处是,执行时可以在本地看到所有的log,便于调试。所以一般用于测试环境。

  • yarn-cluster

Driver运行在NodeManager,每次运行都是随机分配到NM机器上去。缺点就是本地提交后看不到log,只能通过yarn application-logs application id命令来查看。比较麻烦。