中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

Lind.DDD.LindMQ~關于持久化(hua)到Redis的(de)消(xiao)息格式

回到目錄

關(guan)于(yu)持久(jiu)化到Redis的(de)消(xiao)息格式,主要是說在Broker上把消(xiao)息持久(jiu)化的(de)過程中,需要存(cun)儲哪些類型的(de)消(xiao)息,因為我們的(de)消(xiao)息是分topic的(de),而(er)每個(ge)topic又有(you)若干個(ge)queue組成,而(er)我們的(de)topic和queue由于(yu)redis存(cun)儲結構的(de)原(yuan)因,我們需要將它們分區對(dui)應存(cun)儲一下,而(er)不能像關(guan)系型數據(ju)庫(ku)那樣靈活,所以要額(e)外設計幾個(ge)數據(ju)結構來存(cun)儲它們。

一 Topic字典

二 Topic對(dui)應(ying)的Queue字典

三(san) Queue里的消(xiao)息

四 某(mou)個客戶端對應某(mou)個Queue的(de)消費進度(du)

以上四個結構(gou)是我們(men)要說的(de),它們(men)會在推消息(xi),拉消息(xi),刪消息(xi)時用到,下面一一介紹(shao)一下,講的(de)不好不對的(de)地方,歡迎大家為大叔留(liu)言(yan)。

一 Topic字典

主要(yao)存儲每(mei)個topic,它是一(yi)個set集合(he)(he),redis的(de)(de)我(wo)集合(he)(he)類型之一(yi),每(mei)個key是唯一(yi)的(de)(de)LindMq_Topic,值value就(jiu)是我(wo)們(men)客戶(hu)端傳來的(de)(de)具體topic的(de)(de)名字(zi),這主要(yao)是在刪除過期(qi)的(de)(de)消(xiao)息時用(yong)的(de)(de),主是作用(yong)是遍歷(li)所有(you)的(de)(de)topic消(xiao)息類型,這樣我(wo)們(men)在刪除消(xiao)息時,就(jiu)可(ke)以把(ba)所有(you)注冊的(de)(de)topic都找到了,最(zui)后把(ba)過期(qi)的(de)(de)刪除,默認消(xiao)息存活周期(qi)是一(yi)天。

刪除過期的消息代碼如下(xia)

 var topicList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQ_TOPICKEY);
  foreach (var topic in topicList)
   {
     var queueList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQKEY + topic);
         foreach (var queue in queueList)
           {
            var removeKey = LINDMQKEY + queue + "_" + DateTime.Now.AddDays(-1).ToString("yyyyMMdd");
              RedisClient.RedisManager.Instance.GetDatabase().KeyDelete(removeKey);
            }
    }

二 Topic對應的Queue字典

我(wo)(wo)(wo)們(men)(men)知道,為了加大redis的(de)(de)(de)并(bing)發量(liang)和(he)吞(tun)吐(tu)量(liang),我(wo)(wo)(wo)們(men)(men)會把(ba)大數據鍵(jian)值對設計成多(duo)個(ge)鍵(jian),這(zhe)就(jiu)像是一個(ge)集群環境的(de)(de)(de)sharing,就(jiu)是將大數據進行分(fen)片,而我(wo)(wo)(wo)們(men)(men)的(de)(de)(de)分(fen)片規則(ze)是采用按對象取模(mo)的(de)(de)(de)方式,模(mo)數可(ke)(ke)以(yi)(yi)自己設置,比(bi)較我(wo)(wo)(wo)設置8,那說明(ming)我(wo)(wo)(wo)的(de)(de)(de)隊(dui)列(分(fen)片)最多(duo)可(ke)(ke)以(yi)(yi)被分(fen)為8個(ge),這(zhe)個(ge)大家可(ke)(ke)以(yi)(yi)去做測試,挺有意思(si)的(de)(de)(de),比(bi)隨機數來個(ge)直(zhi)接!而這(zhe)一次(ci)redis里的(de)(de)(de)鍵(jian)就(jiu)是某個(ge)topic,而值就(jiu)是我(wo)(wo)(wo)們(men)(men)的(de)(de)(de)topic加上(shang)隊(dui)列索引,例(li)如你的(de)(de)(de)topic是zzl,那么隊(dui)列里的(de)(de)(de)鍵(jian)可(ke)(ke)能就(jiu)是zzl0,zzl1,zzl2...

三 Queue里的消息

我(wo)們(men)(men)(men)的(de)(de)(de)(de)(de)(de)(de)(de)生產者將消(xiao)(xiao)(xiao)息(xi)(xi)發送(song)到(dao)broker里,然(ran)后于(yu)broker將消(xiao)(xiao)(xiao)息(xi)(xi)持久化(hua)到(dao)具體(ti)的(de)(de)(de)(de)(de)(de)(de)(de)存儲介質里,當然(ran)這里我(wo)們(men)(men)(men)用的(de)(de)(de)(de)(de)(de)(de)(de)是(shi)Redis,在存儲在redis里時(shi),我(wo)們(men)(men)(men)的(de)(de)(de)(de)(de)(de)(de)(de)具體(ti)隊列(lie)(lie)的(de)(de)(de)(de)(de)(de)(de)(de)鍵是(shi)有后綴(zhui)的(de)(de)(de)(de)(de)(de)(de)(de),這主要(yao)用于(yu)消(xiao)(xiao)(xiao)息(xi)(xi)的(de)(de)(de)(de)(de)(de)(de)(de)回(hui)收,因(yin)為(wei)我(wo)們(men)(men)(men)打算1天回(hui)收一(yi)次消(xiao)(xiao)(xiao)息(xi)(xi),所以(yi)我(wo)們(men)(men)(men)的(de)(de)(de)(de)(de)(de)(de)(de)消(xiao)(xiao)(xiao)息(xi)(xi)后綴(zhui)是(shi)個日期變量,當然(ran)精確到(dao)天就可(ke)以(yi)了(le)(le),它(ta)可(ke)以(yi)是(shi)這樣鍵名LindMQ_order_Paid4_20161202,每個隊列(lie)(lie)都有自己的(de)(de)(de)(de)(de)(de)(de)(de)后綴(zhui),我(wo)們(men)(men)(men)在清除消(xiao)(xiao)(xiao)息(xi)(xi)時(shi)也就有了(le)(le)方法(fa)了(le)(le)。我(wo)們(men)(men)(men)的(de)(de)(de)(de)(de)(de)(de)(de)隊列(lie)(lie)存儲結構是(shi)比(bi)較特殊的(de)(de)(de)(de)(de)(de)(de)(de)sortedSet ,就是(shi)可(ke)排序的(de)(de)(de)(de)(de)(de)(de)(de)集合(he),它(ta)有權重(zhong)的(de)(de)(de)(de)(de)(de)(de)(de)概念,我(wo)們(men)(men)(men)剛(gang)好可(ke)以(yi)使用這個特性來記錄客戶端的(de)(de)(de)(de)(de)(de)(de)(de)消(xiao)(xiao)(xiao)費進度,因(yin)為(wei)我(wo)們(men)(men)(men)的(de)(de)(de)(de)(de)(de)(de)(de)權重(zhong)值(zhi)在一(yi)個redis鍵/值(zhi)對(dui)里是(shi)唯一(yi)的(de)(de)(de)(de)(de)(de)(de)(de)。

下(xia)面代碼選(xuan)自Push入隊列(lie)的代碼片(pian)斷,分(fen)享給大家

       //存儲當前Topic
            RedisClient.RedisManager.Instance.GetDatabase().SetAdd(LINDMQ_TOPICKEY, body.Topic);

            //要存儲到哪個隊列(lie)
            body.QueueId = Math.Abs(body.Body.GetHashCode() % BrokerManager.CONFIG_QUEUECOUNT);
            var dataKey = body.Topic + body.QueueId;
            RedisClient.RedisManager.Instance.GetDatabase().SetAdd(GetRedisKey(body.Topic), dataKey);

            //記(ji)錄偏移
            var offset = RedisClient.RedisManager.Instance.GetDatabase().SortedSetLength(GetRedisDataKey(dataKey));
            body.QueueOffset = offset + 1;

            //存(cun)儲(chu)消息
            RedisClient.RedisManager.Instance.GetDatabase().SortedSetAdd(
                GetRedisDataKey(dataKey),
                Utils.SerializeMemoryHelper.SerializeToJson(body),
                score: body.QueueOffset);

四 某個客戶端對應某個Queue的消費進度

消費進度是一個很麻煩的問題,生產者的消息是可以被多個消費者消費的,所以不能使用.net那種簡單的Queue機制,出隊列后就消失了,這是不靠譜的,萬一消(xiao)失失敗了,也會造(zao)成消(xiao)息的(de)丟失!下(xia)面我們主要看(kan)一(yi)下(xia)消費(fei)(fei)(fei)進(jin)度的存(cun)儲(chu),它是(shi)一(yi)個(ge)Hash集(ji)合(he),其中redis的鍵名(ming)是(shi)LindMQ_ConsumerOffset,而(er)value是(shi)一(yi)個(ge)hash對象,hash里的key是(shi)當前隊列(lie)名(ming)+消費(fei)(fei)(fei)者(zhe)IP地址的hashcode值,hash里的value是(shi)這個(ge)消費(fei)(fei)(fei)者(zhe)(客戶端)的消費(fei)(fei)(fei)進(jin)度(Queue里的權(quan)重,Queue的存(cun)儲(chu)結(jie)構是(shi)一(yi)個(ge)sortedSet)。

客(ke)戶端消費的測試代碼

            #region Client-LindMQ
            var consumer = new ConsumerSetting
            {
                BrokenName = "test",
                BrokenAddress = new System.Net.IPEndPoint(IPAddress.Parse("192.168.2.71"), 8406),
                Callback = new Dictionary<string, Action<MessageBody>>() { 
                {"zzl",(o)=>{
                    Console.WriteLine(o.ToString());
                    Thread.Sleep(1000);
                }},
                {"zhz",(o)=>{
                    Console.WriteLine(o.ToString());
                    Thread.Sleep(2000);
                }}
                }
            };
            var consumerClient = new ConsumerManager(new List<ConsumerSetting> { consumer });
            consumerClient.Start();
            #endregion

客戶端消費的測試(shi)結果(guo)

好(hao)了(le),到這(zhe)里我們的(de)(de)LindMQ里數(shu)據存儲結構的(de)(de)內容就講完(wan)了(le),主要使用了(le)redis里的(de)(de)set,sortedSet,hash等數(shu)據結構,在設(she)(she)計(ji)過(guo)程(cheng)中,使用了(le)分(fen)片(Sharing)的(de)(de)概念,當然也是借(jie)鑒(jian)了(le)mongodb和redis集(ji)群的(de)(de)設(she)(she)計(ji)理念,同時借(jie)鑒(jian)了(le)方(fang)雪華老兄的(de)(de)EQueue設(she)(she)計(ji)理念,在這(zhe)里和他們說一聲:謝(xie)謝(xie)!

感謝各位對(dui)Lind的支持!

回到目錄

 

posted @ 2016-12-02 14:36  張占嶺  閱讀(1326)  評論(0)    收藏  舉報