山西城乡和建设厅网站首页,怎样建设一个好的网站,个人备案的网站涉及到资金,宁波seo优化项目Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置如何处理更新结果、时态表、流上的join、流上的确定性以及查询配置 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例1 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例2 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例3 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例4 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例6 17、Flink 之Table API: Table API 支持的操作1 17、Flink 之Table API: Table API 支持的操作2 18、Flink的SQL 支持的操作和语法 19、Flink 的Table API 和 SQL 中的内置函数及示例1 19、Flink 的Table API 和 SQL 中的自定义函数及示例2 19、Flink 的Table API 和 SQL 中的自定义函数及示例3 19、Flink 的Table API 和 SQL 中的自定义函数及示例4 20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL可以直接提交 SQL 任务到集群上 21、Flink 的table API与DataStream API 集成1- 介绍及入门示例、集成说明 21、Flink 的table API与DataStream API 集成2- 批处理模式和inser-only流处理 21、Flink 的table API与DataStream API 集成3- changelog流处理、管道示例、类型转换和老版本转换示例 21、Flink 的table API与DataStream API 集成完整版 22、Flink 的table api与sql之创建表的DDL 24、Flink 的table api与sql之Catalogs介绍、类型、java api和sql实现ddl、java api和sql操作catalog-1 24、Flink 的table api与sql之Catalogsjava api操作数据库、表-2 24、Flink 的table api与sql之Catalogsjava api操作视图-3 24、Flink 的table api与sql之Catalogsjava api操作分区与函数-4 25、Flink 的table api与sql之函数(自定义函数示例) 26、Flink 的SQL之概览与入门示例 27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例1 27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例2 27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例3 27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例4 27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例5 27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例6 27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例7 28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句 29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE1 29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE2 30、Flink SQL之SQL 客户端通过kafka和filesystem的例子介绍了配置文件使用-表、视图等 32、Flink table api和SQL 之用户自定义 Sources Sinks实现及详细示例 33、Flink 的Table API 和 SQL 中的时区 41、Flink之Hive 方言介绍及详细示例 42、Flink 的table api与sql之Hive Catalog 43、Flink之Hive 读写及详细验证示例 44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的 文章目录 Flink 系列文章一、Table API 与 DataStream API集成1、概述2、 DataStream 和 Table 相互转换示例1、示例1 - toDataStream2、示例2 - toChangelogStream3、示例3 - 通过仅切换标志来处理批处理和流数据 3、集成说明1、maven依赖2、import3、Configuration4、执行行为1、DataStream API2、Table API 本文是Flink table api 与 datastream api的集成的第一篇主要介绍了集成的概述、table api 与 datastream api相互转换的三个示例以及其集成的说明即maven依赖、import、配置以及执行行为并以具体的示例进行说明。 本文依赖flink、kafka集群能正常使用。 本文分为3个部分即集成概述、三个入门示例、集成说明。 本文的示例是在Flink 1.17版本中运行。
一、Table API 与 DataStream API集成
1、概述
在定义数据处理管道时Table API和DataStream API同样重要。
DataStream API在一个相对低级的命令式编程API中提供流处理的原语即时间、状态和数据流管理。Table API抽象了许多内部构件并提供了结构化和声明性API。
这两个API都可以处理有界和无界流。
在处理历史数据时需要管理有界流。无界流发生在实时处理场景中这些场景可能先使用历史数据进行初始化。
为了有效执行这两个API都以优化的批处理执行模式提供处理有界流。然而由于批处理只是流的一种特殊情况因此也可以在常规流执行模式下运行有界流的管道。
一个API中的管道可以端到端定义而不依赖于另一个API。然而出于各种原因混合这两种API可能是有用的
在DataStream API中实现主管道main pipeline之前使用表生态系统(table ecosystem)轻松访问目录catalogs 或连接到外部系统。在DataStream API中实现主管道之前访问一些SQL函数以进行无状态数据规范化和清理。如果table API中不存在更低级的操作例如自定义计时器处理则不时切换到DataStream API。
Flink提供了特殊的桥接功能以使与DataStream API的集成尽可能顺利。
在DataStream 和Table API之间切换会增加一些转换开销。例如部分处理二进制数据的表运行时即RowData的内部数据结构需要转换为更用户友好的数据结构即Row。通常这个开销可以忽略。
maven依赖 本篇文章如果没有特殊说明将使用如下maven依赖
propertiesencodingUTF-8/encodingproject.build.sourceEncodingUTF-8/project.build.sourceEncodingmaven.compiler.source1.8/maven.compiler.sourcemaven.compiler.target1.8/maven.compiler.targetjava.version1.8/java.versionscala.version2.12/scala.versionflink.version1.17.0/flink.version/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-common/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-sql-gateway/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-csv/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-json/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner_2.12/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-uber/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-runtime/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc/artifactIdversion3.1.0-1.17/version/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.38/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-hive_2.12/artifactIdversion1.17.0/version/dependencydependencygroupIdorg.apache.hive/groupIdartifactIdhive-exec/artifactIdversion3.1.2/version/dependency!-- flink连接器 --!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-sql-connector-kafka/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress --dependencygroupIdorg.apache.commons/groupIdartifactIdcommons-compress/artifactIdversion1.24.0/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.2/version!-- scopeprovided/scope --/dependency/dependencies2、 DataStream 和 Table 相互转换示例
Flink提供了专门的StreamTableEnvironment用于与DataStream API集成。这些环境使用其他方法扩展常规TableEnvironment并将DataStream API中使用的StreamExecutionEnvironments作为参数。
1、示例1 - toDataStream
下面的代码展示了如何在两个API之间来回切换的示例。表的列名和类型自动从DataStream的TypeInformation派生。由于DataStream API本机不支持变更日志处理因此代码假设在流到表和表到流转换期间仅附加/仅插入语义。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;/*** author alanchan**/
public class ConvertingDataStreamAndTableDemo {/*** param args* throws Exception*/public static void main(String[] args) throws Exception {// 1、创建运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);// 2、创建输入流DataStreamString dataStream env.fromElements(alan, alanchan, alanchanchn);// 3、将datastream 转为 tableTable inputTable tenv.fromDataStream(dataStream);// 4、创建视图该步骤不是必须将姓名转为大写tenv.createTemporaryView(InputTable, inputTable);Table resultTable tenv.sqlQuery(SELECT UPPER(f0) FROM InputTable);// 5、将table转成datastream进行输出DataStreamRow resultStream tenv.toDataStream(resultTable);resultStream.print();env.execute();}}
示例输出
12 I[ALAN]
14 I[ALANCHANCHN]
13 I[ALANCHAN]fromDataStream和toDataStream的完整语义可以在下面的部分中找到。特别是本节讨论了如何使用更复杂的嵌套类型来影响模式派生。它还包括使用事件时间和水印。
根据查询的类型在许多情况下生成的动态表是一个管道它不仅在将表转换为数据流时产生仅插入的更改而且还产生收回和其他类型的更新。在表到流转换期间这可能会导致类似于以下内容的异常
Table sink Unregistered_DataStream_Sink_1 doesnt support consuming update changes [...].在这种情况下需要再次修改查询或切换到ChangelogStream。
2、示例2 - toChangelogStream
下面的示例显示如何转换更新表。 每个结果行表示更改日志中的一个条目该条目具有更改标志可以通过对其调用row.getKind来查询。在本例中Alice的第二个分数在更改之前-U创建更新在更改之后U创建更新。
本示例仅仅以一个方法来展示避免没有必要的代码运行框架参考上述示例。 public static void test2() throws Exception {// 1、创建运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);// 2、创建输入流DataStreamRow dataStream env.fromElements(Row.of(alan, 18), Row.of(alanchan, 19), Row.of(alanchanchn, 20), Row.of(alan, 20));// 3、将datastream 转为 tableTable inputTable tenv.fromDataStream(dataStream).as(name, salary);// 4、创建视图该步骤不是必须tenv.createTemporaryView(InputTable, inputTable);Table resultTable tenv.sqlQuery(SELECT name, SUM(salary) FROM InputTable GROUP BY name);// 5、将table转成datastream进行输出DataStreamRow resultStream tenv.toChangelogStream(resultTable);resultStream.print();env.execute();}运行结果
2 I[alan, 18]
16 I[alanchan, 19]
16 I[alanchanchn, 20]
2 -U[alan, 18]
2 U[alan, 38]fromChangelogStream和toChangelogStream的完整语义可以在下面的部分中找到。特别是本节讨论了如何使用更复杂的嵌套类型来影响模式派生。它包括使用事件时间和水印。它讨论了如何为输入和输出流声明主键和变更日志模式。
上面的示例显示了如何通过为每个传入记录连续发出逐行更新来增量计算最终结果。然而在输入流有限即有界的情况下通过利用批处理原理可以更有效地计算结果。
在批处理中可以在连续的阶段中执行运算符这些阶段在发出结果之前使用整个输入表。例如连接操作符可以在执行实际连接之前对两个有界输入进行排序即排序合并连接算法或者在使用另一个输入之前从一个输入构建哈希表即哈希连接算法的构建/探测阶段。
DataStream API和Table API都提供专门的批处理运行时模式。
3、示例3 - 通过仅切换标志来处理批处理和流数据
下面的示例说明了统一管道能够通过仅切换标志来处理批处理和流数据。
public static void test3() throws Exception {// 1、创建运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.BATCH);StreamTableEnvironment tenv StreamTableEnvironment.create(env);// 2、创建输入流DataStreamRow dataStream env.fromElements(Row.of(alan, 18), Row.of(alanchan, 19), Row.of(alanchanchn, 20), Row.of(alan, 20));// 3、将datastream 转为 tableTable inputTable tenv.fromDataStream(dataStream).as(name, salary);// 4、创建视图该步骤不是必须tenv.createTemporaryView(InputTable, inputTable);Table resultTable tenv.sqlQuery(SELECT name, SUM(salary) FROM InputTable GROUP BY name);// 5、将table转成datastream进行输出DataStreamRow resultStream tenv.toChangelogStream(resultTable);resultStream.print();env.execute();}运行结果
注意比较和示例2的输出区别
I[alanchan, 19]
I[alan, 38]
I[alanchanchn, 20]
一旦将changelog 应用于外部系统例如键值存储可以看到两种模式都能够产生完全相同的输出表。通过在发出结果之前使用所有输入数据批处理模式的更改日志仅由仅插入的更改组成。有关更多细节请参阅下面的专用批处理模式部分。
3、集成说明
将Table API与DataStream API相结合的项目需要添加以下桥接模块之一。 它们包括对 flink-table-api-java或flink-table-api-scala的可传递依赖性以及相应的特定于语言的DataStream api模块。
1、maven依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge_2.12/artifactIdversion1.17.1/versionscopeprovided/scope
/dependency2、import
使用DataStream API和Table API的Java或Scala版本声明公共管道需要以下导入。
// imports for Java DataStream API
import org.apache.flink.streaming.api.*;
import org.apache.flink.streaming.api.environment.*;// imports for Table API with bridging to Java DataStream API
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.*;3、Configuration
TableEnvironment将采用传递的StreamExecutionEnvironment.中的所有配置选项。然而不能保证对StreamExecutionEnvironment配置的进一步更改在实例化后传播到StreamTableEnvironment。在规划期间将选项从Table API传播到DataStream API。
我们建议在切换到Table API之前尽早在DataStream API中设置所有配置选项。
import java.time.ZoneId;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;// create Java DataStream API
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// set various configuration earlyenv.setMaxParallelism(256);
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// then switch to Java Table API
StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// set configuration early
tableEnv.getConfig().setLocalTimeZone(ZoneId.of(Europe/Berlin));// start defining your pipelines in both APIs...
4、执行行为
这两个API都提供了执行管道的方法。换句话说如果被请求它们将编译一个作业图 job graph该作业图将提交到集群并触发以执行。结果将流式传输到声明的sinks。
通常这两个API都在方法名称中使用术语“执行”来标记这种行为。然而Table API和DataStream API之间的执行行为略有不同。
1、DataStream API
DataStream API的StreamExecutionEnvironment使用生成器模式builder pattern来构造复杂的管道。管道可能会拆分为多个分支这些分支可能以sink结尾也可能不以sink结尾。环境缓冲environment buffers所有这些定义的分支直到提交作业。
StreamExecutionEnvironment.execute提交整个构建的管道然后清除构建器。换句话说不再声明sources 和sinks 并且可以向生成器中添加新的管道。因此每个DataStream程序通常以对StreamExecutionEnvironment.execute的调用结束。或者DataStream.executeAndCollect隐式定义了一个sink用于将结果流式传输到本地客户端。
2、Table API
在Table API中分支管道仅在StatementSet中受支持其中每个分支必须声明一个最终sink。TableEnvironment和StreamTableEnvironment都不提供专用的通用execute方法。相反它们提供了提交单个source-to-sink管道或语句集的方法 final static String sinkSQL CREATE TABLE OutputTable (\n userId INT,\r\n age INT,\r\n balance DOUBLE,\r\n userName STRING,\r\n t_insert_time TIMESTAMP(3)\r\n ) WITH (\n connector print\n );final static String sinkSQL2 CREATE TABLE OutputTable2 (\n userId INT,\r\n age INT,\r\n balance DOUBLE,\r\n userName STRING,\r\n t_insert_time TIMESTAMP(3)\r\n ) WITH (\n connector print\n );final static String sourceSQL CREATE TABLE InputTable (\r\n userId INT,\r\n age INT,\r\n balance DOUBLE,\r\n userName STRING,\r\n t_insert_time AS localtimestamp,\r\n WATERMARK FOR t_insert_time AS t_insert_time\r\n ) WITH (\r\n connector datagen,\r\n rows-per-second10,\r\n fields.userId.kindsequence,\r\n fields.userId.start1,\r\n fields.userId.end20,\r\n fields.balance.kindrandom,\r\n fields.balance.min1,\r\n fields.balance.max100,\r\n fields.age.min1,\r\n fields.age.max100,\r\n fields.userName.length6\r\n );;public static void test4() throws Exception {// 1、创建运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);//sinkSQL//sourceSQL// 建表tenv.executeSql(sourceSQL);//tenv.executeSql(sinkSQL);tenv.executeSql(sinkSQL2);//插入表数据方式一tenv.from(InputTable).insertInto(OutputTable).execute();tenv.executeSql(select * from OutputTable);tenv.from(InputTable).execute().print();//插入表数据方式二tenv.executeSql(INSERT INTO OutputTable SELECT * FROM InputTable);tenv.executeSql(select * from OutputTable);//插入表数据方式三tenv.createStatementSet().addInsertSql(INSERT INTO OutputTable SELECT * FROM InputTable).addInsertSql(INSERT INTO OutputTable2 SELECT * FROM InputTable).execute();// 输出tenv.from(InputTable).execute().print();tenv.executeSql(SELECT * FROM InputTable).print();env.execute();}
输出结果
3 I[3, 99, 36.20987556045243, d23888, 2023-11-13T14:49:58.812]
15 I[15, 39, 68.30743253178122, 43bec8, 2023-11-13T14:49:58.812]
2 I[2, 62, 47.280395949976885, 7bae4e, 2023-11-13T14:49:58.812]
16 I[16, 52, 42.10205629532836, 6baf0e, 2023-11-13T14:49:58.812]
10 I[10, 25, 58.008035887440094, d43dea, 2023-11-13T14:49:58.812]
13 I[13, 36, 70.9215559827798, 01bb28, 2023-11-13T14:49:58.812]
12 I[12, 38, 30.31004698340413, 322ba8, 2023-11-13T14:49:58.812]
6 I[6, 17, 32.28909358733212, 13bf88, 2023-11-13T14:49:58.812]
9 I[9, 49, 44.52802246768357, e8280c, 2023-11-13T14:49:58.812]
8 I[8, 80, 18.03487847824154, 803b2a, 2023-11-13T14:49:58.812]
5 I[5, 61, 54.43695775227862, 063f08, 2023-11-13T14:49:58.812]
7 I[7, 64, 33.886576642098404, 443dea, 2023-11-13T14:49:58.812]
14 I[14, 92, 63.71527772015468, 123848, 2023-11-13T14:49:58.812]
11 I[11, 22, 30.745102844313315, e62848, 2023-11-13T14:49:58.812]
4 I[4, 78, 88.60724929598506, 55bca8, 2023-11-13T14:49:58.812]
1 I[1, 82, 62.50149215989057, 0bba0c, 2023-11-13T14:49:58.812]
3 I[19, 67, 14.244993215937432, e6c911, 2023-11-13T14:49:59.806]
1 I[17, 67, 91.05078612782468, 560b6c, 2023-11-13T14:49:59.807]
4 I[20, 95, 82.12047947156385, 1ac5b2, 2023-11-13T14:49:59.807]
2 I[18, 81, 25.384055001988084, fe98d1, 2023-11-13T14:49:59.806]
-----------------------------------------------------------------------------------------------------------------------
| op | userId | age | balance | userName | t_insert_time |
-----------------------------------------------------------------------------------------------------------------------
| I | 1 | 91 | 22.629318048042723 | 923e08 | 2023-11-13 14:49:59.800 |
| I | 2 | 67 | 75.26915785038814 | 342baa | 2023-11-13 14:49:59.803 |
| I | 3 | 68 | 74.06076023217011 | 1dbbce | 2023-11-13 14:49:59.803 |
| I | 4 | 26 | 79.47471729272772 | 083e2e | 2023-11-13 14:49:59.802 |
| I | 5 | 97 | 82.56249330491859 | 4a3c6e | 2023-11-13 14:49:59.804 |
| I | 6 | 32 | 81.74903214944425 | fdac4e | 2023-11-13 14:49:59.800 |
| I | 7 | 67 | 94.80154136831771 | f7acea | 2023-11-13 14:49:59.800 |
| I | 8 | 53 | 50.85073238739004 | cfbd0c | 2023-11-13 14:49:59.800 |
| I | 9 | 69 | 93.64054547476522 | 7fa9ec | 2023-11-13 14:49:59.801 |
| I | 10 | 66 | 61.92366658766452 | 05b86a | 2023-11-13 14:49:59.803 |
| I | 11 | 81 | 95.61717698776191 | efa8ce | 2023-11-13 14:49:59.797 |
| I | 12 | 8 | 63.573174957723076 | 0fbfec | 2023-11-13 14:49:59.802 |
| I | 13 | 85 | 52.938510850778734 | 43bfa8 | 2023-11-13 14:49:59.803 |
| I | 14 | 26 | 5.130287258770441 | 083c6c | 2023-11-13 14:49:59.797 |
| I | 15 | 35 | 73.3318749510538 | 0e3b4c | 2023-11-13 14:49:59.802 |
| I | 16 | 84 | 16.24326410122912 | ac2d6e | 2023-11-13 14:49:59.802 |
| I | 18 | 41 | 32.38455189801736 | b07afb | 2023-11-13 14:50:00.804 |
| I | 19 | 24 | 77.6947569111452 | 7f72ac | 2023-11-13 14:50:00.803 |
| I | 20 | 92 | 82.53929937026987 | 051fb9 | 2023-11-13 14:50:00.802 |
| I | 17 | 93 | 12.784194121509948 | bce5d9 | 2023-11-13 14:50:00.801 |
-----------------------------------------------------------------------------------------------------------------------
20 rows in set
为了组合这两种执行行为对StreamTableEnvironment.toDataStream或StreamTableEnviron.toChangelogStream的每次调用都将具体化materialize 即编译Table API子管道sub-pipeline并将其插入DataStream API管道生成器builder中。这意味着之后必须调用StreamExecutionEnvironment.execute或DataStream.executeAndCollect。Table API中的执行不会触发这些“外部部件external parts”。
// adds a branch with a printing sink to the StreamExecutionEnvironment
tableEnv.toDataStream(table).print();// (2)// executes a Table API end-to-end pipeline as a Flink job and prints locally,
// thus (1) has still not been executed
table.execute().print();// executes the DataStream API pipeline with the sink defined in (1) as a
// Flink job, (2) was already running before
env.execute();上述示例中有具体应用。
以上本文是Flink table api 与 datastream api的集成的第一篇主要介绍了集成的概述、table api 与 datastream api相互转换的三个示例以及其集成的说明即maven依赖、import、配置以及执行行为并以具体的示例进行说明。