中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

17、Canal監聽MySQL-Binlog實現數據監聽

一、Canal簡介:

Canal 是(shi)阿里(li)巴(ba)巴(ba)開源的一款基(ji)于數據(ju)庫(ku)增量日志解析的中間件,主要用于實(shi)現數據(ju)庫(ku)變更數據(ju)的實(shi)時同(tong)步。

c8800bf8-5e5c-433c-a111-2a76009a6e59

 

二、工作原理:

1、MySQL主備復制原理:

 44171535-bdc3-4e49-8207-a7682fbcaf2c

(1)、MySQL master 將(jiang)數(shu)據變(bian)更寫入二進(jin)(jin)制(zhi)日(ri)志(zhi)( binary log, 其中記錄叫做二進(jin)(jin)制(zhi)日(ri)志(zhi)事(shi)件binary log events,可以通(tong)過 show binlog events 進(jin)(jin)行查看)

(2)、MySQL slave 將 master 的(de) binary log events 拷貝到它的(de)中繼日志(relay log)

(3)、MySQL slave 重放 relay log 中事件(jian),將數據變更反(fan)映它(ta)自己的數據

2、canal工作原理:

(1)、canal 模(mo)擬 MySQL slave 的交互協(xie)議,偽(wei)裝自己為 MySQL slave ,向 MySQL master 發送dump 協(xie)議

(2)、MySQL master 收到 dump 請(qing)求,開始推送(song) binary log 給 slave (即 canal )

(3)、canal 解析 binary log 對象(xiang)(原始為 byte 流)

 

三、MySQL 配置(開啟 Binlog):

1、開啟 Binlog(ROW 模式):

# MySQL 配(pei)置(zhi)文件
# Linux:my.cnf配(pei)置(zhi)文件(/etc/mysql/)
# Window:my.ini配置文(wen)件(C:\ProgramData\MySQL\MySQL Server 5.7\)
# 開啟 Binlog
log_bin = mysql-bin

# 選擇 ROW 模式(記錄行級(ji)變更)
binlog-format = ROW

# 配置(zhi)數據庫唯一 ID(與 Canal 服(fu)務(wu)端的(de) slaveId 不同(tong))
server-id = 1

1

2

2、重啟 MySQL 并驗證:

# 打開命令提示符(cmd/services.msc):
# 按 Win + R 鍵,輸(shu)入(ru) cmd,然后按(an) Enter 鍵打開命令(ling)提示符窗口。
# 停止MySQL服務:
net stop MySQL57

# 啟動MySQL服務:
net start MySQL57

# 驗證
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

3

4

3、創建 Canal 專用賬號(權限最小化):

-- 1. 創建支持遠程連接的用戶(% 表示任意(yi) IP)
-- CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
-- 授予(yu)權限
-- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

-- 2. 創建支持(chi)本地連接(jie)的用戶(localhost)
CREATE USER 'canal'@'localhost' IDENTIFIED BY 'canal';

-- 授予相同(tong)權限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost';

-- 刷新權限,使配置(zhi)生效
FLUSH PRIVILEGES;

 

四、Canal 服務端配置:

1、下載并解壓 Canal 服務端:

3e125b24-cfe6-426d-b7df-6f0341c1a31a

2、配置 Canal 實例:

(1)、instance.properties配置:

# MySQL 主庫地址(zhi)(Canal 連接的(de) MySQL 地址(zhi))
canal.instance.master.address=127.0.0.1:3306

# MySQL 賬號密(mi)碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

5

(2)、windows啟動(dong) Canal 服務端:

1)、雙擊啟動(dong)bin/startup.bat:

 6

2)、存在黑屏閃退(tui),修改bin/startup.bat,重啟:

7

3)、日志:

 8

9

10

11

 

五(wu)、SpringBoot整合Canal實現MySQL數據(ju)監聽:

1、POM配置:

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.8</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.8</version>
        </dependency>

2、YML配置:

canal:
  # 自動啟動同步標志位
  auto-sync: true
  instances:
    # 第一個實例
    instance1:
      host: 127.0.0.1
      port: 11111
      # canal server 中配置(zhi)的實例名(canal.destinations = example)
      name: example
      # 批量拉取條數
      batch-size: 100
      # 無數據時休眠時間(ms)
      sleep-time: 1000

3、Entity類聲明:

CanalProperties.class

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;

/**
 * Canal配置屬(shu)性類(映射(she)YAML配置)
 */
@Data
@Component
@ConfigurationProperties(prefix = "canal")
public class CanalProperties {

    // 是否自動(dong)啟動(dong)同步(bu)
    private boolean autoSync = true;
    // 多實例(li)配置
    private Map<String, InstanceConfig> instances = new HashMap<>();

    @Data
    public static class InstanceConfig {
        private String host;
        private Integer port;
        private String name;
        private Integer batchSize = 100;
        private Integer sleepTime = 1000;
    }

}

DataEventTypeEnum.enum

import org.springframework.util.StringUtils;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public enum DataEventTypeEnum {

    INSERT("INSERT"),
    UPDATE("UPDATE"),
    DELETE("DELETE");

    private final String name;

    DataEventTypeEnum(String name) {
        this.name = name;
    }

    public String NAME() {
        return name;
    }

    private static final Map<String, DataEventTypeEnum> NAME_MAP =
            Arrays.stream(DataEventTypeEnum.values())
                    .collect(Collectors.toMap(DataEventTypeEnum::NAME, Function.identity()));

    public static DataEventTypeEnum getEnum(String name) {
        if (!StringUtils.hasText(name)) {
            return null;
        }
        return NAME_MAP.get(name);
    }
}

JsonMessageType.class

import lombok.Data;

@Data
public class JsonMessageType {

    /**
     * 庫名
     */
    private String schemaName;

    /**
     * 表名
     */
    private String tableName;

    /**
     * 事件類(lei)型
     * (INSERT/UPDATE/DELETE)
     */
    private String eventType;

    /**
     * 數據JSON字符串
     */
    private String data;

}

4、CanalRunnerAutoConfig啟動Canal配置:

import com.iven.canal.entity.CanalProperties;
import com.iven.canal.handle.CanalWorkRegistry;
import com.iven.canal.utils.JsonMessageParser;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Canal自動配置(zhi)
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
public class CanalRunnerAutoConfig {

    private final CanalProperties canalProperties;
    private final JsonMessageParser jsonMessageParser;
    private final CanalWorkRegistry workRegistry;

    @Bean
    public ApplicationRunner canalApplicationRunner() {
        return args -> {
            if (!canalProperties.isAutoSync()) {
                log.info("Canal自動(dong)同(tong)步已關閉");
                return;
            }
            // 如果(guo)沒有任何Work,則不(bu)啟(qi)動Canal
            if (!workRegistry.hasWork()) {
                log.info("無表同(tong)步(bu)處理器,不啟動Canal");
                return;
            }
            // 啟動所有(you)配置的Canal實例
            canalProperties.getInstances().forEach((instanceKey, config) -> {
                CanalRunner runner = new CanalRunner(
                        config.getHost(),
                        config.getPort(),
                        config.getName(),
                        config.getBatchSize(),
                        config.getSleepTime(),
                        jsonMessageParser,
                        workRegistry
                );
                runner.start();
            });
        };
    }
}

5、CanalRunner拉取數據:

import com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.iven.canal.entity.JsonMessageType;
import com.iven.canal.handle.CanalWork;
import com.iven.canal.handle.CanalWorkRegistry;
import com.iven.canal.utils.JsonMessageParser;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Canal運行(xing)器
 * 手動管理生命周(zhou)期(qi)
 *
 * 1、啟動Canal實例
 * 2、處(chu)理解(jie)析后的(de)數據(ju)
 */
@Slf4j
public class CanalRunner {

    private Thread thread;
    private final String canalIp;
    private final Integer canalPort;
    private final String canalInstance;
    private final Integer batchSize;
    private final Integer sleepTime;
    private final JsonMessageParser jsonMessageParser;
    private final CanalWorkRegistry workRegistry;

    public CanalRunner(String canalIp, Integer canalPort, String canalInstance, Integer batchSize,
                       Integer sleepTime, JsonMessageParser jsonMessageParser, CanalWorkRegistry workRegistry) {
        this.canalIp = canalIp;
        this.canalPort = canalPort;
        this.canalInstance = canalInstance;
        this.batchSize = batchSize;
        this.sleepTime = sleepTime;
        this.jsonMessageParser = jsonMessageParser;
        this.workRegistry = workRegistry;
    }

    /**
     * 啟動Canal實例(li)
     */
    public void start() {
        if (thread == null || !thread.isAlive()) {
            thread = new Thread(this::run, "canal-runner-" + canalInstance);
            thread.start();
            log.info("Canal實例[{}]啟動成(cheng)功", canalInstance);
        }
    }

    /**
     * 停止Canal實例
     */
    public void stop() {
        if (thread != null && !thread.isInterrupted()) {
            thread.interrupt();
        }
    }

    private void run() {
        log.info("Canal實(shi)例[{}]啟動中(zhong)...", canalInstance);
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalIp, canalPort), canalInstance, "", "");
        try {
            connector.connect();
            // 訂閱所有表(后續通(tong)過(guo)Work過(guo)濾)
            connector.subscribe();
            connector.rollback();

            while (!thread.isInterrupted()) {
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();

                if (batchId == -1 || entries.isEmpty()) {
                    Thread.sleep(sleepTime);
                } else {
                    // 解析數(shu)據并(bing)處理
                    Map<String, List<JsonMessageType>> parsedData = jsonMessageParser.parse(entries);
                    processParsedData(parsedData);
                    // 確認處理(li)成功
                    connector.ack(batchId);
                }
            }
        } catch (InterruptedException e) {
            log.info("Canal實例[{}]被中斷", canalInstance);
        } catch (Exception e) {
            log.error("Canal實例[{}]運行(xing)異常", canalInstance, e);
            // 處理失敗回(hui)滾(gun)
            connector.rollback();
        } finally {
            connector.disconnect();
            log.info("Canal實(shi)例[{}]已停止(zhi)", canalInstance);
        }
    }

    /**
     * 調用Work處理解析后的(de)數(shu)據(ju)
     *
     * @param parsedData
     */
    private void processParsedData(Map<String, List<JsonMessageType>> parsedData) {
        parsedData.forEach((tableKey, dataList) -> {
            // 獲取該表的所有Work
            List<CanalWork> works = workRegistry.getWorksByTable(tableKey);
            if (!works.isEmpty() && !dataList.isEmpty()) {
                // 轉換(huan)數據(ju)格(ge)式(Json字符串 -> Map)
                List<Map<String, Object>> dataMaps = dataList.stream()
                        .map(item -> JSON.<Map<String, Object>>parseObject(item.getData(), Map.class))
                        .collect(Collectors.toList());
                String schemaName = dataList.get(0).getSchemaName();
                // 調用每個Work的處理方法(fa)
                works.forEach(work -> work.handle(dataMaps, dataList.get(0).getEventType(), schemaName));
            }
        });
    }
    
}

6、JsonMessageParser解析數據:

MessageParser

import com.alibaba.otter.canal.protocol.CanalEntry;
import java.util.List;

/**
 * 消息(xi)解析器接口
 *
 */
public interface MessageParser<T> {

    T parse(List<CanalEntry.Entry> canalEntryList);

}
 JsonMessageParser
import com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.iven.canal.entity.DataEventTypeEnum;
import com.iven.canal.entity.JsonMessageType;
import com.iven.canal.handle.CanalWorkRegistry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.*;

/**
 * Json消息解析器
 *
 * 1、遍歷原始(shi)數據列表接收
 * 2、解析行級變更數據
 * 3、封裝為(wei) JsonParseType
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class JsonMessageParser implements MessageParser<Map<String, List<JsonMessageType>>> {

    private final CanalWorkRegistry workRegistry;

    @Override
    public Map<String, List<JsonMessageType>> parse(List<CanalEntry.Entry> canalEntryList) {
        Map<String, List<JsonMessageType>> dataMap = new HashMap<>();
        for (CanalEntry.Entry entry : canalEntryList) {
            if (!CanalEntry.EntryType.ROWDATA.equals(entry.getEntryType())) {
                continue;
            }

            // 1. 獲取庫名(ming)(ming)、表名(ming)(ming)、帶庫名(ming)(ming)的(de)表標(biao)識
            String schemaName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();
            String fullTableName = schemaName + "." + tableName;

            // 2. 檢查是否(fou)有對(dui)應的處理(li)器(qi)(支持兩種格式)
            boolean hasFullTableWork = !CollectionUtils.isEmpty(workRegistry.getWorksByTable(fullTableName));
            boolean hasSimpleTableWork = !CollectionUtils.isEmpty(workRegistry.getWorksByTable(tableName));
            if (!hasFullTableWork && !hasSimpleTableWork) {
                log.debug("表(biao)[{}]和[{}]均(jun)無同步處理(li)器,跳過", fullTableName, tableName);
                continue;
            }

            try {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                rowChange.getRowDatasList().forEach(rowData -> {
                    JsonMessageType jsonMessageType = parseRowData(entry.getHeader(), rowChange.getEventType(), rowData);
                    if (jsonMessageType != null) {
                        // 3. 按存(cun)在的處(chu)理器類型,分別添(tian)加到數(shu)據映射(she)中(zhong)
                        if (hasFullTableWork) {
                            dataMap.computeIfAbsent(fullTableName, k -> new ArrayList<>()).add(jsonMessageType);
                        }
                        if (hasSimpleTableWork) {
                            dataMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(jsonMessageType);
                        }
                    }
                });
            } catch (Exception e) {
                log.error("解析數據失(shi)敗", e);
            }
        }
        return dataMap;
    }

    private JsonMessageType parseRowData(CanalEntry.Header header, CanalEntry.EventType eventType,
                                         CanalEntry.RowData rowData) {
        // 獲(huo)取(qu)庫名(ming)
        String schemaName = header.getSchemaName();
        // 獲(huo)取(qu)表名
        String tableName = header.getTableName();
        if (eventType == CanalEntry.EventType.DELETE) {
            return dataWrapper(schemaName, tableName, DataEventTypeEnum.DELETE.NAME(), rowData.getBeforeColumnsList());
        } else if (eventType == CanalEntry.EventType.INSERT) {
            return dataWrapper(schemaName, tableName, DataEventTypeEnum.INSERT.NAME(), rowData.getAfterColumnsList());
        } else if (eventType == CanalEntry.EventType.UPDATE) {
            return dataWrapper(schemaName, tableName, DataEventTypeEnum.UPDATE.NAME(), rowData.getAfterColumnsList());
        }
        return null;
    }

    private JsonMessageType dataWrapper(String schemaName, String tableName, String eventType,
                                        List<CanalEntry.Column> columns) {
        Map<String, String> data = new HashMap<>();
        columns.forEach(column -> data.put(column.getName(), column.getValue()));
        JsonMessageType result = new JsonMessageType();
        result.setSchemaName(schemaName);
        result.setTableName(tableName);
        result.setEventType(eventType);
        result.setData(JSON.toJSONString(data));
        return result;
    }

}

7、CanalWorkRegistry匹配處理器:

CanalWork

import java.util.List;
import java.util.Map;

/**
 * Canal-Work處理器
 *
 */
public interface CanalWork {
    /**
     * 返(fan)回需要處(chu)理的表(biao)名(ming)(如:tb_user)
     */
    String getTableName();

    /**
     * 處(chu)理表(biao)數據(ju)的方法
     * @param dataList 表(biao)數據(ju)列表(biao)(每條數據(ju)是(shi)字段名-值(zhi)的Map)
     * @param eventType 事件類(lei)型(INSERT/UPDATE/DELETE)
     * @param schemaName 庫(ku)名(用于區分(fen)不同庫(ku)的表(biao))
     */
    void handle(List<Map<String, Object>> dataList, String eventType, String schemaName);

}

CanalWorkRegistry

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * 處理(li)器注冊(ce)器,
 * 掃描并緩存(cun)所(suo)有CanalWork實現類(lei),按(an)表名(ming)分組管理(li),提供查詢表對應處理(li)器的(de)方法
 */
@Slf4j
@Component
public class CanalWorkRegistry implements ApplicationContextAware {

    /**
     * 表名 -> Work列表(支持一個表多(duo)個Work)
     */
    private final Map<String, List<CanalWork>> tableWorkMap = new HashMap<>();

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 掃描所有CanalWork實現(xian)類
        Map<String, CanalWork> workMap = applicationContext.getBeansOfType(CanalWork.class);
        // 按(an)表名分組
        tableWorkMap.putAll(workMap.values().stream()
                .collect(Collectors.groupingBy(CanalWork::getTableName)));
        log.info("已注冊的表同步處理器: {}", tableWorkMap.keySet());
    }

    /**
     * 獲取指定表的Work列表
     *
     * @param tableName
     * @return
     */
    public List<CanalWork> getWorksByTable(String tableName) {
        return tableWorkMap.getOrDefault(tableName, Collections.emptyList());
    }

    /**
     * 判(pan)斷(duan)是否有表需要處理
     *
     * @return
     */
    public boolean hasWork() {
        return !tableWorkMap.isEmpty();
    }

}

8、CanalWork實現類處理數據:

import com.iven.canal.entity.DataEventTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;

/**
 * tb_user表數(shu)據(ju)處理(li)
 *
 * Canal服務(wu) → 變更數(shu)據(ju) → CanalRunner 拉取 → JsonMessageParser 解析(xi) →
 * 篩(shai)選出 tb_user 數(shu)據(ju) → CanalWorkRegistry 獲取 TbUserCanalWorkHandle →
 * 調用 handle 方(fang)法 → 按事件類型(xing)(INSERT/UPDATE/DELETE)執(zhi)行對應邏輯
 */
@Slf4j
@Component
public class TbUserCanalWorkHandle implements CanalWork {
    @Override
    public String getTableName() {
        return "demo.tb_user";
    }

    @Override
    public void handle(List<Map<String, Object>> dataList, String eventType, String schemaName) {
        log.info("開始處理(li)[{}庫(ku)]的tb_user表數據,事(shi)件類型:{},數據量:{}", schemaName, eventType, dataList.size());

        DataEventTypeEnum dataEventTypeEnum = DataEventTypeEnum.getEnum(eventType);
        
        // 根(gen)據事件類型(xing)分(fen)別處理
        switch (dataEventTypeEnum) {
            case INSERT:
                handleInsert(dataList, schemaName);
                break;
            case UPDATE:
                handleUpdate(dataList, schemaName);
                break;
            case DELETE:
                handleDelete(dataList, schemaName);
                break;
            default:
                log.warn("未處理的事件類型:{}", eventType);
        }
    }

    /**
     * 處理新增(zeng)數據
     */
    private void handleInsert(List<Map<String, Object>> dataList, String schemaName) {
        log.info("處(chu)理[{}庫]的tb_user新增數(shu)據,共(gong){}條(tiao)", schemaName, dataList.size());
        dataList.forEach(data -> {
            Object userId = data.get("id");
            Object username = data.get("name");
            // 新(xin)增邏輯:如同(tong)步到ES、緩存初(chu)始化等
            log.info("新增用戶(hu) - ID: {}, 用戶(hu)名: {}", userId, username);
        });
    }

    /**
     * 處理更新數(shu)據
     */
    private void handleUpdate(List<Map<String, Object>> dataList, String schemaName) {
        log.info("處理[{}庫(ku)]的tb_user更新數據,共{}條", schemaName, dataList.size());
        dataList.forEach(data -> {
            Object userId = data.get("id");
            Object newPhone = data.get("phone"); // 假設更新(xin)了手機號
            // 更(geng)新(xin)(xin)(xin)邏(luo)輯:如更(geng)新(xin)(xin)(xin)ES文檔、刷新(xin)(xin)(xin)緩存等(deng)
            log.info("更(geng)新用戶 - ID: {}, 新手(shou)機號: {}", userId, newPhone);
        });
    }

    /**
     * 處理刪除數(shu)據
     */
    private void handleDelete(List<Map<String, Object>> dataList, String schemaName) {
        log.info("處(chu)理[{}庫]的(de)tb_user刪除數據(ju),共{}條", schemaName, dataList.size());
        dataList.forEach(data -> {
            Object userId = data.get("id");
            // 刪(shan)除邏輯:如從ES刪(shan)除、清除緩存(cun)等(deng)
            log.info("刪除用戶 - ID: {}", userId);
        });
    }
}

b8706148-77aa-40f9-9f39-65310de78089

 

調度流程:

整個流程通過注冊器(qi)(qi)(qi)管理(li)(li)處理(li)(li)器(qi)(qi)(qi)、解析器(qi)(qi)(qi)轉換數據格式、運行器(qi)(qi)(qi)控制(zhi) Canal 客(ke)戶端生命(ming)周期,最終將數據庫變更(geng)事件(jian)分發到對(dui)應表的處理(li)(li)器(qi)(qi)(qi),實現(xian)了變更(geng)數據的監聽與業務(wu)處理(li)(li)解耦。用戶只需實現(xian)CanalWork接口,即(ji)可自定(ding)義任意表的變更(geng)處理(li)(li)邏(luo)輯。

(1)、初始化階段

1)、Spring 容器啟動時,CanalWorkRegistry 掃(sao)描(miao)所有 CanalWork 實現類(如 TbUserCanalWorkHandle),按表名分組(zu)緩存到 tableWorkMap 中。

2)、CanalRunnerAutoConfig 檢查配置(CanalProperties),若(ruo)開(kai)啟自動同(tong)步(bu)且存在 CanalWork,則為每個 Canal 實例創建 CanalRunner 并啟動。

(2)、運行階段

1)、CanalRunner 建立與 Canal 服務(wu)的連接(jie),訂閱數據庫變更事件(jian)。

2)、循環拉取變更數(shu)(shu)據(Message),通過 JsonMessageParser 解析為表名 - 數(shu)(shu)據列表的(de)映射(Map<String, List<JsonMessageType>>)。

3)、調(diao)用 processParsedData 方(fang)(fang)法,根據表名從 CanalWorkRegistry 獲取(qu)對(dui)應的 CanalWork 列表,執行(xing) handle 方(fang)(fang)法處理數(shu)據。

(3)、銷毀階段

程(cheng)序停(ting)止(zhi)時,CanalRunner 中斷(duan)線程(cheng),斷(duan)開與 Canal 服務(wu)的(de)連接。

 

posted on 2025-10-31 10:41  愛文(Iven)  閱讀(45)  評論(0)    收藏  舉報

導航