AWS的SQS队列教程

基本架构

 

 

queue

标准队列

  1. 标准队列保证消息至少被传输一次,但无法保证只有一次,

  2. 队列会最大努力保证消息是有序的。

性能

sqs的标准队列提供每秒近乎无限的api调用(SendMessageReceiveMessageDeleteMessage、SendMessageBatch, DeleteMessageBatch

FIFO队列

FIFO队列是基于标准队列的,

  1. 它保证消息的顺序按先进先出的顺序严格一致

  2. 它还可以设置重复删除策略,保证消息只被传输一次

对于同一个消息组来说,如果前面的消息没被删除,后续的消息无法被消费(不可见)

性能

每个分区每秒300次api调用

如果不使用批处理(SendMessageReceiveMessageDeleteMessage),则每秒300条消息

如果使用批处理(SendMessageBatchReceiveMessage,或者DeleteMessageBatch),每秒最多支持3000条消息。因为每次批处理最多支持10条消息

消息积压

对于 FIFO 队列,最多可能有 20,000 个正在进行中的消息(由使用者从队列中接收,但尚未从队列中删除)。如果您达到此配额,Amazon SQS 不返回任何错误消息。FIFO 队列查看前 20k 条消息以确定可用的消息组。这意味着,如果您在单个消息组中有积压消息,则在您成功处理完这些积压消息前,您无法使用在之后发送到队列的其他消息组中的消息

消费者组

FIFO队列提供消费组的概念

如果你需要保证全局顺序,那就只使用一个消费者组,但是效率会很低

如果你需要的只是保证同组顺序一致,那就可以使用消费者组来提高FIFO队列的效率,这样,FIFO可以让你在消费者A在消息组1时,让消费者B去消费消息组2的消息

批消费原理

如果多条消息连续发送到 FIFO 队列(每条消息具有不同的消息重复数据删除 ID),Amazon SQS 将存储消息并确认传输。然后,可按传输每条消息的确切顺序接收和处理消息。

在 FIFO 队列中,消息基于消息组 ID 进行排序。如果多台主机(或同一主机上的不同线程)将具有相同消息组 ID 的消息发送到 FIFO 队列,Amazon SQS 将按消息到达以供处理的顺序存储消息。要确保 Amazon SQS 保留发送和接收消息的顺序,每位创建者应使用唯一的消息组 ID 来发送其所有消息。

FIFO 队列逻辑仅应用于每个消息组 ID。每个消息组 ID 代表 Amazon SQS 队列中不同的有序的消息组。对于每一个消息组 ID,所有消息的发送和接收均严格遵循一定的顺序。但是,具有不同的消息组 ID 值的消息可能不会按顺序发送和接收。您必须将消息组 ID 与消息关联。如果您未提供消息组 ID,此操作将失败。如果需要一组有序的消息,请为要将消息发送到 FIFO 队列提供相同的消息组 ID。

接受处理逻辑

 

消费者无法消费特定消费者组的消息

当批处理时,每次接受消息会尽可能返回相同消息组的消息,此时其他消费者可以消费跟前面拿到的消息的消费组不同的消息。因此,如果消费者A拿到消息(1,1,1,2,2),那么消费者B则可以拿到3,4而无法拿到1,2。

如果你想直到自己是哪个消费者组的消息,可以拿到message的Attributes属性中的MessageGroupId字段

分区发送逻辑

 

对于 FIFO 队列,Amazon SQS 会在以下情况下修改队列中的分区数:

  • 如果当前请求速率接近或超过现有分区可以支持的速率,则会分配额外的分区,直到队列达到区域配额。有关配额的信息,请参阅与消息相关的配额

  • 如果当前分区的利用率较低,则分区的数量可能会减少。

去除重复消息

  1. fifo会将正文进行md5操作,然后比较md5的值来判断是否是重复消息。因此,fifo将相同正文的值来判断做重复消息

  2. fifo可以让你提供一个重复数据删除id,来判断消息是否重复(推荐)

与标准队列不同,FIFO 队列不会引入重复消息。FIFO 队列可帮助您避免向队列发送重复消息。如果您重试SendMessage操作时,Amazon SQS 不会将任何重复消息引入队列。

要配置重复数据删除,必须执行以下操作之一:

  1. 启用基于内容的重复数据删除。这将指示 Amazon SQS 使用 SHA-256 哈希通过消息的正文(而不是消息的属性)生成消息重复数据删除 ID。有关更多信息,请参阅CreateQueueGetQueueAttributes, 和SetQueueAttributes中的操作Amazon Simple Queue Service API 参考

  2. 为消息显式提供消息重复数据删除 ID (或查看序列号)。有关更多信息,请参阅SendMessageSendMessageBatch, 和ReceiveMessage中的操作Amazon Simple Queue Service API 参考

FIFO 队列允许生产者或使用者尝试多次重试:

  • 如果生产者检测到失败SendMessage操作时,它可以使用相同的消息重复数据消除 ID,根据需要重试发送多次。假设创建者在重复数据消除间隔过期之前至少收到一个确认,多次重试既不会影响消息的排序,也不会引入重复项。

  • 如果使用者检测到失败ReceiveMessage操作,它可以根据需要重试次数,使用相同的接收请求尝试 ID。假设使用者在可见性超时到期之前至少收到一个确认,多次重试不会影响消息的排序。

  • 当您收到具有消息组 ID 的消息时,除非您删除该消息或其变为可见,否则不会返回同一消息组 ID 的其他消息。

FIFO 队列中的重复数据删除过程具有时效性。在设计应用程序时,请确保创建者和使用者均可在客户端故障或网络中断的情况下进行恢复。

  • 创建者必须知道队列的重复数据删除时间。Amazon SQS 具有minimum5 分钟的重复数据删除时间,在重复数据删除时间间隔过期后重试 SendMessage 请求可能会将重复的消息引入队列中。例如,车辆中的移动设备将发送其顺序很重要的消息。如果车辆在接收确认前一段时间失去手机网络连接,则在重新获得手机网络连接之前重试请求可能产生重复项。

  • 使用者必须具有可见性超时,以便将在可见性超时过期之前无法处理消息的风险降至最低。您可通过调用 ChangeMessageVisibility 操作延长处理消息时的可见性超时。但是,如果可见性超时过期,其他使用者可立即开始处理消息,从而导致多次处理消息。要避免这种情况,请配置死信队列

长轮询短轮询

Amazon SQS 提供短轮询和长轮询以接收来自队列的消息。默认情况下,队列使用短轮询。

短轮询ReceiveMessage请求仅查询服务器的一个子集(基于加权随机分布),以查找可包含在响应中的消息。即使查询没有找到任何消息,Amazon SQS 也会立即发送响应。

长轮询ReceiveMessage请求查询所有服务器的消息。Amazon SQS 在收集至少一条可用消息(最多为可在请求中指定的最大消息数)后发送响应。仅当轮询等待时间过期时,Amazon SQS 才会发送空响应。

  • 长轮询使您能够在消息可用时立即使用 Amazon SQS 队列中的消息。

    • 要降低使用 Amazon SQS 的成本并减少空队列的空接收次数(对ReceiveMessage操作,不返回任何消息),请启用长轮询。有关更多信息,请参阅 。Amazon SQS 长轮询

    • 要提高轮询具有多次接收的多个线程的效率,请减少线程数。

    • 在大多数情况下,长轮询优于短轮询。

  • 短轮询会立即返回响应,即使轮询的 Amazon SQS 队列为空。

    • 要满足应立即响应 ReceiveMessage 请求的应用程序的要求,请使用短轮询。

    • 短轮询的计费与长轮询相同。

message

消息可见性

正确使用sqs的情况下,消息应该在消费后进行删除

sqs有个可见性的概念,它的作用是在队列级别防止消费者收到重复的消息。当消息被消费的时候,这条消息对其他消费者而言是不可见的,因此此时其他消费者消费不到这条消息。

但是如果这条消息没有被及时删除,在经过可见性超时这一时间段后,其他消费者就可以看见此消息,也就可以被其他消费者进行消费。因此可见性超时这个时间段可以根据业务进行设置,用于重试

 

消息的默认可见性超时为 30 秒。最短为 0 秒。最长时间为 12 小时

消息的其他属性

消息的其他属性,都会在Attributes的map中

比如ApproximateFirstReceiveTimestamp和SentTimestamp会返回一个int值,单位是毫秒的时间戳

const (
	// MessageSystemAttributeNameSenderId is a MessageSystemAttributeName enum value
	MessageSystemAttributeNameSenderId = "SenderId"

	// MessageSystemAttributeNameSentTimestamp is a MessageSystemAttributeName enum value
	MessageSystemAttributeNameSentTimestamp = "SentTimestamp"

	// MessageSystemAttributeNameApproximateReceiveCount is a MessageSystemAttributeName enum value
	MessageSystemAttributeNameApproximateReceiveCount = "ApproximateReceiveCount"

	// MessageSystemAttributeNameApproximateFirstReceiveTimestamp is a MessageSystemAttributeName enum value
	MessageSystemAttributeNameApproximateFirstReceiveTimestamp = "ApproximateFirstReceiveTimestamp"

	// MessageSystemAttributeNameSequenceNumber is a MessageSystemAttributeName enum value
	MessageSystemAttributeNameSequenceNumber = "SequenceNumber"

	// MessageSystemAttributeNameMessageDeduplicationId is a MessageSystemAttributeName enum value
	MessageSystemAttributeNameMessageDeduplicationId = "MessageDeduplicationId"

	// MessageSystemAttributeNameMessageGroupId is a MessageSystemAttributeName enum value
	MessageSystemAttributeNameMessageGroupId = "MessageGroupId"

	// MessageSystemAttributeNameAwstraceHeader is a MessageSystemAttributeName enum value
	MessageSystemAttributeNameAwstraceHeader = "AWSTraceHeader"
)

const (
	// MessageSystemAttributeNameForSendsAwstraceHeader is a MessageSystemAttributeNameForSends enum value
	MessageSystemAttributeNameForSendsAwstraceHeader = "AWSTraceHeader"
)

const (
	// QueueAttributeNameAll is a QueueAttributeName enum value
	QueueAttributeNameAll = "All"

	// QueueAttributeNamePolicy is a QueueAttributeName enum value
	QueueAttributeNamePolicy = "Policy"

	// QueueAttributeNameVisibilityTimeout is a QueueAttributeName enum value
	QueueAttributeNameVisibilityTimeout = "VisibilityTimeout"

	// QueueAttributeNameMaximumMessageSize is a QueueAttributeName enum value
	QueueAttributeNameMaximumMessageSize = "MaximumMessageSize"

	// QueueAttributeNameMessageRetentionPeriod is a QueueAttributeName enum value
	QueueAttributeNameMessageRetentionPeriod = "MessageRetentionPeriod"

	// QueueAttributeNameApproximateNumberOfMessages is a QueueAttributeName enum value
	QueueAttributeNameApproximateNumberOfMessages = "ApproximateNumberOfMessages"

	// QueueAttributeNameApproximateNumberOfMessagesNotVisible is a QueueAttributeName enum value
	QueueAttributeNameApproximateNumberOfMessagesNotVisible = "ApproximateNumberOfMessagesNotVisible"

	// QueueAttributeNameCreatedTimestamp is a QueueAttributeName enum value
	QueueAttributeNameCreatedTimestamp = "CreatedTimestamp"

	// QueueAttributeNameLastModifiedTimestamp is a QueueAttributeName enum value
	QueueAttributeNameLastModifiedTimestamp = "LastModifiedTimestamp"

	// QueueAttributeNameQueueArn is a QueueAttributeName enum value
	QueueAttributeNameQueueArn = "QueueArn"

	// QueueAttributeNameApproximateNumberOfMessagesDelayed is a QueueAttributeName enum value
	QueueAttributeNameApproximateNumberOfMessagesDelayed = "ApproximateNumberOfMessagesDelayed"

	// QueueAttributeNameDelaySeconds is a QueueAttributeName enum value
	QueueAttributeNameDelaySeconds = "DelaySeconds"

	// QueueAttributeNameReceiveMessageWaitTimeSeconds is a QueueAttributeName enum value
	QueueAttributeNameReceiveMessageWaitTimeSeconds = "ReceiveMessageWaitTimeSeconds"

	// QueueAttributeNameRedrivePolicy is a QueueAttributeName enum value
	QueueAttributeNameRedrivePolicy = "RedrivePolicy"

	// QueueAttributeNameFifoQueue is a QueueAttributeName enum value
	QueueAttributeNameFifoQueue = "FifoQueue"

	// QueueAttributeNameContentBasedDeduplication is a QueueAttributeName enum value
	QueueAttributeNameContentBasedDeduplication = "ContentBasedDeduplication"

	// QueueAttributeNameKmsMasterKeyId is a QueueAttributeName enum value
	QueueAttributeNameKmsMasterKeyId = "KmsMasterKeyId"

	// QueueAttributeNameKmsDataKeyReusePeriodSeconds is a QueueAttributeName enum value
	QueueAttributeNameKmsDataKeyReusePeriodSeconds = "KmsDataKeyReusePeriodSeconds"
)

 

详细可以查看

GetQueueAttributes - Amazon Simple Queue Service

go-example

aws-doc-sdk-examples/go/example_code/sqs at master · awsdocs/aws-doc-sdk-examples


版权声明:本文为ljt735029684原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。