javabbs开源论坛系统

功能强大、开源、代码通俗易懂、注释详细、面向二次开发友好!

FreeCMS团队又一力作!打造更好的中国开源jsp java 论坛系统!

javabbsV1.2已发布!



在线演示 立即购买 下载用户手册
客服QQ: 124812878 联系电话: 18339991503 Email: freeteam@foxmail.com
点击查看FreeCMS商业版开源CMS系统 点击查看shop4j开源java商城系统

Executors解析 - Donald_Draper

加入收藏夹】     【打印】     【关闭】 来源: 日期:2017-04-13 04:00:02 点击量: 收藏

ThreadPoolExecutor解析一(核心线程池数量、线程池状态等) :
http://donald-draper.iteye.com/blog/2366934
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等):
http://donald-draper.iteye.com/blog/2367064
ThreadPoolExecutor解析三(线程池执行提交任务):
http://donald-draper.iteye.com/blog/2367199
ThreadPoolExecutor解析四(线程池关闭):
http://donald-draper.iteye.com/blog/2367246
ScheduledThreadPoolExecutor解析一(调度任务,任务队列):
http://donald-draper.iteye.com/blog/2367332
ScheduledThreadPoolExecutor解析二(任务调度):
http://donald-draper.iteye.com/blog/2367593
ScheduledThreadPoolExecutor解析三(关闭线程池):
http://donald-draper.iteye.com/blog/2367698
package java.util.concurrent;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.security.PrivilegedActionException;
import java.security.AccessControlException;
import sun.security.util.SecurityConstants;

/**
 * Factory and utility methods for {@link Executor}, {@link
 * ExecutorService}, {@link ScheduledExecutorService}, {@link
 * ThreadFactory}, and {@link Callable} classes defined in this
 * package. This class supports the following kinds of methods:
 *
Executors提供了工厂方法和有效方法为Executor,ExecutorService,
ScheduledExecutorService,ThreadFactory,Callable。提供一下方法:
 * [list]
 *   <li> Methods that create and return an {@link ExecutorService}
 *        set up with commonly useful configuration settings.
      根据通用的配置参数,创建并返回一个ExecutorService
 *   <li> Methods that create and return a {@link ScheduledExecutorService}
 *        set up with commonly useful configuration settings.
      根据通用的配置参数,创建并返回一个ScheduledExecutorService
 *   <li> Methods that create and return a "wrapped" ExecutorService, that
 *        disables reconfiguration by making implementation-specific methods
 *        inaccessible.
      创建并返回一个包装的ExecutorService,不能重新配置
 *   <li> Methods that create and return a {@link ThreadFactory}
 *        that sets newly created threads to a known state.
      创建并返回一个ThreadFactory,设置创建线程为指定状态
 *   <li> Methods that create and return a {@link Callable}
 *        out of other closure-like forms, so they can be used
 *        in execution methods requiring <tt>Callable</tt>.
      创建并返回一个闭包形式的Callable,以便可以在执行方法中执行需要的形式的Callable。
 * [/list]
 *
 * @since 1.5
 * @author Doug Lea
 */
public class Executors {

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * <tt>nThreads</tt> threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     创建一个可以重用的固定数量的工作线程和无界的共享队列的线程池。在任何时候,
     最多有nThreads个工作线程。如果所有的工作线程在执行任务,新提交的任务
     将会在任务队列中等待,直到有工作线程可利用。如果在线程池关闭之前,如果
     有任务在执行中失败并结束,需要的话,则一个新工作线程将会创建,替代旧的
     工作线程。直到线程池关闭,线程池中的工作线程才退出。
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    //与上一个方法的区别增加了线程池参数ThreadFactory
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
     /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * <tt>newFixedThreadPool(1)</tt> the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     创建单工作线程与无界队列的执行器(如果工作线程在线程池关闭之前,执行任务的过程,
     工作线程由于失败结束,则一个新工作线程将会创建,替代旧的工作线程)。
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    //FinalizableDelegatedExecutorService,所有操作委托给执行器代理
    static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
	//回收时关闭执行器
        protected void finalize() {
            super.shutdown();
        }
    }
    /**
    执行器静态代理,所有方法委托给内部执行器
     * A wrapper class that exposes only the ExecutorService methods
     * of an ExecutorService implementation.
     */
    static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List<Runnable> shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }
        public Future<?> submit(Runnable task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Callable<T> task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Runnable task, T result) {
            return e.submit(task, result);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            return e.invokeAll(tasks);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }
}

//创建单线程执行器,与前一个方法的不同为,添加了线程工厂参数ThreadFactory
 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
     return new FinalizableDelegatedExecutorService
         (new ThreadPoolExecutor(1, 1,
                                 0L, TimeUnit.MILLISECONDS,
                                 new LinkedBlockingQueue<Runnable>(),
                                 threadFactory));
 }

从上面来看Executors创建固定线程池,实际为ThreadPoolExecutor,核心线程池数量和最大线程池数量相等并且固定,任务队列为LinkedBlockingQueue;创建单线程执行器,通过单线程执行器代理,实际为线程池执行器的ThreadPoolExecutor静态代理,核心线程池数量和最大线程池数量相等并且为1,任务队列为LinkedBlockingQueue。
再来看newCachedThreadPool
/**
  * Creates a thread pool that creates new threads as needed, but
  * will reuse previously constructed threads when they are
  * available.  These pools will typically improve the performance
  * of programs that execute many short-lived asynchronous tasks.
  * Calls to <tt>execute</tt> will reuse previously constructed
  * threads if available. If no existing thread is available, a new
  * thread will be created and added to the pool. Threads that have
  * not been used for sixty seconds are terminated and removed from
  * the cache. Thus, a pool that remains idle for long enough will
  * not consume any resources. Note that pools with similar
  * properties but different details (for example, timeout parameters)
  * may be created using {@link ThreadPoolExecutor} constructors.
  *
  创建一个线程池,可以根据需要创建工作线程,如果有空闲工作线程,并且可用,
  则重用工作线程。这个线程池可以显著地改善执行大量执行时间短的异步任务场景的性能。
  当调用execute方法时,将重用空闲的工作线程。如果没有工作线程可利用,则
  将创建新的工作线程,添加到线程池。如果工作线程在60之内,没有执行任务,那个将会
  从工作线程缓存中移除。这样工作线程空闲足够的时间等待任务,并不会消耗太多的资源。
  * @return the newly created thread pool
  */
 public static ExecutorService newCachedThreadPool() {
     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                   60L, TimeUnit.SECONDS,
                                   new SynchronousQueue<Runnable>());
 }
//带线程工厂ThreadFactory参数的newCachedThreadPool方法
 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                   60L, TimeUnit.SECONDS,
                                   new SynchronousQueue<Runnable>(),
                                   threadFactory);
 }

从newCachedThreadPool方法来看,核心线程池为0,最大线程池为Integer.MAX_VALUE,
工作线程可空闲时间为60秒,任务队列为SynchronousQueue。newCachedThreadPool方法
创建的线程适合需要执行大量执行时间短的异步任务场景。
再来看调度执行器创建:
/**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically.
     * (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * <tt>newScheduledThreadPool(1)</tt> the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     创建一个单线程的执行器可以执行延时任务和间歇性的任务(如果工作线程在线程池关闭之前,执行任务的过程,
     工作线程由于失败结束,则一个新工作线程将会创建,替代旧的工作线程)。
     保证任务按顺序执行,在任何时候不会有两个任务同时执行。

     * @return the newly created scheduled executor
     */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        //返回调度执行器代理
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

  
  /**
    调度执行器静态代理,所有的方法通过内部的调度执行器
     * A wrapper class that exposes only the ScheduledExecutorService
     * methods of a ScheduledExecutorService implementation.
     */
    static class DelegatedScheduledExecutorService
            extends DelegatedExecutorService
            implements ScheduledExecutorService {
        private final ScheduledExecutorService e;
        DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
            super(executor);
            e = executor;
        }
        public ScheduledFuture<?> schedule(Runnable command, long delay,  TimeUnit unit) {
            return e.schedule(command, delay, unit);
        }
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            return e.schedule(callable, delay, unit);
        }
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,  long period, TimeUnit unit) {
            return e.scheduleAtFixedRate(command, initialDelay, period, unit);
        }
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,  long delay, TimeUnit unit) {
            return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
        }
    }

 
  //此方与上一个方法的不同为,待线程工厂参数
     public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }

从上面来看单线程调度器实际为调度执行器静态代理,实际的调度执行器为ScheduledThreadPoolExecutor。
根据核心线程池数量和线程工厂,构造调度线程池:
/**
  * Creates a thread pool that can schedule commands to run after a
  * given delay, or to execute periodically.
  * @param corePoolSize the number of threads to keep in the pool,
  * even if they are idle.
  * @return a newly created scheduled thread pool
  * @throws IllegalArgumentException if {@code corePoolSize < 0}
  */
 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
     return new ScheduledThreadPoolExecutor(corePoolSize);
 }
  public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
 //不可重新配置的线程池执行器
 public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
        if (executor == null)
            throw new NullPointerException();
        return new DelegatedExecutorService(executor);
    }
//不可重新配置的调度线程池执行器
  public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
        if (executor == null)
            throw new NullPointerException();
        return new DelegatedScheduledExecutorService(executor);
    }  

不可重新配置的线程池执行器和调度线程池执行器,通过线程池执行器和调度线程池执行器的静态代理来实现。
默认的线程工厂:这个前面已说,这里仅贴出代码,保证完整性
public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }
 /**
     * The default thread factory
     */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

其他方法:
//根据Runnable和结果类型result创建一个Callable
 public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
 //根据Runnable创建一个无返回结果的Callable
 public static Callable<Object> callable(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<Object>(task, null);
    }
    //RunnableAdapter
  /**
     * A callable that runs given task and returns given result
     */
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

总结:

Executors创建固定线程池,实际为ThreadPoolExecutor,核心线程池数量和最大线程池数量相等并且固定,任务队列为LinkedBlockingQueue;创建单线程执行器,通过单线程执行器代理,实际为线程池执行器的ThreadPoolExecutor静态代理,核心线程池数量和最大线程池数量相等并且为1,任务队列为LinkedBlockingQueue。newCachedThreadPool方法来看,核心线程池为0,最大线程池为Integer.MAX_VALUE,工作线程可空闲时间为60秒,任务队列为SynchronousQueue。newCachedThreadPool方法创建的线程适合需要执行大量执行时间短的异步任务场景。单线程调度器实际为调度执行器静态代理,实际的调度执行器为ScheduledThreadPoolExecutor。newScheduledThreadPool返回的调度执行器为ScheduledThreadPoolExecutor。不可重新配置的线程池执行器和调度线程池执行器,通过线程池执行器和调度线程池执行器的静态代理来实现。
附:
下面的是Executors其余部分,将Runnable和Callable形式的线程包装在与当前线程具有相同控制权限和类加载器的环境下执行,
这一部分可以当扩展。
/**
     * Returns a thread factory used to create new threads that
     * have the same permissions as the current thread.
     * This factory creates threads with the same settings as {@link
     * Executors#defaultThreadFactory}, additionally setting the
     * AccessControlContext and contextClassLoader of new threads to
     * be the same as the thread invoking this
     * <tt>privilegedThreadFactory</tt> method.  A new
     * <tt>privilegedThreadFactory</tt> can be created within an
     * {@link AccessController#doPrivileged} action setting the
     * current thread's access control context to create threads with
     * the selected permission settings holding within that action.
     返回一个与当前线程具有相同权限的线程工厂,用于创建线程。线程工厂与
     默认执行器有着相同的配置,不同的是新创建线程的访问控制上下文AccessControlContex和
     上下文类加载器contextClassLoader与调用privilegedThreadFactory的线程相同。
     在AccessController#doPrivileged动作设置当前可以访问控制创建线程的已选择的
     配置权限时,PrivilegedThreadFactory被创建。

     *
     * <p> Note that while tasks running within such threads will have
     * the same access control and class loader settings as the
     * current thread, they need not have the same {@link
     * java.lang.ThreadLocal} or {@link
     * java.lang.InheritableThreadLocal} values. If necessary,
     * particular values of thread locals can be set or reset before
     * any task runs in {@link ThreadPoolExecutor} subclasses using
     * {@link ThreadPoolExecutor#beforeExecute}. Also, if it is
     * necessary to initialize worker threads to have the same
     * InheritableThreadLocal settings as some other designated
     * thread, you can create a custom ThreadFactory in which that
     * thread waits for and services requests to create others that
     * will inherit its values.
     *
     当任务线程执行时,将会有和当前线程一样的访问控制权限和类加载器,
     而不需要相同的ThreadLocal和InheritableThreadLocal。如果需要,在任务
     被工作线程执行的ThreadPoolExecutor#beforeExecute的方法中可以设置或重置
      thread locals值。如果需要初始化工作线程与一些特定的线程具有相同的
     InheritableThreadLocal,你可以创建一个定制的线程工厂,当线程等待请求时,
     请求过来,创建一个新的线程处理请求,新的线程将会继承创建它的线程的属性值。
     * @return a thread factory
     * @throws AccessControlException if the current access control
     * context does not have permission to both get and set context
     * class loader.
     */
    public static ThreadFactory privilegedThreadFactory() {
        return new PrivilegedThreadFactory();
    }
 

//PrivilegedThreadFactory
  
  /**
     * Thread factory capturing access control context and class loader
     */
    static class PrivilegedThreadFactory extends DefaultThreadFactory {
        private final AccessControlContext acc;
        private final ClassLoader ccl;

        PrivilegedThreadFactory() {
            super();
            SecurityManager sm = System.getSecurityManager();
            if (sm != null) {
	        //检查获取和设置上下文类加载器权限
                // Calls to getContextClassLoader from this class
                // never trigger a security check, but we check
                // whether our callers have this permission anyways.
                sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);

                // Fail fast
                sm.checkPermission(new RuntimePermission("setContextClassLoader"));
            }
	    //获取当前线程的访问控制上下文,和上下文类加载器
            this.acc = AccessController.getContext();
            this.ccl = Thread.currentThread().getContextClassLoader();
        }

        public Thread newThread(final Runnable r) {
            return super.newThread(new Runnable() {
                public void run() {
		    //设置当前线程的访问控制权限
                    AccessController.doPrivileged(new PrivilegedAction<Void>() {
                        public Void run() {
			    //设置创建线程的上下文类加载器
                            Thread.currentThread().setContextClassLoader(ccl);
                            r.run();
                            return null;
                        }
                    }, acc);
                }
            });
        }
    }

再来看PrivilegedAction的定义:
/**
 * A computation to be performed with privileges enabled.  The computation is
 * performed by invoking <code>AccessController.doPrivileged</code> on the
 * <code>PrivilegedAction</code> object.  This interface is used only for
 * computations that do not throw checked exceptions; computations that
 * throw checked exceptions must use <code>PrivilegedExceptionAction</code>
 * instead.
 *
 一个计算操作可以,以赋予的权限执行。这个计算操作在AccessController.doPrivileged方法
 的PrivilegedAction中执行。这个接口不会抛出检查异常,如果操作需要抛出检查异常,
 则用PrivilegedExceptionAction。
 * @see AccessController
 * @see AccessController#doPrivileged(PrivilegedAction)
 * @see PrivilegedExceptionAction
 */

public interface PrivilegedAction<T> {
    /**
     * Performs the computation.  This method will be called by
     * <code>AccessController.doPrivileged</code> after enabling privileges.
     *
     此方放将会在AccessController.doPrivileged开启权限后执行。
     * @return a class-dependent value that may represent the results of the
     *         computation. Each class that implements
     *         <code>PrivilegedAction</code>
     *         should document what (if anything) this value represents.
     * @see AccessController#doPrivileged(PrivilegedAction)
     * @see AccessController#doPrivileged(PrivilegedAction,
     *                                     AccessControlContext)
     */
    T run();
}

//PrivilegedExceptionAction,与PrivilegedAction基本相同,不同的是在赋予权限后,可能会
抛出检查异常
package java.security;
/**
 * A computation to be performed with privileges enabled, that throws one or
 * more checked exceptions.  The computation is performed by invoking
 * <code>AccessController.doPrivileged</code> on the
 * <code>PrivilegedExceptionAction</code> object.  This interface is
 * used only for computations that throw checked exceptions;
 * computations that do not throw
 * checked exceptions should use <code>PrivilegedAction</code> instead.
 *
 * @see AccessController
 * @see AccessController#doPrivileged(PrivilegedExceptionAction)
 * @see AccessController#doPrivileged(PrivilegedExceptionAction,
 *                                              AccessControlContext)
 * @see PrivilegedAction
 */
public interface PrivilegedExceptionAction<T> {
    /**
     * Performs the computation.  This method will be called by
     * <code>AccessController.doPrivileged</code> after enabling privileges.
     *
     * @return a class-dependent value that may represent the results of the
     *         computation.  Each class that implements
     *         <code>PrivilegedExceptionAction</code> should document what
     *         (if anything) this value represents.
     * @throws Exception an exceptional condition has occurred.  Each class
     *         that implements <code>PrivilegedExceptionAction</code> should
     *         document the exceptions that its run method can throw.
     * @see AccessController#doPrivileged(PrivilegedExceptionAction)
     * @see AccessController#doPrivileged(PrivilegedExceptionAction,AccessControlContext)
     */

    T run() throws Exception;
}

从上面可以看出PrivilegedThreadFactory,权限线程工厂创建的线程与创建权限线程工厂的线程具有相同的上下文访问控制权限和上下文类加载器。权限线程工厂创建的线程线程时首先检查当前线程是否有获取和设置上下文类加载器权限,有则,将权限赋予创建的线程,并在权限动作PrivilegedAction中,设置任务线程的上下类加载器与当前线程相同,并执行任务。如果想要创建线程具有与当前线程具有相同的ThreadLocal和InheritableThreadLocal变量的,则可以重写ThreadPoolExecutor#beforeExecute方法,在beforeExecute中设置或重置ThreadLocal和InheritableThreadLocal变量。

简单看一下ThreadLocal和InheritableThreadLocal的javaDoc,具体代码以后有时间,我们在深入;
//ThreadLocal
package java.lang;
import java.lang.ref.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * This class provides thread-local variables.  These variables differ from
 * their normal counterparts in that each thread that accesses one (via its
 * <tt>get</tt> or <tt>set</tt> method) has its own, independently initialized
 * copy of the variable.  <tt>ThreadLocal</tt> instances are typically private
 * static fields in classes that wish to associate state with a thread (e.g.,
 * a user ID or Transaction ID).
 *
 ThreadLocal提供线程本地变量。这些变量不同于线程一般用get和set方法获取的变量,
 它是一个独立的变量初始化拷贝。ThreadLocal实例是典型的私有静态访问fields,比如
 我们希望每个线程关联一个状态,如用户或事物的ID
 * <p>For example, the class below generates unique identifiers local to each
 * thread.
 * A thread's id is assigned the first time it invokes <tt>ThreadId.get()</tt>
 * and remains unchanged on subsequent calls.
 举个例子,ThreadId保证了每个线程用于一个本地唯一的标识。如果一个线程的id,在以第一次调用
ThreadId.get()方法时指定,接下来将不能改变
 * <pre>
 * import java.util.concurrent.atomic.AtomicInteger;
 *
 * public class ThreadId {
 *     // Atomic integer containing the next thread ID to be assigned
 *     private static final AtomicInteger nextId = new AtomicInteger(0);
 *
 *     // Thread local variable containing each thread's ID
 *     private static final ThreadLocal<Integer> threadId =
 *         new ThreadLocal<Integer>() {
 *         @Override 
           protected Integer initialValue() {
 *                 return nextId.getAndIncrement();
 *         }
 *     };
 *
 *     // Returns the current thread's unique ID, assigning it if necessary
 *     public static int get() {
 *         return threadId.get();
 *     }
 * }
 * </pre>
 * <p>Each thread holds an implicit reference to its copy of a thread-local
 * variable as long as the thread is alive and the <tt>ThreadLocal</tt>
 * instance is accessible; after a thread goes away, all of its copies of
 * thread-local instances are subject to garbage collection (unless other
 * references to these copies exist).
 *
 只要线程存活,每个线程将拥有一个隐式的线程本地变量ThreadLocal的副本,在一个线程结束
 之后所有的线程本地变量将会被垃圾回收器回收,除非有其他副本引用。
 * @author  Josh Bloch and Doug Lea
 * @since   1.2
 */
public class ThreadLocal<T> {
    //获取线程本地变量的拷贝
    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null)
                return (T)e.value;
        }
        return setInitialValue();
    }
    //设置初始化值
     private T setInitialValue() {
        T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
        return value;
    }
     protected T initialValue() {
         //待子类扩展
        return null;
    }
    这个我们这里不多讲,以后我们会单写一篇文章。
}





//InheritableThreadLocal
package java.lang;
import java.lang.ref.*;

/**
 * This class extends <tt>ThreadLocal</tt> to provide inheritance of values
 * from parent thread to child thread: when a child thread is created, the
 * child receives initial values for all inheritable thread-local variables
 * for which the parent has values.  Normally the child's values will be
 * identical to the parent's; however, the child's value can be made an
 * arbitrary function of the parent's by overriding the <tt>childValue</tt>
 * method in this class.
 *
InheritableThreadLocal继承了ThreadLocal,提供了子线程继承父线程本地变量的实现;
当子线程被创建,子线程将会拥有父线程所有可继承的线程本地变量。一般情况子线程的本地
变量值与父线程相同;如果子线程重写的父线程的childValue,将不能保证。
 * <p>Inheritable thread-local variables are used in preference to
 * ordinary thread-local variables when the per-thread-attribute being
 * maintained in the variable (e.g., User ID, Transaction ID) must be
 * automatically transmitted to any child threads that are created.
 当每个线程的属性保存在一个变量中如用户ID和事务ID,如果是可继承的线程本地变量
 必须自动的传给所有创建的子线程,可继承的线程本地变量优先于一般的线程变量被使用。
 * @author  Josh Bloch and Doug Lea
 * @see     ThreadLocal
 * @since   1.2
 */

public class InheritableThreadLocal<T> extends ThreadLocal<T> {
    /**
     * Computes the child's initial value for this inheritable thread-local
     * variable as a function of the parent's value at the time the child
     * thread is created.  This method is called from within the parent
     * thread before the child is started.
     在子线程创建时,初始化子线程从父线程继承的线程本地变量。这个方法在子线程启动之前,
     父线程调用。
     * <p>
     * This method merely returns its input argument, and should be overridden
     * if a different behavior is desired.
     *
     * @param parentValue the parent thread's value
     * @return the child thread's initial value
     */
    protected T childValue(T parentValue) {
        return parentValue;
    }

    /**
     * Get the map associated with a ThreadLocal.
     *
     获取线程关联ThreadLocal
     * @param t the current thread
     */
    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }

    /**
     * Create the map associated with a ThreadLocal.
     *
     创建一个线程关联ThreadLocal的Map
     * @param t the current thread
     * @param firstValue value for the initial entry of the table.
     * @param map the map to store.
     */
    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}

再来看执行器Executors的其他方法:
/**
     * Returns a {@link Callable} object that, when
     * called, runs the given privileged action and returns its result.
     返回 一个包装权限动作的Callable,执行时,运行权限动作的run,并返回结果
     * @param action the privileged action to run
     * @return a callable object
     * @throws NullPointerException if action null
     */
    public static Callable<Object> callable(final PrivilegedAction<?> action) {
        if (action == null)
            throw new NullPointerException();
        return new Callable<Object>() {
            public Object call() { return action.run(); }};
    }

    /**
     * Returns a {@link Callable} object that, when
     * called, runs the given privileged exception action and returns
     * its result.
     返回 一个包装权限异常动作的Callable,执行时,运行权限异常动作的run,并返回结果
     * @param action the privileged exception action to run
     * @return a callable object
     * @throws NullPointerException if action null
     */
    public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
        if (action == null)
            throw new NullPointerException();
        return new Callable<Object>() {
            public Object call() throws Exception { return action.run(); }};
    }

 /**
     * Returns a {@link Callable} object that will, when
     * called, execute the given <tt>callable</tt> under the current
     * access control context. This method should normally be
     * invoked within an {@link AccessController#doPrivileged} action
     * to create callables that will, if possible, execute under the
     * selected permission settings holding within that action; or if
     * not possible, throw an associated {@link
     * AccessControlException}.
     返回一个Callable,Callable可以在与当前线程相同访问控制权限上下文的情况,执行。
     Callable方法将会在AccessController#doPrivileged的创建的PrivilegedCallable中,执行,
     如果可能话,将会在已选的权限Action中执行,否则抛出AccessControlException。

     * @param callable the underlying task
     * @return a callable object
     * @throws NullPointerException if callable null
     *
     */
    public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
        if (callable == null)
            throw new NullPointerException();
        return new PrivilegedCallable<T>(callable);
    }
/**
     * A callable that runs under established access control settings
     在已经建立的访问控制权限下,执行Callable,与权限线程工厂基本相同,
     只是权限工厂创建出来的线程除了相同访问控制权限外,还有相同的类加载器。
     */
    static final class PrivilegedCallable<T> implements Callable<T> {
        private final Callable<T> task;
        private final AccessControlContext acc;

        PrivilegedCallable(Callable<T> task) {
            this.task = task;
            this.acc = AccessController.getContext();
        }
        //有了前面的知识,这个应该很容易理解
        public T call() throws Exception {
            try {
                return AccessController.doPrivileged(
                    new PrivilegedExceptionAction<T>() {
                        public T run() throws Exception {
                            return task.call();
                        }
                    }, acc);
            } catch (PrivilegedActionException e) {
                throw e.getException();
            }
        }
    }

  
 /**
     * Returns a {@link Callable} object that will, when
     * called, execute the given <tt>callable</tt> under the current
     * access control context, with the current context class loader
     * as the context class loader. This method should normally be
     * invoked within an {@link AccessController#doPrivileged} action
     * to create callables that will, if possible, execute under the
     * selected permission settings holding within that action; or if
     * not possible, throw an associated {@link
     * AccessControlException}.
     返回一个Callable,Callable可以在与当前线程相同访问控制权限上下文和类加载器的情况,执行。Callable方法将会在AccessController#doPrivileged的创建的PrivilegedCallable中,执行,如果可能话,将会在已选的权限Action中执行,否则抛出AccessControlException。
     * @param callable the underlying task
     *
     * @return a callable object
     * @throws NullPointerException if callable null
     * @throws AccessControlException if the current access control
     * context does not have permission to both set and get context
     * class loader.
     */
    public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
        if (callable == null)
            throw new NullPointerException();
        return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
    }
     /**
     * A callable that runs under established access control settings and
     * current ClassLoader
     在已经建立的访问控制权限和当前类加载器下,执行Callable,与权限线程工厂基本相同,
     相同访问控制权限外,还有相同的类加载器。
     */
    static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> {
        private final Callable<T> task;
        private final AccessControlContext acc;
        private final ClassLoader ccl;

        PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
            SecurityManager sm = System.getSecurityManager();
            if (sm != null) {
	        //检查获取和设置类加载的权限
                // Calls to getContextClassLoader from this class
                // never trigger a security check, but we check
                // whether our callers have this permission anyways.
                sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);

                // Whether setContextClassLoader turns out to be necessary
                // or not, we fail fast if permission is not available.
                sm.checkPermission(new RuntimePermission("setContextClassLoader"));
            }
            this.task = task;
	    //访问控制上下文
            this.acc = AccessController.getContext();
	    //当前线程类加载器
            this.ccl = Thread.currentThread().getContextClassLoader();
        }

        public T call() throws Exception {
            try {
                return AccessController.doPrivileged(
                    new PrivilegedExceptionAction<T>() {
                        public T run() throws Exception {
                            Thread t = Thread.currentThread();
                            ClassLoader cl = t.getContextClassLoader();
                            if (ccl == cl) {
			       //只有在与当前线程具有相同控制权限和类加载器的情况,
			       //才执行任务
                                return task.call();
                            } else {
                                t.setContextClassLoader(ccl);
                                try {
                                    return task.call();
                                } finally {
                                    t.setContextClassLoader(cl);
                                }
                            }
                        }
                    }, acc);
            } catch (PrivilegedActionException e) {
                throw e.getException();
            }
        }
    }







上一条

下一条

相关新闻
自定义表单