Jarosław Kijanowski
JMS – a new fact provider for Drools via Pipeline
Abstract:
The Drools Pipeline is a great tool that allows you to put facts into the rule engine from the outside world. Currently supported sources are JMS queues and topics, and text files. The format of the payload is almost free to choose: out of the box you can use JAXB, XStream and the powerful Smooks transformer to convert your data into facts that the rule engine is able to consume.
This article covers the following scenario: we want to take JMS messages stored in a queue and put them as facts into the Drools rule engine.
As I mentioned at the beginning, the Drools Pipeline will suit our needs very well.
What is a pipeline?
A pipeline is a two-way connection between the outside world and the rule engine. Its purpose is to transfer data, and it is built from stages, where every stage has its own responsibility.
The first stage is the entry point of the pipeline. It is responsible for accepting the incoming data, creating a context where results are stored and passing it on to the next stage. Depending on the knowledge session type, we choose a StatefulKnowledgeSessionPipeline or a StatelessKnowledgeSessionPipeline.
The next stages are responsible for receiving, modifying and sending the data. The following are available:
- transformers (JAXB, XStream and Smooks) for converting incoming data into POJOs and vice versa,
- actions to set results in the pipeline context and unwrap JMS messages,
- commands to set and get globals, insert and retrieve facts, signal events and start processes.
For a complete list check the PipelineFactory documentation.
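To make the idea of chained stages more concrete, here is a minimal sketch in plain Java. It does not use the Drools API: the Stage interface and the InsertStage, UnwrapStage and Envelope classes are hypothetical stand-ins that I made up for illustration. The point it shows is that every stage holds a reference to its receiver, so the last stage has to exist before the stage in front of it can be wired up.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins for pipeline stages; this is NOT the Drools API. */
final class PipelineSketch {

    /** A stage accepts a payload and may forward it to its receiver. */
    interface Stage {
        void receive(Object payload);
    }

    /** Stand-in for a JMS object message that wraps a payload. */
    static final class Envelope {
        final Object body;

        Envelope(Object body) {
            this.body = body;
        }
    }

    /** Last stage: collects facts, standing in for the session insert. */
    static final class InsertStage implements Stage {
        final List<Object> facts = new ArrayList<>();

        @Override
        public void receive(Object payload) {
            facts.add(payload);
        }
    }

    /** Middle stage: unwraps the envelope and forwards its body. */
    static final class UnwrapStage implements Stage {
        private final Stage receiver;

        UnwrapStage(Stage receiver) {
            this.receiver = receiver;
        }

        @Override
        public void receive(Object payload) {
            Envelope envelope = (Envelope) payload;
            receiver.receive(envelope.body);
        }
    }

    /** Builds the chain from the end, then pushes wrapped payloads through it. */
    static List<Object> run(Object... bodies) {
        InsertStage insert = new InsertStage();   // last stage is created first
        Stage unwrap = new UnwrapStage(insert);   // forwards to the insert stage
        Stage entry = unwrap::receive;            // entry point forwards to unwrap
        for (Object body : bodies) {
            entry.receive(new Envelope(body));
        }
        return insert.facts;
    }

    public static void main(String[] args) {
        System.out.println(run("Eve", "Jake", "Katherine")); // prints [Eve, Jake, Katherine]
    }
}
```

The real pipeline follows the same wiring order, only with stages created by the PipelineFactory and connected with setReceiver.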
Let's get started
We are going to feed our pipeline from a JMS queue. For this purpose I will use the JBoss Application Server 4.2.3 and the built-in JMS provider, JBossMQ.
We send 3 messages to a queue and expect the pipeline to take them, convert them to POJO facts and insert them into the Drools knowledge session.
Below is the POJO we will use in this example:
package eu.kijanowski.drools.pipeline.model;
import java.io.Serializable;
public class Person implements Serializable {
private static final long serialVersionUID = 2745509710158664093L;
private String name;
private String surname;
private String type;
private int age;
private boolean family;
public Person() {
super();
}
public Person(String name, String surname, String type, int age,
boolean family) {
super();
this.name = name;
this.surname = surname;
this.type = type;
this.age = age;
this.family = family;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSurname() {
return surname;
}
public void setSurname(String surname) {
this.surname = surname;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public boolean isFamily() {
return family;
}
public void setFamily(boolean family) {
this.family = family;
}
public String toString() {
return "Person: " + getClass().getCanonicalName() + "; name: " +
name + "; surname: " + surname + "; age: " + age + "; type: " +
type + "; family: " + family;
}
}
Class Person.java
Here's a simple class that will send POJOs to a queue called 'testQueue':
package eu.kijanowski.drools.pipeline;
import java.io.Serializable;
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import eu.kijanowski.drools.pipeline.model.Person;
public class SendJMSMessage {
QueueConnection conn;
QueueSession session;
Queue que;
public void setupConnection() throws JMSException, NamingException {
Properties properties = new Properties();
properties.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
properties.put(Context.URL_PKG_PREFIXES,
"org.jboss.naming:org.jnp.interfaces");
properties.put(Context.PROVIDER_URL, "jnp://127.0.0.1:1099");
InitialContext iniCtx = new InitialContext(properties);
QueueConnectionFactory qcf = (QueueConnectionFactory) iniCtx
.lookup("ConnectionFactory");
que = (Queue) iniCtx.lookup("queue/testQueue");
conn = qcf.createQueueConnection();
session = conn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);
conn.start();
System.out.println("Connection started");
}
public void stop() throws JMSException {
conn.stop();
session.close();
conn.close();
System.out.println("Connection stopped");
}
public void sendAMessage() throws JMSException {
QueueSender send = session.createSender(que);
ObjectMessage tm;
Person[] persons = {
new Person("Eve", "ABC", "mother", 26, false),
new Person("Jake", "ABC", "father", 27, false),
new Person("Katherine", "ABC", "child", 1, false)
};
for (Serializable msg : persons) {
tm = session.createObjectMessage(msg);
send.send(tm);
}
send.close();
}
public static void main(String args[]) throws Exception {
SendJMSMessage sm = new SendJMSMessage();
sm.setupConnection();
sm.sendAMessage();
sm.stop();
}
}
Class SendJMSMessage.java
You need to add $JBOSS_HOME/client/jbossall-client.jar to your classpath to compile it.
Last but not least, here's our rule file:
package eu.kijanowski.drools.pipeline
import eu.kijanowski.drools.pipeline.model.Person
rule 'family rule'
when
    father:Person(type == 'father', fs:surname)
    mother:Person(type == 'mother', surname == fs)
    child:Person(type == 'child', surname == fs)
then
    System.out.println("We are family! " + fs);
    father.setFamily(true);
    mother.setFamily(true);
    child.setFamily(true);
end
Rule jms.drl
Building the pipeline
First of all we need to create a knowledge session, done as usual:
KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
kbuilder.add(ResourceFactory.newClassPathResource("jms.drl", getClass()), ResourceType.DRL);
if (kbuilder.hasErrors()) {
System.out.println("KBuilder has errors: " + kbuilder.getErrors());
return;
}
KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
StatefulKnowledgeSession ksession = kbase.newStatefulKnowledgeSession();
We always build the pipeline starting from the end, that is, from the last stage, and finish at the entry point.
The last stage, 'InsertStage', is responsible for inserting facts into the knowledge session:
KnowledgeRuntimeCommand insertStage = PipelineFactory.newStatefulKnowledgeSessionInsert();
The previous stage, called 'UnwrapObjectStage', receives a JMS object message and has to extract the POJO, before it is passed to the 'InsertStage':
Action unwrapObjectStage = PipelineFactory.newJmsUnwrapMessageObject();
unwrapObjectStage.setReceiver(insertStage);
It's time to define the entry of the pipeline. Note that we're working with a stateful knowledge session:
Pipeline entry = PipelineFactory.newStatefulKnowledgeSessionPipeline(ksession);
entry.setReceiver(unwrapObjectStage);
Now we have a queue set up on the JBoss application server and a Drools pipeline. But they are not connected in any way (yet). For this job we need to hire a messenger that will transport JMS messages from 'testQueue' to 'entry':
Properties props = new Properties();
props.put(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory");
props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
props.put(Context.PROVIDER_URL, "jnp://127.0.0.1:1099");
String destinationName = "queue/testQueue";
Service messenger = PipelineFactory.newJmsMessenger(entry, props,
destinationName, null);
messenger.start();
Let's send some messages to the 'testQueue':
SendJMSMessage sm = new SendJMSMessage();
sm.setupConnection();
sm.sendAMessage();
sm.stop();
This populates the queue with 3 POJOs. The messenger delivers them in the form of JMS messages to the pipeline, which extracts the POJOs from the JMS messages and inserts them into the associated knowledge session.
Let's not rush the messenger and sleep for 10 seconds before we fire our rules:
Thread.sleep(10000);
ksession.fireAllRules();
ksession.dispose();
messenger.stop();
We should see the following text displayed by our rule:
We are family! ABC
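A fixed 10 second sleep is fine for a demo, but it either wastes time or is too short on a slow machine. One possible alternative is to wait until the expected number of facts has actually arrived. The sketch below shows the idea in plain Java with a CountDownLatch. It is not wired into the Drools Pipeline: CountingStage and deliverAndWait are hypothetical names of mine, and in a real setup the counting would have to be done by a stage or listener of your own.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: wait for a known number of facts instead of sleeping. */
final class LatchWaitSketch {

    /** Stand-in for a final stage that counts every fact it receives. */
    static final class CountingStage {
        private final CountDownLatch remaining;

        CountingStage(int expectedFacts) {
            this.remaining = new CountDownLatch(expectedFacts);
        }

        void receive(Object fact) {
            // A real stage would insert the fact into the session here.
            remaining.countDown();
        }

        /** Returns true if all expected facts arrived before the timeout. */
        boolean awaitAll(long timeoutMillis) {
            try {
                return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Simulates a messenger thread delivering facts while the caller waits. */
    static boolean deliverAndWait(int expected, int delivered, long timeoutMillis) {
        CountingStage stage = new CountingStage(expected);
        Thread messenger = new Thread(() -> {
            for (int i = 0; i < delivered; i++) {
                stage.receive("fact-" + i);
            }
        });
        messenger.start();
        return stage.awaitAll(timeoutMillis);
    }

    public static void main(String[] args) {
        // All 3 expected facts are delivered, so the wait ends early.
        System.out.println(deliverAndWait(3, 3, 10_000)); // prints true
    }
}
```

With this approach the 10 seconds become an upper bound rather than a fixed delay, and a false return value tells you that not all messages made it through in time.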
Extending the pipeline
In the first attempt we were sending POJOs wrapped in JMS messages. Since the Drools Pipeline is quite powerful, we can also send XML or CSV (comma-separated values) data representing facts.
Let's modify our SendJMSMessage class to send data in the CSV format. Remove or comment out this section:
Person[] persons = {
new Person("Eve", "ABC", "mother", 26, false),
new Person("Jake", "ABC", "father", 27, false),
new Person("Katherine", "ABC", "child", 1, false)
};
and replace it with:
String[] persons = {
"Eve,ABC,26,mother,false",
"Jake,ABC,27,father,false",
"Katherine,ABC,1,child,false"
};
Now our messenger will deliver 3 JMS messages, each containing a one-line String. We need to convert these Strings into POJOs, which is a piece of cake with the Smooks transformer. Our pipeline is going to look like this:
KnowledgeRuntimeCommand insertStage = PipelineFactory.newStatefulKnowledgeSessionInsert();
Smooks smooks = new Smooks(getClass().getResourceAsStream( "smooks-config.xml" ));
Transformer transformerStage = PipelineFactory.newSmooksFromSourceTransformer(smooks, "person");
transformerStage.setReceiver(insertStage);
Action unwrapObjectStage = PipelineFactory.newJmsUnwrapMessageObject();
unwrapObjectStage.setReceiver(transformerStage);
Pipeline entry = PipelineFactory.newStatefulKnowledgeSessionPipeline(ksession);
entry.setReceiver(unwrapObjectStage);
Create the Smooks config file and put it where your class files reside:
<?xml version='1.0' encoding='UTF-8'?>
<smooks-resource-list xmlns="http://www.milyn.org/xsd/smooks-1.1.xsd"
xmlns:csv="http://www.milyn.org/xsd/smooks/csv-1.1.xsd" xmlns:jb="http://www.milyn.org/xsd/smooks/javabean-1.1.xsd">
<csv:reader fields="name,surname,age,type,family" />
<jb:bindings beanId="person"
class="eu.kijanowski.drools.pipeline.model.Person" createOnElement="csv-record">
<jb:value property="name" data="csv-record/name" />
<jb:value property="surname" data="csv-record/surname" />
<jb:value property="age" data="csv-record/age" decoder="Integer" />
<jb:value property="type" data="csv-record/type" />
<jb:value property="family" data="csv-record/family" decoder="Boolean" />
</jb:bindings>
</smooks-resource-list>
smooks-config.xml
For more information about Smooks refer to their website and documentation.
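If you wonder what the binding configuration above boils down to, the following plain Java sketch does roughly the same mapping by hand. It is only an illustration under my own assumptions and not how Smooks is implemented: CsvPersonSketch and fromCsv are hypothetical names, the nested Person is a cut-down copy of our model class, and the naive split does not handle quoted fields the way a real CSV reader would.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical hand-written equivalent of the CSV-to-Person binding. */
final class CsvPersonSketch {

    /** Cut-down copy of the article's Person, for illustration only. */
    static final class Person {
        String name;
        String surname;
        String type;
        int age;
        boolean family;
    }

    // Same column order as the fields attribute of csv:reader in smooks-config.xml.
    static final List<String> FIELDS =
            Arrays.asList("name", "surname", "age", "type", "family");

    /** Maps one CSV line to a Person; naive split, no support for quoted commas. */
    static Person fromCsv(String line) {
        String[] values = line.split(",", -1);
        if (values.length != FIELDS.size()) {
            throw new IllegalArgumentException(
                    "Expected " + FIELDS.size() + " columns but got " + values.length);
        }
        Person person = new Person();
        for (int i = 0; i < values.length; i++) {
            String value = values[i].trim();
            switch (FIELDS.get(i)) {
                case "name":    person.name = value; break;
                case "surname": person.surname = value; break;
                case "type":    person.type = value; break;
                case "age":     person.age = Integer.parseInt(value); break;        // like decoder="Integer"
                case "family":  person.family = Boolean.parseBoolean(value); break; // like decoder="Boolean"
                default:        throw new IllegalStateException("Unknown field " + FIELDS.get(i));
            }
        }
        return person;
    }

    public static void main(String[] args) {
        Person eve = fromCsv("Eve,ABC,26,mother,false");
        System.out.println(eve.name + " " + eve.surname + " " + eve.age + " " + eve.type);
    }
}
```

The advantage of the Smooks configuration over code like this is that the column order and the target properties live in one XML file, so the message format can change without recompiling.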
You will need to download Smooks and add the following jars to your classpath:
- commons-lang-2.1.jar
- commons-logging-1.1.jar
- milyn-commons-1.1.2.jar
- milyn-smooks-core-1.1.2.jar
- milyn-smooks-csv-1.1.2.jar
- opencsv-1.8.jar
That's all. For completeness, here's the source code:
package eu.kijanowski.drools.pipeline;
import java.util.Properties;
import javax.naming.Context;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.builder.KnowledgeBuilder;
import org.drools.builder.KnowledgeBuilderFactory;
import org.drools.builder.ResourceType;
import org.drools.io.ResourceFactory;
import org.drools.runtime.StatefulKnowledgeSession;
import org.drools.runtime.pipeline.Action;
import org.drools.runtime.pipeline.KnowledgeRuntimeCommand;
import org.drools.runtime.pipeline.Pipeline;
import org.drools.runtime.pipeline.PipelineFactory;
import org.drools.runtime.pipeline.Service;
import org.drools.runtime.pipeline.Transformer;
import org.milyn.Smooks;
public class JMSPipelineSmooksDemo {
public static void main(String args[]) throws Exception {
JMSPipelineSmooksDemo demo = new JMSPipelineSmooksDemo();
demo.go();
}
private void go() throws Exception {
//Create a knowledge session
KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
kbuilder.add(ResourceFactory.newClassPathResource("jms.drl", getClass()), ResourceType.DRL);
if (kbuilder.hasErrors()) {
System.out.println("KBuilder has errors: " + kbuilder.getErrors());
return;
}
KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
StatefulKnowledgeSession ksession = kbase.newStatefulKnowledgeSession();
//Stage 4 - inserting facts into the session
KnowledgeRuntimeCommand insertStage = PipelineFactory.newStatefulKnowledgeSessionInsert();
//Stage 3 - convert CSV to POJO
Smooks smooks = new Smooks( getClass().getResourceAsStream( "smooks-config.xml" ) );
Transformer transformerStage = PipelineFactory.newSmooksFromSourceTransformer(smooks, "person");
transformerStage.setReceiver(insertStage);
//Stage 2 - extracting CSV from JMS
Action unwrapObjectStage = PipelineFactory.newJmsUnwrapMessageObject();
unwrapObjectStage.setReceiver(transformerStage);
//Stage 1 - entry to the pipeline
Pipeline entry = PipelineFactory.newStatefulKnowledgeSessionPipeline(ksession);
entry.setReceiver(unwrapObjectStage);
//Setup a JMS messenger delivering JMS messages from testQueue to the pipeline
Properties props = new Properties();
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
props.put(Context.PROVIDER_URL, "jnp://127.0.0.1:1099");
String destinationName = "queue/testQueue";
Service messenger = PipelineFactory.newJmsMessenger(entry, props, destinationName, null);
messenger.start();
// send JMS messages
SendJMSMessage sm = new SendJMSMessage();
sm.setupConnection();
sm.sendAMessage();
sm.stop();
// wait until the messages have gone through the pipeline
Thread.sleep(10000);
// session is ready to be executed
ksession.fireAllRules();
ksession.dispose();
messenger.stop();
}
}
JMSPipelineSmooksDemo.java
Just run this sample app and you should see the same output again:
We are family! ABC
Summary
This example showed one of the many features of the Drools Pipeline. We sent JMS messages to a queue. Using a pipeline, we consumed these messages, converted them into facts and inserted them into a knowledge session.
Since pipelines are two-way connections, it's also possible to retrieve data from the rule engine and process it before exposing it to the outside world.
Read this blog entry and docs for more information. Also have a look at these samples to get a deeper understanding of what pipelines can do.
Feel free to leave a comment.