Fubaredness Is Contagious

Dmitriy Samovskiy’s Blog

Ruby + AMQP + RabbitMQ Example

June 24th, 2008 · by Dmitriy (@somic on Twitter) · 5 Comments

In this post I would like to show how one can exchange messages using AMQP protocol from Ruby, using RabbitMQ as a broker. I posted the original version of this script to rabbitmq-discuss mailing list back in September 2007.

Prerequesites:

  • RabbitMQ broker configured, up and running on 127.0.0.1 (localhost) on port 5672 (standard AMQP port).
  • Apache QPid Ruby library installed within RUBYPATH (svn co http://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/ruby)
  • AMQP specification XML form AMQP official site saved as /etc/amqp0-8.xml

You can also download this script from here.

#!/usr/bin/ruby -I/usr/local/qpid-svn/qpid/ruby
#
#
__doc__ = %q(
 
disttailf.rb - distributed "tail -f"
 
Aggregates "tail -f" output from multiple machines and multiple files
into a single RabbitMQ pubsub queue (kind of splunk's log consolidation
function)
 
Usage:
Producer: disttailf.rb [-s broker_host] [-p broker_port] [-x spec_xml] file ...
Consumer: disttailf.rb [-s broker_host] [-p broker_port] [-x spec_xml] -c
 
)
 
require 'qpid'
require 'socket'
 
def consumer(client, ch)
    myqueue = ch.queue_declare()
    ch.queue_bind(:queue=>myqueue.queue, :exchange=>'amq.topic',
                    :routing_key=>'disttailf.#')
    cons = ch.basic_consume(:queue=>myqueue.queue, :no_ack => true)
    ruby_queue = client.queue(cons.consumer_tag)
 
    while true
        raise "Rabbitmq broker disconnected" if client.closed?
        begin
          msg = ruby_queue.pop(non_block=true)
          puts "== #{msg.content.headers[:headers]} " \
                "#{msg.routing_key.split('.')[-1]}"
          puts msg.content.body
        rescue
          sleep(0.5)
        end
    end
end
 
def producer(client, ch, filenames)
    rkey = "disttailf." + Socket.gethostname.split('.')[-1]
    tail_f(filenames) do |filename, line|
        h = {'sent' => Time.now.to_i, 'filename' => filename }
        c = Qpid::Content.new({:headers=>h}, line)
        ch.basic_publish(:routing_key=>rkey, :content=>c,
                            :exchange=>'amq.topic')
        puts "#{filename}: #{line}"
    end
end
 
def tail_f(filenames, &block)
    filedict = Hash.new
    filenames.each { |f| filedict[f] = open_or_nil(f) }
    reopen_counter = 0
    while true:
        if reopen_counter > 120
            reopen_counter = 0
            filenames.reject { |f| filedict[f] }.each {
                |f| filedict[f] = open_or_nil(f) }
        end
 
        filedict.values.reject { |f| not f }.each do |f|
            begin
                raise "trunc" unless File.stat(f.path).size >= f.tell
            rescue
                $stderr << "#{f.path}: removed or truncated\n"
                f.close
                filedict[f.path] = nil
                next
            end
 
            begin
              block.call(f.path,f.readline) while true
            rescue EOFError
              true
            end
        end
 
        reopen_counter += 1
        sleep(0.5)
   end # while true
end
 
def open_or_nil(filename)
    begin
        File.open(filename)
    rescue
        nil
    end
end
 
if __FILE__ == $0
    require 'getoptlong'
 
    server = '127.0.0.1'
    port = 5672
    specxml = '/etc/amqp0-8.xml'
    acts_as_consumer = false
 
    opts = GetoptLong.new(
        ['--server', '-s', GetoptLong::REQUIRED_ARGUMENT],
        ['--port', '-p', GetoptLong::REQUIRED_ARGUMENT],
        ['--specxml', '-x', GetoptLong::REQUIRED_ARGUMENT],
        ['--consume', '-c', GetoptLong::NO_ARGUMENT])
    opts.each do |opt,arg|
      case opt
        when '--server'
            server = arg
        when '--port'
            port = arg.to_i
        when '--specxml'
            specxml = arg
        when '--consume'
            acts_as_consumer = true
      end
    end
 
    # set up connection to rabbitmq broker
    client = Qpid::Client.new(server, port, spec=Spec.load(specxml))
    client.start({ "LOGIN" => "guest", "PASSWORD" => "guest" })
    ch = client.channel(1)
    ch.channel_open()
 
    if acts_as_consumer
        consumer(client, ch)
    else
        if ARGV.length == 0
            puts __doc__
            raise "List of file names is empty - nothing to do"
        end
        producer(client, ch, ARGV)
    end
 
end

Tags: rabbitmq · ruby

Related posts:

5 responses so far ↓

  • 1 tinomen // Sep 5, 2008 at 10:13 pm

    Should the producer be tailing to the console?
    I’m getting the following error:

    ./disttailf.rb -x /usr/local/qpid/specs/amqp.0-8.xml -s 127.0.0.1 -p 5672 -c
    CONNECTION CLOSED: 501, FRAME_ERROR – cannot decode <>, 60, 20
    writer Qpid::Closed
    /usr/local/qpid/ruby/qpid/queue.rb:41:in `pop’: Qpid::Closed (Qpid::Closed)
    from /usr/local/qpid/ruby/qpid/peer.rb:208:in `invoke’
    from /usr/local/qpid/ruby/qpid/peer.rb:190:in `method_missing’
    from ./disttailf.rb:25:in `consumer’
    from ./disttailf.rb:126

  • 2 Dmitriy // Sep 6, 2008 at 11:01 am

    It looks to me like you are using the spec XML which came with QPid. Please download the official spec XML form amqp.org (see the link in my post above).

  • 3 tinomen // Sep 9, 2008 at 10:20 am

    right on. I had both, but was using the wrong one.

    Have you played with the ruby ampq client on github? Any Thoughts?

  • 4 Dmitriy // Sep 9, 2008 at 10:24 am

    No, haven’t had a chance to play with it yet, but I am hearing it’s very good.

  • 5 darxriggs // Apr 22, 2009 at 4:34 pm

    instead of…
    def open_or_nil(filename)
    begin
    File.open(filename)
    rescue
    nil
    end
    end
    …you can simple write…
    File.open(filename) rescue nil