In this post I would like to show how to exchange messages over the AMQP protocol from Ruby, using RabbitMQ as the broker. I posted the original version of this script to the rabbitmq-discuss mailing list back in September 2007.
Prerequisites:
- RabbitMQ broker configured, up and running on 127.0.0.1 (localhost) on port 5672 (standard AMQP port).
- Apache Qpid Ruby library installed on Ruby's load path, for example via RUBYLIB or the -I switch used in the script's first line (svn co http://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/ruby)
- AMQP specification XML from the official AMQP site, saved as /etc/amqp0-8.xml
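Before running the script it can help to confirm that something is actually listening on the broker port. The following is a minimal sketch using only Ruby's standard socket library; the helper name broker_reachable? and the two-second timeout are my own assumptions, not part of the original script. It only checks that a TCP connection can be opened and does not perform an AMQP handshake.

```ruby
require 'socket'

# Hypothetical helper (not part of disttailf.rb): returns true if a TCP
# connection to host:port can be opened within `timeout` seconds.
# This says nothing about whether the listener actually speaks AMQP.
def broker_reachable?(host = '127.0.0.1', port = 5672, timeout = 2)
  # The block form closes the socket automatically and returns the block's value.
  Socket.tcp(host, port, connect_timeout: timeout) { true }
rescue SystemCallError, SocketError
  # Connection refused, timed out, host not resolvable, and so on.
  false
end

if __FILE__ == $0
  if broker_reachable?
    puts "Something is listening on 127.0.0.1:5672"
  else
    puts "Nothing reachable on 127.0.0.1:5672 - is the broker running?"
  end
end
```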
You can also download this script from here.
#!/usr/bin/ruby -I/usr/local/qpid-svn/qpid/ruby
#
#
__doc__ = %q(
disttailf.rb - distributed "tail -f"

Aggregates "tail -f" output from multiple machines and multiple files
into a single RabbitMQ pubsub queue (kind of splunk's log consolidation function)

Usage:
  Producer: disttailf.rb [-s broker_host] [-p broker_port] [-x spec_xml] file ...
  Consumer: disttailf.rb [-s broker_host] [-p broker_port] [-x spec_xml] -c
)

require 'qpid'
require 'socket'

# Declare a private queue, bind it to amq.topic and print every message
# whose routing key starts with "disttailf.".
def consumer(client, ch)
  myqueue = ch.queue_declare()
  ch.queue_bind(:queue=>myqueue.queue, :exchange=>'amq.topic',
                :routing_key=>'disttailf.#')
  cons = ch.basic_consume(:queue=>myqueue.queue, :no_ack => true)
  ruby_queue = client.queue(cons.consumer_tag)
  while true
    raise "Rabbitmq broker disconnected" if client.closed?
    begin
      msg = ruby_queue.pop(non_block=true)
      puts "== #{msg.content.headers[:headers]} " \
           "#{msg.routing_key.split('.')[-1]}"
      puts msg.content.body
    rescue
      # nothing queued yet (non-blocking pop raised): wait and poll again
      sleep(0.5)
    end
  end
end

# Follow the given files and publish every new line to amq.topic.
def producer(client, ch, filenames)
  # Note: [-1] takes the last dot-separated part of the host name.
  rkey = "disttailf." + Socket.gethostname.split('.')[-1]
  tail_f(filenames) do |filename, line|
    h = {'sent' => Time.now.to_i, 'filename' => filename }
    c = Qpid::Content.new({:headers=>h}, line)
    ch.basic_publish(:routing_key=>rkey, :content=>c, :exchange=>'amq.topic')
    puts "#{filename}: #{line}"
  end
end

# Poll the files twice a second and yield (filename, line) for each new line.
def tail_f(filenames, &block)
  filedict = Hash.new
  filenames.each { |f| filedict[f] = open_or_nil(f) }
  reopen_counter = 0
  while true
    # roughly once a minute, retry files that could not be opened
    if reopen_counter > 120
      reopen_counter = 0
      filenames.reject { |f| filedict[f] }.each { |f| filedict[f] = open_or_nil(f) }
    end
    filedict.values.reject { |f| not f }.each do |f|
      begin
        # a file shorter than our read position was truncated;
        # File.stat raises if the file was removed
        raise "trunc" unless File.stat(f.path).size >= f.tell
      rescue
        $stderr << "#{f.path}: removed or truncated\n"
        f.close
        filedict[f.path] = nil
        next
      end
      begin
        block.call(f.path,f.readline) while true
      rescue EOFError
        true
      end
    end
    reopen_counter += 1
    sleep(0.5)
  end # while true
end

def open_or_nil(filename)
  begin
    File.open(filename)
  rescue
    nil
  end
end

if __FILE__ == $0
  require 'getoptlong'
  server = '127.0.0.1'
  port = 5672
  specxml = '/etc/amqp0-8.xml'
  acts_as_consumer = false
  opts = GetoptLong.new(
    ['--server', '-s', GetoptLong::REQUIRED_ARGUMENT],
    ['--port', '-p', GetoptLong::REQUIRED_ARGUMENT],
    ['--specxml', '-x', GetoptLong::REQUIRED_ARGUMENT],
    ['--consume', '-c', GetoptLong::NO_ARGUMENT])
  opts.each do |opt,arg|
    case opt
    when '--server'
      server = arg
    when '--port'
      port = arg.to_i
    when '--specxml'
      specxml = arg
    when '--consume'
      acts_as_consumer = true
    end
  end
  # set up connection to rabbitmq broker
  client = Qpid::Client.new(server, port, spec=Spec.load(specxml))
  client.start({ "LOGIN" => "guest", "PASSWORD" => "guest" })
  ch = client.channel(1)
  ch.channel_open()
  if acts_as_consumer
    consumer(client, ch)
  else
    if ARGV.length == 0
      puts __doc__
      raise "List of file names is empty - nothing to do"
    end
    producer(client, ch, ARGV)
  end
end
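
The consumer binds its queue to amq.topic with the pattern 'disttailf.#', while each producer publishes with a key of the form 'disttailf.' plus a host name part. To illustrate why that binding catches every producer, here is a small pure-Ruby sketch of topic pattern matching as the AMQP specification describes it: words are separated by dots, '*' stands for exactly one word and '#' for zero or more words. The function names are hypothetical and this is not how the broker or the Qpid library implements matching; the broker does the real work.

```ruby
# Illustrative only: a toy matcher for AMQP topic patterns.
# '*' matches exactly one word, '#' matches zero or more words.
def topic_match?(pattern, routing_key)
  match_words(pattern.split('.'), routing_key.split('.'))
end

# Recursive helper working on arrays of words.
def match_words(pat, key)
  # An exhausted pattern matches only an exhausted key.
  return key.empty? if pat.empty?
  head, *rest = pat
  case head
  when '#'
    # Let '#' absorb 0, 1, ... up to all remaining words.
    (0..key.length).any? { |n| match_words(rest, key.drop(n)) }
  when '*'
    # '*' needs exactly one word to consume.
    !key.empty? && match_words(rest, key.drop(1))
  else
    # Literal word: must be equal to the next word of the key.
    key.first == head && match_words(rest, key.drop(1))
  end
end

if __FILE__ == $0
  %w[disttailf.web1 disttailf.db.backup other.web1].each do |key|
    puts "#{key}: #{topic_match?('disttailf.#', key)}"
  end
end
```

Binding with 'disttailf.*' instead would accept only keys with exactly one word after the prefix, which is why the script uses '#'.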




5 responses so far ↓
1 tinomen // Sep 5, 2008 at 10:13 pm
Should the producer be tailing to the console?
I’m getting the following error:
./disttailf.rb -x /usr/local/qpid/specs/amqp.0-8.xml -s 127.0.0.1 -p 5672 -c
CONNECTION CLOSED: 501, FRAME_ERROR – cannot decode <>, 60, 20
writer Qpid::Closed
/usr/local/qpid/ruby/qpid/queue.rb:41:in `pop': Qpid::Closed (Qpid::Closed)
from /usr/local/qpid/ruby/qpid/peer.rb:208:in `invoke'
from /usr/local/qpid/ruby/qpid/peer.rb:190:in `method_missing'
from ./disttailf.rb:25:in `consumer'
from ./disttailf.rb:126
2 Dmitriy // Sep 6, 2008 at 11:01 am
It looks to me like you are using the spec XML which came with Qpid. Please download the official spec XML from amqp.org (see the link in my post above).
3 tinomen // Sep 9, 2008 at 10:20 am
Right on. I had both, but was using the wrong one.
Have you played with the Ruby amqp client on GitHub? Any thoughts?
4 Dmitriy // Sep 9, 2008 at 10:24 am
No, haven’t had a chance to play with it yet, but I am hearing it’s very good.
5 darxriggs // Apr 22, 2009 at 4:34 pm
instead of…
def open_or_nil(filename)
  begin
    File.open(filename)
  rescue
    nil
  end
end
…you can simply write…
File.open(filename) rescue nil