> 经典 >

[消息中间件]RocketMQ消息发送-请求与响应

时间:2022-12-06 21:45:49       来源:阿里云

大家好,我是小郭,前面的文章介绍了,RocketMQ的搭建,以及RocketMQ的NameServer,接下来我们配合着官方提供的demo,进行实际的消息发送学习,主要学习发送方式、发送参数的含义,以及发送中的一些问题。


【资料图】

消息发送的方式

可靠同步发送,需要等待服务器响应结果。

可靠异步发送,不等待服务器响应结果直接返回,当收到Broker的响应结果后调用SendCallback回调函数。

单向发送,不等待响应结果,不调用回调函数。

实战使用

在实际使用中,我们通常会选择可靠同步发送,因为我们快速的得到成功和失败的反馈。

业务场景:用户A下单创建订单,付款、完成订单

在这过程中,可能会产生三条消息,那在发送消息可能会因为负载均衡的策略,被分配到不同的消息队列中去

RocketMQ常用的两种平均分配算法

AllocateMessageQueueAveragely

平均分配,按照总数除以消费者个数进行,对每个消费者进行分配

AllocateMessageQueueAveragelyByCircle轮流平均分配,按照消费者个数,进行轮询分配

所以为了保证局部顺序消息,只要保证每一组消息被顺序消费即可,我们需要考虑修改MessageQueueSelector方法,保证消息投递到同一个队列中。

//实例化消息生产者Producerorg.apache.rocketmq.client.producer.DefaultMQProducerproducer=neworg.apache.rocketmq.client.producer.DefaultMQProducer("please_rename_unique_group_name");//设置NameServer的地址producer.setNamesrvAddr("xxx:9876");//设置重试次数producer.setRetryTimesWhenSendFailed(3);producer.setSendMsgTimeout(5000);privatestaticSendResultsendMsg(DefaultMQProducerproducer,Orderorder)throwsMQClientException,RemotingException,MQBrokerException,InterruptedException{Messagemsg=null;try{msg=newMessage("orderTest","TagA",order.getOrderNo,JSON.toJSONString(order).getBytes(RemotingHelper.DEFAULT_CHARSET));}catch(UnsupportedEncodingExceptione){e.printStackTrace;}//发送消息到一个BrokerSendResultsendResult=producer.send(msg,(list,message,arg)->{if(message==null||list.isEmpty){returnnull;}//取模intindex=Math.abs(arg.hashCode)%list.size;//模拟超时//if(order.getId==1L){try{Thread.sleep(6000);}catch(Exceptione){}}//returnlist.get(Math.max(index,0));},order.getId);Stringresult=JSONObject.toJSONString(sendResult);System.out.println(result);returnsendResult;}

Listmqs:消息要发送的Topic下所有的分区

Messagemsg:消息对象

额外的参数:用户可以传递自己的参数

发送结果

数据按照我们修改的MessageQueueSelector,进行了队列的选择,正式我们想要的结果,这样我们就可以按照队列的顺序消费。

发送结果参数

messageQueue对象

Stringtopic=msg.getTopic;if(StringUtils.isNotEmpty(this.clientConfig.getNamespace)){topic=NamespaceUtil.withoutNamespace(topic,this.clientConfig.getNamespace);}MessageQueuemessageQueue=newMessageQueue(topic,brokerName,responseHeader.getQueueId);

主要有brokerName,topic和QueueId组成,RocketMQ支持顺序投递,利用MessageQueueSelector,将相同的Key投递到同一个队列中,保证局部顺序。

全局msgId生成规则

规则:IP+进程PID+类加载器HashCode+自增

初始化值

static{byte[]ip;try{ip=UtilAll.getIP;}catch(Exceptione){ip=createFakeIP;}LEN=ip.length+2+4+4+2;//分配大小ip长度+2+4ByteBuffertempBuffer=ByteBuffer.allocate(ip.length+2+4);tempBuffer.put(ip);//进程PIDtempBuffer.putShort((short)UtilAll.getPid);//类加载器hashCodetempBuffer.putInt(MessageClientIDSetter.class.getClassLoader.hashCode);FIX_STRING=UtilAll.bytes2string(tempBuffer.array).toCharArray;setStartTime(System.currentTimeMillis);COUNTER=newAtomicInteger(0);}

构建UniqID

publicstaticStringcreateUniqID{char[]sb=newchar[LEN*2];System.arraycopy(FIX_STRING,0,sb,0,FIX_STRING.length);longcurrent=System.currentTimeMillis;if(current>=nextStartTime){setStartTime(current);}intdiff=(int)(current-startTime);if(diff<0&&diff>-1000_000){//maycausebyNTPdiff=0;}intpos=FIX_STRING.length;UtilAll.writeInt(sb,pos,diff);pos+=8;UtilAll.writeShort(sb,pos,COUNTER.getAndIncrement);returnnewString(sb);}

获取MsgId

StringuniqMsgId=MessageClientIDSetter.getUniqID(msg);if(msginstanceofMessageBatch){StringBuildersb=newStringBuilder;for(Messagemessage:(MessageBatch)msg){sb.append(sb.length==0?"":",").append(MessageClientIDSetter.getUniqID(message));}uniqMsgId=sb.toString;}

queueOffsetKey中为剩余的消费偏移量

responseHeader.getQueueOffset;

消息发送参数的作用

Keys

消息发送的时候设置Keys为订单编号,我们可以在RocketMQ-Console里面查询消息

Tags标签

标签的作用:对Topic中的消息进行过滤,选择处理

下面我们做一个Test消费的测试

publicstaticvoidmain(String[]args)throwsException{//实例化消费者DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("please_rename_unique_group_name");//设置NameServer的地址consumer.setNamesrvAddr("xxx:9876");//订阅一个或者多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe("TopicTest","");//设置负载均衡|广播模式默认是负载均衡consumer.setMessageModel(MessageModel.CLUSTERING);/***消息顺序:*全局消息顺序*局部消息顺序*///设置回调函数处理消息consumer.registerMessageListener((MessageListenerConcurrently)(list,consumeConcurrentlyContext)->{System.out.printf("%sReceiveNewMessages:%s%n",Thread.currentThread.getName,list);list.forEach(msg->{System.out.println("Receivemessage[msgId="+msg.getMsgId+"]"+(System.currentTimeMillis-msg.getStoreTimestamp)+"mslater");});returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start;}

在上面发送消息的时候Tags参数设定为TagA,在消费端将tags参数设置为空,则拉取不到消息

使用MessageQueueSelector,报错却没有重发消息,怎么办?

if(order.getId==1L){try{Thread.sleep(6000);}catch(Exceptione){}}//设置重试次数producer.setRetryTimesWhenSendFailed(3);producer.setSendMsgTimeout(5000);

上面的代码中,进行了重试的设置,同时也在代码中设置了超时的场景

代码很快就抛出了异常信息,发现我们设置的重试设置没有作用,没有进行消息重发,也没有对队列阻塞

缺陷:

设置的重试没有生效,发送顺序消息无法利用集群的Failover特性,因为不能更换MessageQueue进行重试

因为发送的路由策略导致的热点问题,可能某一些MessageQueue的数据量特别大

没有消息重发,也没有队列阻塞

解决方案

典型的就是消息发送失败后存在数据库中,然后定时调度,最终将消息发送到MQ

但是我觉得这个方案还是存在缺陷的,重新发送消息后,消息的顺序性就发生变动了,这个问题需要思考

总结

今天这篇文章,主要介绍了同步发送顺序消息,以及发送消息中的主要参数的作用和生成规则,

最后的问题,在后面的学习中在进行思考,来完善使用MessageQueueSelector,报错却没有重发消息的问题。

下一篇,我们主要来看RocketMQ消息发送的源码,进行更深入的学习。

精彩推送