OpenShift|OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步

《OpenShift / RHEL / DevSecOps 汇总目录》
说明:本文已经在OpenShift 4.10环境中验证

文章目录

  • 场景说明
  • 部署环境
    • 安装CDC源和目标数据库
      • 安装 MySQL
      • 安装 PostgreSQLSQL
    • 安装 AMQ Stream 环境
      • 安装 AMQ Stream Opeartor
      • 创建 Kafka 实例
      • 创建 KafkaConnect 用到的 Image
      • 配置 KafkaConnect
      • 配置 KafkaConnector
        • MySqlConnector
        • JdbcSinkConnector
      • 环境检查
  • CDC 验证
    • 数据同步
    • 添加数据
    • 更新数据
    • 删除数据
  • 参考

场景说明 本文使用 OpenShift 的 AMQ Steams(即企业版 Kafka)和 Redhat 主导的 CDC 开源项目 Debezium 来实现从 MySQL 到 PostgreSQL 数据库的数据同步。
OpenShift|OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步
文章图片

上图中的 Kafka Connector 提供了访问源或目标的参数, 而 Kafka Connect 为访问源或目标的实际运行环境,该环境运行在相关容器中。
注意:本文操作需要用到 access.redhat.com 账号,另外还需有一个镜像 Registry 服务的账号,本文使用的是 quay.io Registry 服务。
部署环境 首先创建一个项目
$ oc project db-cdc

安装CDC源和目标数据库 安装 MySQL
  1. 在 OpenShift 控制台的 “开发者” 视图的 “+添加” 中找到 “数据库”,然后点击 MySQL (Ephemeral)。在 “实例化模板” 界面中提供如图配置,最后点击创建。
    OpenShift|OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步
    文章图片
  2. 运行以下命令,在 MySQL 中创建测试数据表和记录。
$ wget https://raw.githubusercontent.com/liuxiaoyu-git/debezium_openshift/master/mysql/inventory.sql $ MYSQL_POD=$(oc get pods --output=jsonpath={.items..metadata.name} -l name=mysql) $ oc cp inventory.sql $MYSQL_POD:/tmp/ $ oc exec $MYSQL_POD -- sh -c 'mysql -uroot < /tmp/inventory.sql' $ oc exec $MYSQL_POD -it -- mysql -u mysqluser -pmysqlpassword inventory mysql> select * from customers; +------+------------+-----------+-----------------------+ | id| first_name | last_name | email| +------+------------+-----------+-----------------------+ | 1001 | Sally| Thomas| sally.thomas@acme.com | | 1002 | George| Bailey| gbailey@foobar.com| | 1003 | Edward| Walker| ed@walker.com| | 1004 | Anne| Kretchmar | annek@noanswer.org| +------+------------+-----------+-----------------------+

安装 PostgreSQLSQL
  1. 在 OpenShift 控制台的 “开发者” 视图的 “+添加” 中找到 “数据库”,然后点击 PostgreSQL (Ephemeral)。在 “实例化模板” 界面中提供如图配置,最后点击创建。
    OpenShift|OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步
    文章图片
  2. 运行以下命令,确认当前在 PostgreSQL 中无 customers 表。
$ POSTGRESQL_POD=$(oc get pod -l name=postgresql -o jsonpath={.items[0].metadata.name}) $ oc exec $POSTGRESQL_POD -it -- psql -U postgresuser inventory inventory=> select * from customers; ERROR 1146 (42S02): Table 'inventory.customers1' doesn't exist

安装 AMQ Stream 环境 安装 AMQ Stream Opeartor
在 OpenShift 中使用默认配置安装 AMQ Stream Opeartor,步骤略。
创建 Kafka 实例
在安装好的 AMQ Stream Opeartor 中根据以下配置创建 kafka 服务。
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: kafka: config: offsets.topic.replication.factor: 3 transaction.state.log.replication.factor: 3 transaction.state.log.min.isr: 2 default.replication.factor: 3 min.insync.replicas: 2 inter.broker.protocol.version: '3.1' storage: type: ephemeral listeners: - name: plain port: 9092 type: internal tls: false - name: tls port: 9093 type: internal tls: true version: 3.1.0 replicas: 3 entityOperator: topicOperator: {} userOperator: {} zookeeper: storage: type: ephemeral replicas: 3

创建后会在 OpenShift 中看到部署的相关资源。
OpenShift|OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步
文章图片

创建 KafkaConnect 用到的 Image
  1. 登录 registry.redhat.io。
$ podman login registry.redhat.io

  1. 使用以下内容创建名为 Dockerfile 的文件,其中包含访问 MySQL 日志的 Connect Plugin、访问 PostgreSQL 和 Kafka 的 JDBC Driver。
FROM registry.redhat.io/amq7/amq-streams-kafka-31-rhel8:2.1.0-4.1652296055USER root:rootRUN mkdir -p /opt/kafka/plugins/debeziumARG POSTGRES_VERSION=42.2.8 ARG KAFKA_JDBC_VERSION=5.3.2 ARG MYSQL_CONNECTOR_PLUGIN_VERSION=1.9.5# Deploy MySQL Connect Plugin RUN cd /opt/kafka/plugins/debezium/ &&\ curl -sO https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/$MYSQL_CONNECTOR_PLUGIN_VERSION.Final/debezium-connector-mysql-$MYSQL_CONNECTOR_PLUGIN_VERSION.Final-plugin.tar.gz &&\ tar -xf debezium-connector-mysql-$MYSQL_CONNECTOR_PLUGIN_VERSION.Final-plugin.tar.gz &&\ rm -f debezium-connector-mysql-$MYSQL_CONNECTOR_PLUGIN_VERSION.Final-plugin.tar.gz# Deploy PostgreSQL JDBC Driver RUN cd /opt/kafka/libs && curl -sO https://jdbc.postgresql.org/download/postgresql-$POSTGRES_VERSION.jar# Deploy Kafka Connect JDBC RUN cd /opt/kafka/plugins/debezium/ &&\ curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jarUSER 1001

  1. 在 Dockerfile 文件所在目录运行命令,在本地构建镜像。说明:请根据镜像 Registry 服务地址和账号名修改命令中的镜像名称。
$ podman build -t quay.io/dawnskyliu/connect-debezium:v1 .

  1. 登录 Registry 服务器,然后将生成的本地镜像推送到镜像 Registry 服务器上。
$ podman login quay.io $ podman push quay.io/dawnskyliu/connect-debezium:v1

  1. 在镜像 Registry 服务器上确认 connect-debezium 是 Public 访问类型的 Repository。
    OpenShift|OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步
    文章图片
配置 KafkaConnect
在安装好的 AMQ Stream Opeartor 中根据以下配置创建 KafkaConnect 对象,其中使用了前面生成的 “quay.io/dawnskyliu/connect-debezium:v1” 镜像。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" spec: version: 3.1.0 replicas: 1 image: 'quay.io/dawnskyliu/connect-debezium:v1' bootstrapServers: 'my-cluster-kafka-bootstrap:9093' tls: trustedCertificates: - secretName: my-cluster-cluster-ca-cert certificate: ca.crt config: group.id: connect-cluster offset.storage.topic: connect-cluster-offsets config.storage.topic: connect-cluster-configs status.storage.topic: connect-cluster-status config.storage.replication.factor: 1 offset.storage.replication.factor: 1 status.storage.replication.factor: 1 config.storage.min.insync.replicas: 1 offset.storage.min.insync.replicas: 1 status.storage.min.insync.replicas: 1

配置 KafkaConnector
MySqlConnector 在安装好的 AMQ Stream Opeartor 中根据以下配置创建 KafkaConnector 对象。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: mysql-source-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 1 config: "database.hostname": "mysql" "database.ssl.mode": "disabled" "database.allowPublicKeyRetrieval": "true" "database.port": "3306" "database.user": "debezium" "database.password": "dbz" "database.server.id": "1" "database.server.name": "dbserver1" "database.include": "inventory" "database.history.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092" "database.history.kafka.topic": "schema-changes.inventory" "transforms": "route" "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter" "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)" "transforms.route.replacement": "$3"

JdbcSinkConnector 在安装好的 AMQ Stream Opeartor 中根据以下配置创建 KafkaConnector 对象。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: postgresql-sink-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: io.confluent.connect.jdbc.JdbcSinkConnector tasksMax: 1 config: "topics": "customers" "connection.url": "jdbc:postgresql://postgresql:5432/inventory?user=postgresuser&password=postgrespw" "transforms": "unwrap" "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState" "transforms.unwrap.drop.tombstones": "false" "auto.create": "true" "insert.mode": "upsert" "delete.enabled": "true" "pk.fields": "id" "pk.mode": "record_key"

环境检查
在 AMQ Streams 的 Operator 中确认 Kafka,Kafka Connect 和 Kafka Connector 的运行状态。
OpenShift|OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步
文章图片

OpenShift|OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步
文章图片

OpenShift|OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步
文章图片

CDC 验证 数据同步 确认 customers 表和数据已经从 MySQL 同步到 PostgreSQL 中。
$ POSTGRESQL_POD=$(oc get pod -l name=postgresql -o jsonpath={.items[0].metadata.name}) $ oc exec $POSTGRESQL_POD -it -- psql -U postgresuser inventory inventory=> select * from customers; last_name |id| first_name |email -----------+------+------------+----------------------- Thomas| 1001 | Sally| sally.thomas@acme.com Bailey| 1002 | George| gbailey@foobar.com Walker| 1003 | Edward| ed@walker.com Kretchmar | 1004 | Anne| annek@noanswer.org (4 rows)

添加数据 在 MySQL 中执行命令添加新数据,然后在 PostgreSQL 确认变化数据已同步。
mysql> INSERT INTO customers VALUES (default,"test1","test1","test1@acme.com");

更新数据 在 MySQL 中执行命令更新数据,然后在 PostgreSQL 确认变化数据已同步。
mysql> update customers set first_name='Test' where id = 1001;

删除数据 在 MySQL 中执行命令删除数据,然后在 PostgreSQL 确认变化数据已同步。
mysql> delete from customers where first_name='test1';

参考 【OpenShift|OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步】https://github.com/liuxiaoyu-git/debezium_openshift
https://debezium.io/documentation/reference/1.9/operations/openshift.html
https://github.com/debezium/debezium-examples/tree/main/openshift
https://aws.amazon.com/cn/blogs/china/debezium-deep-dive/

    推荐阅读