[Flink Extras] 21. Flink: Registering a Catalog via the SQL Client and the Table API
Flink Series Articles

I. The Flink column
The Flink column introduces one topic at a time systematically, each illustrated with concrete examples.
1. Flink deployment series — the basics of deploying and configuring Flink.
2. Flink fundamentals series — Flink basics such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.
3. Flink Table API and SQL basics series — basic usage of the Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, and so on.
4. Flink Table API and SQL advanced series — applied Table API and SQL topics that are closer to real production use and somewhat harder to develop.
5. Flink monitoring series — topics related to day-to-day operations and monitoring.

II. The Flink examples column
The Flink examples column is a companion to the Flink column: rather than introducing concepts, it provides concrete, ready-to-use examples. It is not further divided into sections; each article's title indicates its content.
For the entry point to all articles in both columns, see: Flink series article index.
Table of Contents
- Flink Series Articles
- I. Creating a Flink Table and Registering It in a Catalog
  - 1. Using SQL DDL
  - 2. Maven dependencies
  - 3. Creating a Hive table with the Table API and registering it in the HiveCatalog
  - 4. Creating a Hive table with SQL and registering it in the HiveCatalog
  - 5. Verification
    - 1) Package and upload
    - 2) Submit the job
    - 3) Verify
This article demonstrates registering tables in a Flink catalog: the SQL Client part shows connecting to MySQL through a JDBC catalog, and the Table API and SQL parts show registering tables in a HiveCatalog.
For more systematic coverage, see the author's Flink column.
Apart from the Maven dependencies, the environment used here is:
- Hadoop 3.1.4
- Hive 3.1.2
- Flink 1.13.6
I. Creating a Flink Table and Registering It in a Catalog
1. Using SQL DDL
Users can create tables in a catalog with DDL, either through the Table API or through the SQL Client.
Note that the JdbcCatalog does not support creating databases or tables (the official example is not explicit about this); the HiveCatalog does support creating tables.
The example below is based on MySQL; the Flink version used for this part is 1.17.
```sql
-- the catalog should have been registered via yaml file
Flink SQL> CREATE DATABASE mydb WITH (...);
-- the JdbcCatalog cannot create tables; the HiveCatalog can
Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);
Flink SQL> SHOW TABLES;
mytable

----------------------- concrete example -----------------------------------
Flink SQL> CREATE CATALOG alan_catalog WITH(
>     'type' = 'jdbc',
>     'default-database' = 'test?useSSL=false',
>     'username' = 'root',
>     'password' = 'root',
>     'base-url' = 'jdbc:mysql://192.168.10.44:3306'
> );
[INFO] Execute statement succeed.

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
|    alan_catalog |
| default_catalog |
+-----------------+
2 rows in set

Flink SQL> use catalog alan_catalog;
[INFO] Execute statement succeed.

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
|          azkaban |
|          cdhhive |
|           cdhhue |
......
| spring_boot_plus |
|   springbootmall |
|             test |
|           zipkin |
+------------------+
29 rows in set

Flink SQL> use test;
[INFO] Execute statement succeed.

Flink SQL> show tables;
+------------------------------+
|                   table name |
+------------------------------+
|                  permissions |
|                       person |
|                   personinfo |
|                         role |
|                         user |
+------------------------------+
34 rows in set

Flink SQL> select * from person;
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Execute statement succeed.

Flink SQL> select * from person;
+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |          11 |                 测试修改go语言 |          30 |
| +I |          13 |                     NameUpdate |          22 |
| +I |          14 |                     updatejson |          23 |
| +I |         189 |                       再试一试 |          12 |
| +I |         191 |               test-full-update |        3333 |
| +I |         889 |               zhangsanswagger2 |          88 |
| +I |         892 |                         update |         189 |
| +I |        1001 |                     testupdate |          19 |
| +I |        1002 |                     测试go语言 |          23 |
| +I |        1013 |                          slene |           0 |
| +I |        1014 |                        testing |           0 |
| +I |        1015 |                        testing |          18 |
| +I |        1016 |                        astaxie |          19 |
| +I |        1017 |                           alan |          18 |
| +I |        1018 |                           chan |          19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rows
```
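A registered JDBC catalog's tables can be written to as well as read, since the JDBC connector also works as a sink. A minimal sketch, reusing the `person` table from the transcript above (the inserted values are made up for illustration):

```sql
Flink SQL> USE CATALOG alan_catalog;
Flink SQL> USE test;
-- writes the row through to the underlying MySQL table
Flink SQL> INSERT INTO person VALUES (2000, 'alan_demo', 20);
```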
2. Maven dependencies
```xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.13.6</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.8</version>
        <scope>system</scope>
        <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-metastore</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>3.1.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.7.5-10.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version><!--8.0.20 -->
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.44</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
```
3. Creating a Hive Table with the Table API and Registering It in the HiveCatalog
Catalog tables can also be created programmatically in Java or Scala.
The example below uses the HiveCatalog; the HiveCatalog itself is covered in more depth in a dedicated article.
Note: to run this example, /usr/local/bigdata/hadoop-3.1.4/share/hadoop/common/hadoop-lzo-0.4.21-SNAPSHOT.jar from the Hadoop environment had to be copied into Flink's lib directory (/usr/local/bigdata/flink-1.13.5/lib), because this Hadoop environment is configured with LZO compression.
Versions used: Hadoop 3.1.4, Hive 3.1.2, Flink 1.13.6.
```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

/**
 * @author alanchan
 */
public class TestHiveCatalogDemo {

	/**
	 * @param args
	 * @throws DatabaseNotExistException
	 * @throws CatalogException
	 * @throws DatabaseAlreadyExistException
	 * @throws TableAlreadyExistException
	 */
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String name = "alan_hive";
		// name of the default database: testhive
		String defaultDatabase = "testhive";
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog("alan_hive", hiveCatalog);
		// use the registered catalog
		tenv.useCatalog("alan_hive");

		List<String> tables = hiveCatalog.listTables(defaultDatabase);
		for (String table : tables) {
			System.out.println("Database:testhive tables:" + table);
		}

		// create a database
		// public CatalogDatabaseImpl(Map<String, String> properties, @Nullable String comment)
		Map<String, String> properties = new HashMap<>();
		// properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true));
		// properties.put("connector", "COLLECTION");
		CatalogDatabase cd = new CatalogDatabaseImpl(properties, "this is new database,the name is alan_hivecatalog_hivedb");
		String newDatabaseName = "alan_hivecatalog_hivedb";
		hiveCatalog.createDatabase(newDatabaseName, cd, true);

		// create a table
		String tableName = "alan_hivecatalog_hivedb_testTable";
		// public ObjectPath(String databaseName, String objectName)
		ObjectPath path = new ObjectPath(newDatabaseName, tableName);
		TableSchema schema = TableSchema.builder()
				.field("id", DataTypes.INT())
				.field("name", DataTypes.STRING())
				.field("age", DataTypes.INT())
				.build();
		// public CatalogTableImpl(TableSchema tableSchema, Map<String, String> properties, String comment)
		CatalogTable catalogTable = new CatalogTableImpl(schema, properties, "this is table comment");
		hiveCatalog.createTable(path, catalogTable, true);

		List<String> newTables = hiveCatalog.listTables(newDatabaseName);
		for (String table : newTables) {
			System.out.println("Database:alan_hivecatalog_hivedb tables:" + table);
		}

		// insert data
		String insertSQL = "insert into " + newDatabaseName + "." + tableName + " values (1,'alan',18)";
		tenv.executeSql(insertSQL);

		// query data
		String selectSQL = "select * from " + newDatabaseName + "." + tableName;
		Table table = tenv.sqlQuery(selectSQL);
		table.printSchema();
		DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
		result.print();

		env.execute();
	}
}
```
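For comparison, the catalog registration and database setup done programmatically above can also be sketched purely in Flink SQL from the SQL Client. This is a hedged sketch, not taken from the original run: it assumes the Hive connector jars are on the SQL Client's classpath and reuses the names and paths from the Java example.

```sql
-- register the HiveCatalog (names/paths taken from the Java example above)
CREATE CATALOG alan_hive WITH (
  'type' = 'hive',
  'default-database' = 'testhive',
  'hive-conf-dir' = '/usr/local/bigdata/apache-hive-3.1.2-bin/conf'
);
USE CATALOG alan_hive;
CREATE DATABASE IF NOT EXISTS alan_hivecatalog_hivedb;
USE alan_hivecatalog_hivedb;
```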
4. Creating the Hive Table with SQL and Registering It in the HiveCatalog
This example is functionally identical to the previous one; the difference is the implementation: the previous example creates the table through the API, this one through a SQL statement.
```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

/**
 * @author alanchan
 */
public class TestCreateHiveTable {
	public static final String tableName = "alan_hivecatalog_hivedb_testTable";
	public static final String hive_create_table_sql = "CREATE TABLE " + tableName + " (\n"
			+ "  id INT,\n"
			+ "  name STRING,\n"
			+ "  age INT"
			+ ") "
			+ "TBLPROPERTIES (\n"
			+ "  'sink.partition-commit.delay'='5 s',\n"
			+ "  'sink.partition-commit.trigger'='partition-time',\n"
			+ "  'sink.partition-commit.policy.kind'='metastore,success-file'"
			+ ")";

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
		String name = "alan_hive";
		// name of the default database
		String defaultDatabase = "default";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog("alan_hive", hiveCatalog);
		tenv.useCatalog("alan_hive");

		// the database was created by the previous example
		String newDatabaseName = "alan_hivecatalog_hivedb";
		tenv.useDatabase(newDatabaseName);

		// optionally drop an existing table first:
		// if (hiveCatalog.tableExists(new ObjectPath(newDatabaseName, tableName))) {
		//     hiveCatalog.dropTable(new ObjectPath(newDatabaseName, tableName), true);
		// }

		// create the table using the Hive SQL dialect
		tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
		tenv.executeSql(hive_create_table_sql);

		// insert data
		String insertSQL = "insert into alan_hivecatalog_hivedb_testTable values (1,'alan',18)";
		tenv.executeSql(insertSQL);

		// query data
		String selectSQL = "select * from alan_hivecatalog_hivedb_testTable";
		Table table = tenv.sqlQuery(selectSQL);
		table.printSchema();
		DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
		result.print();

		env.execute();
	}
}
```
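The dialect switch done in code via `tenv.getConfig().setSqlDialect(SqlDialect.HIVE)` has a SQL Client counterpart. A brief sketch (assuming a Hive catalog is already the current catalog; the table name is purely illustrative):

```sql
SET table.sql-dialect=hive;    -- subsequent DDL is parsed as Hive DDL
CREATE TABLE alan_dialect_demo (id INT, name STRING, age INT);
SET table.sql-dialect=default; -- switch back to Flink's default dialect
```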
5. Verification
The job in this example is submitted on the Flink cluster from the command line; submitting it through the web UI works just as well and is not covered again.
Prerequisites:
1. A working Hadoop environment
2. A working Hive environment
3. A working Flink-with-Hive integration
4. A running Flink cluster; here it was started in yarn-session mode
1) Package and upload
Configure the packaging plugins in pom.xml:
```xml
<build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <!--<encoding>${project.build.sourceEncoding}</encoding> -->
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>org.table_sql.TestCreateHiveTable</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
Packaging can be done from the command line or from the IDE; here it is done with Maven on the command line:

```shell
mvn package -Dmaven.test.skip=true
# wait until the build reports success:
# [INFO] ------------------------------------------------------------------------
# [INFO] BUILD SUCCESS
# [INFO] ------------------------------------------------------------------------
# [INFO] Total time: 18.304 s
```

Upload the resulting jar to the Flink cluster and run it.
2) Submit the job

```shell
# jar location: /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar
# if flink is on the PATH, run the command directly; otherwise run it from flink's bin directory.
# note: the shade plugin above sets TestCreateHiveTable as the manifest main class; the standard
# way to run a different entry class is to select it with -c, e.g.
#   flink run -c org.table_sql.TestHiveCatalogDemo <jar>
flink run /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar org.table_sql.TestHiveCatalogDemo
```
3) Verify
```text
# 1. Run output after submitting the job
[alanchan@server1 bin]$ flink run /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar org.table_sql.TestHiveCatalogDemo
2023-08-31 00:18:01,185 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan.
2023-08-31 00:18:01,185 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan.
Hive Session ID = 4c3ab8b5-d99e-4e2f-9362-fcbcae8047fa
Hive Session ID = d3fc6679-9b60-47a9-b9e7-d125e3240196
2023-08-31 00:18:07,578 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/usr/local/bigdata/flink-1.13.5/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-08-31 00:18:07,778 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-31 00:18:07,787 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-31 00:18:07,860 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface server3:43480 of application 'application_1693286353898_0021'.
Job has been submitted with JobID 2161b431ad0310df06417a3232ca5e60
Hive Session ID = 90444eb0-7fc9-4ac9-adb1-44df145739c7
(
  `id` INT,
  `name` STRING,
  `age` INT
)
2023-08-31 00:18:17,806 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2023-08-31 00:18:17,871 INFO org.apache.hadoop.mapred.FileInputFormat [] - Total input files to process : 0
2023-08-31 00:18:18,115 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-31 00:18:18,116 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-31 00:18:18,119 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface server3:43480 of application 'application_1693286353898_0021'.
Job has been submitted with JobID 16a85c80862dac9035c62563b39a9fb7
Program execution finished
Job with JobID 16a85c80862dac9035c62563b39a9fb7 has finished.
Job Runtime: 6652 ms
```

```sql
-- 2. Query the table and its data in the Flink SQL CLI
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.

Flink SQL> select * from alan_hivecatalog_hivedb_testtable;
+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |           1 |                           alan |          18 |
+----+-------------+--------------------------------+-------------+
Received a total of 1 row
```

Verification complete.
To recap, this article demonstrated registering tables in a Flink catalog: the SQL Client was used to connect to MySQL through a JDBC catalog, and the Table API and SQL were used to register tables in a HiveCatalog.
