JBoss.org Community Documentation
In the following text, we will refer to two types of "multi-threading": logical and technical. Technical multi-threading is what happens when multiple threads or processes are started on a computer, for example by a Java or C program. Logical multi-threading is what we see in a BPM process after the process reaches a parallel gateway, for example. From a functional standpoint, the original process will then split into two processes that are executed in a parallel fashion.
Of course, the jBPM engine supports logical multi-threading: for example, processes that include a parallel gateway. We've chosen to implement logical multi-threading using one thread: a jBPM process that includes logical multi-threading will only be executed in one technical thread. The main reason for doing this is that multiple (technical) threads need to be able to communicate state information with each other if they are working on the same process. This requirement brings with it a number of complications. While it might seem that multi-threading would bring performance benefits with it, the extra logic needed to make sure the different threads work together well means that this is not guaranteed. There is also the extra overhead incurred because we need to avoid race conditions and deadlocks.
In general, the jBPM engine executes actions serially. For example, when the engine encounters a script task in a process, it will synchronously execute that script and wait for it to complete before continuing execution. Similarly, if a process encounters a parallel gateway, it will sequentially trigger each of the outgoing branches, one after the other. This is possible since execution is almost always instantaneous, meaning that it is extremely fast and produces almost no overhead. As a result, the user will usually not even notice this. Action scripts in a process follow the same rule: doing a Thread.sleep(...) as part of a script will not make the engine continue execution elsewhere but will block the engine thread during that period.
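The serial behaviour described above can be pictured with a small toy model. The sketch below is not jBPM code: SequentialGatewaySketch and runBranches are hypothetical names, and each "branch" is just a Runnable. It only illustrates that every outgoing branch is triggered on the calling thread and finishes before the next one starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustration only: a toy stand-in for the engine, not part of jBPM.
class SequentialGatewaySketch {

    /**
     * Triggers every branch in iteration order on the calling thread and
     * returns a log of the start/end events in the order they happened.
     */
    static List<String> runBranches(Map<String, Runnable> branches) {
        List<String> log = new ArrayList<>();
        for (Map.Entry<String, Runnable> branch : branches.entrySet()) {
            log.add("start " + branch.getKey());
            // Blocks until the branch is done; a Thread.sleep(...) in here
            // would delay every branch that follows.
            branch.getValue().run();
            log.add("end " + branch.getKey());
        }
        return log;
    }
}
```

Passing an ordered map (for example a LinkedHashMap) with branches A and B yields the log start A, end A, start B, end B: the branches are logically parallel but technically sequential.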
The same principle applies to service tasks. When a service task is reached in a process, the engine will also invoke the handler of this service synchronously. The engine will wait for the completeWorkItem(...) method to return before continuing execution. It is important that your service handler executes your service asynchronously if its execution is not instantaneous.
An example of this would be a service task that invokes an external service. Since the delay in invoking this service remotely and waiting for the results might be too long, it might be a good idea to invoke this service asynchronously. This means that the handler will only invoke the service and will notify the engine later when the results are available. In the meantime, the process engine continues execution of the process.
Human tasks are a typical example of a service that needs to be invoked asynchronously, as we don't want the engine to wait until a human actor has responded to the request. The human task handler will only create a new task (on the task list of the assigned actor) when the human task node is triggered. The engine will then be able to continue execution on the rest of the process (if necessary) and the handler will notify the engine asynchronously when the user has completed the task.
The simplest way to run multiple processes is to run them all using one knowledge session. However, there are cases in which it's necessary to run multiple processes in different knowledge sessions, even in different (technical) threads. Both are supported by jBPM.
When we add persistence (using a database, for example) to a situation in which we have multiple knowledge sessions (and processes), there is a guideline that users should be aware of. The following paragraphs explain why this guideline is important to follow.
Please make sure to use a database that allows row-level locks as well as table-level locks.
For example, a user could have a situation in which there are 2 (or more) threads running, each with its own knowledge session instance. On each thread, jBPM processes are being started using the local knowledge session instance.
In this use case, a race condition exists when thread A and thread B happen to finish a process at the same moment. At this point, because persistence is being used, both thread A and thread B will be committing changes to the database. If row-level locks are not possible, then the following situation can occur:
Thread A has a lock on the ProcessInstanceInfo table, having just committed a change to that table.
Thread A wants a lock on the SessionInfo table in order to commit a change there.
Thread B has the opposite situation: it has a lock on the SessionInfo table, having just committed a change there.
Thread B wants a lock on the ProcessInstanceInfo table, even though Thread A already has a lock on it.
This is a deadlock situation which the database and application will not be able to solve. However, if row-level locks are possible (and enabled) in the database and in the tables used, then this situation will not occur.
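The lock cycle described above can be reproduced in miniature without a database. In the following sketch, two in-memory locks stand in for table-level locks on ProcessInstanceInfo and SessionInfo; TableLockCycleSketch, run and commit are hypothetical names, and the timeout exists only so that the demonstration terminates (with plain blocking locks the two threads would wait on each other indefinitely).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Illustration only: in-memory locks stand in for table-level database locks.
class TableLockCycleSketch {

    /** Returns how many threads obtained their second lock; 0 means a lock cycle. */
    static int run(long timeoutMillis) {
        ReentrantLock processInstanceInfo = new ReentrantLock();
        ReentrantLock sessionInfo = new ReentrantLock();
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        CountDownLatch bothTried = new CountDownLatch(2);
        AtomicInteger gotSecond = new AtomicInteger();

        // Thread A locks ProcessInstanceInfo first, thread B locks SessionInfo first.
        Thread a = new Thread(() -> commit(processInstanceInfo, sessionInfo,
                bothHoldFirst, bothTried, gotSecond, timeoutMillis));
        Thread b = new Thread(() -> commit(sessionInfo, processInstanceInfo,
                bothHoldFirst, bothTried, gotSecond, timeoutMillis));
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return gotSecond.get();
    }

    private static void commit(ReentrantLock first, ReentrantLock second,
            CountDownLatch bothHoldFirst, CountDownLatch bothTried,
            AtomicInteger gotSecond, long timeoutMillis) {
        first.lock();
        try {
            // Wait until both threads hold their first lock.
            bothHoldFirst.countDown();
            bothHoldFirst.await();
            // Each thread now wants the lock held by the other one.
            if (second.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                gotSecond.incrementAndGet();
                second.unlock();
            }
            // Keep the first lock until both attempts are over.
            bothTried.countDown();
            bothTried.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            first.unlock();
        }
    }
}
```

With row-level locks the two commits would normally touch different rows, so neither thread would need a lock that the other one holds.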
How can we implement an asynchronous service handler? To start with, this depends on the technology you're using. If you're only using Java, you could execute the actual service in a new thread:
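import org.kie.api.runtime.process.WorkItem;
import org.kie.api.runtime.process.WorkItemHandler;
import org.kie.api.runtime.process.WorkItemManager;

public class MyServiceTaskHandler implements WorkItemHandler {

    public void executeWorkItem(WorkItem workItem, WorkItemManager manager) {
        // Return immediately and do the slow work on a separate thread.
        new Thread(new Runnable() {
            public void run() {
                // Do the heavy lifting here ...
            }
        }).start();
    }

    public void abortWorkItem(WorkItem workItem, WorkItemManager manager) {
        // Nothing to clean up in this simple example.
    }
}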
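A variation on the handler above is to reuse a small thread pool instead of starting a new thread per work item, and to report back once the work is done. The sketch below deliberately avoids the jBPM API so that it runs on its own: AsyncHandlerSketch and its Completion interface are hypothetical stand-ins, with Completion playing the role that WorkItemManager.completeWorkItem(...) would play in a real handler.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustration only: plain Java stand-ins, not the jBPM handler API.
class AsyncHandlerSketch {

    /** Hypothetical stand-in for the engine-side completion callback. */
    interface Completion {
        void complete(long workItemId, Map<String, Object> results);
    }

    // Pool size of 2 is an arbitrary choice for the example.
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    /** Returns immediately; the work runs on a pool thread and reports back later. */
    Future<?> execute(final long workItemId, final Completion completion) {
        return pool.submit(() -> {
            // Do the heavy lifting here ...
            Map<String, Object> results = new HashMap<>();
            results.put("status", "done");
            // Notify the caller that the work item is finished.
            completion.complete(workItemId, results);
        });
    }

    /** Stops accepting new work; already submitted work still completes. */
    void shutdown() {
        pool.shutdown();
    }
}
```

The caller gets control back as soon as execute(...) returns, which is the property the engine needs from an asynchronous handler. Retries, error handling and history are still missing from this approach.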
It's advisable to have your handler contact a service that executes the business operation, instead of having it perform the actual work. If anything goes wrong with a business operation, it doesn't affect your process. The loose coupling that this provides also gives you greater flexibility in reusing services and developing them.
For example, you can have your human task handler simply invoke the human task service to add a task there. To implement an asynchronous handler, you usually just need to invoke this service asynchronously. How you do so depends on the technology you use for the communication, but it might be as simple as asynchronously invoking a web service, or sending a JMS message to the external service.
In version 6, jBPM introduces a new component called jbpm executor, which provides more advanced features for asynchronous execution. It delivers a generic environment for background execution of commands. Commands are nothing more than business logic encapsulated within a simple interface. A command does not have any process runtime related information, which means there is no need to complete work items or anything of that sort. It focuses purely on the business logic to be executed. It receives data via CommandContext and returns the results of the execution with ExecutionResults.
Before looking into the details of jBPM support for asynchronous execution, let's look at the common requirements for such execution:
allows asynchronous execution of a given piece of business logic
allows retries in case resources are temporarily unavailable, e.g. during interaction with an external system
allows errors to be handled once all retries have been attempted
provides a cancellation option
provides a history log of execution
When comparing these requirements with the "simple async handler" (executed as a separate thread), you will notice that all of them would need to be implemented all over again by every system. Because of that, a common, generic component is provided out of the box to simplify and empower usage.
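To give an idea of what "implemented all over again" means, here is a minimal sketch of just two of the requirements, retries and a final error hook, in plain Java. RetrySketch, runWithRetries and ErrorHandler are hypothetical names and have nothing to do with how the jbpm executor is implemented internally.

```java
import java.util.concurrent.Callable;

// Illustration only: hand-rolled retry logic, not the jbpm executor's own.
class RetrySketch {

    /** Hypothetical error hook, invoked once all attempts have failed. */
    interface ErrorHandler {
        void onFailure(Exception lastError);
    }

    /**
     * Runs the job once and then up to maxRetries more times if it throws.
     * Returns the job's result, or null if every attempt failed.
     */
    static <T> T runWithRetries(Callable<T> job, int maxRetries, ErrorHandler errorHandler) {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return job.call();
            } catch (Exception e) {
                // Remember the failure and try again if attempts remain.
                last = e;
            }
        }
        errorHandler.onFailure(last);
        return null;
    }
}
```

Cancellation, persistence of pending jobs and an execution history would each need similar custom code, which is the motivation for a shared component.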
jBPM executor operates on commands, which are essentially pieces of code that are going to be executed as background jobs.
/**
 * Executor's Commands are dedicated to contain purely business logic that should be executed.
 * A command should not have any reference to the underlying process engine and should not be
 * concerned with any process runtime related logic such as completing work items, sending signals, etc.
 * <br/>
 * Information taken from the process will be delivered as part of the data instance of
 * <code>CommandContext</code>. Depending on the execution context that data can vary, but
 * in most cases the following will be given:
 * <ul>
 *  <li></li>
 *  <li>businessKey - usually unique identifier of the caller</li>
 *  <li>callbacks - FQCN of the <code>CommandCallback</code> that shall be used on command completion</li>
 * </ul>
 * When executed as part of the process (work item handler) additional data can be expected:
 * <ul>
 *  <li>workItem - the actual work item that is being executed with all its parameters</li>
 *  <li>processInstanceId - id of the process instance that triggered this work</li>
 *  <li>deploymentId - if given process instance is part of an active deployment</li>
 * </ul>
 * An important note about implementations: it shall always be possible to initialize them with the
 * default constructor, as the executor service is an async component and will initialize the command
 * on demand using reflection. In case there is heavy logic on initialization, it should be placed in
 * another service implementation that can be looked up from within the command.
 */
public interface Command {

    /**
     * Executes this command's logic.
     * @param ctx - contextual data given by the executor service
     * @return returns any results in case of successful execution
     * @throws Exception in case execution failed and shall be retried if possible
     */
    public ExecutionResults execute(CommandContext ctx) throws Exception;
}
As the interface above shows, there is no specific integration with the jBPM runtime engine. The command is decoupled from it so that the main focus is on the actual logic to be executed rather than on integration with the process engine. This design promotes reuse of existing logic by simply wrapping it in a Command implementation.
Input data is transferred from the process engine to the command via CommandContext. It acts purely as a data transfer object and puts a single requirement on the data it holds: all objects must be serializable.
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Data holder for any contextual data that shall be given to the command upon execution.
 * Important note: every object that is added to the data container must be serializable,
 * meaning it must implement <code>java.io.Serializable</code>.
 */
public class CommandContext implements Serializable {

    private static final long serialVersionUID = -1440017934399413860L;

    private Map<String, Object> data;

    public CommandContext() {
        data = new HashMap<String, Object>();
    }

    public CommandContext(Map<String, Object> data) {
        this.data = data;
    }

    public void setData(Map<String, Object> data) {
        this.data = data;
    }

    public Map<String, Object> getData() {
        return data;
    }

    public Object getData(String key) {
        return data.get(key);
    }

    public void setData(String key, Object value) {
        data.put(key, value);
    }

    public Set<String> keySet() {
        return data.keySet();
    }

    @Override
    public String toString() {
        return "CommandContext{" + "data=" + data + '}';
    }
}
Next, the outcome is provided to the process engine via ExecutionResults, which is very similar in nature to CommandContext and also acts as a data transfer object.
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Data holder for the command's result data. Whatever the command produces should be placed in
 * these results so it can later be referenced by name by the requester - e.g. process instance.
 */
public class ExecutionResults implements Serializable {

    private static final long serialVersionUID = -1738336024526084091L;

    private Map<String, Object> data = new HashMap<String, Object>();

    public ExecutionResults() {
    }

    public void setData(Map<String, Object> data) {
        this.data = data;
    }

    public Map<String, Object> getData() {
        return data;
    }

    public Object getData(String key) {
        return data.get(key);
    }

    public void setData(String key, Object value) {
        data.put(key, value);
    }

    public Set<String> keySet() {
        return data.keySet();
    }

    @Override
    public String toString() {
        return "ExecutionResults{" + "data=" + data + '}';
    }
}
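Putting the three types together, a command implementation could look roughly like the sketch below. To keep the example runnable on its own, CommandSketch contains trimmed-down stand-ins for Command, CommandContext and ExecutionResults that mirror the listings above; in a real project the executor API types would be used instead. UpperCaseCommand and the "message" and "result" keys are hypothetical and chosen only for illustration.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Illustration only: the nested types are minimal stand-ins for the executor API.
class CommandSketch {

    static class CommandContext {
        private final Map<String, Object> data = new HashMap<>();
        Object getData(String key) { return data.get(key); }
        void setData(String key, Object value) { data.put(key, value); }
    }

    static class ExecutionResults {
        private final Map<String, Object> data = new HashMap<>();
        Object getData(String key) { return data.get(key); }
        void setData(String key, Object value) { data.put(key, value); }
    }

    interface Command {
        ExecutionResults execute(CommandContext ctx) throws Exception;
    }

    /**
     * Hypothetical command: upper-cases the "message" entry of the context and
     * returns it under "result". It keeps the implicit default constructor so
     * that it can be instantiated via reflection.
     */
    static class UpperCaseCommand implements Command {
        @Override
        public ExecutionResults execute(CommandContext ctx) throws Exception {
            Object message = ctx.getData("message");
            if (message == null) {
                // Throwing signals a failed execution to the caller.
                throw new IllegalArgumentException("missing 'message' in context");
            }
            ExecutionResults results = new ExecutionResults();
            results.setData("result", message.toString().toUpperCase(Locale.ROOT));
            return results;
        }
    }
}
```

Note that the command never touches a work item or a session: it reads from the context, does its work, and hands back named results.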
The executor covers all the requirements listed above and provides a user interface as part of the jbpm console and kie workbench (kie-wb) applications.
The screenshot above illustrates the history view of the executor's job queue. As it shows, several options are available:
view details of the job
cancel given job
create new job
jBPM (again in version 6) provides an out-of-the-box async work item handler that is backed by the jbpm executor. So by default, all features that the executor delivers are available for background execution within a process instance. AsyncWorkItemHandler can be configured in two ways:
as generic handler that expects to get the command name as part of work item parameters
as specific handler for given type of work item - for example web service
Option 1 is configured by default for the jbpm console and kie-wb web applications and is registered under the name async in every ksession that is bootstrapped within these applications. So whenever there is a need to execute some logic asynchronously, the following needs to be done at modeling time (using the jbpm web designer):
specify async as TaskName property
create data input called CommandClass
assign fully qualified class name for the CommandClass data input
Next, follow the regular way to complete process modeling. Note that all data inputs will be transferred to the executor, so they must be serializable.
The second option allows you to register different instances of AsyncWorkItemHandler for different work items. Since a handler is registered for a dedicated work item, the command will most likely be dedicated to that work item as well. If so, CommandClass can be specified at registration time instead of being set as a work item parameter. To register such handlers for jbpm console or kie-wb, an additional class is required to declare what shall be registered: a CDI bean that implements the WorkItemHandlerProducer interface needs to be provided and placed on the application classpath so that the CDI container can find it. Then, at modeling time, the TaskName property needs to match the name used at registration time.
The jbpm executor is configurable to allow fine tuning of its environment. In general, the jbpm executor runs as a thread pool that periodically checks for waiting jobs and executes them when needed. Configuration of the jbpm executor is done via system properties:
org.kie.executor.disabled = true|false - allows the executor component to be disabled completely
org.kie.executor.pool.size = Integer - specifies the thread pool size, where the default is 1
org.kie.executor.retry.count = Integer - specifies the number of retries in case of errors while running a job
org.kie.executor.interval = Integer - specifies the interval that the executor uses when checking for waiting jobs, where the default is 3 seconds
org.kie.executor.timeunit = String - specifies the time unit used for calculating the interval; the value must be a valid constant of java.util.concurrent.TimeUnit, and by default it is SECONDS.
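As an illustration of the properties listed above, they can be passed as -D options on the JVM command line or set programmatically. The sketch below sets them from code; ExecutorSettingsSketch is a hypothetical helper, the values are arbitrary examples, and it is assumed (not confirmed by this text) that the properties have to be in place before the executor is initialized.

```java
import java.util.concurrent.TimeUnit;

// Illustration only: example values, assumed to be applied before executor start-up.
class ExecutorSettingsSketch {

    /** Sets the executor-related system properties to example values. */
    static void apply() {
        System.setProperty("org.kie.executor.disabled", "false");
        System.setProperty("org.kie.executor.pool.size", "5");
        System.setProperty("org.kie.executor.retry.count", "3");
        System.setProperty("org.kie.executor.interval", "10");
        // Using the enum constant's name guarantees a valid TimeUnit value.
        System.setProperty("org.kie.executor.timeunit", TimeUnit.SECONDS.name());
    }
}
```

The equivalent command-line form for a single setting would be -Dorg.kie.executor.pool.size=5.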
In version 6.2, the jbpm executor introduced an extension to jobs (aka commands) that allows a single job to be executed multiple times. That feature is brought to the executor via an additional interface that the command should implement.
import java.util.Date;

/**
 * Marks given executor command as reoccurring, meaning it shall be rescheduled
 * after completion of a single instance.
 */
public interface Reoccurring {

    /**
     * Returns next time to be scheduled. Date must be in future as jobs cannot be scheduled in past.
     * Returns null in case it should not be scheduled any more.
     * @return the next schedule time, or null if no further execution should take place
     */
    Date getScheduleTime();
}
The Reoccurring interface is very simple and requires the implementation to provide the next time at which the command should be executed. It must be a valid date that is not in the past. If the command should not be invoked any more, the method should return null.
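A possible implementation of that contract is sketched below. ReoccurringSketch contains a stand-in copy of the Reoccurring interface so the example runs on its own, and HourlySchedule, its one-hour interval and the optional stopAfter cut-off are hypothetical choices. A real job would implement Command as well; that part is omitted here.

```java
import java.util.Date;

// Illustration only: the nested interface is a stand-in for the executor's own.
class ReoccurringSketch {

    interface Reoccurring {
        Date getScheduleTime();
    }

    /** Hypothetical schedule that asks to be run again one hour from now. */
    static class HourlySchedule implements Reoccurring {

        static final long INTERVAL_MILLIS = 60L * 60L * 1000L;

        @Override
        public Date getScheduleTime() {
            // No cut-off: keep rescheduling indefinitely.
            return next(new Date(), null);
        }

        /**
         * Computes the next run time relative to "now". Returns null when the
         * next run would fall after stopAfter, which ends the rescheduling.
         */
        static Date next(Date now, Date stopAfter) {
            Date candidate = new Date(now.getTime() + INTERVAL_MILLIS);
            if (stopAfter != null && candidate.after(stopAfter)) {
                return null;
            }
            return candidate;
        }
    }
}
```

The schedule is computed from the current time rather than from state kept in the object, since commands are instantiated on demand through their default constructor.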
An excellent example of such a command is org.jbpm.executor.commands.LogCleanupCommand, which provides an easy and convenient way to schedule periodic clean up of jBPM log tables at defined time intervals. See this article to see it in action and to learn how to configure and run it.
By default, the jbpm executor is cluster ready and will therefore distribute jobs across all cluster members. That might result in a job being executed on a different cluster member than the one that scheduled it, which is not always desired. To override this mechanism, a job can set 'owner' as part of its data when being registered, where the owner is the executor instance that is scheduling the job.
CommandContext ctx = new CommandContext();
ctx.setData("some data", "data...");
ctx.setData("retries", 0);
ctx.setData("owner", ExecutorService.EXECUTOR_ID);
That will ensure that only the instance that scheduled the job will be the one to execute it. Note that this might affect the time at which the job is executed, especially when the given cluster member is unavailable.
Asynchronous jobs are by default executed based on their scheduled time, so when several jobs are scheduled for the same time it is not defined which one will be executed first. To override the default behavior, priorities can be assigned to individual jobs. Priority is given as an integer in the range 0-9, where 0 is the lowest priority and 9 is the highest. This can be done:
directly via CommandContext, using the priority context data entry, whose value is a valid integer in the 0-9 range
via data inputs of the Async task, where the data input name is Priority and the value is a valid integer in the 0-9 range
With a priority assigned, the jBPM executor will pick jobs based on their scheduled time and their priority. If no priority is assigned, the jBPM executor will set it to 5 by default.
The jBPM executor can use a JMS broker for notification about jobs to be executed (only for jobs that are to be executed immediately). In that case the priority is also set on the JMS message, so the JMS broker will take it into account on delivery.
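The priority rules above (an integer from 0 to 9, with 5 as the default) can be summarised in a few lines of Java. PrioritySketch and resolvePriority are hypothetical names, and rejecting out-of-range values is an assumption made for this sketch; the text does not say how the executor itself treats them.

```java
import java.util.Map;

// Illustration only: mirrors the documented range and default, nothing more.
class PrioritySketch {

    static final int DEFAULT_PRIORITY = 5;

    /**
     * Reads the "priority" entry from the context data. Falls back to the
     * default when it is absent and rejects values outside 0-9 (assumption).
     */
    static int resolvePriority(Map<String, Object> contextData) {
        Object raw = contextData.get("priority");
        if (raw == null) {
            return DEFAULT_PRIORITY;
        }
        int priority = Integer.parseInt(raw.toString());
        if (priority < 0 || priority > 9) {
            throw new IllegalArgumentException("priority must be between 0 and 9: " + priority);
        }
        return priority;
    }
}
```

In terms of the CommandContext listing shown earlier, the corresponding call when scheduling a job would be ctx.setData("priority", 8).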