Spark Steaming

Spark Steaming 实战入门

Posted by leone on 2018-10-17

Spark Steaming

Spark Streaming 简介

什么是Spark Streaming

Spark Streaming使用Spark Core的快速调度功能来执行流分析。它以小批量方式提取数据,并对这些小批量数据执行RDD转换。此设计使得为批量分析编写的同一组应用程序代码可用于流分析,从而有助于轻松实现lambda体系结构。 然而,这种便利性带来的等待时间等于小批量持续时间。其他按事件而不是小批量处理事件的流数据引擎包括Storm和Flink的流媒体组件。 Spark Streaming内置支持消费Kafka,Flume,Twitter,ZeroMQ,Kinesis和TCP / IP套接字。

在Spark 2.x中,还提供了一种基于数据集的独立技术,称为结构化流,具有更高级别的接口,以支持流式传输。

spark streaming实时接收输入数据流,并根据时间将数据流分成连续的多个batch,然后由Spark引擎一次处理一批数据,以批量生成最终结果流。

spark streaming的核心参数,设置流数据被分成多个batch的时间间隔,每个spark引擎处理的就是这个时间间隔内的数据。在Spark Streaming中,Job之间有可能存在依赖关系,所以后面的作业必须确保前面的作业执行完后才会被调度执行。如果批处理时间超过了batch duration,意味着数据处理速率跟不上数据接收速率,那么会导致后面正常的batch提交的作业无法按时执行,随着时间的推移,越来越多的作业被延迟执行,最后导致整个Streaming作业被阻塞,所以需要设置一个合理的批处理间隔以确保作业能够在这个批处理间隔内执行完成。

Spark Streaming 特点

1、Spark Streaming用于处理流式计算问题。能够和Spark的其他模块无缝集成。

2、Spark Streaming是一个粗粒度的框架【也就是只能对一批数据指定处理方法】,核心是采用微批次架构。和Storm采用的以条处理的不同。

3、Spark Streaming会运行接收器来不断的接收输入的数据流,然后根据程序配置的时间,将时间范围内的所有数据打成一个RDD,发送给Spark Core去进行处理。依次来打成对数据流的计算。

4、Spark Streaming有它自己的抽象,叫DStream Discretized Stream离散化流

5、如果入水口的速度大于出水口的速度,那么势必导致水管爆裂,Spark Streaming也存在这个问题,内部采用背压机制来进行处理,会通过ReceiverRateController来不断计算RDD的处理速度和RDD的生成速度,来通过令牌桶机制进行速度控制。只要是控制令牌的生成周期。

术语定义

  • 离散流(discretized stream)或DStream:这是Spark Streaming对内部持续的实时数据流的抽象描述,即我们处理的一个实时数据流,在Spark Streaming中对应于一个DStream 实例。

  • 批数据(batch data):这是化整为零的第一步,将实时流数据以时间片为单位进行分批,将流处理转化为时间片数据的批处理。随着持续时间的推移,这些处理结果就形成了对应的结果数据流了。

  • 时间片或批处理时间间隔( batch interval):这是人为地对流数据进行定量的标准,以时间片作为我们拆分流数据的依据。一个时间片的数据对应一个RDD实例。

  • 窗口长度(window length):一个窗口覆盖的流数据的时间长度。必须是批处理时间间隔的倍数,

  • 滑动时间间隔:前一个窗口到后一个窗口所经过的时间长度。必须是批处理时间间隔的倍数

  • Input DStream :一个input DStream是一个特殊的DStream,将Spark Streaming连接到一个外部数据源来读取数据。

Storm与Spark Streming比较

  • 处理模型以及延迟

虽然两框架都提供了可扩展性(scalability)和可容错性(fault tolerance),但是它们的处理模型从根本上说是不一样的。Storm可以实现亚秒级时延的处理,而每次只处理一条event,而Spark Streaming可以在一个短暂的时间窗口里面处理多条(batches)Event。所以说Storm可以实现亚秒级时延的处理,而Spark Streaming则有一定的时延。

  • 容错和数据保证

然而两者的代价都是容错时候的数据保证,Spark Streaming的容错为有状态的计算提供了更好的支持。在Storm中,每条记录在系统的移动过程中都需要被标记跟踪,所以Storm只能保证每条记录最少被处理一次,但是允许从错误状态恢复时被处理多次。这就意味着可变更的状态可能被更新两次从而导致结果不正确。

任一方面,Spark Streaming仅仅需要在批处理级别对记录进行追踪,所以他能保证每个批处理记录仅仅被处理一次,即使是node节点挂掉。虽然说Storm的 Trident library可以保证一条记录被处理一次,但是它依赖于事务更新状态,而这个过程是很慢的,并且需要由用户去实现。

  • 实现和编程API

Storm主要是由Clojure语言实现,Spark Streaming是由Scala实现。如果你想看看这两个框架是如何实现的或者你想自定义一些东西你就得记住这一点。Storm是由BackType和 Twitter开发,而Spark Streaming是在UC Berkeley开发的。

Storm提供了Java API,同时也支持其他语言的API。 Spark Streaming支持Scala和Java语言(其实也支持Python)。

  • 批处理框架集成

Spark Streaming的一个很棒的特性就是它是在Spark框架上运行的。这样你就可以想使用其他批处理代码一样来写Spark Streaming程序,或者是在Spark中交互查询。这就减少了单独编写流批量处理程序和历史数据处理程序。

  • 生产支持

Storm已经出现好多年了,而且自从2011年开始就在Twitter内部生产环境中使用,还有其他一些公司。而Spark Streaming是一个新的项目,并且在2013年仅仅被Sharethrough使用(据作者了解)。

Storm是 Hortonworks Hadoop数据平台中流处理的解决方案,而Spark Streaming出现在 MapR的分布式平台和Cloudera的企业数据平台中。除此之外,Databricks是为Spark提供技术支持的公司,包括了Spark Streaming。

虽然说两者都可以在各自的集群框架中运行,但是Storm可以在Mesos上运行, 而Spark Streaming可以在YARN和Mesos上运行。

DStream

表示一系列时间序列上连续的RDDs,每一个RDDs代表一定时间间隔内到达的数据,这样就把连续的数据流拆成很多小的RDDs数据块(RDDs数据块内的数据是连续的数据)。可以通过实时数据创建DStream,也可以对现有的DStream进行transformation操作生成,例如map、window、reduceByKeyAndWindow等转换操作。    在spark streaming运行期间,每个DStream都会定期生成一个RDDs,具体的是compute(time) 方法,生成的RDDs代表一个批次内的数据,作为提交job的输入元数据

Spark Streaming 实战

spark nc word count scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
* <p>
*
* @author leone
* @since 2018-12-24
**/
object NcWordCount {

def main(args: Array[String]): Unit = {

// 创建sparkContext
val sc = new SparkContext(new SparkConf().setAppName("nc-wc-steaming").setMaster("local[2]"))

// 设置批次产生的时间间隔
val ssc = new StreamingContext(sc, Seconds(5000))

// 从一个socket端口读取数据
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node-1", 8888)

// 对DStream进行操作
val words: DStream[String] = lines.flatMap(_.split(" "))

val wordAndOne: DStream[(String, Int)] = words.map((_, 1))

val reduced: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)

// 打印结果
reduced.print()

// 启动spark程序
ssc.start()
ssc.awaitTermination()
}

}

Spark Streaming kafka scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* <p>
*
* @author leone
* @since 2018-12-24
**/
object KafkaWordCount {

def main(args: Array[String]): Unit = {

// offset保存路径
val checkpointPath = "file:///e:/tmp/spark/streaming/checkpoint/kafka-direct"

val conf = new SparkConf().setAppName("ScalaKafkaStream").setMaster("local[2]")

val ssc = new StreamingContext(conf, Seconds(3))

ssc.checkpoint(checkpointPath)

val bootstrapServers = "node-2:9092,node-3:9092,node-4:9092"
val groupId = "streaming-group"
val topicName = "streaming-topic"
val maxPoll = 20000

val kafkaParams = Map(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)

val DStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set(topicName), kafkaParams))

DStream.map(_.value)
.flatMap(_.split(" "))
.map(x => (x, 1L))
.reduceByKey(_ + _)
.transform(data => {
data.sortBy(_._2, false)
}).print()

ssc.start()
ssc.awaitTermination()

}

}

Spark Streaming nc word count java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

/**
* <p>
*
* @author leone
* @since 2018-12-25
**/
public class JavaStreamingTest {

public static void main(String[] args) throws InterruptedException {
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("streaming");

JavaStreamingContext jsc = new JavaStreamingContext(conf, Seconds.apply(5));

JavaReceiverInputDStream<String> stream = jsc.socketTextStream("node-1", 8888);

// 拆分每一行数据
JavaDStream<String> words = stream.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator());

// 单词映射成 (word, 1) 的形式
JavaPairDStream<Object, Integer> pairWords = words.mapToPair((PairFunction<String, Object, Integer>) s -> new Tuple2<>(s, 1));

// 进行reduce聚合操作
JavaPairDStream<Object, Integer> result = pairWords.reduceByKey((Function2<Integer, Integer, Integer>) (integer, integer2) -> integer + integer2);

// 打印输出结构
result.print();

jsc.start();
jsc.awaitTermination();
jsc.close();
}

}

Spark Streaming kafka word count java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.*;

/**
* <p>基于Kafka Direct方式实时 word count 程序
*
* @author leone
* @since 2018-12-25
**/
public class JavaStreamingKafkaTest {

public static void main(String[] args) throws InterruptedException {
// 创建SparkConf对象
SparkConf conf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[*]");

// 创建JavaStreamingContext对象
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

// kafka的brokers
String brokers = "node-2:9092,node-3:9092,node-4:9092";

// 创建Kafka参数Map
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", brokers);
kafkaParams.put("bootstrap.servers", brokers);
kafkaParams.put("group.id", "g1");
kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("auto.offset.reset", "latest");

// 创建Kafka的topics ,里面可以填多个topic
Collection<String> topics = Collections.singletonList("streaming-topic");

// 创建DStream
JavaInputDStream<ConsumerRecord<Object, Object>> lines = KafkaUtils.createDirectStream(jsc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topics, kafkaParams));

// 拆分Kafka topic里面的数据
JavaDStream<String> linesSplit = lines.flatMap((FlatMapFunction<ConsumerRecord<Object, Object>, String>) line -> Arrays.asList(line.value().toString().split(" ")).iterator());

// 单词映射成(word,1)的形式
JavaPairDStream<String, Integer> word = linesSplit.mapToPair((PairFunction<String, String, Integer>) everyWord -> new Tuple2<>(everyWord, 1));

// 进行reduce聚合操作
JavaPairDStream<String, Integer> wordsCount = word.reduceByKey((Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);

// 打印输出结构
wordsCount.print();

jsc.start();
jsc.awaitTermination();
jsc.close();
}

}

Spark Streaming 提交模式

Local 模式

1
2
3
4
5
# 在 Spark home 下执行,无需启动 Spark 集群在一个节点直接运行即可

bin/spark-submit --master local[2] \
--class com.leone.bigdata.spark.java.streaming.JavaSparkStreamingKafka \
/root/jars/spark-wc.jar file:///root/data/words/ file:///root/data/output/

Standalone 模式

1
2
3
4
5
6
7
# 在 Spark home 下执行,需要启动 Spark 集群并指定 master

bin/spark-submit --master spark://node-1:7077 \
--class com.leone.bigdata.spark.java.streaming.JavaSparkStreamingKafka \
/root/jars/spark-wc.jar hdfs://node-1:9000/data/common hdfs://node-1:9000/data/output \
–-executor-memory 1G \
–-total-executor-cores 2

yarn Client 模式

1
2
3
4
5
6
# 在 Spark home 下执行,需要启动 Spark 集群 hdfs 集群 yarn 集群

bin/spark-submit --master yarn \
--class com.leone.bigdata.spark.java.streaming.JavaSparkStreamingKafka \
--deploy-mode client \
/root/jars/spark-wc.jar hdfs://node-1:9000/data/common hdfs://node-1:9000/data/output

yarn Cluster 模式

1
2
3
4
5
6
7
# 在 Spark home 下执行,需要启动 Spark 集群 hdfs 集群 yarn 集群

bin/spark-submit \
--class com.leone.bigdata.spark.java.streaming.JavaSparkStreamingKafka \
--master yarn \
--deploy-mode cluster \
/root/jars/spark-wc.jar hdfs://node-1:9000/data/common hdfs://node-1:9000/data/output