Kafka 提供了两种自动创建主题的机制。您可以为 Kafka 代理启用自动主题创建,并且从 Kafka 2.6.0 开始,您还可以启用 Kafka Connect 来创建主题。Kafka 代理使用该auto.create.topics.enable属性来控制自动主题创建。在 Kafka Connect 中,该topic.creation.enable属性指定是否允许 Kafka Connect 创建主题。在这两种情况下,属性的默认设置都会启用自动主题创建。
启用自动创建主题后,如果 Debezium 源连接器为已不存在目标主题的表发出更改事件记录,则在将事件记录摄取到 Kafka 时在运行时创建主题。
代理创建的主题仅限于共享单个默认配置。代理不能将唯一配置应用于不同的主题或主题集。相比之下,Kafka Connect 可以在创建主题、设置复制因子、分区数量以及 Debezium 连接器配置中指定的其他主题特定设置时应用多种配置中的任何一种。连接器配置定义了一组主题创建组,并将一组主题配置属性与每个组相关联。
代理配置和 Kafka Connect 配置相互独立。无论您是否在代理上禁用主题创建,Kafka Connect 都可以创建主题。如果您在代理和 Kafka Connect 中都启用了自动主题创建,则 Connect 配置优先,并且仅当 Kafka Connect 配置中的任何设置都不适用时,代理才会创建主题。
禁用 Kafka 代理的自动主题创建
默认情况下,如果主题尚不存在,Kafka 代理配置允许代理在运行时创建主题。代理创建的主题不能使用自定义属性进行配置。如果您使用早于 2.6.0 的 Kafka 版本,并且想要创建具有特定配置的主题,则必须在代理处禁用自动主题创建,然后手动或通过自定义部署过程显式创建主题。
在代理配置中,将 的值设置
auto.create.topics.enable为false。
设置 Kafka 连接
Kafka Connect 中的自动主题创建由topic.creation.enable属性控制。该属性的默认值为true,启用自动创建主题,如下例所示:
topic.creation.enable = true该topic.creation.enable属性的设置适用于 Connect 集群中的所有工作人员。
Kafka Connect 自动主题创建要求您定义 Kafka Connect 在创建主题时应用的配置属性。通过定义主题组,然后指定要应用于每个组的属性,您可以在 Debezium 连接器配置中指定主题配置属性。连接器配置定义了一个默认主题创建组,以及一个或多个自定义主题创建组(可选)。自定义主题创建组使用主题名称模式列表来指定应用组设置的主题。
默认情况下,Kafka Connect 创建的主题基于模式命名server.schema.table,例如dbserver.myschema.inventory.
如果您不想让 Kafka Connect 自动创建主题, |
Kafka Connect 自动主题创建要求至少为主题创建组设置 |
配置
要让 Kafka Connect 自动创建主题,它需要来自源连接器的有关在创建主题时应用的配置属性的信息。您可以在每个 Debezium 连接器的配置中定义控制主题创建的属性。当 Kafka Connect 为连接器发出的事件记录创建主题时,生成的主题会从适用的组中获取它们的配置。该配置仅适用于该连接器发出的事件记录。
主题创建组
一组主题属性与主题创建组相关联。最低限度,您必须定义一个default主题创建组并指定其配置属性。除此之外,您还可以选择定义一个或多个自定义主题创建组并为每个组指定唯一属性。
创建自定义主题创建组时,您可以根据主题名称模式为每个组定义成员主题。您可以指定命名模式来描述要从每个组中包含或排除的主题。include和exclude属性包含定义主题名称模式的正则表达式的逗号分隔列表。例如,如果您希望组包含以字符串 开头的所有主题,请将其属性dbserver1.inventory的值设置为。topic.creation.inventory.includedbserver1\\.inventory\\.*
如果您为自定义主题组同时指定 |
主题创建组配置属性
主题创建组和每个default自定义组都与一组唯一的配置属性相关联。您可以配置组以包含任何Kafka 主题级配置属性。例如,您可以指定旧主题段的清理策略、保留时间或主题组的主题压缩类型。您必须至少定义一组属性来描述要创建的主题的配置。
如果没有注册自定义组,或者include任何注册组的模式与要创建的任何主题的名称不匹配,则 Kafka Connect 使用default组的配置来创建主题。
有关通用主题配置注意事项,请参阅Debezium 安装指南中的配置 Debezium 主题。
默认组配置
在您可以使用 Kafka Connect 自动创建主题之前,您必须创建一个默认主题创建组并为其定义配置。默认主题创建组的配置应用于名称与include自定义主题创建组的列表模式不匹配的任何主题。
要为
topic.creation.default组定义属性,请将它们添加到连接器配置 JSON,如以下示例所示:{ ... "topic.creation.default.replication.factor": 3, "topic.creation.default.partitions": 10, "topic.creation.default.cleanup.policy": "compact", "topic.creation.default.compression.type": "lz4" ... }您可以在组的配置中包含任何Kafka 主题级别的配置属性
default。
| 物品 | 描述 |
|---|---|
1 |
|
2 |
|
3 |
|
4 |
|
|
自定义组配置
您可以定义多个自定义主题组,每个主题组都有自己的配置。
要定义自定义主题组,请将属性添加到连接器 JSON,并在组名称后列出自定义组的属性。
topic.creation.<group_name>.include以下示例显示了自定义主题创建组
inventory的示例配置:applicationlogs{ ... "topic.creation.inventory.include": "dbserver1\\.inventory\\.*", "topic.creation.inventory.partitions": 20, "topic.creation.inventory.cleanup.policy": "compact", "topic.creation.inventory.delete.retention.ms": 7776000000, "topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*", "topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*", "topic.creation.applicationlogs.replication.factor": 1, "topic.creation.applicationlogs.partitions": 20, "topic.creation.applicationlogs.cleanup.policy": "delete", "topic.creation.applicationlogs.retention.ms": 7776000000, "topic.creation.applicationlogs.compression.type": "lz4", ... }
| 物品 | 描述 |
|---|---|
1 | 定义 |
2 |
|
3 | 定义 |
4 |
|
5 |
|
注册自定义组
为任何自定义主题创建组指定配置后,注册组。
通过将
topic.creation.groups属性添加到连接器 JSON 并指定以逗号分隔的组列表来注册自定义组。以下示例注册自定义主题创建组
inventory和applicationlogs:{ ... "topic.creation.groups": "inventory,applicationlogs", ... }
以下示例显示了一个完整的配置,其中包括default主题组的配置,inventory以及applicationlogs自定义主题创建组的配置:
{
...
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 10,
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.compression.type": "lz4"
"topic.creation.groups": "inventory,applicationlogs",
"topic.creation.inventory.include": "dbserver1\\.inventory\\.*",
"topic.creation.inventory.partitions": 20,
"topic.creation.inventory.cleanup.policy": "compact",
"topic.creation.inventory.delete.retention.ms": 7776000000,
"topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*",
"topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*",
"topic.creation.applicationlogs.replication.factor": 1,
"topic.creation.applicationlogs.partitions": 20,
"topic.creation.applicationlogs.cleanup.policy": "delete",
"topic.creation.applicationlogs.retention.ms": 7776000000,
"topic.creation.applicationlogs.compression.type": "lz4"
}