中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

18、Flink CDC監聽(ting)MySQL-Binlog實(shi)現數(shu)據(ju)監聽(ting)

一(yi)、CDC簡介:

CDC(Change Data Capture)是(shi)變(bian)更數(shu)(shu)據(ju)捕獲(huo)(huo)的(de)簡稱,其(qi)核心思想(xiang)是(shi)監測并捕獲(huo)(huo)數(shu)(shu)據(ju)庫的(de)變(bian)動(包括數(shu)(shu)據(ju)或數(shu)(shu)據(ju)表的(de)插入、更新(xin)、刪除等),將這些變(bian)更按發生的(de)順序(xu)完整記錄下來,并寫入到消息中間件或數(shu)(shu)據(ju)倉(cang)庫中以供其(qi)他服務(wu)進行訂閱及消費。CDC技術廣(guang)泛應用于數(shu)(shu)據(ju)同(tong)步、數(shu)(shu)據(ju)分(fen)發、數(shu)(shu)據(ju)采(cai)集等場景,是(shi)數(shu)(shu)據(ju)集成領域(yu)的(de)重要工具。

1、CDC常用工具:

CDC工具

Debezium

Canal

Maxwell

Flink CDC

核心定位

多(duo)數據源 CDC 框架

輕(qing)量 MySQL 同步工具

MySQL 專屬極簡工具

實時處理一體化框架

支持數據源

MySQL、PostgreSQL、Oracle 等

MySQL(最佳)、PostgreSQL 等

僅 MySQL

MySQL、PostgreSQL 等(基于(yu) Debezium)

典型輸出目標

Kafka、Flink/Spark

Kafka、RocketMQ、數據(ju)庫(ku)

Kafka、Redis、文件

Kafka、ES、Hive 等

突出優勢

支持廣泛,全量(liang) + 增量(liang)同步

部(bu)署簡(jian)單,國內生態適配好

配置極(ji)簡,資源占(zhan)用低

支持(chi)實時處理,Exactly-Once 語義

適用場景

多源同步、復雜(za)數(shu)據管(guan)道

MySQL 為(wei)主(zhu)的輕量同步

簡單 MySQL 變更(geng)同步

實時數倉、捕獲(huo) + 處(chu)理(li)一體化

2、相關參考:

Canal學習筆記

 

二、Flink CDC工作原(yuan)理(li):

Flink CDC(Change Data Capture)的(de)核心工作原理(li)是通過(guo)捕獲數(shu)據庫的(de)變(bian)更日(ri)志(如 binlog、WAL 等),將(jiang)其(qi)(qi)轉換為結構化事件流,接(jie)入 Flink 實時計算引擎進行處(chu)理(li),并最終同(tong)步(bu)到(dao)目(mu)標系統(tong)。其(qi)(qi)工作流程可拆解為以下關鍵步(bu)驟:

1、Debezium 捕獲解析數據庫日志:

Flink CDC 本身不直接解(jie)析(xi)數據(ju)庫(ku)日志,而是集(ji)成 Debezium(開(kai)源 CDC 框架)作為底層捕獲引(yin)擎,支持(chi) MySQL、PostgreSQL、Oracle 等(deng)多種(zhong)數據(ju)庫(ku),具體邏(luo)輯如下(xia):

(1)、模擬從(cong)節點獲取日志(zhi):

  • 對于支持主從復制的數據庫(如 MySQL),Debezium 會偽裝成數據庫的從節點,向主庫發送復制請求,獲取變更日志(如 MySQL 的 binlog、PostgreSQL 的 WAL 日志)。

(2)、解析日(ri)志為(wei)結(jie)構化事件(jian):

(3)數據庫日志通(tong)常是二(er)進制(zhi)格(ge)式,Debezium 會將其解析(xi)為包(bao)含詳細信息(xi)的結構化事(shi)件,包(bao)括:

  • 操作類型(INSERT/UPDATE/DELETE);
  • 變更數據(UPDATE 時包含舊值和新值,INSERT/DELETE 包含對應數據);
  • 表名、數據庫名、操作時間戳等元數據。

2. 全量 + 增量同步(無鎖機制):

Flink CDC 支持 “全量數(shu)據初(chu)始(shi)化 + 增量變(bian)更同步” 的無(wu)縫銜(xian)接,且通過無(wu)鎖機制避(bi)免影響源(yuan)庫性(xing)能:

(1)、全量快照階段:

  • 首次同步時,會對數據庫表進行一次全量快照(讀取當前所有數據),確保初始數據完整。

(2)、增量同步階段(duan):

  • 快照完成后,自動切換到增量同步模式,通過監控 binlog 等日志獲取實時變更,且通過記錄日志位置(如 binlog 的文件名和偏移量)保證全量與增量數據的連續性(無重復、無遺漏)。

3、 封裝為 Flink Source 流入引擎:

解析后的(de)結構化事件會被封裝為(wei) Flink Source 連接(jie)器,直(zhi)接(jie)作為(wei) Flink 的(de)輸入流(liu):

(1)、變更事(shi)件以(yi)流的(de)(de)形式進入 Flink 計算引擎,每條事(shi)件對(dui)應一條數(shu)據記(ji)錄,可通過(guo) Flink 的(de)(de) DataStream API 或 Table/SQL 進行處(chu)理。

4、Flink實時數據處理:

Flink CDC 不僅是 “捕獲工具(ju)”,更能(neng)結合(he) Flink 的實時計算能(neng)力對(dui)變更數據進行處理:

(1)、數據清洗與轉換:

  • 過濾無效數據、格式轉換(如 JSON 轉 Avro)、字段映射等。

(2)、關聯與聚合:

  • 支持與維度表(如 MySQL 維表、HBase 維表)關聯,或進行窗口聚合(如統計分鐘級變更量)。

(3)、狀態管理:

  • 利用 Flink 的狀態后端(如 RocksDB)保存中間結果,支持復雜邏輯(如去重、累計計算)。

5、基于 Checkpoint 保證一致性:

Flink CDC 依賴 Flink 的(de) Checkpoint 機制 確保(bao)數據處理的(de)一致性:

(1)、Checkpoint 觸發:

  • 定期將當前處理進度(包括 CDC 捕獲的日志位置、算子狀態等)持久化到存儲(如 HDFS、本地文件)。

(2)、故障恢復:

  • 若 Flink 任務失敗,可從最近一次 Checkpoint 恢復狀態和日志位置,保證數據不丟失、不重復,實現 Exactly-Once 語義(端到端一致性需下游 Sink 配合支持)。

6. 通過 Sink 同步到目標系統:

處理后的變更數據通過(guo) Flink 的 Sink 連(lian)接器 寫入(ru)目標存儲(chu),支持 Kafka、Elasticsearch、Hive、MySQL、TiDB 等多種系統。

 

三、MySQL 配置(zhi)(開啟 Binlog):

1、開啟 Binlog(ROW 模式):

# MySQL 配(pei)置(zhi)文件
# Linux:my.cnf配(pei)置(zhi)文件(/etc/mysql/)
# Window:my.ini配置文件(C:\ProgramData\MySQL\MySQL Server 5.7\)
# 開啟 Binlog
log_bin = mysql-bin

# 選擇(ze) ROW 模式(記錄(lu)行級(ji)變更(geng))
binlog-format = ROW

# 配置數(shu)據庫唯一 ID(與 Canal 服務端的 slaveId 不同(tong))
server-id = 1

1

2

2、重啟 MySQL 并驗證:

# 打開命令提示符(cmd/services.msc):
# 按 Win + R 鍵,輸入 cmd,然后按 Enter 鍵打開命令提示符窗口。
# 停止MySQL服(fu)務(wu):
net stop MySQL57

# 啟動(dong)MySQL服(fu)務(wu):
net start MySQL57

# 驗證(zheng)
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

3

 

四、SpringBoot整合Flink CDC實現MySQL數據監聽:

1、POM配置:

        <!-- Flink 核心 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.18.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.18.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.18.1</version>
        </dependency>
        <!-- Flink Table 核心依賴 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>1.18.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>1.18.1</version>
        </dependency>
        <!-- Flink Connector Base -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.18.1</version>
        </dependency>
        <!-- Flink CDC MySQL -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>3.0.1</version>
        </dependency>

2、YML配置:

flink:
  cdc:
    # 是否開啟(qi)CDC監聽
    auto-start: true
    # 自定義(yi)一個唯一的id
    server-id: "123456"
    # 數(shu)據(ju)庫配置
    mysql:
      hostname: localhost
      port: 3306
      username: root
      password: 123

3、Entity類聲明:

DataChangeType.class

/**
 * Flink CDC數據變更類型枚舉
 * 1、"c"表(biao)示(shi)創建
 * 2、"u"表(biao)示(shi)更新
 * 3、"d"表(biao)示(shi)刪除(chu)
 * 4、"r"表(biao)示(shi)讀取
 */
public enum DataChangeType {

    INSERT("c"),
    UPDATE("u"),
    DELETE("d"),
    READ("r");

    private final String code;

    DataChangeType(String code) {
        this.code = code;
    }

    public static DataChangeType getByCode(String code) {
        for (DataChangeType type : values()) {
            if (type.code.equals(code)) {
                return type;
            }
        }
        return null;
    }

}

FlinkCdcProperties.class

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;



@Data
@Component
@ConfigurationProperties(prefix = "flink.cdc")
public class FlinkCdcProperties {

    /**
     * 是否自動啟動CDC監聽
     */
    private boolean autoStart;

    private String serverId;

    /**
     * MySQL配(pei)置(zhi)
     */
    private Mysql mysql = new Mysql();

    @Data
    public static class Mysql {
        private String hostname;
        private int port;
        private String username;
        private String password;
    }

}

4、FlinkCdcRunner數據變更監聽啟動器:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.iven.flinkcdcdemoservice.entity.DataChangeType;
import com.iven.flinkcdcdemoservice.entity.FlinkCdcProperties;
import com.iven.flinkcdcdemoservice.handler.FlinkCdcHandlerRegistry;
import com.iven.flinkcdcdemoservice.handler.FlinkCdcHandler;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.*;

@Slf4j
@Component
@RequiredArgsConstructor
public class FlinkCdcRunner implements CommandLineRunner {

    // 配置屬性
    private final FlinkCdcProperties properties;

    // 處理器注冊中心
    private final FlinkCdcHandlerRegistry handlerRegistry;

    @Override
    public void run(String... args) throws Exception {
        // 總開關關閉則(ze)不啟(qi)動
        if (!properties.isAutoStart()) {
            log.info("Flink CDC 總開(kai)關關閉,不啟動監聽");
            return;
        }

        // 沒有需要監聽的表則不啟(qi)動
        List<String> monitoredTables = handlerRegistry.getMonitoredTables();
        if (monitoredTables.isEmpty()) {
            log.warn("未發現需要監聽的表(未實現FlinkCdcTableHandler),不啟動監聽");
            return;
        }

        // 1. 創建Flink執行環境(jing)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 設(she)置(zhi)并(bing)行度(du)為 1: server-id 的數量必須(xu) ≥ 并(bing)行度(du)
        env.setParallelism(1);
        // 啟用(yong)檢查點(可選)
        // env.enableCheckpointing(5000);
        // 配置檢查點存儲路徑(本地路徑或分布式存儲如HDFS)
        // env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-cdc-checkpoints");
        // 檢查點超(chao)時(shi)時(shi)間(60秒未完成則取消)
        // env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 允許檢查(cha)點失敗次數(默(mo)認0,即一(yi)次失敗則任務(wu)失敗)
        // env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
        // 禁用檢(jian)查(cha)點(dian)
        env.getCheckpointConfig().disableCheckpointing();
        // 重試(shi)次數/重試(shi)間(jian)隔(ge)
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,  Time.seconds(10)));

        // 2. 配置(zhi)(zhi)MySQL CDC源(動態設置(zhi)(zhi)需要監聽的表(biao))
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .serverId(properties.getServerId())
                .hostname(properties.getMysql().getHostname())
                .port(properties.getMysql().getPort())
                .username(properties.getMysql().getUsername())
                .password(properties.getMysql().getPassword())
                // 從監聽的表中提取數(shu)據庫列(lie)表(去重)
                .databaseList(extractDatabases(monitoredTables))
                // 直接使用注冊中(zhong)心收集(ji)的表列表
                .tableList(monitoredTables.toArray(new String[0]))
                // 反序列化為JSON
                .deserializer(new JsonDebeziumDeserializationSchema())
                /* initial: 初始化(hua)快照,即全(quan)量導(dao)入(ru)后增量導(dao)入(ru)(檢(jian)測更(geng)新數(shu)據(ju)(ju)寫入(ru))
                 * latest: 只進(jin)行增量導(dao)入(ru)(不讀(du)取歷史變化(hua))
                 * timestamp: 指定(ding)(ding)時間(jian)(jian)戳進(jin)行數(shu)據(ju)(ju)導(dao)入(ru)(大于等于指定(ding)(ding)時間(jian)(jian)錯讀(du)取數(shu)據(ju)(ju))
                 */
                .startupOptions(StartupOptions.latest())
                .build();

        // 3. 讀取CDC數據流并(bing)處理(li)
        DataStreamSource<String> dataStream = env.fromSource(
                mySqlSource,
                WatermarkStrategy.noWatermarks(),
                "MySQL-CDC-Source"
        );

        // 4. 解析數據并路由(you)到(dao)對應(ying)處理器,使用靜態內(nei)(nei)部類(lei)代(dai)替匿名內(nei)(nei)部類(lei)
        dataStream.process(new CdcDataProcessFunction(handlerRegistry));

        // 5. 啟動(dong)Flink作業
        env.execute("Flink-CDC-動態監聽作業");
    }

    /**
     * 靜態內部類實(shi)現ProcessFunction,確保可序列化
     */
    private static class CdcDataProcessFunction extends ProcessFunction<String, Void> implements Serializable {
        private final FlinkCdcHandlerRegistry handlerRegistry;

        // 通過構造(zao)函數傳入依賴
        public CdcDataProcessFunction(FlinkCdcHandlerRegistry handlerRegistry) {
            this.handlerRegistry = handlerRegistry;
        }

        @Override
        public void processElement(String json, Context ctx, Collector<Void> out) {
            try {
                JSONObject cdcData = JSON.parseObject(json);
                // 操作(zuo)類型:c/u/d
                String op = cdcData.getString("op");
                JSONObject source = cdcData.getJSONObject("source");
                String dbName = source.getString("db");
                String tableName = source.getString("table");
                // 庫名(ming).表名(ming)
                String fullTableName = dbName + "." + tableName;

                // 找到(dao)對應(ying)表的(de)處理器
                FlinkCdcHandler handler = handlerRegistry.getHandler(fullTableName);
                if (handler == null) {
                    log.warn("表[{}]無(wu)處理(li)器,跳過處理(li)", fullTableName);
                    return;
                }

                // 按事(shi)件(jian)類型分發(fa)
                DataChangeType changeType = DataChangeType.getByCode(op);
                if (changeType == null) {
                    log.warn("未知操作類型:{}", op);
                    return;
                }

                switch (changeType) {
                    case INSERT:
                        List<Map<String, Object>> insertData = Collections.singletonList(
                                cdcData.getJSONObject("after").getInnerMap()
                        );
                        handler.handleInsert(insertData);
                        break;
                    case UPDATE:
                        List<Map<String, Object>> beforeData = Collections.singletonList(
                                cdcData.getJSONObject("before").getInnerMap()
                        );
                        List<Map<String, Object>> afterData = Collections.singletonList(
                                cdcData.getJSONObject("after").getInnerMap()
                        );
                        handler.handleUpdate(beforeData, afterData);
                        break;
                    case DELETE:
                        List<Map<String, Object>> deleteData = Collections.singletonList(
                                cdcData.getJSONObject("before").getInnerMap()
                        );
                        handler.handleDelete(deleteData);
                        break;
                    case READ:
                        // 可以忽略快照階(jie)段的(de)讀取操作,或根(gen)據需要處(chu)理
                        log.debug("處理快(kuai)照讀取操作: {}", fullTableName);
                        break;
                }
            } catch (Exception e) {
                log.error("Flink-CDC數(shu)據處理發生未(wei)預期異常", e);
            }
        }
    }

    /**
     * 從表(biao)(biao)名(庫名.表(biao)(biao)名)中提取數據庫列表(biao)(biao)(去重)
     *
     * @param tables
     * @return
     */
    private String[] extractDatabases(List<String> tables) {
        // 截取庫名(如demo.tb_user → demo)
        return tables.stream()
                .map(table -> table.split("\\.")[0])
                .distinct()
                .toArray(String[]::new);
    }

}

5、FlinkCdcHandlerRegistry策略路由:

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Flink CDC處(chu)理(li)器注冊中(zhong)心
 * 處(chu)理(li)器注冊中(zhong)心(自動掃描監聽(ting)表(biao))
 */
@Slf4j
@Component
public class FlinkCdcHandlerRegistry implements ApplicationContextAware, Serializable {

    // 緩存(cun):表名(庫名.表名)→ 處理器
    private final Map<String, FlinkCdcHandler> handlerMap = new ConcurrentHashMap<>();

    // 收集(ji)所有需要監聽的表(供Flink CDC配置使用)
    private List<String> monitoredTables;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        // 掃描所有實(shi)現類
        Map<String, FlinkCdcHandler> beans = applicationContext.getBeansOfType(FlinkCdcHandler.class);
        beans.values().forEach(handler -> {
            String tableName = handler.getTableName();
            handlerMap.put(tableName, handler);
            log.info("注(zhu)冊監聽表:{} → 處理器:{}", tableName, handler.getClass().getSimpleName());
        });
        // 提取所有需要監(jian)聽的表(biao)
        monitoredTables = new ArrayList<>(handlerMap.keySet());
    }

    /**
     * 獲(huo)取指定(ding)表的處理(li)器
     *
     * @param tableName
     * @return
     */
    public FlinkCdcHandler getHandler(String tableName) {
        return handlerMap.get(tableName);
    }

    /**
     * 獲(huo)取所(suo)有需要監聽的表(供Flink CDC配置(zhi))
     *
     * @return
     */
    public List<String> getMonitoredTables() {
        return monitoredTables;
    }

}

6、FlinkCdcHandler策略模式數據處理:

 FlinkCdcHandler

import java.util.List;
import java.util.Map;

/**
 * 表數據處理接口,每個監聽(ting)的表需實現此接口
 */
public interface FlinkCdcHandler {

    /**
     * 獲取監聽的表名(ming)(ming)(格(ge)式:庫名(ming)(ming).表名(ming)(ming),如demo.tb_user)
     */
    String getTableName();

    /**
     * 處理新增數據
     */
    default void handleInsert(List<Map<String, Object>> dataList) {
        // 默認空(kong)實現,子(zi)類(lei)可(ke)重寫
    }

    /**
     * 處理更新(xin)數據(ju)(包含變(bian)更前(qian)和(he)變(bian)更后的數據(ju))
     * @param beforeList 變(bian)更前(qian)數據(ju)
     * @param afterList 變(bian)更后數據(ju)
     */
    default void handleUpdate(List<Map<String, Object>> beforeList, List<Map<String, Object>> afterList) {
        // 默(mo)認空實現,子類可(ke)重寫(xie)
    }

    /**
     * 處理刪除數據(ju)
     */
    default void handleDelete(List<Map<String, Object>> dataList) {
        // 默(mo)認空實現,子類可重寫(xie)
    }

}

TbUserFlinkCdcHandler

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

@Slf4j
@Component
public class TbUserFlinkCdcHandler implements FlinkCdcHandler, Serializable {
    @Override
    public String getTableName() {
        return "demo.tb_user";
    }

    @Override
    public void handleInsert(List<Map<String, Object>> dataList) {
        log.info("處(chu)理(li)tb_user新(xin)增數(shu)據,共{}條", dataList.size());
        dataList.forEach(data -> {
            String id = (String)data.get("id");
            String username = (String) data.get("name");
            // 業務邏輯:如同步到ES、緩存等
            log.info("新增(zeng)用戶:id={}, name={}", id, username);
        });
    }

    @Override
    public void handleUpdate(List<Map<String, Object>> beforeList, List<Map<String, Object>> afterList) {
        log.info("處理tb_user更(geng)新數據,共(gong){}條", afterList.size());
        for (int i = 0; i < afterList.size(); i++) {
            Map<String, Object> before = beforeList.get(i);
            Map<String, Object> after = afterList.get(i);
            log.info("更新用戶(hu):id={}, 舊用戶(hu)名={}, 新用戶(hu)名={}",
                    after.get("id"), before.get("name"), after.get("name"));
        }
    }

    @Override
    public void handleDelete(List<Map<String, Object>> dataList) {
        log.info("處(chu)理(li)tb_user刪除數據,共{}條", dataList.size());
        dataList.forEach(data -> {
            log.info("刪除用戶:id={}", data.get("id"));
        });
    }

}

4

項目啟動時,FlinkCdcHandlerRegistry 掃描并注冊所有 FlinkCdcHandler 實現(xian)類,建立表(biao)與處(chu)理器(qi)的映射(she);FlinkCdcRunner 在 Spring 容器(qi)初始化(hua)后(hou)觸發,檢(jian)查啟動條(tiao)件(jian),初始化(hua) Flink 環境(jing)并構(gou)建 CDC 數(shu)據源,將數(shu)據流接入 CdcDataProcessFunction,該函(han)數(shu)解析變更(geng)事(shi)件(jian)并路(lu)由到(dao)對(dui)應處(chu)理器(qi)執行業務邏輯,最后(hou)啟動 Flink 作業持續監聽(ting)處(chu)理。

 

posted on 2025-11-04 23:03  愛文(Iven)  閱讀(91)  評論(0)    收藏  舉報

導航