Debezium同步之oracle数据到Kafka的同步

   

目录

   一、概述

   二、设置oracle

2.1 与 Oracle 安装类型的兼容性

2.2 准备数据库

2.3 重做日志大小

2.4 为连接器创建用户

2.5 备用数据库

2.6 故障转移数据库

   三、部署

3.1 Debezium Oracle 连接器配置

3.2 可插拔与不可插拔数据库

3.3 添加连接器配置


   一、概述

Debezium 的 Oracle 连接器捕获并记录发生在 Oracle 服务器上的数据库中的行级更改,包括在连接器运行时添加的表。您可以将连接器配置为针对特定的模式和表子集发出更改事件,或者忽略、屏蔽或截断特定列中的值。

有关与此连接器兼容的 Oracle 数据库版本的信息,请参阅Debezium 版本概述

Debezium 通过使用本机 LogMiner 数据库包或XStream API从 Oracle 获取更改事件。虽然连接器可能适用于各种 Oracle 版本和版本,但只有 Oracle EE 12 和 19 已经过测试。

   二、设置oracle

要设置 Oracle 以与 Debezium Oracle 连接器一起使用,需要执行以下步骤。这些步骤假设将多租户配置与容器数据库和至少一个可插拔数据库一起使用。如果您不打算使用多租户配置,则可能需要调整以下步骤。

有关使用 Vagrant 在虚拟机中设置 Oracle 的信息,请参阅Debezium Vagrant Box for Oracle 数据库GitHub 存储库。

2.1 与 Oracle 安装类型的兼容性

Oracle 数据库可以作为独立实例安装,也可以使用 Oracle Real Application Cluster (RAC) 安装。Debezium Oracle 连接器与这两种安装类型兼容。

2.2 准备数据库

Oracle LogMiner 所需的配置
ORACLE_SID=ORACLCDB dbz_oracle sqlplus /nolog

CONNECT sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should now "Database log mode: Archive Mode"
archive log list

exit;

此外,必须为捕获的表或数据库启用补充日志记录,以便数据更改能够捕获已更改数据库行的之前状态。下面说明了如何在特定表上进行配置,这是最大限度减少 Oracle 重做日志中捕获的信息量的理想选择。

ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

必须在数据库级别启用最小补充日志记录,并且可以按如下方式进行配置。

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

2.3 重做日志大小

根据数据库配置,重做日志的大小和数量可能不足以实现可接受的性能。在设置 Debezium Oracle 连接器之前,请确保重做日志的容量足以支持数据库。

数据库的重做日志容量必须足以存储其数据字典。一般来说,数据字典的大小随着数据库中表和列的数量而增加。如果重做日志容量不足,数据库和 Debezium 连接器都可能会遇到性能问题。

请咨询您的数据库管理员以评估数据库是否可能需要增加日志容量。

2.4 为连接器创建用户

为了让 Debezium Oracle 连接器捕获更改事件,它必须以具有特定权限的 Oracle LogMiner 用户身份运行。以下示例显示了用于在多租户数据库模型中为连接器创建 Oracle 用户帐户的 SQL。

连接器捕获由其自己的 Oracle 用户帐户所做的数据库更改。但是,它不会捕获由SYSSYSTEM用户帐户所做的更改。

创建连接器的 LogMiner 用户
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
  CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
  CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba

  CREATE USER c##dbzuser IDENTIFIED BY dbz
    DEFAULT TABLESPACE logminer_tbs
    QUOTA UNLIMITED ON logminer_tbs
    CONTAINER=ALL;

  GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL; 
  GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL; 
  GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL; 
  GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL; 
  GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL; 
  GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL; 
  GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL; 
  GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL; 
  GRANT LOGMINING TO c##dbzuser CONTAINER=ALL; 

  GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL; 
  GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL; 
  GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL; 

  GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL; 
  GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL; 

  GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL; 
  GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL; 
  GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL; 
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL; 
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL; 
  GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL; 
  GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL; 
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL; 
  GRANT SELECT ON V_$TRANSACTION TO c##dbzuser CONTAINER=ALL; 

  exit;

权限/授权的描述:

2.5 备用数据库

可以为 Oracle 数据库配置物理或逻辑备用环境,以便在生产故障后进行恢复。此时,Debezium Oracle 连接器无法使用物理或逻辑备用数据库作为更改事件源。有一个开放的Jira 问题可以调查这种支持。

2.6 故障转移数据库

在 Oracle 生产故障的情况下,通常会存在逻辑或物理备用。当发生故障并将备用实例提升到生产环境时,必须先打开数据库进行读/写事务,然后 Debezium Oracle 连接器才能连接到数据库。

在物理备用的情况下,备用是生产的精确副本,这意味着 SCN 值是相同的。使用物理备用数据库时,只需重新配置 Debezium Oracle 连接器以在数据库打开后使用备用数据库的主机名即可。

对于逻辑备用数据库,备用数据库不是生产数据库的精确副本,因此备用数据库中的 SCN 偏移量与生产数据库中的不同。如果您使用逻辑备用,以帮助确保 Debezium 不会错过任何更改事件,在数据库打开后,配置新的连接器并执行新的数据库快照。

   三、部署

要部署 Debezium Oracle 连接器,您需要安装 Debezium Oracle 连接器存档,配置连接器,然后通过将其配置添加到 Kafka Connect 来启动连接器。

先决条件
程序
  1. 下载 Debezium Oracle 连接器插件存档

  2. 将文件提取到您的 Kafka Connect 环境中。

  3. 从 Maven Central 下载 Oracle的JDBC 驱动程序,并将下载的驱动程序文件解压缩到包含 Debezium Oracle 连接器 JAR 文件的目录。

  4. 将包含 JAR 文件的目录添加到Kafka Connect 的plugin.path.

  5. 重新启动 Kafka Connect 进程以获取新的 JAR 文件。

3.1 Debezium Oracle 连接器配置

通常,您通过提交指定连接器配置属性的 JSON 请求来注册 Debezium Oracle 连接器。server1以下示例显示了一个 JSON 请求,用于在端口 1521注册具有逻辑名称的 Debezium Oracle 连接器实例:

您可以选择为数据库中的模式和表的子集生成事件。或者,您可以忽略、屏蔽或截断包含敏感数据、大于指定大小或您不需要的列。

示例:Debezium Oracle 连接器配置
{
    "name": "inventory-connector",  
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",  
        "database.hostname" : "<ORACLE_IP_ADDRESS>",  
        "database.port" : "1521",  
        "database.user" : "c##dbzuser",  
        "database.password" : "dbz",   
        "database.dbname" : "ORCLCDB",  
        "database.server.name" : "server1",  
        "tasks.max" : "1",  
        "database.pdb.name" : "ORCLPDB1",  
        "database.history.kafka.bootstrap.servers" : "kafka:9092", 
        "database.history.kafka.topic": "schema-changes.inventory"  
    }
}

1当您向 Kafka Connect 服务注册连接器时分配给连接器的名称。
2此 Oracle 连接器类的名称。
3Oracle 实例的地址。
4Oracle 实例的端口号。
5Oracle 用户的名称,在为连接器创建用户中指定。
6Oracle 用户的密码,在为连接器创建用户中指定。
7要从中捕获更改的数据库的名称。
8为连接器捕获更改的 Oracle 数据库服务器标识并提供命名空间的逻辑名称。
9为此连接器创建的最大任务数。
10连接器从中捕获更改的 Oracle 可插拔数据库的名称。仅用于容器数据库 (CDB) 安装。
11此连接器用于将 DDL 语句写入和恢复到数据库历史主题的 Kafka 代理列表。
12连接器写入和恢复 DDL 语句的数据库历史主题的名称。本主题仅供内部使用,消费者不得使用。

在前面的示例中,database.hostnamedatabase.port属性用于定义与数据库主机的连接。但是,在更复杂的 Oracle 部署中,或在使用 TNS 名称的部署中,您可以使用指定 JDBC URL 的替代方法。

以下 JSON 示例显示与前面示例中相同的配置,不同之处在于它使用 JDBC URL 连接到数据库。

示例:使用 JDBC URL 连接到数据库的 Debezium Oracle 连接器配置
{
    "name": "inventory-connector",
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "database.server.name" : "server1",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.url": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(LOAD_BALANCE=OFF)(FAILOVER=ON)(ADDRESS=(PROTOCOL=TCP)(HOST=<oracle ip 1>)(PORT=1521))(ADDRESS=(PROTOCOL=TCP)(HOST=<oracle ip 2>)(PORT=1521)))(CONNECT_DATA=SERVICE_NAME=)(SERVER=DEDICATED)))",
        "database.dbname" : "ORCLCDB",
        "database.pdb.name" : "ORCLPDB1",
        "database.history.kafka.bootstrap.servers" : "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}

3.2 可插拔与不可插拔数据库

Oracle 数据库支持以下部署类型:

容器数据库 (CDB)

可以包含多个可插拔数据库 (PDB) 的数据库。数据库客户端连接到每个 PDB,就好像它是标准的非 CDB 数据库一样。

非容器数据库(non-CDB)

标准 Oracle 数据库,不支持创建可插拔数据库。

示例:用于 CDB 部署的 Debezium 连接器配置
{
  "config": {
    "connector.class" : "io.debezium.connector.oracle.OracleConnector",
    "tasks.max" : "1",
    "database.server.name" : "server1",
    "database.hostname" : "<oracle ip>",
    "database.port" : "1521",
    "database.user" : "c##dbzuser",
    "database.password" : "dbz",
    "database.dbname" : "ORCLCDB",
    "database.pdb.name" : "ORCLPDB1",
    "database.history.kafka.bootstrap.servers" : "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}

当您配置 Debezium Oracle 连接器以与 Oracle CDB 一起使用时,您必须为属性指定一个值,该值database.pdb.name命名您希望连接器从中捕获更改的 PDB。对于非 CDB 安装,不要指定属性database.pdb.name

示例:用于非 CDB 部署的 Debezium Oracle 连接器配置
{
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "database.server.name" : "server1",
        "database.hostname" : "<oracle ip>",
        "database.port" : "1521",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.dbname" : "ORCLCDB",
        "database.history.kafka.bootstrap.servers" : "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}

有关可以为 Debezium Oracle 连接器设置的配置属性的完整列表,请参阅Oracle 连接器属性

您可以使用命令将此配置发送POST到正在运行的 Kafka Connect 服务。该服务记录配置并启动执行以下操作的连接器任务:

  • 连接到 Oracle 数据库。

  • 读取重做日志。

  • 将更改事件记录到 Kafka 主题。

3.3 添加连接器配置

要开始运行 Debezium Oracle 连接器,请创建一个连接器配置,并将该配置添加到您的 Kafka Connect 集群。

先决条件
程序
  1. 为 Oracle 连接器创建配置。

  2. 使用Kafka Connect REST API将该连接器配置添加到您的 Kafka Connect 集群。

结果

连接器启动后,它会为连接器配置的 Oracle 数据库执行一致的快照。然后,连接器开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka 主题。  


版权声明:本文为Auspicious_air原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。