• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

C# 基于消息发布订阅模型的示例(上)

c# 搞代码 4年前 (2022-01-09) 16次浏览 已收录 0个评论

在我们的开发过程中,我们经常会遇到这样的场景就是一个对象的其中的一些状态依赖于另外的一个对象的状态,而且这两个对象之间彼此是没有关联的,及两者之间的耦合性非常低,特别是在这种基于容器模型的开发中遇到的会非常多,比如Prism框架或者MEF这种框架中,而我们会发现在这样的系统中我们经常使用一种Publish和Subscribe的模式来进行交互,这种交互有什么好处呢?基于带着这些问题的思考,我们来一步步来剖析!

首先第一步就是定义一个叫做IEventAggregator的接口,里面定义了一些重载的Subscribe和Publish方法,我们具体来看一看这个接口:

/// <summary>
   ///   Enables loosely-coupled publication of and subscription to events.
   /// </summary>
   public interface IEventAggregator
   {
       /// <summary>
       ///   Gets or sets the default publication thread marshaller.
       /// </summary>
       /// <value>
       ///   The default publication thread marshaller.
       /// </value>
       Action<System.Action> PublicationThreadMarshaller { get; set; }
 
       /// <summary>
       ///   Subscribes an instance to all events declared through implementations of <see cref = "IHandle{T}" />
       /// </summary>
       /// <param name = "instance">The instance to subscribe for event publication.</param>
       void Subscribe(object instance);
 
       /// <summary>
       ///   Unsubscribes the instance from all events.
       /// </summary>
       /// <param name = "instance">The instance to unsubscribe.</param>
       void Unsubscribe(object instance);
 
       /// <summary>
       ///   Publishes a message.
       /// </summary>
       /// <param name = "message">The message instance.</param>
       /// <remarks>
       ///   Uses the default thread marshaller during publication.
       /// </remarks>
       void Publish(object message);
 
       /// <summary>
       ///   Publishes a message.
       /// </summary>
       /// <param name = "message">The message instance.</param>
       /// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
       void Publish(object message, Action<System.Action> marshal);
   }

有了这个接口,接下来就是怎样去实现这个接口中的各种方法,我们来看看具体的实现过程。

/// <summary>
    ///   Enables loosely-coupled publication of and subscription to events.
    /// </summary>
    public class EventAggregator : IEventAggregator
    {
        /// <summary>
        ///   The default thread marshaller used for publication;
        /// </summary>
        public static Action<System.Action> DefaultPublicationThreadMarshaller = action => action();
 
        readonly List<Handler> handlers = new List<Handler>();
 
        /// <summary>
        ///   Initializes a new instance of the <see cref = "EventAggregator" /> class.
        /// </summary>
        public EventAggregator()
        {
            PublicationThreadMarshaller = DefaultPublicationThreadMarshaller;
        }
 
        /// <summary>
        ///   Gets or sets the default publication thread marshaller.
        /// </summary>
        /// <value>
        ///   The default publication thread marshaller.
        /// </value>
        public Action<System.Action> PublicationThreadMarshaller { get; set; }
 
        /// <summary>
        ///   Subscribes an instance to all events declared through implementations of <see cref = "IHandle{T}" />
        /// </summary>
        /// <param name = "instance">The instance to subscribe for event publication.</param>
        public virtual void Subscribe(object instance)
        {
            lock(handlers)
            {
                if (handlers.Any(x => x.Matches(instance)))
                {
                    return;
                }                   
                handlers.Add(new Handler(instance));
            }
        }
 
        /// <summary>
        ///   Unsubscribes the instance from all events.
        /// </summary>
        /// <param name = "instance">The instance to unsubscribe.</param>
        public virtual void Unsubscribe(object instance)
        {
            lock(handlers)
            {
                var found = handlers.FirstOrDefault(x => x.Matches(instance));
                if (found != null)
                {
                   handlers.Remove(found);
                }                  
            }
        }
 
        /// <summary>
        ///   Publishes a message.
        /// </summary>
        /// <param name = "message">The message instance.</param>
        /// <remarks>
        ///   Does not marshall the the publication to any special thread by default.
        /// </remarks>
        public virtual void Publish(object message)
        {
            Publish(message, PublicationThreadMarshaller);
        }
 
        /// <summary>
        ///   Publishes a message.
        /// </summary>
        /// <param name = "message">The message instance.</param>
        /// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
        public virtual void Publish(object message, Action<System.Action> marshal)
        {
            Handler[] toNotify;
            lock (handlers)
            {
                toNotify = handlers.ToArray();
            }
            marshal(() =>
            {
                var messageType = message.GetType();
                var dead = toNotify
                    .Where(handler => !handler.Handle(messageType, message))
                    .ToList();
 
                if(dead.Any())
                {
                    lock(handlers)
                    {
                        foreach(var handler in dead)
                        {
                            handlers.Remove(handler);
                        }
                    }
                }
            });
        }
 
        protected class Handler
        {
            readonly WeakReference reference;
            readonly Dictionary<Type, MethodInfo> supportedHandlers = new Dictionary<Type, MethodInfo>();
 
            public Handler(object handler)
            {
                reference = new WeakReference(handler);
 
                var interfaces = handler.GetType().GetInterfaces()
                    .Where(x => typeof(IHandle).IsAssignableFrom(x) && x.IsGenericType);
 
                foreach(var @interface in interfaces)
         <strong>本文来源gaodaima#com搞(代@码$网6</strong>       {
                    var type = @interface.GetGenericArguments()[0];
                    var method = @interface.GetMethod("Handle");
                    supportedHandlers[type] = method;
                }
            }
 
            public bool Matches(object instance)
            {
                return reference.Target == instance;
            }
 
            public bool Handle(Type messageType, object message)
            {
                var target = reference.Target;
                if(target == null)
                    return false;
 
                foreach(var pair in supportedHandlers)
                {
                    if(pair.Key.IsAssignableFrom(messageType))
                    {
                        pair.Value.Invoke(target, new[] { message });
                        return true;
                    }
                }
                return true;
            }
        }
    }

搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:C# 基于消息发布订阅模型的示例(上)
喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址