flink

简介

apche Flink是一个框架和分布式处理一起,用于对无界和有界的数据进行状态计算

阿里,腾讯,华为,网易,京东,携程,美团爱奇艺等等都有用到flink

为什么选择flink

流数据真实反映了我们的生活方式

传统的数据架构是基于有限数据及的

我们的目标

  • 低延迟
  • 高吞吐
  • 结果的准确性和更好的容错性

哪些行业需要处理流数据

  • 电商和市场营销:数据报表,广告投放,业务流程需要
  • 物联网:传感器数据采集和显式,实时报警,交通运输业
  • 电信业:基站流量调配
  • 银行和金融业:实时结算和通知推送,实时监测异常行为

对比

传统数据结构

分析处理

讲数据从业务数据复制到数据仓,在进行分析和查询

有状态的流式处理

lambda架构

用俩套系统,来同时保证低延迟和结果准确

Flink

特点

事件驱动(Event-driven)

在flink的世界观中,一切都是由流组成的,理线数据是有界的流;实时数据是一个没有界限的流:这就是所谓的有界流和无界流

flink有许多分层API

越顶层越抽象,表达含义越简明,使用越方便

越底层越具体,表达能力越丰富,使用越灵活

  • 支持事件时间event-time和处理时间processing-time
  • 精确一次exactly-once的状态一次性保证
  • 低延迟,每秒处理百万个事件,毫秒级延迟
  • 与众多存储系统的连接
  • 高可用动态扩展,实现7*24小时全天运行

与Spark比较

流处理和批处理

数据模型

  • spark采用RDD模型,Spark Streaming的Dsteam实际上也就是一组小批数据RDD的集合
  • flink基本数据模型是数据流,以及事件Event序列

运行时架构

  • spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个
  • fink是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理

创建测试实例

txt文件

hello world
asdasd
hello spark
and you
amd yes!
intel no
QAQAQ

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>com.nsyl</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <flink.version>1.13.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>

    </properties>


    <dependencies>
        
        
<!--        <dependency>-->
<!--            <groupId>org.apache.logging.log4j</groupId>-->
<!--            <artifactId>log4j-to-slf4j</artifactId>-->
<!--            <version>2.14.0</version>-->
<!--        </dependency>-->
    </dependencies>

</project>

java 统计文件词频代码

package flink;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @Author:
 * @Date: 2023/4/4 9:35
 */
public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //从文件读取数据
        DataSource<String> lineDataSource = env.readTextFile("input/words.txt");
        //将每行数据进行分词,转换成二元组类型
        //Collector是收集器,因为java没有二元组类型,所以flink提供了Tuple2类 ,返回结果为一个算子
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            //line是当前这一行的数据
            //按照空格切分数据 会把每行的数据依次传入
            String[] words = line.split(" ");
            //每个单词转换成二元组
            for (String word : words) {
                //包成二元组,在for循环中每调用一次out.collect就相当于输出了一个数据,只return的话就不能起到一对多的打散效果
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));//因为java会泛型擦除,所以要声明到底是什么类型的


        //按照word进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);
        //分组内进行聚合统计,按照索引,意思是把第二个位置的值,做一个累加操作
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);
        sum.print();
    }
}

结果

(and,1)
(amd,1)
(spark,1)
(yes!,1)
(intel,1)
(QAQAQ,1)
(world,1)
(no,1)
(you,1)
(asdasd,1)
(hello,2)

以上是dataSourceAPI为批处理,流处理是dataStream API

新版本DataStream可以替代dataSource做到批流统一

dataStream会把批处理作为有界流来处理

data stream的操作方式

package flink;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import javax.xml.bind.annotation.XmlInlineBinaryData;

/**
 * @Author:
 * @Date: 2023/4/4 10:12
 */
public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        //创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //这个父类类继承自DataStream
        DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/word.txt");//读取文件
        //转换计算
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //分组,类似groupby,根据key分组,也是指定第一个元素
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
        //求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);
        sum.print();
        //流处理应该是不停的监听和等待等待数据输入
        env.execute();
    }
}

输出的结果是每统计到一次就输出一次

14> (no,1)
6> (hello,1)
16> (intel,1)
11> (world,1)
16> (yes!,1)
9> (amd,1)
10> (QAQAQ,1)
12> (you,1)
19> (and,1)
15> (asdasd,1)
2> (spark,1)
6> (hello,2)

flink如果在本地跑的话,其实会用多线程去模拟本地集群,运行任务进行分布式的执行,所以输出前面的数字指的是,本地的哪个线程来执行当前统计的,是并行子任务的的编号,这里并行度有多少线程就会有多少,没有设置的情况下是cpu的核心数量

在实际场景中,数据应该是无界的,数据是源源不断到来的,这种时候就应该保持一个监听的状态

无界流处理

在实际场景中,数据应该是无界的,数据是源源不断到来的,这种时候就应该保持一个监听的状态

打开windows nc 使用nc -l -p 7777(有的时候会断掉多启动俩次)

package com.nsyl;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @Author: suiyi
 * @Date: 2022/8/10 9:47
 */
public class RealStreamWordCount {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //读取socket文本流 ,一般localhost这个主机名是从配置文件里面取
        DataStreamSource<String> lineDataStream = env.socketTextStream("localhost", 7777);
        //转换计算
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");//这里会将每一行依次传入比如 hello spark
            for (String word : words) {
                //包成二元组
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //分组操作,等价于 keyBy(0) 但它已经被弃用,这里传lambda表达式,二元组中,第一个是f0,第二个就是f1
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
        //按照当前key分组的聚合求和操作 sum传索引位置
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);
        //打印输出
        sum.print();
        //流处理默认是无界的,所以需要启动执行的操作
        env.execute();


    }
}

这样去监听后,每次输入会有一组对应的输出,在之前的结果上做对应的更新聚合统计

在代码执行时在execute之前都是不会进行具体的转换计算操作的,只有等到数据进来后才会依次去执行

部署

前言

flink提交作业和执行任务需要几个关键的组件,客户端Client,作业管理器,JobManager和任务管理器TaskManager.我们的代码有客户端获取并做转换,之后提交给JobManager,所以JobManager就是Flink集群里的管事人,对中央调度管理;而它获取到要执行的作业后,就会进一步处理转换,然后分发给众多的taskManager.这里的TaskManager是真正的干活人,数据处理操作都是它们来做的

启动后在jobManager上传jar包

可以点击showplan查看具体流程,submit提交任务

进入任务详情可以删除任务

也可以用命令行直接提交任务,或者在taskManager直接提交到job

如果任务槽位0的时候提交任务就会失败

部署模式

flink提供了三种不同的部署模式

  • 会话模式(Session Mode)
  • 单作业模式(pre-Job Mode)
  • 应用模式(application Mode)

他们的主要区别在于集群的生命周期以及资源的分配方式,以及main方法到底在哪里执行,client还是jobManager

会话模式

最符合常规思维.我们需要启动一个集群,保持一个会话,在这个会话中,通过客户端提交作业,集群启动时所有资源就已经确定,所以所有提交的作业会竞争集群中的资源.会话模式适合单个规模性小,执行时间短的大量作业

单作业模式

会话模式因为资源共享会导致很多问题,所以为了更好的隔离资源,我们考虑为每个提交的作业启动一个集群,这就是所谓的单作业模式per-job

单作业模式很好理解,就是严格的一对一,集群只为这个作业而生.同样由客户端运行应用程序.然后启动集群,作业被提交给jobManager,进而分发给taskManager执行.作业完成后,这些集群就会关闭,所有资源也会释放

这些特性使得但作业模式在生产环境运行更加稳定,所以是实际应用的首选模式

需要注意的是,Flink本身无法直接这样运行,所以单作业模式一般需要借住一些资源管理框架来启动比如Yarn,K8s

应用模式

前面提到的俩种情况下,应用代码都是在客户端上执行,然后由客户端提交给JobManager的.这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗

所以解决办法是,我们不要客户端了,直接把应用提交到JobManager上运行.而这也就代表着,我们需要为每一个提交的应用单独启动一个JobManager,也就是创建一个集群.这个JobManager只为执行这一个应用而存在,执行结束后JobMAnager也就关闭了,这就是所谓的应用模式

简单来说一个项目启动一个集群,就算一段代码有俩个作业,如果用单作业模式提交,其实会启动俩个集群,应用模式只会启动一个

独立模式

独立模式是flink最基本也是最简单的方案,所需要的所有flink组件都只是操作系统上运行的一个jvm进程

独立模式是独立运行的,不依赖外部任何的资源管理平台;的人独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理.所以独立模式一般只用在开发测试或者作业非常少的场景下

会话模式

之前上文就是独立集群的会话部署模式

单作业模式

flink本身无法直接以单位作业方式启动集群,一般需要借助一些资源管理平台.所以flink的独立集群不支持单作业模式部署

应用模式

应用模式下,不会提前创建集群,所以不能调用start-cluster.sh脚本.我们可以使用同样在bin目录下的standalone-job.sh来创建一个JobManager

yarn模式

yarn上部署的过程是:客户端把flink应用提交给yarn的resourceManager,Yarn的resourceManager会想Yarn的NodeManager申请容器.在这些容器上,Flink会部署JobManager和TaskMAnager的实例,从而启动集群.Flink会根据运行在JobManager上的作业所需要的slot数量动态分配TaskManager资源

配置

在将flink让我部署至yarn集群之前,需要确认集群是否有安装Hadoop,保证Hadoop版本至少在2.2以上,并且集群中安装有hdfs服务

flink系统架构

角色

客户端

客户端负责将任务提交到集群,与JobManager构建Akka连接,然后将任务提交到JobManager,通过和JobManager之间进行交互获取任务执行状态。客户端提交任务可以采用CLI方式或者通过使用Flink WebUI提交,也可以在应用程序中指定JobManager的RPC网络端口构建ExecutionEnvironment提交Flink应用。

JobManager

JobManager负责整个Flink集群任务的调度以及资源的管理,从客户端中获取提交的应用,然后根据集群中TaskManager上TaskSlot的使用情况,为提交的应用分配相应的TaskSlots资源并命令TaskManager启动从客户端中获取的应用。JobManager相当于整个集群的Master节点,且整个集群中有且仅有一个活跃的JobManager,负责整个集群的任务管理和资源管理。JobManager和TaskManager之间通过Actor System进行通信,获取任务执行的情况并通过Actor System将应用的任务执行情况发送给客户端。同时在任务执行过程中,Flink JobManager会触发Checkpoints操作,每个TaskManager节点收到Checkpoint触发指令后,完成Checkpoint操作,所有的Checkpoint协调过程都是在Flink JobManager中完成。当任务完成后,Flink会将任务执行的信息反馈给客户端,并且释放掉TaskManager中的资源以供下一次提交任务使用。

  • jobMaster 在作业提交时,jobMaster会先接收到要执行的应用程序.一般是客户端提交来的,包括jar包,数据流图(dataflow graph,和作业图)
    jobmaster会把jobGraph转换成一个屋里层面的数据流图,这个图被叫做执行图(ExecutionGraph),它包含了所有可以并发执行的请求.JobMaster会向资源管理器(ResourceManager)发出请求,申请执行任务必要的资源,一旦获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上
  • 资源管理器 resource Manager
    负责资源分配和管理,在flink集群中只有一个.所谓的资源主要是TASKManager的任务槽(task slots).任务槽就是flink集群中的资源调配单元,包含了机器用来执行计算一组cpu和内存资源.每一个任务Task都要分配到一个slot上执行
  • 分发器 dispatcher
    disPatcher主要负责提供一个rest接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的jobMaster组件.Dispatcher也会启动一个webUI用来方便地展示监控作业执行的信息.Dispatcher在架构中不是必须的,在不同的部署模式下可能会被忽略掉

TaskManager

TaskManager相当于整个集群的Slave节点,负责具体的任务执行和对应任务在每个节点上的资源申请与管理。客户端通过将编写好的Flink应用编译打包,提交到JobManager,然后JobManager会根据已经注册在JobManager中TaskManager的资源情况,将任务分配给有资源的TaskManager节点,然后启动并运行任务。TaskManager从JobManager接收需要部署的任务,然后使用Slot资源启动Task,建立数据接入的网络连接,接收数据并开始数据处理。同时TaskManager之间的数据交互都是通过数据流的方式进行的。

每个TaskManager最少持有 1 个 Slot,Slot 是 Flink 执行 Job 时的最小资源分配单位,在 Slot 中运行着具体的 Task 任务。

作业提交流程

独立模式允许如下

yarn会话模式下

单作业模式下

重要概念

程序与数据流dataFlow

在运行时,flink上运行的程序会被映射成逻辑数据流dataflows,它包含了这三部分

每一个dataflow以一个或多个source开始以一个或多个sinks结束.dataflow类似任意的有向无环图

在大部分情况下,程序中转换运算transformations,跟dataflow中算子operator是一一对应关系

并行度(parallelism)

每个算子(operator)可以包含一个或多个子任务(perator subtask)这些子任务在不同的线程,不同的物理机或不同的容器中完全独立地执行

一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)

可以在StreamExecutionEnvironment中设置全局并行度,代码里单独设置的算子并行度优先级更高,一般全局设置没有必要,因为它等价于作业提交时的-p参数

数据传输形式

一个程序中,不同的算子可能有不同的并行度

算子之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具体是哪一种模式,取决于算子的种类

one-to-one:stream维护者分区以及元素的顺序(比如source和map之间).这意味着map算子的子任务看到的元素的个数以及顺序跟source算子的子任务产生的元素个数,顺序相同,map,fliter,flatmap等算子都是one-to-one的对应关系

redistrbuting:stream的分区会发生改变.每一个算子的子任务一句所选择的transformation发送数据到不同的目标任务.列入keyBy基于hashCode分区,而borad和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于spark中的shuffle过程

算子链

flink采用了一种称为任务链的优化技术,可以在特定条件下减少本地通信开销.为了满足任务链要求,必须将俩个或多个算子设为相同的并行度,并通过本地转发(local forward)的方式进行连接

相同并行度的one-to-one操作,flink这样相连的算子链接在一起形成一个task,原来的算子称为里面的subtask

并行度相同并且是one-to-one操作,这俩个条件缺一不可

执行图

flink的执行图可以分为四层StreamGraph->JobGraph->ExecutionGraph->物理执行图

  • streamGraph:根据用户通过streamAPI编写的代码生成的最初的图.原来表示程序的拓扑结构
  • JobGraph:streamGraph经过优化后生成了JobGraph,提交给JobManager的数据结构.主要的优化为,将多个符合条件的节点chain在一起作为一个节点
  • ExecutionGraph:JobManager根据JobGraph生成ExecutionGraph.ExecutionGraph是JobGraph的并行版本,是调度层最核心的数据结构
  • 物理执行图:JobMAnager根据ExecutionGraph对Job进行调度后,在各个TAskManager上部署Task后形成的图,并不是一个具体的数据结构

任务和任务槽

flink中每一个taskManager都是一个JVM进程,它可能会在独立 的线程上运行一个或多个子任务

为了控制一个TaskManager能接受多少个task,TaskManager通过task slot来进行控制(一个TaskManager至少有一个slot

任务共享slot

默认情况下,flink允许子任务共享slot.这样的结果是一个slot可以保存作业的整个管道

当我们将资源密集型和非密集型的任务同时放到一个slot中,它们就可以自行分配对资源占用比例,从而保证最重的活分配给所有的TaskManager

flink中每一个task都是jvm进程,它可能会在独立的线程上运行一个或多个子任务

人了控制一个taskMaanger能接收多少个task,taskManager通过task slot来进行控制

flink DATAStream API

执行环境

编写flink程序的第一步,就是创建执行环境.我们要获取的执行环境,是StreamExecutionEnvironment类对象,这是所有flink程序的基础.在代码中创建执行环境的方式,就是调用这个类的静态方法

getExecutionEnvironment

最简单的方式就是直接调用getExecutionEnvironment方法,他会根据当前运行的上下文直接得到正确的结果,如果程序是独立运行的,就返回一个本地的执行环境,如果创建了jar包,然后从命令行调用它并提交到集群,那么返回集群的执行环境.也就是说这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

不需要我们做判断,简单高效

底层其实是createLocalEnvirnment和createRemoteEnvironment俩个方法欠着是返回本地执行环境,不传入并行度就是cpu核心数,后者是返回集群执行环境,需要调用时指定JobManager的主机名和端口号,并指定集群中运行的jar包.

执行模式

从1.12.0开始,flink实现了api上的流批统一,DataStream新增了一个重要特性:可以支持不同的执行模式(execution mode)通过简单的设置就可以让一段flink程序在流处理和批处理之间切换.这样依赖DataSet API也就没有存在的必要了

也可以在代码中直接env.setRunTimeMode

建议:不要在代码中配置,而是使用命令.这同设置并行度是类似的:在提交作业时指定参数可以更加灵活,同一段应用程序写后之后既可以用于处理批处理,也可以处理流处理.而在代码中硬编码的方式扩展性差,一般不推荐

源算子Source

创建环境后,既可以构建数据处理的业务逻辑的了

flink可以从各种来源获取数据,然后构建dataStream进行转换处理.一般将数据的输入来源称为数据源(DataSource)而读取数据的算子就是源算子(source operator).所以source就是我们处理程序的输入端

Last modification:August 8, 2023
如果觉得我的文章对你有用,请随意赞赏