Redis學習筆記(ji)~分布式的Pub/Sub模式
redis的客戶端有很(hen)多,這次用它的pub/sub發(fa)布與訂閱我選擇了StackExchange.Redis,發(fa)布與訂閱大家應該很(hen)清(qing)楚了,首先一個(ge)訂閱者,訂閱一個(ge)服務(wu)(wu),服務(wu)(wu)執(zhi)行一些處理程序(可(ke)能是寫個(ge)日(ri)志,插入個(ge)數據,發(fa)個(ge)email)然后當另一個(ge)項目的某(mou)個(ge)業務(wu)(wu)發(fa)布這個(ge)服務(wu)(wu)后,被訂閱的程序將會被執(zhi)行,這個(ge)聽起來很(hen)有意(yi)思,redis有好(hao)的實現了這個(ge)pub/sub功(gong)能。
看一下結構圖
Sub訂閱(消息消費者)
對(dui)于訂(ding)閱方,這里類(lei)似于一(yi)個服務,它會長期運行著,被啟(qi)動(dong)后動(dong)態訂(ding)閱一(yi)些(xie)服務進來(lai),以便以后再被其(qi)它服務調用(yong)
//sub a function in A-project PubSubManager.Instance.Subscribe("UserLog", (msg) => { //訂閱(yue)者處理自己的業務(wu)邏輯,這(zhe)相(xiang)關(guan)于(yu)隊列服務(wu)要干的事 Console.WriteLine(msg); });
Pub發布(消息生產者)
而對于其它項目,如A網站,它可能在被在POST動作后發布這個UserLog的(de)服務(wu),這(zhe)時上(shang)面(mian)的(de)訂閱方將會消費它(ta)(消費者模式(shi)),或者服務(wu)代碼(ma)被執(zhi)行(xing)!
[HttpPost] public ActionResult Index(string user) { PubSubManager.Instance.Publish("UserLog", user + "這(zhe)個用戶提交表(biao)單了"); return View(); }
對于一直運行的(de)服(fu)務,將(jiang)會收(shou)到來自不同項目的(de)消息,而它(ta)只負(fu)責消費(fei)它(ta)!
Lind.DDD.PublishSubscribe
封裝(zhuang)了一(yi)些標準的pub/sub方(fang)法,它(ta)可以有多種實(shi)現方(fang)法,本例使用redis這(zhe)個中(zhong)間件
/// <summary> /// 發布訂閱的接口規則 /// </summary> public interface IPubSub { /// <summary> /// 發布(bu),有順序,對象源是字符串 /// </summary> /// <param name="channel"></param> /// <param name="value"></param> void Publish(string channel, string value); /// <summary> /// 訂閱,對象源(yuan)是(shi)字符(fu)串 /// </summary> /// <param name="channel"></param> /// <param name="action"></param> void Subscribe(string channel, Action<string> action); /// <summary> /// 異步發布,無順序,對象源是(shi)字(zi)符串 /// </summary> /// <param name="channel"></param> /// <param name="value"></param> void PublishAsync(string channel, string value); /// <summary> /// 異步訂閱,無順(shun)序,對象源是字符串 /// </summary> /// <param name="channel"></param> /// <param name="action"></param> void SubscribeAsync(string channel, Action<string> action); /// <summary> /// 發布(bu),有順序,對(dui)象源是Byte[] /// </summary> /// <param name="channel"></param> /// <param name="value"></param> void PublishByte(string channel, byte[] value); /// <summary> /// 訂(ding)閱,對象源是Byte[] /// </summary> /// <param name="channel"></param> /// <param name="action"></param> void SubscribeByte(string channel, Action<byte[]> action); /// <summary> /// 異步(bu)發布,有順序,對象源是Byte[] /// </summary> /// <param name="channel"></param> /// <param name="value"></param> void PublishByteAsync(string channel, byte[] value); /// <summary> /// 異(yi)步訂閱,對象源是Byte[] /// </summary> /// <param name="channel"></param> /// <param name="action"></param> void SubscribeByteAsync(string channel, Action<byte[]> action); /// <summary> /// 發(fa)布(bu),有順(shun)序,對(dui)象源是泛型對(dui)象 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="channel"></param> /// <param name="value"></param> void Publish<T>(string channel, T value); /// <summary> /// 訂(ding)閱,對象源(yuan)是泛型對象 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="channel"></param> /// <param name="action"></param> void Subscribe<T>(string channel, Action<T> action); /// <summary> /// 異步(bu)發布(bu),有順序,對象(xiang)源是泛型對象(xiang) /// </summary> /// <typeparam name="T"></typeparam> /// <param name="channel"></param> /// <param name="value"></param> void PublishAsync<T>(string channel, T value); /// <summary> /// 異步(bu)訂閱,對(dui)象源(yuan)是泛型對(dui)象 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="channel"></param> /// <param name="action"></param> void SubscribeAsync<T>(string channel, Action<T> action); /// <summary> /// 取消指定訂閱(yue) /// </summary> /// <param name="channel"></param> void UnSubscribe(string channel); /// <summary> /// 取消所有訂閱 /// </summary> /// <param name="channel"></param> void UnSubscribeAll(); }