@ComponentProfile(summary="Split a message and then execute the associated services on the split items, aggregating the split messages afterwards", since="3.11.1", tag="service,splitjoin") public class PooledSplitJoinService extends ServiceImp implements EventHandlerAware, ServiceWrapper
This supersedes SplitJoinService
and PoolingSplitJoinService
as our preferred
service for handling split/aggregation since it has more predictable peformance characteristics
in constrained environments where the numbers of messages that are generated and aggregated can
be large/unknown.
This service splits a message according to the configured MessageSplitter
implementation,
executes the configured Service
and subsequently aggregates all the messages back using
the configured MessageAggregator
implementation
A pool of Service
instances is maintained and re-used for each message;
the cost of initialisation for the wrapped service, is incurred during this service's
initialisation phase.
Aggregation may start happening as soon as messages are available to be aggregated (i.e. a split
message has been operated on) using the
MessageAggregator.aggregate(AdaptrisMessage, Iterable)
method. Performance
characteristics will largely depend on how the splitter and aggregator implementations iterate
over the messages.
In the adapter configuration file this class is aliased as pooled-split-join-service which is the preferred alternative to the fully qualified classname when building your configuration.
log
Constructor and Description |
---|
PooledSplitJoinService() |
Modifier and Type | Method and Description |
---|---|
protected void |
closeService() |
void |
doService(AdaptrisMessage msg)
Apply the service to the message.
|
protected java.lang.Iterable<AdaptrisMessage> |
doSplitService(java.lang.Iterable<AdaptrisMessage> msgs,
com.adaptris.core.services.splitter.PooledSplitJoinService.CountingExceptionHandlerWrapper exceptionHandler) |
@NonNull MessageAggregator |
getAggregator()
The
MessageAggregator implementation to use to join messages together. |
java.lang.Integer |
getPoolsize()
The size of the underlying object/thread pool used to execute services.
|
java.lang.Boolean |
getSendEvents()
Whether or not to send events for the split message once service execution has completed.
|
@NonNull Service |
getService()
The
Service to execute over all the split messages. |
ServiceErrorHandler |
getServiceErrorHandler()
The strategy to use when encountering any errors during execution.
|
@NonNull MessageSplitter |
getSplitter()
The
MessageSplitter implementation to use to split the incoming message. |
TimeInterval |
getTimeout()
The max amount of time to wait for all the operations to complete.
|
protected void |
initService() |
void |
prepare()
Prepare for initialisation.
|
void |
registerEventHandler(EventHandler eh)
Register the current event handler against this component.
|
protected AdaptrisMessage |
sendEvents(AdaptrisMessage msg) |
void |
setAggregator(@NonNull MessageAggregator aggregator)
The
MessageAggregator implementation to use to join messages together. |
void |
setPoolsize(java.lang.Integer poolsize)
The size of the underlying object/thread pool used to execute services.
|
void |
setSendEvents(java.lang.Boolean sendEvents)
Whether or not to send events for the split message once service execution has completed.
|
void |
setService(@NonNull Service service)
The
Service to execute over all the split messages. |
void |
setServiceErrorHandler(ServiceErrorHandler serviceErrorHandler)
The strategy to use when encountering any errors during execution.
|
void |
setSplitter(@NonNull MessageSplitter splitter)
The
MessageSplitter implementation to use to split the incoming message. |
void |
setTimeout(TimeInterval timeout)
The max amount of time to wait for all the operations to complete.
|
void |
start()
Starts the component.
|
void |
stop()
Stop the component
|
protected void |
waitQuietly(java.lang.Object monitor,
long timeoutMs) |
Service[] |
wrappedServices()
Return all the services that are wrapped by this service.
|
changeState, close, continueOnFailure, createName, createQualifier, getContinueOnFail, getIsTrackingEndpoint, getUniqueId, init, isBranching, isTrackingEndpoint, requestClose, requestInit, requestStart, requestStop, retrieveComponentState, setContinueOnFail, setIsTrackingEndpoint, setUniqueId
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
continueOnFailure, isBranching, setUniqueId
createName, createQualifier, isTrackingEndpoint
changeState, requestClose, requestInit, requestStart, requestStop, retrieveComponentState
getUniqueId
close, init
public void prepare() throws CoreException
ComponentLifecycleExtension
prepare
in interface ComponentLifecycleExtension
CoreException
protected void initService() throws CoreException
initService
in class ServiceImp
CoreException
public void start() throws CoreException
ComponentLifecycle
Once a component is started it should be ready to process messages. In the case of AdaptrisMessageConsumer
, calling start
will begin message delivery.
start
in interface ComponentLifecycle
start
in class ServiceImp
CoreException
- wrapping any underlying Exception
spublic void stop()
ComponentLifecycle
A stopped component is not expected to be ready to process messages. In the case of AdaptrisMessageConsumer
, calling stop
will pause message delivery. Throwing a RuntimeException
may cause unintended consequences
stop
in interface ComponentLifecycle
stop
in class ServiceImp
protected void closeService()
closeService
in class ServiceImp
public void doService(AdaptrisMessage msg) throws ServiceException
Service
Apply the service to the message.
doService
in interface Service
msg
- the AdaptrisMessage
to processServiceException
- wrapping any underlying Exception
sprotected java.lang.Iterable<AdaptrisMessage> doSplitService(java.lang.Iterable<AdaptrisMessage> msgs, com.adaptris.core.services.splitter.PooledSplitJoinService.CountingExceptionHandlerWrapper exceptionHandler)
protected void waitQuietly(java.lang.Object monitor, long timeoutMs)
public void registerEventHandler(EventHandler eh)
EventHandlerAware
registerEventHandler
in interface EventHandlerAware
eh
- the event handler currently in use.public Service[] wrappedServices()
ServiceWrapper
wrappedServices
in interface ServiceWrapper
protected AdaptrisMessage sendEvents(AdaptrisMessage msg) throws CoreException
CoreException
@NonNull public @NonNull Service getService()
Service
to execute over all the split messages.public void setService(@NonNull @NonNull Service service)
Service
to execute over all the split messages.@NonNull public @NonNull MessageSplitter getSplitter()
MessageSplitter
implementation to use to split the incoming message.public void setSplitter(@NonNull @NonNull MessageSplitter splitter)
MessageSplitter
implementation to use to split the incoming message.@NonNull public @NonNull MessageAggregator getAggregator()
MessageAggregator
implementation to use to join messages together.public void setAggregator(@NonNull @NonNull MessageAggregator aggregator)
MessageAggregator
implementation to use to join messages together.public TimeInterval getTimeout()
If not explicitly specified then is set to be 10 minutes; in the event that the timeout is exceeded, then an exception will be thrown eventually.
public void setTimeout(TimeInterval timeout)
If not explicitly specified then is set to be 10 minutes; in the event that the timeout is exceeded, then an exception will be thrown eventually.
public java.lang.Boolean getSendEvents()
Note that even if this is set to true, because each child message has its own unique id, you
will have to externally correlate the message lifecycle events together. Child messages will
always have the metadata CoreConstants.PARENT_UNIQUE_ID_KEY
set with
the originating message id.
If not explicitly specified then is set to false which means no events are sent for 'split' messages.
public void setSendEvents(java.lang.Boolean sendEvents)
Note that even if this is set to true, because each child message has its own unique id, you
will have to externally correlate the message lifecycle events together. Child messages will
always have the metadata CoreConstants.PARENT_UNIQUE_ID_KEY
set with
the originating message id.
If not explicitly specified then is set to false which means no events are sent for 'split' messages.
public java.lang.Integer getPoolsize()
The default value is '10' unless explicitly configured
public void setPoolsize(java.lang.Integer poolsize)
The default value is '10' unless explicitly configured
public ServiceErrorHandler getServiceErrorHandler()
Defaults to ServiceExceptionHandler
if not explicitly specified
public void setServiceErrorHandler(ServiceErrorHandler serviceErrorHandler)
Defaults to ServiceExceptionHandler
if not explicitly specified