msgId
消息唯一id,发消息时有生产端生成,保存在消息的属性中,也是发送消息的返回值SendResult中的msgId,主要用于创建索引和作为事务消息的事务id。
MessageClientIDSetter.setUniqID(msg);
public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
public static void setUniqID(final Message msg) {
if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
}
}
在MessageClientIDSetter初始化时,LEN代表,ip四个字节,端口两个字节,类加载器的哈希码四个字节,当前时间戳与该月第一毫秒的差值四个字节,自增计数器两个字节,因为端口只占两个字节8位,最大65535,但是获取端口的时候返回int类型4个字节,所以前两位是空的,因此往后再移动两位放端口,这样前面四位就可以放下ip了,然后再把位置移到6,放入最后一个哈希码,生成长度为20的固定字符串
static {
LEN = 4 + 2 + 4 + 4 + 2;
ByteBuffer tempBuffer = ByteBuffer.allocate(10);
tempBuffer.position(2);
tempBuffer.putInt(UtilAll.getPid());
tempBuffer.position(0);
try {
tempBuffer.put(UtilAll.getIP());
} catch (Exception e) {
tempBuffer.put(createFakeIP());
}
tempBuffer.position(6);
tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
setStartTime(System.currentTimeMillis());
COUNTER = new AtomicInteger(0);
}
保存当月以及下月的第一个毫秒值。
private synchronized static void setStartTime(long millis) {
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(millis);
cal.set(Calendar.DAY_OF_MONTH, 1);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
startTime = cal.getTimeInMillis();
cal.add(Calendar.MONTH, 1);
nextStartTime = cal.getTimeInMillis();
}
生成唯一ID,长度总共为32位,固定字符串占20位,剩下的占12位
public static String createUniqID() {
StringBuilder sb = new StringBuilder(LEN * 2);
sb.append(FIX_STRING);
sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
return sb.toString();
}
创建唯一ID的buffer,如果当前时间大于等于初始保存的下月开始毫秒值,则需要重置当月下月初始毫秒值。计算当前时间戳与当月初始毫秒值的差值,获取自增器的当前数值,转化成short类型,总共6位放入buffer,最后生成长度为12的字符串,这样可以保证一个月内不会出现重复。
private static byte[] createUniqIDBuffer() {
ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
long current = System.currentTimeMillis();
if (current >= nextStartTime) {
setStartTime(current);
}
buffer.position(0);
buffer.putInt((int) (System.currentTimeMillis() - startTime));
buffer.putShort((short) COUNTER.getAndIncrement());
return buffer.array();
}
offsetMsgId
具体代表存储消息的broker的ip和端口号,以及在文件中的偏移量
this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
八个字节长度用来保存存储消息的broker的ip和端口信息
private final ByteBuffer hostHolder = ByteBuffer.allocate(8);
public ByteBuffer getStoreHostBytes(ByteBuffer byteBuffer) {
return socketAddress2ByteBuffer(this.storeHost, byteBuffer);
}
public static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
byteBuffer.putInt(inetSocketAddress.getPort());
byteBuffer.flip();
return byteBuffer;
}
生成msgId,长度也是32位。
public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
input.flip();
input.limit(MessageDecoder.MSG_ID_LENGTH);
input.put(addr);
input.putLong(offset);
return UtilAll.bytes2string(input.array());
}
一个字节用两个char表示
public static String bytes2string(byte[] src) {
char[] hexChars = new char[src.length * 2];
for (int j = 0; j < src.length; j++) {
int v = src[j] & 0xFF;
hexChars[j * 2] = HEX_ARRAY[v >>> 4];
hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F];
}
return new String(hexChars);
}
这个消息id可以解码出对应消息存储的位置
public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
SocketAddress address;
long offset;
byte[] ip = UtilAll.string2bytes(msgId.substring(0, 8));
byte[] port = UtilAll.string2bytes(msgId.substring(8, 16));
ByteBuffer bb = ByteBuffer.wrap(port);
int portInt = bb.getInt(0);
address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);
// offset
byte[] data = UtilAll.string2bytes(msgId.substring(16, 32));
bb = ByteBuffer.wrap(data);
offset = bb.getLong(0);
return new MessageId(address, offset);
}
版权声明:本文为ph3636原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。