flink
简介
apche Flink是一个框架和分布式处理一起,用于对无界和有界的数据进行状态计算
阿里,腾讯,华为,网易,京东,携程,美团爱奇艺等等都有用到flink
为什么选择flink
流数据真实反映了我们的生活方式
传统的数据架构是基于有限数据及的
我们的目标
- 低延迟
- 高吞吐
- 结果的准确性和更好的容错性
哪些行业需要处理流数据
- 电商和市场营销:数据报表,广告投放,业务流程需要
- 物联网:传感器数据采集和显式,实时报警,交通运输业
- 电信业:基站流量调配
- 银行和金融业:实时结算和通知推送,实时监测异常行为
对比
传统数据结构
分析处理
讲数据从业务数据复制到数据仓,在进行分析和查询
有状态的流式处理
lambda架构
用俩套系统,来同时保证低延迟和结果准确
Flink
特点
事件驱动(Event-driven)
在flink的世界观中,一切都是由流组成的,理线数据是有界的流;实时数据是一个没有界限的流:这就是所谓的有界流和无界流
flink有许多分层API
越顶层越抽象,表达含义越简明,使用越方便
越底层越具体,表达能力越丰富,使用越灵活
- 支持事件时间event-time和处理时间processing-time
- 精确一次exactly-once的状态一次性保证
- 低延迟,每秒处理百万个事件,毫秒级延迟
- 与众多存储系统的连接
- 高可用动态扩展,实现7*24小时全天运行
与Spark比较
流处理和批处理
数据模型
- spark采用RDD模型,Spark Streaming的Dsteam实际上也就是一组小批数据RDD的集合
- flink基本数据模型是数据流,以及事件Event序列
运行时架构
- spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个
- fink是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理
创建测试实例
txt文件
hello world
asdasd
hello spark
and you
amd yes!
intel no
QAQAQ
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>com.nsyl</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<flink.version>1.13.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- <dependency>-->
<!-- <groupId>org.apache.logging.log4j</groupId>-->
<!-- <artifactId>log4j-to-slf4j</artifactId>-->
<!-- <version>2.14.0</version>-->
<!-- </dependency>-->
</dependencies>
</project>
java 统计文件词频代码
package flink;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* @Author:
* @Date: 2023/4/4 9:35
*/
public class BatchWordCount {
public static void main(String[] args) throws Exception {
//创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//从文件读取数据
DataSource<String> lineDataSource = env.readTextFile("input/words.txt");
//将每行数据进行分词,转换成二元组类型
//Collector是收集器,因为java没有二元组类型,所以flink提供了Tuple2类 ,返回结果为一个算子
FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
//line是当前这一行的数据
//按照空格切分数据 会把每行的数据依次传入
String[] words = line.split(" ");
//每个单词转换成二元组
for (String word : words) {
//包成二元组,在for循环中每调用一次out.collect就相当于输出了一个数据,只return的话就不能起到一对多的打散效果
out.collect(Tuple2.of(word, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG));//因为java会泛型擦除,所以要声明到底是什么类型的
//按照word进行分组
UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);
//分组内进行聚合统计,按照索引,意思是把第二个位置的值,做一个累加操作
AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);
sum.print();
}
}
结果
(and,1)
(amd,1)
(spark,1)
(yes!,1)
(intel,1)
(QAQAQ,1)
(world,1)
(no,1)
(you,1)
(asdasd,1)
(hello,2)
以上是dataSourceAPI为批处理,流处理是dataStream API
新版本DataStream可以替代dataSource做到批流统一
dataStream会把批处理作为有界流来处理
data stream的操作方式
package flink;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import javax.xml.bind.annotation.XmlInlineBinaryData;
/**
* @Author:
* @Date: 2023/4/4 10:12
*/
public class BoundedStreamWordCount {
public static void main(String[] args) throws Exception {
//创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//这个父类类继承自DataStream
DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/word.txt");//读取文件
//转换计算
SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG));
//分组,类似groupby,根据key分组,也是指定第一个元素
KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
//求和
SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);
sum.print();
//流处理应该是不停的监听和等待等待数据输入
env.execute();
}
}
输出的结果是每统计到一次就输出一次
14> (no,1)
6> (hello,1)
16> (intel,1)
11> (world,1)
16> (yes!,1)
9> (amd,1)
10> (QAQAQ,1)
12> (you,1)
19> (and,1)
15> (asdasd,1)
2> (spark,1)
6> (hello,2)
flink如果在本地跑的话,其实会用多线程去模拟本地集群,运行任务进行分布式的执行,所以输出前面的数字指的是,本地的哪个线程来执行当前统计的,是并行子任务的的编号,这里并行度有多少线程就会有多少,没有设置的情况下是cpu的核心数量
在实际场景中,数据应该是无界的,数据是源源不断到来的,这种时候就应该保持一个监听的状态
无界流处理
在实际场景中,数据应该是无界的,数据是源源不断到来的,这种时候就应该保持一个监听的状态
打开windows nc 使用nc -l -p 7777
(有的时候会断掉多启动俩次)
package com.nsyl;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @Author: suiyi
* @Date: 2022/8/10 9:47
*/
public class RealStreamWordCount {
public static void main(String[] args) throws Exception {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取socket文本流 ,一般localhost这个主机名是从配置文件里面取
DataStreamSource<String> lineDataStream = env.socketTextStream("localhost", 7777);
//转换计算
SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
String[] words = line.split(" ");//这里会将每一行依次传入比如 hello spark
for (String word : words) {
//包成二元组
out.collect(Tuple2.of(word, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG));
//分组操作,等价于 keyBy(0) 但它已经被弃用,这里传lambda表达式,二元组中,第一个是f0,第二个就是f1
KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
//按照当前key分组的聚合求和操作 sum传索引位置
SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);
//打印输出
sum.print();
//流处理默认是无界的,所以需要启动执行的操作
env.execute();
}
}
这样去监听后,每次输入会有一组对应的输出,在之前的结果上做对应的更新聚合统计
在代码执行时在execute之前都是不会进行具体的转换计算操作的,只有等到数据进来后才会依次去执行
部署
前言
flink提交作业和执行任务需要几个关键的组件,客户端Client,作业管理器,JobManager和任务管理器TaskManager.我们的代码有客户端获取并做转换,之后提交给JobManager,所以JobManager就是Flink集群里的管事人,对中央调度管理;而它获取到要执行的作业后,就会进一步处理转换,然后分发给众多的taskManager.这里的TaskManager是真正的干活人,数据处理操作都是它们来做的
启动后在jobManager上传jar包
可以点击showplan查看具体流程,submit提交任务
进入任务详情可以删除任务
也可以用命令行直接提交任务,或者在taskManager直接提交到job
如果任务槽位0的时候提交任务就会失败
部署模式
flink提供了三种不同的部署模式
- 会话模式(Session Mode)
- 单作业模式(pre-Job Mode)
- 应用模式(application Mode)
他们的主要区别在于集群的生命周期以及资源的分配方式,以及main方法到底在哪里执行,client还是jobManager
会话模式
最符合常规思维.我们需要启动一个集群,保持一个会话,在这个会话中,通过客户端提交作业,集群启动时所有资源就已经确定,所以所有提交的作业会竞争集群中的资源.会话模式适合单个规模性小,执行时间短的大量作业
单作业模式
会话模式因为资源共享会导致很多问题,所以为了更好的隔离资源,我们考虑为每个提交的作业启动一个集群,这就是所谓的单作业模式per-job
单作业模式很好理解,就是严格的一对一,集群只为这个作业而生.同样由客户端运行应用程序.然后启动集群,作业被提交给jobManager,进而分发给taskManager执行.作业完成后,这些集群就会关闭,所有资源也会释放
这些特性使得但作业模式在生产环境运行更加稳定,所以是实际应用的首选模式
需要注意的是,Flink本身无法直接这样运行,所以单作业模式一般需要借住一些资源管理框架来启动比如Yarn,K8s
应用模式
前面提到的俩种情况下,应用代码都是在客户端上执行,然后由客户端提交给JobManager的.这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗
所以解决办法是,我们不要客户端了,直接把应用提交到JobManager上运行.而这也就代表着,我们需要为每一个提交的应用单独启动一个JobManager,也就是创建一个集群.这个JobManager只为执行这一个应用而存在,执行结束后JobMAnager也就关闭了,这就是所谓的应用模式
简单来说一个项目启动一个集群,就算一段代码有俩个作业,如果用单作业模式提交,其实会启动俩个集群,应用模式只会启动一个
独立模式
独立模式是flink最基本也是最简单的方案,所需要的所有flink组件都只是操作系统上运行的一个jvm进程
独立模式是独立运行的,不依赖外部任何的资源管理平台;的人独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理.所以独立模式一般只用在开发测试或者作业非常少的场景下
会话模式
之前上文就是独立集群的会话部署模式
单作业模式
flink本身无法直接以单位作业方式启动集群,一般需要借助一些资源管理平台.所以flink的独立集群不支持单作业模式部署
应用模式
应用模式下,不会提前创建集群,所以不能调用start-cluster.sh脚本.我们可以使用同样在bin目录下的standalone-job.sh来创建一个JobManager
yarn模式
yarn上部署的过程是:客户端把flink应用提交给yarn的resourceManager,Yarn的resourceManager会想Yarn的NodeManager申请容器.在这些容器上,Flink会部署JobManager和TaskMAnager的实例,从而启动集群.Flink会根据运行在JobManager上的作业所需要的slot数量动态分配TaskManager资源
配置
在将flink让我部署至yarn集群之前,需要确认集群是否有安装Hadoop,保证Hadoop版本至少在2.2以上,并且集群中安装有hdfs服务
flink系统架构
角色
客户端
客户端负责将任务提交到集群,与JobManager构建Akka连接,然后将任务提交到JobManager,通过和JobManager之间进行交互获取任务执行状态。客户端提交任务可以采用CLI方式或者通过使用Flink WebUI提交,也可以在应用程序中指定JobManager的RPC网络端口构建ExecutionEnvironment提交Flink应用。
JobManager
JobManager负责整个Flink集群任务的调度以及资源的管理,从客户端中获取提交的应用,然后根据集群中TaskManager上TaskSlot的使用情况,为提交的应用分配相应的TaskSlots资源并命令TaskManager启动从客户端中获取的应用。JobManager相当于整个集群的Master节点,且整个集群中有且仅有一个活跃的JobManager,负责整个集群的任务管理和资源管理。JobManager和TaskManager之间通过Actor System进行通信,获取任务执行的情况并通过Actor System将应用的任务执行情况发送给客户端。同时在任务执行过程中,Flink JobManager会触发Checkpoints操作,每个TaskManager节点收到Checkpoint触发指令后,完成Checkpoint操作,所有的Checkpoint协调过程都是在Flink JobManager中完成。当任务完成后,Flink会将任务执行的信息反馈给客户端,并且释放掉TaskManager中的资源以供下一次提交任务使用。
- jobMaster 在作业提交时,jobMaster会先接收到要执行的应用程序.一般是客户端提交来的,包括jar包,数据流图(dataflow graph,和作业图)
jobmaster会把jobGraph转换成一个屋里层面的数据流图,这个图被叫做执行图(ExecutionGraph),它包含了所有可以并发执行的请求.JobMaster会向资源管理器(ResourceManager)发出请求,申请执行任务必要的资源,一旦获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上 - 资源管理器 resource Manager
负责资源分配和管理,在flink集群中只有一个.所谓的资源主要是TASKManager的任务槽(task slots).任务槽就是flink集群中的资源调配单元,包含了机器用来执行计算一组cpu和内存资源.每一个任务Task都要分配到一个slot上执行 - 分发器 dispatcher
disPatcher主要负责提供一个rest接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的jobMaster组件.Dispatcher也会启动一个webUI用来方便地展示监控作业执行的信息.Dispatcher在架构中不是必须的,在不同的部署模式下可能会被忽略掉
TaskManager
TaskManager相当于整个集群的Slave节点,负责具体的任务执行和对应任务在每个节点上的资源申请与管理。客户端通过将编写好的Flink应用编译打包,提交到JobManager,然后JobManager会根据已经注册在JobManager中TaskManager的资源情况,将任务分配给有资源的TaskManager节点,然后启动并运行任务。TaskManager从JobManager接收需要部署的任务,然后使用Slot资源启动Task,建立数据接入的网络连接,接收数据并开始数据处理。同时TaskManager之间的数据交互都是通过数据流的方式进行的。
每个TaskManager最少持有 1 个 Slot,Slot 是 Flink 执行 Job 时的最小资源分配单位,在 Slot 中运行着具体的 Task 任务。
作业提交流程
独立模式允许如下
yarn会话模式下
单作业模式下
重要概念
程序与数据流dataFlow
在运行时,flink上运行的程序会被映射成逻辑数据流dataflows,它包含了这三部分
每一个dataflow以一个或多个source开始以一个或多个sinks结束.dataflow类似任意的有向无环图
在大部分情况下,程序中转换运算transformations,跟dataflow中算子operator是一一对应关系
并行度(parallelism)
每个算子(operator)可以包含一个或多个子任务(perator subtask)这些子任务在不同的线程,不同的物理机或不同的容器中完全独立地执行
一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)
可以在StreamExecutionEnvironment中设置全局并行度,代码里单独设置的算子并行度优先级更高,一般全局设置没有必要,因为它等价于作业提交时的-p参数
数据传输形式
一个程序中,不同的算子可能有不同的并行度
算子之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具体是哪一种模式,取决于算子的种类
one-to-one:stream维护者分区以及元素的顺序(比如source和map之间).这意味着map算子的子任务看到的元素的个数以及顺序跟source算子的子任务产生的元素个数,顺序相同,map,fliter,flatmap等算子都是one-to-one的对应关系
redistrbuting:stream的分区会发生改变.每一个算子的子任务一句所选择的transformation发送数据到不同的目标任务.列入keyBy基于hashCode分区,而borad和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于spark中的shuffle过程
算子链
flink采用了一种称为任务链的优化技术,可以在特定条件下减少本地通信开销.为了满足任务链要求,必须将俩个或多个算子设为相同的并行度,并通过本地转发(local forward)的方式进行连接
相同并行度的one-to-one操作,flink这样相连的算子链接在一起形成一个task,原来的算子称为里面的subtask
并行度相同并且是one-to-one操作,这俩个条件缺一不可
执行图
flink的执行图可以分为四层StreamGraph->JobGraph->ExecutionGraph->物理执行图
- streamGraph:根据用户通过streamAPI编写的代码生成的最初的图.原来表示程序的拓扑结构
- JobGraph:streamGraph经过优化后生成了JobGraph,提交给JobManager的数据结构.主要的优化为,将多个符合条件的节点chain在一起作为一个节点
- ExecutionGraph:JobManager根据JobGraph生成ExecutionGraph.ExecutionGraph是JobGraph的并行版本,是调度层最核心的数据结构
- 物理执行图:JobMAnager根据ExecutionGraph对Job进行调度后,在各个TAskManager上部署Task后形成的图,并不是一个具体的数据结构
任务和任务槽
flink中每一个taskManager都是一个JVM进程,它可能会在独立 的线程上运行一个或多个子任务
为了控制一个TaskManager能接受多少个task,TaskManager通过task slot来进行控制(一个TaskManager至少有一个slot
任务共享slot
默认情况下,flink允许子任务共享slot.这样的结果是一个slot可以保存作业的整个管道
当我们将资源密集型和非密集型的任务同时放到一个slot中,它们就可以自行分配对资源占用比例,从而保证最重的活分配给所有的TaskManager
flink中每一个task都是jvm进程,它可能会在独立的线程上运行一个或多个子任务
人了控制一个taskMaanger能接收多少个task,taskManager通过task slot来进行控制
flink DATAStream API
执行环境
编写flink程序的第一步,就是创建执行环境.我们要获取的执行环境,是StreamExecutionEnvironment类对象,这是所有flink程序的基础.在代码中创建执行环境的方式,就是调用这个类的静态方法
getExecutionEnvironment
最简单的方式就是直接调用getExecutionEnvironment方法,他会根据当前运行的上下文直接得到正确的结果,如果程序是独立运行的,就返回一个本地的执行环境,如果创建了jar包,然后从命令行调用它并提交到集群,那么返回集群的执行环境.也就是说这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
不需要我们做判断,简单高效
底层其实是createLocalEnvirnment和createRemoteEnvironment俩个方法欠着是返回本地执行环境,不传入并行度就是cpu核心数,后者是返回集群执行环境,需要调用时指定JobManager的主机名和端口号,并指定集群中运行的jar包.
执行模式
从1.12.0开始,flink实现了api上的流批统一,DataStream新增了一个重要特性:可以支持不同的执行模式(execution mode)通过简单的设置就可以让一段flink程序在流处理和批处理之间切换.这样依赖DataSet API也就没有存在的必要了
也可以在代码中直接env.setRunTimeMode
建议:不要在代码中配置,而是使用命令.这同设置并行度是类似的:在提交作业时指定参数可以更加灵活,同一段应用程序写后之后既可以用于处理批处理,也可以处理流处理.而在代码中硬编码的方式扩展性差,一般不推荐
源算子Source
创建环境后,既可以构建数据处理的业务逻辑的了
flink可以从各种来源获取数据,然后构建dataStream进行转换处理.一般将数据的输入来源称为数据源(DataSource)而读取数据的算子就是源算子(source operator).所以source就是我们处理程序的输入端