中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

RabbitMQ~消費者實時與消息服(fu)務器保持通話

這個文章主要介紹簡單的消費者的實現,rabbitMQ實現的消費者可以對消息服務器進行實時監聽,當有消息(生產者把消息推到服務器上之后),消費者可以自動去消費它,這通常是開啟一個進程去維護這個對話,它與消息服務器保持一個TCP的長連接,整個這個過程于rabbitMQ為我們提供,程序開發人員只需要實現(xian)自己的回調(diao)方法即可.

簡單的rabbitMQ消費者

    /// <summary>
    /// 消息消費者
    /// </summary>
    public class RabbitMqSubscriber : Lind.DDD.Commons.DisposableBase
    {
        private readonly string exchangeName;
        private readonly string queueName;
        private readonly IConnection connection;
        private readonly IModel channel;
        private bool disposed;

        /// <summary>
        /// 從消(xiao)息服務器(qi)拉(la)到消(xiao)息后觸發
        /// </summary>
        public event EventHandler<MessageReceivedEventArgs> MessageReceived;

        /// <summary>
        /// Initializes a new instance of <c>RabbitMqMessageSubscriber</c> class.
        /// </summary>
        /// <param name="uri"></param>
        /// <param name="exchangeName"></param>
        /// <param name="queueName"></param>
        public RabbitMqSubscriber(string uri, string queueName, string userName = "", string password = "")
        {
            this.exchangeName = exchangeName;
            this.queueName = queueName;
            var factory = new ConnectionFactory() { Uri = uri };
            if (!string.IsNullOrWhiteSpace(userName))
                factory.UserName = userName;
            if (!string.IsNullOrWhiteSpace(password))
                factory.Password = password;
            this.connection = factory.CreateConnection();
            this.channel = connection.CreateModel();
        }

        public void Subscribe()
        {
            channel.QueueDeclare(
                queue: this.queueName, 
                durable: false,//持久化
                exclusive: false, //獨占,只(zhi)能被(bei)一個consumer使用
                autoDelete: false,//自己刪除(chu),在最后(hou)一個consumer完(wan)成(cheng)后(hou)刪除(chu)它
                arguments: null);
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                var body = e.Body;
                var json = Encoding.UTF8.GetString(body);
                var message = JsonConvert.DeserializeObject(json, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
                this.OnMessageReceived(new MessageReceivedEventArgs(message));
                channel.BasicAck(e.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: queueName,
                                 noAck: false,
                                 consumer: consumer);
        }

        private void OnMessageReceived(MessageReceivedEventArgs e)
        {
            this.MessageReceived?.Invoke(this, e);
        }

        protected override void Finalize(bool disposing)
        {
            if (disposing)
            {
                if (!disposed)
                {
                    this.channel.Dispose();
                    this.connection.Dispose();
                    disposed = true;
                }
            }
        }
    }

簡單調用

   class Program
    {
        static void Main(string[] args)
        {
            var subscriber = new Lind.DDD.RabbitMq.RabbitMqSubscriber("amqp://localhost:5672", "zzl");
            subscriber.MessageReceived += Subscriber_MessageReceived;
            subscriber.Subscribe();
            Console.ReadKey();
        }

        private static void Subscriber_MessageReceived(object sender, RabbitMq.MessageReceivedEventArgs e)
        {
            Console.WriteLine("消費者2->消費了一(yi)個消息{0}", e.Message);
            Lind.DDD.Logger.LoggerFactory.Instance.Logger_Debug("消費者2->消費了一個消息{0}" + e.Message);
            Thread.Sleep(2000);
        }

    }

實時拉消息

RabbitMQ消息模型

通過上面圖我們(men)可以更(geng)容易和清(qing)晰的去理解rabbitmq的工作流程.

posted @ 2017-02-28 11:06  張占嶺  閱讀(3062)  評論(1)    收藏  舉報