使用 Spring Boot 和 Canal 实现 MySQL 数据库同步

news/2025/2/21 23:41:23

文章目录

  • 前言
  • 一、背景
  • 二、Canal 简介
  • 三、主库数据库配置
      • 1.主库配置
      • 2.创建 Canal 用户并授予权限
  • 四.配置 Canal Server
      • 1.Canal Server 配置文件
      • 2.启动 Canal Server
  • 五.开发 Spring Boot 客户端
      • 1. 引入依赖
      • 2. 配置 Canal 客户端
      • 3. 实现数据同步逻辑
  • 六.启动并测试
  • 七.注意事项
  • 八.总结


前言

在分布式系统中,数据同步是一个常见的需求。例如,我们可能需要将主库的数据实时同步到多个从库,或者将数据从一个数据库集群同步到另一个集群。本篇内容通过一个实际案例,介绍如何使用 Spring Boot 和 Canal 实现 MySQL 数据库之间的数据同步


一、背景

假设我们有以下数据库架构:

  • 两个主库:db_1 和 db_2。
    每个主库对应两个从库:db_1_bk_1、db_1_bk_2 和 db_2_bk_1、db_2_bk_2。
  • 我们的目标是:
    将 db_1 的数据同步到 db_1_bk_1 和 db_1_bk_2。
    将 db_2 的数据同步到 db_2_bk_1 和 db_2_bk_2。

二、Canal 简介

Canal 是阿里巴巴开源的一款基于 MySQL Binlog 的增量数据订阅与分发工具。它通过模拟 MySQL 的从节点,实时捕获主库的 Binlog 日志,并将数据变更事件推送给下游消费者。Canal 支持多种下游适配器,如 Kafka、RabbitMQ 和直接消费。

三、主库数据库配置

1.主库配置

为了使 Canal 能够正常解析 Binlog 日志,主库需要进行以下配置:

  • 开启 Binlog 日志:确保主库开启了 Binlog 日志,并且设置为 ROW 模式。
  • 配置 server-id:为每个主库设置唯一的 server-id。
  • 创建 Canal 用户并授予权限:创建一个用户供 Canal 使用,并授予必要的权限。

编辑主库的配置文件(my.cnf 或 my.ini),添加以下内容:

[mysqld]
# 开启 Binlog 日志
log-bin=mysql-bin
# 设置 Binlog 格式为 ROW 模式
binlog-format=ROW
# 设置唯一的 server-id
server-id=1

注意:

  • 如果你有多个主库,每个主库的 server-id 必须是唯一的。
  • 修改配置后,需要重启 MySQL 服务以使配置生效。

2.创建 Canal 用户并授予权限

Canal 需要一个具有读取 Binlog 权限的 MySQL 用户。以下是创建用户并授予权限的步骤:

# 登录 MySQL
mysql -u root -p
# 创建用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
# 授予权限
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
# 刷新权限
FLUSH PRIVILEGES;

说明:

  • canal 用户需要足够的权限来读取 Binlog 数据,但不需要对数据库进行写操作。
  • 如果你的 MySQL 版本较新(8.x),可能需要使用 ALTER USER 命令来设置密码:
ALTER USER 'canal'@'%' IDENTIFIED BY 'canal';

四.配置 Canal Server

Canal Server 是 Canal 的核心组件,负责连接主库并解析 Binlog 数据。我们需要为每个主库配置一个 Canal 实例。

1.Canal Server 配置文件

在 Canal Server 的配置目录下,创建两个实例配置文件:conf/db_1/instance.properties 和 conf/db_2/instance.properties。
conf/db_1/instance.properties:

# 主库的地址和端口
canal.instance.master.address=db_1_ip:3306
# Canal 连接主库的用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 需要同步的表正则表达式,这里表示同步 db_1 数据库的所有表
canal.instance.filter.regex=db_1\\..*

conf/db_2/instance.properties:

# 主库的地址和端口
canal.instance.master.address=db_2_ip:3306
# Canal 连接主库的用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 需要同步的表正则表达式,这里表示同步 db_2 数据库的所有表
canal.instance.filter.regex=db_2\\..*

2.启动 Canal Server

使用以下命令启动 Canal Server:

nohup sh bin/canal.sh start &

注意:

  • 确保主库的 Binlog 位置和文件名正确。如果不确定,可以通过 SHOW MASTER STATUS; 命令查看。
  • 如果主库已经运行了一段时间,需要指定 Binlog 的起始位置,避免重复同步旧数据。

五.开发 Spring Boot 客户端

Spring Boot 客户端作为 Canal 的消息消费者,负责接收数据变更事件并同步到目标从库。

1. 引入依赖

在 Spring Boot 项目的 pom.xml文件中,引入 Canal 客户端依赖:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.8</version>
</dependency>

2. 配置 Canal 客户端

application.yml 文件中,配置 Canal Server 的地址:

canal:
  server.ip: canal_server_ip
  server.port: 11111

3. 实现数据同步逻辑

创建一个 Canal 客户端服务类,用于接收和处理数据变更事件。
CanalClientService.java:

@Service
public class CanalClientService {
    private final CanalConnector canalConnector;

    public CanalClientService(@Value("${canal.server.ip}") String canalServerIp, @Value("${canal.server.port}") int canalServerPort) {
        this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp, canalServerPort), "example", "", "");
    }

    @PostConstruct
    public void start() {
        canalConnector.connect();
        canalConnector.subscribe("db_1..*, db_2..*"); // 订阅 db_1 和 db_2 的所有表
        new Thread(this::process).start();
    }

    private void process() {
        while (true) {
            Message message = canalConnector.getWithoutAck(100);
            long batchId = message.getId();
            if (batchId == -1 || message.getEntries().isEmpty()) {
                continue;
            }
            for (Entry entry : message.getEntries()) {
                handleData(entry);
            }
            canalConnector.ack(batchId);
        }
    }

    private void handleData(Entry entry) {
        String schemaName = entry.getHeader().getSchemaName(); // 数据库
        String tableName = entry.getHeader().getTableName();  // 表名
        EventType eventType = entry.getHeader().getEventType(); // 数据变更类型

        System.out.println("Schema: " + schemaName + ", Table: " + tableName + ", Type: " + eventType);

        // 根据来源数据库同步到对应的从库
        if ("db_1".equals(schemaName)) {
            syncToBackupDbs(entry, "db_1_bk_1", "db_1_bk_2");
        } else if ("db_2".equals(schemaName)) {
            syncToBackupDbs(entry, "db_2_bk_1", "db_2_bk_2");
        }
    }

    private void syncToBackupDbs(Entry entry, String... backupDbs) {
        // 根据事件类型同步到从库
        if (entry.getHeader().getEventType() == EventType.INSERT) {
            for (String db : backupDbs) {
                syncInsert(entry, db);
            }
        } else if (entry.getHeader().getEventType() == EventType.UPDATE) {
            for (String db : backupDbs) {
                syncUpdate(entry, db);
            }
        } else if (entry.getHeader().getEventType() == EventType.DELETE) {
            for (String db : backupDbs) {
                syncDelete(entry, db);
            }
        }
    }

    private void syncInsert(Entry entry, String backupDb) {
        // 使用 MyBatis 将数据插入到对应的从库
        System.out.println("INSERT into " + backupDb);
    }

    private void syncUpdate(Entry entry, String backupDb) {
        // 使用 MyBatis 将数据更新到对应的从库
        System.out.println("UPDATE into " + backupDb);
    }

    private void syncDelete(Entry entry, String backupDb) {
        // 使用 MyBatis 将数据从对应的从库删除
        System.out.println("DELETE from " + backupDb);
    }
}

六.启动并测试

  • 启动 Canal Server。
  • 启动 Spring Boot 应用。
  • 在主库 db_1 或 db_2 中插入、更新或删除数据。
  • 观察从库 db_1_bk_1、db_1_bk_2、db_2_bk_1 和 db_2_bk_2 是否同步成功。

七.注意事项

  • 数据一致性:确保从库的数据与主库保持一致。可以通过事务或锁机制来避免冲突。
  • 性能优化:如果数据量较大,建议结合中间件(如 Kafka)进行缓冲和负载均衡。
  • 错误处理:在同步过程中,需要处理网络异常、数据库连接异常等情况。
  • Canal Server 高可用:在生产环境中,建议部署 Canal Server 的集群,以提高系统的可用性。

八.总结

通过 Spring Boot 和 Canal,我们可以实现 MySQL 数据库之间的高效数据同步。Canal 提供了强大的 Binlog 解析能力,而 Spring Boot 则提供了灵活的开发框架,两者结合可以轻松应对复杂的分布式数据同步需求。希望本文对你有所帮助,如果有任何问题,欢迎在评论区留言。


http://www.niftyadmin.cn/n/5861421.html

相关文章

uniapp 使用unplugin-auto-import 后, vue文件报红问题

现象 vite.config.js中已配置好unplugin-auto-import 解决方法 让你的auto-imports.d.ts文件, 处于打开状态 尼…玛…,的 这谁敢信 重新打开编译器后还会有这个问题 关掉auto-imports.d.ts文件, 重新打开 吐槽 我真$%$$#$%^&^%&^%^服了 各种改vite-config.js和ts…

使用Geotools读取DEM地形数据实战-以湖南省30米数据为例

目录 前言 一、DEM地形数据介绍 1、DEM数据简介 2、DEM应用领域 3、QGIS中读取DEM数据 二、GeoTools解析地形 1、Maven中依赖引用 2、获取数据基本信息 三、总结 前言 随着全球数字化进程的加速&#xff0c;各类地理空间数据呈爆炸式增长&#xff0c;DEM 数据作为其中的…

前后端项目部署服务器(传统部署和Docker部署)

内外网 开发环境连外网&#xff08;8.140.26.187&#xff09;&#xff0c;测试/生产环境连内网&#xff08;172.20.59.17&#xff09; 内外网地址不同&#xff0c;但指定的库是同一个 内网IP地址范围包括&#xff1a; 10.0.0.0 到 10.255.255.255172.16.0.0 到 172.31.2551…

【VUE面试】Vue2和Vue3的diff算法中,key的作用分别是什么?

在 Vue2 和 Vue3 的 Diff 算法里&#xff0c;key 都起着关键作用&#xff0c;但由于两个版本的 Diff 算法实现有所不同&#xff0c;key 的具体作用和使用场景也存在一些差异。以下详细介绍&#xff1a; Vue2 中 key 的作用 1. 辅助节点复用 在 Vue2 的 Diff 算法中&#xff0…

Spring IoC DI

一、IoC详解 控制反转&#xff08;Inversion of Control&#xff0c;IoC&#xff09;是一种软件设计原则&#xff0c;其核心思想是将程序中的对象创建、依赖管理和生命周期控制的权力从应用程序代码转移到外部容器或框架&#xff0c;从而降低组件间的耦合度&#xff0c;提升代码…

ok113i平台——多媒体播放器适配

1. 视频播放支持 1.1 在Linux平台交叉编译ffmpeg动态库&#xff0c;详情查看《ok113i平台——交叉编译音视频动态库》 提取如下动态库&#xff1a; libavcodec.so.58.134.100 libavdevice.so.58.13.100 libavfilter.so.7.110.100 libavformat.so.58.76.100 libavutil.so.56.…

AI赋能Web3.0前端开发:效率革命与ScriptEcho的实践

Web3.0浪潮席卷全球&#xff0c;前端开发作为用户体验的关键环节&#xff0c;面临着前所未有的挑战。如何高效、低成本地构建复杂的去中心化应用&#xff08;DApps&#xff09;成为行业关注的焦点。幸运的是&#xff0c;AI写代码工具的兴起为Web3.0前端开发带来了新的曙光&…

JavaScript 高级程序设计 读书笔记(第三章)

JavaScript 高级程序设计 读书笔记&#xff08;第三章&#xff09; 前几天小哆啦去参加了一场超有挑战性的面试&#xff0c;本以为凭借自己的聪明才智和平时积累的小本事&#xff0c;肯定能轻松应对。可没想到&#xff0c;面试官抛出的一些关于 JavaScript 的深入问题&#xff…