How to Use JMS Queues Without the RemoteRuntimeEngine in Red Hat JBoss BPMS
Updated
It is possible to work with JMS queues directly, without using the RemoteRuntimeEngine. However, this is not the recommended way to use the JMS interface. See the following two examples:
Sending and Receiving JMS Messages
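The sendAndReceiveJmsMessage example below creates a JaxbCommandsRequest instance and adds the commands to be executed on behalf of the user. In addition, it retrieves the JNDI context from the server, looks up the JMS connection factory and the send and response queues, sends the request, and retrieves the results from the response.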
import org.kie.api.runtime.process.ProcessInstance;
import org.kie.api.task.model.TaskSummary;
/**
 * Starts a process and queries the user's tasks in a single JMS round trip.
 *
 * <p>Builds a {@code JaxbCommandsRequest} holding two commands (start process,
 * get tasks assigned as potential owner), looks up the JMS connection factory
 * and the send/response queues through JNDI, sends the request with
 * {@code sendJmsCommands} and then picks the individual results out of the
 * response by command index.
 *
 * <p>A malformed server URL is logged and the method returns; every other
 * failure is reported with a {@link RuntimeException}.
 *
 * @throws RuntimeException if a JNDI lookup fails, if no response is received
 *         within the timeout, or if the server answers with an exception response
 */
public void sendAndReceiveJmsMessage() {
    final String USER = "charlie";
    final String PASSWORD = "ch0c0licious";
    final String DEPLOYMENT_ID = "test-project";
    final String PROCESS_ID_1 = "oompa-processing";

    URL serverUrl;
    try {
        serverUrl = new URL("http://localhost:8080/business-central/");
    } catch (MalformedURLException murle) {
        logger.error("Malformed URL for the server instance!", murle);
        return;
    }

    // Create JaxbCommandsRequest instance and add commands.
    // The index variables record the position of each command in the request,
    // which is how the matching result is identified in the response below.
    Command<?> cmd = new StartProcessCommand(PROCESS_ID_1);
    int oompaProcessingResultIndex = 0;
    JaxbCommandsRequest req = new JaxbCommandsRequest(DEPLOYMENT_ID, cmd);
    req.getCommands().add(new GetTaskAssignedAsPotentialOwnerCommand(USER));
    int loompaMonitoringResultIndex = 1;

    // Get JNDI context from server:
    InitialContext context = getRemoteJbossInitialContext(serverUrl, USER, PASSWORD);

    // Create JMS connection:
    ConnectionFactory connectionFactory;
    try {
        connectionFactory = (ConnectionFactory) context.lookup("jms/RemoteConnectionFactory");
    } catch (NamingException ne) {
        throw new RuntimeException("Unable to lookup JMS connection factory.", ne);
    }

    // Setup queues:
    Queue sendQueue;
    Queue responseQueue;
    try {
        sendQueue = (Queue) context.lookup("jms/queue/KIE.SESSION");
        responseQueue = (Queue) context.lookup("jms/queue/KIE.RESPONSE");
    } catch (NamingException ne) {
        throw new RuntimeException("Unable to lookup send or response queue", ne);
    }

    // Send command request:
    Long processInstanceId = null; // needed if you're doing an operation on a PER_PROCESS_INSTANCE deployment
    String humanTaskUser = USER;
    // sendJmsCommands hands this value straight to MessageConsumer.receive(long),
    // which expects milliseconds, so wait 5 seconds rather than 5 milliseconds.
    int timeoutMillis = 5 * 1000;
    JaxbCommandsResponse cmdResponse = sendJmsCommands(
            DEPLOYMENT_ID, processInstanceId, humanTaskUser, req,
            connectionFactory, sendQueue, responseQueue,
            USER, PASSWORD, timeoutMillis);

    // sendJmsCommands returns null when no response arrived in time.
    if (cmdResponse == null) {
        throw new RuntimeException("No JMS response received within " + timeoutMillis + " ms.");
    }

    // Retrieve results:
    ProcessInstance oompaProcInst = null;
    List<TaskSummary> charliesTasks = null;
    for (JaxbCommandResponse<?> response : cmdResponse.getResponses()) {
        if (response instanceof JaxbExceptionResponse) {
            // something went wrong on the server side
            JaxbExceptionResponse exceptionResponse = (JaxbExceptionResponse) response;
            throw new RuntimeException(exceptionResponse.getMessage());
        }
        if (response.getIndex() == oompaProcessingResultIndex) {
            oompaProcInst = (ProcessInstance) response.getResult();
        } else if (response.getIndex() == loompaMonitoringResultIndex) {
            // The result type is determined by the command sent at this index.
            @SuppressWarnings("unchecked")
            List<TaskSummary> tasks = (List<TaskSummary>) response.getResult();
            charliesTasks = tasks;
        }
    }
    // oompaProcInst and charliesTasks now hold the results of the two commands.
}
Sending JMS Commands
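The sendJmsCommands example below is a continuation of the previous example. It sets up the JMS connection, session, producer, and consumer; serializes the request (optionally registering user-created classes); sends the message; and then receives the response and extracts the JaxbCommandsResponse from it.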
/**
 * Sends a command request over JMS and waits for the correlated response.
 *
 * <p>The request is serialized to XML with a {@code JaxbSerializationProvider},
 * written into a {@code BytesMessage} tagged with a freshly generated
 * correlation ID, and sent to {@code sendQueue}. The response is read from
 * {@code responseQueue} using a selector on that same correlation ID.
 *
 * @param deploymentId      deployment ID, set as a message property when extra JAXB classes are registered
 * @param processInstanceId process instance ID stored on the request; may be {@code null}
 * @param user              user stored on the request
 * @param req               the request to send; its process instance ID and user are overwritten
 * @param factory           factory used to create the JMS connection
 * @param sendQueue         queue the request is sent to
 * @param responseQueue     queue the response is read from
 * @param jmsUser           user name for the JMS connection; only used when {@code jmsPassword} is not {@code null}
 * @param jmsPassword       password for the JMS connection; if {@code null}, an unauthenticated connection is created
 * @param timeout           passed unchanged to {@code MessageConsumer.receive(long)}, i.e. milliseconds
 * @return the deserialized response, or {@code null} if no message was received
 * @throws RemoteCommunicationException if the JMS setup, serialization, send, receive or
 *         response extraction fails
 */
private JaxbCommandsResponse sendJmsCommands(String deploymentId, Long processInstanceId, String user,
        JaxbCommandsRequest req, ConnectionFactory factory, Queue sendQueue, Queue responseQueue, String jmsUser,
        String jmsPassword, int timeout) {
    req.setProcessInstanceId(processInstanceId);
    req.setUser(user);

    Connection connection = null;
    Session session = null;
    // The correlation ID ties the response on the shared response queue to this request.
    String corrId = UUID.randomUUID().toString();
    String selector = "JMSCorrelationID = '" + corrId + "'";
    JaxbCommandsResponse cmdResponses = null;
    try {
        // Setup:
        MessageProducer producer;
        MessageConsumer consumer;
        try {
            if (jmsPassword != null) {
                connection = factory.createConnection(jmsUser, jmsPassword);
            } else {
                connection = factory.createConnection();
            }
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(sendQueue);
            consumer = session.createConsumer(responseQueue, selector);
            connection.start();
        } catch (JMSException jmse) {
            throw new RemoteCommunicationException("Unable to setup a JMS connection.", jmse);
        }

        JaxbSerializationProvider serializationProvider = new JaxbSerializationProvider();
        // If necessary, add user-created classes here:
        // serializationProvider.addJaxbClasses(MyType.class, AnotherJaxbAnnotatedType.class);

        // Create message:
        BytesMessage msg;
        try {
            msg = session.createBytesMessage();

            // Serialize request:
            String xmlStr = serializationProvider.serialize(req);
            msg.writeUTF(xmlStr);

            // Set properties:
            msg.setJMSCorrelationID(corrId);
            msg.setIntProperty(SerializationConstants.SERIALIZATION_TYPE_PROPERTY_NAME,
                    JaxbSerializationProvider.JMS_SERIALIZATION_TYPE);
            Collection<Class<?>> extraJaxbClasses = serializationProvider.getExtraJaxbClasses();
            if (!extraJaxbClasses.isEmpty()) {
                String extraJaxbClassesPropertyValue = JaxbSerializationProvider
                        .classSetToCommaSeperatedString(extraJaxbClasses);
                msg.setStringProperty(SerializationConstants.EXTRA_JAXB_CLASSES_PROPERTY_NAME,
                        extraJaxbClassesPropertyValue);
                // NOTE(review): the deployment ID property is only set together with the
                // extra classes - confirm this is intended.
                msg.setStringProperty(SerializationConstants.DEPLOYMENT_ID_PROPERTY_NAME, deploymentId);
            }
        } catch (JMSException jmse) {
            throw new RemoteCommunicationException("Unable to create and fill a JMS message.", jmse);
        } catch (SerializationException se) {
            // This step serializes the request; keep the exception itself as the cause
            // so that its message and stack trace are not lost.
            throw new RemoteCommunicationException("Unable to serialize JMS message.", se);
        }

        // Send:
        try {
            producer.send(msg);
        } catch (JMSException jmse) {
            throw new RemoteCommunicationException("Unable to send a JMS message.", jmse);
        }

        // Receive:
        Message response;
        try {
            response = consumer.receive(timeout);
        } catch (JMSException jmse) {
            throw new RemoteCommunicationException("Unable to receive or retrieve the JMS response.", jmse);
        }

        if (response == null) {
            logger.warn("Response is empty, leaving");
            return null;
        }

        // Extract response:
        // NOTE(review): assumes the server always answers with a BytesMessage - confirm.
        try {
            String xmlStr = ((BytesMessage) response).readUTF();
            cmdResponses = (JaxbCommandsResponse) serializationProvider.deserialize(xmlStr);
        } catch (JMSException jmse) {
            throw new RemoteCommunicationException("Unable to extract " + JaxbCommandsResponse.class.getSimpleName()
                    + " instance from JMS response.", jmse);
        } catch (SerializationException se) {
            throw new RemoteCommunicationException("Unable to extract " + JaxbCommandsResponse.class.getSimpleName()
                    + " instance from JMS response.", se);
        }
        assert cmdResponses != null : "Jaxb Cmd Response was null!";
    } finally {
        // Close the session before the connection, each in its own try block so that
        // a failure to close one does not prevent the attempt to close the other.
        if (session != null) {
            try {
                session.close();
            } catch (JMSException jmse) {
                logger.warn("Unable to close connection or session!", jmse);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException jmse) {
                logger.warn("Unable to close connection or session!", jmse);
            }
        }
    }
    return cmdResponses;
}
Article Type