17. Listening for Data Changes with Canal and the MySQL Binlog
I. Introduction to Canal:
Canal is an open-source middleware from Alibaba that parses a database's incremental log. It is mainly used to synchronize database change data in real time.

II. How It Works:
1. MySQL master-slave replication:

(1) The MySQL master writes data changes to its binary log. The records in it are called binary log events and can be inspected with SHOW BINLOG EVENTS.
(2) The MySQL slave copies the master's binary log events into its relay log.
(3) The MySQL slave replays the events in the relay log, applying the data changes to its own data.
2. How canal works:
(1) canal emulates the MySQL slave protocol: it presents itself as a MySQL slave and sends a dump request to the MySQL master.
(2) The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, to canal).
(3) canal parses the binary log objects (originally a byte stream).
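The two lists above can be pictured with a small in-memory model. This is only a sketch, not MySQL or canal code: the class names (BinlogReplicationSketch, Master, Follower) are made up for illustration, and the follower pulls from the master here, whereas a real master pushes events after the dump request.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of log-based replication; hypothetical names, not MySQL or canal code. */
final class BinlogReplicationSketch {

    /** The "master": an append-only list of change events (its binlog). */
    static final class Master {
        private final List<String> binlog = new ArrayList<>();

        void write(String event) {
            binlog.add(event);
        }

        /** Answers a dump request: every event at or after the given position. */
        List<String> dump(int fromPosition) {
            return new ArrayList<>(binlog.subList(fromPosition, binlog.size()));
        }
    }

    /** The "slave" (the role canal plays): remembers how far it has read. */
    static final class Follower {
        private int position = 0;
        final List<String> applied = new ArrayList<>();

        void pull(Master master) {
            List<String> events = master.dump(position);
            applied.addAll(events);      // "replay" the events
            position += events.size();   // advance so nothing is read twice
        }
    }

    public static void main(String[] args) {
        Master master = new Master();
        Follower follower = new Follower();
        master.write("INSERT tb_user id=1");
        follower.pull(master);
        master.write("UPDATE tb_user id=1");
        follower.pull(master);
        System.out.println(follower.applied);
    }
}
```

The point of the model is the position: because the follower only asks for events after the last one it applied, it can disconnect and later resume without missing or repeating changes.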
III. MySQL Configuration (Enabling the Binlog):
1. Enable the binlog (ROW format):
# MySQL configuration file
# Linux: my.cnf (/etc/mysql/)
# Windows: my.ini (C:\ProgramData\MySQL\MySQL Server 5.7\)
# The options below belong in the [mysqld] section
[mysqld]
# Enable the binlog
log_bin = mysql-bin
# Use ROW format (records row-level changes)
binlog-format = ROW
# Unique server ID (must differ from the slaveId used by the Canal server)
server-id = 1


2. Restart MySQL and verify:
# Open a command prompt: press Win + R, type cmd, and press Enter.
# (The service can also be restarted from services.msc.)
# Stop the MySQL service:
net stop MySQL57
# Start the MySQL service:
net start MySQL57
# Verify (run these in a MySQL client):
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';


3. Create a dedicated Canal account (least privilege):
-- 1. Create a user that can connect remotely (% means any IP)
-- CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
-- Grant privileges
-- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

-- 2. Create a user for local connections (localhost)
CREATE USER 'canal'@'localhost' IDENTIFIED BY 'canal';
-- Grant the same privileges
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost';
-- Reload the privileges so the changes take effect
FLUSH PRIVILEGES;
IV. Canal Server Configuration:
1. Download and extract the Canal server:

2. Configure the Canal instance:
(1) instance.properties:
# MySQL master address (the MySQL instance that Canal connects to)
canal.instance.master.address=127.0.0.1:3306
# MySQL username and password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

(2) Start the Canal server on Windows:
1) Double-click bin/startup.bat to start it:

2) If the console window flashes and closes immediately, edit bin/startup.bat and start it again:

3) Logs:




V. Integrating Canal with Spring Boot to Listen for MySQL Data Changes:
1. POM configuration:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.8</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.8</version>
</dependency>
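2. YML configuration:
canal:
  # Whether to start synchronization automatically
  auto-sync: true
  instances:
    # First instance
    instance1:
      host: 127.0.0.1
      port: 11111
      # Instance name configured on the Canal server (canal.destinations = example)
      name: example
      # Number of entries fetched per batch
      batch-size: 100
      # Sleep time when there is no data (ms)
      sleep-time: 1000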
3. Entity classes:
CanalProperties.class
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * Canal configuration properties (mapped from the YAML configuration)
 */
@Data
@Component
@ConfigurationProperties(prefix = "canal")
public class CanalProperties {
    // Whether to start synchronization automatically
    private boolean autoSync = true;
    // Configuration for multiple instances, keyed by instance key
    private Map<String, InstanceConfig> instances = new HashMap<>();

    @Data
    public static class InstanceConfig {
        private String host;
        private Integer port;
        private String name;
        private Integer batchSize = 100;
        private Integer sleepTime = 1000;
    }
}
DataEventTypeEnum.enum
import org.springframework.util.StringUtils;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public enum DataEventTypeEnum {
    INSERT("INSERT"),
    UPDATE("UPDATE"),
    DELETE("DELETE");

    private final String name;

    DataEventTypeEnum(String name) {
        this.name = name;
    }

    public String NAME() {
        return name;
    }

    // Lookup table from event name to enum constant
    private static final Map<String, DataEventTypeEnum> NAME_MAP =
            Arrays.stream(DataEventTypeEnum.values())
                    .collect(Collectors.toMap(DataEventTypeEnum::NAME, Function.identity()));

    /**
     * Returns the constant for the given name, or null if the name is blank or unknown.
     */
    public static DataEventTypeEnum getEnum(String name) {
        if (!StringUtils.hasText(name)) {
            return null;
        }
        return NAME_MAP.get(name);
    }
}
JsonMessageType.class
import lombok.Data;

@Data
public class JsonMessageType {
    /**
     * Schema (database) name
     */
    private String schemaName;
    /**
     * Table name
     */
    private String tableName;
    /**
     * Event type (INSERT/UPDATE/DELETE)
     */
    private String eventType;
    /**
     * Row data as a JSON string
     */
    private String data;
}
4. CanalRunnerAutoConfig: starting Canal:
import com.iven.canal.entity.CanalProperties;
import com.iven.canal.handle.CanalWorkRegistry;
import com.iven.canal.utils.JsonMessageParser;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Canal auto-configuration
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
public class CanalRunnerAutoConfig implements DisposableBean {
    private final CanalProperties canalProperties;
    private final JsonMessageParser jsonMessageParser;
    private final CanalWorkRegistry workRegistry;
    // Runners started here, kept so they can be stopped when the application shuts down
    private final List<CanalRunner> runners = new CopyOnWriteArrayList<>();

    @Bean
    public ApplicationRunner canalApplicationRunner() {
        return args -> {
            if (!canalProperties.isAutoSync()) {
                log.info("Canal auto-sync is disabled");
                return;
            }
            // Do not start Canal if no Work is registered
            if (!workRegistry.hasWork()) {
                log.info("No table sync handler registered, Canal will not be started");
                return;
            }
            // Start every configured Canal instance
            canalProperties.getInstances().forEach((instanceKey, config) -> {
                CanalRunner runner = new CanalRunner(
                        config.getHost(),
                        config.getPort(),
                        config.getName(),
                        config.getBatchSize(),
                        config.getSleepTime(),
                        jsonMessageParser,
                        workRegistry
                );
                runner.start();
                runners.add(runner);
            });
        };
    }

    /**
     * Called by Spring on shutdown: interrupt every runner thread.
     */
    @Override
    public void destroy() {
        runners.forEach(CanalRunner::stop);
    }
}
5. CanalRunner: pulling the data:
import com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.iven.canal.entity.JsonMessageType;
import com.iven.canal.handle.CanalWork;
import com.iven.canal.handle.CanalWorkRegistry;
import com.iven.canal.utils.JsonMessageParser;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Canal runner with a manually managed lifecycle
 *
 * 1. Starts the Canal instance
 * 2. Processes the parsed data
 */
@Slf4j
public class CanalRunner {
    // Written by the caller of start(), read by the caller of stop()
    private volatile Thread thread;
    private final String canalIp;
    private final Integer canalPort;
    private final String canalInstance;
    private final Integer batchSize;
    private final Integer sleepTime;
    private final JsonMessageParser jsonMessageParser;
    private final CanalWorkRegistry workRegistry;

    public CanalRunner(String canalIp, Integer canalPort, String canalInstance,
                       Integer batchSize, Integer sleepTime,
                       JsonMessageParser jsonMessageParser, CanalWorkRegistry workRegistry) {
        this.canalIp = canalIp;
        this.canalPort = canalPort;
        this.canalInstance = canalInstance;
        this.batchSize = batchSize;
        this.sleepTime = sleepTime;
        this.jsonMessageParser = jsonMessageParser;
        this.workRegistry = workRegistry;
    }

    /**
     * Starts the Canal instance
     */
    public void start() {
        if (thread == null || !thread.isAlive()) {
            thread = new Thread(this::run, "canal-runner-" + canalInstance);
            thread.start();
            log.info("Canal instance [{}] started", canalInstance);
        }
    }

    /**
     * Stops the Canal instance
     */
    public void stop() {
        if (thread != null && !thread.isInterrupted()) {
            thread.interrupt();
        }
    }

    private void run() {
        log.info("Canal instance [{}] starting...", canalInstance);
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalIp, canalPort), canalInstance, "", "");
        try {
            connector.connect();
            // Subscribe without a client-side filter (per-table filtering is done later by the Works)
            connector.subscribe();
            connector.rollback();
            while (!Thread.currentThread().isInterrupted()) {
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();
                if (batchId == -1 || entries.isEmpty()) {
                    Thread.sleep(sleepTime);
                } else {
                    // Parse and process the data
                    Map<String, List<JsonMessageType>> parsedData = jsonMessageParser.parse(entries);
                    processParsedData(parsedData);
                    // Acknowledge the batch once it has been processed
                    connector.ack(batchId);
                }
            }
        } catch (InterruptedException e) {
            log.info("Canal instance [{}] was interrupted", canalInstance);
        } catch (Exception e) {
            log.error("Canal instance [{}] failed", canalInstance, e);
            // Roll back so the unacknowledged batch is delivered again
            connector.rollback();
        } finally {
            connector.disconnect();
            log.info("Canal instance [{}] stopped", canalInstance);
        }
    }

    /**
     * Passes the parsed data to the registered Works
     *
     * @param parsedData table key -> parsed rows
     */
    private void processParsedData(Map<String, List<JsonMessageType>> parsedData) {
        parsedData.forEach((tableKey, dataList) -> {
            // Look up all Works registered for this table
            List<CanalWork> works = workRegistry.getWorksByTable(tableKey);
            if (works.isEmpty() || dataList.isEmpty()) {
                return;
            }
            // One batch can mix event types for the same table, so each run of
            // consecutive rows with the same event type is dispatched separately, in order.
            int start = 0;
            for (int i = 1; i <= dataList.size(); i++) {
                if (i == dataList.size()
                        || !dataList.get(i).getEventType().equals(dataList.get(start).getEventType())) {
                    dispatch(works, dataList.subList(start, i));
                    start = i;
                }
            }
        });
    }

    /**
     * Calls every Work with rows that all share one event type
     */
    private void dispatch(List<CanalWork> works, List<JsonMessageType> rows) {
        // Convert the data format (JSON string -> Map)
        List<Map<String, Object>> dataMaps = rows.stream()
                .map(item -> JSON.<Map<String, Object>>parseObject(item.getData(), Map.class))
                .collect(Collectors.toList());
        String eventType = rows.get(0).getEventType();
        String schemaName = rows.get(0).getSchemaName();
        works.forEach(work -> work.handle(dataMaps, eventType, schemaName));
    }
}
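The getWithoutAck / ack / rollback calls in the runner give at-least-once delivery: a batch that was fetched but never acknowledged is handed out again after a rollback. The sketch below models that idea with two cursors in memory. It is an illustration under simplifying assumptions, not Canal's implementation: ToyServer is a made-up class, and its ack() and rollback() take no batch id, unlike the real connector.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory illustration of fetch / ack / rollback; hypothetical, not Canal code. */
final class AckRollbackSketch {

    static final class ToyServer {
        private final List<String> log = new ArrayList<>();
        private int acked = 0;    // everything before this index is confirmed
        private int fetched = 0;  // everything before this index has been handed out

        void append(String event) {
            log.add(event);
        }

        /** Hands out up to batchSize events that were not handed out yet, without confirming them. */
        List<String> getWithoutAck(int batchSize) {
            int end = Math.min(log.size(), fetched + batchSize);
            List<String> batch = new ArrayList<>(log.subList(fetched, end));
            fetched = end;
            return batch;
        }

        /** Confirms everything handed out so far. */
        void ack() {
            acked = fetched;
        }

        /** Forgets unconfirmed hand-outs so they are delivered again. */
        void rollback() {
            fetched = acked;
        }
    }

    public static void main(String[] args) {
        ToyServer server = new ToyServer();
        server.append("e1");
        server.append("e2");
        server.append("e3");

        System.out.println(server.getWithoutAck(2)); // first attempt
        server.rollback();                           // processing failed
        System.out.println(server.getWithoutAck(2)); // same events again
        server.ack();                                // processing succeeded
        System.out.println(server.getWithoutAck(2)); // the remaining event
    }
}
```

Because a batch can be delivered more than once, handlers are safest when they are idempotent, for example by overwriting a cache entry by primary key rather than incrementing a counter.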
6. JsonMessageParser: parsing the data:
MessageParser
import com.alibaba.otter.canal.protocol.CanalEntry;

import java.util.List;

/**
 * Message parser interface
 */
public interface MessageParser<T> {
    T parse(List<CanalEntry.Entry> canalEntryList);
}
import com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.iven.canal.entity.DataEventTypeEnum;
import com.iven.canal.entity.JsonMessageType;
import com.iven.canal.handle.CanalWorkRegistry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.*;

/**
 * JSON message parser
 *
 * 1. Iterates over the raw entry list
 * 2. Parses the row-level change data
 * 3. Wraps each row as a JsonMessageType
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class JsonMessageParser implements MessageParser<Map<String, List<JsonMessageType>>> {
    private final CanalWorkRegistry workRegistry;

    @Override
    public Map<String, List<JsonMessageType>> parse(List<CanalEntry.Entry> canalEntryList) {
        Map<String, List<JsonMessageType>> dataMap = new HashMap<>();
        for (CanalEntry.Entry entry : canalEntryList) {
            // Skip transaction begin/end and other non-row entries
            if (!CanalEntry.EntryType.ROWDATA.equals(entry.getEntryType())) {
                continue;
            }
            // 1. Get the schema name, the table name, and the schema-qualified table key
            String schemaName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();
            String fullTableName = schemaName + "." + tableName;
            // 2. Check whether a handler is registered (both key formats are supported)
            boolean hasFullTableWork = !CollectionUtils.isEmpty(workRegistry.getWorksByTable(fullTableName));
            boolean hasSimpleTableWork = !CollectionUtils.isEmpty(workRegistry.getWorksByTable(tableName));
            if (!hasFullTableWork && !hasSimpleTableWork) {
                log.debug("No sync handler for table [{}] or [{}], skipping", fullTableName, tableName);
                continue;
            }
            try {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                rowChange.getRowDatasList().forEach(rowData -> {
                    JsonMessageType jsonMessageType =
                            parseRowData(entry.getHeader(), rowChange.getEventType(), rowData);
                    if (jsonMessageType != null) {
                        // 3. Add the row under each key format that has a handler
                        if (hasFullTableWork) {
                            dataMap.computeIfAbsent(fullTableName, k -> new ArrayList<>()).add(jsonMessageType);
                        }
                        if (hasSimpleTableWork) {
                            dataMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(jsonMessageType);
                        }
                    }
                });
            } catch (Exception e) {
                log.error("Failed to parse data", e);
            }
        }
        return dataMap;
    }

    private JsonMessageType parseRowData(CanalEntry.Header header, CanalEntry.EventType eventType,
                                         CanalEntry.RowData rowData) {
        // Schema name
        String schemaName = header.getSchemaName();
        // Table name
        String tableName = header.getTableName();
        if (eventType == CanalEntry.EventType.DELETE) {
            // A deleted row only has a before image
            return dataWrapper(schemaName, tableName, DataEventTypeEnum.DELETE.NAME(), rowData.getBeforeColumnsList());
        } else if (eventType == CanalEntry.EventType.INSERT) {
            return dataWrapper(schemaName, tableName, DataEventTypeEnum.INSERT.NAME(), rowData.getAfterColumnsList());
        } else if (eventType == CanalEntry.EventType.UPDATE) {
            // For updates only the after image is kept
            return dataWrapper(schemaName, tableName, DataEventTypeEnum.UPDATE.NAME(), rowData.getAfterColumnsList());
        }
        // Other event types (DDL and so on) are ignored
        return null;
    }

    private JsonMessageType dataWrapper(String schemaName, String tableName, String eventType,
                                        List<CanalEntry.Column> columns) {
        // Column name -> column value (values arrive as strings)
        Map<String, String> data = new HashMap<>();
        columns.forEach(column -> data.put(column.getName(), column.getValue()));
        JsonMessageType result = new JsonMessageType();
        result.setSchemaName(schemaName);
        result.setTableName(tableName);
        result.setEventType(eventType);
        result.setData(JSON.toJSONString(data));
        return result;
    }
}
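For UPDATE events the parser above keeps only the after image, so a handler cannot tell which columns actually changed. If that is needed, one option is to build maps from both rowData.getBeforeColumnsList() and rowData.getAfterColumnsList() and compare them. The comparison step might look like the sketch below; ChangedColumnsSketch and changedColumns are hypothetical names, and plain maps stand in for the column lists so the example runs on its own.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/** Compares two row images; hypothetical helper, not part of the project above. */
final class ChangedColumnsSketch {

    /** Returns the columns whose value differs between the images, with their new value. */
    static Map<String, String> changedColumns(Map<String, String> before, Map<String, String> after) {
        Map<String, String> changed = new LinkedHashMap<>();
        for (Map.Entry<String, String> column : after.entrySet()) {
            // Objects.equals treats two nulls as equal, so unchanged NULL columns are skipped
            if (!Objects.equals(before.get(column.getKey()), column.getValue())) {
                changed.put(column.getKey(), column.getValue());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, String> before = new LinkedHashMap<>();
        before.put("id", "1");
        before.put("phone", "111");
        Map<String, String> after = new LinkedHashMap<>();
        after.put("id", "1");
        after.put("phone", "222");
        System.out.println(changedColumns(before, after));
    }
}
```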
7. CanalWorkRegistry: matching handlers:
CanalWork
import java.util.List;
import java.util.Map;

/**
 * Canal Work handler
 */
public interface CanalWork {
    /**
     * Returns the name of the table to handle (for example: tb_user)
     */
    String getTableName();

    /**
     * Handles the table data
     *
     * @param dataList   table rows (each row is a map of column name to value)
     * @param eventType  event type (INSERT/UPDATE/DELETE)
     * @param schemaName schema name (used to tell apart tables from different schemas)
     */
    void handle(List<Map<String, Object>> dataList, String eventType, String schemaName);
}
CanalWorkRegistry
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Handler registry.
 * Scans and caches every CanalWork implementation, groups them by table name,
 * and provides a lookup of the handlers for a table.
 */
@Slf4j
@Component
public class CanalWorkRegistry implements ApplicationContextAware {
    /**
     * Table name -> list of Works (one table may have several Works)
     */
    private final Map<String, List<CanalWork>> tableWorkMap = new HashMap<>();

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Find every CanalWork implementation
        Map<String, CanalWork> workMap = applicationContext.getBeansOfType(CanalWork.class);
        // Group them by table name
        tableWorkMap.putAll(workMap.values().stream()
                .collect(Collectors.groupingBy(CanalWork::getTableName)));
        log.info("Registered table sync handlers: {}", tableWorkMap.keySet());
    }

    /**
     * Returns the Works registered for the given table
     *
     * @param tableName table key, either "table" or "schema.table"
     * @return the registered Works, or an empty list if there are none
     */
    public List<CanalWork> getWorksByTable(String tableName) {
        return tableWorkMap.getOrDefault(tableName, Collections.emptyList());
    }

    /**
     * Returns whether any table has a registered Work
     *
     * @return true if at least one Work is registered
     */
    public boolean hasWork() {
        return !tableWorkMap.isEmpty();
    }
}
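The registry boils down to Collectors.groupingBy plus a getOrDefault lookup. The standalone sketch below reproduces just that part without Spring, to show two consequences of the design: several handlers can share one table key, and "demo.tb_user" and "tb_user" are distinct keys. Handler, NamedHandler, groupByTable and lookup are hypothetical names for this illustration only.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Spring-free illustration of the grouping and lookup; hypothetical names. */
final class HandlerRegistrySketch {

    interface Handler {
        String tableName();
    }

    static final class NamedHandler implements Handler {
        private final String table;
        final String label;

        NamedHandler(String table, String label) {
            this.table = table;
            this.label = label;
        }

        @Override
        public String tableName() {
            return table;
        }
    }

    /** Groups handlers by the table key they declare. */
    static Map<String, List<Handler>> groupByTable(List<Handler> handlers) {
        return handlers.stream().collect(Collectors.groupingBy(Handler::tableName));
    }

    /** Returns the handlers for a key, or an empty list when none are registered. */
    static List<Handler> lookup(Map<String, List<Handler>> registry, String tableKey) {
        return registry.getOrDefault(tableKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        List<Handler> handlers = Arrays.asList(
                new NamedHandler("demo.tb_user", "sync-to-search"),
                new NamedHandler("demo.tb_user", "refresh-cache"),
                new NamedHandler("tb_order", "audit"));
        Map<String, List<Handler>> registry = groupByTable(handlers);
        System.out.println(lookup(registry, "demo.tb_user").size());
        System.out.println(lookup(registry, "tb_user").size());
    }
}
```

Returning an empty list instead of null keeps the callers free of null checks, which is why the parser can test the result with CollectionUtils.isEmpty and the runner with isEmpty().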
8. A CanalWork implementation that processes the data:
import com.iven.canal.entity.DataEventTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
 * Data handling for the tb_user table
 *
 * Canal server → change data → CanalRunner pulls → JsonMessageParser parses →
 * tb_user data is selected → CanalWorkRegistry returns TbUserCanalWorkHandle →
 * handle is called → logic runs per event type (INSERT/UPDATE/DELETE)
 */
@Slf4j
@Component
public class TbUserCanalWorkHandle implements CanalWork {

    @Override
    public String getTableName() {
        return "demo.tb_user";
    }

    @Override
    public void handle(List<Map<String, Object>> dataList, String eventType, String schemaName) {
        log.info("Processing tb_user data from schema [{}], event type: {}, rows: {}",
                schemaName, eventType, dataList.size());
        DataEventTypeEnum dataEventTypeEnum = DataEventTypeEnum.getEnum(eventType);
        if (dataEventTypeEnum == null) {
            // Switching on a null enum would throw a NullPointerException
            log.warn("Unhandled event type: {}", eventType);
            return;
        }
        // Handle each event type separately
        switch (dataEventTypeEnum) {
            case INSERT:
                handleInsert(dataList, schemaName);
                break;
            case UPDATE:
                handleUpdate(dataList, schemaName);
                break;
            case DELETE:
                handleDelete(dataList, schemaName);
                break;
            default:
                log.warn("Unhandled event type: {}", eventType);
        }
    }

    /**
     * Handles inserted rows
     */
    private void handleInsert(List<Map<String, Object>> dataList, String schemaName) {
        log.info("Processing {} inserted tb_user row(s) from schema [{}]", dataList.size(), schemaName);
        dataList.forEach(data -> {
            Object userId = data.get("id");
            Object username = data.get("name");
            // Insert logic: for example, sync to ES or initialize a cache entry
            log.info("User inserted - ID: {}, name: {}", userId, username);
        });
    }

    /**
     * Handles updated rows
     */
    private void handleUpdate(List<Map<String, Object>> dataList, String schemaName) {
        log.info("Processing {} updated tb_user row(s) from schema [{}]", dataList.size(), schemaName);
        dataList.forEach(data -> {
            Object userId = data.get("id");
            Object newPhone = data.get("phone"); // assumes the phone number was updated
            // Update logic: for example, update the ES document or refresh the cache
            log.info("User updated - ID: {}, new phone: {}", userId, newPhone);
        });
    }

    /**
     * Handles deleted rows
     */
    private void handleDelete(List<Map<String, Object>> dataList, String schemaName) {
        log.info("Processing {} deleted tb_user row(s) from schema [{}]", dataList.size(), schemaName);
        dataList.forEach(data -> {
            Object userId = data.get("id");
            // Delete logic: for example, remove from ES or evict from the cache
            log.info("User deleted - ID: {}", userId);
        });
    }
}
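The handler above only logs. As one possible shape for the "refresh the cache" case mentioned in its comments, the sketch below keeps an in-memory map in step with a table. Everything in it is an assumption made for illustration: the tb_order table, the TbOrderCacheWork class, and the use of a plain HashMap as the cache. The CanalWork interface is repeated inside the sketch only so that it compiles on its own.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical cache-syncing handler; table and class names are illustrative only. */
final class CacheSyncSketch {

    /** Same shape as the CanalWork interface, repeated so the sketch is self-contained. */
    interface CanalWork {
        String getTableName();

        void handle(List<Map<String, Object>> dataList, String eventType, String schemaName);
    }

    static final class TbOrderCacheWork implements CanalWork {
        // Primary key -> latest known row
        final Map<Object, Map<String, Object>> cache = new HashMap<>();

        @Override
        public String getTableName() {
            return "demo.tb_order";
        }

        @Override
        public void handle(List<Map<String, Object>> dataList, String eventType, String schemaName) {
            for (Map<String, Object> row : dataList) {
                Object id = row.get("id");
                if ("DELETE".equals(eventType)) {
                    cache.remove(id);
                } else if ("INSERT".equals(eventType) || "UPDATE".equals(eventType)) {
                    cache.put(id, row); // overwriting by key keeps redelivery harmless
                }
            }
        }
    }

    static Map<String, Object> row(String id, String status) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", id);
        row.put("status", status);
        return row;
    }

    public static void main(String[] args) {
        TbOrderCacheWork work = new TbOrderCacheWork();
        work.handle(Arrays.asList(row("1", "NEW"), row("2", "NEW")), "INSERT", "demo");
        work.handle(Arrays.asList(row("1", "PAID")), "UPDATE", "demo");
        work.handle(Arrays.asList(row("2", "NEW")), "DELETE", "demo");
        System.out.println(work.cache);
    }
}
```

In the real project such a class would also carry @Component so that the registry picks it up; the ids are strings here because the parser stores column values as strings.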

Dispatch flow:
The registry manages the handlers, the parser converts the data format, and the runner controls the Canal client lifecycle. Together they deliver each database change event to the handlers registered for its table, which decouples change capture from business logic. To handle changes for any table, a user only needs to implement the CanalWork interface.
(1) Initialization
1) When the Spring container starts, CanalWorkRegistry finds every CanalWork implementation (such as TbUserCanalWorkHandle) and caches them in tableWorkMap, grouped by table name.
2) CanalRunnerAutoConfig checks the configuration (CanalProperties). If auto-sync is enabled and at least one CanalWork exists, it creates and starts a CanalRunner for each Canal instance.
(2) Runtime
1) CanalRunner connects to the Canal server and subscribes to database change events.
2) It pulls change data (Message) in a loop, and JsonMessageParser parses it into a map from table name to data list (Map<String, List<JsonMessageType>>).
3) It calls processParsedData, which looks up the CanalWork list for each table name in CanalWorkRegistry and runs handle on the data.
(3) Shutdown
When the application stops, CanalRunner interrupts its thread and disconnects from the Canal server.
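The shutdown stage relies on a common Java pattern: interrupt the worker thread, let its loop exit, and release resources in a finally block. The sketch below shows that pattern in isolation, using only the JDK. Worker and its cleanedUp flag are hypothetical stand-ins: the sleep plays the role of "no data, wait", and setting the flag plays the role of disconnecting from the Canal server. Unlike CanalRunner.stop(), this version also waits for the thread to finish, which is an addition for the sake of the example.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Interrupt-based shutdown in isolation; hypothetical names, JDK only. */
final class GracefulStopSketch {

    static final class Worker {
        private final Thread thread = new Thread(this::run, "worker-sketch");
        final AtomicBoolean cleanedUp = new AtomicBoolean(false);

        void start() {
            thread.start();
        }

        /** Interrupts the loop and waits up to five seconds for it to finish. */
        void stop() {
            thread.interrupt();
            try {
                thread.join(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            }
        }

        boolean isAlive() {
            return thread.isAlive();
        }

        private void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.sleep(50); // stands in for "no data, sleep before polling again"
                }
            } catch (InterruptedException e) {
                // Interrupted while sleeping: fall through to the cleanup below
            } finally {
                cleanedUp.set(true); // stands in for disconnecting from the server
            }
        }
    }

    public static void main(String[] args) {
        Worker worker = new Worker();
        worker.start();
        worker.stop();
        System.out.println("alive=" + worker.isAlive() + ", cleanedUp=" + worker.cleanedUp.get());
    }
}
```

The loop checks the interrupt flag and also catches InterruptedException because an interrupt can arrive either while the thread is working or while it is sleeping; both paths end in the same cleanup.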