tgstation-server 6.12.3
The /tg/station 13 server suite
Loading...
Searching...
No Matches
ShutdownAwareTopicEventReceiver.cs
Go to the documentation of this file.
1using System;
2using System.Collections.Concurrent;
3using System.Threading;
4using System.Threading.Tasks;
5
6using HotChocolate.Execution;
7using HotChocolate.Subscriptions;
8
9using Microsoft.Extensions.Hosting;
10
12{
15 {
19 readonly IHostApplicationLifetime hostApplicationLifetime;
20
24 readonly HotChocolate.Subscriptions.ITopicEventReceiver hotChocolateReceiver;
25
29 readonly ConcurrentBag<CancellationTokenRegistration> registrations;
30
34 readonly ConcurrentBag<Task> disposeTasks;
35
42 IHostApplicationLifetime hostApplicationLifetime,
43 HotChocolate.Subscriptions.ITopicEventReceiver hotChocolateReceiver)
44 {
45 this.hostApplicationLifetime = hostApplicationLifetime ?? throw new ArgumentNullException(nameof(hostApplicationLifetime));
46 this.hotChocolateReceiver = hotChocolateReceiver ?? throw new ArgumentNullException(nameof(hotChocolateReceiver));
47
48 registrations = new ConcurrentBag<CancellationTokenRegistration>();
49 disposeTasks = new ConcurrentBag<Task>();
50 }
51
/// <summary>
/// Disposes every <see cref="CancellationTokenRegistration"/> tracked in <c>registrations</c>, then waits for all <see cref="Task"/>s collected in <c>disposeTasks</c> to complete.
/// </summary>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
public async ValueTask DisposeAsync()
{
    // Unhook the shutdown callbacks before waiting on the disposals they may already have started.
    foreach (var shutdownHook in registrations)
    {
        shutdownHook.Dispose();
    }

    Task pendingDisposals = Task.WhenAll(disposeTasks);
    await pendingDisposals;
}
62
/// <summary>
/// Subscribes to <paramref name="topicName"/> on the wrapped receiver and passes the resulting stream through <c>WrapWithApplicationLifetimeCancellation</c>.
/// </summary>
/// <typeparam name="TMessage">The message type of the subscription.</typeparam>
/// <param name="topicName">The topic name, forwarded unchanged to the wrapped receiver.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/>, forwarded unchanged to the wrapped receiver.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> resulting in the <see cref="ISourceStream{TMessage}"/> produced by the wrapped receiver.</returns>
public ValueTask<ISourceStream<TMessage>> SubscribeAsync<TMessage>(string topicName, CancellationToken cancellationToken)
{
    var pendingStream = hotChocolateReceiver.SubscribeAsync<TMessage>(topicName, cancellationToken);
    return WrapWithApplicationLifetimeCancellation(pendingStream);
}
67
/// <summary>
/// Subscribes to <paramref name="topicName"/> on the wrapped receiver with explicit buffer settings and passes the resulting stream through <c>WrapWithApplicationLifetimeCancellation</c>.
/// </summary>
/// <typeparam name="TMessage">The message type of the subscription.</typeparam>
/// <param name="topicName">The topic name, forwarded unchanged to the wrapped receiver.</param>
/// <param name="bufferCapacity">The optional buffer capacity, forwarded unchanged to the wrapped receiver.</param>
/// <param name="bufferFullMode">The optional <see cref="TopicBufferFullMode"/>, forwarded unchanged to the wrapped receiver.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/>, forwarded unchanged to the wrapped receiver.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> resulting in the <see cref="ISourceStream{TMessage}"/> produced by the wrapped receiver.</returns>
public ValueTask<ISourceStream<TMessage>> SubscribeAsync<TMessage>(string topicName, int? bufferCapacity, TopicBufferFullMode? bufferFullMode, CancellationToken cancellationToken)
{
    var pendingStream = hotChocolateReceiver.SubscribeAsync<TMessage>(
        topicName,
        bufferCapacity,
        bufferFullMode,
        cancellationToken);
    return WrapWithApplicationLifetimeCancellation(pendingStream);
}
72
/// <summary>
/// Awaits <paramref name="sourceStreamTask"/> and registers a callback on the host's <c>ApplicationStopping</c> token that starts disposing the resulting stream.
/// </summary>
/// <typeparam name="TMessage">The message type of the <see cref="ISourceStream{TMessage}"/>.</typeparam>
/// <param name="sourceStreamTask">The <see cref="ValueTask{TResult}"/> resulting in the <see cref="ISourceStream{TMessage}"/> to wrap.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> resulting in the same <see cref="ISourceStream{TMessage}"/> that <paramref name="sourceStreamTask"/> produced.</returns>
async ValueTask<ISourceStream<TMessage>> WrapWithApplicationLifetimeCancellation<TMessage>(ValueTask<ISourceStream<TMessage>> sourceStreamTask)
{
    var stream = await sourceStreamTask;

    // Runs when the host signals ApplicationStopping: start the disposal and keep its Task so DisposeAsync can await it.
    void BeginStreamDisposal() => disposeTasks.Add(stream.DisposeAsync().AsTask());

    var stoppingRegistration = hostApplicationLifetime.ApplicationStopping.Register(BeginStreamDisposal);

    // Track the registration so it can be disposed along with this receiver.
    registrations.Add(stoppingRegistration);

    return stream;
}
88 }
89}
ValueTask< ISourceStream< TMessage > > SubscribeAsync< TMessage >(string topicName, CancellationToken cancellationToken)
ShutdownAwareTopicEventReceiver(IHostApplicationLifetime hostApplicationLifetime, HotChocolate.Subscriptions.ITopicEventReceiver hotChocolateReceiver)
Initializes a new instance of the ShutdownAwareTopicEventReceiver class.
readonly ConcurrentBag< Task > disposeTasks
A ConcurrentBag<T> of Tasks converted from the ValueTasks returned from initiating IAsyncDisposable.DisposeAsync calls on ISourceStream<TMessage>s.
readonly ConcurrentBag< CancellationTokenRegistration > registrations
A ConcurrentBag<T> of CancellationTokenRegistrations that were created for this scope.
readonly IHostApplicationLifetime hostApplicationLifetime
The IHostApplicationLifetime for the ShutdownAwareTopicEventReceiver.
async ValueTask< ISourceStream< TMessage > > WrapWithApplicationLifetimeCancellation< TMessage >(ValueTask< ISourceStream< TMessage > > sourceStreamTask)
Wraps a given sourceStreamTask with hostApplicationLifetime cancellation awareness.
readonly HotChocolate.Subscriptions.ITopicEventReceiver hotChocolateReceiver
The wrapped HotChocolate.Subscriptions.ITopicEventReceiver.
Implementation of HotChocolate.Subscriptions.ITopicEventReceiver that works around the global::System...