28package com.jogamp.common.util;
30import java.time.Duration;
31import java.time.Instant;
32import java.time.temporal.ChronoUnit;
33import java.util.concurrent.atomic.AtomicInteger;
85 private static final int RUNNING = 1 << 0;
86 private static final int ACTIVE = 1 << 1;
87 private static final int BLOCKED = 1 << 2;
88 private static final int SHALL_PAUSE = 1 << 3;
89 private static final int SHALL_STOP = 1 << 4;
90 private static final int USE_MINIMUM = 1 << 5;
91 private static final int DAEMON = 1 << 6;
92 private static AtomicInteger instanceId =
new AtomicInteger(0);
94 private volatile int state;
95 private final static boolean isSet(
final int state,
final int mask) {
return mask == ( state & mask ); }
96 private final boolean isSet(
final int mask) {
return mask == ( state & mask ); }
97 private final void set(
final int mask) { state |= mask; }
98 private final void clear(
final int mask) { state &= ~mask; }
100 private final Duration minPeriod;
101 private final Duration minDelay;
102 private final Callback cbWork;
103 private final StateCallback cbState;
104 private Thread thread;
105 private volatile Duration sleptDuration = Duration.ZERO;
106 private volatile Exception workErr =
null;
115 public WorkerThread(
final Duration minPeriod,
final Duration minDelay,
final boolean daemonThread,
final Callback work) {
116 this(minPeriod, minDelay, daemonThread, work,
null);
129 this.minPeriod =
null != minPeriod ? minPeriod : Duration.ZERO;
130 this.minDelay =
null != minDelay ? minDelay : Duration.ZERO;
131 if( this.minPeriod.toMillis() > 0 ||
this.minDelay.toMillis() > 0 ) {
135 this.cbState = stateChangeCB;
149 public final synchronized void start(
final boolean paused) {
150 if( isSet(RUNNING) ||
null != thread || isSet(SHALL_STOP) || isSet(SHALL_PAUSE) ) {
158 thread =
new Thread(threadRunnable);
159 thread.setDaemon(isSet(DAEMON));
164 while( !isSet(RUNNING) && !isSet(ACTIVE) &&
null != thread && !isSet(SHALL_STOP) ) {
168 while( !isSet(RUNNING) &&
null != thread && !isSet(SHALL_STOP) ) {
171 while( isSet(RUNNING | ACTIVE) &&
null != thread && !isSet(SHALL_STOP) ) {
175 }
catch (
final InterruptedException e) {
186 public final synchronized void stop(
final boolean waitUntilDone) {
187 if( isSet(RUNNING) ) {
190 if( java.lang.Thread.currentThread() != thread ) {
191 if( isSet(BLOCKED | RUNNING) ) {
194 if( waitUntilDone ) {
196 while( isSet(RUNNING) ) {
199 }
catch (
final InterruptedException e) {
213 public final synchronized void pause(
final boolean waitUntilDone) {
214 if( isSet(RUNNING | ACTIVE) && !isSet(SHALL_STOP) ) {
216 if( java.lang.Thread.currentThread() != thread ) {
217 if( isSet(BLOCKED | ACTIVE) ) {
220 if( waitUntilDone ) {
222 while( isSet(RUNNING | ACTIVE) ) {
225 }
catch (
final InterruptedException e) {
234 public final synchronized void resume() {
235 if( isSet(RUNNING) && !isSet(ACTIVE) && !isSet(SHALL_STOP) ) {
238 if( java.lang.Thread.currentThread() != thread ) {
240 while( !isSet(ACTIVE) && !isSet(SHALL_PAUSE) && isSet(RUNNING) ) {
243 }
catch (
final InterruptedException e) {
252 public final boolean isRunning() {
return isSet(RUNNING); }
254 public final boolean isActive() {
return isSet(ACTIVE); }
256 public final boolean isPaused() {
return isSet(RUNNING) && !isSet(ACTIVE); }
258 public final boolean hasError() {
return null != workErr; }
266 public final Exception
getError(
final boolean clear ) {
final Exception e = workErr;
if( clear) { workErr =
null; }
return e; }
293 final int _state = state;
294 return "Worker[running "+isSet(_state, RUNNING)+
", active "+isSet(_state, ACTIVE)+
", blocked "+isSet(_state, BLOCKED)+
295 ", shall[pause "+isSet(_state, SHALL_PAUSE)+
", stop "+isSet(_state, SHALL_STOP)+
296 "], min[period "+minPeriod.toMillis()+
"ms, delay "+minDelay.toMillis()+
"], slept "+sleptDuration.toMillis()+
297 "ms, daemon "+isSet(_state, DAEMON)+
", thread "+thread+
"]";
301 private final Runnable threadRunnable =
new Runnable() {
303 public final void run() {
304 final Thread ct = Thread.currentThread();
305 ct.setName(ct.getName()+
"-Worker_"+instanceId.getAndIncrement());
307 synchronized ( WorkerThread.this ) {
308 Exception err =
null;
309 if(
null != cbState ) {
311 cbState.run(
WorkerThread.this, StateCallback.State.INIT);
312 }
catch (
final InterruptedException e) {
314 }
catch (
final Throwable t) {
315 err =
new Exception(t.getClass().getSimpleName()+
" while processing init-state "+cbState, t);
319 clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE);
321 workErr.printStackTrace();
326 set(RUNNING | ACTIVE);
330 while( !isSet(SHALL_STOP) ) {
331 Exception err =
null;
333 if( isSet(SHALL_PAUSE) ) {
334 synchronized ( WorkerThread.this ) {
335 if(
null != cbState ) {
337 cbState.run(
WorkerThread.this, StateCallback.State.PAUSED);
338 }
catch (
final InterruptedException e) {
340 }
catch (
final Throwable t) {
341 err =
new Exception(t.getClass().getSimpleName()+
" while processing pause-state "+cbState, t);
345 clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE);
347 workErr.printStackTrace();
352 while( isSet(SHALL_PAUSE) && !isSet(SHALL_STOP) ) {
357 }
catch (
final InterruptedException e) {
358 if( !isSet(SHALL_PAUSE) ) {
359 throw new InterruptedRuntimeException(e);
363 if(
null != cbState ) {
365 cbState.run(
WorkerThread.this, StateCallback.State.RESUMED);
366 }
catch (
final InterruptedException e) {
367 err =
new InterruptedRuntimeException(e.getClass().getSimpleName()+
" while processing resume-state"+cbState, e);
368 }
catch (
final Throwable t) {
369 err =
new Exception(t.getClass().getSimpleName()+
" while processing resume-state "+cbState, t);
373 clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE);
375 workErr.printStackTrace();
384 if( !isSet(SHALL_STOP) ) {
385 final Instant t0 = Instant.now();
391 if( isSet(USE_MINIMUM) ) {
392 final long minDelayMS = minDelay.toMillis();
393 final Instant t1 = Instant.now();
394 final Duration td = Duration.between(t0, t1);
395 if( minPeriod.compareTo(td) > 0 ) {
396 final Duration minPeriodDelta = minPeriod.minus(td);
397 final long minPeriodDeltaMS = minPeriodDelta.toMillis();
398 if( minPeriodDeltaMS > 0 ) {
399 final long minSleepMS = Math.max(minDelayMS, minPeriodDeltaMS);
400 sleptDuration = Duration.of(minSleepMS, ChronoUnit.MILLIS);
401 java.lang.Thread.sleep( minSleepMS );
402 }
else if( minDelayMS > 0 ) {
403 sleptDuration = minDelay;
404 java.lang.Thread.sleep( minDelayMS );
406 sleptDuration = Duration.ZERO;
409 }
else if( minDelayMS > 0 ) {
410 sleptDuration = minDelay;
411 java.lang.Thread.sleep( minDelayMS );
413 sleptDuration = Duration.ZERO;
417 }
catch (
final InterruptedException e) {
418 if( !isSet(BLOCKED) ) {
419 err =
new InterruptedRuntimeException(e.getClass().getSimpleName()+
" while processing work-callback "+cbWork, e);
422 sleptDuration = Duration.ZERO;
423 }
catch (
final Throwable t) {
424 err =
new Exception(t.getClass().getSimpleName()+
" while processing work-callback "+cbWork, t);
425 sleptDuration = Duration.ZERO;
429 synchronized ( WorkerThread.this ) {
439 synchronized ( WorkerThread.this ) {
440 if(
null != cbState ) {
442 cbState.run(
WorkerThread.this, StateCallback.State.END);
443 }
catch (
final InterruptedException e) {
445 }
catch (
final Throwable t) {
446 workErr =
new Exception(t.getClass().getSimpleName()+
" while processing end-state "+cbState, t);
447 workErr.printStackTrace();
451 clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE);
Unchecked exception propagating an InterruptedException where handling of the latter is not desired.
A re-start()'able, pause(boolean)'able and interrupt'able worker thread with an optional minimum exec...
final boolean hasError()
Returns true if an exception occured during Callable work execution.
final synchronized void start(final boolean paused)
Starts execution of a new worker thread if not isRunning, i.e.
final Duration getMinPeriod()
Returns enforced minimum work-loop-period or Duration#ZERO for none.
final Exception getError(final boolean clear)
Returns the exception is hasError().
final boolean isPaused()
Returns true if the worker thread isRunning() and is paused.
final synchronized void pause(final boolean waitUntilDone)
Pauses execution of the start()'ed worker thread.
final synchronized void resume()
Resumes execution of the pause(boolean)'ed worker thread.
WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work)
Instantiates a new WorkerThread.
final synchronized void stop(final boolean waitUntilDone)
Stops execution of the start()'ed worker thread.
final Duration getMinDelay()
Returns enforced minimum work-loop-delay or Duration#ZERO for none.
final Thread getThread()
Returns the worker thread if isRunning(), otherwise null.
final boolean isRunning()
Returns true if the worker thread has started via start() and has not ended, e.g.
final Duration getSleptDuration()
Returns the slept Duration delta of getMinPeriod() and consumed Callback#run() duration,...
WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work, final StateCallback stateChangeCB)
Instantiates a new WorkerThread.
final boolean isActive()
Returns true if the worker thread isRunning() and is not paused.
An interruptible task periodically executed on the WorkerThread thread.
void run(WorkerThread self)
Task to be periodically executed on the WorkerThread thread.
An interruptible State task on the WorkerThread thread.
void run(WorkerThread self, State cause)
Task to be executed on State change on the WorkerThread thread.