Notice: This Wiki is now read only and edits are no longer possible. Please see: https://gitlab.eclipse.org/eclipsefdn/helpdesk/-/wikis/Wiki-shutdown-plan for the plan.
Gyrex/Development Space/Queue Service
This article describes the cloud queue service available in Gyrex. The default implementation persists messages in ZooKeeper. This puts some limits on message size. However, it's possible to provide an alternate implementations based on Amazon SQS, RabbitMQ or ActiveMQ.
API
The queue service is made available as org.eclipse.gyrex.cloud.services.queue.IQueueService
in the OSGi service registry.
Publishing Messages
Arbitrary messages can be published to queues. Depending on the underlying messaging system, some limitations to the body size may apply.
// the message final byte[] messageBody = ...; // get queue final IQueue queue = getQueueService().getQueue("myqueue"); // add message to queue queue.sendMessage(messageBody);
Consuming Messages
Receiving messages is a two step process. First a message has to be retrieved from the queue. This operation will not remove a message from a queue but hide it. Once the message has been processed successfully, it must be deleted from the queue in a second step. This ensures that no message is lost if a node processing a message dies in between.
// get queue final IQueue queue = getQueueService().getQueue("myqueue"); // set receive timeout (5 minutes) final Map<String, Object> requestProperties = new HashMap<String, Object>(1); requestProperties.put(IQueueServiceProperties.MESSAGE_RECEIVE_TIMEOUT, TimeUnit.MINUTES.toMillis(5)); // receive at most one message final List<IMessage> messages = queue.receiveMessages(1, requestProperties); if (!messages.isEmpty()) { final IMessage message = messages.get(0); // do something with the message processMessage(message.getBody()); // delete the message if (queue.deleteMessage(message)) { // consumed successfully --> commit } else { // error consuming (maybe because of network issues) --> rollback or just try again } }
Administration
It's possible to manage queues via the OSGi console. An Admin UI is on our list but contributions are welcome.
osgi> queue --help ---QueueConsoleCommands--- queue <cmd> [args] create <QUEUEID> - creates a queue ls [FILTER] - list queues rm <QUEUEID> - removes a queue osgi>