Class Rufus::SQS::QueueService

  1. lib/rufus/sqs.rb
Parent: Object

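As the name implies, this class is the entry point to the queue service: it creates, lists and deletes queues, and sends, retrieves and deletes messages.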

Constants

AWS_VERSION = "2006-04-01"
DEFAULT_QUEUE_HOST = "queue.amazonaws.com"

Public class methods

new (queue_host=nil)
    # File lib/rufus/sqs.rb, line 96
    def initialize (queue_host=nil)

      # fall back to the default SQS host when none is given
      @queue_host = queue_host || DEFAULT_QUEUE_HOST
    end

Public instance methods

create_queue (queue_name)

Creates a queue and returns its URL (a String).

If the queue name does not comply with the SQS naming requirements, an error is raised.

    # File lib/rufus/sqs.rb, line 128
    def create_queue (queue_name)

      doc = do_action :post, @queue_host, "/?QueueName=#{queue_name}"

      # return the text of the first QueueUrl element of the reply
      doc.elements.each("//QueueUrl") do |e|
        return e.text.to_s
      end
    end

delete_message (queue, message_id)

Deletes a given message.

The queue might be a queue name (String) or a Queue instance.

    # File lib/rufus/sqs.rb, line 229
    def delete_message (queue, message_id)

      queue = resolve_queue(queue)

      path = "#{queue.path}/#{message_id}"
      #path = "#{queue.path}/#{CGI::escape(message_id)}"

      doc = do_action :delete, queue.host, path

      SQS::get_element_text(doc, "//StatusCode") == "Success"
    end

delete_queue (queue, force=false)

Deletes the queue. Returns true if the delete was successful, false if SQS answers with a 400 error. A queue can be emptied beforehand by calling flush_queue.

If ‘force’ is set to true, the queue is flushed before the actual delete operation, which should ensure that the queue can be removed.

    # File lib/rufus/sqs.rb, line 280
    def delete_queue (queue, force=false)

      queue = resolve_queue(queue)

      flush_queue(queue) if force

      begin

        doc = do_action :delete, @queue_host, queue.path

      rescue Exception => e

        # a 400 reply means the queue could not be deleted
        return false if e.message.match "^400 .*$"
        # any other failure is passed on instead of leaving doc unset
        raise e
      end

      SQS::get_element_text(doc, "//StatusCode") == "Success"
    end

flush_queue (queue)

Use with care!

Attempts to delete all the messages in a queue. Returns the total count of messages deleted.

A call to this method may take a while, as it has to delete each message individually. AWS may add a proper ‘flush_queue’ operation later.

The queue might be a queue name (String) or a Queue instance.

    # File lib/rufus/sqs.rb, line 253
    def flush_queue (queue)

      count = 0

      loop do

        # fetch the next batch; an empty batch means the queue is drained
        l = get_messages queue, :timeout => 0, :count => 255

        break if l.length < 1

        l.each do |m|
          m.delete
          count += 1
        end
      end

      count
    end
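
The drain loop can be tried out without touching SQS. What follows is a minimal sketch, assuming hypothetical in-memory stand-ins (FakeQueue, FakeMessage and the drain helper are not part of rufus-sqs); it only illustrates why the cost grows with the number of messages.

```ruby
# Hypothetical in-memory stand-ins, for illustration only.
FakeMessage = Struct.new(:queue, :body) do
  # Deleting a message removes exactly this object from its queue.
  def delete
    queue.messages.delete_if { |m| m.equal?(self) }
  end
end

class FakeQueue
  attr_reader :messages

  def initialize(bodies)
    @messages = bodies.map { |b| FakeMessage.new(self, b) }
  end

  # Returns up to count messages without removing them.
  def receive(count)
    @messages.first(count)
  end
end

# Same loop shape as flush_queue: fetch a batch, delete each message,
# stop when a fetch comes back empty.
def drain(queue, batch_size = 255)
  count = 0
  loop do
    batch = queue.receive(batch_size)
    break if batch.empty?
    batch.each do |m|
      m.delete
      count += 1
    end
  end
  count
end

queue = FakeQueue.new(%w[a b c d e])
puts drain(queue, 2) # prints 5
```

With a batch size of 2 the five messages take three fetches plus a final empty one, and every message still costs one delete call of its own.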
get_message (queue, message_id)

Retrieves a single message from a queue. Returns an instance of Message, or nil if SQS answers with a 404 error.

The queue might be a queue name (String) or a Queue instance.

    # File lib/rufus/sqs.rb, line 208
    def get_message (queue, message_id)

      queue = resolve_queue(queue)

      path = "#{queue.path}/#{message_id}"

      begin
        doc = do_action :get, queue.host, path
        Message.new(queue, doc.root.elements[1])
      rescue Exception => e
        #puts e.message
        # a 404 reply means there is no such message
        return nil if e.message.match "^404 .*$"
        raise e
      end
    end

get_messages (queue, params={})

Retrieves a batch of messages from a queue. Returns a list of Message instances.

This method understands two optional params:

  • :timeout the visibility timeout, in seconds, applied to the
    retrieved messages (sent as VisibilityTimeout)
  • :count the maximum number of messages to be returned by this call
    (sent as NumberOfMessages)

The queue might be a queue name (String) or a Queue instance.

    # File lib/rufus/sqs.rb, line 176
    def get_messages (queue, params={})

      queue = resolve_queue(queue)

      path = "#{queue.path}/front"

      # append the optional parameters as a query string
      path += "?" if params.size > 0

      timeout = params[:timeout]
      count = params[:count]

      path += "VisibilityTimeout=#{timeout}" if timeout
      path += "&" if timeout and count
      path += "NumberOfMessages=#{count}" if count

      doc = do_action :get, queue.host, path

      messages = []

      doc.elements.each("//Message") do |me|
        messages << Message.new(queue, me)
      end

      messages
    end
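
How the two params end up in the request path can be seen in isolation. This is a minimal sketch, not the library's code: front_path is a hypothetical helper name and '/A1B2C3/myqueue' is a made-up queue path. Unlike the method above, it leaves out the '?' when neither recognised param is present.

```ruby
# Builds the request path for reading from the front of a queue.
# Mirrors the logic of get_messages; front_path is a hypothetical helper.
def front_path(queue_path, params = {})
  query = []
  query << "VisibilityTimeout=#{params[:timeout]}" if params[:timeout]
  query << "NumberOfMessages=#{params[:count]}" if params[:count]

  path = "#{queue_path}/front"
  path += "?#{query.join('&')}" unless query.empty?
  path
end

puts front_path('/A1B2C3/myqueue')
puts front_path('/A1B2C3/myqueue', :timeout => 0, :count => 255)
```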
get_queue (queue_name)

Given a queue name, returns the matching Queue instance. Raises an error if no queue has that name.

    # File lib/rufus/sqs.rb, line 301
    def get_queue (queue_name)

      # list_queues matches by prefix, so look for the exact name
      l = list_queues(queue_name)

      l.each do |q|
        return q if q.name == queue_name
      end

      #return nil
      raise "found no queue named '#{queue_name}'"
    end

list_queues (prefix=nil)

Lists the queues for the active AWS account. If ‘prefix’ is given, only queues whose names begin with that prefix are returned.

    # File lib/rufus/sqs.rb, line 106
    def list_queues (prefix=nil)

      queues = []

      path = "/"
      path = "#{path}?QueueNamePrefix=#{prefix}" if prefix

      doc = do_action :get, @queue_host, path

      # one Queue instance per QueueUrl element of the reply
      doc.elements.each("//QueueUrl") do |e|
        queues << Queue.new(self, e)
      end

      return queues
    end

put_message (queue, content)

Given some content (‘text/plain’ content), sends it as a message to a queue. Returns the SQS message id (a String).

The queue might be a queue name (String) or a Queue instance.

    # File lib/rufus/sqs.rb, line 144
    def put_message (queue, content)

      queue = resolve_queue(queue)

      doc = do_action :put, queue.host, "#{queue.path}/back", content

      #puts doc.to_s

      #status_code = SQS::get_element_text(doc, '//StatusCode')
      #message_id = SQS::get_element_text(doc, '//MessageId')
      #request_id = SQS::get_element_text(doc, '//RequestId')
      #{ :status_code => status_code,
      #  :message_id => message_id,
      #  :request_id => request_id }

      SQS::get_element_text(doc, '//MessageId')
    end

send_message (queue, content)

Alias for put_message

Protected instance methods

do_action (action, host, path, content=nil)
    # File lib/rufus/sqs.rb, line 326
      def do_action (action, host, path, content=nil)

        date = Time.now.httpdate

        h = {}

        h['AWS-Version'] = AWS_VERSION
        h['Date'] = date
        h['Content-type'] = 'text/plain'

        h['Content-length'] = content.length.to_s if content

        # the signature covers the verb, the content type, the date and the path
        h['Authorization'] = generate_auth_header(
          action, path, date, "text/plain")

        res = Rufus::Verbs::EndPoint.request(
          action,
          :host => host,
          :path => path,
          :d => content,
          :headers => h)

        #case res
        #when Net::HTTPSuccess, Net::HTTPRedirection
        #  doc = REXML::Document.new(res.read_body)
        #else
        #  doc = res.error!
        #end
        doc = if res.is_a?(Net::HTTPSuccess)
          REXML::Document.new(res.read_body)
        else
          res.error!
        end

        raise_errors doc

        doc
      end
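
The header set assembled by do_action can be reproduced on its own. This is a minimal sketch under stated assumptions: build_headers is a hypothetical helper, the Authorization header is left out, and the version string is copied from the AWS_VERSION constant listed above. It uses bytesize rather than length for Content-length, which is a deliberate deviation from the listing so that multi-byte content is counted in bytes.

```ruby
require 'time' # adds Time#httpdate

# Value copied from the AWS_VERSION constant documented above.
AWS_VERSION = '2006-04-01'

# Builds the headers do_action sends, minus Authorization.
# build_headers is a hypothetical helper, not part of rufus-sqs.
def build_headers(content = nil, now = Time.now)
  h = {}
  h['AWS-Version'] = AWS_VERSION
  h['Date'] = now.httpdate
  h['Content-type'] = 'text/plain'
  # Content-length is only sent when there is a body.
  h['Content-length'] = content.bytesize.to_s if content
  h
end

build_headers('hello').each { |name, value| puts "#{name}: #{value}" }
```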
generate_auth_header (action, path, date, content_type)

Generates the ‘AWS x:y’ authorization header value.

    # File lib/rufus/sqs.rb, line 384
      def generate_auth_header (action, path, date, content_type)

        # build the string to sign, one field per line
        s = ""
        s << action.to_s.upcase
        s << "\n"

        #s << Base64.encode64(Digest::MD5.digest(content)).strip          #  if content
          #
          # documented but not necessary (not working)
        s << "\n"

        s << content_type
        s << "\n"

        s << date
        s << "\n"

        # the query string is not part of the signed path
        i = path.index '?'
        path = path[0..i-1] if i
        s << path

        #puts ">>>#{s}<<<"

        # OpenSSL::Digest::Digest is no longer available in recent Ruby versions
        digest = OpenSSL::Digest.new 'sha1'

        key = ENV['AMAZON_SECRET_ACCESS_KEY']

        raise "No $AMAZON_SECRET_ACCESS_KEY env variable found" \
          unless key

        sig = OpenSSL::HMAC.digest(digest, key, s)
        sig = Base64.encode64(sig).strip

        "AWS #{ENV['AMAZON_ACCESS_KEY_ID']}:#{sig}"
      end
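
The signing steps can be exercised without environment variables. The following is a minimal sketch, assuming hypothetical helper names (string_to_sign, auth_header) and placeholder credentials; it passes the keys in explicitly and Base64-encodes with Array#pack instead of the Base64 module, but otherwise follows the same order of fields as the listing above.

```ruby
require 'openssl'

# Builds the string to sign: verb, empty Content-MD5 line, content type,
# date, and the path with any query string removed.
def string_to_sign(action, path, date, content_type)
  bare_path = path.sub(/\?.*\z/m, '')
  [action.to_s.upcase, '', content_type, date, bare_path].join("\n")
end

# Signs the string with HMAC-SHA1 and returns the "AWS id:signature" value.
# Credentials are arguments here instead of being read from ENV.
def auth_header(access_key_id, secret_key, action, path, date, content_type)
  s = string_to_sign(action, path, date, content_type)
  sig = OpenSSL::HMAC.digest(OpenSSL::Digest.new('sha1'), secret_key, s)
  # 'm0' is Base64 without line breaks
  "AWS #{access_key_id}:#{[sig].pack('m0')}"
end

# Placeholder credentials and path, for illustration only.
puts auth_header('EXAMPLE_ID', 'example-secret', :get,
                 '/A1B2C3/myqueue/front?NumberOfMessages=1',
                 'Mon, 01 Jan 2007 12:00:00 GMT', 'text/plain')
```

Because the query string is dropped before signing, two requests that differ only in their parameters produce the same signature for a given date.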
raise_errors (doc)

Scans the SQS XML reply for potential errors and raises an error if it encounters one.

    # File lib/rufus/sqs.rb, line 369
      def raise_errors (doc)

        doc.elements.each("//Error") do |e|

          code = get_element_text(e, "Code")
          return unless code

          # report the error code together with its message text
          message = get_element_text(e, "Message")
          raise "Rufus::SQS::#{code} : #{message}"
        end
      end
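
The scan can be tried against a hand-written reply. This is a minimal sketch, assuming a made-up XML shape (a real SQS reply may differ) and a hypothetical method name, check_for_errors; it reads the element text with plain REXML calls rather than the library's get_element_text.

```ruby
require 'rexml/document'

# Raises a RuntimeError for the first <Error> element carrying a <Code>.
# check_for_errors is a hypothetical name mirroring raise_errors.
def check_for_errors(doc)
  doc.elements.each('//Error') do |e|
    code = e.elements['Code']
    return unless code
    message = e.elements['Message']
    raise "Rufus::SQS::#{code.text} : #{message ? message.text : ''}"
  end
  nil
end

# Assumed reply shape, for illustration only.
xml = '<Response><Errors><Error><Code>InvalidParameterValue</Code>' \
      '<Message>bad queue name</Message></Error></Errors></Response>'

begin
  check_for_errors(REXML::Document.new(xml))
rescue RuntimeError => e
  puts e.message
end
```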
resolve_queue (queue)

‘queue’ might be a Queue instance or a queue name. If it’s a Queue instance, it is immediately returned, else the Queue instance is looked up and returned.

    # File lib/rufus/sqs.rb, line 320
      def resolve_queue (queue)

        # Queue instances pass through, anything else is treated as a name
        return queue if queue.kind_of?(Queue)
        get_queue queue.to_s
      end
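
The name-or-instance convention used by the public methods can be sketched in a few lines. SketchQueue and SketchService below are hypothetical stand-ins, not the library's Queue and QueueService; the lookup is an in-memory list instead of a call to SQS.

```ruby
# Hypothetical stand-in for the library's Queue class.
SketchQueue = Struct.new(:name)

# Hypothetical service holding its queues in memory.
class SketchService
  def initialize(names)
    @queues = names.map { |n| SketchQueue.new(n) }
  end

  # Looks a queue up by exact name, raising when there is none.
  def get_queue(name)
    @queues.find { |q| q.name == name } or
      raise "found no queue named '#{name}'"
  end

  # Instances pass through untouched; anything else is looked up by name.
  def resolve_queue(queue)
    return queue if queue.kind_of?(SketchQueue)
    get_queue(queue.to_s)
  end
end

service = SketchService.new(%w[orders invoices])
by_name = service.resolve_queue('orders')
puts service.resolve_queue(by_name).equal?(by_name) # prints true
```

Accepting either form lets callers that already hold a queue object skip the lookup, while one-off calls can pass a plain name.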