消息隊列(lie)和發布訂閱
編程語言集成了發布訂閱
很多編程語言框架里都提供了發布訂閱的組件,或者叫事件處理機制,而spring框架對這個功能也有支持,主要使用EventListener實現訂閱,使用ApplicationEventPublisher使用(yong)發(fa)布。這種系統集(ji)成(cheng)的(de)我們(men)先叫它“集(ji)成(cheng)組件”
與語言無關的消息隊列
事實上,發布訂閱真的與開發語言沒有什么關系,所以出現了另一種產品,消息中間件,或者叫消息隊列,它是以發布訂閱模式為理論基礎的,同時很多消息隊列產品又有自己的特色,這種獨立的消息隊列我們為rabbitmq為例子。
共同點
- 代碼解耦,發布者與訂閱者可以互不關心
- 異步處理,集成組件有的是同步的,需要加
@Async注解 - 消息安全
不同點
- rabbitmq實現的是多服務之間的發布與訂閱
- 集成組件實現的是一個服務內部的發布與訂閱
- rabbitmq是異步的,集成組件可以是異步,也可以是同步
- rabbitmq可以有廣播,點對點等模式,而集成組件只有廣播模式
基于以上的介紹,主要(yao)幫助大家理解和認識,在什么(me)時候用什么(me)類(lei)型的工具(ju)。
實例
- 集成組件的發布訂閱
訂閱
@Getter
@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
public class CreateBookEvent {
private String address;
private String title;
}
@Component
public class EmailEventListener {
@EventListener
@Async
public void handleEvent(CreateBookEvent event) throws Exception {
System.out.println("email消息:建立圖書:" + event.getTitle());
}
}
發布
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public void publish(){
applicationEventPublisher.publishEvent(CreateBookEvent.builder().address("system").title("新建圖書").build());
}
- rabbitmq的發布訂閱
訂閱
@Slf4j
@Component
public class DistributorSubscriber {
public static final String WORK_QUEUE = "fx.activity.total";
public static final String EXCHANGE = "fx.exchange";
@Autowired
DistributorActivityTotalRepository distributorActivityTotalRepository;
@Autowired
ObjectMapper objectMapper;
@Bean
public TopicExchange phoneTotalExchange() {
return new TopicExchange(EXCHANGE);
}
@Bean
public Queue phoneTotalQueue() {
return new Queue(WORK_QUEUE);
}
@Bean
public Binding bindSignQueue() {
return BindingBuilder.bind(phoneTotalQueue()).to(phoneTotalExchange()).with(WORK_QUEUE);
}
@RabbitListener(queues = WORK_QUEUE)
public void phoneTotalQueueListener(String data) {
try {
logger.debug("fx.activity.total:{}", data);
DistributorActivityTotal entity =
objectMapper.readValue(data, DistributorActivityTotal.class);
distributorActivityTotalRepository.incUpdate(entity);
} catch (Exception ex) {
logger.error("fx.activity.total.error", ex);
}
}
發布
@Autowired
private RabbitTemplate rabbitTemplate;
public void modifySalesperson(SalesPersonDTO salesPersonDTO) {
try {
rabbitTemplate.convertAndSend(
"EXCHANGE",
"MQName",
objectMapper.writeValueAsString(salesPersonDTO)
);
logger.debug("Enter {},message:{}", "modifySalesperson", salesPersonDTO.toString());
} catch (Exception ex) {
logger.error("MQ.modifySalesperson.error", ex);
}
}