GlueGen v2.6.0-rc-20250712
GlueGen, Native Binding Generator for Java™ (public API).
WorkerThread.java
Go to the documentation of this file.
1/**
2 * Copyright 2023 JogAmp Community. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without modification, are
5 * permitted provided that the following conditions are met:
6 *
7 * 1. Redistributions of source code must retain the above copyright notice, this list of
8 * conditions and the following disclaimer.
9 *
10 * 2. Redistributions in binary form must reproduce the above copyright notice, this list
11 * of conditions and the following disclaimer in the documentation and/or other materials
12 * provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY JogAmp Community ``AS IS'' AND ANY EXPRESS OR IMPLIED
15 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
16 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JogAmp Community OR
17 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
20 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
21 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
22 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23 *
24 * The views and conclusions contained in the software and documentation are those of the
25 * authors and should not be interpreted as representing official policies, either expressed
26 * or implied, of JogAmp Community.
27 */
28package com.jogamp.common.util;
29
30import java.time.Duration;
31import java.time.Instant;
32import java.time.temporal.ChronoUnit;
33import java.util.concurrent.atomic.AtomicInteger;
34
35/**
36 * A re-{@link #start()}'able, {@link #pause(boolean)}'able and interrupt'able worker {@link #getThread() thread}
37 * with an optional minimum execution duration, see {@link #getSleptDuration()}
38 * executing a {@link Callback#run(WorkerThread) task} periodically.
39 * <p>
40 * Optionally a {@link WorkerThread.StateCallback.State state} {@link StateCallback#run(WorkerThread, WorkerThread.StateCallback.State) task}
41 * can be given for fine grained control.
42 * </p>
43 * <p>
44 * If an exception occurs during execution of the work {@link Callback}, the worker {@link #getThread() thread} is {@link #pause(boolean)}'ed
45 * and {@link #hasError()} as well as {@link #getError(boolean)} can be used to query and clear the state.
46 * User may {@link #resume()} or {@link #stop()} the thread.
47 * </p>
48 * <p>
49 * If an exception occurs during execution of the optional
50 * {@link WorkerThread.StateCallback.State state} {@link StateCallback#run(WorkerThread, WorkerThread.StateCallback.State) task},
51 * the worker {@link #getThread() thread} is {@link #stop()}'ed
52 * and {@link #hasError()} as well as {@link #getError(boolean)} can be used to query and clear the state.
53 * </p>
54 */
55public class WorkerThread {
56 /**
57 * An interruptible {@link #run() task} periodically executed on the {@link WorkerThread} {@link WorkerThread#getThread() thread}.
58 */
59 public interface Callback {
60 /**
61 * Task to be periodically executed on the {@link WorkerThread} {@link WorkerThread#getThread() thread}.
62 * @param self The {@link WorkerThread} manager
63 * @throws InterruptedException
64 */
65 void run(WorkerThread self) throws InterruptedException;
66 }
67
68 /**
69 * An interruptible {@link State} {@link #run() task} on the {@link WorkerThread} {@link WorkerThread#getThread() thread}.
70 */
71 public interface StateCallback {
72 /** State change cause. */
73 public static enum State {
74 INIT, PAUSED, RESUMED, END
75 }
76 /**
77 * Task to be executed on {@link State} change on the {@link WorkerThread} {@link WorkerThread#getThread() thread}.
78 * @param self The {@link WorkerThread} manager
79 * @param cause the {@link State} change cause
80 * @throws InterruptedException
81 */
82 void run(WorkerThread self, State cause) throws InterruptedException;
83 }
84
85 private static final int RUNNING = 1 << 0;
86 private static final int ACTIVE = 1 << 1;
87 private static final int BLOCKED = 1 << 2;
88 private static final int SHALL_PAUSE = 1 << 3;
89 private static final int SHALL_STOP = 1 << 4;
90 private static final int USE_MINIMUM = 1 << 5;
91 private static final int DAEMON = 1 << 6;
92 private static AtomicInteger instanceId = new AtomicInteger(0);
93
94 private volatile int state;
95 private final static boolean isSet(final int state, final int mask) { return mask == ( state & mask ); }
96 private final boolean isSet(final int mask) { return mask == ( state & mask ); }
97 private final void set(final int mask) { state |= mask; }
98 private final void clear(final int mask) { state &= ~mask; }
99
100 private final Duration minPeriod;
101 private final Duration minDelay;
102 private final Callback cbWork;
103 private final StateCallback cbState;
104 private Thread thread;
105 private volatile Duration sleptDuration = Duration.ZERO;
106 private volatile Exception workErr = null;
107
108 /**
109 * Instantiates a new {@link WorkerThread}.
110 * @param minPeriod minimum work-loop-period to throttle execution or {@code null} if unthrottled, see {@link #getSleptDuration()}
111 * @param minDelay minimum work-loop-delay to throttle execution or {@code null} if unthrottled, see {@link #getSleptDuration()}
112 * @param daemonThread argument for {@link Thread#setDaemon(boolean)}
113 * @param work the actual work {@link Callback} to perform.
114 */
115 public WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work) {
116 this(minPeriod, minDelay, daemonThread, work, null);
117 }
118
119 /**
120 * Instantiates a new {@link WorkerThread}.
121 * @param minPeriod minimum work-loop-period to throttle execution or {@code null} if unthrottled, see {@link #getSleptDuration()}
122 * @param minDelay minimum work-loop-delay to throttle execution or {@code null} if unthrottled, see {@link #getSleptDuration()}
123 * @param daemonThread argument for {@link Thread#setDaemon(boolean)}
124 * @param work the actual work {@link Callback} to perform.
125 * @param stateChangeCB optional {@link StateCallback} called at different {@link StateCallback.State} changes while locked
126 */
127 public WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work, final StateCallback stateChangeCB) {
128 this.state = 0;
129 this.minPeriod = null != minPeriod ? minPeriod : Duration.ZERO;
130 this.minDelay = null != minDelay ? minDelay : Duration.ZERO;
131 if( this.minPeriod.toMillis() > 0 || this.minDelay.toMillis() > 0 ) {
132 set(USE_MINIMUM);
133 }
134 this.cbWork = work;
135 this.cbState = stateChangeCB;
136 if( daemonThread ) {
137 set(DAEMON);
138 }
139 thread = null;
140 }
141
142 /**
143 * Starts execution of a new worker thread if not {@link #isRunning}, i.e. never {@link #start()}'ed or {@link #stop()}'ed.
144 * <p>
145 * Method blocks until the new worker thread has been started and {@link #isRunning()} and {@link #isActive()} if {@code paused == false}.
146 * </p>
147 * @param paused if {@code true}, keeps the new worker thread paused, otherwise {@link #resume()} it.
148 */
149 public final synchronized void start(final boolean paused) {
150 if( isSet(RUNNING) || null != thread || isSet(SHALL_STOP) || isSet(SHALL_PAUSE) ) {
151 // definite start condition: !isRunning
152 // subsequent conditions only for consistency/doc: null == thread && !shallStop && !shallPause
153 return;
154 }
155 if( paused ) {
156 set(SHALL_PAUSE);
157 }
158 thread = new Thread(threadRunnable);
159 thread.setDaemon(isSet(DAEMON));
160 thread.start();
161 try {
162 this.notifyAll(); // wake-up startup-block
163 if( !paused ) {
164 while( !isSet(RUNNING) && !isSet(ACTIVE) && null != thread && !isSet(SHALL_STOP) ) {
165 this.wait(); // wait until started and active (not-paused)
166 }
167 } else {
168 while( !isSet(RUNNING) && null != thread && !isSet(SHALL_STOP) ) {
169 this.wait(); // wait until started
170 }
171 while( isSet(RUNNING | ACTIVE) && null != thread && !isSet(SHALL_STOP) ) {
172 this.wait(); // wait until paused
173 }
174 }
175 } catch (final InterruptedException e) {
176 throw new InterruptedRuntimeException(e);
177 }
178 }
179
180 /**
181 * Stops execution of the {@link #start()}'ed worker thread.
182 * <p>
183 * Method blocks until worker thread has been {@link #isRunning() stopped} if {@code waitUntilDone} is {@code true}.
184 * </p>
185 */
186 public final synchronized void stop(final boolean waitUntilDone) {
187 if( isSet(RUNNING) ) {
188 set(SHALL_STOP);
189 this.notifyAll(); // wake-up pause-block (opt)
190 if( java.lang.Thread.currentThread() != thread ) {
191 if( isSet(BLOCKED | RUNNING) ) {
192 thread.interrupt();
193 }
194 if( waitUntilDone ) {
195 try {
196 while( isSet(RUNNING) ) {
197 this.wait(); // wait until stopped
198 }
199 } catch (final InterruptedException e) {
200 throw new InterruptedRuntimeException(e);
201 }
202 }
203 }
204 }
205 }
206
207 /**
208 * Pauses execution of the {@link #start()}'ed worker thread.
209 * <p>
210 * Method blocks until worker thread has been {@link #isActive()}'ated if {@code waitUntilDone} is {@code true}.
211 * </p>
212 */
213 public final synchronized void pause(final boolean waitUntilDone) {
214 if( isSet(RUNNING | ACTIVE) && !isSet(SHALL_STOP) ) {
215 set(SHALL_PAUSE);
216 if( java.lang.Thread.currentThread() != thread ) {
217 if( isSet(BLOCKED | ACTIVE) ) {
218 thread.interrupt();
219 }
220 if( waitUntilDone ) {
221 try {
222 while( isSet(RUNNING | ACTIVE) ) {
223 this.wait(); // wait until paused
224 }
225 } catch (final InterruptedException e) {
226 throw new InterruptedRuntimeException(e);
227 }
228 }
229 }
230 }
231 }
232
233 /** Resumes execution of the {@link #pause(boolean)}'ed worker thread. */
234 public final synchronized void resume() {
235 if( isSet(RUNNING) && !isSet(ACTIVE) && !isSet(SHALL_STOP) ) {
236 clear(SHALL_PAUSE);
237 this.notifyAll(); // wake-up pause-block
238 if( java.lang.Thread.currentThread() != thread ) {
239 try {
240 while( !isSet(ACTIVE) && !isSet(SHALL_PAUSE) && isSet(RUNNING) ) {
241 this.wait(); // wait until resumed
242 }
243 } catch (final InterruptedException e) {
244 pause(false);
245 throw new InterruptedRuntimeException(e);
246 }
247 }
248 }
249 }
250
251 /** Returns true if the worker thread has started via {@link #start()} and has not ended, e.g. via {@link #stop()}. It might be {@link #pause(boolean) paused}. */
252 public final boolean isRunning() { return isSet(RUNNING); }
253 /** Returns true if the worker thread {@link #isRunning()} and is not {@link #pause(boolean) paused}. */
254 public final boolean isActive() { return isSet(ACTIVE); }
255 /** Returns true if the worker thread {@link #isRunning()} and is {@link #pause(boolean) paused}. */
256 public final boolean isPaused() { return isSet(RUNNING) && !isSet(ACTIVE); }
257 /** Returns true if an exception occured during {@link Callable} work execution. */
258 public final boolean hasError() { return null != workErr; }
259 /** Returns the worker thread if {@link #isRunning()}, otherwise {@code null}. */
260 public final Thread getThread() { return thread; }
261
262 /**
263 * Returns the exception is {@link #hasError()}.
264 * @param clear if true, clear the exception
265 */
266 public final Exception getError(final boolean clear ) { final Exception e = workErr; if( clear) { workErr = null; } return e; }
267
268 /**
269 * Returns enforced minimum work-loop-period or {@link Duration#ZERO} for none.
270 * @see #getSleptDuration()
271 */
272 public final Duration getMinPeriod() { return minPeriod; }
273
274 /**
275 * Returns enforced minimum work-loop-delay or {@link Duration#ZERO} for none.
276 * @see #getSleptDuration()
277 */
278 public final Duration getMinDelay() { return minDelay; }
279
280 /**
281 * Returns the slept {@link Duration} delta of {@link #getMinPeriod()} and consumed {@link Callback#run()} duration,
282 * which minimum is {@link #getMinDelay()}.
283 * <p>
284 * Returns {@link Duration#ZERO zero} for {@link Duration#ZERO zero} {@link #getMinPeriod()} and {@link #getMinDelay()} or exceeding {@link Callback#run()} duration
285 * without {@link #getMinDelay()}.
286 * </p>
287 */
288 public final Duration getSleptDuration() { return sleptDuration; }
289
290 @Override
291 public String toString() {
292 synchronized(this) {
293 final int _state = state;
294 return "Worker[running "+isSet(_state, RUNNING)+", active "+isSet(_state, ACTIVE)+", blocked "+isSet(_state, BLOCKED)+
295 ", shall[pause "+isSet(_state, SHALL_PAUSE)+", stop "+isSet(_state, SHALL_STOP)+
296 "], min[period "+minPeriod.toMillis()+"ms, delay "+minDelay.toMillis()+"], slept "+sleptDuration.toMillis()+
297 "ms, daemon "+isSet(_state, DAEMON)+", thread "+thread+"]";
298 }
299 }
300
301 private final Runnable threadRunnable = new Runnable() {
302 @Override
303 public final void run() {
304 final Thread ct = Thread.currentThread();
305 ct.setName(ct.getName()+"-Worker_"+instanceId.getAndIncrement());
306
307 synchronized ( WorkerThread.this ) {
308 Exception err = null;
309 if( null != cbState ) {
310 try {
311 cbState.run(WorkerThread.this, StateCallback.State.INIT);
312 } catch (final InterruptedException e) {
313 // OK
314 } catch (final Throwable t) {
315 err = new Exception(t.getClass().getSimpleName()+" while processing init-state "+cbState, t);
316 }
317 if( null != err ) {
318 workErr = err;
319 clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE);
320 thread = null;
321 workErr.printStackTrace();
322 WorkerThread.this.notifyAll(); // wake-up ctor()
323 return; // bail out
324 }
325 }
326 set(RUNNING | ACTIVE);
327 WorkerThread.this.notifyAll(); // wake-up ctor()
328 }
329
330 while( !isSet(SHALL_STOP) ) {
331 Exception err = null;
332 try {
333 if( isSet(SHALL_PAUSE) ) {
334 synchronized ( WorkerThread.this ) {
335 if( null != cbState ) {
336 try {
337 cbState.run(WorkerThread.this, StateCallback.State.PAUSED);
338 } catch (final InterruptedException e) {
339 // OK
340 } catch (final Throwable t) {
341 err = new Exception(t.getClass().getSimpleName()+" while processing pause-state "+cbState, t);
342 }
343 if( null != err ) {
344 workErr = err;
345 clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE);
346 thread = null;
347 workErr.printStackTrace();
348 WorkerThread.this.notifyAll(); // wake-up ctor()
349 return; // bail out
350 }
351 }
352 while( isSet(SHALL_PAUSE) && !isSet(SHALL_STOP) ) {
353 clear(ACTIVE);
354 WorkerThread.this.notifyAll(); // wake-up doPause()
355 try {
356 WorkerThread.this.wait(); // wait until resumed
357 } catch (final InterruptedException e) {
358 if( !isSet(SHALL_PAUSE) ) {
359 throw new InterruptedRuntimeException(e);
360 }
361 }
362 }
363 if( null != cbState ) {
364 try {
365 cbState.run(WorkerThread.this, StateCallback.State.RESUMED);
366 } catch (final InterruptedException e) {
367 err = new InterruptedRuntimeException(e.getClass().getSimpleName()+" while processing resume-state"+cbState, e);
368 } catch (final Throwable t) {
369 err = new Exception(t.getClass().getSimpleName()+" while processing resume-state "+cbState, t);
370 }
371 if( null != err ) {
372 workErr = err;
373 clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE);
374 thread = null;
375 workErr.printStackTrace();
376 WorkerThread.this.notifyAll(); // wake-up ctor()
377 return; // bail out
378 }
379 }
380 set(ACTIVE);
381 WorkerThread.this.notifyAll(); // wake-up doResume()
382 }
383 }
384 if( !isSet(SHALL_STOP) ) {
385 final Instant t0 = Instant.now();
386 set(BLOCKED);
387 {
388 cbWork.run(WorkerThread.this);
389 }
390 clear(BLOCKED);
391 if( isSet(USE_MINIMUM) ) {
392 final long minDelayMS = minDelay.toMillis();
393 final Instant t1 = Instant.now();
394 final Duration td = Duration.between(t0, t1);
395 if( minPeriod.compareTo(td) > 0 ) {
396 final Duration minPeriodDelta = minPeriod.minus(td);
397 final long minPeriodDeltaMS = minPeriodDelta.toMillis();
398 if( minPeriodDeltaMS > 0 ) {
399 final long minSleepMS = Math.max(minDelayMS, minPeriodDeltaMS);
400 sleptDuration = Duration.of(minSleepMS, ChronoUnit.MILLIS);
401 java.lang.Thread.sleep( minSleepMS );
402 } else if( minDelayMS > 0 ) {
403 sleptDuration = minDelay;
404 java.lang.Thread.sleep( minDelayMS );
405 } else {
406 sleptDuration = Duration.ZERO;
407 }
408 // java.util.concurrent.locks.LockSupport.parkNanos(tdMin.toNanos());
409 } else if( minDelayMS > 0 ) {
410 sleptDuration = minDelay;
411 java.lang.Thread.sleep( minDelayMS );
412 } else {
413 sleptDuration = Duration.ZERO;
414 }
415 }
416 }
417 } catch (final InterruptedException e) {
418 if( !isSet(BLOCKED) ) { // !shallStop && !shallPause
419 err = new InterruptedRuntimeException(e.getClass().getSimpleName()+" while processing work-callback "+cbWork, e);
420 }
421 clear(BLOCKED);
422 sleptDuration = Duration.ZERO;
423 } catch (final Throwable t) {
424 err = new Exception(t.getClass().getSimpleName()+" while processing work-callback "+cbWork, t);
425 sleptDuration = Duration.ZERO;
426 } finally {
427 if( null != err ) {
428 // state transition incl. notification
429 synchronized ( WorkerThread.this ) {
430 workErr = err;
431 err = null;
432 set(SHALL_PAUSE);
433 clear(ACTIVE);
434 WorkerThread.this.notifyAll(); // wake-up potential do*()
435 }
436 }
437 }
438 }
439 synchronized ( WorkerThread.this ) {
440 if( null != cbState ) {
441 try {
442 cbState.run(WorkerThread.this, StateCallback.State.END);
443 } catch (final InterruptedException e) {
444 // OK
445 } catch (final Throwable t) {
446 workErr = new Exception(t.getClass().getSimpleName()+" while processing end-state "+cbState, t);
447 workErr.printStackTrace();
448 }
449 }
450 thread = null;
451 clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE);
452 WorkerThread.this.notifyAll(); // wake-up doStop()
453 }
454 } };
455}
Unchecked exception propagating an InterruptedException where handling of the latter is not desired.
A re-start()'able, pause(boolean)'able and interrupt'able worker thread with an optional minimum exec...
final boolean hasError()
Returns true if an exception occurred during Callback work execution.
final synchronized void start(final boolean paused)
Starts execution of a new worker thread if not isRunning, i.e.
final Duration getMinPeriod()
Returns enforced minimum work-loop-period or Duration#ZERO for none.
final Exception getError(final boolean clear)
Returns the exception if hasError().
final boolean isPaused()
Returns true if the worker thread isRunning() and is paused.
final synchronized void pause(final boolean waitUntilDone)
Pauses execution of the start()'ed worker thread.
final synchronized void resume()
Resumes execution of the pause(boolean)'ed worker thread.
WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work)
Instantiates a new WorkerThread.
final synchronized void stop(final boolean waitUntilDone)
Stops execution of the start()'ed worker thread.
final Duration getMinDelay()
Returns enforced minimum work-loop-delay or Duration#ZERO for none.
final Thread getThread()
Returns the worker thread if isRunning(), otherwise null.
final boolean isRunning()
Returns true if the worker thread has started via start() and has not ended, e.g.
final Duration getSleptDuration()
Returns the slept Duration delta of getMinPeriod() and consumed Callback#run() duration,...
WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work, final StateCallback stateChangeCB)
Instantiates a new WorkerThread.
final boolean isActive()
Returns true if the worker thread isRunning() and is not paused.
An interruptible task periodically executed on the WorkerThread thread.
void run(WorkerThread self)
Task to be periodically executed on the WorkerThread thread.
An interruptible State task on the WorkerThread thread.
void run(WorkerThread self, State cause)
Task to be executed on State change on the WorkerThread thread.