/**
 * The state of this task, chosen from the constants below.
 * A freshly created task starts out as {@code virgin}.
 */
int state = virgin;
/**
 * This task has not yet been scheduled.
 */
static final int virgin = 0;
/**
 * This task is scheduled for execution. If it is a non-repeating task,
 * it has not yet been executed.
 */
static final int scheduled = 1;
/**
 * This non-repeating task has already executed (or is currently
 * executing) and has not been cancelled.
 */
static final int executed = 2;
/**
 * This task has been cancelled (with a call to timertask.cancel).
 */
static final int cancelled = 3;
timertask 有两个操作方法
cancel() // 取消任务
scheduledexecutiontime() // 返回该任务最近一次实际执行所对应的“计划执行时间”
cancel() 比较简单:先对当前任务的 lock 加锁,再把状态变更为已取消;只有在取消之前任务处于已调度(scheduled)状态时才返回 true。
/**
 * Marks this task as cancelled.
 *
 * @return true if the task was in the scheduled state immediately before
 *         this call, false otherwise
 */
public boolean cancel() {
    synchronized (lock) {
        // Remember where we came from before overwriting the state.
        final int previous = state;
        state = cancelled;
        return previous == scheduled;
    }
}
class taskqueue {
    /**
     * Priority queue represented as a balanced binary heap stored in an
     * array: the two children of queue[n] are queue[2*n] and queue[2*n+1].
     * The heap is ordered on the nextexecutiontime field, so the task with
     * the lowest nextexecutiontime sits in queue[1] (when the queue is
     * nonempty). For every node n and every descendant d of n,
     * n.nextexecutiontime <= d.nextexecutiontime. Slot 0 is never used.
     */
    private timertask[] queue = new timertask[128];

    /**
     * Number of tasks in the priority queue; they occupy queue[1] through
     * queue[size]. This is not the same as the length of the backing array.
     */
    private int size = 0;

    /**
     * Returns the number of tasks currently on the queue.
     */
    int size() {
        return size;
    }

    /**
     * Adds a new task to the priority queue.
     */
    void add(timertask task) {
        final int slot = size + 1;
        // Double the backing array once the next slot would fall off its end.
        if (slot == queue.length) {
            queue = arrays.copyof(queue, 2*queue.length);
        }
        size = slot;
        queue[slot] = task;
        // The new leaf may be earlier than its ancestors; sift it up.
        fixup(slot);
    }

    /**
     * Returns the "head task" of the priority queue, i.e. a task with the
     * lowest nextexecutiontime.
     */
    timertask getmin() {
        return get(1);
    }

    /**
     * Returns the ith task in the priority queue, where i ranges from 1
     * (the head task, as returned by getmin) to the number of tasks on the
     * queue, inclusive.
     */
    timertask get(int i) {
        return queue[i];
    }

    /**
     * Removes the head task from the priority queue, then sifts the task
     * moved into the root back down to restore the heap.
     */
    void removemin() {
        final int last = size;
        queue[1] = queue[last];
        queue[last] = null; // drop extra reference to prevent memory leak
        size = last - 1;
        fixdown(1);
    }

    /**
     * Removes the ith element from the queue without restoring the heap
     * invariant. The queue is one-based, so 1 <= i <= size.
     */
    void quickremove(int i) {
        assert i <= size;

        final int last = size;
        queue[i] = queue[last];
        queue[last] = null; // drop extra reference to prevent memory leak
        size = last - 1;
    }

    /**
     * Sets the nextexecutiontime of the head task to the specified value
     * and re-establishes the heap ordering by sifting the head down.
     */
    void reschedulemin(long newtime) {
        final timertask head = queue[1];
        head.nextexecutiontime = newtime;
        fixdown(1);
    }

    /**
     * Returns true if the priority queue contains no elements.
     */
    boolean isempty() {
        return 0 == size;
    }

    /**
     * Removes all elements from the priority queue.
     */
    void clear() {
        // Null out task references to prevent memory leak.
        for (int slot = size; slot >= 1; slot--) {
            queue[slot] = null;
        }
        size = 0;
    }

    /**
     * Exchanges the tasks stored in the two given slots.
     */
    private void swap(int a, int b) {
        final timertask held = queue[a];
        queue[a] = queue[b];
        queue[b] = held;
    }

    /**
     * Restores the heap invariant assuming it holds everywhere except
     * possibly at the leaf indexed by k, whose nextexecutiontime may be
     * less than its parent's.
     *
     * The task is "promoted" by swapping it with its parent until its
     * nextexecutiontime is greater than or equal to that of its parent,
     * or until it reaches the root.
     */
    private void fixup(int k) {
        int child = k;
        while (child > 1) {
            final int parent = child >> 1;
            if (queue[parent].nextexecutiontime <= queue[child].nextexecutiontime) {
                return;
            }
            swap(parent, child);
            child = parent;
        }
    }

    /**
     * Restores the heap invariant in the subtree rooted at k, which is
     * assumed to satisfy it except possibly at node k itself (whose
     * nextexecutiontime may be greater than its children's).
     *
     * The task is "demoted" by swapping it with its smaller child until
     * its nextexecutiontime is less than or equal to those of its children.
     */
    private void fixdown(int k) {
        int parent = k;
        // "child > 0" stops the loop if the left shift overflows to a
        // non-positive index.
        for (int child = parent << 1; child <= size && child > 0; child = parent << 1) {
            // Move to the right sibling when it exists and is strictly earlier.
            if (child < size
                    && queue[child + 1].nextexecutiontime < queue[child].nextexecutiontime) {
                child++;
            }
            if (queue[parent].nextexecutiontime <= queue[child].nextexecutiontime) {
                return;
            }
            swap(child, parent);
            parent = child;
        }
    }

    /**
     * Establishes the heap invariant in the entire tree, assuming nothing
     * about the order of the elements prior to the call.
     */
    void heapify() {
        // Sift down every node that has at least one child, deepest first.
        int node = size / 2;
        while (node >= 1) {
            fixdown(node);
            node--;
        }
    }
}
/**
 * Schedules the task to run once the given delay, measured from the
 * current system time, has elapsed. A period of 0 is handed to sched().
 *
 * @param task  the task to schedule
 * @param delay offset added to the current time in milliseconds
 * @throws illegalargumentexception if delay is negative
 */
public void schedule(timertask task, long delay) {
    if (delay < 0) {
        throw new illegalargumentexception("negative delay.");
    }
    final long executiontime = system.currenttimemillis() + delay;
    sched(task, executiontime, 0);
}
/**
 * Schedules the task for the given point in time. A period of 0 is
 * handed to sched().
 *
 * @param task the task to schedule
 * @param time the moment at which the task should run
 */
public void schedule(timertask task, date time) {
    final long executiontime = time.gettime();
    sched(task, executiontime, 0);
}
/**
 * Schedules a repeating task whose first run happens after the given
 * delay. The period is forwarded to sched() with its sign flipped.
 *
 * @param task   the task to schedule
 * @param delay  offset added to the current time in milliseconds
 * @param period interval between runs; must be strictly positive
 * @throws illegalargumentexception if delay is negative or period is
 *         zero or negative
 */
public void schedule(timertask task, long delay, long period) {
    if (delay < 0) {
        throw new illegalargumentexception("negative delay.");
    }
    if (period <= 0) {
        throw new illegalargumentexception("non-positive period.");
    }
    final long firstrun = system.currenttimemillis() + delay;
    final long negatedperiod = -period;
    sched(task, firstrun, negatedperiod);
}
/**
 * Schedules a repeating task whose first run happens at the given point
 * in time. The period is forwarded to sched() with its sign flipped.
 *
 * @param task      the task to schedule
 * @param firsttime the moment of the first run
 * @param period    interval between runs; must be strictly positive
 * @throws illegalargumentexception if period is zero or negative
 */
public void schedule(timertask task, date firsttime, long period) {
    if (period <= 0) {
        throw new illegalargumentexception("non-positive period.");
    }
    final long firstrun = firsttime.gettime();
    final long negatedperiod = -period;
    sched(task, firstrun, negatedperiod);
}
/**
 * Schedules a repeating task whose first run happens after the given
 * delay. Unlike the repeating schedule() overloads, the period is
 * forwarded to sched() unchanged (positive).
 *
 * @param task   the task to schedule
 * @param delay  offset added to the current time in milliseconds
 * @param period interval between runs; must be strictly positive
 * @throws illegalargumentexception if delay is negative or period is
 *         zero or negative
 */
public void scheduleatfixedrate(timertask task, long delay, long period) {
    if (delay < 0) {
        throw new illegalargumentexception("negative delay.");
    }
    if (period <= 0) {
        throw new illegalargumentexception("non-positive period.");
    }
    final long firstrun = system.currenttimemillis() + delay;
    sched(task, firstrun, period);
}
/**
 * Schedules a repeating task whose first run happens at the given point
 * in time. Unlike the repeating schedule() overloads, the period is
 * forwarded to sched() unchanged (positive).
 *
 * @param task      the task to schedule
 * @param firsttime the moment of the first run
 * @param period    interval between runs; must be strictly positive
 * @throws illegalargumentexception if period is zero or negative
 */
public void scheduleatfixedrate(timertask task, date firsttime, long period) {
    if (period <= 0) {
        throw new illegalargumentexception("non-positive period.");
    }
    final long firstrun = firsttime.gettime();
    sched(task, firstrun, period);
}
从上面的代码我们看出下面几点。
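schedule() 和 scheduleatfixedrate() 的所有重载最终都调用了私有方法 sched()
一次性任务的 schedule() 传入的 period 为 0;周期性任务的 schedule() 传入的 period 为负数(-period),而 scheduleatfixedrate() 传入的 period 为正数,sched() 之后的逻辑可以凭 period 的符号区分这两种周期调度方式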
接下来我们看看 sched() 方法。
/**
 * Validates the request, stamps the task with its execution time, period
 * and scheduled state, and inserts it into the task queue.
 *
 * @param task   the task to enqueue; its state must still be virgin
 * @param time   the execution time stored in task.nextexecutiontime;
 *               must not be negative
 * @param period the value stored in task.period: the one-shot schedule()
 *               overloads pass 0, the repeating schedule() overloads pass
 *               a negated period, and scheduleatfixedrate() passes a
 *               positive period
 * @throws illegalargumentexception if time is negative
 * @throws illegalstateexception if thread.newtasksmaybescheduled is false,
 *         or if the task is not in the virgin state
 */
private void sched(timertask task, long time, long period) {
// 1. reject a negative execution time
if (time < 0)
throw new illegalargumentexception("illegal execution time.");
// constrain value of period sufficiently to prevent numeric
// overflow while still being effectively infinitely large.
// 2. a period whose magnitude exceeds long.max_value >> 1 is halved
if (math.abs(period) > (long.max_value >> 1))
period >>= 1;
synchronized(queue) {
// 3. once the flag has been cleared (cancel() and the threadreaper
// finalizer both clear it) no further task may be scheduled
if (!thread.newtasksmaybescheduled)
throw new illegalstateexception("timer already cancelled.");
// 4. take the task's own lock (the same lock timertask.cancel() uses)
// while setting its next execution time, period and state, so that
// scheduling and cancelling a task cannot interleave
synchronized(task.lock) {
if (task.state != timertask.virgin)
throw new illegalstateexception(
"task already scheduled or cancelled");
task.nextexecutiontime = time;
task.period = period;
task.state = timertask.scheduled;
}
// 5. insert the task into the queue
queue.add(task);
// 6. if the new task became the head of the queue, notify the queue
// monitor (presumably so the waiting timer thread re-evaluates
// its wait -- the thread loop is not visible here)
if (queue.getmin() == task)
queue.notify();
}
}
/**
 * Cancels this timer: while holding the queue lock it clears the flag
 * that permits scheduling, empties the task queue and notifies the
 * queue monitor.
 */
public void cancel() {
synchronized(queue) {
// from here on sched() fails with "timer already cancelled."
thread.newtasksmaybescheduled = false;
queue.clear();
queue.notify(); // in case queue was already empty.
}
}
/**
 * Removes every cancelled task from the queue.
 *
 * @return the number of tasks that were removed
 */
public int purge() {
    int removed = 0;

    synchronized (queue) {
        // Walk from the last slot towards the head: quickremove() fills
        // the vacated slot with the last element, which has already been
        // examined when iterating in this direction.
        int index = queue.size();
        while (index > 0) {
            if (timertask.cancelled == queue.get(index).state) {
                queue.quickremove(index);
                removed++;
            }
            index--;
        }

        // quickremove() ignores heap order, so rebuild it if anything changed.
        if (removed != 0) {
            queue.heapify();
        }
    }

    return removed;
}
5. 唤醒队列的方法
通过前面源码的分析,我们看到队列的唤醒存在于下面几处:
timer.cancel()
timer.sched()
timer.threadreaper.finalize()
第一点和第二点其实已经分析过了,下面我们来看看第三点。
/**
 * Helper object whose finalizer clears the flag that permits scheduling
 * and notifies the queue monitor, both under the queue lock.
 * NOTE(review): presumably this lets the timer thread terminate once the
 * timer itself is no longer referenced -- confirm against the timer
 * thread's main loop, which is not visible here.
 */
private final object threadreaper = new object() {
protected void finalize() throws throwable {
synchronized(queue) {
// same flag that cancel() clears; sched() rejects new tasks afterwards
thread.newtasksmaybescheduled = false;
queue.notify(); // in case queue is empty.
}
}
};