public class Timer1 { private final TaskQueue queue = new TaskQueue();//这是一个最小堆,它存放所有TimerTask。一个数组 //定时任务只会创建一个线程,所以如果存在多个任务,且任务时间过长,超过了两个任务的间隔时间 private final TimerThread thread = new TimerThread(queue);//queue中的任务,执行完从任务队列中移除。 /** * This object causes the timer's task execution thread to exit * gracefully when there are no live references to the Timer object and no * tasks in the timer queue. It is used in preference to a finalizer on * Timer as such a finalizer would be susceptible to a subclass's * finalizer forgetting to call it. */ private final Object threadReaper = new Object() { protected void finalize() throws Throwable { synchronized(queue) { thread.newTasksMayBeScheduled = false; queue.notify(); // In case queue is empty. } } }; private final static AtomicInteger nextSerialNumber = new AtomicInteger(0); private static int serialNumber() { return nextSerialNumber.getAndIncrement(); } public Timer1() { this("Timer-" + serialNumber()); } public Timer1(boolean isDaemon) { this("Timer-" + serialNumber(), isDaemon);//是否守护线程 } public Timer1(String name) { thread.setName(name); thread.start(); } public Timer1(String name, boolean isDaemon) { thread.setName(name); thread.setDaemon(isDaemon); thread.start(); } public void schedule(TimerTask1 task, long delay) {//在时间等于或超过time的时候执行且只执行一次task, if (delay < 0) throw new IllegalArgumentException("Negative delay."); sched(task, System.currentTimeMillis()+delay, 0); } public void schedule(TimerTask1 task, Date time) { sched(task, time.getTime(), 0); } public void schedule(TimerTask1 task, long delay, long period) {//在时间等于或超过time的时候首次执行task,之后每隔period毫秒重复执行一次task 。 if (delay < 0) throw new IllegalArgumentException("Negative delay."); if (period 0) throw new IllegalArgumentException("Non-positive period."); sched(task, System.currentTimeMillis()+delay, -period); } public void schedule(TimerTask1 task, Date firstTime, long period) { if (period 0) throw new IllegalArgumentException("Non-positive period."); sched(task, firstTime.getTime(), -period); } public void scheduleAtFixedRate(TimerTask1 task, long delay, long period) { if (delay < 0) throw new IllegalArgumentException("Negative delay."); if (period 0) throw new IllegalArgumentException("Non-positive period."); sched(task, System.currentTimeMillis()+delay, period); } public void scheduleAtFixedRate(TimerTask1 task, Date firstTime, long period) { if (period 0) throw new IllegalArgumentException("Non-positive period."); sched(task, firstTime.getTime(), period); } private void sched(TimerTask1 task, long time, long period) { if (time < 0) throw new IllegalArgumentException("Illegal execution time."); // Constrain value of period sufficiently to prevent numeric // overflow while still being effectively infinitely large. if (Math.abs(period) > (Long.MAX_VALUE >> 1)) period >>= 1; synchronized(queue) { if (!thread.newTasksMayBeScheduled) throw new IllegalStateException("Timer already cancelled."); synchronized(task.lock) { if (task.state != TimerTask1.VIRGIN) throw new IllegalStateException( "Task already scheduled or cancelled"); task.nextExecutionTime = time; task.period = period; task.state = TimerTask1.SCHEDULED; } queue.add(task); //当timer对象调用schedule方法时,都会向队列添加元素,并唤醒TaskQueue队列上的线程, //这时候TimerThread会被唤醒,继续执行mainLoop方法。 if (queue.getMin() == task) queue.notify();//多线程对同一队列出队入队,使用synchronized,queue.notify() } } public void cancel() { synchronized(queue) { thread.newTasksMayBeScheduled = false; queue.clear(); queue.notify(); // In case queue was already empty. } } public int purge() { int result = 0; synchronized(queue) { for (int i = queue.size(); i > 0; i--) { if (queue.get(i).state == TimerTask1.CANCELLED) { queue.quickRemove(i); result++; } } if (result != 0) queue.heapify(); } return result; } } class TimerThread extends Thread { /** * This flag is set to false by the reaper to inform us that there * are no more live references to our Timer object. Once this flag * is true and there are no more tasks in our queue, there is no * work left for us to do, so we terminate gracefully. Note that * this field is protected by queue's monitor! */ boolean newTasksMayBeScheduled = true; private TaskQueue queue; TimerThread(TaskQueue queue) { this.queue = queue; } public void run() { try { mainLoop(); } finally { // Someone killed this Thread, behave as if Timer cancelled synchronized(queue) { newTasksMayBeScheduled = false; queue.clear(); // Eliminate obsolete references } } } private void mainLoop() {//拿出任务队列中的第一个任务,如果执行时间还没有到,则继续等待,否则立即执行。 while (true) {//函数执行的是一个死循环 try { TimerTask1 task; boolean taskFired; synchronized(queue) {//并且加了queue锁,从而保证是线程安全的。 // 队列为空等待 while (queue.isEmpty() && newTasksMayBeScheduled) queue.wait(); if (queue.isEmpty()) break; // Queue is empty and will forever remain; die // Queue nonempty; look at first evt and do the right thing long currentTime, executionTime; task = queue.getMin(); /* queue.getMin()找到任务队列中执行时间最早的元素, 然后判断元素的state,period,nextExecutionTime,SCHEDULED等属性,从而确定任务是否可执行。 主要是判断这几个属性:1,state 属性,如果为取消(即我们调用了timer的cancel方法取消了某一任务), 则会从队列中删除这个元素,然后继续循环;2,period 属性,如果为单次执行,这个值为0,周期执行的话, 为我们传入的intervalTime值,如果为0,则会移出队列,并设置任务状态为已执行,然后下面的 task.run()会执行任务, 如果这个值不为0,则会修正队列,设置这个任务的再一次执行时间,queue.rescheduleMin这个函数来完成的这个操作; 3,taskFired 属性, 如果 executionTime*/ synchronized(task.lock) { if (task.state == TimerTask1.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again } currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; if (taskFired = (executionTimecurrentTime)) { if (task.period == 0) { // Non-repeating, remove queue.removeMin(); task.state = TimerTask1.EXECUTED; } else { // Repeating task, reschedule queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period); } } } //会对TaskQueue队列的首元素进行判断,看是否达到执行时间, //如果没有,则进行休眠,休眠时间为队首任务的开始执行时间到当前时间的时间差。 if (!taskFired) queue.wait(executionTime - currentTime); } if (taskFired) task.run(); } catch(InterruptedException e) { } } } } class TaskQueue { /*TaskQueue是一个平衡二叉堆,具有最小 nextExecutionTime 的 TimerTask 在队列中为 queue[1] , 也就是堆中的根节点。第 n 个位置 queue[n] 的子节点分别在 queue[2n] 和 queue[2n+1] 也就是说TimerTask 在堆中的位置其实是通过nextExecutionTime 来决定的。 nextExecutionTime 越小,那么在堆中的位置越靠近根,越有可能先被执行。而nextExecutionTime意思就是下一次执行开始的时间。 */ private TimerTask1[] queue = new TimerTask1[128];//默认128 private int size = 0; int size() { return size; } void add(TimerTask1 task) {//根据执行时间的先后对数组元素进行排序,从而确定最先开始执行的任务, if (size + 1 == queue.length) queue = Arrays.copyOf(queue, 2*queue.length); queue[++size] = task; fixUp(size); } TimerTask1 getMin() { return queue[1]; } TimerTask1 get(int i) { return queue[i]; } void removeMin() { queue[1] = queue[size]; queue[size--] = null; // Drop extra reference to prevent memory leak fixDown(1); } /** * Removes the ith element from queue without regard for maintaining * the heap invariant. Recall that queue is one-based, so * 1 */ void quickRemove(int i) { assert i size; queue[i] = queue[size]; queue[size--] = null; // Drop extra ref to prevent memory leak } /** * Sets the nextExecutionTime associated with the head task to the * specified value, and adjusts priority queue accordingly. */ void rescheduleMin(long newTime) { queue[1].nextExecutionTime = newTime; fixDown(1); } boolean isEmpty() { return size==0; } void clear() { // Null out task references to prevent memory leak for (int i=1; i) queue[i] = null; size = 0; } private void fixUp(int k) {//维护最小堆 while (k > 1) { int j = k >> 1; if (queue[j].nextExecutionTime queue[k].nextExecutionTime) break; TimerTask1 tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; k = j; } } private void fixDown(int k) { int j; while ((j = k << 1) 0) { if (j < size && queue[j].nextExecutionTime > queue[j+1].nextExecutionTime) j++; // j indexes smallest kid if (queue[k].nextExecutionTime queue[j].nextExecutionTime) break; TimerTask1 tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; k = j; } } void heapify() { for (int i = size/2; i >= 1; i--) fixDown(i); } }
Original: https://www.cnblogs.com/yaowen/p/13431627.html
Author: 哈哈呵h
Title: Timer
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/546755/
转载文章受原作者版权保护。转载请注明原作者出处!