ActiveMQ + Ruby Stomp Client: How to process elements one by one
30 Oct 2008

A few months ago I switched one of our internal projects from synchronous database saves of analytics data to asynchronous processing using starling + a pool of workers. That was the day I really understood the power of specialized queue servers. I had been using a database (mostly MySQL) for this kind of task for years, and sometimes (especially under highly concurrent load) it was not particularly fast… I had worked with queue servers a few times before, but those were either small tasks or I didn’t take the time to really get the idea that specialized queue servers were created precisely to do this kind of work quickly and efficiently.

In the few months I’ve been using starling, I noticed a really bad thing about how it works: if your workers die (really die, lock on something for a long time, or just start lagging) and the queue starts growing, it can kill your server and there’s nothing you can do about it – it just eats all your memory and that’s it. Since then I’ve been looking for a better solution for our queuing, because the technology was too cool to give up. I tried 5 or 6 different popular solutions and all of them sucked… They ALL had the same problem – if your queue grows, it’s your problem and not the queue broker’s :-/ The last solution I tested was ActiveMQ, and either I wasn’t able to push it to its limits or it really is that good, but it does not seem to have this memory problem. So we’ve started using it recently.

In this small post I’d like to describe a few things that took me quite a while to figure out with the Ruby Stomp client: how to make queues persistent (really!) and how to process elements one by one with client acknowledgments.

First, we need to connect to the queue server; this code is the same for both the pushing code and the workers:

client = Stomp::Client.open "stomp://localhost:61613"
# or
client = Stomp::Client.open(login, password, host, port, reliable)

The second form looks better to me because it lets you specify whether you want the client to be reliable (block on errors and try to reconnect, etc.) or to simply raise an error in case of any problems.
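For example, a reliable connection to a local broker could look something like this (the login/passcode values here are just placeholders for whatever your broker is configured with – a default ActiveMQ install often accepts empty strings):

# Reliable connection: block and reconnect on errors instead of raising
client = Stomp::Client.open("guest", "guest", "localhost", 61613, true)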

When you’ve connected to the server, you can push your data to a queue:

client.send('/queue/some_queue', "hello world", headers)

And this is where you get the ability to specify whether you want your data to be persistent (survive server crashes, etc.): headers is a hash that can contain :persistent => true, which does the trick. So your code would look like this (for example):

client.send('/queue/some_queue', "hello world", :persistent => true)
client.close # this is needed only in your push code

Now that your data has been submitted to the queue, you need to be able to read it and process it with some script. This is as simple as the following code:

# Processing loop
client.subscribe('/queue/some_queue', headers) do |msg|
  # Process your message here
  # Your submitted data is in msg.body
end
client.join # Wait until the listening thread dies

As I mentioned, we wanted to receive messages one by one and acknowledge successful processing in our code. This is simple too. You need to (the full sketch follows this list):

  • Pass :ack => :client as an element in your headers hash
  • Call client.acknowledge(msg) in the loop once you’re sure the element can be removed from the queue
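Putting both pieces together, a processing loop with client acknowledgments could look something like this (process_analytics is just a placeholder for whatever your worker actually does with the message body):

# Ask the broker to wait for an explicit acknowledgment per message
client.subscribe('/queue/some_queue', :ack => :client) do |msg|
  # Your submitted data is in msg.body
  process_analytics(msg.body)

  # Tell the broker the message was processed and can be removed from the queue
  client.acknowledge(msg)
end
client.join # Wait until the listening thread dies

If something goes wrong before acknowledge is called, the message is never acked, so the broker can redeliver it later instead of silently dropping it.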

That’s basically everything I wanted to explain today. If you’ve never tried working with a queue server, try one today and maybe tomorrow you won’t be able to imagine your system’s architecture without such a component 🙂