0%

Flink Catalog 介绍

0. 引言

这篇文章我们介绍了一下 Flink 的 Catalog,基于 Flink 1.11,熟悉 Flink 或者 Spark 等大数据引擎的同学应该都知道这两个计算引擎都有一个共同的组件叫 Catalog。下面是 Flink 的 Catalog 的官方定义。

Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。 元数据也可以是持久化的,例如 Hive Metastore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。

简单来说,Catalog 就是元数据管理中心,其中元数据包括数据库、表、表结构等信息。

1. Catalog 定义

Flink 的 Catalog 相关代码定义在 catalog.java 文件中,是一个 interface,如下。

1
2
3
4
5
6
7
8
/**
* This interface is responsible for reading and writing metadata such as database/table/views/UDFs
* from a registered catalog. It connects a registered catalog and Flink's Table API.
*/
@PublicEvolving
public interface Catalog {
...
}

既然是 interface,我们来看一下支持的操作。

image-20200726120334206

我们可以将这些接口做一个简单的分类。

  • Database 相关操作

    • getDefaultDataBase:获取默认的 database
    • getDatabase:获取特定的 database
    • listDatabases:列出所有的 database
    • databaseExists:判断 database 是否存在
    • createDatabases:创建 database
    • dropDatabases:删除 database
    • alterDatabases:修改 database
  • Table 相关操作,一般都会有个参数是 database

    • listTables:列出所有的 table 和 view
    • getTable:获取指定的 table 或者 view
    • tableExist:判断 table 或者 view 是否存在
    • dropTable:删除 table 或者 view
    • createTable:创建 table 或者 view
    • renameTable:重命名 table 或者 view
    • alterTable:修改 table 或者 view
  • View 相关操作,除了和 table 共用方法外,还有一个独有的方法

    • listViews:列出所有的 view
  • Partition 相关操作,partition 是 table 的一个属性,所以参数一般都会带有 table 信息

    • listPartition:列出 table 的所有 partition
    • getPartition:获取指定的 partition
    • partitionExist:判断 parition 是否存在
    • createPartition:创建 partition
    • dropPartition:删除 partition
    • alterPartition:修改 parition
  • Function 相关操作,这里的 function 知道的是用户自定义的 function,也就是 Udf

    • listFunctions:列出所有的 function

    • getFunction:获取指定的 func

    • functionExist:判断 function 是否存在

    • dropFunction:删除 function

    • alterFunction:修改 function

2. Catalog 的实现

Catalog 作为 interface,我们看一下有哪些实现类。下图是通过 Idea 查看 Type Hierarchy 的结果。

image-20200726125927099

从上图我们可以看到 Catalog 的最终实现有三个类:

  • HiveCatalog:使用 Hive 的元数据来作为 Flink 的 HiveCatalog
  • GenericInMemoryCatalog:使用内存实现 Catalog
  • JdbcCatalog:使用其他支持 jdbc 协议的关系型数据库来存储元数据
  • PostgresCatalog:使用 Postgres 数据库来作为 Catalog 存储元数据

3. Catalog 使用举例

下面的示例是 Flink SQL 使用 Catalog 的示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
TableEnvironment tableEnv = ...

// Create a HiveCatalog
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");

// Register the catalog
tableEnv.registerCatalog("myhive", catalog);

// Create a catalog database
tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");

// Create a catalog table
tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");

tableEnv.listTables(); // should return the tables in current catalog and database.

下面是 api 的方式来使用 Catalog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.descriptors.Kafka;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// Create a HiveCatalog
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");

// Register the catalog
tableEnv.registerCatalog("myhive", catalog);

// Create a catalog database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...));

// Create a catalog table
TableSchema schema = TableSchema.builder()
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT())
.build();

catalog.createTable(
new ObjectPath("mydb", "mytable"),
new CatalogTableImpl(
schema,
new Kafka()
.version("0.11")
....
.startFromEarlist()
.toProperties(),
"my comment"
),
false
);

List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"

4. 自定义 Catalog

Catalog 是可扩展的,用户可以通过实现 Catalog 接口来开发自定义 Catalog。 想要在 SQL CLI 中使用自定义 Catalog,用户除了需要实现自定义的 Catalog 之外,还需要为这个 Catalog 实现对应的 CatalogFactory 接口。

CatalogFactory 定义了一组属性,用于 SQL CLI 启动时配置 Catalog。 这组属性集将传递给发现服务,在该服务中,服务会尝试将属性关联到 CatalogFactory 并初始化相应的 Catalog 实例。

5. 总结

这篇文章写的比较简单,相当于自己的学习笔记,下一篇文章我们比较一下 Spark 的 Catalog 实现。