In the java.util.concurrent package (since JDK 7)
A Framework for Divide and Conquer
- Recursively divides a task into smaller subtasks until a threshold check indicates that the subtask is small enough to execute serially. The optimal threshold depends on the specific computational steps involved and is found by profiling. Heuristic: each task should perform between 100 and 10,000 basic computational steps.
- Abstracts multithreading: the same code automatically scales up to the available processors.
- Leverages work-stealing: each worker thread maintains a queue of tasks. If one worker thread's queue is empty, it takes a task from another worker thread's queue.
- Uses daemon threads, which are terminated automatically when all user threads have terminated, so there is no need to shut the pool down explicitly.
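The threshold check and work-stealing described above can be sketched as follows. This is a minimal illustration, not a tuned implementation: the class RangeSum, the helper sumAndSteals, and the threshold value are all hypothetical names and numbers chosen for the example. ForkJoinPool.getStealCount() returns only an estimate, and it may well be 0 on a single-core machine.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: sums the integers in [lo, hi) by recursive halving.
class RangeSum extends RecursiveTask<Long> {
    static final int THRESHOLD = 1_000; // illustrative value, not a tuned one
    final int lo, hi;

    RangeSum(int lo, int hi) {
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {          // small enough: execute serially
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += i;
            return sum;
        }
        int mid = lo + (hi - lo) / 2;
        RangeSum left = new RangeSum(lo, mid);
        RangeSum right = new RangeSum(mid, hi);
        left.fork();                          // queued locally; an idle worker may steal it
        return right.compute() + left.join();
    }

    // Runs the sum on a fresh pool and returns { sum, estimated steal count }.
    static long[] sumAndSteals(int n) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            long sum = pool.invoke(new RangeSum(0, n));
            return new long[] { sum, pool.getStealCount() };
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling RangeSum.sumAndSteals(1_000_000) returns the sum of 0..999,999 together with the pool's steal estimate for that run; the second value varies between runs and machines.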
Classes
- ForkJoinTask<T>: abstract class that defines a task. Implements Future<T> and Serializable.
- ForkJoinPool: schedules the execution of ForkJoinTask<T> objects on a pool of worker threads.
- RecursiveAction: extends ForkJoinTask<Void> for tasks that do not return a value.
- RecursiveTask<T>: extends ForkJoinTask<T> for tasks that return a value. Each level must aggregate the results of its subtasks, so that when the first invocation finishes, it returns the overall result.
Using the framework requires custom classes that extend RecursiveAction or RecursiveTask<T> and override the compute() method (annotated with @Override).
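ForkJoinTask<T> Members
- compute(): implements the recursive divide-and-conquer strategy and its threshold branch. Declared as a protected abstract method by RecursiveAction and RecursiveTask<T> rather than by ForkJoinTask<T> itself; the implementer must override it.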
- fork(): submits the invoking task for asynchronous execution. Non-blocking. In JDK 7 it can be called only from within another ForkJoinTask running in a ForkJoinPool (from JDK 8, a call made outside a pool runs the task in the common pool). Use it to execute subtasks.
- join(): waits until the task on which it is called terminates, then returns the task result.
- invoke(): combines the fork and join operations into a single synchronous call. A convenience method that starts a task and then waits for it to terminate.
- invokeAll(): static; invokes multiple tasks and waits for all of them to complete.
- cancel(boolean mayInterruptIfRunning): attempts to cancel the task. Returns true if the task is now cancelled; returns false if it could not be cancelled, typically because it has already completed.
- isCancelled(): determines whether the task was cancelled before it completed normally.
- isCompletedNormally() / isCompletedAbnormally(): determine a task's completion status: completed without an exception or cancellation, versus cancelled or terminated by an exception.
- reinitialize(): resets the state of a task after it has completed so that it can be rerun.
- inForkJoinPool(): static; determines whether the calling code is executing inside a task running in a ForkJoinPool.
- adapt(): static; converts a Runnable or Callable object into a ForkJoinTask (its run() or call() method is executed when the task runs).
- getQueuedTaskCount(): static; obtains an approximate count of the tasks in the queue of the invoking thread.
- getSurplusQueuedTaskCount(): static; obtains an approximate count of the tasks the invoking thread has in its queue beyond the number of other threads in the pool that might steal them.
- quietlyJoin() / quietlyInvoke(): like join() and invoke(), but do not return a value or throw the task's exception.
- tryUnfork(): attempts to unschedule a forked task that has not yet started executing.
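Several of the members listed above (adapt, cancel, reinitialize, quietlyJoin, and the completion-status queries) can be exercised together. The following is a minimal sketch, assuming a dedicated pool; the class TaskStatusDemo and the labels it records are hypothetical names for illustration. Anonymous classes are used instead of lambdas so the code also compiles on JDK 7.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// Hypothetical demo class: records what each status method reports.
class TaskStatusDemo {
    static List<String> run() {
        List<String> out = new ArrayList<String>();
        ForkJoinPool pool = new ForkJoinPool();
        try {
            // adapt() wraps a Callable; the value returned by call() becomes the task result.
            ForkJoinTask<Integer> answer = ForkJoinTask.adapt(new Callable<Integer>() {
                @Override public Integer call() { return 6 * 7; }
            });
            out.add("result=" + pool.invoke(answer));
            out.add("completedNormally=" + answer.isCompletedNormally());
            out.add("cancelAfterCompletion=" + answer.cancel(false));

            // reinitialize() clears the completion state so the same object can run again.
            answer.reinitialize();
            out.add("doneAfterReinitialize=" + answer.isDone());
            out.add("rerunResult=" + pool.invoke(answer));

            // A task whose body throws completes abnormally;
            // quietlyJoin() waits for it without rethrowing the exception.
            ForkJoinTask<?> failing = ForkJoinTask.adapt(new Runnable() {
                @Override public void run() { throw new IllegalStateException("boom"); }
            });
            pool.execute(failing);
            failing.quietlyJoin();
            out.add("completedAbnormally=" + failing.isCompletedAbnormally());
            out.add("failedWithIllegalState="
                    + (failing.getException() instanceof IllegalStateException));

            // Cancelling a task that has never been started succeeds.
            ForkJoinTask<?> unused = ForkJoinTask.adapt(new Runnable() {
                @Override public void run() { }
            });
            out.add("cancelBeforeStart=" + unused.cancel(false));
            out.add("isCancelled=" + unused.isCancelled());
        } finally {
            pool.shutdown();
        }
        return out;
    }
}
```

Printing the list returned by TaskStatusDemo.run() shows one "label=value" entry per call, in the order the calls were made.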
ForkJoinPool Members
- Constructor: the pLevel argument specifies the level of parallelism. Valid range: from one up to an implementation-defined limit. The default constructor uses the number of available processors, as reported by Runtime.getRuntime().availableProcessors().
- getParallelism(): gets the level of parallelism.
- execute(): initiates the asynchronous execution of a task; returns without waiting for the task to finish.
- invoke(): initiates the synchronous execution of a task; returns the task result once the task has finished.
- toString(): returns a string describing the state of the pool.
- isQuiescent(): determines whether the pool is currently idle; returns true if the pool has no active threads.
- getPoolSize(): obtains the number of worker threads currently in the pool.
- getActiveThreadCount(): obtains an approximate count of the active threads in the pool.
- shutdown(): shuts the pool down gracefully. Tasks that are already active are still executed, but no new tasks can be started.
- shutdownNow(): attempts to stop the pool immediately, cancelling currently active tasks.
- isShutdown(): determines whether the pool has been shut down.
- isTerminated(): determines whether the pool has been shut down and all of its tasks have completed.
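The pool life cycle listed above can be sketched in a few lines. This is an illustrative sketch only: PoolLifecycleDemo and its nested Noop task are hypothetical names, the parallelism level of 2 and the 5-second wait are arbitrary choices, and awaitTermination() comes from the ExecutorService interface that ForkJoinPool implements.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class walking a pool from creation to termination.
class PoolLifecycleDemo {
    // Hypothetical no-op task, used only to give the pool something to run.
    static class Noop extends RecursiveAction {
        @Override protected void compute() { }
    }

    // Returns { parallelismIsTwo, shutDown, rejectedAfterShutdown, awaited, terminated }.
    static boolean[] run() {
        ForkJoinPool pool = new ForkJoinPool(2);       // explicit level of parallelism
        boolean parallelismIsTwo = pool.getParallelism() == 2;

        Noop async = new Noop();
        pool.execute(async);                           // asynchronous: returns immediately
        async.join();                                  // wait for that one task

        pool.invoke(new Noop());                       // synchronous: returns when done

        pool.shutdown();                               // graceful: no new tasks accepted
        boolean shutDown = pool.isShutdown();

        boolean rejected = false;
        try {
            pool.execute(new Noop());                  // submitting after shutdown fails
        } catch (RejectedExecutionException e) {
            rejected = true;
        }

        boolean awaited = false;
        try {
            awaited = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();        // preserve the interrupt status
        }
        return new boolean[] { parallelismIsTwo, shutDown, rejected, awaited, pool.isTerminated() };
    }
}
```

Because the worker threads are daemon threads, the explicit shutdown() here is optional in a short-lived program; it is shown only to illustrate isShutdown() and isTerminated().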
RecursiveAction Example

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;

    // Replaces each element of data[start..end) with its square root.
    class MySqrtAction extends RecursiveAction {
        final int seqThreshold = 1000;
        double[] data;
        int start, end;

        MySqrtAction(double[] vals, int s, int e) {
            data = vals;
            start = s;
            end = e;
        }

        @Override
        protected void compute() {
            // if #elements < threshold then process sequentially
            // else invoke new tasks using subdivided data
            if ((end - start) < seqThreshold) {
                // process only this task's own slice: [start, end)
                for (int i = start; i < end; i++) {
                    data[i] = Math.sqrt(data[i]);
                }
            } else {
                int middle = (start + end) / 2;
                invokeAll(new MySqrtAction(data, start, middle),
                          new MySqrtAction(data, middle, end));
            }
        }
    }

    // Wrapper class so that the test method compiles on its own.
    class MySqrtActionTest {
        static void testForkJoinAction() {
            double[] nums = new double[100000];
            for (int i = 0; i < nums.length; i++) nums[i] = (double) i;
            ForkJoinPool fjp = new ForkJoinPool();
            MySqrtAction action = new MySqrtAction(nums, 0, nums.length);
            fjp.invoke(action);   // returns once every element has been replaced
        }
    }
RecursiveTask<T> Example

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveTask;

    // Returns the sum of data[start..end).
    class MySumTask extends RecursiveTask<Double> {
        final int seqThreshold = 1000;
        double[] data;
        int start, end;

        MySumTask(double[] vals, int s, int e) {
            data = vals;
            start = s;
            end = e;
        }

        @Override
        protected Double compute() {   // return type must be Double, matching RecursiveTask<Double>
            double sum = 0;
            // if #elements < threshold then process sequentially
            // else invoke new tasks using subdivided data
            if ((end - start) < seqThreshold) {
                // sum only this task's own slice: [start, end)
                for (int i = start; i < end; i++) {
                    sum += data[i];
                }
            } else {
                int middle = (start + end) / 2;
                MySumTask t1 = new MySumTask(data, start, middle);
                MySumTask t2 = new MySumTask(data, middle, end);
                // start each subtask by forking:
                t1.fork();
                t2.fork();
                // aggregate the partial results
                sum = t1.join() + t2.join();
            }
            return sum;
        }
    }

    // Wrapper class so that the test method compiles on its own.
    class MySumTaskTest {
        static void testForkJoinTask() {
            double[] nums = new double[100000];
            for (int i = 0; i < nums.length; i++) nums[i] = (double) i;
            ForkJoinPool fjp = new ForkJoinPool();
            MySumTask task = new MySumTask(nums, 0, nums.length);
            double d = fjp.invoke(task);   // the overall sum
        }
    }
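A common variation on the summing task forks only one half and computes the other half directly in the current thread, which saves one task submission per split. The sketch below assumes the same array-slice layout as MySumTask; the class name SumTaskForkOnce and the helper sum() are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variant of the summing task: fork one half, compute the other in place.
class SumTaskForkOnce extends RecursiveTask<Double> {
    static final int SEQ_THRESHOLD = 1000;
    final double[] data;
    final int start, end;

    SumTaskForkOnce(double[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Double compute() {
        if (end - start < SEQ_THRESHOLD) {
            double sum = 0;
            for (int i = start; i < end; i++) sum += data[i];
            return sum;
        }
        int middle = start + (end - start) / 2;
        SumTaskForkOnce left = new SumTaskForkOnce(data, start, middle);
        SumTaskForkOnce right = new SumTaskForkOnce(data, middle, end);
        left.fork();                          // schedule the left half asynchronously
        double rightSum = right.compute();    // do the right half in this thread
        return left.join() + rightSum;        // then wait for the left half
    }

    // Convenience entry point: sums the whole array on a fresh pool.
    static double sum(double[] values) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTaskForkOnce(values, 0, values.length));
        } finally {
            pool.shutdown();
        }
    }
}
```

Whether this is faster than forking both halves depends on the workload, so it is worth profiling both forms.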
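Tips
- Avoid an overly low sequential threshold: it creates many tiny tasks whose creation and scheduling overhead outweighs the work they do (task thrashing).
- Use the default level of parallelism, which matches the number of available processors.
- Avoid synchronization or other blocking code inside tasks: a blocked worker thread cannot run or steal other tasks.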
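The effect of the sequential threshold on the number of tasks can be made concrete by counting the leaf (sequential) tasks a given threshold produces. The sketch below does no real work; LeafCounter and leaves() are hypothetical names, and the split rule (halve until the slice size is at most the threshold) is an assumption chosen to mirror the examples above.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical helper: counts how many sequential leaf tasks a threshold produces.
class LeafCounter extends RecursiveTask<Integer> {
    final int start, end, threshold;

    LeafCounter(int start, int end, int threshold) {
        this.start = start;
        this.end = end;
        this.threshold = threshold;
    }

    @Override
    protected Integer compute() {
        if (end - start <= threshold) {
            return 1;                          // this slice would be processed sequentially
        }
        int middle = start + (end - start) / 2;
        LeafCounter left = new LeafCounter(start, middle, threshold);
        LeafCounter right = new LeafCounter(middle, end, threshold);
        left.fork();
        return right.compute() + left.join(); // total leaves under this node
    }

    // Counts the leaf tasks for an input of the given size.
    static int leaves(int size, int threshold) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new LeafCounter(0, size, threshold));
        } finally {
            pool.shutdown();
        }
    }
}
```

For 100,000 elements, a threshold of 1000 yields 128 leaf tasks, while a threshold of 10 yields 16,384, each doing very little work relative to its scheduling cost.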