Dubbo事件派发策略
默认是All:所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。 即worker线程接收到事件后,将该事件提交到业务线程池中,自己再去处理其他事。
Direct:worker线程接收到事件后,由worker执行到底(不绝对,后续会根据源码讲解)。
Message:只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO线程上执行
Execution:只请求消息派发到线程池,不含响应(客户端线程池),响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行
Connection:在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
All事件派发策略
默认配置下,Dubbo 使用 all 派发策略,即将所有的消息都派发到线程池中。下面我们来分析一下 AllChannelHandler 的代码。
connected主要负责请求、响应的发起和接收,received方法主要负责请求或者相应的处理业务。先大致描述下有关AllChannelHandler在这一部分的一次请求流程:client发起请求connected->server接收请求connected->nettyHandler.receive方法接收请求->AllChannelHandler#receive处理请求->server发起响应请求connected->client接收请求connected。
/**
* 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件等
*/
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
/**
* 处理连接事件
* @param channel 代表一个网络连接的成功建立
* @throws RemotingException
*/
@Override
public void connected(Channel channel) throws RemotingException {
/** 针对网络连接建立的事件,此时就会拿到一个业务线程池 */
ExecutorService executor = getSharedExecutorService();
try {
/** 封装成ChannelEventRunnable任务,提交给线程池进行执行 */
executor*ex.e**cute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
/**
* 如果发现跟一个consumer端的网络连接断开了
* 则会获取一个线程池去执行任务
* @param channel
* @throws RemotingException
*/
@Override
public void disconnected(Channel channel) throws RemotingException {
ExecutorService executor = getSharedExecutorService();
try {
/** 封装成ChannelEventRunnable任务,提交给线程池进行执行 */
executor*ex.e**cute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
}
}
/**
* 处理请求和响应消息,这里的 message 变量类型可能是 Request,也可能是 Response
* @param channel
* @param message
* @throws RemotingException
*/
@Override
public void received(Channel channel, Object message) throws RemotingException {
/** 获取线程池 */
ExecutorService executor = getPreferredExecutorService(message);
try {
/** 将请求和响应消息派发到线程池中处理 */
executor*ex.e**cute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
/**
* 如果通信方式为双向通信,此时将 Server side ... threadpool is exhausted
* 错误信息封装到 Response 中,并返回给服务消费方
*/
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService executor = getSharedExecutorService();
try {
executor*ex.e**cute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
}
}
}
WrapperChannelHandler负责的是事件派发
public class WrappedChannelHandler implements ChannelHandlerDelegate {
protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);
protected final ChannelHandler handler;
protected final URL url;
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
}
public void close() {
}
@Override
public void connected(Channel channel) throws RemotingException {
handler.connected(channel);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
handler.disconnected(channel);
}
@Override
public void sent(Channel channel, Object message) throws RemotingException {
handler.sent(channel, message);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
handler.received(channel, message);
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
handler.caught(channel, exception);
}
protected void sendFeedback(Channel channel, Request request, Throwable t) throws RemotingException {
if (request.isTwoWay()) {
String msg = "Server side(" + url.getIp() + "," + url.getPort()
+ ") thread pool is exhausted, detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
/** 返回包含错误信息的 Response 对象 */
channel.send(response);
return;
}
}
@Override
public ChannelHandler getHandler() {
if (handler instanceof ChannelHandlerDelegate) {
return ((ChannelHandlerDelegate) handler).getHandler();
} else {
return handler;
}
}
public URL getUrl() {
return url;
}
/**
* Currently, this method is mainly customized to facilitate the thread model on consumer side.
* 1. Use ThreadlessExecutor, aka., delegate callback directly to the thread initiating the call.
* 2. Use shared executor to execute the callback.
*
* @param msg
* @return
*/
public ExecutorService getPreferredExecutorService(Object msg) {
if (msg instanceof Response) {
Response response = (Response) msg;
DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());
// a typical scenario is the response returned after timeout, the timeout response may have completed the future
if (responseFuture == null) {
return getSharedExecutorService();
} else {
ExecutorService executor = responseFuture.getExecutor();
if (executor == null || executor.isShutdown()) {
executor = getSharedExecutorService();
}
return executor;
}
} else {
return getSharedExecutorService();
}
}
/**
* get the shared executor for current Server or Client
*
* @return
*/
public ExecutorService getSharedExecutorService() {
// Application may be destroyed before channel disconnected, avoid create new application model
// see https://github.com/apache/dubbo/issues/9127
if (url.getApplicationModel() == null || url.getApplicationModel().isDestroyed()) {
return GlobalResourcesRepository.getGlobalExecutorService();
}
// note: url.getOrDefaultApplicationModel() may create new application model
ApplicationModel applicationModel = url.getOrDefaultApplicationModel();
/**
* ExtensionLoader,基于SPI扩展机制去获取实现类对象
* ExecutorRepository:主要是负责获取线程池组件的
*/
ExecutorRepository executorRepository =
applicationModel.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
/**
* 根据传入的url,会从url中提取一些参数出来,在根据url的参数去决定要获取什么样的线程池
*/
ExecutorService executor = executorRepository.getExecutor(url);
if (executor == null) {
/** 创建和构建线程池的行为 */
executor = executorRepository.createExecutorIfAbsent(url);
}
return executor;
}
@Deprecated
public ExecutorService getExecutorService() {
return getSharedExecutorService();
}
}
请求对象会被封装 ChannelEventRunnable 中被线程池执行,ChannelEventRunnable 将会是服务调用过程的新起点。所以接下来我们以 ChannelEventRunnable 为起点向下探索。
public class ChannelEventRunnable implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);
private final ChannelHandler handler;
private final Channel channel;
private final ChannelState state;
private final Throwable exception;
private final Object message;
public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state) {
this(channel, handler, state, null);
}
public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message) {
this(channel, handler, state, message, null);
}
public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Throwable t) {
this(channel, handler, state, null, t);
}
public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {
this.channel = channel;
this.handler = handler;
this.state = state;
this.message = message;
this.exception = exception;
}
@Override
public void run() {
/** 检测通道状态,对于请求或响应消息,此时 state = RECEIVED */
if (state == ChannelState.RECEIVED) {
try {
handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
} else {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case SENT:
try {
handler.sent(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
break;
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is: " + message + ", exception is " + exception, e);
}
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
}
/**
* ChannelState
*
*
*/
public enum ChannelState {
/**
* CONNECTED
*/
CONNECTED,
/**
* DISCONNECTED
*/
DISCONNECTED,
/**
* SENT
*/
SENT,
/**
* RECEIVED
*/
RECEIVED,
/**
* CAUGHT
*/
CAUGHT
}
}
我们在接着分析DecodeHandler
public class DecodeHandler extends AbstractChannelHandlerDelegate {
private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class);
public DecodeHandler(ChannelHandler handler) {
super(handler);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
/** 对 Decodeable 接口实现类对象进行解码 */
decode(message);
}
if (message instanceof Request) {
/** 对 Request 的 data 字段进行解码 */
decode(((Request) message).getData());
}
if (message instanceof Response) {
/** 对 Response 的 result 字段进行解码 */
decode(((Response) message).getResult());
}
/** 执行后续逻辑 */
handler.received(channel, message);
}
private void decode(Object message) {
/**
* Decodeable 接口目前有两个实现类,
* 分别为 DecodeableRpcInvocation 和 DecodeableRpcResult
*/
if (message instanceof Decodeable) {
try {
/** 执行解码逻辑 */
((Decodeable) message).decode();
if (log.isDebugEnabled()) {
log.debug("Decode decodeable message " + message.getClass().getName());
}
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
}
} // ~ end of catch
} // ~ end of if
} // ~ end of method decode
}
DecodeHandler 主要是包含了一些解码逻辑。请求解码可在 IO 线程上执行,也可在线程池中执行,这个取决于运行时的配置。DecodeHandler 存在的意义就是保证请求或响应对象可在线程池中被解码。解码完毕后,完全解码后的 Request 对象会继续向后传递,下一站是 HeaderExchangeHandler。
/**
* ExchangeReceiver
* 交换接收器
*/
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
protected static final Logger logger = LoggerFactory.getLogger(HeaderExchangeHandler.class);
private final ExchangeHandler handler;
public HeaderExchangeHandler(ExchangeHandler handler) {
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.handler = handler;
}
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
private static boolean isClientSide(Channel channel) {
InetSocketAddress address = channel.getRemoteAddress();
URL url = channel.getUrl();
return url.getPort() == address.getPort() &&
NetUtils.filterLocalHost(url.getIp())
.equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress()));
}
void handlerEvent(Channel channel, Request req) throws RemotingException {
if (req.getData() != null && req.getData().equals(READONLY_EVENT)) {
channel.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY, Boolean.TRUE);
}
}
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
/** 检测请求是否合法,不合法则返回状态码为 BAD_REQUEST 的响应 */
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
/** 设置 BAD_REQUEST 状态 */
res.setStatus(Response.BAD_REQUEST);
channel.send(res);
return;
}
// find handler by message class.
/** 获取 data 字段值,也就是 RpcInvocation 对象 */
Object msg = req.getData();
try {
/** 继续向下调用 */
CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
/** 设置 OK 状态码 */
res.setStatus(Response.OK);
/** 设置调用结果 */
res.setResult(appResult);
} else {
/** 若调用过程出现异常,则设置 SERVICE_ERROR,表示服务端异常 */
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
/** 将调用结果返回给服务消费端 */
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
}
});
} catch (Throwable e) {
/** 若调用过程出现异常,则设置 SERVICE_ERROR,表示服务端异常 */
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
@Override
public void connected(Channel channel) throws RemotingException {
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
handler.connected(exchangeChannel);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
handler.disconnected(exchangeChannel);
} finally {
DefaultFuture.closeChannel(channel);
HeaderExchangeChannel.removeChannel(channel);
}
}
@Override
public void sent(Channel channel, Object message) throws RemotingException {
Throwable exception = null;
try {
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
handler.sent(exchangeChannel, message);
} catch (Throwable t) {
exception = t;
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
if (message instanceof Request) {
Request request = (Request) message;
DefaultFuture.sent(channel, request);
}
if (exception != null) {
if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
} else if (exception instanceof RemotingException) {
throw (RemotingException) exception;
} else {
throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(),
exception.getMessage(), exception);
}
}
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
/** 处理请求对象 */
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
/** 处理事件 */
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
/** 双向通信 */
if (request.isTwoWay()) {
/** 向后调用服务得到调用结果,并将调用结果返回给服务消费端 */
handleRequest(exchangeChannel, request);
} else {
/** 如果是单向通信,仅向后调用指定服务即可,无需返回调用结果 */
handler.received(exchangeChannel, request.getData());
}
}
}
/** 处理响应对象,服务消费方会执行此处逻辑,后面分析 */
else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (StringUtils.isNotEmpty(echo)) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
if (exception instanceof ExecutionException) {
ExecutionException e = (ExecutionException) exception;
Object msg = e.getRequest();
if (msg instanceof Request) {
Request req = (Request) msg;
if (req.isTwoWay() && !req.isHeartbeat()) {
Response res = new Response(req.getId(), req.getVersion());
res.setStatus(Response.SERVER_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
return;
}
}
}
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
handler.caught(exchangeChannel, exception);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
@Override
public ChannelHandler getHandler() {
if (handler instanceof ChannelHandlerDelegate) {
return ((ChannelHandlerDelegate) handler).getHandler();
} else {
return handler;
}
}
}
到这里,我们看到了比较清晰的请求和响应逻辑。对于双向通信,HeaderExchangeHandler 首先向后进行调用,得到调用结果。然后将调用结果封装到 Response 对象中,最后再将该对象返回给服务消费方。如果请求不合法,或者调用失败,则将错误信息封装到 Response 对象中,并返回给服务消费方。否则,获取 data 字段值,也就是 RpcInvocation 对象,继续进行向下调用。
Direct事件派发策略
这里需要注意一下,虽然说Direct派发策略的语意是所有的事件都是在IO线程里执行的,此时我们可以发现Direct时重写了received方法,在received事件在满足一定的要求时也会提交给线程池去执行;
public class DirectChannelHandler extends WrappedChannelHandler {
public DirectChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
/**
* 只有received message 会放到业务线程池里去进行处理
* connected、disconnected、caught等都是没有放到业务线程池里进行执行的,都是直接在网络io里处理的
* @param channel
* @param message
* @throws RemotingException
*/
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
if (executor instanceof ThreadlessExecutor) {
try {
executor*ex.e**cute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
} else {
handler.received(channel, message);
}
}
}
Message事件派发策略
public class MessageOnlyChannelHandler extends WrappedChannelHandler {
public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
/**
* 请求和响应的消息会在业务线程池里来进行处理
* @param channel
* @param message
* @throws RemotingException
*/
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
executor*ex.e**cute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
}
Execution事件派发策略
只请求消息派发到线程池,不含响应(客户端线程池),响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行
public class ExecutionChannelHandler extends WrappedChannelHandler {
public ExecutionChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
/** 判断消息是否是请求类型,是请求类型才交给业务线程池来进行处理 */
if (message instanceof Request) {
try {
executor*ex.e**cute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
// FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,
// therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
// this scenario from happening, but a better solution should be considered later.
if (t instanceof RejectedExecutionException) {
sendFeedback(channel, (Request) message, t);
}
throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
}
} else if (executor instanceof ThreadlessExecutor) {
executor*ex.e**cute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} else {
handler.received(channel, message);
}
}
}
Connection事件派发策略
在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
protected final ThreadPoolExecutor connectionExecutor;
private final int queueWarningLimit;
public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
/** 构建线程池,核心线程数和最大线程池都为1,队列默认:无限大 */
connectionExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
new NamedThreadFactory(threadName, true),
new AbortPolicyWithReport(threadName, url)
); // FIXME There's no place to release connectionExecutor!
queueWarningLimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
}
@Override
public void connected(Channel channel) throws RemotingException {
try {
/** 判断队列中的数据,如果以达到一定数量则打印告警日志 */
checkQueueLength();
connectionExecutor*ex.e**cute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
@Override
public void disconnected(Channel channel) throws RemotingException {
try {
/** 判断队列中的数据,如果以达到一定数量则打印告警日志 */
checkQueueLength();
connectionExecutor*ex.e**cute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
}
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
executor*ex.e**cute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if (message instanceof Request && t instanceof RejectedExecutionException) {
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService executor = getSharedExecutorService();
try {
executor*ex.e**cute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
}
}
private void checkQueueLength() {
if (connectionExecutor.getQueue().size() > queueWarningLimit) {
logger.warn(new IllegalThreadStateException("connectionordered channel handler queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queueWarningLimit));
}
}
}