记一次消息总线的打造

虽说消息队列的用法很简单:PUB/SUB, PRODUCER/CONSUMER,不过真做起来还真麻烦。

先说下原始需求:

  • Web前端发送命令消息,后端Consumer处理,然后前端得到结果
  • 需要支持Windows服务

很快,下图就出来了:

先来分析分析:

    • 前端怎么知道后端已经处理完成?
    • 前端如何在处理完后的第一时间被触发去执行某些callback呢?
    • Web前端很可能会通过ajax来定时查看某消息的处理状态

  第一反应是增加应答队列,此时:

    • 前端能够很及时的被通知到(后端处理完触发),来执行callback
    • 但是
      • ajax类型的定时查看怎么做?在ResponseQueue中查?显然不行(队列中数据越多,性能越差)
      • 如果前端关闭一段时间,消息会积压下来,性能越变越差

  因此决定增加一个DB来解决这些消息的保存以及后续的ajax类型的多次查询,如下图:

 再来分析分析,此时

    • 前端ajax类型的不定时、多次的查询某消息处理状态是解决了
    • 如果消息量很大,也可以将RabbitMQ以及DB分别做集群以及切片
    • 但,似乎还是得增加应答队列进去,因为现在CONSUMER处理完成后,针对前端的通知很麻烦,理由如下
      • 基于DB行记录的通知效率低
    • 但,即便增加了这个应答队列,也会出现如下问题
      • 如果前端崩掉后有段时间未on service,此时应答队列就会积压消息...性能会变差

  此时该咋办?

  答:用PUB/SUB机制来做这个应答队列,此时如果前端崩掉,就不会SUB了,只要online时才会有消息被通知到

  因此,继续出一张图

    

  图中的Notifier, NotifierPublisher是前端和后端的BROKER,考虑到有些线程需要主动监听,因此画在了上面。

 

  再来谈谈后端,由于没有特别高要求,对后端的要求也就是这么几点:

    • 在业务逻辑角度,消息只能被无错处理一次
    • 如果出现了Exception, 则需要后续人工介入,消息不能丢失,但也不应该造成无限循环的报Exception
    • 对于报Exception的消息,人工处理要方便

  分别分析    

    • 在业务逻辑角度,消息只能被无错处理一次
      • 在业务处理没有报错的情况下,将RabbitMQ消息的Ack动作与DB的消息状态回写做成一个TRANSACTION,如下
      • 这样就能保证此消息“从RABBITMQ Server中remove、写入查询DB”同时确保
      • 但是,如果存在下述情况时,会出现即便业务逻辑没有报错情况下多次执行
        • 那就是:如果业务逻辑执行完毕,没有报错,此时,即将触发上述代码,却还没有触发的时刻,服务crash了...
        • 解决方法
          • 在业务逻辑代码中加入幂等性
          • 在业务逻辑代码中加入检查性质代码
          • 告诉我下其他简便的方法吧(记录本地文件日志能解决,就是比较复杂)
    • 如果出现了Exception, 则需要后续人工介入,消息不能丢失,但也不应该造成无限循环的报Exception
      • 增加相应的Exception队列,实际中是增加了2个:如:
        • 比如目前有队列messages.CommandA,则异常队列有:
          • messages.CommandA.exceptions1
          • messages.CommandA.exceptions2
          • 为啥是2个?看后续
    • 对于报Exception的消息,人工处理要方便

      • 在配置文件中增加一个参数,表示运行级别:普通、异常1、异常2
      • 如果是普通级别,则CONSUMER会从messages.CommandA中获取消息进行处理,报错后会将消息move到exception1中
      • 如果是异常1级别,则CONSUMER会从messages.CommandA.exception1中获取消息进行处理,报错后会将消息move到exception2中
      • 如果是异常2级别,则CONSUMER会从messages.CommandA.exception2中获取消息进行处理,报错后将消息move到exception1中
      • 这里还有个问题,就是上面的这些RABBITMQ级别的消息从exception1移动到exception2中,都是分成PUBLISH和BASICACK两个CHANNEL上的动作完成的,不能套RABBITMQ的TX,也就是存在一致性问题
        • 解决起来同HandleSuccessfulMessage类似,都是通过本地db事务来做,都是借助了BASE思想来实现的

 

剩下的一个问题,RABBITMQ有优先级队列特性吗?答案是有:

 

注意:RABBITMQ默认是不支持客户端消息的优先级的,默认只支持Consumer的优先级,那如何实现客户端消息的优先级呢?

    1. 发送消息时,设置properties属性,记得设置Priority属性值
    2. 安装插件rabbitmq_priority_queue
    3. DeclareQueue时,加入x-max-priority特性值

 

DONE.

 

posted @ 2014-10-21 22:24 McKay 阅读(...) 评论(...) 编辑 收藏