RocketMQ源码分析之消息ID

msgId

消息唯一id,发消息时有生产端生成,保存在消息的属性中,也是发送消息的返回值SendResult中的msgId,主要用于创建索引和作为事务消息的事务id。

MessageClientIDSetter.setUniqID(msg);
public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
public static void setUniqID(final Message msg) {
    if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
        msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
    }
}

在MessageClientIDSetter初始化时,LEN代表,ip四个字节,端口两个字节,类加载器的哈希码四个字节,当前时间戳与该月第一毫秒的差值四个字节,自增计数器两个字节,因为端口只占两个字节8位,最大65535,但是获取端口的时候返回int类型4个字节,所以前两位是空的,因此往后再移动两位放端口,这样前面四位就可以放下ip了,然后再把位置移到6,放入最后一个哈希码,生成长度为20的固定字符串

static {
    LEN = 4 + 2 + 4 + 4 + 2;
    ByteBuffer tempBuffer = ByteBuffer.allocate(10);
    tempBuffer.position(2);
    tempBuffer.putInt(UtilAll.getPid());
    tempBuffer.position(0);
    try {
        tempBuffer.put(UtilAll.getIP());
    } catch (Exception e) {
        tempBuffer.put(createFakeIP());
    }
    tempBuffer.position(6);
    tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
    FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
    setStartTime(System.currentTimeMillis());
    COUNTER = new AtomicInteger(0);
}

保存当月以及下月的第一个毫秒值。


private synchronized static void setStartTime(long millis) {
    Calendar cal = Calendar.getInstance();
    cal.setTimeInMillis(millis);
    cal.set(Calendar.DAY_OF_MONTH, 1);
    cal.set(Calendar.HOUR_OF_DAY, 0);
    cal.set(Calendar.MINUTE, 0);
    cal.set(Calendar.SECOND, 0);
    cal.set(Calendar.MILLISECOND, 0);
    startTime = cal.getTimeInMillis();
    cal.add(Calendar.MONTH, 1);
    nextStartTime = cal.getTimeInMillis();
}

生成唯一ID,长度总共为32位,固定字符串占20位,剩下的占12位


public static String createUniqID() {
    StringBuilder sb = new StringBuilder(LEN * 2);
    sb.append(FIX_STRING);
    sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
    return sb.toString();
}

创建唯一ID的buffer,如果当前时间大于等于初始保存的下月开始毫秒值,则需要重置当月下月初始毫秒值。计算当前时间戳与当月初始毫秒值的差值,获取自增器的当前数值,转化成short类型,总共6位放入buffer,最后生成长度为12的字符串,这样可以保证一个月内不会出现重复。

private static byte[] createUniqIDBuffer() {
    ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
    long current = System.currentTimeMillis();
    if (current >= nextStartTime) {
        setStartTime(current);
    }
    buffer.position(0);
    buffer.putInt((int) (System.currentTimeMillis() - startTime));
    buffer.putShort((short) COUNTER.getAndIncrement());
    return buffer.array();
}

 

offsetMsgId

具体代表存储消息的broker的ip和端口号,以及在文件中的偏移量

this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

八个字节长度用来保存存储消息的broker的ip和端口信息

private final ByteBuffer hostHolder = ByteBuffer.allocate(8);

public ByteBuffer getStoreHostBytes(ByteBuffer byteBuffer) {
    return socketAddress2ByteBuffer(this.storeHost, byteBuffer);
}

public static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
    InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
    byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
    byteBuffer.putInt(inetSocketAddress.getPort());
    byteBuffer.flip();
    return byteBuffer;
}

生成msgId,长度也是32位。

public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
    input.flip();
    input.limit(MessageDecoder.MSG_ID_LENGTH);

    input.put(addr);
    input.putLong(offset);

    return UtilAll.bytes2string(input.array());
}

一个字节用两个char表示

public static String bytes2string(byte[] src) {
    char[] hexChars = new char[src.length * 2];
    for (int j = 0; j < src.length; j++) {
        int v = src[j] & 0xFF;
        hexChars[j * 2] = HEX_ARRAY[v >>> 4];
        hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F];
    }
    return new String(hexChars);
}

这个消息id可以解码出对应消息存储的位置


public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
    SocketAddress address;
    long offset;

    byte[] ip = UtilAll.string2bytes(msgId.substring(0, 8));
    byte[] port = UtilAll.string2bytes(msgId.substring(8, 16));
    ByteBuffer bb = ByteBuffer.wrap(port);
    int portInt = bb.getInt(0);
    address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);

    // offset
    byte[] data = UtilAll.string2bytes(msgId.substring(16, 32));
    bb = ByteBuffer.wrap(data);
    offset = bb.getLong(0);

    return new MessageId(address, offset);
}

 


版权声明:本文为ph3636原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。