4package com.jogamp.opencl.util.concurrent;
6import com.jogamp.common.util.InterruptSource;
7import com.jogamp.opencl.CLCommandQueue;
8import com.jogamp.opencl.CLDevice;
9import com.jogamp.opencl.CLResource;
10import com.jogamp.opencl.util.CLMultiContext;
11import java.util.ArrayList;
12import java.util.Collection;
14import java.util.concurrent.Callable;
15import java.util.concurrent.ExecutorService;
16import java.util.concurrent.Executors;
17import java.util.concurrent.Future;
18import java.util.concurrent.ThreadFactory;
19import java.util.concurrent.TimeUnit;
30 private List<CLQueueContext> contexts;
31 private ExecutorService excecutor;
33 private boolean released;
36 this.contexts = initContexts(queues, factory);
40 private List<CLQueueContext> initContexts(
final Collection<CLCommandQueue> queues,
final CLQueueContextFactory<C> factory) {
41 final List<CLQueueContext> newContexts =
new ArrayList<CLQueueContext>(queues.size());
47 if(this.contexts !=
null && !this.contexts.isEmpty()) {
48 old = this.contexts.get(index++);
52 newContexts.add(factory.
setup(queue, old));
57 private void initExecutor() {
58 this.excecutor = Executors.newFixedThreadPool(contexts.size(),
new QueueThreadFactory(contexts));
66 final List<CLCommandQueue> queues =
new ArrayList<CLCommandQueue>(devices.size());
67 for (
final CLDevice device : devices) {
68 queues.add(device.createCommandQueue(modes));
70 return create(factory, queues);
82 return excecutor.submit(
new TaskWrapper<C,R>(task, finishAction));
89 public <R> List<Future<R>> submitAll(
final Collection<? extends CLTask<? super C, R>> tasks) {
90 final List<Future<R>> futures =
new ArrayList<Future<R>>(tasks.size());
91 for (
final CLTask<? super C, R> task : tasks) {
92 futures.add(submit(task));
101 public <R> List<Future<R>> invokeAll(
final Collection<? extends CLTask<? super C, R>> tasks)
throws InterruptedException {
102 final List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
103 return excecutor.invokeAll(wrapper);
110 public <R> List<Future<R>> invokeAll(
final Collection<? extends CLTask<? super C, R>> tasks,
final long timeout,
final TimeUnit unit)
throws InterruptedException {
111 final List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
112 return excecutor.invokeAll(wrapper, timeout, unit);
115 private <R> List<TaskWrapper<C, R>> wrapTasks(
final Collection<? extends CLTask<? super C, R>> tasks) {
116 final List<TaskWrapper<C, R>> wrapper =
new ArrayList<TaskWrapper<C, R>>(tasks.size());
117 for (
final CLTask<? super C, R> task : tasks) {
119 throw new NullPointerException(
"at least one task was null");
121 wrapper.add(
new TaskWrapper<C, R>(task, finishAction));
133 excecutor.shutdown();
136 contexts = initContexts(
getQueues(), factory);
146 context.queue.flush();
155 context.queue.finish();
165 throw new RuntimeException(getClass().getSimpleName()+
" already released");
168 excecutor.shutdown();
170 context.queue.finish().release();
179 final List<CLCommandQueue> queues =
new ArrayList<CLCommandQueue>(contexts.size());
181 queues.add(context.queue);
190 return contexts.size();
207 this.finishAction = action;
212 return getClass().getSimpleName()+
" [queues: "+contexts.size()+
" on finish: "+finishAction+
"]";
215 private static class QueueThreadFactory
implements ThreadFactory {
217 private final List<CLQueueContext> context;
220 private QueueThreadFactory(
final List<CLQueueContext> queues) {
221 this.context = queues;
225 public synchronized Thread newThread(
final Runnable runnable) {
227 final SecurityManager sm = System.getSecurityManager();
228 final ThreadGroup group = (sm !=
null) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();
230 final CLQueueContext queue = context.get(index);
231 final QueueThread thread =
new QueueThread(group, runnable, queue, index++);
232 thread.setDaemon(
true);
239 private static class QueueThread
extends InterruptSource.Thread {
240 private final CLQueueContext context;
241 public QueueThread(
final ThreadGroup group,
final Runnable runnable,
final CLQueueContext context,
final int index) {
242 super(group, runnable,
"queue-worker-thread-"+index+
"["+context+
"]");
243 this.context = context;
247 private static class TaskWrapper<C
extends CLQueueContext, R> implements Callable<R> {
249 private final CLTask<? super C, R> task;
250 private final FinishAction mode;
252 public TaskWrapper(
final CLTask<? super C, R> task,
final FinishAction mode) {
257 public R call() throws Exception {
258 final CLQueueContext context = ((QueueThread)Thread.currentThread()).context;
261 @SuppressWarnings(
"unchecked")
263 R result = task.execute((C)context);
264 if(mode.equals(FinishAction.FLUSH)) {
265 context.queue.flush();
266 }
else if(mode.equals(FinishAction.FINISH)) {
267 context.queue.finish();
The command queue is used to queue a set of operations for a specific CLDevice.
This object represents an OpenCL device.
Utility for organizing multiple CLContexts.
List< CLDevice > getDevices()
Returns a list containing all devices used in this multi context.
A multithreaded, fixed size pool of OpenCL command queues.
int getSize()
Returns the size of this pool (number of command queues).
void flushQueues()
Calls CLCommandQueue#flush() on all queues.
FinishAction getFinishAction()
static< C extends CLQueueContext > CLCommandQueuePool< C > create(final CLQueueContextFactory< C > factory, final CLMultiContext mc, final CLCommandQueue.Mode... modes)
void release()
Releases all queues.
boolean isReleased()
Returns true if release() has been called.
void finishQueues()
Calls CLCommandQueue#finish() on all queues.
List< CLCommandQueue > getQueues()
Returns the command queues used in this pool.
CLCommandQueuePool< C > switchContext(final CLQueueContextFactory< C > factory)
Switches the context of all queues - this operation can be expensive.
void setFinishAction(final FinishAction action)
Sets the action which is run after every completed task.
static< C extends CLQueueContext > CLCommandQueuePool< C > create(final CLQueueContextFactory< C > factory, final Collection< CLDevice > devices, final CLCommandQueue.Mode... modes)
static< C extends CLQueueContext > CLCommandQueuePool< C > create(final CLQueueContextFactory< C > factory, final Collection< CLCommandQueue > queues)
abstract C setup(CLCommandQueue queue, CLQueueContext old)
Creates a new queue context for the given queue.
Superclass for all per-queue contexts as used in CLCommandQueuePools.
Enumeration for the command-queue settings.
The action executed after a task completes.
DO_NOTHING
Does nothing, the task is responsible to make sure all computations have finished when the task finis...
FLUSH
Flushes the queue on task completion.
Releasable OpenCL resource.
void release()
Releases the OpenCL resource.