JOCL v2.6.0-rc-20250722
JOCL, OpenCL® API Binding for Java™ (public API).
CLCommandQueuePool.java
Go to the documentation of this file.
1/*
2 * Created on Tuesday, May 03 2011
3 */
4package com.jogamp.opencl.util.concurrent;
5
6import com.jogamp.common.util.InterruptSource;
7import com.jogamp.opencl.CLCommandQueue;
8import com.jogamp.opencl.CLDevice;
9import com.jogamp.opencl.CLResource;
10import com.jogamp.opencl.util.CLMultiContext;
11import java.util.ArrayList;
12import java.util.Collection;
13import java.util.List;
14import java.util.concurrent.Callable;
15import java.util.concurrent.ExecutorService;
16import java.util.concurrent.Executors;
17import java.util.concurrent.Future;
18import java.util.concurrent.ThreadFactory;
19import java.util.concurrent.TimeUnit;
20
21/**
22 * A multithreaded, fixed size pool of OpenCL command queues.
23 * It serves as a multiplexer distributing tasks over N queues usually run on N devices.
24 * The usage of this pool is similar to {@link ExecutorService} but it uses {@link CLTask}s
25 * instead of {@link Callable}s and provides a per-queue context for resource sharing across all tasks of one queue.
26 * @author Michael Bien
27 */
28public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource {
29
30 private List<CLQueueContext> contexts;
31 private ExecutorService excecutor;
32 private FinishAction finishAction = FinishAction.DO_NOTHING;
33 private boolean released;
34
35 private CLCommandQueuePool(final CLQueueContextFactory<C> factory, final Collection<CLCommandQueue> queues) {
36 this.contexts = initContexts(queues, factory);
37 initExecutor();
38 }
39
40 private List<CLQueueContext> initContexts(final Collection<CLCommandQueue> queues, final CLQueueContextFactory<C> factory) {
41 final List<CLQueueContext> newContexts = new ArrayList<CLQueueContext>(queues.size());
42
43 int index = 0;
44 for (final CLCommandQueue queue : queues) {
45
46 CLQueueContext old = null;
47 if(this.contexts != null && !this.contexts.isEmpty()) {
48 old = this.contexts.get(index++);
49 old.release();
50 }
51
52 newContexts.add(factory.setup(queue, old));
53 }
54 return newContexts;
55 }
56
57 private void initExecutor() {
58 this.excecutor = Executors.newFixedThreadPool(contexts.size(), new QueueThreadFactory(contexts));
59 }
60
61 public static <C extends CLQueueContext> CLCommandQueuePool<C> create(final CLQueueContextFactory<C> factory, final CLMultiContext mc, final CLCommandQueue.Mode... modes) {
62 return create(factory, mc.getDevices(), modes);
63 }
64
65 public static <C extends CLQueueContext> CLCommandQueuePool<C> create(final CLQueueContextFactory<C> factory, final Collection<CLDevice> devices, final CLCommandQueue.Mode... modes) {
66 final List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size());
67 for (final CLDevice device : devices) {
68 queues.add(device.createCommandQueue(modes));
69 }
70 return create(factory, queues);
71 }
72
73 public static <C extends CLQueueContext> CLCommandQueuePool<C> create(final CLQueueContextFactory<C> factory, final Collection<CLCommandQueue> queues) {
74 return new CLCommandQueuePool<C>(factory, queues);
75 }
76
77 /**
78 * Submits this task to the pool for execution returning its {@link Future}.
79 * @see ExecutorService#submit(java.util.concurrent.Callable)
80 */
81 public <R> Future<R> submit(final CLTask<? super C, R> task) {
82 return excecutor.submit(new TaskWrapper<C,R>(task, finishAction));
83 }
84
85 /**
86 * Submits all tasks to the pool for execution and returns their {@link Future}.
87 * Calls {@link #submit(com.jogamp.opencl.util.concurrent.CLTask)} for every task.
88 */
89 public <R> List<Future<R>> submitAll(final Collection<? extends CLTask<? super C, R>> tasks) {
90 final List<Future<R>> futures = new ArrayList<Future<R>>(tasks.size());
91 for (final CLTask<? super C, R> task : tasks) {
92 futures.add(submit(task));
93 }
94 return futures;
95 }
96
97 /**
98 * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result.
99 * @see ExecutorService#invokeAll(java.util.Collection)
100 */
101 public <R> List<Future<R>> invokeAll(final Collection<? extends CLTask<? super C, R>> tasks) throws InterruptedException {
102 final List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
103 return excecutor.invokeAll(wrapper);
104 }
105
106 /**
107 * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result.
108 * @see ExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit)
109 */
110 public <R> List<Future<R>> invokeAll(final Collection<? extends CLTask<? super C, R>> tasks, final long timeout, final TimeUnit unit) throws InterruptedException {
111 final List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
112 return excecutor.invokeAll(wrapper, timeout, unit);
113 }
114
115 private <R> List<TaskWrapper<C, R>> wrapTasks(final Collection<? extends CLTask<? super C, R>> tasks) {
116 final List<TaskWrapper<C, R>> wrapper = new ArrayList<TaskWrapper<C, R>>(tasks.size());
117 for (final CLTask<? super C, R> task : tasks) {
118 if(task == null) {
119 throw new NullPointerException("at least one task was null");
120 }
121 wrapper.add(new TaskWrapper<C, R>(task, finishAction));
122 }
123 return wrapper;
124 }
125
126 /**
127 * Switches the context of all queues - this operation can be expensive.
128 * Blocks until all tasks finish and sets up a new context for all queues.
129 * @return this
130 */
132
133 excecutor.shutdown();
134 finishQueues(); // just to be sure
135
136 contexts = initContexts(getQueues(), factory);
137 initExecutor();
138 return this;
139 }
140
141 /**
142 * Calls {@link CLCommandQueue#flush()} on all queues.
143 */
144 public void flushQueues() {
145 for (final CLQueueContext context : contexts) {
146 context.queue.flush();
147 }
148 }
149
150 /**
151 * Calls {@link CLCommandQueue#finish()} on all queues.
152 */
153 public void finishQueues() {
154 for (final CLQueueContext context : contexts) {
155 context.queue.finish();
156 }
157 }
158
159 /**
160 * Releases all queues.
161 */
162 @Override
163 public void release() {
164 if(released) {
165 throw new RuntimeException(getClass().getSimpleName()+" already released");
166 }
167 released = true;
168 excecutor.shutdown();
169 for (final CLQueueContext context : contexts) {
170 context.queue.finish().release();
171 context.release();
172 }
173 }
174
175 /**
176 * Returns the command queues used in this pool.
177 */
178 public List<CLCommandQueue> getQueues() {
179 final List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(contexts.size());
180 for (final CLQueueContext context : contexts) {
181 queues.add(context.queue);
182 }
183 return queues;
184 }
185
186 /**
187 * Returns the size of this pool (number of command queues).
188 */
189 public int getSize() {
190 return contexts.size();
191 }
192
194 return finishAction;
195 }
196
197 @Override
198 public boolean isReleased() {
199 return released;
200 }
201
202 /**
203 * Sets the action which is run after every completed task.
204 * This is mainly intended for debugging, default value is {@link FinishAction#DO_NOTHING}.
205 */
206 public void setFinishAction(final FinishAction action) {
207 this.finishAction = action;
208 }
209
210 @Override
211 public String toString() {
212 return getClass().getSimpleName()+" [queues: "+contexts.size()+" on finish: "+finishAction+"]";
213 }
214
215 private static class QueueThreadFactory implements ThreadFactory {
216
217 private final List<CLQueueContext> context;
218 private int index;
219
220 private QueueThreadFactory(final List<CLQueueContext> queues) {
221 this.context = queues;
222 this.index = 0;
223 }
224
225 public synchronized Thread newThread(final Runnable runnable) {
226
227 final SecurityManager sm = System.getSecurityManager();
228 final ThreadGroup group = (sm != null) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();
229
230 final CLQueueContext queue = context.get(index);
231 final QueueThread thread = new QueueThread(group, runnable, queue, index++);
232 thread.setDaemon(true);
233
234 return thread;
235 }
236
237 }
238
239 private static class QueueThread extends InterruptSource.Thread {
240 private final CLQueueContext context;
241 public QueueThread(final ThreadGroup group, final Runnable runnable, final CLQueueContext context, final int index) {
242 super(group, runnable, "queue-worker-thread-"+index+"["+context+"]");
243 this.context = context;
244 }
245 }
246
247 private static class TaskWrapper<C extends CLQueueContext, R> implements Callable<R> {
248
249 private final CLTask<? super C, R> task;
250 private final FinishAction mode;
251
252 public TaskWrapper(final CLTask<? super C, R> task, final FinishAction mode) {
253 this.task = task;
254 this.mode = mode;
255 }
256
257 public R call() throws Exception {
258 final CLQueueContext context = ((QueueThread)Thread.currentThread()).context;
259 // we make sure to only wrap tasks on the correct kind of thread, so this
260 // shouldn't fail (trying to genericize QueueThread properly becomes tricky)
261 @SuppressWarnings("unchecked")
262 final
263 R result = task.execute((C)context);
264 if(mode.equals(FinishAction.FLUSH)) {
265 context.queue.flush();
266 }else if(mode.equals(FinishAction.FINISH)) {
267 context.queue.finish();
268 }
269 return result;
270 }
271
272 }
273
/**
 * The action executed after a task completes.
 */
public enum FinishAction {

    /**
     * Does nothing, the task is responsible to make sure all computations
     * have finished when the task finishes
     */
    DO_NOTHING,

    /**
     * Flushes the queue on task completion.
     */
    FLUSH,

    /**
     * Finishes the queue on task completion.
     */
    FINISH
}
295
296}
The command queue is used to queue a set of operations for a specific CLDevice.
This object represents an OpenCL device.
Definition: CLDevice.java:53
Utility for organizing multiple CLContexts.
List< CLDevice > getDevices()
Returns a list containing all devices used in this multi context.
A multithreaded, fixed size pool of OpenCL command queues.
int getSize()
Returns the size of this pool (number of command queues).
void flushQueues()
Calls CLCommandQueue#flush() on all queues.
static< C extends CLQueueContext > CLCommandQueuePool< C > create(final CLQueueContextFactory< C > factory, final CLMultiContext mc, final CLCommandQueue.Mode... modes)
boolean isReleased()
Returns true if release() has been called.
void finishQueues()
Calls CLCommandQueue#finish() on all queues.
List< CLCommandQueue > getQueues()
Returns the command queues used in this pool.
CLCommandQueuePool< C > switchContext(final CLQueueContextFactory< C > factory)
Switches the context of all queues - this operation can be expensive.
void setFinishAction(final FinishAction action)
Sets the action which is run after every completed task.
static< C extends CLQueueContext > CLCommandQueuePool< C > create(final CLQueueContextFactory< C > factory, final Collection< CLDevice > devices, final CLCommandQueue.Mode... modes)
static< C extends CLQueueContext > CLCommandQueuePool< C > create(final CLQueueContextFactory< C > factory, final Collection< CLCommandQueue > queues)
abstract C setup(CLCommandQueue queue, CLQueueContext old)
Creates a new queue context for the given queue.
Superclass for all per-queue contexts as used in CLCommandQueuePools.
Enumeration for the command-queue settings.
DO_NOTHING
Does nothing, the task is responsible to make sure all computations have finished when the task finishes.
Releasable OpenCL resource.
Definition: CLResource.java:35
void release()
Releases the OpenCL resource.