• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

C#实现异步消息队列

c# 搞代码 4年前 (2022-01-09) 51次浏览 已收录 0个评论

消息队列

消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自使用者。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的资料,包含发生的时间,输入装置的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。

简单的说队列就是贮存了我们需要处理的Command但是并不是及时的拿到其处理结果;

实现

实际上,消息队列常常保存在链表结构中。拥有权限的进程可以向消息队列中写入或读取消息。

目前,有很多消息队列有很多开源的实现,包括JBoss Messaging、JORAM、Apache ActiveMQ、Sun Open Message Queue、Apache Qpid和HTTPSQS。

优点,缺点

消息队列本身是异步的,它允许接收者在消息发送很长时间后再取回消息,这和大多数通信协议是不同的。例如WWW中使用的HTTP协议是同步的,因为客户端在发出请求后必须等待服务器回应。然而,很多情况下我们需要异步的通信协议。比如,一个进程通知另一个进程发生了一个事件,但不需要等待回应。但消息队列的异步特点,也造成了一个缺点,就是接收者必须轮询消息队列,才能收到最近的消息。

和信号相比,消息队列能够传递更多的信息。与管道相比,消息队列提供了有格式的数据,这可以减少开发人员的工作量。但消息队列仍然有大小限制。

读取队列消息

主要有两种(1)服务端的推;(2)客户端的拉;

拉:主要是客户端定时轮询拿走消息处本文来源gaodai$ma#com搞$代*码*网(理;

推:通过事件订阅方式主动通知订阅者进行处理;

消息的贮存

简单的是通过内存链表实现贮存;也可以借助DB,比如Redis;还可以持久到本地文件中;

如何保证异步处理的一致性

尽管队列主要目的是实现消息贮存,同时将调用与实现异步化。但是如果想达到处理消息一致性,好的方式是区别业务处理顺序,比如操作主从DB,主负责写,从负责读,我们没有机会在写之后立马从读数据库拿到你想要的结果;同时我们需要借助中间状态,当多个中间状态同时符合调用结果才到到业务时间被处理,否则将“异常消息”持久化,待下次操作;

上代码

建立消息对立核心队列

{    public delegate void MessageQueueEventNotifyHandler(Message.BaseMessage message);     public class MessageQueue:Queue<BaseMessage>    {        public static MessageQueue GlobalQueue = new MessageQueue();         private Timer timer = new Timer();        public MessageQueue() {            this.timer.Interval = 5000;            this.timer.Elapsed += Notify;            this.timer.Enabled = true;        }        private void Notify(object sender, ElapsedEventArgs e) {            lock (this) {                if (this.Count > 0) {                    //this.messageNotifyEvent.GetInvocationList()[0].DynamicInvoke(this.Dequeue());                    var message = this.Dequeue();                    this.messageNotifyEvent(message);                }            }        }         private MessageQueueEventNotifyHandler messageNotifyEvent;        public event MessageQueueEventNotifyHandler MessageNotifyEvent {            add {                this.messageNotifyEvent += value;            }             remove {                if (this.messageNotifyEvent != null) {                    this.messageNotifyEvent -= value;                }            }        }    }}

事件处理

public const string OrderCodePrefix = "P";        public void Submit(Message.BaseMessage message)        {            Order order = message.Body as Order;             if (order.OrderCode.StartsWith(OrderCodePrefix))            {                System.Console.WriteLine("这个是个正确的以({0})开头的订单:{1}", OrderCodePrefix,order.OrderCode);            }            else {                System.Console.WriteLine("这个是个错误的订单,没有以({0})开头:{1}",OrderCodePrefix,order.OrderCode);            }        }

可依据具体业务进行个性化处理;

通过Proxy向队列追加消息

public class OrderServiceProxy:IOrderService    {        public void Submit(Message.BaseMessage message)        {            MessageQueue.MessageQueue.GlobalQueue.Enqueue(message);        }    }

客户端调用

OrderService orderService = new OrderService();            MessageQueue.MessageQueue.GlobalQueue.MessageNotifyEvent += orderService.Submit;             var orders = new List<Order>() {                new Order(){OrderCode="P001"},                new Order(){OrderCode="P002"},                new Order(){OrderCode="B003"}            };             OrderServiceProxy proxy = new OrderServiceProxy();            orders.ForEach(order => proxy.Submit(new Message.BaseMessage() { Body=order}));             Console.ReadLine();

这样就满足了事件的绑定与触发个性化处理,同时达到了消息异步化的目的,希望更细致的拓展用到后期的项目中。

以上就是C#实现异步消息队列的内容,更多相关内容请关注搞代码(www.gaodaima.com)!


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:C#实现异步消息队列

喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址