Skip to content

Commit 8a0af2d

Browse files
idg10nilsauf
andauthored
TakeUntil with CancellationToken (#2222)
Based on original PR #2182 (Implementing TakeUntil with CancellationToken) by @nilsauf but with the following changes * Revert to block-scoped namespaces * Fix typos, standardized spelling of cancelled * Update release notes * Add new TakeUntil to verified API * Add IQbservable form of TakeUntil * Fix exception documentation on new and one existing TakeUntil overload --------- Co-authored-by: Nils Aufschläger <mail@nilsauf.de>
1 parent bdb1cc6 commit 8a0af2d

File tree

8 files changed

+200
-4
lines changed

8 files changed

+200
-4
lines changed

Rx.NET/Documentation/ReleaseHistory/Rx.v6.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,12 @@
1-
# Rx Release History v6.0
1+
# Rx Release History v6.0
2+
3+
## 6.1.0
4+
5+
This release adds:
6+
7+
* A `DisposeWith`extension method for `IDisposable` to simplify disposal in conjunction with `CompositeDisposable` (see [#2178](https://github.com/dotnet/reactive/pull/2178) thanks to [Chris Pulman](https://github.com/ChrisPulman)
8+
* A new overload of `TakeUntil` accepting a `CancellationToken` (see [#2181](https://github.com/dotnet/reactive/issues/2181) thanks to [Nils Aufschläger](https://github.com/nilsauf)
9+
210

311
## v6.0.2
412

Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,7 @@ internal partial interface IQueryLanguage
558558
IObservable<TSource> Switch<TSource>(IObservable<IObservable<TSource>> sources);
559559
IObservable<TSource> TakeUntil<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other);
560560
IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, Func<TSource, bool> stopPredicate);
561+
IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, CancellationToken cancellationToken);
561562
IObservable<IObservable<TSource>> Window<TSource, TWindowClosing>(IObservable<TSource> source, Func<IObservable<TWindowClosing>> windowClosingSelector);
562563
IObservable<IObservable<TSource>> Window<TSource, TWindowOpening, TWindowClosing>(IObservable<TSource> source, IObservable<TWindowOpening> windowOpenings, Func<TWindowOpening, IObservable<TWindowClosing>> windowClosingSelector);
563564
IObservable<IObservable<TSource>> Window<TSource, TWindowBoundary>(IObservable<TSource> source, IObservable<TWindowBoundary> windowBoundaries);

Rx.NET/Source/src/System.Reactive/Linq/Observable.Multiple.cs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
// See the LICENSE file in the project root for more information.
44

55
using System.Collections.Generic;
6-
using System.Configuration;
76
using System.Reactive.Concurrency;
87
using System.Threading;
98
using System.Threading.Tasks;
@@ -865,7 +864,7 @@ public static IObservable<TSource> TakeUntil<TSource, TOther>(this IObservable<T
865864
/// .Subscribe(Console.WriteLine);
866865
/// </code>
867866
/// </example>
868-
/// <exception cref="ArgumentException">If <typeparamref name="TSource"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
867+
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
869868
public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source, Func<TSource, bool> stopPredicate)
870869
{
871870
if (source == null)
@@ -881,6 +880,25 @@ public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource>
881880
return s_impl.TakeUntil(source, stopPredicate);
882881
}
883882

883+
/// <summary>
884+
/// Relays elements from the source observable sequence until the provided <paramref name="cancellationToken"/> is cancelled.
885+
/// Completes immediately if the provided <paramref name="cancellationToken"/> is already cancelled upon subscription.
886+
/// </summary>
887+
/// <typeparam name="TSource">The type of the elements in the source and result sequences.</typeparam>
888+
/// <param name="source">The source sequence to relay elements of.</param>
889+
/// <param name="cancellationToken">The cancellation token to complete the target observable sequence on.</param>
890+
/// <returns>The observable sequence with the source elements until the provided <paramref name="cancellationToken"/> is cancelled.</returns>
891+
/// <exception cref="ArgumentNullException"><paramref name="source"/> is <code>null</code>.</exception>
892+
public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source, CancellationToken cancellationToken)
893+
{
894+
if (source == null)
895+
{
896+
throw new ArgumentNullException(nameof(source));
897+
}
898+
899+
return s_impl.TakeUntil(source, cancellationToken);
900+
}
901+
884902
#endregion
885903

886904
#region + Window +
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System.Reactive.Disposables;
6+
using System.Threading;
7+
8+
namespace System.Reactive.Linq.ObservableImpl
9+
{
10+
/// <summary>
11+
/// Relays items to the downstream until the CancellationToken is cancelled.
12+
/// </summary>
13+
/// <typeparam name="TSource">The element type of the sequence</typeparam>
14+
internal sealed class TakeUntilCancellationToken<TSource> :
15+
Producer<TSource, TakeUntilCancellationToken<TSource>._>
16+
{
17+
private readonly IObservable<TSource> _source;
18+
private readonly CancellationToken _token;
19+
20+
public TakeUntilCancellationToken(IObservable<TSource> source, CancellationToken token)
21+
{
22+
_source = source;
23+
_token = token;
24+
}
25+
26+
protected override _ CreateSink(IObserver<TSource> observer) => new(observer);
27+
28+
protected override void Run(_ sink) => sink.Run(this);
29+
30+
internal sealed class _ : IdentitySink<TSource>
31+
{
32+
private SingleAssignmentDisposableValue _cancellationTokenRegistration;
33+
private int _wip;
34+
private Exception? _error;
35+
36+
public _(IObserver<TSource> observer) : base(observer)
37+
{
38+
}
39+
40+
public void Run(TakeUntilCancellationToken<TSource> parent)
41+
{
42+
if (parent._token.IsCancellationRequested)
43+
{
44+
OnCompleted();
45+
return;
46+
}
47+
48+
_cancellationTokenRegistration.Disposable = parent._token.Register(OnCompleted);
49+
Run(parent._source);
50+
}
51+
52+
protected override void Dispose(bool disposing)
53+
{
54+
if (disposing)
55+
{
56+
_cancellationTokenRegistration.Dispose();
57+
}
58+
base.Dispose(disposing);
59+
}
60+
61+
public override void OnNext(TSource value)
62+
{
63+
HalfSerializer.ForwardOnNext(this, value, ref _wip, ref _error);
64+
}
65+
66+
public override void OnError(Exception error)
67+
{
68+
HalfSerializer.ForwardOnError(this, error, ref _wip, ref _error);
69+
}
70+
71+
public override void OnCompleted()
72+
{
73+
HalfSerializer.ForwardOnCompleted(this, ref _wip, ref _error);
74+
}
75+
}
76+
}
77+
}

Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13761,6 +13761,31 @@ public static IQbservable<IList<TSource>> TakeLastBuffer<TSource>(this IQbservab
1376113761
);
1376213762
}
1376313763

13764+
/// <summary>
13765+
/// Relays elements from the source observable sequence until the provided <paramref name="cancellationToken" /> is cancelled.
13766+
/// Completes immediately if the provided <paramref name="cancellationToken" /> is already cancelled upon subscription.
13767+
/// </summary>
13768+
/// <typeparam name="TSource">The type of the elements in the source and result sequences.</typeparam>
13769+
/// <param name="source">The source sequence to relay elements of.</param>
13770+
/// <param name="cancellationToken">The cancellation token to complete the target observable sequence on.</param>
13771+
/// <returns>The observable sequence with the source elements until the provided <paramref name="cancellationToken" /> is cancelled.</returns>
13772+
/// <exception cref="ArgumentNullException"><paramref name="source"/> is <code>null</code>.</exception>
13773+
public static IQbservable<TSource> TakeUntil<TSource>(this IQbservable<TSource> source, CancellationToken cancellationToken)
13774+
{
13775+
if (source == null)
13776+
throw new ArgumentNullException(nameof(source));
13777+
13778+
return source.Provider.CreateQuery<TSource>(
13779+
Expression.Call(
13780+
null,
13781+
((MethodInfo)MethodInfo.GetCurrentMethod()!).MakeGenericMethod(typeof(TSource)),
13782+
source.Expression,
13783+
Expression.Constant(cancellationToken, typeof(CancellationToken))
13784+
)
13785+
);
13786+
}
13787+
13788+
1376413789
/// <summary>
1376513790
/// Takes elements for the specified duration until the specified end time.
1376613791
/// </summary>
@@ -13858,7 +13883,7 @@ public static IQbservable<TSource> TakeUntil<TSource, TOther>(this IQbservable<T
1385813883
/// .Subscribe(Console.WriteLine);
1385913884
/// </code>
1386013885
/// </example>
13861-
/// <exception cref="ArgumentException">If <typeparamref name="TSource"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
13886+
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
1386213887
public static IQbservable<TSource> TakeUntil<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, bool>> stopPredicate)
1386313888
{
1386413889
if (source == null)

Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Multiple.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Linq;
77
using System.Reactive.Concurrency;
88
using System.Reactive.Threading.Tasks;
9+
using System.Threading;
910
using System.Threading.Tasks;
1011

1112
namespace System.Reactive.Linq
@@ -282,6 +283,11 @@ public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> sour
282283
return new TakeUntilPredicate<TSource>(source, stopPredicate);
283284
}
284285

286+
public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, CancellationToken cancellationToken)
287+
{
288+
return new TakeUntilCancellationToken<TSource>(source, cancellationToken);
289+
}
290+
285291
#endregion
286292

287293
#region + Window +

Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.verified.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1464,6 +1464,7 @@ public static System.IObservable<TSource> TakeLast<TSource>(this System.IObserva
14641464
public static System.IObservable<System.Collections.Generic.IList<TSource>> TakeLastBuffer<TSource>(this System.IObservable<TSource> source, System.TimeSpan duration, System.Reactive.Concurrency.IScheduler scheduler) { }
14651465
public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.DateTimeOffset endTime) { }
14661466
public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.Func<TSource, bool> stopPredicate) { }
1467+
public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.Threading.CancellationToken cancellationToken) { }
14671468
public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.DateTimeOffset endTime, System.Reactive.Concurrency.IScheduler scheduler) { }
14681469
public static System.IObservable<TSource> TakeUntil<TSource, TOther>(this System.IObservable<TSource> source, System.IObservable<TOther> other) { }
14691470
public static System.IObservable<TSource> TakeWhile<TSource>(this System.IObservable<TSource> source, System.Func<TSource, bool> predicate) { }
@@ -2296,6 +2297,7 @@ public static System.Reactive.Linq.IQbservable<TSource> TakeLast<TSource>(this S
22962297
public static System.Reactive.Linq.IQbservable<System.Collections.Generic.IList<TSource>> TakeLastBuffer<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.TimeSpan duration, System.Reactive.Concurrency.IScheduler scheduler) { }
22972298
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.DateTimeOffset endTime) { }
22982299
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, bool>> stopPredicate) { }
2300+
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Threading.CancellationToken cancellationToken) { }
22992301
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.DateTimeOffset endTime, System.Reactive.Concurrency.IScheduler scheduler) { }
23002302
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource, TOther>(this System.Reactive.Linq.IQbservable<TSource> source, System.IObservable<TOther> other) { }
23012303
public static System.Reactive.Linq.IQbservable<TSource> TakeWhile<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, bool>> predicate) { }

Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/TakeUntilTest.cs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -860,5 +860,64 @@ public void TakeUntil_Predicate_Crash()
860860

861861
#endregion
862862

863+
#region + CancellationToken +
864+
865+
[TestMethod]
866+
public void TakeUntil_CancellationToken_BasicCancelation()
867+
{
868+
var scheduler = new TestScheduler();
869+
var tokenSource = new CancellationTokenSource();
870+
871+
var source = scheduler.CreateColdObservable(
872+
OnNext(10, 1),
873+
OnNext(20, 2),
874+
OnNext(30, 3),
875+
OnNext(40, 4),
876+
OnNext(50, 5),
877+
OnCompleted<int>(260)
878+
);
879+
880+
scheduler.ScheduleAbsolute(235, () => tokenSource.Cancel());
881+
882+
var result = scheduler.Start(() => source.TakeUntil(tokenSource.Token));
883+
884+
result.Messages.AssertEqual(
885+
OnNext(210, 1),
886+
OnNext(220, 2),
887+
OnNext(230, 3),
888+
OnCompleted<int>(235)
889+
);
890+
891+
source.Subscriptions.AssertEqual(
892+
Subscribe(200, 235)
893+
);
894+
}
895+
896+
[TestMethod]
897+
public void TakeUntil_CancellationToken_AlreadyCanceled()
898+
{
899+
var scheduler = new TestScheduler();
900+
var tokenSource = new CancellationTokenSource();
901+
tokenSource.Cancel();
902+
903+
var source = scheduler.CreateColdObservable(
904+
OnNext(10, 1),
905+
OnNext(20, 2),
906+
OnNext(30, 3),
907+
OnNext(40, 4),
908+
OnNext(50, 5),
909+
OnCompleted<int>(260)
910+
);
911+
912+
var result = scheduler.Start(() => source.TakeUntil(tokenSource.Token));
913+
914+
result.Messages.AssertEqual(
915+
OnCompleted<int>(200)
916+
);
917+
918+
Assert.Empty(source.Subscriptions);
919+
}
920+
921+
#endregion
863922
}
864923
}

0 commit comments

Comments
 (0)