18、Flink CDC監聽(ting)MySQL-Binlog實(shi)現數(shu)據(ju)監聽(ting)
一(yi)、CDC簡介:
CDC(Change Data Capture)是(shi)變(bian)更數(shu)(shu)據(ju)捕獲(huo)(huo)的(de)簡稱,其(qi)核心思想(xiang)是(shi)監測并捕獲(huo)(huo)數(shu)(shu)據(ju)庫的(de)變(bian)動(包括數(shu)(shu)據(ju)或數(shu)(shu)據(ju)表的(de)插入、更新(xin)、刪除等),將這些變(bian)更按發生的(de)順序(xu)完整記錄下來,并寫入到消息中間件或數(shu)(shu)據(ju)倉(cang)庫中以供其(qi)他服務(wu)進行訂閱及消費。CDC技術廣(guang)泛應用于數(shu)(shu)據(ju)同(tong)步、數(shu)(shu)據(ju)分(fen)發、數(shu)(shu)據(ju)采(cai)集等場景,是(shi)數(shu)(shu)據(ju)集成領域(yu)的(de)重要工具。
1、CDC常用工具:
|
CDC工具 |
Debezium |
Canal |
Maxwell |
Flink CDC |
|
核心定位 |
多(duo)數據源 CDC 框架 |
輕(qing)量 MySQL 同步工具 |
MySQL 專屬極簡工具 |
實時處理一體化框架 |
|
支持數據源 |
MySQL、PostgreSQL、Oracle 等 |
MySQL(最佳)、PostgreSQL 等 |
僅 MySQL |
MySQL、PostgreSQL 等(基于(yu) Debezium) |
|
典型輸出目標 |
Kafka、Flink/Spark |
Kafka、RocketMQ、數據(ju)庫(ku) |
Kafka、Redis、文件 |
Kafka、ES、Hive 等 |
|
突出優勢 |
支持廣泛,全量(liang) + 增量(liang)同步 |
部(bu)署簡(jian)單,國內生態適配好 |
配置極(ji)簡,資源占(zhan)用低 |
支持(chi)實時處理,Exactly-Once 語義 |
|
適用場景 |
多源同步、復雜(za)數(shu)據管(guan)道 |
MySQL 為(wei)主(zhu)的輕量同步 |
簡單 MySQL 變更(geng)同步 |
實時數倉、捕獲(huo) + 處(chu)理(li)一體化 |
2、相關參考:
二、Flink CDC工作原(yuan)理(li):
Flink CDC(Change Data Capture)的(de)核心工作原理(li)是通過(guo)捕獲數(shu)據庫的(de)變(bian)更日(ri)志(如 binlog、WAL 等),將(jiang)其(qi)(qi)轉換為結構化事件流,接(jie)入 Flink 實時計算引擎進行處(chu)理(li),并最終同(tong)步(bu)到(dao)目(mu)標系統(tong)。其(qi)(qi)工作流程可拆解為以下關鍵步(bu)驟:
1、Debezium 捕獲解析數據庫日志:
Flink CDC 本身不直接解(jie)析(xi)數據(ju)庫(ku)日志,而是集(ji)成 Debezium(開(kai)源 CDC 框架)作為底層捕獲引(yin)擎,支持(chi) MySQL、PostgreSQL、Oracle 等(deng)多種(zhong)數據(ju)庫(ku),具體邏(luo)輯如下(xia):
(1)、模擬從(cong)節點獲取日志(zhi):
- 對于支持主從復制的數據庫(如 MySQL),Debezium 會偽裝成數據庫的從節點,向主庫發送復制請求,獲取變更日志(如 MySQL 的 binlog、PostgreSQL 的 WAL 日志)。
(2)、解析日(ri)志為(wei)結(jie)構化事件(jian):
(3)數據庫日志通(tong)常是二(er)進制(zhi)格(ge)式,Debezium 會將其解析(xi)為包(bao)含詳細信息(xi)的結構化事(shi)件,包(bao)括:
- 操作類型(INSERT/UPDATE/DELETE);
- 變更數據(UPDATE 時包含舊值和新值,INSERT/DELETE 包含對應數據);
- 表名、數據庫名、操作時間戳等元數據。
2. 全量 + 增量同步(無鎖機制):
Flink CDC 支持 “全量數(shu)據初(chu)始(shi)化 + 增量變(bian)更同步” 的無(wu)縫銜(xian)接,且通過無(wu)鎖機制避(bi)免影響源(yuan)庫性(xing)能:
(1)、全量快照階段:
- 首次同步時,會對數據庫表進行一次全量快照(讀取當前所有數據),確保初始數據完整。
(2)、增量同步階段(duan):
- 快照完成后,自動切換到增量同步模式,通過監控 binlog 等日志獲取實時變更,且通過記錄日志位置(如 binlog 的文件名和偏移量)保證全量與增量數據的連續性(無重復、無遺漏)。
3、 封裝為 Flink Source 流入引擎:
解析后的(de)結構化事件會被封裝為(wei) Flink Source 連接(jie)器,直(zhi)接(jie)作為(wei) Flink 的(de)輸入流(liu):
(1)、變更事(shi)件以(yi)流的(de)(de)形式進入 Flink 計算引擎,每條事(shi)件對(dui)應一條數(shu)據記(ji)錄,可通過(guo) Flink 的(de)(de) DataStream API 或 Table/SQL 進行處(chu)理。
4、Flink實時數據處理:
Flink CDC 不僅是 “捕獲工具(ju)”,更能(neng)結合(he) Flink 的實時計算能(neng)力對(dui)變更數據進行處理:
(1)、數據清洗與轉換:
- 過濾無效數據、格式轉換(如 JSON 轉 Avro)、字段映射等。
(2)、關聯與聚合:
- 支持與維度表(如 MySQL 維表、HBase 維表)關聯,或進行窗口聚合(如統計分鐘級變更量)。
(3)、狀態管理:
- 利用 Flink 的狀態后端(如 RocksDB)保存中間結果,支持復雜邏輯(如去重、累計計算)。
5、基于 Checkpoint 保證一致性:
Flink CDC 依賴 Flink 的(de) Checkpoint 機制 確保(bao)數據處理的(de)一致性:
(1)、Checkpoint 觸發:
- 定期將當前處理進度(包括 CDC 捕獲的日志位置、算子狀態等)持久化到存儲(如 HDFS、本地文件)。
(2)、故障恢復:
- 若 Flink 任務失敗,可從最近一次 Checkpoint 恢復狀態和日志位置,保證數據不丟失、不重復,實現 Exactly-Once 語義(端到端一致性需下游 Sink 配合支持)。
6. 通過 Sink 同步到目標系統:
處理后的變更數據通過(guo) Flink 的 Sink 連(lian)接器 寫入(ru)目標存儲(chu),支持 Kafka、Elasticsearch、Hive、MySQL、TiDB 等多種系統。
三、MySQL 配置(zhi)(開啟 Binlog):
1、開啟 Binlog(ROW 模式):
# MySQL 配(pei)置(zhi)文件 # Linux:my.cnf配(pei)置(zhi)文件(/etc/mysql/) # Window:my.ini配置文件(C:\ProgramData\MySQL\MySQL Server 5.7\) # 開啟 Binlog log_bin = mysql-bin # 選擇(ze) ROW 模式(記錄(lu)行級(ji)變更(geng)) binlog-format = ROW # 配置數(shu)據庫唯一 ID(與 Canal 服務端的 slaveId 不同(tong)) server-id = 1


2、重啟 MySQL 并驗證:
# 打開命令提示符(cmd/services.msc): # 按 Win + R 鍵,輸入 cmd,然后按 Enter 鍵打開命令提示符窗口。 # 停止MySQL服(fu)務(wu): net stop MySQL57 # 啟動(dong)MySQL服(fu)務(wu): net start MySQL57 # 驗證(zheng) SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';

四、SpringBoot整合Flink CDC實現MySQL數據監聽:
1、POM配置:
<!-- Flink 核心 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.18.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.18.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.18.1</version>
</dependency>
<!-- Flink Table 核心依賴 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.18.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.18.1</version>
</dependency>
<!-- Flink Connector Base -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.18.1</version>
</dependency>
<!-- Flink CDC MySQL -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>3.0.1</version>
</dependency>
2、YML配置:
flink: cdc: # 是否開啟(qi)CDC監聽 auto-start: true # 自定義(yi)一個唯一的id server-id: "123456" # 數(shu)據(ju)庫配置 mysql: hostname: localhost port: 3306 username: root password: 123
3、Entity類聲明:
DataChangeType.class
/** * Flink CDC數據變更類型枚舉 * 1、"c"表(biao)示(shi)創建 * 2、"u"表(biao)示(shi)更新 * 3、"d"表(biao)示(shi)刪除(chu) * 4、"r"表(biao)示(shi)讀取 */ public enum DataChangeType { INSERT("c"), UPDATE("u"), DELETE("d"), READ("r"); private final String code; DataChangeType(String code) { this.code = code; } public static DataChangeType getByCode(String code) { for (DataChangeType type : values()) { if (type.code.equals(code)) { return type; } } return null; } }
FlinkCdcProperties.class
import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @Data @Component @ConfigurationProperties(prefix = "flink.cdc") public class FlinkCdcProperties { /** * 是否自動啟動CDC監聽 */ private boolean autoStart; private String serverId; /** * MySQL配(pei)置(zhi) */ private Mysql mysql = new Mysql(); @Data public static class Mysql { private String hostname; private int port; private String username; private String password; } }
4、FlinkCdcRunner數據變更監聽啟動器:
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.iven.flinkcdcdemoservice.entity.DataChangeType; import com.iven.flinkcdcdemoservice.entity.FlinkCdcProperties; import com.iven.flinkcdcdemoservice.handler.FlinkCdcHandlerRegistry; import com.iven.flinkcdcdemoservice.handler.FlinkCdcHandler; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import java.io.Serializable; import java.util.*; @Slf4j @Component @RequiredArgsConstructor public class FlinkCdcRunner implements CommandLineRunner { // 配置屬性 private final FlinkCdcProperties properties; // 處理器注冊中心 private final FlinkCdcHandlerRegistry handlerRegistry; @Override public void run(String... args) throws Exception { // 總開關關閉則(ze)不啟(qi)動 if (!properties.isAutoStart()) { log.info("Flink CDC 總開(kai)關關閉,不啟動監聽"); return; } // 沒有需要監聽的表則不啟(qi)動 List<String> monitoredTables = handlerRegistry.getMonitoredTables(); if (monitoredTables.isEmpty()) { log.warn("未發現需要監聽的表(未實現FlinkCdcTableHandler),不啟動監聽"); return; } // 1. 創建Flink執行環境(jing) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 設(she)置(zhi)并(bing)行度(du)為 1: server-id 的數量必須(xu) ≥ 并(bing)行度(du) env.setParallelism(1); // 啟用(yong)檢查點(可選) // env.enableCheckpointing(5000); // 配置檢查點存儲路徑(本地路徑或分布式存儲如HDFS) // env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-cdc-checkpoints"); // 檢查點超(chao)時(shi)時(shi)間(60秒未完成則取消) // env.getCheckpointConfig().setCheckpointTimeout(60000); // 允許檢查(cha)點失敗次數(默(mo)認0,即一(yi)次失敗則任務(wu)失敗) // env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3); // 禁用檢(jian)查(cha)點(dian) env.getCheckpointConfig().disableCheckpointing(); // 重試(shi)次數/重試(shi)間(jian)隔(ge) env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10))); // 2. 配置(zhi)(zhi)MySQL CDC源(動態設置(zhi)(zhi)需要監聽的表(biao)) MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .serverId(properties.getServerId()) .hostname(properties.getMysql().getHostname()) .port(properties.getMysql().getPort()) .username(properties.getMysql().getUsername()) .password(properties.getMysql().getPassword()) // 從監聽的表中提取數(shu)據庫列(lie)表(去重) .databaseList(extractDatabases(monitoredTables)) // 直接使用注冊中(zhong)心收集(ji)的表列表 .tableList(monitoredTables.toArray(new String[0])) // 反序列化為JSON .deserializer(new JsonDebeziumDeserializationSchema()) /* initial: 初始化(hua)快照,即全(quan)量導(dao)入(ru)后增量導(dao)入(ru)(檢(jian)測更(geng)新數(shu)據(ju)(ju)寫入(ru)) * latest: 只進(jin)行增量導(dao)入(ru)(不讀(du)取歷史變化(hua)) * timestamp: 指定(ding)(ding)時間(jian)(jian)戳進(jin)行數(shu)據(ju)(ju)導(dao)入(ru)(大于等于指定(ding)(ding)時間(jian)(jian)錯讀(du)取數(shu)據(ju)(ju)) */ .startupOptions(StartupOptions.latest()) .build(); // 3. 讀取CDC數據流并(bing)處理(li) DataStreamSource<String> dataStream = env.fromSource( mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL-CDC-Source" ); // 4. 解析數據并路由(you)到(dao)對應(ying)處理器,使用靜態內(nei)(nei)部類(lei)代(dai)替匿名內(nei)(nei)部類(lei) dataStream.process(new CdcDataProcessFunction(handlerRegistry)); // 5. 啟動(dong)Flink作業 env.execute("Flink-CDC-動態監聽作業"); } /** * 靜態內部類實(shi)現ProcessFunction,確保可序列化 */ private static class CdcDataProcessFunction extends ProcessFunction<String, Void> implements Serializable { private final FlinkCdcHandlerRegistry handlerRegistry; // 通過構造(zao)函數傳入依賴 public CdcDataProcessFunction(FlinkCdcHandlerRegistry handlerRegistry) { this.handlerRegistry = handlerRegistry; } @Override public void processElement(String json, Context ctx, Collector<Void> out) { try { JSONObject cdcData = JSON.parseObject(json); // 操作(zuo)類型:c/u/d String op = cdcData.getString("op"); JSONObject source = cdcData.getJSONObject("source"); String dbName = source.getString("db"); String tableName = source.getString("table"); // 庫名(ming).表名(ming) String fullTableName = dbName + "." + tableName; // 找到(dao)對應(ying)表的(de)處理器 FlinkCdcHandler handler = handlerRegistry.getHandler(fullTableName); if (handler == null) { log.warn("表[{}]無(wu)處理(li)器,跳過處理(li)", fullTableName); return; } // 按事(shi)件(jian)類型分發(fa) DataChangeType changeType = DataChangeType.getByCode(op); if (changeType == null) { log.warn("未知操作類型:{}", op); return; } switch (changeType) { case INSERT: List<Map<String, Object>> insertData = Collections.singletonList( cdcData.getJSONObject("after").getInnerMap() ); handler.handleInsert(insertData); break; case UPDATE: List<Map<String, Object>> beforeData = Collections.singletonList( cdcData.getJSONObject("before").getInnerMap() ); List<Map<String, Object>> afterData = Collections.singletonList( cdcData.getJSONObject("after").getInnerMap() ); handler.handleUpdate(beforeData, afterData); break; case DELETE: List<Map<String, Object>> deleteData = Collections.singletonList( cdcData.getJSONObject("before").getInnerMap() ); handler.handleDelete(deleteData); break; case READ: // 可以忽略快照階(jie)段的(de)讀取操作,或根(gen)據需要處(chu)理 log.debug("處理快(kuai)照讀取操作: {}", fullTableName); break; } } catch (Exception e) { log.error("Flink-CDC數(shu)據處理發生未(wei)預期異常", e); } } } /** * 從表(biao)(biao)名(庫名.表(biao)(biao)名)中提取數據庫列表(biao)(biao)(去重) * * @param tables * @return */ private String[] extractDatabases(List<String> tables) { // 截取庫名(如demo.tb_user → demo) return tables.stream() .map(table -> table.split("\\.")[0]) .distinct() .toArray(String[]::new); } }
5、FlinkCdcHandlerRegistry策略路由:
import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * Flink CDC處(chu)理(li)器注冊中(zhong)心 * 處(chu)理(li)器注冊中(zhong)心(自動掃描監聽(ting)表(biao)) */ @Slf4j @Component public class FlinkCdcHandlerRegistry implements ApplicationContextAware, Serializable { // 緩存(cun):表名(庫名.表名)→ 處理器 private final Map<String, FlinkCdcHandler> handlerMap = new ConcurrentHashMap<>(); // 收集(ji)所有需要監聽的表(供Flink CDC配置使用) private List<String> monitoredTables; @Override public void setApplicationContext(ApplicationContext applicationContext) { // 掃描所有實(shi)現類 Map<String, FlinkCdcHandler> beans = applicationContext.getBeansOfType(FlinkCdcHandler.class); beans.values().forEach(handler -> { String tableName = handler.getTableName(); handlerMap.put(tableName, handler); log.info("注(zhu)冊監聽表:{} → 處理器:{}", tableName, handler.getClass().getSimpleName()); }); // 提取所有需要監(jian)聽的表(biao) monitoredTables = new ArrayList<>(handlerMap.keySet()); } /** * 獲(huo)取指定(ding)表的處理(li)器 * * @param tableName * @return */ public FlinkCdcHandler getHandler(String tableName) { return handlerMap.get(tableName); } /** * 獲(huo)取所(suo)有需要監聽的表(供Flink CDC配置(zhi)) * * @return */ public List<String> getMonitoredTables() { return monitoredTables; } }
6、FlinkCdcHandler策略模式數據處理:
FlinkCdcHandler
import java.util.List; import java.util.Map; /** * 表數據處理接口,每個監聽(ting)的表需實現此接口 */ public interface FlinkCdcHandler { /** * 獲取監聽的表名(ming)(ming)(格(ge)式:庫名(ming)(ming).表名(ming)(ming),如demo.tb_user) */ String getTableName(); /** * 處理新增數據 */ default void handleInsert(List<Map<String, Object>> dataList) { // 默認空(kong)實現,子(zi)類(lei)可(ke)重寫 } /** * 處理更新(xin)數據(ju)(包含變(bian)更前(qian)和(he)變(bian)更后的數據(ju)) * @param beforeList 變(bian)更前(qian)數據(ju) * @param afterList 變(bian)更后數據(ju) */ default void handleUpdate(List<Map<String, Object>> beforeList, List<Map<String, Object>> afterList) { // 默(mo)認空實現,子類可(ke)重寫(xie) } /** * 處理刪除數據(ju) */ default void handleDelete(List<Map<String, Object>> dataList) { // 默(mo)認空實現,子類可重寫(xie) } }
TbUserFlinkCdcHandler
import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.io.Serializable; import java.util.List; import java.util.Map; @Slf4j @Component public class TbUserFlinkCdcHandler implements FlinkCdcHandler, Serializable { @Override public String getTableName() { return "demo.tb_user"; } @Override public void handleInsert(List<Map<String, Object>> dataList) { log.info("處(chu)理(li)tb_user新(xin)增數(shu)據,共{}條", dataList.size()); dataList.forEach(data -> { String id = (String)data.get("id"); String username = (String) data.get("name"); // 業務邏輯:如同步到ES、緩存等 log.info("新增(zeng)用戶:id={}, name={}", id, username); }); } @Override public void handleUpdate(List<Map<String, Object>> beforeList, List<Map<String, Object>> afterList) { log.info("處理tb_user更(geng)新數據,共(gong){}條", afterList.size()); for (int i = 0; i < afterList.size(); i++) { Map<String, Object> before = beforeList.get(i); Map<String, Object> after = afterList.get(i); log.info("更新用戶(hu):id={}, 舊用戶(hu)名={}, 新用戶(hu)名={}", after.get("id"), before.get("name"), after.get("name")); } } @Override public void handleDelete(List<Map<String, Object>> dataList) { log.info("處(chu)理(li)tb_user刪除數據,共{}條", dataList.size()); dataList.forEach(data -> { log.info("刪除用戶:id={}", data.get("id")); }); } }

項目啟動時,FlinkCdcHandlerRegistry 掃描并注冊所有 FlinkCdcHandler 實現(xian)類,建立表(biao)與處(chu)理器(qi)的映射(she);FlinkCdcRunner 在 Spring 容器(qi)初始化(hua)后(hou)觸發,檢(jian)查啟動條(tiao)件(jian),初始化(hua) Flink 環境(jing)并構(gou)建 CDC 數(shu)據源,將數(shu)據流接入 CdcDataProcessFunction,該函(han)數(shu)解析變更(geng)事(shi)件(jian)并路(lu)由到(dao)對(dui)應處(chu)理器(qi)執行業務邏輯,最后(hou)啟動 Flink 作業持續監聽(ting)處(chu)理。