Lind.DDD.LindMQ~關于持久化(hua)到Redis的(de)消(xiao)息格式
關(guan)于(yu)持久(jiu)化到Redis的(de)消(xiao)息格式,主要是說在Broker上把消(xiao)息持久(jiu)化的(de)過程中,需要存(cun)儲哪些類型的(de)消(xiao)息,因為我們的(de)消(xiao)息是分topic的(de),而(er)每個(ge)topic又有(you)若干個(ge)queue組成,而(er)我們的(de)topic和queue由于(yu)redis存(cun)儲結構的(de)原(yuan)因,我們需要將它們分區對(dui)應存(cun)儲一下,而(er)不能像關(guan)系型數據(ju)庫(ku)那樣靈活,所以要額(e)外設計幾個(ge)數據(ju)結構來存(cun)儲它們。
一 Topic字典
二 Topic對(dui)應(ying)的Queue字典
三(san) Queue里的消(xiao)息
四 某(mou)個客戶端對應某(mou)個Queue的(de)消費進度(du)
以上四個結構(gou)是我們(men)要說的(de),它們(men)會在推消息(xi),拉消息(xi),刪消息(xi)時用到,下面一一介紹(shao)一下,講的(de)不好不對的(de)地方,歡迎大家為大叔留(liu)言(yan)。
一 Topic字典
主要(yao)存儲每(mei)個topic,它是一(yi)個set集合(he)(he),redis的(de)(de)我(wo)集合(he)(he)類型之一(yi),每(mei)個key是唯一(yi)的(de)(de)LindMq_Topic,值value就(jiu)是我(wo)們(men)客戶(hu)端傳來的(de)(de)具體topic的(de)(de)名字(zi),這主要(yao)是在刪除過期(qi)的(de)(de)消(xiao)息時用(yong)的(de)(de),主是作用(yong)是遍歷(li)所有(you)的(de)(de)topic消(xiao)息類型,這樣我(wo)們(men)在刪除消(xiao)息時,就(jiu)可(ke)以把(ba)所有(you)注冊的(de)(de)topic都找到了,最(zui)后把(ba)過期(qi)的(de)(de)刪除,默認消(xiao)息存活周期(qi)是一(yi)天。
刪除過期的消息代碼如下(xia)
var topicList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQ_TOPICKEY); foreach (var topic in topicList) { var queueList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQKEY + topic); foreach (var queue in queueList) { var removeKey = LINDMQKEY + queue + "_" + DateTime.Now.AddDays(-1).ToString("yyyyMMdd"); RedisClient.RedisManager.Instance.GetDatabase().KeyDelete(removeKey); } }
二 Topic對應的Queue字典
我(wo)(wo)(wo)們(men)(men)知道,為了加大redis的(de)(de)(de)并(bing)發量(liang)和(he)吞(tun)吐(tu)量(liang),我(wo)(wo)(wo)們(men)(men)會把(ba)大數據鍵(jian)值對設計成多(duo)個(ge)鍵(jian),這(zhe)就(jiu)像是一個(ge)集群環境的(de)(de)(de)sharing,就(jiu)是將大數據進行分(fen)片,而我(wo)(wo)(wo)們(men)(men)的(de)(de)(de)分(fen)片規則(ze)是采用按對象取模(mo)的(de)(de)(de)方式,模(mo)數可(ke)(ke)以(yi)(yi)自己設置,比(bi)較我(wo)(wo)(wo)設置8,那說明(ming)我(wo)(wo)(wo)的(de)(de)(de)隊(dui)列(分(fen)片)最多(duo)可(ke)(ke)以(yi)(yi)被分(fen)為8個(ge),這(zhe)個(ge)大家可(ke)(ke)以(yi)(yi)去做測試,挺有意思(si)的(de)(de)(de),比(bi)隨機數來個(ge)直(zhi)接!而這(zhe)一次(ci)redis里的(de)(de)(de)鍵(jian)就(jiu)是某個(ge)topic,而值就(jiu)是我(wo)(wo)(wo)們(men)(men)的(de)(de)(de)topic加上(shang)隊(dui)列索引,例(li)如你的(de)(de)(de)topic是zzl,那么隊(dui)列里的(de)(de)(de)鍵(jian)可(ke)(ke)能就(jiu)是zzl0,zzl1,zzl2...
三 Queue里的消息
我(wo)們(men)(men)(men)的(de)(de)(de)(de)(de)(de)(de)(de)生產者將消(xiao)(xiao)(xiao)息(xi)(xi)發送(song)到(dao)broker里,然(ran)后于(yu)broker將消(xiao)(xiao)(xiao)息(xi)(xi)持久化(hua)到(dao)具體(ti)的(de)(de)(de)(de)(de)(de)(de)(de)存儲介質里,當然(ran)這里我(wo)們(men)(men)(men)用的(de)(de)(de)(de)(de)(de)(de)(de)是(shi)Redis,在存儲在redis里時(shi),我(wo)們(men)(men)(men)的(de)(de)(de)(de)(de)(de)(de)(de)具體(ti)隊列(lie)(lie)的(de)(de)(de)(de)(de)(de)(de)(de)鍵是(shi)有后綴(zhui)的(de)(de)(de)(de)(de)(de)(de)(de),這主要(yao)用于(yu)消(xiao)(xiao)(xiao)息(xi)(xi)的(de)(de)(de)(de)(de)(de)(de)(de)回(hui)收,因(yin)為(wei)我(wo)們(men)(men)(men)打算1天回(hui)收一(yi)次消(xiao)(xiao)(xiao)息(xi)(xi),所以(yi)我(wo)們(men)(men)(men)的(de)(de)(de)(de)(de)(de)(de)(de)消(xiao)(xiao)(xiao)息(xi)(xi)后綴(zhui)是(shi)個日期變量,當然(ran)精確到(dao)天就可(ke)以(yi)了(le)(le),它(ta)可(ke)以(yi)是(shi)這樣鍵名LindMQ_order_Paid4_20161202,每個隊列(lie)(lie)都有自己的(de)(de)(de)(de)(de)(de)(de)(de)后綴(zhui),我(wo)們(men)(men)(men)在清除消(xiao)(xiao)(xiao)息(xi)(xi)時(shi)也就有了(le)(le)方法(fa)了(le)(le)。我(wo)們(men)(men)(men)的(de)(de)(de)(de)(de)(de)(de)(de)隊列(lie)(lie)存儲結構是(shi)比(bi)較特殊的(de)(de)(de)(de)(de)(de)(de)(de)sortedSet ,就是(shi)可(ke)排序的(de)(de)(de)(de)(de)(de)(de)(de)集合(he),它(ta)有權重(zhong)的(de)(de)(de)(de)(de)(de)(de)(de)概念,我(wo)們(men)(men)(men)剛(gang)好可(ke)以(yi)使用這個特性來記錄客戶端的(de)(de)(de)(de)(de)(de)(de)(de)消(xiao)(xiao)(xiao)費進度,因(yin)為(wei)我(wo)們(men)(men)(men)的(de)(de)(de)(de)(de)(de)(de)(de)權重(zhong)值(zhi)在一(yi)個redis鍵/值(zhi)對(dui)里是(shi)唯一(yi)的(de)(de)(de)(de)(de)(de)(de)(de)。
下(xia)面代碼選(xuan)自Push入隊列(lie)的代碼片(pian)斷,分(fen)享給大家
//存儲當前Topic RedisClient.RedisManager.Instance.GetDatabase().SetAdd(LINDMQ_TOPICKEY, body.Topic); //要存儲到哪個隊列(lie) body.QueueId = Math.Abs(body.Body.GetHashCode() % BrokerManager.CONFIG_QUEUECOUNT); var dataKey = body.Topic + body.QueueId; RedisClient.RedisManager.Instance.GetDatabase().SetAdd(GetRedisKey(body.Topic), dataKey); //記(ji)錄偏移 var offset = RedisClient.RedisManager.Instance.GetDatabase().SortedSetLength(GetRedisDataKey(dataKey)); body.QueueOffset = offset + 1; //存(cun)儲(chu)消息 RedisClient.RedisManager.Instance.GetDatabase().SortedSetAdd( GetRedisDataKey(dataKey), Utils.SerializeMemoryHelper.SerializeToJson(body), score: body.QueueOffset);
四 某個客戶端對應某個Queue的消費進度
消費進度是一個很麻煩的問題,生產者的消息是可以被多個消費者消費的,所以不能使用.net那種簡單的Queue機制,出隊列后就消失了,這是不靠譜的,萬一消(xiao)失失敗了,也會造(zao)成消(xiao)息的(de)丟失!下(xia)面我們主要看(kan)一(yi)下(xia)消費(fei)(fei)(fei)進(jin)度的存(cun)儲(chu),它是(shi)一(yi)個(ge)Hash集(ji)合(he),其中redis的鍵名(ming)是(shi)LindMQ_ConsumerOffset,而(er)value是(shi)一(yi)個(ge)hash對象,hash里的key是(shi)當前隊列(lie)名(ming)+消費(fei)(fei)(fei)者(zhe)IP地址的hashcode值,hash里的value是(shi)這個(ge)消費(fei)(fei)(fei)者(zhe)(客戶端)的消費(fei)(fei)(fei)進(jin)度(Queue里的權(quan)重,Queue的存(cun)儲(chu)結(jie)構是(shi)一(yi)個(ge)sortedSet)。
客(ke)戶端消費的測試代碼
#region Client-LindMQ var consumer = new ConsumerSetting { BrokenName = "test", BrokenAddress = new System.Net.IPEndPoint(IPAddress.Parse("192.168.2.71"), 8406), Callback = new Dictionary<string, Action<MessageBody>>() { {"zzl",(o)=>{ Console.WriteLine(o.ToString()); Thread.Sleep(1000); }}, {"zhz",(o)=>{ Console.WriteLine(o.ToString()); Thread.Sleep(2000); }} } }; var consumerClient = new ConsumerManager(new List<ConsumerSetting> { consumer }); consumerClient.Start(); #endregion
客戶端消費的測試(shi)結果(guo)
好(hao)了(le),到這(zhe)里我們的(de)(de)LindMQ里數(shu)據存儲結構的(de)(de)內容就講完(wan)了(le),主要使用了(le)redis里的(de)(de)set,sortedSet,hash等數(shu)據結構,在設(she)(she)計(ji)過(guo)程(cheng)中,使用了(le)分(fen)片(Sharing)的(de)(de)概念,當然也是借(jie)鑒(jian)了(le)mongodb和redis集(ji)群的(de)(de)設(she)(she)計(ji)理念,同時借(jie)鑒(jian)了(le)方(fang)雪華老兄的(de)(de)EQueue設(she)(she)計(ji)理念,在這(zhe)里和他們說一聲:謝(xie)謝(xie)!
感謝各位對(dui)Lind的支持!