• Java并发编程
  • JVM
  • JMX
  • Java数据结构与算法
  • 动态字节码生成技术
  • 常用工具
  • 2.1 Message消息对象

    2019-12-26 23:27:14 12,277 8

    在RocketMQ中,生产者使用Message对象表示一条消息,本文对Message的属性和构造方法进行详细的讲解。

    1 Message属性

    Message的定义如下所示:

    org.apache.rocketmq.common.message.Message

    public class Message implements Serializable {
        private String topic;
        private int flag;
        private Map<String, String> properties;
        private byte[] body;
        private String transactionId;

    其中:

        topic:表示消息要到的发送主题,必填

        flag:选填,消息的标记,完全由应用设置,RocketMQ不做任何处理,类似于memcached中flag的作用。

        properties:消息属性,主要存储一些消息的元数据信息

        body:消息的内容,这是一个字节数组,序列化方式由应用决定,例如你可以将一个json转为字节数组,也可以通过protol buffer、hessian编码转为字节数组。

        transactionId:事务id,仅在事务消息中使用到 

    Message数据结构中各个字段都可以通过get、set方式访问,例如访问topic

    msg.setTopic("TopicTest”)
    msg.getTopic()


    2 构造方法

    Message提供了以下构造方法:

    public Message()
    public Message(String topic, byte[] body)
    public Message(String topic, String tags, byte[] body)
    public Message(String topic, String tags, String keys, byte[] body)
    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK)

    上述构造方法中的参数,会被设置到Message的成员变量中。可以看到部分参数在Message成员变量中并没有定义,这些参数都可以可选的,会被设置到properties中:

    tags:表示消息的标签,消费者在消费时,可以根据标签进行过滤,需要注意的是,一个生产者,只能指定一个tag

    keys:用于建立索引,之后可以通过命令工具/API/或者管理平台查询key,可以为一个消息设置多个key,用空格""进行分割

    waitStoreMsgOK:表示发送消息后,是否需要等待消息同步刷新到磁盘上。如果broker配置为ASYNC_MASTER,那么只需要消息在master上刷新到磁盘即可;如果配置为SYNC_MASTER,那么还需要等待slave也刷新到磁盘。需要注意的是,waitStoreMsgOK默认为false,只有将设置为true的情况下,才会等待刷盘成功再返回。

    除了通过构造方法给Message成员变量赋值,也可以通过相关set方法进行赋值,通过对应的get方法取值。除了可以设置上述构造方法中出现的参数,还可以设置:

    public void setBuyerId(String buyerId)
    public void setDelayTimeLevel(int level)
    public void putUserProperty(final String name, final String value)

    其中:

    BuyerId:

    DelayTimeLevel:设置消息的延迟级别,0表示不延迟,大于0会延迟特定时间才被消费

    putUserProperty:自定义消息属性。前面提到tags、keys、waitStoreMsgOK等,都会设置到properties中。如果开发者有自定义消息属性的需求,可以通过此方法进行设置。 


    3 消息属性保留关键字

    properties字段有一些保留的关键字,这些保留关键字定义在MessageConst类中,用户在自定义消息属性时,需要避开这些关键字。注意,rocketmq还提供了putProperty和setProperties两个方法,但是这里不建议用户使用,因为可能会与rocketmq保留的属性名冲突。 

    在MessageConst类中,定义了这些属性的key,例如生产者可能会使用到:

    org.apache.rocketmq.common.message.MessageConst

    public class MessageConst {
        //消息的key
        public static final String PROPERTY_KEYS = "KEYS";
        //消息的tag
        public static final String PROPERTY_TAGS = "TAGS";
        //是否waitStoreMsgOK
        public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
        //消息延迟级别
        public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
        //BUYER_ID
        public static final String PROPERTY_BUYER_ID = "BUYER_ID";
        ...

            

    上一篇:1.4 集群搭建 下一篇:2.5 延迟消息