@ComponentProfile(summary="Workflow with a thread pool handling the service chain", tag="workflow,base")
public class PoolingWorkflow extends WorkflowImp
Pooling of ServiceCollections is useful in situations where the services are considered the bottleneck for the
throughput of messages (e.g. local FS to local FS, but with a slow JdbcService or WebServicesQueryService to extract data for
lookups).
If you set min-idle, max-idle and pool-size to the same value then you will effectively end up with a fixed size pool
of the size requested. There are some instances where a fixed size pool is desirable, such as when the service list that is being
pooled takes a significant amount of time to become ready to use (e.g. multiple database connections/JMS connections over a WAN).
By making the pool size fixed you only pay the cost of initialisation once, when the workflow is first started. Of course, using a
fixed size pool can cause its own problems if long-lived connections are terminated silently by the remote system. If you are
using SharedConnection within the service-collection, then it is advised that you use a fixed size pool; otherwise, as
workers are deactivated, the underlying connection instance could be closed, which will cause issues for other
objects sharing the connection.
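The fixed size pool idea described above can be illustrated with a minimal sketch. It uses only JDK classes and is not how PoolingWorkflow is implemented; FixedPoolSketch, Worker, handle and workersCreated are hypothetical names chosen for the illustration. It assumes each worker is expensive to create, so all of them are built up front and then reused.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative only: a hand-rolled fixed size pool, not the PoolingWorkflow implementation.
class FixedPoolSketch {
  /** Hypothetical worker; stands in for a service list that is slow to become ready. */
  static final class Worker {
    final int id;
    Worker(int id) { this.id = id; }
    String process(String payload) { return payload + " via worker-" + id; }
  }

  private final BlockingQueue<Worker> idle;
  private int created = 0;

  FixedPoolSketch(int size) {
    idle = new ArrayBlockingQueue<>(size);
    for (int i = 0; i < size; i++) {
      idle.add(new Worker(++created)); // start-up cost is paid here, once per worker
    }
  }

  String handle(String payload) {
    Worker w;
    try {
      w = idle.take(); // blocks while every worker is busy
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for a worker", e);
    }
    try {
      return w.process(payload);
    } finally {
      idle.add(w); // the same instance goes back; nothing is closed or recreated
    }
  }

  int workersCreated() { return created; }

  public static void main(String[] args) {
    FixedPoolSketch pool = new FixedPoolSketch(3);
    for (int i = 0; i < 10; i++) {
      System.out.println(pool.handle("msg-" + i));
    }
    System.out.println("workers created: " + pool.workersCreated());
  }
}
```

Because no worker is ever discarded, the number of workers created stays at the requested size however many messages pass through, which is the property that makes a fixed size pool a reasonable fit for shared connections.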
If stop() is invoked then any messages that are currently being processed will be allowed to finish; however, any new
messages that enter the workflow via onAdaptrisMessage(AdaptrisMessage) before the AdaptrisMessageConsumer is successfully
stopped will be treated as bad messages and sent directly to the configured ProcessingExceptionHandler.
In the adapter configuration file this class is aliased as pooling-workflow, which is the preferred alternative to the fully qualified classname when building your configuration.
Modifier and Type | Field and Description |
---|---|
static int | DEFAULT_MAX_IDLE: The default max idle size. |
static int | DEFAULT_MAX_POOLSIZE: The default maximum pool size. |
static int | DEFAULT_MIN_IDLE: The default minimum idle size. |
eventHandler, log, startTime, stopTime
PREVIOUS_GUID_KEY, WORKFLOW_ID_KEY
Constructor and Description |
---|
PoolingWorkflow() |
PoolingWorkflow(java.lang.String uniqueId) |
Modifier and Type | Method and Description |
---|---|
protected void | closeWorkflow(): Close the workflow. |
int | currentlyActiveObjects(): Return the number of currently active objects. |
int | currentlyIdleObjects(): Return the number of currently idle objects. |
int | currentObjectPoolCount(): Return the total number of objects in the pool. |
int | currentThreadPoolCount(): Return the current number of active threads in the thread pool. |
void | doProduce(AdaptrisMessage msg): This method contains the behaviour that varies between standard and request-reply workflows. |
TimeInterval | getInitWaitTime() |
java.lang.Integer | getMaxIdle(): Return the maximum idle objects in the pool. |
java.lang.Integer | getMinIdle(): Return the minimum idle objects in the pool. |
java.lang.Integer | getPoolSize(): Get the size of the pool. |
TimeInterval | getShutdownWaitTime() |
TimeInterval | getThreadKeepAlive() |
java.lang.Integer | getThreadPriority() |
void | handleBadMessage(AdaptrisMessage msg): Handle a 'bad' message. |
protected void | handleBadMessage(java.lang.String logMsg, java.lang.Exception e, AdaptrisMessage msg) |
void | handleProduceException(): Handle an Exception encountered producing a message. |
protected void | initialiseWorkflow(): Initialise the workflow. |
long | initWaitTimeMs() |
int | maxIdle(): Return the maximum idle objects in the pool. |
int | minIdle(): Return the minimum idle objects in the pool. |
void | onAdaptrisMessage(AdaptrisMessage msg): Process a message from the MessageConsumer. |
int | poolSize() |
protected void | prepareWorkflow() |
protected void | resubmitMessage(AdaptrisMessage msg): Resubmit a message upon the channel becoming available again. |
protected void | sendMessageLifecycleEvent(AdaptrisMessage msg) |
void | setInitWaitTime(TimeInterval t): Set the amount of time to wait for object pool population. |
void | setMaxIdle(java.lang.Integer i): Set the maximum number of idle objects in the pool. |
void | setMinIdle(java.lang.Integer i): Set the minimum number of idle objects in the pool. |
void | setPoolSize(java.lang.Integer i): Set the size of the pool. |
void | setShutdownWaitTime(TimeInterval interval): Set the shutdown wait timeout for the pool. |
void | setThreadKeepAlive(TimeInterval interval): Set the lifetime for threads in the pool. |
void | setThreadPriority(java.lang.Integer i) |
long | shutdownWaitTimeMs() |
protected void | startWorkflow(): Start the workflow. |
protected void | stopWorkflow(): Stop the workflow. |
long | threadLifetimeMs() |
int | threadPriority() |
addConsumeLocation, addInterceptor, changeState, channelUnavailableWait, close, copyExceptionHeaders, disableMessageCount, friendlyName, getChannelUnavailableWaitInterval, getConsumer, getDisableDefaultMessageCount, getInterceptors, getLogPayload, getMessageErrorHandler, getMessageLogger, getProduceExceptionHandler, getProducer, getSendEvents, getServiceCollection, getUniqueId, handleChannelUnavailable, init, lastStartTime, lastStopTime, logSuccess, messageLogger, obtainChannel, obtainWorkflowId, prepare, registerActiveMsgErrorHandler, registerChannel, registerEventHandler, requestClose, requestInit, requestStart, requestStop, retrieveActiveMsgErrorHandler, retrieveComponentState, setChannelUnavailableWaitInterval, setConsumer, setDisableDefaultMessageCount, setInterceptors, setLogPayload, setMessageErrorHandler, setMessageLogger, setProduceExceptionHandler, setProducer, setSendEvents, setServiceCollection, setUniqueId, start, stop, workflowEnd, workflowStart
public static final int DEFAULT_MAX_POOLSIZE
public static final int DEFAULT_MIN_IDLE
public static final int DEFAULT_MAX_IDLE
public PoolingWorkflow()
public PoolingWorkflow(java.lang.String uniqueId) throws CoreException
Throws:
CoreException
public void setPoolSize(java.lang.Integer i)
Parameters:
i - the size of the pool
public java.lang.Integer getPoolSize()
See Also:
setPoolSize(Integer)
public int poolSize()
public long threadLifetimeMs()
public long shutdownWaitTimeMs()
public TimeInterval getThreadKeepAlive()
public void setThreadKeepAlive(TimeInterval interval)
Threads that have been dormant for the specified interval are discarded.
Parameters:
interval - the lifetime (default is 60 seconds)
public TimeInterval getShutdownWaitTime()
public void setShutdownWaitTime(TimeInterval interval)
When stop() is invoked, the pool is emptied and shut down. The specified value is the amount of time
to wait for a clean shutdown. If this timeout is exceeded then a forced shutdown ensues, which may leave messages in an
inconsistent state.
Parameters:
interval - the shutdown time (default is 60 seconds)
See Also:
WorkflowImp.stop()
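The keep-alive and shutdown wait settings described above follow a pattern that can be sketched with the JDK's ThreadPoolExecutor. This is an illustration under assumptions, not the PoolingWorkflow source: ShutdownWaitSketch, newPool, stop and sleep are hypothetical names, and whether the workflow uses these exact calls internally is not stated in this document.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative only: wait for a clean shutdown, then force it once the timeout expires.
class ShutdownWaitSketch {
  /** Builds a pool whose dormant threads are discarded after keepAliveMs (must be > 0). */
  static ThreadPoolExecutor newPool(int size, long keepAliveMs) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        size, size, keepAliveMs, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    pool.allowCoreThreadTimeOut(true); // let idle core threads expire as well
    return pool;
  }

  /** Returns true if the pool drained within waitMs, false if a forced shutdown was needed. */
  static boolean stop(ExecutorService pool, long waitMs) {
    pool.shutdown(); // refuse new work, let in-flight tasks finish
    try {
      if (pool.awaitTermination(waitMs, TimeUnit.MILLISECONDS)) {
        return true;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    pool.shutdownNow(); // timeout exceeded: interrupt whatever is still running
    return false;
  }

  static void sleep(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // task abandoned part-way through
    }
  }

  public static void main(String[] args) {
    ThreadPoolExecutor quick = newPool(2, 60_000);
    quick.execute(() -> sleep(50));
    System.out.println("clean shutdown: " + stop(quick, 2_000));

    ThreadPoolExecutor slow = newPool(2, 60_000);
    slow.execute(() -> sleep(5_000));
    System.out.println("clean shutdown: " + stop(slow, 100));
  }
}
```

In the second case the task is interrupted before it completes, which corresponds to the inconsistent state the description warns about; a longer shutdown wait time trades a slower stop for a lower chance of that happening.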
protected void initialiseWorkflow() throws CoreException
initialiseWorkflow in class WorkflowImp
Throws:
CoreException - if the workflow failed to initialise. This exception encapsulates any underlying exception.
See Also:
WorkflowImp.initialiseWorkflow()
protected void startWorkflow() throws CoreException
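Description copied from class: WorkflowImp
Start the workflow.
startWorkflow in class WorkflowImp
Throws:
CoreException - encapsulating any underlying Exception
See Also:
WorkflowImp.startWorkflow()
protected void closeWorkflow()
Description copied from class: WorkflowImp
Close the workflow.
closeWorkflow in class WorkflowImp
See Also:
WorkflowImp.closeWorkflow()
protected void stopWorkflow()
Description copied from class: WorkflowImp
Stop the workflow.
stopWorkflow in class WorkflowImp
See Also:
WorkflowImp.stopWorkflow()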
public void onAdaptrisMessage(AdaptrisMessage msg)
Process a message from the MessageConsumer.
If stop() is invoked then any messages that are currently being processed will be allowed to finish; however, any
new messages that enter the workflow via onAdaptrisMessage(AdaptrisMessage) will be treated as bad messages and
sent directly to the configured MessageErrorHandler.
Parameters:
msg - the AdaptrisMessage.
See Also:
AdaptrisMessageListener.onAdaptrisMessage(AdaptrisMessage), WorkflowImp.handleBadMessage(AdaptrisMessage)
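The routing rule described for onAdaptrisMessage can be sketched as a simple gate. This is a simplified, single-threaded illustration and not the actual implementation: StopGateSketch, onMessage, serviceChain and errorHandler are hypothetical stand-ins for the workflow, the pooled services and the configured error handler. Routing a service failure to the error handler is likewise an assumption, based on the description of a 'bad' message as one that caused an Exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Illustrative only: once stopped, late arrivals bypass the services and go to the error handler.
class StopGateSketch {
  private final AtomicBoolean stopped = new AtomicBoolean(false);
  private final Consumer<String> serviceChain; // stand-in for the pooled service chain
  private final Consumer<String> errorHandler; // stand-in for the configured error handler

  StopGateSketch(Consumer<String> serviceChain, Consumer<String> errorHandler) {
    this.serviceChain = serviceChain;
    this.errorHandler = errorHandler;
  }

  void stop() {
    stopped.set(true);
  }

  void onMessage(String msg) {
    if (stopped.get()) {
      errorHandler.accept(msg); // arrived after stop(): treated as a bad message
      return;
    }
    try {
      serviceChain.accept(msg);
    } catch (RuntimeException e) {
      errorHandler.accept(msg); // assumption: a failing service also makes the message 'bad'
    }
  }

  public static void main(String[] args) {
    List<String> processed = new ArrayList<>();
    List<String> bad = new ArrayList<>();
    StopGateSketch workflow = new StopGateSketch(processed::add, bad::add);
    workflow.onMessage("before-stop");
    workflow.stop();
    workflow.onMessage("after-stop");
    System.out.println("processed=" + processed + " bad=" + bad);
    // prints: processed=[before-stop] bad=[after-stop]
  }
}
```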
protected void resubmitMessage(AdaptrisMessage msg)
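Description copied from class: WorkflowImp
Resubmit a message upon the channel becoming available again.
resubmitMessage in class WorkflowImp
Parameters:
msg - the AdaptrisMessage.
See Also:
WorkflowImp.resubmitMessage(com.adaptris.core.AdaptrisMessage)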
public void handleBadMessage(AdaptrisMessage msg)
Description copied from interface: Workflow
Handle a 'bad' message. A bad message is one which has caused an Exception in the ServiceCollection or
AdaptrisMessageProducer.
handleBadMessage in interface Workflow
handleBadMessage in class WorkflowImp
Parameters:
msg - the original version of the 'bad' message
See Also:
WorkflowImp.handleBadMessage(AdaptrisMessage)
protected void handleBadMessage(java.lang.String logMsg, java.lang.Exception e, AdaptrisMessage msg)
handleBadMessage in class WorkflowImp
public void doProduce(AdaptrisMessage msg) throws ServiceException, ProduceException
Description copied from class: WorkflowImp
This method contains the behaviour that varies between standard and request-reply workflows. It is overridden in
RequestReplyWorkflow.
doProduce in interface Workflow
doProduce in class WorkflowImp
Parameters:
msg - the message to process
Throws:
ServiceException - not thrown by this implementation
ProduceException - if any occur
See Also:
WorkflowImp.doProduce(com.adaptris.core.AdaptrisMessage)
public void handleProduceException()
Description copied from interface: Workflow
Handle an Exception encountered producing a message.
handleProduceException in interface Workflow
handleProduceException in class WorkflowImp
See Also:
WorkflowImp.handleProduceException()
protected void sendMessageLifecycleEvent(AdaptrisMessage msg)
sendMessageLifecycleEvent in class WorkflowImp
See Also:
WorkflowImp.sendMessageLifecycleEvent(AdaptrisMessage)
public java.lang.Integer getThreadPriority()
public void setThreadPriority(java.lang.Integer i)
public java.lang.Integer getMinIdle()
public void setMinIdle(java.lang.Integer i)
Parameters:
i - the minIdle to set
public int minIdle()
public java.lang.Integer getMaxIdle()
public void setMaxIdle(java.lang.Integer i)
Parameters:
i - the maxIdle to set (default 10)
public int maxIdle()
public int threadPriority()
public TimeInterval getInitWaitTime()
public void setInitWaitTime(TimeInterval t)
Upon start the object pool is populated with the minIdle() number of workers.
Parameters:
t - the initWaitTime to set, default if not specified is 1 minute
public long initWaitTimeMs()
public int currentObjectPoolCount()
public int currentlyActiveObjects()
public int currentlyIdleObjects()
public int currentThreadPoolCount()
protected void prepareWorkflow() throws CoreException
prepareWorkflow in class WorkflowImp
Throws:
CoreException