Flink DataStream API

标签: 大数据  flink  

一、Flink运行模型

在这里插入图片描述
以上为Flink的运行模型,Flink的程序主要由三部分构成,分别为SourceTransformationSink

DataSource——主要负责数据的读取
Transformation——主要负责对数据的转换操作
Sink——负责最终数据的输出

二、Flink程序架构

每个Flink程序都包含以下的若干流程:

(1)获得一个执行环境(Execution Environment)

(2)加载/创建初始数据(Source)

(3)指定转换这些数据(Transformation)

(4)指定放置计算结果的位置(Sink)

(5)触发程序执行

三、Environment

执行环境StreamExecutionEnvironment是所有Flink程序的基础

创建执行环境有三种方式,分别为:

StreamExecutionEnvironment.getExecutionEnvironment

StreamExecutionEnvironment.createLocalEnvironment

StreamExecutionEnvironment.createRemoteEnvironment

3.1 StreamExecutionEnvironment.getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

val env = StreamExecutionEnvironment.getExecutionEnvironment

3.2 StreamExecutionEnvironment.createLocalEnvironment

返回本地执行环境,需要在调用时指定默认的并行度。

val env = StreamExecutionEnvironment.createLocalEnvironment(1)

3.3 StreamExecutionEnvironment.createRemoteEnvironment

返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。

val env = StreamExecutionEnvironment.createRemoteEnvironment(1)

四、Source

4.1 基于File的数据源

(1) readTextFile(path)

一列一列的读取遵循TextInputFormat规范的文本文件,并将结果作为String返回。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("F:\\date\\student.txt")
stream.print()
env.execute("FirstJob")

(2) readFile(fileInputFormat, path)

按照指定的文件格式读取文件。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val path = new Path("/opt/module/test.txt")
val stream = env.readFile(new TextInputFormat(path), "/opt/module/test.txt")
stream.print()
env.execute("FirstJob")

4.2 基于Socket的数据源

socketTextStream

从Socket中读取信息,元素可以用分隔符分开。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("localhost", 11111)
stream.print()
env.execute("FirstJob")

4.3 基于集合(Collection)的数据源

(1)fromCollection(seq)

从集合中创建一个数据流,集合中所有元素的类型是一致的。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = List(1,2,3,4)
val stream = env.fromCollection(list)
stream.print()
env.execute("FirstJob")

(2)fromCollection(Iterator)

从迭代(Iterator)中创建一个数据流,指定元素数据类型的类由iterator返回。

//注意导包
import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val iterator = Iterator(1,2,3,4)
val stream = env.fromCollection(iterator)
stream.print()
env.execute("FirstJob")

(3)fromElements(elements:_*)

从一个给定的对象序列中创建一个数据流,所有的对象必须是相同类型的。

//注意此行,必须使用此种导包方式
import org.apache.flink.api.scala._

object ScalaWordCount {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment

val text = env.fromElements("Who's there? I think I hear them. Standm ho! Who's there?")

val counts = text.flatMap { _.toLowerCase.split("\\W+") filter {_.nonEmpty}}
.map {( _, 1)}
.groupBy(0)
.sum(1)
counts.print()
}
}

官网说明:

1:A frequent reason if that the code that generates the TypeInformation has not been imported. Make sure to import the entire flink.api.scala package.

2:Another common cause are generic methods, which can be fixed as described in the following section.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = List(1,2,3,4)
val stream = env.fromElement(list)
stream.print()
env.execute("FirstJob")

运行结果:
在这里插入图片描述
(4)generateSequence(from, to)

从给定的间隔中并行地产生一个数字序列。读取一定范围的sequnce对象。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10)
stream.print()
env.execute("FirstJob")

在这里插入图片描述

五、Sink

Data Sink 消费DataStream中的数据,并将它们转发到文件、套接字、外部系统或者打印出。

Flink有许多封装在DataStream操作里的内置输出格式。

5.1 writeAsText

将元素以字符串形式逐行写入(TextOutputFormat),这些字符串通过调用每个元素的toString()方法来获取。

5.2 WriteAsCsv

将元组以逗号分隔写入文件中(CsvOutputFormat),行及字段之间的分隔是可配置的。每个字段的值来自对象的toString()方法。

5.3 print/printToErr

打印每个元素的toString()方法的值到标准输出或者标准错误输出流中。或者也可以在输出流中添加一个前缀,这个可以帮助区分不同的打印调用,如果并行度大于1,那么输出也会有一个标识由哪个任务产生的标志。

5.4 writeUsingOutputFormat

自定义文件输出的方法和基类(FileOutputFormat),支持自定义对象到字节的转换。

5.5 writeToSocket

根据SerializationSchema 将元素写入到socket中。

六、Transformation

6.1 Map

DataStream → DataStream:输入一个参数产生一个参数。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.generateSequence(1,10)
val streamMap = stream.map { x => x * 2 }
streamFilter.print()

env.execute("FirstJob")

注意:stream.print():每一行前面的数字代表这一行是哪一个并行线程输出的。

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.utils.ParameterTool;
import scala.Tuple2;

import java.util.Random;


public class StuScore {
	private static Random rand = new Random();

	public static void main(String[] args) throws Exception {
		ParameterTool params = ParameterTool.fromArgs(args);
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.getConfig().setGlobalJobParameters(params);

		DataSet<String> text;
			if (params.has("input")) {
			text = env.readTextFile("F:\\date\\flinkdata\\stu.txt");
		}else{
			System.out.println("请检查你的输入");
			return;
	}

MapOperator<String, Tuple2<String, Integer>> stuscore = text.map(new MapFunction<String, Tuple2<String, Integer>>() {
	@Override
	public Tuple2<String, Integer> map(String s) throws Exception {
		return new Tuple2<>(s, rand.nextInt(100) + 1);
			}
	});

if (params.has("output")) {
		stuscore.writeAsCsv("F:\\date\\flinkdata\\personinput\\A");
		}else {
			System.out.println("打印到控制台");
			stuscore.print();
		}
	}
}

6.2 FlatMap

DataStream → DataStream:输入一个参数,产生0个、1个或者多个输出。

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("F:\date\flinkdata\stu.tsv")
val streamFlatMap = stream.flatMap{
x => x.split(" ")
}
streamFilter.print()

env.execute("FirstJob")

6.3 Filter

DataStream → DataStream:结算每个元素的布尔值,并返回布尔值为true的元素。下面这个例子是过滤出非0的元素:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.generateSequence(1,10)
val streamFilter = stream.filter{
	//打印奇数
x => (x % 2 != 0)
}
streamFilter.print()

env.execute("FirstJob")

6.4 Connect

在这里插入图片描述
DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("F:\date\flinkdata\stu.tsv")

val streamMap = stream.flatMap(item => item.split(" ")).filter(item => item.equals("hadoop"))
val streamCollect = env.fromCollection(List(1,2,3,4))
//streamMap和streamCollect交换顺序不会影响结果
val streamConnect = streamMap.connect(streamCollect)

streamConnect.map(item=>println(item), item=>println(item))

env.execute("FirstJob")

6.5 CoMap,CoFlatMap(注)

在这里插入图片描述
ConnectedStreams → DataStream:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream1 = env.readTextFile("F:\date\flinkdata\stu.tsv")
val streamFlatMap = stream1.flatMap(x => x.split(" "))
val stream2 = env.fromCollection(List(1,2,3,4))
val streamConnect = streamFlatMap.connect(stream2)
val streamCoMap = streamConnect.map(
(str) => str + "connect",
(in) => in + 100
)

streamCoMap.print()

env.execute("FirstJob")

//========================

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream1 = env.readTextFile("test.txt")
val stream2 = env.readTextFile("test1.txt")
val streamConnect = stream1.connect(stream2)
val streamCoMap = streamConnect.flatMap(
(str1) => str1.split(" "),
(str2) => str2.split(" ")
)
streamConnect.map(item=>println(item), item=>println(item))

env.execute("FirstJob")

6.6 Split

在这里插入图片描述
DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。

注:此代码无法运行出结果,使用Select即可运行

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("F:\date\flinkdata\stu.tsv")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(
num =>
	//字符串内容为hadoop的组成一个DataStream,其余的组成一个DataStream
(num.equals("hadoop")) match{
case true => List("hadoop")
case false => List("other")
}
)

env.execute("FirstJob")

6.7 Select

在这里插入图片描述
SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("F:\date\flinkdata\stu.tsv")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(
num =>
(num.equals("hadoop")) match{
case true => List("hadoop")
case false => List("other")
}
)

val hadoop = streamSplit.select("hadoop")
val other = streamSplit.select("other")
other.print()

env.execute("FirstJob")

6.8 Union

在这里插入图片描述
DataStream → DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。注意:如果你将一个DataStream跟它自己做union操作,在新的DataStream中,你将看到每一个元素都出现两次。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream1 = env.readTextFile("test.txt")
val streamFlatMap1 = stream1.flatMap(x => x.split(" "))
val stream2 = env.readTextFile("test1.txt")
val streamFlatMap2 = stream2.flatMap(x => x.split(" "))
val streamConnect = streamFlatMap1.union(streamFlatMap2)

env.execute("FirstJob")

6.9 KeyBy

DataStream → KeyedStream:输入必须是Tuple类型,逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap{
x => x.split(" ")
}
val streamMap = streamFlatMap.map{
x => (x,1)
}
val streamKeyBy = streamMap.keyBy(0)
env.execute("FirstJob")

6.10 Reduce

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)

val streamReduce = stream.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)

streamReduce.print()

env.execute("FirstJob")

6.11 Fold

KeyedStream → DataStream:一个有初始值的分组数据流的滚动折叠操作,合并当前元素和前一次折叠操作的结果,并产生一个新的值,返回的流中包含每一次折叠的结果,而不是只返回最后一次折叠的最终结果。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)

val streamReduce = stream.fold(100)(
(begin, item) => (begin + item._2)
)

streamReduce.print()

env.execute("FirstJob")

6.12 Aggregations

KeyedStream → DataStream:分组数据流上的滚动聚合操作。min和minBy的区别是min返回的是一个最小值,而minBy返回的是其字段中包含最小值的元素(同样原理适用于max和maxBy),返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("test02.txt").map(item => (item.split(" ")(0), item.split(" ")(1).toLong)).keyBy(0)

val streamReduce = stream.sum(1)

streamReduce.print()

env.execute("FirstJob")

在2.3.10之前的算子都是可以直接作用在Stream上的,因为他们不是聚合类型的操作,但是到2.3.10后你会发现,我们虽然可以对一个无边界的流数据直接应用聚合算子,但是它会记录下每一次的聚合结果,这往往不是我们想要的,其实,reduce、fold、aggregation这些聚合算子都是和Window配合使用的,只有配合Window,才能得到想要的结果。

版权声明:本文为weixin_43520450原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_43520450/article/details/106325822

智能推荐

【Spark 内核】 Spark 内核解析-下

Spark内核泛指Spark的核心运行机制,包括Spark核心组件的运行机制、Spark任务调度机制、Spark内存管理机制、Spark核心功能的运行原理等,熟练掌握Spark内核原理,能够帮助我们更好地完成Spark代码设计,并能够帮助我们准确锁定项目运行过程中出现的问题的症结所在。 Spark Shuffle 解析 Shuffle 的核心要点 ShuffleMapStage与ResultSta...

Reflect反射的基础知识

写个父类: 写个子类: 利用反射获得该子类中的属性,方法,构造,父类及接口: 运行结果:...

spring cloud netflix (07) 服务的消费者(feign)

前言 完整知识点:spring cloud netflix 系列技术栈 Feign (同步通信 HTTP通信) feign是基于接口完成服务与服务之间的通信的 搭建Feign服务 项目结构 项目搭建 pom.xml application类 application.yml 使用feign完成服务与服务之间的通信 feign是基于接口完成服务与服务之间的通信的...

AtCoder Beginner Contest 174 E.Logs

AtCoder Beginner Contest 174 E.Logs 题目链接 到最后才发现是二分,菜菜的我/(ㄒoㄒ)/~~ 我们直接二分 [1,max{a[i]}][1,max\lbrace a[i]\rbrace][1,max{a[i]}] 即可,对每一个 midmidmid,每个数 a[i]a[i]a[i] 只需要切 a[i]−1mid\frac{a[i]-1}{mid}mi...

小程序基础与实战案例

小程序开发工具与基础 小程序开发准备: 申请小程序账号( appid ) 下载并安装微信开发者工具 具体步骤如下: 先进入 微信公众平台 ,下拉页面,把鼠标悬浮在小程序图标上 然后点击 小程序开发文档 照着里面给的步骤,就可以申请到小程序账号了。 然后就可以下载 开发者工具 了 下载完打开后的界面就是这个样子 下面让我们来新建一个小程序开发项目: 在AppID输入自己刚刚注册的AppID就可以,或...

猜你喜欢

VMware centOS7 下通过minikube部署Kubernetes

1、环境准备: VMware CentOS-7-x86_64 CPU:2*2core 内存:8G 宿主机和虚拟机需网络互通,虚拟机外网访问正常 Centos发行版版本查看:cat /etc/centos-release root用户操作 2、禁用swap分区 Kubernetes 1.8开始要求关闭系统的Swap,可暂时关闭或永久禁用, 使用 $ free -m 确认swap是否为开启状态 $ s...

逻辑回归与scikit-learn

欢迎关注本人的微信公众号AI_Engine LogisticRegression 算法原理 一句话概括:逻辑回归假设数据服从伯努利分布,通过极大化似然函数(损失函数)的方法,运用梯度下降或其他优化算法来求解参数,来达到将数据二分类的目的。 定义:逻辑回归(Logistic Regression)是一种用于解决二分类(0 or 1)问题的机器学习方法,用于估计某种事物的可能性(不是概率)。比如某用户...

指针OR数组?用他们来表达字符串又有何不同?

cocowy的编程之旅 在学习C语言的过程中我们经常可以看到或者听到这样一句话:数组其实等价于指针,例如: 在这里可以轻松的看出输出后他们的值相等,其实在计算机内存里面,p为本地变量,有着他自己的作用域。而指针变量q保存着这个数组的首地址,通过*号指向这个地址保存的变量值。 然而我们再看一个例子: 这个时候计算机报错,这是为什么呢? 其实原因很简单,指针说指向的这个字符串的地址是位于计算机代码段地...

广度搜索

广度搜索的基本使用方法 广度搜索不同于深度搜索,是一种一步一步进行的过程,每一个点只记录一遍。需要用到队列记录每一步可以走到的位置,找到目标位置输出步数即可。 用到的知识:结构体、队列 如图 首先我们需要定义一个结构体来存储每个遍历到的点和步数 广搜不会用到递归,所以可以直接在主函数里写,这里需要定义一个结构体队列 初始化队列并将起始点入列 遍历 完整代码...

NIO Socket 编程实现tcp通信入门(二)

1、NIO简介 NIO面向通道和缓冲区进行工作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。可以双向传输数据,是同步非阻塞式IO。NIO还引入了选择器机制,从而实现了一个选择器监听多个底层通道,减少了线程并发数。用NIO实现socket的Tcp通信需要掌握下面三个知识点: Buffer 缓冲区 Channel 通道 Selector 选择器   2、java.nio.Buff...