The message service provides queues for message consumers, and allows a message producers to send messages to specified queues (point-to-point messaging), or to publish messages with specified topics that may be of interest to registered consumers (publish/subscribe messaging).
Messages that are sent or published to a queue are stored in memory (transient messages) or in a database (persistent messages) until they can be delivered to the queue's consumer, which is either a client or a component. The message service provides a pull-style mechanism for client notification, and a push-style mechanism for component notification.
Once configuration is complete, the message service can be accessed by CORBA 2.0+ clients with IIOP 1.0 or IIOP 1.1 using this CORBA IDL interface.
Add a message listener
for a queue
, optionally
giving the name of a thread pool, which has
been configured with one or more worker threads. If unspecified,
the thread pool defaults to the system default thread pool which has a
single worker thread.
void addListener ( in string queue, in string listener );
Save the listener to persistent storage unless the queue is temporary.
The listener
is either a component name or it is
a stringified IOR for an object that implements the
MessageListener
interface.
Example 1: process messages for queue "MyClient:email" by calling the onMessage method on the "MyPackage/MailService" component.
MessageService cms = getMessageService(); cms.addListener("MyClient:email", "MyPackage/MailService");
Example 2: same as above but use a thread pool to permit more than one message to be processed at a time (assuming the thread pool "work" has more than 1 worker thread).
MessageService cms = getMessageService(); cms.addListener("MyClient:email", "MyPackage/MailService[work]");
Add a required role
for accessing name
.
void addRole ( in long type, in string name, in string role );
The type
parameter is a bit mask composed of one of:
MESSAGE_QUEUE,
MESSAGE_TOPIC,
and one of CONSUMER,
PRODUCER,
SECURITY.
The name
parameter is a queue or topic name.
The role
parameter is a role name. If more than one
role is added to a queue or topic, a client needs to possess only
one of those roles to access the queue or topic. If no roles are
assigned to a queue or topic, then any client will be able to access it.
Example 1: add a role "MyRole" that is required for receiving messages from queue "MyQueue".
MessageService cms = getMessageService(); cms.addRole(MESSAGE_QUEUE.value + CONSUMER.value, "MyQueue", "MyRole");
Example 2: add a role "Broker Role" that is required for publishing messages to topics with names beginning with "stock.".
MessageService cms = getMessageService(); cms.addRole(MESSAGE_TOPIC.value + PRODUCER.value, "stock.*", "Broker Role");
Example 3: add a role "Stock Topic Admin Role" that is required for managing security for topics with names beginning with "stock.".
MessageService cms = getMessageService(); cms.addRole(MESSAGE_TOPIC.value + SECURITY.value, "stock.*", "Stock Topic Admin Role");
Note: to add SECURITY type roles, the calling client must already have SECURITY access to the entity concerned, or otherwise the calling client must be a server administrator.
Add a message selector
for a queue
.
Save the selector to persistent storage unless the queue is
temporary.
void addSelector ( in string queue, in string selector );
The selector
is an expression that conforms to the JMS
(Java Message Service) selector specification, i.e. a subset of SQL-92
syntax, usually with an added clause to test equality of the topic
property, i.e. subscribing to the topic.
Subject to security checks, it is possible to add a selector that has no equality condition on the topic property. For example, a selector of "TRUE" will match every published message, no matter what the topic.
Example - request notification of a new stock value:
MessageService cms = getMessageService(); cms.addSelector("MyClient:stock", "topic = 'stock.SYBS' AND value > 50");
Create a message queue, topic or thread pool if it does not exist already.
void create ( in long type, in string name );
The type parameter is one of:
MESSAGE_QUEUE |
MESSAGE_TOPIC |
THREAD_POOL |
For type
=
MESSAGE_QUEUE,
this creates a queue with a zero timeout
(unless the
queue exists already, in which case its configuration is left
intact).
For type
=
MESSAGE_TOPIC,
this creates a topic with a zero timeout
(unless the
topic exists already, in which case its configuration is left
intact).
For type
=
THREAD_POOL,
this creates a thread pool with
readers=0,writers=0,workers=0
(unless the thread pool exists already, in which case its
configuration is left intact).
See also: setProperties.
It will delete all the messages from the dead message queue.
void deleteAllDq ( );
It will delete a dead message from the dead message queue.
void deleteDq ( in string queue, in CtsComponents::MessageKey key );
The queue
parameter is the name of the queue
and the key
parameter is the key of the message
to be deleted.
Destroy an existing message queue, topic or thread pool.
void destroy ( in long type, in string name );
The type parameter is one of:
MESSAGE_QUEUE |
MESSAGE_TOPIC |
THREAD_POOL |
QUEUE_CONNECTION_FACTORY |
TOPIC_CONNECTION_FACTORY |
For type
=
MESSAGE_QUEUE,
this removes any selectors, and receives and
acknowledges all queued messages for the named queue.
Return all of the message listeners for a queue
.
See addListener.
CtsComponents::StringSeq getListeners ( in string queue );
Get the list of all dead messages by order of queue name. A Message is considered to be dead if message service could not deliver message to the component within given condition.
CtsComponents::MessageSeq getMessageDq ( in long maxrow );
The maxrow
parameter indicates the maximum number
of dead messages expected.
Return a unique message key for a new message. This is used when creating a message to send or publish.
CtsComponents::MessageKey getMessageKey ( );
See also: key16.
Get a message queue object that a client can use to
acknowledge
or receive
messages in queue
.
CtsComponents::MessageQueue getMessageQueue ( in string queue, in string config, in long options );
The queue
name is usually the client user name, or
the client user name followed by a colon and an arbitrary suffix.
A queue name may not contain an equals sign or a comma. An
attempt to access a queue that does not have a prefix of the client
user name will result in a CORBA NO_PERMISSION system exception,
unless access to the queue has been specifically
granted
to the client user.
The config
parameter may optionally specify:
The options
parameter may include the following.
Option | Description |
---|---|
REQUIRES_ACKNOWLEDGE | Messages received from the queue must be explicitly acknowledged, even if they are not sent with the REQUIRES_ACKNOWLEDGE option. |
REQUIRES_TRANSACTION | Messages will be acknowledged in the caller's transaction, or in a new transaction if the caller has no transaction. |
NO_IMPLICIT_CREATION | The queue will not be implicitly created if it does not exist (see create). |
Example: access a message queue "MyQueue" which will use non-default configuration properties from queue "PriceUpdates", and will use the reader and writer threads from "[ThreadPool]" for client notification.
MessageQueue mq = cms.getMessageQueue("MyQueue", "PriceUpdates[ThreadPool]", 0);
Get Message Service Status
long getMessageServiceStatus ( );
Get the names of entities of a particular type:
CtsComponents::StringSeq getNames ( in long type );
The available entity types are:
ACTIVE_QUEUES |
ACTIVE_TOPICS |
CONFIGURED_QUEUES |
CONFIGURED_TOPICS |
THREAD_POOLS |
Obtain a pinned object reference for the message service.
CtsComponents::MessageService getPinnedObject ( );
A pinned object reference does not support transparent failover. This is useful when it is required to avoid the possibility of message duplication due to transparent failover after server or network failure. It is an alternative to the use of the IGNORE_DUPLICATE_KEY option for publish and send calls. However, it should be noted that when an operation on a pinned object fails to provide a response due to a server or network failure, the client cannot always be certain if the operation completed successfully.
Get the configuration properties of a named entity. See setProperties for details.
CtsComponents::Properties getProperties ( in long type, in string name );
Get the queue name from the messages. The message property contains the queue/topic name.
string getQueueName ( in CtsComponents::Message msg );
The msg
parameter is the message from where
queue name is extracted.
List the required roles for accessing name
.
See addRole.
CtsComponents::StringSeq getRoles ( in long type, in string name );
The type
parameter is a bit mask composed of one of:
MESSAGE_QUEUE,
MESSAGE_TOPIC,
and one of CONSUMER,
PRODUCER,
SECURITY.
Return all of the selectors for a queue
.
See addSelector.
CtsComponents::StringSeq getSelectors ( in string queue );
Retrieve message service run-time statistics. For each key in
keys
, a corresponding value is returned in the
result sequence.
CtsComponents::DoubleSeq getStatistics ( in long type, in string name, in CtsComponents::ShortSeq keys );
For type
= MESSAGE_SERVICE
(and name = ""
), the following global counter keys are available:
AVAILABLE_MESSAGES |
DELIVERED_MESSAGES |
DISCARDED_MESSAGES |
PUBLISHED_MESSAGES |
RECEIVE_CALLS |
PUBLISH_CALLS |
SEND_CALLS |
For type
= MESSAGE_QUEUE
(and queue name
), the
following queue counter keys are available:
AVAILABLE_MESSAGES |
DELIVERED_MESSAGES |
DISCARDED_MESSAGES |
RECEIVE_CALLS |
SEND_CALLS |
For type
= MESSAGE_TOPIC
(and topic name
), the
following topic counter keys are available:
ACTIVE_SELECTORS |
PUBLISHED_MESSAGES |
PUBLISH_CALLS |
For type
= THREAD_POOL
(and thread pool name
), the
following thread pool keys are available:
AVERAGE_READER_WAIT |
AVERAGE_WRITER_WAIT |
Return a unique name that may be used for a
temporary queue or topic.
The type
parameter
is MESSAGE_QUEUE
or MESSAGE_TOPIC.
string getUniqueName ( in long type );
Return a copy of at most maximum
messages in
queue
after the message with key after
that match the specified selector
expression (use "TRUE"
to match all messages). If after
is an empty key, the
returned list starts from the first message in the queue. If
maximum
is zero or negative, all available messages
will be returned.
CtsComponents::MessageSeq list ( in string queue, in string selector, in CtsComponents::MessageKey after, in long maximum );
Note: the returned messages are not acknowledged, so they remain in the queue.
Example: list all messages in the queue 100 at a time.
MessageKey after = new MessageKey(new byte[0]); for (;;) { Message[] seq = cms.list(queue, "TRUE", after, 100); if (seq.length == 0) { break; } for (int m = 0; m < seq.length; m++) { Message msg = seq[m]; print(msg); after = msg.key; } }
Move the message with key key
from queue
from
to queue to
, within a single
transaction. Return TRUE if the message was moved, or false
if the message does not exist (or no longer exists). The
resulting message (if moved) will be PERSISTENT, even if the
original message was not.
boolean move ( in string from, in CtsComponents::MessageKey key, in string to );
Publish a message with the specified message topic
.
A copy of this message will be sent to the
queues for all consumers who have registered a message selector
that returns TRUE when applied to this message.
void publish ( in string topic, in CtsComponents::Message msg, in long options );
The options
parameter may include the following.
Option | Description |
---|---|
PERSISTENT | The message will be saved in persistent storage. |
REPLICATED | The message will be replicated to all clustered servers. To
ensure at-most-once delivery, it
will be made PERSISTENT for any non-temporary queues that have
the property store=true
(see setProperties).
|
REQUIRES_TRANSACTION | The message will be published if the caller's transaction commits, or published in a new transaction if the caller has no transaction. |
IGNORE_DUPLICATE_KEY | The message will be ignored by any destination queues
that have already seen a message with this key.
This option can be used to avoid
the possibility of duplicate message delivery in the event of a
recoverable communications failure (i.e. automatic failover on
the publish call). A permanent database log is
used to detect duplicates.
|
NO_IMPLICIT_CREATION | The queue will not be implicitly created if it does not exist (see create). |
Note: all fields of msg
are optional. If the
message key is not provided, one will be generated by default.
However if it is necessary to prevent the generation of duplicate
messages in the event of a recoverable communications failure,
the message key should be set using
getMessageKey
before publish
is called.
Example - notify registered clients of a new stock value:
public void notifyStockValue(MessageService cms, String stock, double value) { String topic = "stock." + stock; String time = new java.util.Date().toString(); String text = time + ": The stock " + stock + " has value " + value; Message msg = new Message(); msg.key = cms.getMessageKey(); msg.props = new Property[2]; msg.props[0] = new Property("stock", new PropertyValue()); msg.props[0].value.stringValue(stock); msg.props[1] = new Property("value", new PropertyValue()); msg.props[1].value.doubleValue(value); msg.replyTo = ""; msg.text = text; cms.publish(topic, msg, 0); }
Remove a message listener
from a queue
.
See addListener.
void removeListener ( in string queue, in string listener );
Remove a required role
for accessing name
.
See addRole.
void removeRole ( in long type, in string name, in string role );
Remove a message selector
from a queue
.
See addSelector.
void removeSelector ( in string queue, in string selector );
Restore the dead queue message. It will take the message from dead message queue and send/publish to the original queue. Then it will delete the message from the dead message queue.
void restoreDq ( in string queue, in CtsComponents::MessageKey key );
The queue
parameter is the destination queue
of this message whereas key
parameter is the
message key.
Send a message to the specified message queue. If any listeners are registered for the queue, they will be notified asynchronously. Otherwise the message remains in the queue until it is received and acknowledged.
void send ( in string queue, in CtsComponents::Message msg, in long options );
The options
parameter may include the following.
Option | Description |
---|---|
PERSISTENT | The message will be saved in persistent storage. |
REPLICATED | The message will be replicated to all clustered servers. To
ensure at-most-once delivery, it
will be made PERSISTENT for any non-temporary queues that have
the property store=true
(see setProperties).
|
REQUIRES_ACKNOWLEDGE | The message must be acknowledged by the recipient. If the
message is not acknowledged by the recipient before it is deleted
due to a message or queue timeout, a copy will be returned to the
replyTo queue (if replyTo is provided)
with the reply options including
WAS_NOT_ACKNOWLEDGED.
|
REQUIRES_TRANSACTION | The message will be sent if the caller's transaction commits, or sent in a new transaction if the caller has no transaction. |
IGNORE_DUPLICATE_KEY | The message will be ignored by the destination queue if
it has already seen a message with this key.
This option can be used to avoid
the possibility of duplicate message delivery in the event of a
recoverable communications failure (i.e. automatic failover on
the send call). A permanent database log is
used to detect duplicates.
|
NO_IMPLICIT_CREATION | The queue will not be implicitly created if it does not exist (see create). |
Note: all fields of msg
are optional. If the
message key is not provided, one will be generated by default.
However if it is necessary to prevent the generation of duplicate
messages in the event of a recoverable communications failure,
the message key should be set using
getMessageKey
before send
is called.
Example - notify client of a completed order.
public void notifyOrder(MessageService cms, String queue, int orderNo, String product) { String time = new java.util.Date().toString(); String text = "Order " + orderNo + " for product " + product + " was completed at " + time; Message msg = new Message(); msg.key = cms.getMessageKey(); msg.props = new Property[2]; msg.props[0] = new Property("orderNo", new PropertyValue()); msg.props[0].value.longValue(orderNo); msg.props[1] = new Property("product", new PropertyValue()); msg.props[1].value.stringValue(product); msg.replyTo = ""; msg.text = text; cms.send(queue, msg, PERSISTENT.value); }
Set the current listeners
for a queue
.
void setListeners ( in string queue, in CtsComponents::StringSeq listeners );
See also: addListener, removeListener.
Note:it is unusual for a queue to have more than one listener. If queue has more than one listener, none of the listeners should be transactional.
Set the configuration properties of a named entity.
If props
is empty, all of the entity's properties
will revert to their default values, and the entity is subsequently considered
to be temporary (not configured).
If props
is not empty, only the specified properties are
set or changed, and the entity is subsequently considered to be
permanent (configured).
void setProperties ( in long type, in string name, in CtsComponents::Properties props );
The available entity types are:
MESSAGE_SERVICE |
MESSAGE_QUEUE |
MESSAGE_TOPIC |
THREAD_POOL |
QUEUE_CONNECTION_FACTORY |
TOPIC_CONNECTION_FACTORY |
For type
= MESSAGE_SERVICE,
the most important configuration properties are:
Property | Type | Default Value | Description |
---|---|---|---|
cms.cache | string | "MessageServiceCache" | The name of the database connection cache used by the message service. |
cms.debug | boolean | false | Indicates whether message service debugging is enabled. |
XX.YYYYYY | string | ... | SQL query and DML statements for accessing message service database tables (in the syntax of JDBC prepared statements). |
For type
= MESSAGE_QUEUE,
the available configuration properties are:
Property | Type | Default Value | Description |
---|---|---|---|
IGNORE_DUPLICATE_KEY | boolean | false | If this property is true, all send/publish calls that add a message to this queue will implicitly use the IGNORE_DUPLICATE_KEY option. Use this property to guarantee at-most-once delivery of persistent messages when you cannot rely on the message producer to specify this option. |
REQUIRES_ACKNOWLEDGE | boolean | false | If this property is true, all calls to getMessageQueue for this queue must include the REQUIRES_ACKNOWLEDGE option. Use this property to guarantee at-least-once delivery of persistent messages (at-least-once delivery of non-persistent messages is guaranteed only if no servers fail). |
REQUIRES_TRANSACTION | boolean | false | If this property is true, all calls to getMessageQueue for this queue must include the REQUIRES_TRANSACTION option. Use this property to guarantee at-most-once delivery of persistent messages (at-most-once delivery of non-persistent messages is guaranteed only if no servers fail). At-most-once delivery of a message means that only one transaction will be able to receive the message and sucessfully commit. |
maximum | long | 0 | Indicates the maximum number of messages that can be held in memory for the queue when it is active. Messages will be discarded from memory in the order that they would be received in order to prevent the maximum being exceeded. Persistent messages will be retained in the database for later retrieval, while non-persistent messages will be discarded permanently. A zero or negative value indicates no maximum queue length. |
timeout | long | 0 | Indicates how many seconds the queue should remain in memory when it is not being actively accessed by a client, and it has no registered listeners. Any non-persistent messages in memory when the timeout occurs will be discarded. |
share | boolean | true | Indicates whether multiple clients can receive messages simultaneously from this queue. When a queue is not shared, a client calling getMessageQueue will take ownership of the queue, and all 'older' clients will receive OBJECT_NOT_EXIST system exceptions if they attempt to access their MessageQueue objects. |
store | boolean | true | Indicates whether or not to make PERSISTENT any REPLICATED transient messages that are added to this queue. This avoids the possibility of duplicate processing of replicated transient messages within a cluster where a queue may reside in memory on multiple servers (e.g. shared queue). |
qop | string | "none" | Indicates the quality of protection required for the message queue object. |
For type
= MESSAGE_TOPIC,
the available configuration properties are:
Property | Type | Default Value | Description |
---|---|---|---|
timeout | long | 0 | Indicates how many seconds the topic should remain in memory when it is not being actively accessed, i.e. no active queues have selectors registered for the topic. A zero or negative size value indicates no timeout. A topic with no timeout will be automatically activated at server startup time. |
For type
= THREAD_POOL,
the available configuration properties are:
Property | Type | Default Value | Description |
---|---|---|---|
readers | long | 0 | Indicates the number of worker threads in the thread pool. Reader threads are used for client notification. |
writers | long | 0 | Indicates the number of worker threads in the thread pool. Writer threads are used for client notification. |
workers | long | 0 | Indicates the number of worker threads in the thread pool. Worker threads are used for component notification. |
For type
= QUEUE_CONNECTION_FACTORY,
or type
= TOPIC_CONNECTION_FACTORY,
the available configuration properties are:
Property | Type | Default Value | Description |
---|---|---|---|
CLIENT_ID | string | "" | Indicates the JMS client ID. An empty client ID is assumed to mean that the client's username will be used as the client ID. |
CONFIG_QUEUE | string | "" | Indicates the config queue to be used for getMessageQueue calls for JMS clients. A config queue can be particularly helpful for the configuration of temporary queues and non-durable subscriptions. |
THREAD_POOL | string | "" | Indicates the thread pool to be used for getMessageQueue calls for JMS clients. Using a thread pool for client notification can provide significant performance gains. |
IGNORE_DUPLICATE_KEY | boolean | false | Indicates whether JMS publish / send calls should use the IGNORE_DUPLICATE_KEY option. |
NO_IMPLICIT_CREATION | boolean | false | Indicates whether JMS publish / send calls should use the NO_IMPLICIT_CREATION option. |
REQUIRES_ACKNOWLEDGE | boolean | true | Indicates whether JMS receive/onMessage calls should carry out the acknowledgement step. Setting this to false gives significantly improved throughput for bulk publishing of transient messages. |
REQUIRES_TRANSACTION | boolean | false | Indicates whether JMS publish / send / receive / onMessage calls should use the REQUIRES_TRANSACTION option. Setting this to false gives significantly improved throughput for bulk publishing of transient messages. Note that rollback of non-transactional publish / send / receive / onMessage calls is not possible. |
SHARED_LISTENER | boolean | false | Indicates that all JMS message consumers for the connection will use a message listener, and all will use the same message listener. Using a shared listener for client notification can provide significant performance gains. |
SUPPORTS_TRANSACTION | boolean | true | Indicates whether JMS publish / send / receive / onMessage calls should use the SUPPORTS_TRANSACTION option. Setting this to false gives significantly improved throughput for bulk publishing of transient messages. |
TRANSPARENT_FAILOVER | boolean | false | Indicates whether JMS clients should allow transparent failover for message service operations. When false, this property indicates that a pinned object will be used. When true, careful consideration should be given to the IGNORE_DUPLICATE_KEY property. |
Set the current selectors
for a queue
.
void setSelectors ( in string queue, in CtsComponents::StringSeq selectors );
See also: addSelector, removeSelector.
Determine the type of message service entity a name represents.
long typeOf ( in string name );
Return MESSAGE_QUEUE, MESSAGE_TOPIC, QUEUE_CONNECTION_FACTORY, TOPIC_CONNECTION_FACTORY, or zero (none of the above).
The following steps must be followed to configure the message service.
The message service uses server clustering to provide high availability. The failure of a single server in a cluster should not make messaging services unavailable. If a CORBA COMM_FAILURE system exception occurs while a client is accessing a MessageQueue, a replacement message queue object can be obtained by calling getMessageQueue again. In that case, if the queue's previous options did not include REQUIRES_ACKNOWLEDGE, or if the queue's messages were not persistent, it is possible that some messages may have been irretrievably lost.
For maximum performance of client notification in high-throughput environments, it is better to design the client to recover from message loss than it is to use persistent messages with acknowledgement. For example, consider a client application which displays the "most recent price" for a selection of 50-100 products out of several thousand possible products, in an environment with frequent price changes. Assume that messages are used to notify clients of price changes. Upon receiving a CORBA COMM_FAILURE system exception, and assuming that some servers in the cluster are still available, the client could simply request the most recent price for each product, and then resume listening for price change messages after calling getMessageQueue again.
The message service uses server clustering to provide automatic load balancing.
The message service provides role-based security for queues and topics. There are three categories of access to a queue or topic.
Permissions for all three types of access must be assigned separately.
The message service provides the following mechanisms for reliable message delivery:
When a thread pool is specified for getMessageQueue, the MessageQueue object is implemented with a specialized server IIOP handler that avoids the use of large numbers of waiting threads to handle blocking receive calls, and thereby avoids waking large numbers of threads for client notification.
Since clients also use operations of the
MessageService interface, whose
implementation is managed by an ordinary IIOP handler that assigns one
thread per concurrent client connection, it is advisable for clients to use the
IdleConnectionTimeout
ORB initialization property so that
a server can handle a large number of concurrent clients
without allocating a large number of threads.
The following operations can optionally be transactional. A transactional operation runs in the caller's transaction, or in a new transaction if the caller is not enlisted in a transaction.
onMessage
operation
can process a message within the same transaction as the
automatic acknowledgement which occurs after onMessage
returns
successfully.