Spark Structured Steaming

Spark Structured Steaming 实战入门

Posted by leone on 2019-04-02

Spark Structured Steaming

Spark Structured Streaming 简介

什么是 Spark Structured Streaming

Structured Streaming (结构化流)是一种基于 Spark SQL 引擎构建的可扩展且容错的 stream processing engine (流处理引擎)。您可以以静态数据表示批量计算的方式来表达 streaming computation (流式计算)。 Spark SQL 引擎将随着 streaming data 持续到达而增量地持续地运行,并更新最终结果。您可以使用 Scala , Java , Python 或 R 中的 Dataset/DataFrame API 来表示 streaming aggregations (流聚合), event-time windows (事件时间窗口), stream-to-batch joins (流到批处理连接) 等。在同一个 optimized Spark SQL engine (优化的 Spark SQL 引擎)上执行计算。最后,系统通过 checkpointing (检查点) 和 Write Ahead Logs (预写日志)来确保 end-to-end exactly-once (端到端的完全一次性) 容错保证。简而言之,Structured Streaming 提供快速,可扩展,容错,end-to-end exactly-once stream processing (端到端的完全一次性流处理),而无需用户理解 streaming 。

Structured Streaming 编程模型

Structured Streaming 的关键思想是将 live data stream (实时数据流)视为一种正在不断 appended (附加)的表。这形成了一个与 batch processing model (批处理模型)非常相似的新的 stream processing model (流处理模型)。您会将您的 streaming computation (流式计算)表示为在一个静态表上的 standard batch-like query (标准类批次查询),并且 Spark 在 unbounded(无界) 输入表上运行它作为 incremental(增量) 查询。让我们更加详细地了解这个模型。

基本概念

将 input data stream (输入数据流) 视为 “Input Table”(输入表)。每个在 stream 上到达的 data item (数据项)就像是一个被 appended 到 Input Table 的新的 row 。

  • Stream as a Table

对输入的查询将生成 “Result Table” (结果表)。每个 trigger interval (触发间隔)(例如,每 1 秒),新 row (行)将附加到 Input Table ,最终更新 Result Table 。无论何时更新 result table ,我们都希望将 changed result rows (更改的结果行)写入 external sink (外部接收器)。

  • Model

“Output(输出)” 被定义为写入 external storage (外部存储器)的内容。可以以不同的模式定义 output :

Complete Mode(完全模式) - 整个更新的 Result Table 将被写入外部存储。由 storage connector (存储连接器)决定如何处理整个表的写入。

Append Mode(附加模式) - 只有 Result Table 中自上次触发后附加的新 rows(行) 将被写入 external storage (外部存储)。这仅适用于不期望更改 Result Table 中现有行的查询。

Update Mode(更新模式) - 只有自上次触发后 Result Table 中更新的 rows (行)将被写入 external storage (外部存储)(从 Spark 2.1.1 之后可用)。请注意,这与 Complete Mode (完全模式),因为此模式仅输出自上次触发以来更改的 rows (行)。如果查询不包含 aggregations (聚合),它将等同于 Append mode 。

为了说明这个模型的使用,我们来了解一下上面章节的 快速示例 。第一个 lines DataFrame 是 input table ,并且最后的 wordCounts DataFrame 是 result table 。请注意,streaming lines DataFrame 上的查询生成 wordCounts 是 exactly the same(完全一样的) 因为它将是一个 static DataFrame (静态 DataFrame )。但是,当这个查询启动时, Spark 将从 socket 连接中持续检查新数据。如果有新数据,Spark 将运行一个 “incremental(增量)” 查询,它会结合以前的 running counts (运行计数)与新数据计算更新的 counts ,如下所示。

  • Model

这种模式与许多其他 stream processing engines (流处理引擎)有着显著不同。许多 streaming systems (流系统)要求用户本身保持运行 aggregations (聚合),因此必须要考虑容错,和数据一致性(at-least-once(至少一次), at-most-once (最多一次),exactly-once (完全一次))。在这个模型中,当有新数据时, Spark 负责更新 Result Table ,从而减轻用户对它的考虑。举个例子,我们来看一下这个模型如何处理对于基于 event-time 的处理和 late arriving (迟到)的数据。

Spark Structured Streaming 实战

使用 scala 开发 Structured Streaming 程序

  • StructuredNcWordCount.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

/**
* <p>
*
* @author leone
* @since 2019-03-28
**/
object StructuredNcWordCount {

def main(args: Array[String]): Unit = {

val spark = SparkSession.builder().appName("structured").master("local[*]").getOrCreate()

import spark.implicits._

val line = spark.readStream
.format("socket")
.option("host", "ip")
.option("port", "8000")
.load()

// 由于控制台日志打印太多方便调试
spark.sparkContext.setLogLevel("WARN")

val words = line.as[String].flatMap(_.split(" "))

val wordsCount = words.groupBy("value").count()

// 注意:Append模式不支持基于数据流上的聚合操作(Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets)
// 三种模式1:complete 所有内容都输出 2:append 新增的行才输出 3:update 更新的行才输出
val query = wordsCount.writeStream.outputMode(OutputMode.Complete()).format("console").start()
query.awaitTermination()
}
}
  • Spark Structure Streaming 整合 kafka
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{Dataset, SparkSession}

/**
* <p>structured 整合kafka
*
* @author leone
* @since 2019-03-29
**/
object StructuredWithKafka {

def main(args: Array[String]): Unit = {

val spark = SparkSession.builder().appName("structured").master("local[*]").getOrCreate()

spark.sparkContext.setLogLevel("WARN")

import spark.implicits._

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node-2:9092,node-3:9092,node-4:9092")
.option("subscribe", "structured-topic")
//.option("startingOffsets", "earliest")
.load()

val kafkaDS: Dataset[(String, String)] = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]

println(kafkaDS)

kafkaDS.printSchema()

val words = kafkaDS.flatMap(_._2.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts
.writeStream
.outputMode(OutputMode.Complete())
.format("console")
.start()
query.awaitTermination()

}

}

使用 java 开发 Structured Streaming 程序

  • Spark Structure Streaming NC word count
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.util.Arrays;

/**
* <p>
*
* @author leone
* @since 2019-03-29
**/
public class StructuredStreamingWithNc {

public static void main(String[] args) throws StreamingQueryException {
SparkSession spark = SparkSession.builder().appName("structured").master("local[*]").getOrCreate();
spark.sparkContext().setLogLevel("warn");

Dataset<Row> line = spark.readStream()
.format("socket")
.option("host", "ip")
.option("port", "8000")
.load();

Dataset<String> words = line.as(Encoders.STRING())
.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator(), Encoders.STRING());

Dataset<Row> wordCount = words.groupBy("value").count();

StreamingQuery query = wordCount.writeStream()
.outputMode(OutputMode.Complete())
.format("console")
.start();

query.awaitTermination();
}

}
  • Spark Structure Streaming 整合kafka
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.util.Arrays;

/**
* <p>
*
* @author leone
* @since 2019-04-02
**/
public class StructuredStreamingKafka {

public static void main(String[] args) throws StreamingQueryException {
SparkSession spark = SparkSession.builder().appName("structured").master("local[*]").getOrCreate();
spark.sparkContext().setLogLevel("warn");

// Create DataSet representing the stream of input lines from kafka
Dataset<String> kafkaDS = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "node-2:9092,node-3:9092,node-4:9092")
.option("subscribe", "topic-spark-structured")
.load()
.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING());

// val kafkaDS: Dataset[(String, String)] = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]

//kafkaDS.printSchema();

Dataset<Row> wordCounts = kafkaDS.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
Encoders.STRING()).groupBy("value").count();

StreamingQuery query = wordCounts.writeStream()
.outputMode(OutputMode.Complete())
.format("console")
.start();

query.awaitTermination();
}

}

Spark Streaming 提交模式

Local 模式

1
2
3
4
5
# 在 Spark home 下执行,无需启动 Spark 集群在一个节点直接运行即可

bin/spark-submit --master local[2] \
--class com.leone.bigdata.spark.java.structured.StructuredStreamingKafka \
/root/jars/spark-wc.jar file:///root/data/words/ file:///root/data/output/

Standalone 模式

1
2
3
4
5
6
7
# 在 Spark home 下执行,需要启动 Spark 集群并指定 master

bin/spark-submit --master spark://node-1:7077 \
--class com.leone.bigdata.spark.java.structured.StructuredStreamingKafka \
/root/jars/spark-wc.jar hdfs://node-1:9000/data/common hdfs://node-1:9000/data/output \
–-executor-memory 1G \
–-total-executor-cores 2

yarn Client 模式

1
2
3
4
5
6
# 在 Spark home 下执行,需要启动 Spark 集群 hdfs 集群 yarn 集群

bin/spark-submit --master yarn \
--class com.leone.bigdata.spark.java.structured.StructuredStreamingKafka \
--deploy-mode client \
/root/jars/spark-wc.jar hdfs://node-1:9000/data/common hdfs://node-1:9000/data/output

yarn Cluster 模式

1
2
3
4
5
6
7
# 在 Spark home 下执行,需要启动 Spark 集群 hdfs 集群 yarn 集群

bin/spark-submit \
--class com.leone.bigdata.spark.java.structured.StructuredStreamingKafka \
--master yarn \
--deploy-mode cluster \
/root/jars/spark-wc.jar hdfs://node-1:9000/data/common hdfs://node-1:9000/data/output