成都网站建设设计

将想法与焦点和您一起共享

MessageQueueSelector如何实现顺序消费

Message Queue Selector如何实现顺序消费,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:主机域名、网页空间、营销软件、网站建设、三水网站维护、网站推广。

顺序消息的定义:

顺序消息是指消息的消费顺序和生产顺序相同,在某些场景下,必须保证顺序消息。比如订单的生成、付款、发货.顺序消息又分为全局顺序消息和部分顺序消息,全局顺序消息指某一个topic下的所有消息都要保证顺序;部分顺序消息只要保证某一组消息被顺序消费。对于订单消息来说,只要保证同一个订单ID的生成、付款、发货消息按照顺序消费即可。

部分顺序消费实现原理:

1. 发送端:保证相同订单ID的各种消息发往同一个MessageQueue(同一个Topic下的某一个queue)

2.消费端:保证同一个MessageQueue里面的消息不被并发处理 (同一个Topic的不同MessageQueue是可以同时消费的)

        DefaultMQProducer producer = new DefaultMQProducer("local-test-producer");
		producer.setNamesrvAddr("10.76.0.38:9876");
		producer.start();
		for (int i = 0; i < 1000; i++) {
			Order order  = new Order();
			order.orderId = i;
			order.status = "生成";

			Message msg1 = new Message("local-test-producer",
					"TagA",
					JsonUtils.toJson(order).getBytes()
			);
			SendResult sendResult1 = producer.send(msg1, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List mqs, Message msg, Object arg) {
					return null;
				}
			}, order.orderId);
			log.info("sendResult1={}",sendResult1);
			Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);



			order.status="付款";

			Message msg2 = new Message("local-test-producer",
					"TagA",
					JsonUtils.toJson(order).getBytes()
			);
			SendResult sendResult2 = producer.send(msg2, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List mqs, Message msg, Object arg) {
					return null;
				}
			}, order.orderId);
			log.info("sendResult2={}",sendResult2);
			Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);



			order.status="发货";
			Message msg3 = new Message("local-test-producer",
					"TagA",
					JsonUtils.toJson(order).getBytes()
			);
			producer.send(msg2, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List mqs, Message msg, Object arg) {
					return null;
				}
			}, order.orderId);
			Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);


			SendResult sendResult3 = producer.send(msg3, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List mqs, Message msg, Object arg) {
					Integer id = (Integer) arg;
					int index = id % mqs.size();
					return mqs.get(index);
				}
				//MessageQueueSelector保证同一个orderId的消息都存储在同一个MessageQueue。
			}, order.orderId);
			log.info("sendResult3={}",sendResult1);
		}

消费端主要逻辑如下,主要MessageListenerOrderly回调实现同一个MessageQueue里面的消息不会被并发消费:

       //同一个MessageQueue里面的消息要顺序消费,不能并发消费。
		//但是同一个Topic的不同MessageQueue是可以同时消费的
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("local-test-consumer2");
		consumer.setNamesrvAddr("10.76.0.38:9876");
		consumer.subscribe("test", "");
		consumer.setPullBatchSize(1);
		consumer.setConsumeThreadMin(1);
		consumer.setConsumeThreadMax(1);
	//	consumer.registerMessageListener(new MessageListenerConcurrently() {
		consumer.registerMessageListener(new MessageListenerOrderly() {
			@Override
			public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
				List messages = new ArrayList<>();

				for (MessageExt msg : msgs) {
					messages.add(new String(msg.getBody()) +"\tbroker:"+msg.getStoreHost());
				}
				System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), messages);
				return ConsumeOrderlyStatus.SUCCESS;
			}
		});
		consumer.start();
		Thread.currentThread().join();

源码分析:

我们知道在RocketMQ中是可以给一个消费者实例设置多个线程并发消费的. consumer.setConsumeThreadMin 和 setConsumeThreadMax,

那MessageListenerOrderly是如何保证某一个时刻,只有一个消费者的某一个线程在消费某一个MessageQueue的呢?

就在Client模块的 ConsumeMessageOrderlyService里面,消费者端并不是简单的禁止并发处理,而是给每一个Consumer Queue加锁,

private final MessageQueueLock messageQueueLock = new MessageQueueLock();

在消费每个消息之前,需要先获取这个消息对应的Consumer Queue所对应的锁,保证同一个Consumer Queue的消息不会被并发消费,但是不同的Consumer Queue的消息是可以并发处理的。

看完上述内容,你们掌握Message Queue Selector如何实现顺序消费的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注创新互联行业资讯频道,感谢各位的阅读!


分享题目:MessageQueueSelector如何实现顺序消费
标题URL:http://chengdu.cdxwcx.cn/article/pjcggi.html