001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.thread; 018 019import java.util.concurrent.Executor; 020import java.util.concurrent.ExecutorService; 021import java.util.concurrent.RejectedExecutionHandler; 022import java.util.concurrent.SynchronousQueue; 023import java.util.concurrent.ThreadFactory; 024import java.util.concurrent.ThreadPoolExecutor; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.AtomicLong; 028 029import org.apache.activemq.util.ThreadPoolUtils; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * Manages the thread pool for long running tasks. Long running tasks are not 035 * always active but when they are active, they may need a few iterations of 036 * processing for them to become idle. The manager ensures that each task is 037 * processes but that no one task overtakes the system. This is somewhat like 038 * cooperative multitasking. 039 * 040 * @org.apache.xbean.XBean 041 */ 042public class TaskRunnerFactory implements Executor { 043 044 private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerFactory.class); 045 private ExecutorService executor; 046 private int maxIterationsPerRun; 047 private String name; 048 private int priority; 049 private boolean daemon; 050 private final AtomicLong id = new AtomicLong(0); 051 private boolean dedicatedTaskRunner; 052 private long shutdownAwaitTermination = 30000; 053 private final AtomicBoolean initDone = new AtomicBoolean(false); 054 private int maxThreadPoolSize = getDefaultMaximumPoolSize(); 055 private RejectedExecutionHandler rejectedTaskHandler = null; 056 private ClassLoader threadClassLoader; 057 058 public TaskRunnerFactory() { 059 this("ActiveMQ Task"); 060 } 061 062 public TaskRunnerFactory(String name) { 063 this(name, Thread.NORM_PRIORITY, true, 1000); 064 } 065 066 private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) { 067 this(name, priority, daemon, maxIterationsPerRun, false); 068 } 069 070 public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) { 071 this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, getDefaultMaximumPoolSize()); 072 } 073 074 public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize) { 075 this.name = name; 076 this.priority = priority; 077 this.daemon = daemon; 078 this.maxIterationsPerRun = maxIterationsPerRun; 079 this.dedicatedTaskRunner = dedicatedTaskRunner; 080 this.maxThreadPoolSize = maxThreadPoolSize; 081 } 082 083 public void init() { 084 if (initDone.compareAndSet(false, true)) { 085 // If your OS/JVM combination has a good thread model, you may want to 086 // avoid using a thread pool to run tasks and use a DedicatedTaskRunner instead. 087 if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) { 088 executor = null; 089 } else if (executor == null) { 090 executor = createDefaultExecutor(); 091 } 092 LOG.debug("Initialized TaskRunnerFactory[{}] using ExecutorService: {}", name, executor); 093 } 094 } 095 096 /** 097 * Performs a shutdown only, by which the thread pool is shutdown by not graceful nor aggressively. 098 * 099 * @see ThreadPoolUtils#shutdown(java.util.concurrent.ExecutorService) 100 */ 101 public void shutdown() { 102 if (executor != null) { 103 ThreadPoolUtils.shutdown(executor); 104 executor = null; 105 } 106 initDone.set(false); 107 } 108 109 /** 110 * Performs a shutdown now (aggressively) on the thread pool. 111 * 112 * @see ThreadPoolUtils#shutdownNow(java.util.concurrent.ExecutorService) 113 */ 114 public void shutdownNow() { 115 if (executor != null) { 116 ThreadPoolUtils.shutdownNow(executor); 117 executor = null; 118 } 119 initDone.set(false); 120 } 121 122 /** 123 * Performs a graceful shutdown. 124 * 125 * @see ThreadPoolUtils#shutdownGraceful(java.util.concurrent.ExecutorService) 126 */ 127 public void shutdownGraceful() { 128 if (executor != null) { 129 ThreadPoolUtils.shutdownGraceful(executor, shutdownAwaitTermination); 130 executor = null; 131 } 132 initDone.set(false); 133 } 134 135 public TaskRunner createTaskRunner(Task task, String name) { 136 init(); 137 if (executor != null) { 138 return new PooledTaskRunner(executor, task, maxIterationsPerRun); 139 } else { 140 return new DedicatedTaskRunner(task, name, priority, daemon); 141 } 142 } 143 144 @Override 145 public void execute(Runnable runnable) { 146 execute(runnable, name); 147 } 148 149 public void execute(Runnable runnable, String name) { 150 init(); 151 LOG.trace("Execute[{}] runnable: {}", name, runnable); 152 if (executor != null) { 153 executor.execute(runnable); 154 } else { 155 doExecuteNewThread(runnable, name); 156 } 157 } 158 159 private void doExecuteNewThread(Runnable runnable, String name) { 160 String threadName = name + "-" + id.incrementAndGet(); 161 Thread thread = new Thread(runnable, threadName); 162 thread.setDaemon(daemon); 163 164 LOG.trace("Created and running thread[{}]: {}", threadName, thread); 165 thread.start(); 166 } 167 168 protected ExecutorService createDefaultExecutor() { 169 ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getMaxThreadPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { 170 @Override 171 public Thread newThread(Runnable runnable) { 172 String threadName = name + "-" + id.incrementAndGet(); 173 Thread thread = new Thread(runnable, threadName); 174 thread.setDaemon(daemon); 175 thread.setPriority(priority); 176 if (threadClassLoader != null) { 177 thread.setContextClassLoader(threadClassLoader); 178 } 179 thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 180 @Override 181 public void uncaughtException(final Thread t, final Throwable e) { 182 LOG.error("Error in thread '{}'", t.getName(), e); 183 } 184 }); 185 186 LOG.trace("Created thread[{}]: {}", threadName, thread); 187 return thread; 188 } 189 }); 190 191 if (rejectedTaskHandler != null) { 192 rc.setRejectedExecutionHandler(rejectedTaskHandler); 193 } else { 194 rc.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 195 } 196 197 return rc; 198 } 199 200 public ExecutorService getExecutor() { 201 return executor; 202 } 203 204 public void setExecutor(ExecutorService executor) { 205 this.executor = executor; 206 } 207 208 public int getMaxIterationsPerRun() { 209 return maxIterationsPerRun; 210 } 211 212 public void setMaxIterationsPerRun(int maxIterationsPerRun) { 213 this.maxIterationsPerRun = maxIterationsPerRun; 214 } 215 216 public String getName() { 217 return name; 218 } 219 220 public void setName(String name) { 221 this.name = name; 222 } 223 224 public int getPriority() { 225 return priority; 226 } 227 228 public void setPriority(int priority) { 229 this.priority = priority; 230 } 231 232 public boolean isDaemon() { 233 return daemon; 234 } 235 236 public void setDaemon(boolean daemon) { 237 this.daemon = daemon; 238 } 239 240 public boolean isDedicatedTaskRunner() { 241 return dedicatedTaskRunner; 242 } 243 244 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { 245 this.dedicatedTaskRunner = dedicatedTaskRunner; 246 } 247 248 public int getMaxThreadPoolSize() { 249 return maxThreadPoolSize; 250 } 251 252 public void setMaxThreadPoolSize(int maxThreadPoolSize) { 253 this.maxThreadPoolSize = maxThreadPoolSize; 254 } 255 256 public void setThreadClassLoader(ClassLoader threadClassLoader) { 257 this.threadClassLoader = threadClassLoader; 258 } 259 260 public RejectedExecutionHandler getRejectedTaskHandler() { 261 return rejectedTaskHandler; 262 } 263 264 public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) { 265 this.rejectedTaskHandler = rejectedTaskHandler; 266 } 267 268 public long getShutdownAwaitTermination() { 269 return shutdownAwaitTermination; 270 } 271 272 public void setShutdownAwaitTermination(long shutdownAwaitTermination) { 273 this.shutdownAwaitTermination = shutdownAwaitTermination; 274 } 275 276 private static int getDefaultCorePoolSize() { 277 return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.corePoolSize", 0); 278 } 279 280 private static int getDefaultMaximumPoolSize() { 281 return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.maximumPoolSize", Integer.MAX_VALUE); 282 } 283 284 private static int getDefaultKeepAliveTime() { 285 return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.keepAliveTime", 30); 286 } 287}