
Flink Quick Start

Posted by leone on 2019-03-01


Flink introduction

What is Flink?

Apache Flink is an open-source stream processing framework developed by the Apache Software Foundation. Its core is a distributed streaming dataflow engine written in Java and Scala. Flink executes arbitrary dataflow programs in a data-parallel and pipelined manner, and its pipelined runtime can run both batch and stream processing programs. In addition, the Flink runtime natively supports the execution of iterative algorithms.

Flink is a unified computing framework that combines batch and stream processing. At its core is a streaming dataflow engine that provides data distribution and parallel computation. Its biggest strength is stream processing, and it is one of the top open-source stream processing engines. Like Storm, Flink is an event-driven real-time streaming system.

Flink stream processing features:

  • High-throughput, low-latency, high-performance stream processing

  • Window operations based on event time

  • Exactly-once semantics for stateful computations

  • Highly flexible window operations: windows based on time, count, session, and data-driven conditions

  • A continuous streaming model with backpressure support

  • Fault tolerance based on lightweight distributed snapshots

  • A single runtime that supports both batch-on-streaming and pure streaming processing

  • Flink's own memory management inside the JVM

  • Support for iterative computation

  • Automatic program optimization: expensive operations such as shuffles and sorts are avoided where possible, and intermediate results are cached when necessary

Flink technology stack

  1. DataSet API: batch processing over static data. Static data is abstracted as distributed data sets, and users can apply the operators Flink provides to transform them. Available in Java, Scala, and Python (a minimal sketch follows this list).

  2. DataStream API: stream processing over data in motion. Streaming data is abstracted as distributed data streams, and users can apply the operators Flink provides to transform them. Available in Java and Scala.

  3. Table API: queries over structured data. Structured data is abstracted as relational tables, which can be queried with the SQL-like DSL Flink provides. Available in Java and Scala.

  4. Flink ML: Flink's machine learning library, providing a machine learning Pipelines API and implementations of many ML algorithms.

  5. Gelly: Flink's graph processing library, providing graph APIs and implementations of many graph algorithms.
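
To make the DataSet API above concrete, here is a minimal batch word count sketch. The class name FlinkJavaBatchWc and the in-memory input are made up for illustration; reading a file with env.readTextFile(path) works the same way.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkJavaBatchWc {

    public static void main(String[] args) throws Exception {
        // batch execution environment (DataSet API)
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // a small in-memory data set, just for illustration
        DataSet<String> text = env.fromElements("hello flink", "hello world");

        DataSet<Tuple2<String, Integer>> counts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String token : line.toLowerCase().split(" ")) {
                            if (token.length() > 0) {
                                out.collect(new Tuple2<>(token, 1));
                            }
                        }
                    }
                })
                .groupBy(0)   // group by the word field
                .sum(1);      // sum the counts

        counts.print();
    }
}

Unlike the streaming examples later in this post, DataSet.print() triggers execution itself, so no explicit env.execute() call is needed here.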

Setting up a Flink cluster

# Download
$ wget https://mirrors.shu.edu.cn/apache/flink/flink-1.7.2/flink-1.7.2-bin-hadoop27-scala_2.11.tgz

# Extract
$ tar -zxvf flink-1.7.2-bin-hadoop27-scala_2.11.tgz

# Configure flink-conf.yaml
$ vim $FLINK_HOME/conf/flink-conf.yaml

jobmanager.rpc.address: node-1

# Configure slaves (the TaskManager hosts)
$ vim $FLINK_HOME/conf/slaves

node-2
node-3
node-4


# Configure masters (the JobManager host)
$ vim $FLINK_HOME/conf/masters

node-1:8081


# Start the Flink cluster
$ $FLINK_HOME/bin/start-cluster.sh

# Stop the Flink cluster
$ $FLINK_HOME/bin/stop-cluster.sh

# Distribute the Flink directory to the other nodes
scp -r flink-1.7.2 root@node-x:/xxx/xxx
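
Note that the flink-1.7.2 directory has to be copied to every node (the scp command above) before the cluster is started. Once start-cluster.sh has run, the JobManager web UI should be reachable at http://node-1:8081 (the host and port configured in the masters file), where TaskManager status and submitted jobs can be checked.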

Submitting Flink jobs

The demos here use the example jar that ships with Flink.

Submitting in standalone mode

bin/flink run -m node-1:8081 ./examples/batch/WordCount.jar --input hdfs://node-1:9000/data/common/ --output hdfs://node-1:9000/data/output

Flink on YARN

# First start a YARN session with 3 TaskManagers (1 slot and 1024 MB each, 1024 MB for the JobManager), named test, running detached
$FLINK_HOME/bin/yarn-session.sh -n 3 -s 1 -jm 1024 -tm 1024 -nm test -d

# Submit a job to the Flink cluster on YARN
$FLINK_HOME/bin/flink run -m yarn-cluster -yn 3 ./examples/batch/WordCount.jar --input hdfs://node-1:9000/data/common/ --output hdfs://node-1:9000/data/output
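
Note that -m yarn-cluster starts a dedicated per-job Flink cluster on YARN rather than reusing the session started above; to run the job inside the long-running session, it can usually be submitted with a plain bin/flink run, since Flink picks the session up from the YARN properties file written by yarn-session.sh. The -yn 3 option again requests 3 TaskManagers.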

Flink Java word count

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Streaming word count that reads text from a socket.
 *
 * @author leone
 * @since 2019-02-28
 **/
public class FlinkJavaWc {

    /**
     * Read data from a socket and process it.
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.socketTextStream("node-1", 8082);
        DataStream<Tuple2<String, Integer>> dataStream = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = s.toLowerCase().split(" ");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<>(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(2), Time.seconds(1)).sum(1);
        dataStream.print();
        env.execute("Java WordCount from SocketTextStream Example");
    }

}
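
To try this, start a text server on node-1 first, for example with nc -lk 8082, then run the job and type words into the netcat session; with a 2-second sliding window that slides every second, the counts are printed once per second.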

Flink Scala word count

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Streaming word count in Scala that reads text from a socket.
 *
 * @author leone
 * @since 2019-02-28
 **/
object FlinkScalaWordCount {

  def main(args: Array[String]): Unit = {
    // the implicit TypeInformation conversions must be imported
    import org.apache.flink.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("node-1", 9999, '\n')
    val result = text.flatMap(line => line.split("\\s"))
      .map(w => WordWithCount(w, 1))
      .keyBy("word")
      .timeWindow(Time.seconds(2), Time.seconds(1))
      .sum("count")

    result.print().setParallelism(1)

    env.execute("scala-nc-wc")
  }

  case class WordWithCount(word: String, count: Long)

}
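
Compared with the Java version, this one keys and aggregates by the field names of the WordWithCount case class instead of tuple indexes, which only works because import org.apache.flink.api.scala._ brings the required implicit TypeInformation into scope. To feed it, listen on port 9999 on node-1, for example with nc -lk 9999.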

Integrating Flink with Kafka

  • FlinkJavaKafka.java

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * Streaming word count that reads from a Kafka topic.
 *
 * @author leone
 * @since 2019-03-04
 **/
public class FlinkJavaKafka {

    private static final String ZOOKEEPER_HOST = "node-2:2181,node-3:2181,node-4:2181";

    private static final String KAFKA_BROKER = "node-2:9092,node-3:9092,node-4:9092";

    private static final String GROUP = "test-group";

    private static final String TOPIC_NAME = "flink-topic";

    public static void main(String[] args) throws Exception {

        // get env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER);
        kafkaProps.setProperty("group.id", GROUP);

        FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(TOPIC_NAME, new SimpleStringSchema(), kafkaProps);

        DataStream<String> stream = env.addSource(kafkaConsumer);

        DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1).setParallelism(1);

        counts.print();

        env.execute();
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.split(" ");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

}
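
For this to compile and run, the Kafka connector dependency matching the Flink and Scala version (for Flink 1.7.2 this should be flink-connector-kafka-0.10_2.11) has to be on the classpath, and the flink-topic topic must exist in the Kafka cluster. Test messages can then be produced with the Kafka console producer, e.g. kafka-console-producer.sh --broker-list node-2:9092 --topic flink-topic.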
  • FlinkScalaKafka.scala

import java.util.Properties

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

/**
 * Streaming word count in Scala that reads from a Kafka topic.
 *
 * @author leone
 * @since 2019-03-04
 **/
object FlinkScalaKafka {

  private val ZOOKEEPER_HOST = "node-2:2181,node-3:2181,node-4:2181"

  private val KAFKA_BROKER = "node-2:9092,node-3:9092,node-4:9092"

  private val GROUP_ID = "test-group"

  private val TOPIC_NAME = "flink-topic"

  def main(args: Array[String]): Unit = {

    // get env
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // checkpoint configuration
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(10000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // Kafka configuration
    val kafkaProps = new Properties()
    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
    kafkaProps.setProperty("group.id", GROUP_ID)

    // read messages from Kafka
    val source = env.addSource(new FlinkKafkaConsumer010[String](TOPIC_NAME, new SimpleStringSchema(), kafkaProps))

    val result = source.flatMap(_.split(" ")
      .filter(_.nonEmpty)
      .map((_, 1)))
      .keyBy(0)
      .sum(1)
      .setParallelism(1)

    result.print()

    env.execute()
  }

}