Flink 笔记

Flink 的运行架构

Flink 程序:

应用代码→优化器/Graph构建器→客户端→数据流图

作业管理器(JobManager)

Jobmaster

jobmaster是jobmanager中核心组件 负责处理单独的job

一个 job 对应一个 jobmaster

相关关键词 :

  • dataflow graph (数据流图)

  • jobGraph (作业图)

  • ExecutionGraph (执行图)

  • ResourceManager (源管理器)

ResourceManager (资源管理器)

负责资源的分配和管理,在Flink集群中只有一个,所谓资源主要是任务槽(Task slots)

Dispatcher (分发器)

负责提供一个 REST 接口,用于提交应用,Dispatcher 会启动一个WebUI 用来方便地展示和监控作业执行的信息,它并不是必要的,在不同部署模式下可能会忽略

.

任务管理器(TaskManager)

  • Flink的工作进程会有多个 TaskManager 运行, 每一个TaskManager 都包含一定数量的槽位(slots)

  • TaskManager启动后,会向ResourceManager注册它的槽位,之后jobManager就可以向槽位分配任务(tasks)

  • 执行过程中,TaskManager可以跟其他运行同一应用的TaskManager交换数据

独立模式 任务流程

TaskManager JobManager 都提前启动好 再提交任务

YARN会话模式 任务流程

只提前启动 JobManager 等提交任务再申请资源、启动 TaskManager

YARN 单作业模式 任务流程

提交任务后再启动集群 任务提交给Yarn

程序与数据流 (DataFlow)

所有Flink程序都分为三个部分

  • Source 输入

  • TransFormation 处理: 利用各种算子进行加工处理

  • Sink 输出

每个dataflow 以一个或多个 sources → 一个或多个 sinks 结束.

并行度(Parallelism)

每个算子 (operator)可以包含多个子任务(operator subtask)

子任务在不同线程和不同机器中完全独立执行。

一个算子的子任务(subtask)的个数被称为其并行度(parallelism)

数据传输形式

算子之间的数据传输形式可以是 one-to-one(forwarding) 的模式,也可以是redistributing的模式,具体是哪一种取决于算子的种类

  • 当算子之间处于同一机器的时候 采用直通模式 one-to-one(Forward)

  • 当算子之间跨机器、通过网络传输时采用重分配模式 (redistribut)

算子链(Operator Chains)

直通模式的数据通过合并算子来优化算子数。

执行图(ExecutionGraph)

  • 执行图分为4层

StreamGraph (流图)

用于生成构建基础的逻辑结构 描述数据源、数据转换、数据汇等 生成最初的图、对代码进行算子优化 (算子链)

JobGraph(作业图)

在StreamGraph基础上生成 他是一个 有向无环图(DAG) 用于定义算子的并行度、数据传输方式、任务之间的依赖关系等

ExecutionGraph(执行图)

在JobGraph基础上生成,包含任务之间的依赖关系、调度策略任务资源分配

物理执行图

在ExecutionGraph基础上生成 他是一个具体实例,在TaskManager上部署Task后形成 包含具体的资源分配、任务调度、数据传输等,用于实际执行管理作业。

任务和任务槽 (Task and Task Slots

Flink中的每一个TaskManager是一个Jvm进程,他在一个独立的线程上执行一个或多个子任务。

slot数量 = 最大并行度

配置taskmanager的slots数量

# slots 数量
taskmanager.numberOfTaskSlots: 1

# taskmanager ram大小
taskmanager.memory.process.size: 1728m

从kafka导入数据

pom 引入

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

创建好环境 然后创建一个properties对象存入kafka配置

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "consumer.group")

读取kafka数据源

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val stream = env.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))

自定义数据源

创建一个继承于 SourceFunction[Event] 或 ParallelSourceFunction 的类

重写 run方法 和 cancel 方法

  override def run(sourceContext: SourceFunction.SourceContext[Event]): Unit = {
        val event = xxx
        sourceContext.collect(event)
  }
  override def cancel(): Unit = {}

数据转换

基本的算子转换

  • 映射(map)

一对一的转换 消费一个数据产生一个数据

  • 过滤(filter)

一对一或者一对零

  • FlatMap

一对多

IDEA 关闭运行日志

添加编辑文件 src/main/resources/log4j.properties

# This affects logging for both user code and Flink
log4j.rootLogger=DEBUG, file

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=/log/flink.log
log4j.appender.file.append=false
log4j.appender.file.Threshold =DEBUG
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

时间语义

  • 处理时间 (数据到达算子的时候)

  • 事件时间 (事件发生的时候)

KeyBy 可以保留数据 可以做top排行

map().keyby(x => ture).reduce()

窗口

流式数据是无线的 如果要统计每1分钟的成交额 就要用到 窗口

作用是将无限的流 分为有限的块来统计

可以把窗口比作水桶、把数据流比作水龙头 更好理解

按照驱动类型划分窗口

时间窗口

从一个时间点 开始 到一个时间点结束 当超过结束时间 就不在收集数据 并触发计算 输出结果 然后销毁窗口。

计数窗口

数据到达一定量才关闭,切换到下一个窗口

窗口的分配

滚动窗口

可以基于时间和数量

滑动窗口

可以基于时间和数量

会话窗口

可以基于时间

全局窗口

需要自定义触发器

窗口的定义

 stream.keyBy(_).window()  // 带有keyby的流 高并行度
 stream.windowAll()  // 没有keyby的流 并行度为1
    //        时间滚动窗口
          .window(TumblingTimeWindows.of(Time.seconds(10)))
    //        时间滑动窗口
          .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)))
    //        时间会话窗口
          .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
    //        计数滚动窗口
          .countWindow(5)
    //        计数滑动窗口
          .countWindow(5, 2)

窗口的计算

增量聚合

  • Reduce Function

期间准备好 Jdk、Hadoop 并启动hadoop

解压和设置flink环境变量

修改配置文件

vim $FLINK_HOME/conf/flink-conf.yaml 

# jobmanager.rpc.address: localhost
# 改为
jobmanager.rpc.address: master

vim $FLINK_HOME/conf/masters
# 改为
master:8081

vim $FLINK_HOME/conf/workers
# 改为
slave1
slave2

启动 $FLINK_HOME/bin/start-cluster.sh

在哪个机器上运行这条命令 就会在那台机器上开放web端口

提交任务

$FLINK_HOME/bin/flink run -m master:8081 -c word_count.无界流处理 -p 2 /root/untitled1-1.0-SNAPSHOT-jar-with-dependencies.jar --host slave1 --port 7777

#$FLINK_HOME/bin/flink run -m HOST:PORT -c 入口类 -p 并行度 PATH/TO/JAR.jar --args value

正在运行的作业查看 flink list| flink list -a

关闭运行的作业 flink cancel <JOB_ID>

Flink的运行模式

独立模式 (Standalone)

独立模式包含以下部署方式:

  • 单作业模式(借助其他管理平台如yarn、k8s)

  • 会话模式

  • 应用模式

YARN模式

将Flink应用提交给 Yarn的 ResourceManager

1、启动hadoop集群

2、检查环境变量是否有

export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

3、重启hadoop yarn

4、启动flink on yarn yarn-session.sh -nm test -d

参数:

  • -nm

  • -d 分离模式

  • -jm JobManager 所需内存 单位MB

  • -qu 指定yarn队列名

  • -tm 每个TaskManager 所所用内存

5、提交作业 flink run

flink run -c word_count.无界流处理 ./JAR.jar
Comment