Skip to content

Commit 3707f0a

Browse files
committed
Merge pull request #1 from akarnokd/ProperScheduler
SwtScheduler that honors the Scheduler/Worker contracts
2 parents 77509e1 + b870e28 commit 3707f0a

File tree

1 file changed

+208
-73
lines changed

1 file changed

+208
-73
lines changed

Diff for: src/com/diffplug/common/swt/SwtExec.java

+208-73
Original file line numberDiff line numberDiff line change
@@ -15,41 +15,27 @@
1515
*/
1616
package com.diffplug.common.swt;
1717

18+
import java.util.*;
1819
import java.util.List;
19-
import java.util.Objects;
20-
import java.util.concurrent.AbstractExecutorService;
21-
import java.util.concurrent.Callable;
22-
import java.util.concurrent.CompletionStage;
23-
import java.util.concurrent.Delayed;
24-
import java.util.concurrent.ExecutionException;
25-
import java.util.concurrent.Executor;
26-
import java.util.concurrent.RejectedExecutionException;
27-
import java.util.concurrent.RunnableFuture;
28-
import java.util.concurrent.ScheduledExecutorService;
29-
import java.util.concurrent.ScheduledFuture;
30-
import java.util.concurrent.TimeUnit;
31-
import java.util.concurrent.TimeoutException;
20+
import java.util.concurrent.*;
21+
import java.util.concurrent.atomic.*;
3222
import java.util.function.Supplier;
3323

3424
import org.eclipse.swt.SWT;
35-
import org.eclipse.swt.widgets.Display;
36-
import org.eclipse.swt.widgets.Widget;
25+
import org.eclipse.swt.widgets.*;
3726

27+
import rx.*;
3828
import rx.Observable;
39-
import rx.Scheduler;
40-
import rx.Subscription;
4129
import rx.functions.Action0;
42-
import rx.subscriptions.BooleanSubscription;
43-
import rx.subscriptions.Subscriptions;
30+
import rx.subscriptions.*;
4431

4532
import com.google.common.base.Preconditions;
4633
import com.google.common.primitives.Ints;
4734
import com.google.common.util.concurrent.ListenableFuture;
4835

4936
import com.diffplug.common.base.Box.Nullable;
5037
import com.diffplug.common.base.Unhandled;
51-
import com.diffplug.common.rx.Rx;
52-
import com.diffplug.common.rx.RxSubscriber;
38+
import com.diffplug.common.rx.*;
5339
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
5440

5541
/**
@@ -326,58 +312,7 @@ public Scheduler getRxScheduler() {
326312

327313
private SwtExec(Display display) {
328314
this.display = display;
329-
this.scheduler = new Scheduler() {
330-
@Override
331-
public Worker createWorker() {
332-
return new Worker() {
333-
private BooleanSubscription workerSub = BooleanSubscription.create();
334-
335-
@Override
336-
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
337-
if (isUnsubscribed()) {
338-
return Subscriptions.unsubscribed();
339-
}
340-
if (delayTime <= 0) {
341-
return schedule(action);
342-
} else {
343-
ScheduledFuture<?> future = SwtExec.this.schedule(() -> {
344-
if (!workerSub.isUnsubscribed()) {
345-
action.call();
346-
}
347-
}, delayTime, unit);
348-
Subscription sub = Subscriptions.create(() -> {
349-
future.cancel(true);
350-
});
351-
return sub;
352-
}
353-
}
354-
355-
@Override
356-
public Subscription schedule(Action0 action) {
357-
if (isUnsubscribed()) {
358-
return Subscriptions.unsubscribed();
359-
}
360-
BooleanSubscription sub = BooleanSubscription.create();
361-
execute(() -> {
362-
if (!sub.isUnsubscribed() && !workerSub.isUnsubscribed()) {
363-
action.call();
364-
}
365-
});
366-
return sub;
367-
}
368-
369-
@Override
370-
public void unsubscribe() {
371-
workerSub.unsubscribe();
372-
}
373-
374-
@Override
375-
public boolean isUnsubscribed() {
376-
return workerSub.isUnsubscribed();
377-
}
378-
};
379-
}
380-
};
315+
this.scheduler = new SwtScheduler(this);
381316
this.rxExecutor = Rx.on(this, scheduler);
382317
}
383318

@@ -673,4 +608,204 @@ public boolean isDone() {
673608
return runnableFuture.isDone();
674609
}
675610
}
611+
612+
/** Scheduler that runs tasks on Swt's event dispatch thread. */
613+
static final class SwtScheduler extends Scheduler {
614+
final SwtExec exec;
615+
616+
public SwtScheduler(SwtExec exec) {
617+
this.exec = exec;
618+
}
619+
620+
@Override
621+
public Worker createWorker() {
622+
return new SwtWorker(exec);
623+
}
624+
625+
static final class SwtWorker extends Scheduler.Worker {
626+
final SwtExec exec;
627+
628+
volatile boolean unsubscribed;
629+
630+
/** Set of active tasks, guarded by this. */
631+
Set<SwtScheduledAction> tasks;
632+
633+
public SwtWorker(SwtExec exec) {
634+
this.exec = exec;
635+
this.tasks = new HashSet<>();
636+
}
637+
638+
@Override
639+
public void unsubscribe() {
640+
if (unsubscribed) {
641+
return;
642+
}
643+
unsubscribed = true;
644+
645+
Set<SwtScheduledAction> set;
646+
synchronized (this) {
647+
set = tasks;
648+
tasks = null;
649+
}
650+
651+
if (set != null) {
652+
for (SwtScheduledAction a : set) {
653+
a.cancelFuture();
654+
}
655+
}
656+
}
657+
658+
void remove(SwtScheduledAction a) {
659+
if (unsubscribed) {
660+
return;
661+
}
662+
synchronized (this) {
663+
if (unsubscribed) {
664+
return;
665+
}
666+
667+
tasks.remove(a);
668+
}
669+
}
670+
671+
@Override
672+
public boolean isUnsubscribed() {
673+
return unsubscribed;
674+
}
675+
676+
@Override
677+
public Subscription schedule(Action0 action) {
678+
if (unsubscribed) {
679+
return Subscriptions.unsubscribed();
680+
}
681+
682+
SwtScheduledAction a = new SwtScheduledAction(action, this);
683+
684+
synchronized (this) {
685+
if (unsubscribed) {
686+
return Subscriptions.unsubscribed();
687+
}
688+
689+
tasks.add(a);
690+
}
691+
692+
exec.execute(a);
693+
694+
if (unsubscribed) {
695+
a.cancel();
696+
return Subscriptions.unsubscribed();
697+
}
698+
699+
return a;
700+
}
701+
702+
@Override
703+
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
704+
if (unsubscribed) {
705+
return Subscriptions.unsubscribed();
706+
}
707+
708+
SwtScheduledAction a = new SwtScheduledAction(action, this);
709+
710+
synchronized (this) {
711+
if (unsubscribed) {
712+
return Subscriptions.unsubscribed();
713+
}
714+
715+
tasks.add(a);
716+
}
717+
718+
Future<?> f = exec.schedule(a, delayTime, unit);
719+
720+
if (unsubscribed) {
721+
a.cancel();
722+
f.cancel(true);
723+
return Subscriptions.unsubscribed();
724+
}
725+
726+
a.setFuture(f);
727+
728+
return a;
729+
}
730+
731+
/**
732+
* Represents a cancellable asynchronous Runnable that wraps an action
733+
* and manages the associated Worker lifecycle.
734+
*/
735+
static final class SwtScheduledAction implements Runnable, Subscription {
736+
final Action0 action;
737+
738+
final SwtWorker parent;
739+
740+
volatile Future<?> future;
741+
@SuppressWarnings("rawtypes")
742+
static final AtomicReferenceFieldUpdater<SwtScheduledAction, Future> FUTURE = AtomicReferenceFieldUpdater.newUpdater(SwtScheduledAction.class, Future.class, "future");
743+
744+
static final Future<?> CANCELLED = new FutureTask<>(() -> {}, null);
745+
746+
static final Future<?> FINISHED = new FutureTask<>(() -> {}, null);
747+
748+
volatile int state;
749+
static final AtomicIntegerFieldUpdater<SwtScheduledAction> STATE = AtomicIntegerFieldUpdater.newUpdater(SwtScheduledAction.class, "state");
750+
751+
static final int STATE_ACTIVE = 0;
752+
static final int STATE_FINISHED = 1;
753+
static final int STATE_CANCELLED = 2;
754+
755+
public SwtScheduledAction(Action0 action, SwtWorker parent) {
756+
this.action = action;
757+
this.parent = parent;
758+
}
759+
760+
@Override
761+
public void run() {
762+
if (!parent.unsubscribed && state == STATE_ACTIVE) {
763+
try {
764+
action.call();
765+
} finally {
766+
FUTURE.lazySet(this, FINISHED);
767+
if (STATE.compareAndSet(this, STATE_ACTIVE, STATE_FINISHED)) {
768+
parent.remove(this);
769+
}
770+
}
771+
}
772+
}
773+
774+
@Override
775+
public boolean isUnsubscribed() {
776+
return state != STATE_ACTIVE;
777+
}
778+
779+
@Override
780+
public void unsubscribe() {
781+
if (STATE.compareAndSet(this, STATE_ACTIVE, STATE_CANCELLED)) {
782+
parent.remove(this);
783+
}
784+
cancelFuture();
785+
}
786+
787+
void setFuture(Future<?> f) {
788+
if (FUTURE.compareAndSet(this, null, f)) {
789+
if (future != FINISHED) {
790+
f.cancel(true);
791+
}
792+
}
793+
}
794+
795+
void cancelFuture() {
796+
Future<?> f = future;
797+
if (f != CANCELLED && f != FINISHED) {
798+
f = FUTURE.getAndSet(this, CANCELLED);
799+
if (f != null && f != CANCELLED && f != FINISHED) {
800+
f.cancel(true);
801+
}
802+
}
803+
}
804+
805+
void cancel() {
806+
state = STATE_CANCELLED;
807+
}
808+
}
809+
}
810+
}
676811
}

0 commit comments

Comments
 (0)