Flink and Scala Development Environment Setup

Example pom.xml with the required dependencies

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>flink-wordcount</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <!--    Flink version    -->
        <flink.version>1.13.6</flink.version>
        <!--    full Scala version    -->
        <scala.version>2.12.20</scala.version>
        <!--    Scala binary (major) version: 2.12, 2.11, etc.    -->
        <scala.binary.version>2.12</scala.binary.version>

        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!--    Flink Scala dependencies    -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

    </dependencies>

    <!--    Assumption: Scala sources live in src/main/scala and are built with Maven.
            Without a Scala compiler plugin, Maven alone does not compile .scala files.
            The plugin version below is one known release; adjust it as needed.    -->
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.8.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

Word count example (batch)

WordCount.scala

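import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {

    // Create the Flink batch execution environment
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    // Path of the input file
    val inputPath = "src/main/resources/words.txt"
    val text = env.readTextFile(inputPath)

    // Count the words
    val wordCounts = text
      .flatMap(_.split("\\s+"))   // split each line on whitespace
      .filter(_.nonEmpty)         // drop empty tokens caused by leading or repeated spaces
      .map((_, 1))                // map each word to (word, 1)
      .groupBy(0)                 // group by the word (tuple field 0)
      .sum(1)                     // sum the counts (tuple field 1)

    // Print the result; on a DataSet, print() also triggers execution
    wordCounts.print()
  }
}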
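
To check what the batch job above should produce without starting Flink, the same steps can be sketched with plain Scala collections. This is only an illustration: `LocalWordCount` and `countWords` are hypothetical names that are not part of Flink, and the input lines are made up.

```scala
object LocalWordCount {
  // Same steps as the Flink pipeline, applied to an in-memory Seq instead of a DataSet.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split("\\s+"))   // split each line on whitespace
      .filter(_.nonEmpty)         // drop empty tokens
      .map((_, 1))                // (word, 1)
      .groupBy(_._1)              // group the pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val counts = countWords(Seq("hello flink", "hello scala"))
    // Sort by word so the printed order is stable
    counts.toSeq.sortBy(_._1).foreach(println)
    // prints:
    // (flink,1)
    // (hello,2)
    // (scala,1)
  }
}
```

Running the Flink job on a `words.txt` with the same two lines should give the same counts, although the order of the printed tuples may differ.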

Real-time word count example (streaming)

SocketWordCount.scala

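import org.apache.flink.streaming.api.scala._

object SocketWordCount {
  def main(args: Array[String]): Unit = {

    // Create the Flink streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Connect to netcat on port 6000 (start it on the VM first, e.g. with `nc -lk 6000`).
    // "<VM-IP>" is a placeholder: replace it with the IP address of your virtual machine.
    val text = env.socketTextStream("<VM-IP>", 6000)

    // Split each input line and map every word to a (word, 1) tuple
    val wordCounts = text
      .flatMap(_.split("\\s+"))      // split each line on whitespace
      .filter(_.nonEmpty)            // drop empty tokens
      .map((_, 1))                   // map each word to (word, 1)
      .keyBy(_._1)                   // group by word (index-based keyBy(0) is deprecated)
      .sum(1)                        // keep a running count per word

    // Print the running counts to the console
    wordCounts.print()

    // Start the job; a streaming program does nothing until execute() is called
    env.execute("SocketWordCount")
  }
}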
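
Unlike the batch job, the streaming job emits an updated count every time a word arrives, not one final total per word. The sketch below imitates that behavior with plain Scala collections. `RunningWordCount` and `runningCounts` are hypothetical names used only for illustration, and the code is a simplified model of a keyed running sum, not Flink's implementation.

```scala
object RunningWordCount {
  // Emits one (word, runningCount) pair per incoming word, in arrival order.
  def runningCounts(words: Seq[String]): Seq[(String, Int)] = {
    val initial = (Map.empty[String, Int], Vector.empty[(String, Int)])
    val (_, emitted) = words.foldLeft(initial) {
      case ((state, out), word) =>
        val n = state.getOrElse(word, 0) + 1          // update the per-word state
        (state.updated(word, n), out :+ (word -> n))  // emit the new running count
    }
    emitted
  }

  def main(args: Array[String]): Unit = {
    // Simulates typing "a b a" into netcat
    runningCounts(Seq("a", "b", "a")).foreach(println)
    // prints:
    // (a,1)
    // (b,1)
    // (a,2)
  }
}
```

In the real job, each printed line may also carry a subtask prefix such as `3>` when the parallelism is greater than 1, and lines for different words may interleave.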
