You are viewing documentation for version 1 of the AWS SDK for Ruby. Version 2 documentation can be found here.

Class: AWS::SQS::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/aws/sqs/queue.rb

Overview

Represents an Amazon SQS Queue.

Examples:

Sending a message


msg = queue.send_message("HELLO")
puts "Sent message: #{msg.id}"

Polling for messages indefinitely


queue.poll do |msg|
  puts "Got message: #{msg.body}"
end

Defined Under Namespace

Classes: SentMessage

Constant Summary

DEFAULT_POLL_INTERVAL =
Deprecated.

No longer used by #poll

The default number of seconds to wait between polling requests for new messages.

1
DEFAULT_WAIT_TIME_SECONDS =

The default number of seconds to pass in as the SQS long polling value (:wait_time_seconds) in #receive_message.

Since:

  • 1.8.0

15

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#urlString (readonly)

Returns The queue URL.

Returns:

  • (String)

    The queue URL.



48
49
50
# File 'lib/aws/sqs/queue.rb', line 48

def url
  @url
end

Instance Method Details

#==(other) ⇒ Boolean Also known as: eql?

Returns true if the other queue has the same url.

Returns:

  • (Boolean)

    Returns true if the other queue has the same url.



684
685
686
# File 'lib/aws/sqs/queue.rb', line 684

def ==(other)
  other.kind_of?(Queue) and other.url == url
end

#approximate_number_of_messagesInteger Also known as: visible_messages

Returns The approximate number of visible messages in a queue. For more information, see Resources Required to Process Messages in the Amazon SQS Developer Guide.

Returns:



319
320
321
# File 'lib/aws/sqs/queue.rb', line 319

def approximate_number_of_messages
  get_attribute("ApproximateNumberOfMessages").to_i
end

#approximate_number_of_messages_delayedInteger

Returns an approximate count of messages delayed.

Returns:

  • (Integer)

    Returns an approximate count of messages delayed.



428
429
430
# File 'lib/aws/sqs/queue.rb', line 428

def approximate_number_of_messages_delayed
  get_attribute("ApproximateNumberOfMessagesDelayed").to_i
end

#approximate_number_of_messages_not_visibleInteger Also known as: invisible_messages

Returns The approximate number of messages that are not timed-out and not deleted. For more information, see Resources Required to Process Messages in the Amazon SQS Developer Guide.

Returns:

  • (Integer)

    The approximate number of messages that are not timed-out and not deleted. For more information, see Resources Required to Process Messages in the Amazon SQS Developer Guide.



328
329
330
# File 'lib/aws/sqs/queue.rb', line 328

def approximate_number_of_messages_not_visible
  get_attribute("ApproximateNumberOfMessagesNotVisible").to_i
end

#arnString

Returns The queue's Amazon resource name (ARN).

Returns:

  • (String)

    The queue's Amazon resource name (ARN).



433
434
435
# File 'lib/aws/sqs/queue.rb', line 433

def arn
  @arn ||= get_attribute("QueueArn")
end

#batch_change_visibility(visibility_timeout, *messages) ⇒ nil #batch_change_visibility(*messages_with_timeouts) ⇒ nil

Overloads:

  • #batch_change_visibility(visibility_timeout, *messages) ⇒ nil

    Accepts a single :visibility_timeout value and a list of messages (ReceivedMessage objects or receipt handle strings). This form of the method is useful when you want to set the same timeout value for each message.

    queue.batch_change_visibility(10, messages)

    Parameters:

    • visibility_timeout (Integer)

      The new value for the message's visibility timeout (in seconds).

    • message (ReceivedMessage, String)

      A list of up to 10 messages to change the visibility timeout for.

    Returns:

    • (nil)

    Raises:

    • (BatchChangeVisibilityError)

      Raises this error when one or more of the messages failed the visibility update.

  • #batch_change_visibility(*messages_with_timeouts) ⇒ nil

    Accepts a list of hashes. Each hash should provide the visibility timeout and message (a ReceivedMessage object or the recipt handle string).

    Use this form when each message needs a different visiblity timeout.

    messages = [] messages << { :message => 'handle1', :visibility_timeout => 5 } messages << { :message => 'handle2', :visibility_timeout => 10 }

    queue.batch_change_visibility(*messages)

    Parameters:

    • message (Hash)

      A list hashes, each with a :visibility_timeout and a :message.

    Returns:

    • (nil)

    Raises:

    • (BatchChangeVisibilityError)

      Raises this error when one or more of the messages failed the visibility update.

Raises:



649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
# File 'lib/aws/sqs/queue.rb', line 649

def batch_change_visibility *args

  args = args.flatten

  if args.first.is_a?(Integer)
    timeout = args.shift
    messages = args.collect{|m| [m, timeout] }
  else
    messages = args.collect{|m| [m[:message], m[:visibility_timeout]] }
  end

  entries = []
  messages.each do |msg,timeout|
    handle = msg.is_a?(ReceivedMessage) ? msg.handle : msg
    entries << {
      :id => entries.size.to_s,
      :receipt_handle => handle,
      :visibility_timeout => timeout,
    }
  end

  response = client.change_message_visibility_batch(
    :queue_url => url, :entries => entries)

  failures = batch_failures(entries, response)

  raise Errors::BatchChangeVisibilityError.new(failures) unless
    failures.empty?

  nil

end

#batch_delete(*messages) ⇒ nil

Parameters:

  • messages (ReceivedMessage, String)

    A list of up to 10 messages to delete. Each message should be a ReceivedMessage object or a received message handle (string).

Returns:

  • (nil)

Raises:

  • (Errors::BatchDeleteSend)

    Raised when one or more of the messages failed to delete. The raised error has a list of the failures.



588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
# File 'lib/aws/sqs/queue.rb', line 588

def batch_delete *messages

  entries = []
  messages.flatten.each_with_index do |msg,n|
    handle = msg.is_a?(ReceivedMessage) ? msg.handle : msg
    entries << { :id => n.to_s, :receipt_handle => handle }
  end

  response = client.delete_message_batch(
    :queue_url => url, :entries => entries)

  failures = batch_failures(entries, response)

  raise Errors::BatchDeleteError.new(failures) unless failures.empty?

  nil

end

#batch_send(*messages) ⇒ Array<SentMessage>

Sends a batch of up to 10 messages in a single request.

queue.send_messages('message-1', 'message-2')

You can also set an optional delay for all of the messages:

# delay all messages 15 minutes
queue.batch_send(msg1, msg2, :delay_seconds => 900)

If you need to set a custom delay for each message you can pass hashes:

messages = []
messages << { :message_body => 'msg1', :delay_seconds => 60 }
messages << { :message_body => 'msg2', :delay_seconds => 30 }

queue.batch_send(messages)

Parameters:

  • messages (String, Hash)

    A list of messages. Each message should be a string, or a hash with a :message_body, and optionally :delay_seconds.

Returns:

  • (Array<SentMessage>)

    Returns an array of sent message objects. Each object responds to #message_id and #md5_of_message_body. The message id is generated by Amazon SQS.

Raises:

  • (Errors::BatchSendError)

    Raises this error when one or more of the messages failed to send, but others did not. On the raised object you can access a list of the messages that failed, and a list of messages that succeeded.



533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
# File 'lib/aws/sqs/queue.rb', line 533

def batch_send *messages

  entries = messages.flatten

  unless entries.first.is_a?(Hash)
    options = entries.last.is_a?(Hash) ? entries.pop : {}
    entries = entries.collect{|msg| { :message_body => msg } }
    if delay = options[:delay_seconds]
      entries.each {|entry| entry[:delay_seconds] = delay }
    end
  end

  entries.each_with_index {|entry,n| entry[:id] = n.to_s }

  client_opts = {}
  client_opts[:queue_url] = url
  client_opts[:entries] = entries

  response = client.send_message_batch(client_opts)

  failed = batch_failures(entries, response, true)

  checksum_failed = verify_send_message_batch_checksum entries, response

  sent = response[:successful].collect do |sent|
    msg = SentMessage.new
    msg.message_id = sent[:message_id]
    msg.md5 = sent[:md5_of_message_body]
    msg
  end

  if !failed.empty? && !checksum_failed.empty?
    send_error = Errors::BatchSendError.new(sent, failed)
    checksum_error = Errors::ChecksumError.new(checksum_failed)
    raise Errors::BatchSendMultiError.new send_error, checksum_error
  elsif !failed.empty?
    raise Errors::BatchSendError.new(sent, failed) unless failed.empty?
  elsif !checksum_failed.empty?
    raise Errors::ChecksumError.new(checksum_failed)
  end

  sent

end

#created_timestampTime

Returns The time when the queue was created.

Returns:

  • (Time)

    The time when the queue was created.



356
357
358
# File 'lib/aws/sqs/queue.rb', line 356

def created_timestamp
  Time.at(get_attribute("CreatedTimestamp").to_i)
end

#delay_secondsInteger

Returns Gets the current default delay for messages sent to the queue.

Returns:

  • (Integer)

    Gets the current default delay for messages sent to the queue.



402
403
404
# File 'lib/aws/sqs/queue.rb', line 402

def delay_seconds
  get_attribute("DelaySeconds").to_i
end

#delay_seconds=(seconds) ⇒ Object

Sets the default delay for messages sent to the queue.

Parameters:

  • seconds (Integer)

    How many seconds a message will be delayed.



408
409
410
# File 'lib/aws/sqs/queue.rb', line 408

def delay_seconds= seconds
  set_attribute("DelaySeconds", seconds.to_s)
end

#deletenil

Deletes the queue, regardless of whether it is empty.

When you delete a queue, the deletion process takes up to 60 seconds. Requests you send involving that queue during the 60 seconds might succeed. For example, calling #send_message might succeed, but after the 60 seconds, the queue and that message you sent no longer exist.

Also, when you delete a queue, you must wait at least 60 seconds before creating a queue with the same name.

Returns:

  • (nil)


67
68
69
70
# File 'lib/aws/sqs/queue.rb', line 67

def delete
  client.delete_queue(:queue_url => url)
  nil
end

#exists?Boolean

Note:

This may raise an exception if you don't have permission to access the queue attributes. Also, it may return true for up to 60 seconds after a queue has been deleted.

Returns True if the queue exists.

Returns:

  • (Boolean)

    True if the queue exists.



443
444
445
446
447
448
449
450
# File 'lib/aws/sqs/queue.rb', line 443

def exists?
  client.get_queue_attributes(:queue_url => url,
                              :attribute_names => ["QueueArn"])
rescue Errors::NonExistentQueue, Errors::InvalidAddress
  false
else
  true
end

#last_modified_timestampTime

Returns The time when the queue was last changed.

Returns:

  • (Time)

    The time when the queue was last changed.



361
362
363
# File 'lib/aws/sqs/queue.rb', line 361

def last_modified_timestamp
  Time.at(get_attribute("LastModifiedTimestamp").to_i)
end

#maximum_message_sizeInteger

Returns The limit of how many bytes a message can contain before Amazon SQS rejects it.

Returns:

  • (Integer)

    The limit of how many bytes a message can contain before Amazon SQS rejects it.



367
368
369
# File 'lib/aws/sqs/queue.rb', line 367

def maximum_message_size
  get_attribute("MaximumMessageSize").to_i
end

#maximum_message_size=(size) ⇒ Object

Sets the maximum message size for the queue.

Parameters:

  • size (Integer)

    The limit of how many bytes a message can contain before Amazon SQS rejects it. This must be an integer from 1024 bytes (1KB) up to 65536 bytes (64KB). The default for this attribute is 8192 (8KB).

Returns:

  • Retuns the passed size argument.



378
379
380
# File 'lib/aws/sqs/queue.rb', line 378

def maximum_message_size=(size)
  set_attribute("MaximumMessageSize", size.to_s)
end

#message_retention_periodInteger

Returns The number of seconds Amazon SQS retains a message.

Returns:

  • (Integer)

    The number of seconds Amazon SQS retains a message.



384
385
386
# File 'lib/aws/sqs/queue.rb', line 384

def message_retention_period
  get_attribute("MessageRetentionPeriod").to_i
end

#message_retention_period=(period) ⇒ Object

Sets the message retention period for the queue

Parameters:

  • period (Integer)

    The number of seconds Amazon SQS retains a message. Must be an integer from 3600 (1 hour) to 1209600 (14 days). The default for this attribute is 345600 (4 days).

Returns:

  • Returns the passed period argument.



395
396
397
398
# File 'lib/aws/sqs/queue.rb', line 395

def message_retention_period=(period)
  set_attribute("MessageRetentionPeriod", period.to_s)
  period
end

#policyPolicy

Returns the current queue policy if there is one. Returns nil otherwise.

Returns:

  • (Policy)

    Returns the current queue policy if there is one. Returns nil otherwise.



470
471
472
473
474
475
476
477
478
479
# File 'lib/aws/sqs/queue.rb', line 470

def policy
  if policy_json = get_attribute('Policy')
    policy = SQS::Policy.from_json(policy_json)
    policy.extend(PolicyProxy)
    policy.queue = self
    policy
  else
    nil
  end
end

#policy=(policy) ⇒ nil

Set the policy on this queue.

If you pass nil or an empty string then it will have the same effect as deleting the policy.

Parameters:

  • policy

    The policy to set. This policy can be a Policy object, a json policy string, or any other object that responds with a policy string when it received #to_json.

Returns:

  • (nil)


492
493
494
495
496
497
498
499
500
# File 'lib/aws/sqs/queue.rb', line 492

def policy= policy
  policy_string = case policy
  when nil, '' then ''
  when String  then policy
  else policy.to_json
  end
  set_attribute('Policy', policy_string)
  nil
end

#poll(opts = {}) {|message| ... } ⇒ nil

Polls continually for messages. For example, you can use this to poll indefinitely:

queue.poll { |msg| puts msg.body }

Or, to poll indefinitely for the first message and then continue polling until no message is received for a period of at least ten seconds:

queue.poll(:initial_timeout => false,
           :idle_timeout => 10) { |msg| puts msg.body }

As with the block form of #receive_message, this method automatically deletes the message then the block exits normally.

Parameters:

  • opts (Hash) (defaults to: {})

    Options for polling.

Options Hash (opts):

  • :wait_time_seconds (Integer)

    The number of seconds the service should wait for a response when requesting a new message. Defaults to DEFAULT_WAIT_TIME_SECONDS. Use nil to use the queue's global long polling wait time setting. See #wait_time_seconds to set the global long poll setting on the queue.

  • :idle_timeout (Integer)

    The maximum number of seconds to spend polling while no messages are being returned. By default this method polls indefinitely whether messages are received or not.

  • :initial_timeout (Integer)

    The maximum number of seconds to spend polling before the first message is received. This option defaults to the value of :idle_timeout. You can specify false to poll indefinitely for the first message when :idle_timeout is set.

  • :batch_size (Integer)

    The maximum number of messages to retrieve in a single request. By default messages are received one at a time. Valid values: integers from 1 to 10.

  • :visibility_timeout (Integer)

    The duration (in seconds) that the received messages are hidden from subsequent retrieve requests. Valid values: integer from 0 to 43200 (maximum 12 hours)

  • :attributes (Array<Symbol, String>)

    The attributes to populate in each received message. Valid values:

    • :all (to populate all attributes)
    • :sender_id
    • :sent_at
    • :receive_count
    • :first_received_at

    See ReceivedMessage for documentation on each attribute's meaning.

  • :poll_interval (Float, Integer)

    As of v1.7.2, this option is no longer used. See the :wait_time_seconds option for long polling instead.

Yield Parameters:

Returns:

  • (nil)


292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# File 'lib/aws/sqs/queue.rb', line 292

def poll(opts = {}, &block)
  opts[:limit] = opts.delete(:batch_size) if
    opts.key?(:batch_size)

  opts[:wait_time_seconds] = DEFAULT_WAIT_TIME_SECONDS unless
    opts.has_key?(:wait_time_seconds)

  last_message_at = Time.now
  got_first = false
  loop do
    got_msg = false
    receive_messages(opts) do |message|
      got_msg = got_first = true
      last_message_at = Time.now
      yield(message)
    end
    unless got_msg
      return if hit_timeout?(got_first, last_message_at, opts)
    end
  end
  nil
end

#receive_message(opts = {}) {|message| ... } ⇒ ReceivedMessage Also known as: receive_messages

Note:

Due to the distributed nature of the queue, a weighted random set of machines is sampled on a ReceiveMessage call. That means only the messages on the sampled machines are returned. If the number of messages in the queue is small (less than 1000), it is likely you will get fewer messages than you requested per call to #receive_message. If the number of messages in the queue is extremely small, you might not receive any messages. To poll continually for messages, use the #poll method, which automatically retries the request after a configurable delay.

Retrieves one or more messages. When a block is given, each message is yielded to the block and then deleted as long as the block exits normally. When no block is given, you must delete the message yourself using ReceivedMessage#delete.

Parameters:

  • opts (Hash) (defaults to: {})

    Options for receiving messages.

Options Hash (opts):

  • :limit (Integer)

    The maximum number of messages to receive. By default this is 1, and the return value is a single message object. If this options is specified and is not 1, the return value is an array of message objects; however, the array may contain fewer objects than you requested. Valid values: integers from 1 to 10.

    Not necessarily all the messages in the queue are returned (for more information, see the preceding note about machine sampling).

  • :wait_time_seconds (Integer)

    The number of seconds the service should wait for a response when requesting a new message. Defaults to the #wait_time_seconds attribute defined on the queue. See #wait_time_seconds to set the global long poll setting on the queue.

  • :visibility_timeout (Integer)

    The duration (in seconds) that the received messages are hidden from subsequent retrieve requests. Valid values: integer from 0 to 43200 (maximum 12 hours)

  • :attributes (Array<Symbol, String>)

    The attributes to populate in each received message. Valid values:

    • :all (to populate all attributes)
    • :sender_id
    • :sent_at
    • :receive_count
    • :first_received_at

    See ReceivedMessage for documentation on each attribute's meaning.

  • :message_attribute_names (Array<String>)

    A list of message attribute names to receive. These will be available on the ReceivedMessage as #message_attributes.

Yield Parameters:

Returns:

  • (ReceivedMessage)

    Returns the received message (or messages) only if a block is not given to this method.

Raises:



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/aws/sqs/queue.rb', line 200

def receive_message(opts = {}, &block)
  resp = client.receive_message(receive_opts(opts))

  failed = verify_receive_message_checksum resp

  raise Errors::ChecksumError.new(failed) unless failed.empty?

  messages = resp[:messages].map do |m|
    ReceivedMessage.new(self, m[:message_id], m[:receipt_handle],
      :body => m[:body],
      :md5 => m[:md5_of_body],
      :request_id => (resp[:response_metadata] || {})[:request_id],
      :attributes => m[:attributes],
      :message_attributes => m[:message_attributes])
  end

  if block
    call_message_block(messages, block)
  elsif opts[:limit] && opts[:limit] != 1
    messages
  else
    messages.first
  end
end

#send_message(body, options = {}) ⇒ SentMessage

Delivers a message to this queue.

Parameters:

  • body (String)

    The message to send. The maximum allowed message size is 64 KB. The message may only contain Unicode characters from the following list, according to the W3C XML specification (for more information, go to http://www.w3.org/TR/REC-xml/#charsets). If you send any characters not included in the list, your request will be rejected.

    • #x9
    • #xA
    • #xD
    • #x20 to #xD7FF
    • #xE000 to #xFFFD
    • #x10000 to #x10FFFF
  • options (Hash) (defaults to: {})

Options Hash (options):

  • :delay_seconds (Integer)

    The number of seconds to delay the message. The message will become available for processing after the delay time has passed. If you don't specify a value, the default value for the queue applies. Should be from 0 to 900 (15 mins).

Returns:

  • (SentMessage)

    An object containing information about the message that was sent.



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/aws/sqs/queue.rb', line 119

def send_message body, options = {}

  client_opts = options.dup
  client_opts[:queue_url] = url
  client_opts[:message_body] = body

  response = client.send_message(client_opts)

  msg = SentMessage.new
  msg.message_id = response[:message_id]
  msg.request_id = (response[:response_metadata] || {})[:request_id]
  msg.md5 = response[:md5_of_message_body]

  verify_send_message_checksum body, msg.md5

  msg

end

#visibility_timeoutInteger

Returns the visibility timeout for the queue. For more information about visibility timeout, see Visibility Timeout in the Amazon SQS Developer Guide.

Returns:

  • (Integer)

    Returns the visibility timeout for the queue. For more information about visibility timeout, see Visibility Timeout in the Amazon SQS Developer Guide.



337
338
339
# File 'lib/aws/sqs/queue.rb', line 337

def visibility_timeout
  get_attribute("VisibilityTimeout").to_i
end

#visibility_timeout=(timeout) ⇒ Object

Sets the visibility timeout for the queue.

Parameters:

  • timeout (Integer)

    The length of time (in seconds) that a message received from a queue will be invisible to other receiving components when they ask to receive messages. Valid values: integers from 0 to 43200 (12 hours).

Returns:

  • Returns the value passed as a timeout.



350
351
352
353
# File 'lib/aws/sqs/queue.rb', line 350

def visibility_timeout=(timeout)
  set_attribute("VisibilityTimeout", timeout.to_s)
  timeout
end

#wait_time_secondsInteger

Returns Gets the number of seconds the service will wait for a response when requesting a new message

Returns:

  • (Integer)

    Gets the number of seconds the service will wait for a response when requesting a new message

Since:

  • 1.8.0



415
416
417
# File 'lib/aws/sqs/queue.rb', line 415

def wait_time_seconds
  get_attribute("ReceiveMessageWaitTimeSeconds").to_i
end

#wait_time_seconds=(seconds) ⇒ Object

Sets the number of seconds that the service should wait for a response when requesting a new message

Parameters:

  • seconds (Integer)

    How many seconds to wait for a response

Since:

  • 1.8.0



423
424
425
# File 'lib/aws/sqs/queue.rb', line 423

def wait_time_seconds= seconds
  set_attribute("ReceiveMessageWaitTimeSeconds", seconds.to_s)
end