@ComponentProfile(summary="Workflow with a thread pool handling the service chain", tag="workflow,base") public class PoolingWorkflow extends WorkflowWithObjectPool
Pooling of ServiceCollection
s is useful in situations where the services are considered the bottleneck for the
throughput of messages (e.g. local FS to local FS, but with a slow JdbcService or WebServicesQueryService to extract data for
lookups).
If you specify min-idle, max-idle and pool-size to be equal to each other then you will effectively end up with a fixed size pool
of the size requested. There are some instances where a fixed size pool is desirable, such as when the service list that is being
pooled takes a significant amount of time to become ready to use (e.g. multiple database connections/JMS connections over a WAN).
By making a pool size fixed you only pay the cost of initialisation once when the workflow is first started. Of course, using a
fixed size pool can cause its own problems if long-lived connections are terminated silently by the remote system. If you are
using SharedConnection
within the service-collection, then it is advised that you use a fixed size pool; otherwise as
workers are deactivated then this could cause the underlying connection instance to be closed, which will cause issues for other
objects sharing the connection.
If stop()
is invoked then any messages that are currently being processed will be allowed to finish, however any new
messages that enter the workflow via onAdaptrisMessage(AdaptrisMessage)
before the
AdaptrisMessageConsumer
is succesfully stopped will be treated as bad messages and sent directly to the
configured ProcessingExceptionHandler
.
ProcessingExceptionHandler
In the adapter configuration file this class is aliased as pooling-workflow which is the preferred alternative to the fully qualified classname when building your configuration.
WorkflowWithObjectPool.Worker, WorkflowWithObjectPool.WorkerFactory
DEFAULT_MAX_IDLE, DEFAULT_MAX_POOLSIZE, DEFAULT_MIN_IDLE
eventHandler, log, startTime, stopTime
PREVIOUS_GUID_KEY, WORKFLOW_ID_KEY
Constructor and Description |
---|
PoolingWorkflow() |
PoolingWorkflow(java.lang.String uniqueId) |
Modifier and Type | Method and Description |
---|---|
protected void |
closeWorkflow()
Close the workflow.
|
int |
currentlyActiveObjects()
Return the number of currently active objects.
|
int |
currentlyIdleObjects()
Return the number of currently idle objects.
|
int |
currentObjectPoolCount()
Return the total number of objects in the pool.
|
int |
currentThreadPoolCount()
Return the current number of active threads in the thread pool.
|
void |
doProduce(AdaptrisMessage msg)
This method contains the behaviour that varies between standard and request -reply workflows.
|
TimeInterval |
getShutdownWaitTime()
Set the shutdown wait timeout for the pool.
|
TimeInterval |
getThreadKeepAlive()
Set the lifetime for threads in the pool.
|
java.lang.Integer |
getThreadPriority()
The priority for threads created to handle messages.
|
void |
handleBadMessage(AdaptrisMessage msg)
Handle a 'bad' message.
|
void |
handleProduceException()
Handle an
Exception encountered producing a message. |
protected void |
initialiseWorkflow()
Initialise the workflow.
|
protected void |
onMessage(AdaptrisMessage msg) |
protected void |
sendMessageLifecycleEvent(AdaptrisMessage msg) |
void |
setShutdownWaitTime(TimeInterval shutdownWaitTime)
Set the shutdown wait timeout for the pool.
|
void |
setThreadKeepAlive(TimeInterval threadKeepAlive)
Set the lifetime for threads in the pool.
|
void |
setThreadPriority(java.lang.Integer threadPriority)
The priority for threads created to handle messages.
|
long |
shutdownWaitTimeMs() |
protected void |
startWorkflow()
Start the workflow.
|
protected void |
stopWorkflow()
Stop the workflow.
|
long |
threadLifetimeMs() |
int |
threadPriority() |
checkPoolConfig, cloneServiceCollection, createObjectPool, getInitWaitTime, getMaxIdle, getMinIdle, getPoolSize, initWaitTimeMs, maxIdle, minIdle, onAdaptrisMessage, poolSize, populatePool, preFlightServiceCheck, prepareWorkflow, resubmitMessage, returnObject, setInitWaitTime, setMaxIdle, setMinIdle, setPoolSize
addConsumeLocation, addInterceptor, changeState, channelUnavailableWait, close, copyExceptionHeaders, disableMessageCount, friendlyName, getChannelUnavailableWaitInterval, getComments, getConsumer, getDisableDefaultMessageCount, getInterceptors, getMessageErrorHandler, getMessageLogger, getProduceExceptionHandler, getProducer, getSendEvents, getServiceCollection, getUniqueId, handleBadMessage, handleChannelUnavailable, init, lastStartTime, lastStopTime, logSuccess, messageLogger, obtainChannel, obtainWorkflowId, prepare, processingStart, registerActiveMsgErrorHandler, registerChannel, registerEventHandler, requestClose, requestInit, requestStart, requestStop, retrieveActiveMsgErrorHandler, retrieveComponentState, sendEvents, setChannelUnavailableWaitInterval, setComments, setConsumer, setDisableDefaultMessageCount, setInterceptors, setMessageErrorHandler, setMessageLogger, setProduceExceptionHandler, setProducer, setSendEvents, setServiceCollection, setUniqueId, start, stop, workflowEnd, workflowStart
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
onAdaptrisMessage, onAdaptrisMessage
public PoolingWorkflow()
public PoolingWorkflow(java.lang.String uniqueId) throws CoreException
CoreException
public long threadLifetimeMs()
public long shutdownWaitTimeMs()
protected void initialiseWorkflow() throws CoreException
initialiseWorkflow
in class WorkflowImp
CoreException
- if the workflow failed to initialise. This exception encapsulates any underlying exception.WorkflowImp.initialiseWorkflow()
protected void startWorkflow() throws CoreException
WorkflowImp
startWorkflow
in class WorkflowImp
CoreException
- encapsulating any underlying ExceptionWorkflowImp.startWorkflow()
protected void closeWorkflow()
WorkflowImp
closeWorkflow
in class WorkflowImp
WorkflowImp.closeWorkflow()
protected void stopWorkflow()
WorkflowImp
stopWorkflow
in class WorkflowImp
WorkflowImp.stopWorkflow()
protected void onMessage(AdaptrisMessage msg)
onMessage
in class WorkflowWithObjectPool
public void handleBadMessage(AdaptrisMessage msg)
Workflow
Handle a 'bad' message. A bad message is one which has caused an
Exception
in the ServceCollection
or
AdaptrisMessageProducer
.
handleBadMessage
in interface Workflow
handleBadMessage
in class WorkflowImp
msg
- the original version of the 'bad' messageWorkflowImp.handleBadMessage(AdaptrisMessage)
public void doProduce(AdaptrisMessage msg) throws ServiceException, ProduceException
WorkflowImp
This method contains the behaviour that varies between standard and request -reply workflows. It is overridden in
RequestReplyWorkflow
.
doProduce
in interface Workflow
doProduce
in class WorkflowImp
msg
- the message to processServiceException
- not thrown by this implementationProduceException
- if any occurWorkflowImp.doProduce(com.adaptris.core.AdaptrisMessage)
public void handleProduceException()
Workflow
Handle an Exception
encountered producing a message.
handleProduceException
in interface Workflow
handleProduceException
in class WorkflowImp
WorkflowImp.handleProduceException()
protected void sendMessageLifecycleEvent(AdaptrisMessage msg)
sendMessageLifecycleEvent
in class WorkflowImp
WorkflowImp.sendMessageLifecycleEvent(AdaptrisMessage)
public int threadPriority()
public int currentObjectPoolCount()
public int currentlyActiveObjects()
public int currentlyIdleObjects()
public int currentThreadPoolCount()
public TimeInterval getThreadKeepAlive()
Threads that have been dormant for the specified interval are discarded.
public void setThreadKeepAlive(TimeInterval threadKeepAlive)
Threads that have been dormant for the specified interval are discarded.
public TimeInterval getShutdownWaitTime()
When stop()
is invoked, this causes a emptying and shutdown of the pool. The
specified value is the amount of time to wait for a clean shutdown. If this timeout is exceeded
then a forced shutdown ensues, which may mean messages are in an inconsistent state.
public void setShutdownWaitTime(TimeInterval shutdownWaitTime)
When stop()
is invoked, this causes a emptying and shutdown of the pool. The
specified value is the amount of time to wait for a clean shutdown. If this timeout is exceeded
then a forced shutdown ensues, which may mean messages are in an inconsistent state.
public java.lang.Integer getThreadPriority()
public void setThreadPriority(java.lang.Integer threadPriority)