中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

數(shu)據倉庫:Mysql大量(liang)數(shu)據快速導出

背景

寫這(zhe)篇文章主要是(shi)介(jie)紹一(yi)下我做數(shu)據倉庫ETL同(tong)步的過程(cheng)中(zhong)遇到的一(yi)些有意思(si)的內(nei)容和提升程(cheng)序(xu)運行效率(lv)的過程(cheng)。

關系型數(shu)據庫:

  項(xiang)目初期:游(you)戲的(de)運營數據(ju)比較輕(qing)量,相關的(de)運營數據(ju)是(shi)通(tong)過Java后(hou)臺程序聚合查(cha)詢關系(xi)型數據(ju)庫(ku)MySQL完全(quan)可以應付,系(xi)統(tong)通(tong)過定(ding)時任務每日統(tong)計相關數據(ju),等待運營人員查(cha)詢即可。

  項目中后(hou)期:隨(sui)著開(kai)服數量(liang)增多(duo),玩家(jia)數量(liang)越(yue)來(lai)越(yue)多(duo),數據庫的數據量(liang)越(yue)來(lai)越(yue)大,運營后(hou)臺查(cha)(cha)詢效率(lv)越(yue)來(lai)越(yue)低。對(dui)于普通的關(guan)系型(xing)來(lai)說,如MySQL,當單(dan)表(biao)存儲記錄數超過500萬條后(hou),數據庫查(cha)(cha)詢性能將變得極為緩慢,而(er)往往我們都不會只做單(dan)表(biao)查(cha)(cha)詢,還有多(duo)表(biao)join。這里假(jia)如有100個游戲(xi)服,每個服有20張表(biao),而(er)每個表(biao)有500W數據,那么(me):

  總數據量 = 100 * 20 * 500W = 10億(yi)  按當時的(de)庫表結構(gou),換算(suan)成磁(ci)盤空間,約為100G左(zuo)右

我的(de)(de)天吶,現(xian)在(zai)沒有單(dan)機的(de)(de)內存能同一時間載(zai)入100G的(de)(de)數據

//www.zhihu.com/question/19719997

  所以,考慮到(dao)這一點(dian),Hive被提出來解決難題!

 

數據倉(cang)庫

Hive適合(he)做海(hai)(hai)量數(shu)據(ju)的數(shu)據(ju)倉(cang)(cang)庫(ku)工(gong)具(ju), 因為數(shu)據(ju)倉(cang)(cang)庫(ku)中的數(shu)據(ju)有(you)這兩個特點:最全的歷史(shi)數(shu)據(ju)(海(hai)(hai)量)、相對穩定的;所謂相對穩定,指(zhi)的是數(shu)據(ju)倉(cang)(cang)庫(ku)不同于業務系統數(shu)據(ju)庫(ku),數(shu)據(ju)經常會(hui)被更(geng)新(xin),數(shu)據(ju)一旦進入數(shu)據(ju)倉(cang)(cang)庫(ku),很少會(hui)被更(geng)新(xin)和刪除,只會(hui)被大量查詢。而Hive,也是具(ju)備這兩個特點

二、項目架構設(she)計

 在(zai)這里先說下(xia)初(chu)期項目架構的探索,因為數據流向,其實最終就是從MYSQL--------->Hive中,我使用的是Jdbc方式。為什(shen)么不使用下(xia)列工具呢(ni)?

  • Sqoop, 因為該游戲(xi)每個服有(you)將近80張表(biao),然后(hou)又(you)有(you)很多服,以后(hou)還會(hui)更多,而每個服的庫表(biao)數據結構其實是(shi)完全一樣的,只是(shi)IP地址不一樣,使用Sqoop的話(hua),將會(hui)需(xu)要(yao)維護(hu)越來越多的腳本,再者Sqoop沒法處理(li)原始數據中(zhong)一些帶有(you)Hive表(biao)定義(yi)的行列分(fen)隔(ge)符(fu)
  • DataX 阿里開源的數據同步中間件,沒做過(guo)詳細(xi)研究

1、全局緩存隊(dui)列

使用生產(chan)者消費者模型(xing),中間使用內存,數據落地(di)成(cheng)txt

 

 

首先生產者(zhe)(zhe)通過Jdbc獲取(qu)源(yuan)數據(ju)內容,放入固定大小的(de)緩存(cun)(cun)隊列,同時(shi)消費(fei)者(zhe)(zhe)不斷的(de)從緩存(cun)(cun)讀取(qu)數據(ju),根(gen)據(ju)不同的(de)數據(ju)類(lei)型(xing)分別讀取(qu)出來,并逐條寫入相應的(de)txt文件。

速度每(mei)秒約8000條。

這樣做(zuo)表面上(shang)看起來非常美好,流(liu)水(shui)式的(de)處(chu)理,來一條處(chu)理一下,可(ke)是發現(xian)消費的(de)速度(du)遠遠趕(gan)不(bu)上(shang)生產的(de)速度(du),生產出(chu)來的(de)數據會堆積在緩存(cun)隊列(lie)(lie)里面,假如(ru)隊列(lie)(lie)不(bu)固定長度(du)的(de)話,這時候還(huan)會大量消耗內存(cun),所以為了提升寫(xie)入的(de)速度(du),決(jue)定采用下一種方案

 

2、每一(yi)張表一(yi)個(ge)緩存隊列及writer接口

每張表各自起一個生產者(zhe)消(xiao)費者(zhe)模型,消(xiao)費者(zhe)啟(qi)動時初始(shi)化相應的writer接口,架構設計如下:

 

table1的(de)(de)(de)生產者通過Jdbc獲取(qu)源數據(ju)內容,放(fang)入(ru)table自帶(dai)的(de)(de)(de)固(gu)定大小的(de)(de)(de)緩存隊(dui)列,同時table1相應的(de)(de)(de)消費者不斷(duan)的(de)(de)(de)從(cong)緩存讀取(qu)數據(ju),根(gen)據(ju)不同的(de)(de)(de)數據(ju)類型分(fen)別讀取(qu)出(chu)來(lai),并逐條寫入(ru)相應的(de)(de)(de)txt文件(jian)。

速度每秒約2W條。

 這樣生產者線(xian)程(cheng)可(ke)以(yi)并發的進行,通(tong)過控制生產者線(xian)程(cheng)的數量,可(ke)以(yi)大大提高(gao)處(chu)理的效率, 項目關(guan)鍵代(dai)碼如下:

1)線程池

/***
 * 
 * 
 * @描(miao)述 任務(wu)線程池(chi)
 */
public class DumpExecuteService {

    private static ExecutorService dumpServerWorkerService; // 游(you)戲服任(ren)務
    private static ExecutorService dumpTableWorkerService; // 表數(shu)據任務
    private static ExecutorService dumpReaderWorkerService; // 讀取數據任務
    private static ExecutorService dumpWriterWorkerService; // 寫(xie)數據(ju)結果任務

    /***
     * 初始化任(ren)務線程池
     * @param concurrencyDBCount 并(bing)發數量(liang)
     */
    public synchronized static void startup(int concurrencyDBCount) {

        if (dumpServerWorkerService != null)
            return;

        if (concurrencyDBCount > 2)
            concurrencyDBCount = 2; // 最多支持兩(liang)個數(shu)據(ju)庫任務并發執行(xing)

        if (concurrencyDBCount < 1)
            concurrencyDBCount = 1;

        dumpServerWorkerService = Executors.newFixedThreadPool(concurrencyDBCount, new NamedThreadFactory(
                "DumpExecuteService.dumpServerWorkerService" + System.currentTimeMillis()));
        dumpTableWorkerService = Executors.newFixedThreadPool(2, new NamedThreadFactory("DumpExecuteService.dumpTableWorkerService"
                + System.currentTimeMillis()));
        dumpWriterWorkerService = Executors.newFixedThreadPool(8, new NamedThreadFactory("DumpExecuteService.dumpWriterWorkerService"
                + System.currentTimeMillis()));
        dumpReaderWorkerService = Executors.newFixedThreadPool(2, new NamedThreadFactory("DumpExecuteService.dumpReaderWorkerService"
                + System.currentTimeMillis()));
    }

    public static Future<Integer> submitDumpServerWorker(DumpServerWorkerLogic worker) {
        return dumpServerWorkerService.submit(worker);
    }

    public static Future<Integer> submitDumpWriteWorker(DumpWriteWorkerLogic worker) {
        return dumpWriterWorkerService.submit(worker);
    }

    public static Future<Integer> submitDumpReadWorker(DumpReadWorkerLogic worker) {
        return dumpReaderWorkerService.submit(worker);
    }

    public static Future<Integer> submitDumpTableWorker(DumpTableWorkerLogic worker) {
        return dumpTableWorkerService.submit(worker);
    }

    /***
     * 關閉線程池
     */
    public synchronized static void shutdown() {

        //執行線程池(chi)關(guan)閉...
    }
}

說(shuo)明:該類定義4個線程池(chi),分(fen)別用于執行不同的任務

2)游戲服任務線程池

/**
 * 1) 獲取 游戲服log庫數據庫連接 
2) 依次處理單張表
*/ public class DumpServerWorkerLogic extends AbstractLogic implements Callable<Integer> { private static Logger logger = LoggerFactory.getLogger(DumpServerWorkerLogic.class); private final ServerPO server;// 數據庫 private final String startDate;// 開始時間 private SourceType sourceType;// 數(shu)據來(lai)源類(lei)型 private Map<String, Integer> resultDBMap;// 表記(ji)錄計數(shu) private GameType gameType; public DumpServerWorkerLogic(ServerPO server, String startDate, SourceType sourceType, Map<String, Integer> resultDBMap, GameType gameType) { CheckUtil.checkNotNull("DumpServerWorkerLogic.server", server); CheckUtil.checkNotNull("DumpServerWorkerLogic.startDate", startDate); CheckUtil.checkNotNull("DumpServerWorkerLogic.sourceType", sourceType); CheckUtil.checkNotNull("DumpServerWorkerLogic.resultDBMap", resultDBMap); CheckUtil.checkNotNull("DumpServerWorkerLogic.gameType", gameType); this.server = server; this.startDate = startDate; this.sourceType = sourceType; this.resultDBMap = resultDBMap; this.gameType = gameType; } @Override public Integer call() { // 獲取(qu)連接, 并取(qu)得該庫的所(suo)有表(biao) Connection conn = null; try { conn = JdbcUtils.getDbConnection(server); }   catch (Exception e) { throw new GameRuntimeException(e.getMessage(), e); } List<String> tableNames = null; DumpDbInfoBO dumpDbInfoBO = DumpConfig.getDumpDbInfoBO(); int totalRecordCount = 0; try { switch (this.sourceType) { case GAME_LOG: tableNames = JdbcUtils.getAllTableNames(conn); break; case INFOCENTER: tableNames = dumpDbInfoBO.getIncludeInfoTables();  tableNames.add("pay_action"); break; case EVENT_LOG: tableNames = new ArrayList<String>(); Date date = DateTimeUtil.string2Date(startDate, "yyyy-MM-dd"); String sdate = DateTimeUtil.date2String(date, "yyyyMMdd"); String smonth = DateTimeUtil.date2String(date, "yyyyMM"); tableNames.add("log_device_startup" + "_" + smonth); tableNames.add("log_device" + "_" + sdate); break; } // 遍歷table for (String tableName : tableNames) {  // 過(guo)濾(lv) if (dumpDbInfoBO.getExcludeTables().contains(tableName))  continue; DumpTableWorkerLogic tableTask = new DumpTableWorkerLogic(conn, server, tableName, startDate, resultDBMap, gameType, sourceType);  Future<Integer> tableFuture = DumpExecuteService.submitDumpTableWorker(tableTask); int count = tableFuture.get(); totalRecordCount += count; logger.info(String.format("DumpServerWorkerLogic %s-%s.%s be done", startDate, server.getLogDbName(), tableName)); } return totalRecordCount; } catch (Exception e) { throw new GameRuntimeException(e, "DumpTableWorkerLogic fail. server={%s}, errorMsg={%s} ",server.getId(), e.getMessage()); } finally { JdbcUtils.closeConnection(conn); } } }

 

 3)表處(chu)理(li)任務,一(yi)個表一(yi)個

 

/***
 * 
 * 
 * @描述(shu) 創建一個表查(cha)詢結果寫(xie)任(ren)務(wu) (一個表一個)
 */
public class DumpTableWorkerLogic implements Callable<Integer> {
    private static Logger logger = LoggerFactory.getLogger(DumpTableWorkerLogic.class);

    private final String tableName;
    private final Connection conn;

    private ServerPO server;

    private String startDate;

    private Map<String, Integer> resultDBMap;// 表記錄(lu)計(ji)數(shu)

    private GameType gameType;

    private SourceType sourceType;// 數據來源類型

    public DumpTableWorkerLogic(Connection conn, ServerPO server, String tableName, String startDate, Map<String, Integer> resultDBMap,
            GameType gameType, SourceType sourceType) {
        CheckUtil.checkNotNull("DumpTableWorkerLogic.conn", conn);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.tableName", tableName);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.server", server);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.startDate", startDate);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.resultDBMap", resultDBMap);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.gameType", gameType);
        CheckUtil.checkNotNull("DumpServerWorkerLogic.sourceType", sourceType);

        this.conn = conn;
        this.tableName = tableName;
        this.server = server;
        this.startDate = startDate;
        this.resultDBMap = resultDBMap;
        this.gameType = gameType;
        this.sourceType = sourceType;

        logger.info("DumpTableWorkerLogic[{}] Reg", tableName);
    }

    @Override
    public Integer call() {
        logger.info("DumpTableWorkerLogic[{}] Start", tableName);

        // 寫檢查(cha)結果任(ren)務
        DumpWriteWorkerLogic writerWorker = new DumpWriteWorkerLogic(server, tableName, startDate, resultDBMap, gameType,
                sourceType);
        Future<Integer> writeFuture = DumpExecuteService.submitDumpWriteWorker(writerWorker);
        logger.info("DumpTableWorkerLogic[{}] writer={}", tableName);

        // 數據查詢任(ren)務
        DumpReadWorkerLogic readerWorker = new DumpReadWorkerLogic(conn, tableName, writerWorker, startDate);
        DumpExecuteService.submitDumpReadWorker(readerWorker);
        logger.info("DumpTableWorkerLogic[{}] reader={}", tableName);

        try {
            int writeCount = writeFuture.get();
            logger.info("DumpTableWorkerLogic[{}] ---" + startDate + "---" + server.getId() + "---" + tableName + "---導出(chu)數據(ju)條數---"
                    + writeCount);
            return writeCount;
        }  catch (Exception e) {
            throw new GameRuntimeException(e, "DumpTableWorkerLogic fail. tableName={%s}, errorMsg={%s} ",tableName, e.getMessage());
        }
    }

}

 

 

4)單表讀取任務線程

/***
 * mysql讀取數(shu)據任務
 * 
 */
public class DumpReadWorkerLogic implements Callable<Integer> {

    private static Logger logger = LoggerFactory.getLogger(DumpReadWorkerLogic.class);

    private String tableName;

    private final Connection conn;

    private DumpWriteWorkerLogic writerWorker; // 寫結(jie)果數(shu)據任務(wu)

    private String startDate;// 開始導出日期

    private static final int LIMIT = 50000;// 限制(zhi)sql一(yi)次(ci)讀出條數

    public DumpReadWorkerLogic(Connection conn, String tableName, DumpWriteWorkerLogic writerWorker, String startDate) {
        CheckUtil.checkNotNull("MysqlDataReadWorker.conn", conn);
        CheckUtil.checkNotNull("MysqlDataReadWorker.tableName", tableName);
        CheckUtil.checkNotNull("MysqlDataReadWorker.startDate", startDate);

        this.conn = conn;
        this.tableName = tableName;
        this.writerWorker = writerWorker;
        this.startDate = startDate;

        logger.info("DumpReadWorkerLogic Reg. tableName={}", this.tableName);
    }

    @Override
    public Integer call() {
        try {
            List<Map<String, Object>> result = JdbcUtils.queryForList(conn, "show full fields from " + tableName);

            int index = 0;
            String querySql = "";

            int totalCount = 0;
            while (true) {
                int offset = index * LIMIT;
                querySql = DumpLogic.getTableQuerySql(result, tableName, true, startDate) + " limit " + offset + "," + LIMIT;
                int row = DumpLogic.query(conn, querySql, writerWorker);
                totalCount += row;
                logger.info("tableName=" + tableName + ", offset=" + offset + ", index=" + index + ", row=" + row + ", limit=" + LIMIT);
                if (row < LIMIT)
                    break;
                index++;
            }
            writerWorker.prepareClose();
            logger.info(startDate + "---" + tableName + "---Read.End");
            return totalCount;
        }
        catch (Exception e) {
            throw new GameRuntimeException(e, "MysqlDataReadWorker fail. tableName={%s}, errorMsg={%s} ",tableName, e.getMessage());
        }
    }

}

 

5)單表寫入任務線程

/***
 * 
 * 
 * @描述 mysql數(shu)據(ju)導(dao)出(chu)任務
 */
public class DumpWriteWorkerLogic implements Callable<Integer> {

    private static final Logger logger = LoggerFactory.getLogger(DumpWriteWorkerLogic.class);
    private String tableName;// 表名(ming)

    private AtomicBoolean alive; // 線程是否(fou)活(huo)著

    private BufferedWriter writer;

    private ArrayBlockingQueue<String> queue; // 消息(xi)隊列

    private ServerPO server;// 服務器

    private String startDate;// 開(kai)始時(shi)間

    private Map<String, Integer> resultDBMap;// 當天某服某表數量記錄

    private GameType gameType;

    private SourceType sourceType;// 數(shu)據來源類型

    public DumpWriteWorkerLogic(ServerPO server, String tableName, String startDate, Map<String, Integer> resultDBMap, GameType gameType,
            SourceType sourceType) {
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.tableName", tableName);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.server", server);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.startDate", startDate);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.resultDBMap", resultDBMap);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.gameType", gameType);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.sourceType", sourceType);

        this.tableName = tableName;
        this.server = server;
        this.startDate = startDate;
        this.queue = new ArrayBlockingQueue<>(65536);
        this.alive = new AtomicBoolean(true);
        this.gameType = gameType;
        this.sourceType = sourceType;
        this.writer = createWriter();
        this.resultDBMap = resultDBMap;

        logger.info("DumpWriteWorkerLogic Reg. tableName={}", this.tableName);
    }

    /***
     * 創建writer, 若文件不存在,會(hui)新建文件
     * 
     * @param serverId
     * @return
     */
    private BufferedWriter createWriter() {
        try {
            File toFile = FileUtils.getFilenameOfDumpTable(sourceType, tableName, startDate, gameType, ".txt");
            if (!toFile.exists()) {
                FileUtils.createFile(sourceType, tableName, startDate, gameType);
            }
            return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(toFile, true), Charsets.UTF_8), 5 * 1024 * 1024);
        } catch (Exception e) {
            throw new GameRuntimeException(e, "DumpWriteWorkerLogic createWriter fail. server={%s}, errorMsg={%s} ",server.getId(), e.getMessage());
        }
    }

    /***
     * 寫入文件
     * 
     * @param line
     *            一條(tiao)記錄
     */
    private void writeToFile(String line) {
        try {
            this.writer.write(line + "\n");
        } catch (Exception e) {
            throw new GameRuntimeException(e, "DumpWriteWorkerLogic writeToFile fail. errorMsg={%s} ", e.getMessage());
        }
    }

    /**
     * 記錄數據到消(xiao)息(xi)隊(dui)列(lie); 如果消(xiao)息(xi)隊(dui)列(lie)滿了, 會阻塞直到可以put為止
     * 
     * @param result
     */
    public void putToWriterQueue(String line) {

        CheckUtil.checkNotNull("DumpWriteWorkerLogic putToWriterQueue", line);

        try {
            queue.put(line);
        } catch (InterruptedException e) {
            throw new GameRuntimeException(e, "DumpWriteWorkerLogic putToWriterQueue fail. errorMsg={%s} ", e.getMessage());
        }
    }

    /**
     * 準備關閉 (通知(zhi)一(yi)下"需要處理的用戶(hu)數據都處理完畢了"; task 寫完數據, 就可以(yi)完畢了)
     */
    public void prepareClose() {
        alive.set(false);
    }

    @Override
    public Integer call() {
        logger.info("DumpWriteWorkerLogic Start. tableName={}", this.tableName);
        try {
            int totalCount = 0;
            while (alive.get() || !queue.isEmpty()) {
                List<String> dataList = new ArrayList<String>();
                queue.drainTo(dataList);
                int count = processDataList(dataList);
                totalCount += count;
            }
            logger.info("DumpWriteWorkerLogic ---" + startDate + "---" + tableName + "---Writer.End");
            return totalCount;
        } catch (Exception exp) {
            throw new GameRuntimeException(exp, "DumpWriteWorkerLogic call() fail. errorMsg={%s} ", exp.getMessage());
        } finally {
            FileUtil.close(this.writer);
        }
    }

    /***
     * 處理數據(ju):寫入本地文件及(ji)map
     * 
     * @param dataList
     *            數據(ju)集合
     * @return
     */
    private int processDataList(List<String> dataList) {
        int totalCount = 0;

        // 所有記錄
        String key = server.getId() + "#" + tableName + "#" + sourceType.getIndex();
        if (dataList != null && dataList.size() > 0) {

            for (String line : dataList) {

                // 按行寫入文(wen)件
                writeToFile(line);

                // 記錄(lu)到result_data_record_count
                if (resultDBMap.get(key) != null) {
                    resultDBMap.put(key, resultDBMap.get(key) + 1);
                }
                else {
                    resultDBMap.put(key, 1);
                }

                totalCount++;
            }
        }

        return totalCount;
    }

}

內存優化

1、使用Jdbc方式(shi)獲取數(shu)據,如果這個(ge)數(shu)據表比較大,那么獲取數(shu)據的速度特別慢(man);

2、這個進程還(huan)會(hui)(hui)占用非常(chang)大(da)的(de)內(nei)存,并且GC不掉(diao)。分析(xi)原因(yin),Jdbc獲(huo)取數據(ju)的(de)時候,會(hui)(hui)一次將所有(you)數據(ju)放入到內(nei)存,如果同步的(de)數據(ju)表非常(chang)大(da),那么甚(shen)至會(hui)(hui)將內(nei)存撐爆。

那么優化的(de)方法是(shi)讓Jdbc不是(shi)一次全部(bu)將(jiang)數據拿到內存,而是(shi)分頁獲取,每次最大limit數設置為50000,請(qing)參考read線(xian)程。

 

經過這種架(jia)構優化后,5000W數據大(da)約(yue)花費40min可(ke)完成導出

 

說明:

因(yin)為本文(wen)只是記(ji)錄(lu)項目(mu)的設(she)計過程(cheng),詳細的代(dai)碼后面會開源(yuan)。

posted @ 2017-09-29 18:14  ^_TONY_^  閱讀(11016)  評論(1)    收藏  舉報