打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
消息队列和发布订阅

编程语言集成了发布订阅

很多编程语言框架里都提供了发布订阅的组件,或者叫事件处理机制,而spring框架对这个功能也有支持,主要使用EventListener实现订阅,使用ApplicationEventPublisher使用发布。这种系统集成的我们先叫它“集成组件”

与语言无关的消息队列

事实上,发布订阅真的与开发语言没有什么关系,所以出现了另一种产品,消息中间件,或者叫消息队列,它是以发布订阅模式为理论基础的,同时很多消息队列产品又有自己的特色,这种独立的消息队列我们为rabbitmq为例子。

共同点

  1. 代码解耦,发布者与订阅者可以互不关心
  2. 异步处理,集成组件有的是同步的,需要加@Async注解
  3. 消息安全

不同点

  1. rabbitmq实现的是多服务之间的发布与订阅
  2. 集成组件实现的是一个服务内部的发布与订阅
  3. rabbitmq是异步的,集成组件可以是异步,也可以是同步
  4. rabbitmq可以有广播,点对点等模式,而集成组件只有广播模式

基于以上的介绍,主要帮助大家理解和认识,在什么时候用什么类型的工具。

实例

  • 集成组件的发布订阅

订阅

@Getter@Builder(toBuilder = true)@NoArgsConstructor@AllArgsConstructorpublic class CreateBookEvent {  private String address;  private String title;}@Componentpublic class EmailEventListener {  @EventListener  @Async  public void handleEvent(CreateBookEvent event) throws Exception {    System.out.println("email消息:建立图书:" + event.getTitle());  }}

发布

@Autowired  private ApplicationEventPublisher applicationEventPublisher;  public void publish(){      applicationEventPublisher.publishEvent(CreateBookEvent.builder().address("system").title("新建图书").build());}
  • rabbitmq的发布订阅

订阅

@Slf4j@Componentpublic class DistributorSubscriber {  public static final String WORK_QUEUE = "fx.activity.total";  public static final String EXCHANGE = "fx.exchange";  @Autowired  DistributorActivityTotalRepository distributorActivityTotalRepository;  @Autowired  ObjectMapper objectMapper;  @Bean  public TopicExchange phoneTotalExchange() {    return new TopicExchange(EXCHANGE);  }  @Bean  public Queue phoneTotalQueue() {    return new Queue(WORK_QUEUE);  }    @Bean  public Binding bindSignQueue() {    return BindingBuilder.bind(phoneTotalQueue()).to(phoneTotalExchange()).with(WORK_QUEUE);  }   @RabbitListener(queues = WORK_QUEUE)  public void phoneTotalQueueListener(String data) {    try {      logger.debug("fx.activity.total:{}", data);      DistributorActivityTotal entity =          objectMapper.readValue(data, DistributorActivityTotal.class);      distributorActivityTotalRepository.incUpdate(entity);    } catch (Exception ex) {      logger.error("fx.activity.total.error", ex);    }  }

发布

  @Autowired  private RabbitTemplate rabbitTemplate;     public void modifySalesperson(SalesPersonDTO salesPersonDTO) {    try {      rabbitTemplate.convertAndSend(          "EXCHANGE",          "MQName",          objectMapper.writeValueAsString(salesPersonDTO)      );      logger.debug("Enter {},message:{}", "modifySalesperson", salesPersonDTO.toString());    } catch (Exception ex) {      logger.error("MQ.modifySalesperson.error", ex);    }  }
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
消息总线VS消息队列
大型网站架构系列:消息队列
1-消息中间件概述
还不知道异步队列?点进来看!
后端通用教程(五)
消息队列MQ、JMS、Kafka怎么选?
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服