Skip to content

Commit 0a85f9b

Browse files
fix: Sinks.Many would continue to accept emissions after cancellation
Signed-off-by: George Banasios <banasiosgeorge@gmail.com>
1 parent 141eef6 commit 0a85f9b

File tree

3 files changed

+167
-31
lines changed

3 files changed

+167
-31
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright (c) 2025 VMware Inc. or its affiliates, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reactor.core.publisher;
18+
19+
import org.openjdk.jcstress.annotations.*;
20+
import org.openjdk.jcstress.infra.results.II_Result;
21+
22+
import java.util.Queue;
23+
24+
import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;
25+
26+
public class SinkManyEmitterProcessorStressTest {
27+
28+
@JCStressTest
29+
@Outcome(id = "0, 0", expect = ACCEPTABLE, desc = "Emission returned OK, but the concurrent drain cleaned the queue before the arbiter ran.")
30+
@Outcome(id = "3, 0", expect = ACCEPTABLE, desc = "Emission was correctly cancelled due to a race or because it happened post-cancellation.")
31+
@State
32+
public static class EmitNextAndAutoCancelRaceStressTest {
33+
34+
private final SinkManyEmitterProcessor<Integer> sink = new SinkManyEmitterProcessor<>(true, 16);
35+
36+
@Actor
37+
public void emitActor(II_Result r) {
38+
r.r1 = sink.tryEmitNext(1).ordinal();
39+
}
40+
41+
@Actor
42+
public void cancelActor() {
43+
sink.asFlux().subscribe().dispose();
44+
}
45+
46+
@Arbiter
47+
public void arbiter(II_Result r) {
48+
Queue<Integer> q = sink.queue;
49+
r.r2 = (q == null) ? 0 : q.size();
50+
}
51+
}
52+
}

reactor-core/src/main/java/reactor/core/publisher/SinkManyEmitterProcessor.java

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -57,7 +57,7 @@
5757
* @author Stephane Maldini
5858
*/
5959
final class SinkManyEmitterProcessor<T> extends Flux<T> implements InternalManySink<T>,
60-
Sinks.ManyWithUpstream<T>, CoreSubscriber<T>, Scannable, Disposable, ContextHolder {
60+
Sinks.ManyWithUpstream<T>, CoreSubscriber<T>, Scannable, Disposable, ContextHolder {
6161

6262
@SuppressWarnings("rawtypes")
6363
static final FluxPublish.PubSubInner[] EMPTY = new FluxPublish.PublishInner[0];
@@ -201,6 +201,9 @@ public void onComplete() {
201201

202202
@Override
203203
public EmitResult tryEmitComplete() {
204+
if (isCancelled()) {
205+
return EmitResult.FAIL_CANCELLED;
206+
}
204207
if (done) {
205208
return EmitResult.FAIL_TERMINATED;
206209
}
@@ -217,6 +220,9 @@ public void onError(Throwable throwable) {
217220
@Override
218221
public EmitResult tryEmitError(Throwable t) {
219222
Objects.requireNonNull(t, "tryEmitError must be invoked with a non-null Throwable");
223+
if (isCancelled()) {
224+
return EmitResult.FAIL_CANCELLED;
225+
}
220226
if (done) {
221227
return EmitResult.FAIL_TERMINATED;
222228
}
@@ -241,6 +247,9 @@ public void onNext(T t) {
241247

242248
@Override
243249
public EmitResult tryEmitNext(T t) {
250+
if (isCancelled()) {
251+
return EmitResult.FAIL_CANCELLED;
252+
}
244253
if (done) {
245254
return Sinks.EmitResult.FAIL_TERMINATED;
246255
}
@@ -271,6 +280,23 @@ public EmitResult tryEmitNext(T t) {
271280
return subscribers == EMPTY ? EmitResult.FAIL_ZERO_SUBSCRIBER : EmitResult.FAIL_OVERFLOW;
272281
}
273282
drain();
283+
284+
// This final check is critical for handling a race between this emit operation
285+
// and a concurrent cancellation from another thread.
286+
//
287+
// The race condition scenario:
288+
// 1. This thread passes the initial isCancelled() check at the top of the method.
289+
// 2. This thread successfully offers an item to the queue.
290+
// 3. Concurrently, another thread disposes the last subscriber, which cancels the sink
291+
// and triggers a drain that cleans up the just-offered item.
292+
//
293+
// Without this check, we would return EmitResult.OK, but the item has already been
294+
// discarded. This check ensures we accurately report FAIL_CANCELLED, reflecting
295+
// the final state of the operation.
296+
if (isCancelled()) {
297+
return EmitResult.FAIL_CANCELLED;
298+
}
299+
274300
return EmitResult.OK;
275301
}
276302

@@ -382,7 +408,7 @@ public Object scanUnsafe(Attr key) {
382408
return null;
383409
}
384410

385-
final void drain() {
411+
void drain() {
386412
if (WIP.getAndIncrement(this) != 0) {
387413
return;
388414
}
@@ -397,11 +423,9 @@ final void drain() {
397423

398424
boolean empty = q == null || q.isEmpty();
399425

400-
if (checkTerminated(d, empty)) {
401-
return;
402-
}
426+
checkTerminated(d, empty);
403427

404-
FluxPublish.PubSubInner<T>[] a = subscribers;
428+
FluxPublish.PubSubInner<T>[] a = subscribers;
405429

406430
if (a != EMPTY && !empty) {
407431
long maxRequested = Long.MAX_VALUE;
@@ -431,10 +455,8 @@ final void drain() {
431455
d = true;
432456
v = null;
433457
}
434-
if (checkTerminated(d, v == null)) {
435-
return;
436-
}
437-
if (sourceMode != Fuseable.SYNC) {
458+
checkTerminated(d, v == null);
459+
if (sourceMode != Fuseable.SYNC) {
438460
s.request(1);
439461
}
440462
continue;
@@ -458,11 +480,9 @@ final void drain() {
458480

459481
empty = v == null;
460482

461-
if (checkTerminated(d, empty)) {
462-
return;
463-
}
483+
checkTerminated(d, empty);
464484

465-
if (empty) {
485+
if (empty) {
466486
//async mode only needs to break but SYNC mode needs to perform terminal cleanup here...
467487
if (sourceMode == Fuseable.SYNC) {
468488
//the q is empty
@@ -494,10 +514,8 @@ final void drain() {
494514
}
495515
else if ( sourceMode == Fuseable.SYNC ) {
496516
done = true;
497-
if (checkTerminated(true, empty)) { //empty can be true if no subscriber
498-
break;
499-
}
500-
}
517+
checkTerminated(true, empty);//empty can be true if no subscriber
518+
}
501519

502520
missed = WIP.addAndGet(this, -missed);
503521
if (missed == 0) {
@@ -544,7 +562,7 @@ else if (empty) {
544562
return false;
545563
}
546564

547-
final boolean add(EmitterInner<T> inner) {
565+
boolean add(EmitterInner<T> inner) {
548566
for (; ; ) {
549567
FluxPublish.PubSubInner<T>[] a = subscribers;
550568
if (a == TERMINATED) {
@@ -560,7 +578,7 @@ final boolean add(EmitterInner<T> inner) {
560578
}
561579
}
562580

563-
final void remove(FluxPublish.PubSubInner<T> inner) {
581+
void remove(FluxPublish.PubSubInner<T> inner) {
564582
for (; ; ) {
565583
FluxPublish.PubSubInner<T>[] a = subscribers;
566584
if (a == TERMINATED || a == EMPTY) {
@@ -591,14 +609,11 @@ final void remove(FluxPublish.PubSubInner<T> inner) {
591609
if (SUBSCRIBERS.compareAndSet(this, a, b)) {
592610
//contrary to FluxPublish, there is a possibility of auto-cancel, which
593611
//happens when the removed inner makes the subscribers array EMPTY
594-
if (autoCancel && b == EMPTY && Operators.terminate(S, this)) {
595-
if (WIP.getAndIncrement(this) != 0) {
596-
return;
597-
}
598-
terminate();
599-
Queue<T> q = queue;
600-
if (q != null) {
601-
q.clear();
612+
if (autoCancel && b == EMPTY && !isCancelled()) {
613+
if (Operators.terminate(S, this)) {
614+
// The state is now CANCELLED.
615+
// Trigger a drain so the serialized drain-loop can perform the cleanup
616+
drain();
602617
}
603618
}
604619
return;
@@ -653,5 +668,4 @@ public void dispose() {
653668
}
654669
}
655670

656-
657671
}

reactor-core/src/test/java/reactor/core/publisher/SinkManyEmitterProcessorTest.java

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,6 +50,8 @@
5050
import reactor.util.context.Context;
5151

5252
import static org.assertj.core.api.Assertions.*;
53+
import static org.junit.jupiter.api.Assertions.assertEquals;
54+
import static org.junit.jupiter.api.Assertions.assertTrue;
5355
import static reactor.core.Scannable.Attr;
5456
import static reactor.core.Scannable.Attr.*;
5557
import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
@@ -922,4 +924,72 @@ void emitNextWithNoSubscriberNoCapacityKeepsSinkOpenWithBuffer() {
922924
.expectTimeout(Duration.ofSeconds(1))
923925
.verify();
924926
}
927+
928+
@Test
929+
void testThatCancelledSinkShouldNotAcceptsEmissions() {
930+
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
931+
Disposable subscription1 = sink.asFlux().subscribe(s -> System.out.println("1: " + s));
932+
assertEquals(1, sink.currentSubscriberCount());
933+
sink.tryEmitNext("Test1");
934+
subscription1.dispose();
935+
assertEquals(0, sink.currentSubscriberCount());
936+
Disposable subscription2 = sink.asFlux().subscribe(s -> System.out.println("2: " + s));
937+
assertTrue(subscription2.isDisposed());
938+
assertEquals(0, sink.currentSubscriberCount());
939+
assertTrue(sink.tryEmitNext("Test2").isFailure(), "Emissions on a cancelled sink should fail");
940+
}
941+
942+
@Test
943+
void testQueueShouldBeEmptyAfterCancellation() {
944+
SinkManyEmitterProcessor<Integer> processor = new SinkManyEmitterProcessor<>(true, 1);
945+
processor.tryEmitNext(1);
946+
assertThat(processor.queue.size()).isEqualTo(1);
947+
processor.asFlux().doOnNext(i -> System.out.println("Received " + i)).subscribe().dispose();
948+
assertThat(processor.queue.size()).isEqualTo(0);
949+
processor.tryEmitNext(2);
950+
assertThat(processor.queue.size()).isEqualTo(0);
951+
}
952+
953+
@Test
954+
void testNoQueueIsCreatedIfNoEmissionOccurredBeforeCancellation() {
955+
SinkManyEmitterProcessor<Integer> processor = new SinkManyEmitterProcessor<>(true, 1);
956+
957+
processor.asFlux().subscribe().dispose();
958+
959+
processor.tryEmitNext(1);
960+
assertThat(processor.queue).isNull();
961+
}
962+
963+
@Test
964+
void testThatOneSubscriberDisposesSinkStaysActive() {
965+
Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer();
966+
967+
sink.asFlux().subscribe(i -> System.out.println("Subscriber A received: " + i));
968+
Disposable subscriberB = sink.asFlux().subscribe(i -> System.out.println("Subscriber B received: " + i));
969+
970+
assertThat(sink.currentSubscriberCount()).isEqualTo(2);
971+
972+
sink.tryEmitNext(1);
973+
subscriberB.dispose();
974+
975+
assertThat(sink.currentSubscriberCount()).isEqualTo(1);
976+
977+
Sinks.EmitResult result = sink.tryEmitNext(2);
978+
assertThat(result).isEqualTo(Sinks.EmitResult.OK);
979+
}
980+
981+
@Test
982+
void testThatLastSubscriberDisposesTriggersAutoCancel() {
983+
Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer();
984+
Disposable disposable = sink.asFlux().subscribe();
985+
986+
assertThat(sink.currentSubscriberCount()).isEqualTo(1);
987+
988+
disposable.dispose();
989+
990+
assertThat(sink.currentSubscriberCount()).isEqualTo(0);
991+
992+
Sinks.EmitResult result = sink.tryEmitNext(1);
993+
assertThat(result).isEqualTo(Sinks.EmitResult.FAIL_CANCELLED);
994+
}
925995
}

0 commit comments

Comments
 (0)