The message queue interface allows a client to receive messages from a queue, and acknowledge the receipt of messages.
A message queue is obtained by calling getMessageQueue.
Acknowledge the receipt of a message, so it can be removed from the queue.
void acknowledge ( in CtsComponents::MessageKey key );
Note: if the queue's options and the message's options do not include REQUIRES_ACKNOWLEDGE, then the message does not need to be acknowledged.
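The rule in this note can be written as a bitmask test on the combined options. The following is a minimal sketch, not part of the CtsComponents API: the class name AckCheck is hypothetical, and the constant is a placeholder bit chosen for illustration. Real code would use REQUIRES_ACKNOWLEDGE.value from the generated stubs.

```java
final class AckCheck {
    // Placeholder bit for illustration only (an assumption); the real value
    // comes from CtsComponents.REQUIRES_ACKNOWLEDGE.value.
    static final int REQUIRES_ACKNOWLEDGE = 0x1;

    // True when either the queue's options or the message's options
    // include the REQUIRES_ACKNOWLEDGE flag.
    static boolean needsAcknowledge(int queueOptions, int messageOptions) {
        return ((queueOptions | messageOptions) & REQUIRES_ACKNOWLEDGE) != 0;
    }

    public static void main(String[] args) {
        System.out.println(needsAcknowledge(0, 0));
        System.out.println(needsAcknowledge(REQUIRES_ACKNOWLEDGE, 0));
        System.out.println(needsAcknowledge(0, REQUIRES_ACKNOWLEDGE));
    }
}
```

A client would call acknowledge only for messages where this check is true.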
Close this message queue object. If there is a blocking receive call in progress for this queue, it will return immediately with a CORBA OBJECT_NOT_EXIST system exception.
void close ( );
A client performing a graceful shutdown should use this operation to allow the server to release resources immediately rather than waiting for the queue to time out.
Note: A permanent queue can exist independently of a message queue object. To destroy a permanent queue, use destroy.
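One way to make sure close is called during a graceful shutdown is a try/finally block around the receive loop. The sketch below is an illustration under stated assumptions: QueueHandle is a hypothetical stand-in for the two MessageQueue operations it uses (with messages reduced to strings), and GracefulShutdown and consume are made-up names.

```java
// Hypothetical stand-in for the two MessageQueue operations used below;
// real code would use the generated CtsComponents.MessageQueue stub.
interface QueueHandle {
    String[] receive(int maximum, int timeout);
    void close();
}

final class GracefulShutdown {
    // Receives a fixed number of batches, then always closes the queue
    // object, even if receive throws.
    static int consume(QueueHandle queue, int batches, int timeoutSeconds) {
        int received = 0;
        try {
            for (int i = 0; i < batches; i++) {
                received += queue.receive(0, timeoutSeconds).length;
            }
        } finally {
            queue.close(); // lets the server release resources right away
        }
        return received;
    }
}
```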
CtsComponents::MessageSeq flush ( in long maximum, in long timeout );
Interrupt any blocking receive calls in progress for this queue, so they will return with no messages.
void interrupt ( );
List available messages in the queue. See also CtsComponents::MessageService::list.
CtsComponents::MessageSeq list ( in string selector, in CtsComponents::MessageKey after, in long maximum );
Receive up to maximum messages (or all messages if maximum is zero or negative) from the queue. If any messages are already available in the queue, or if no messages are available and timeout is zero or negative, return immediately; otherwise wait up to timeout seconds for new messages to arrive.
CtsComponents::MessageSeq receive ( in long maximum, in long timeout );
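The rules for maximum and timeout described above can be modelled with an in-memory queue. This is a rough sketch of the semantics only, not the server implementation: ReceiveModel and send are hypothetical names, messages are reduced to strings, and the behaviour after a wait (returning whatever is available once the first message arrives) is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class ReceiveModel {
    private final LinkedBlockingQueue<String> pending = new LinkedBlockingQueue<>();

    void send(String text) {
        pending.add(text);
    }

    // Models receive(maximum, timeout); timeout is in seconds.
    List<String> receive(int maximum, int timeout) {
        // Zero or negative maximum means "all available messages".
        int limit = (maximum <= 0) ? Integer.MAX_VALUE : maximum;
        List<String> batch = new ArrayList<>();
        pending.drainTo(batch, limit);
        if (batch.isEmpty() && timeout > 0) {
            try {
                // Nothing available: wait up to timeout seconds for a message.
                String first = pending.poll(timeout, TimeUnit.SECONDS);
                if (first != null) {
                    batch.add(first);
                    pending.drainTo(batch, limit - 1);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
            }
        }
        return batch;
    }
}
```

In this model, receive(0, 0) on an empty queue returns an empty list immediately, which corresponds to polling.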
For optimal use of the network, it is recommended to set maximum to zero, or otherwise as large as the client can manage.
Although polling (timeout = 0) is permitted, it usually results in excessive network traffic, so a non-zero timeout is recommended. The main purpose of the timeout parameter is to allow the server to detect disconnected clients with inactive queues, and thereby release the resources associated with client connections. Unless the client requires a particular timeout value, it is recommended to use the DEFAULT_TIMEOUT value.
Since the client thread calling receive will block until receive returns, it is recommended that a separate client thread be used to receive messages.
Example: a client application that receives messages from a queue. This example uses a Java thread and a client-side listener object for message notification.
import org.omg.CORBA.*;
import CtsComponents.*;
import java.lang.Object;
import SessionManager.*;

public class MessageClient implements MessageListenerOperations
{
    private static String _url = "iiop://localhost:9000";
    private static String _user = "testuser";
    private static String _password = "testpass";
    private static String _queue = _user + ":OrderQueue";

    public static void main(String[] args) throws Exception
    {
        new MessageClient().run(args);
    }

    private void run(String[] args) throws Exception
    {
        java.util.Properties props = new java.util.Properties();
        props.put("org.omg.CORBA.ORBClass", "com.sybase.CORBA.ORB");
        props.put("com.sybase.CORBA.IdleConnectionTimeout", "60");
        ORB orb = ORB.init(args, props);
        Manager manager = ManagerHelper.narrow(orb.string_to_object(_url));
        Session session = manager.createSession(_user, _password);
        Factory factory = FactoryHelper.narrow(session.lookup("CtsComponents/MessageService"));
        MessageService cms = MessageServiceHelper.narrow(factory.create());
        new MessageThread(cms, _queue, this, 0).start();
        for (;;)
        {
            // Do something while MessageThread asynchronously notifies
            // us of completed orders.
            System.out.println("sleeping...");
            Thread.sleep(60000); // 1 minute
        }
    }

    public void onMessage(Message msg)
    {
        System.out.println("Received message: " + msg.text);
    }

    class MessageThread extends Thread
    {
        private MessageService _cms;
        private String _queue;
        private MessageListenerOperations _listener;
        private int _options;

        MessageThread(MessageService cms, String queue,
                      MessageListenerOperations listener, int options)
        {
            _cms = cms;
            _queue = queue;
            _listener = listener;
            _options = options;
            setDaemon(true);
        }

        public void run()
        {
            MessageQueue mq = _cms.getMessageQueue(_queue, "", _options);
            for (;;)
            {
                // Block until messages arrive or the default timeout expires.
                Message[] seq = mq.receive(0, DEFAULT_TIMEOUT.value);
                for (int m = 0; m < seq.length; m++)
                {
                    Message msg = seq[m];
                    _listener.onMessage(msg);
                    // Acknowledge only if the queue or the message requires it.
                    if (((_options | msg.options) & REQUIRES_ACKNOWLEDGE.value) != 0)
                    {
                        mq.acknowledge(msg.key);
                    }
                }
            }
        }
    }
}
Deprecated. Use unlock.
void recover ( );
Unlock (un-receive) a message that has been received but not acknowledged, so it can be received again.
void unlock ( in CtsComponents::MessageKey key );
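The relationship between receive, acknowledge, and unlock can be illustrated with a small in-memory model. This is only a sketch under assumptions: LockModel is a hypothetical class, messages are represented by their keys as strings, and returning an unlocked message to the front of the queue is a guess, since the text above does not say where it is placed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

final class LockModel {
    private final Deque<String> available = new ArrayDeque<>();
    private final Set<String> locked = new HashSet<>();

    void send(String key) {
        available.addLast(key);
    }

    // Returns the next message key, or null if none is available.
    // The message stays locked until it is acknowledged or unlocked.
    String receive() {
        String key = available.pollFirst();
        if (key != null) {
            locked.add(key);
        }
        return key;
    }

    // Acknowledged messages are removed for good.
    void acknowledge(String key) {
        locked.remove(key);
    }

    // Unlocking a received, unacknowledged message makes it receivable again.
    void unlock(String key) {
        if (locked.remove(key)) {
            available.addFirst(key); // placement at the front is an assumption
        }
    }
}
```

In this model, unlocking a message that has already been acknowledged has no effect.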