tgstation-server 6.12.3
The /tg/station 13 server suite
Loading...
Searching...
No Matches
ChatManager.cs
Go to the documentation of this file.
1using System;
2using System.Collections.Concurrent;
3using System.Collections.Generic;
4using System.Globalization;
5using System.Linq;
6using System.Threading;
7using System.Threading.Tasks;
8
9using Microsoft.Extensions.Logging;
10
11using Newtonsoft.Json;
12
13using Serilog.Context;
14
23
25{
27#pragma warning disable CA1506 // TODO: Decomplexify
29 {
33 public const string CommonMention = "!tgs";
34
39
44
49
53 readonly ILoggerFactory loggerFactory;
54
58 readonly ILogger<ChatManager> logger;
59
63 readonly Dictionary<string, ICommand> builtinCommands;
64
68 readonly Dictionary<long, IProvider> providers;
69
73 readonly ConcurrentDictionary<long, SemaphoreSlim> changeChannelSemaphores;
74
78 readonly Dictionary<ulong, ChannelMapping> mappedChannels;
79
83 readonly List<IChatTrackingContext> trackingContexts;
84
88 readonly List<Models.ChatBot> activeChatBots;
89
93 readonly CancellationTokenSource handlerCts;
94
98 readonly object synchronizationLock;
99
104
109
114
119
123 TaskCompletionSource connectionsUpdated;
124
129
134
147 IServerControl serverControl,
148 ILoggerFactory loggerFactory,
149 ILogger<ChatManager> logger,
150 IEnumerable<Models.ChatBot> initialChatBots)
151 {
152 this.providerFactory = providerFactory ?? throw new ArgumentNullException(nameof(providerFactory));
153 this.commandFactory = commandFactory ?? throw new ArgumentNullException(nameof(commandFactory));
154 ArgumentNullException.ThrowIfNull(serverControl);
155 this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
156 this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
157 activeChatBots = initialChatBots?.ToList() ?? throw new ArgumentNullException(nameof(initialChatBots));
158
159 restartRegistration = serverControl.RegisterForRestart(this);
160
161 synchronizationLock = new object();
162
163 builtinCommands = new Dictionary<string, ICommand>(StringComparer.OrdinalIgnoreCase);
164 providers = new Dictionary<long, IProvider>();
165 changeChannelSemaphores = new ConcurrentDictionary<long, SemaphoreSlim>();
166 mappedChannels = new Dictionary<ulong, ChannelMapping>();
167 trackingContexts = new List<IChatTrackingContext>();
168 handlerCts = new CancellationTokenSource();
169 connectionsUpdated = new TaskCompletionSource();
170
171 messageSendTask = Task.CompletedTask;
173 }
174
/// <summary>
/// Releases everything the <see cref="ChatManager"/> owns: the restart registration, the handler
/// <see cref="CancellationTokenSource"/>, every provider and every change-channel semaphore.
/// </summary>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
public async ValueTask DisposeAsync()
{
    logger.LogTrace("Disposing...");

    restartRegistration.Dispose();
    handlerCts.Dispose();

    // Providers are disposed sequentially, each one awaited before the next starts.
    foreach (var (_, provider) in providers)
        await provider.DisposeAsync();

    foreach (var (_, semaphore) in changeChannelSemaphores)
        semaphore.Dispose();

    // Finally, let the chain of queued outgoing messages run to completion.
    await messageSendTask;
}
189
191 public async ValueTask ChangeChannels(long connectionId, IEnumerable<Models.ChatChannel> newChannels, CancellationToken cancellationToken)
192 {
193 ArgumentNullException.ThrowIfNull(newChannels);
194
195 logger.LogTrace("ChangeChannels {connectionId}...", connectionId);
196 var semaphore = changeChannelSemaphores.GetOrAdd(connectionId, _ =>
197 {
198 logger.LogTrace("Creating ChangeChannels semaphore for connection ID {connectionId}...", connectionId);
199 return new SemaphoreSlim(1);
200 });
201 using (await SemaphoreSlimContext.Lock(semaphore, cancellationToken))
202 {
203 var provider = await RemoveProviderChannels(connectionId, false, cancellationToken);
204 if (provider == null)
205 return;
206
207 if (!provider.Connected)
208 {
209 logger.LogDebug("Cannot map channels, provider {providerId} disconnected!", connectionId);
210 return;
211 }
212
213 var results = await provider.MapChannels(newChannels, cancellationToken);
214 try
215 {
216 lock (activeChatBots)
217 {
218 var botToUpdate = activeChatBots.FirstOrDefault(bot => bot.Id == connectionId);
219 if (botToUpdate != null)
220 botToUpdate.Channels = newChannels
221 .Select(apiModel => new Models.ChatChannel
222 {
223 DiscordChannelId = apiModel.DiscordChannelId,
224 IrcChannel = apiModel.IrcChannel,
225 IsAdminChannel = apiModel.IsAdminChannel,
226 IsUpdatesChannel = apiModel.IsUpdatesChannel,
227 IsSystemChannel = apiModel.IsSystemChannel,
228 IsWatchdogChannel = apiModel.IsWatchdogChannel,
229 Tag = apiModel.Tag,
230 })
231 .ToList();
232 }
233
234 var newMappings = results.SelectMany(
235 kvp => kvp.Value.Select(
236 channelRepresentation => new ChannelMapping(channelRepresentation)
237 {
238 IsWatchdogChannel = kvp.Key.IsWatchdogChannel == true,
239 IsUpdatesChannel = kvp.Key.IsUpdatesChannel == true,
240 IsAdminChannel = kvp.Key.IsAdminChannel == true,
241 IsSystemChannel = kvp.Key.IsSystemChannel == true,
242 ProviderChannelId = channelRepresentation.RealId,
243 ProviderId = connectionId,
244 }));
245
246 ulong baseId;
248 {
249 baseId = channelIdCounter;
250 channelIdCounter += (ulong)results.Count;
251 }
252
253 lock (mappedChannels)
254 {
255 lock (providers)
256 if (!providers.TryGetValue(connectionId, out var verify) || verify != provider) // aborted again
257 return;
258 foreach (var newMapping in newMappings)
259 {
260 var newId = baseId++;
261 logger.LogTrace("Mapping channel {connectionName}:{channelFriendlyName} as {newId}", newMapping.Channel.ConnectionName, newMapping.Channel.FriendlyName, newId);
262 mappedChannels.Add(newId, newMapping);
263 newMapping.Channel.RealId = newId;
264 }
265 }
266
267 // we only want to update contexts if everything at startup has connected once already
268 // otherwise we could send an incomplete channel set to the DMAPI, which will then spout all its queued messages into it instead of all relevant chatbots
269 // The watchdog can call this if it needs to after starting up
270 if (initialProviderConnectionsTask!.IsCompleted)
271 await UpdateTrackingContexts(cancellationToken);
272 }
273 finally
274 {
275 provider.InitialMappingComplete();
276 }
277 }
278 }
279
281 public async ValueTask ChangeSettings(Models.ChatBot newSettings, CancellationToken cancellationToken)
282 {
283 ArgumentNullException.ThrowIfNull(newSettings);
284
285 logger.LogTrace("ChangeSettings...");
286
287 Task disconnectTask;
288 IProvider? provider = null;
289 var newSettingsId = Models.ModelExtensions.Require(newSettings, x => x.Id);
290 var newSettingsEnabled = Models.ModelExtensions.Require(newSettings, x => x.Enabled);
291 lock (providers)
292 {
293 // raw settings changes forces a rebuild of the provider
294 if (providers.ContainsKey(newSettingsId))
295 disconnectTask = DeleteConnection(newSettingsId, cancellationToken);
296 else
297 disconnectTask = Task.CompletedTask;
298 if (newSettingsEnabled)
299 {
300 provider = providerFactory.CreateProvider(newSettings);
301 providers.Add(newSettingsId, provider);
302 }
303 }
304
305 lock (mappedChannels)
306 foreach (var oldMappedChannelId in mappedChannels.Where(x => x.Value.ProviderId == newSettingsId).Select(x => x.Key).ToList())
307 mappedChannels.Remove(oldMappedChannelId);
308
309 await disconnectTask;
310
312 {
313 // same thread shennanigans
314 var oldOne = connectionsUpdated;
315 connectionsUpdated = new TaskCompletionSource();
316 oldOne.SetResult();
317 }
318
319 var reconnectionUpdateTask = provider?.SetReconnectInterval(
320 Models.ModelExtensions.Require(newSettings, x => x.ReconnectionInterval),
321 newSettingsEnabled)
322 ?? Task.CompletedTask;
323 lock (activeChatBots)
324 {
325 var originalChatBot = activeChatBots.FirstOrDefault(bot => bot.Id == newSettings.Id);
326 if (originalChatBot != null)
327 activeChatBots.Remove(originalChatBot);
328
329 activeChatBots.Add(new Models.ChatBot(newSettings.Channels)
330 {
331 Id = newSettings.Id,
332 ConnectionString = newSettings.ConnectionString,
333 Enabled = newSettings.Enabled,
334 Name = newSettings.Name,
335 ReconnectionInterval = newSettings.ReconnectionInterval,
336 Provider = newSettings.Provider,
337 });
338 }
339
340 await reconnectionUpdateTask;
341 }
342
/// <summary>
/// Queues <paramref name="message"/> for sending to the given <paramref name="channelIds"/>.
/// The send does not wait for the initial provider connections.
/// </summary>
/// <param name="message">The <see cref="MessageContent"/> to send.</param>
/// <param name="channelIds">The mapped channel IDs to send to.</param>
/// <exception cref="ArgumentNullException">If either argument is <see langword="null"/>.</exception>
public void QueueMessage(MessageContent message, IEnumerable<ulong> channelIds)
{
    ArgumentNullException.ThrowIfNull(message);
    ArgumentNullException.ThrowIfNull(channelIds);

    // The internal queue wants a factory; the caller already gave us the final set.
    IEnumerable<ulong> SuppliedChannelIds() => channelIds;

    QueueMessageInternal(message, SuppliedChannelIds, waitForConnections: false);
}
351
353 public void QueueWatchdogMessage(string message)
354 {
355 ArgumentNullException.ThrowIfNull(message);
356
357 message = String.Format(CultureInfo.InvariantCulture, "WD: {0}", message);
358
359 if (!initialProviderConnectionsTask!.IsCompleted)
360 logger.LogTrace("Waiting for initial provider connections before sending watchdog message...");
361
362 // Reimplementing QueueMessage
365 {
366 Text = message,
367 },
368 () =>
369 {
370 // so it doesn't change while we're using it
371 lock (mappedChannels)
372 return mappedChannels.Where(x => x.Value.IsWatchdogChannel).Select(x => x.Key).ToList();
373 },
374 true);
375 }
376
/// <summary>
/// Sends the deployment message to every mapped updates channel and returns the hooks used to follow it up.
/// </summary>
/// <param name="revisionInformation">The <see cref="Models.RevisionInformation"/> being deployed. Its ID is logged and it is forwarded to each provider.</param>
/// <param name="engineVersion">Forwarded unchanged to each provider's update message.</param>
/// <param name="estimatedCompletionTime">Forwarded unchanged to each provider's update message.</param>
/// <param name="gitHubOwner">Forwarded unchanged to each provider's update message.</param>
/// <param name="gitHubRepo">Forwarded unchanged to each provider's update message.</param>
/// <param name="localCommitPushed">Forwarded unchanged to each provider's update message.</param>
/// <returns>
/// A delegate taking an optional error message and the DreamMaker output. Invoking it queues the callbacks the
/// providers returned for the initial message and yields an <see cref="Action{T}"/> that, given the "active"
/// flag, queues the finalizer callbacks those produced.
/// </returns>
/// <exception cref="ArgumentNullException">If <paramref name="revisionInformation"/> is <see langword="null"/>.</exception>
public Func<string?, string, Action<bool>> QueueDeploymentMessage(
    Models.RevisionInformation revisionInformation,
    EngineVersion engineVersion,
    DateTimeOffset? estimatedCompletionTime,
    string? gitHubOwner,
    string? gitHubRepo,
    bool localCommitPushed)
{
    // Fail with a descriptive exception instead of a NullReferenceException at the log call below.
    ArgumentNullException.ThrowIfNull(revisionInformation);

    List<ulong> updateChannelIds;
    lock (mappedChannels) // so it doesn't change while we're using it
        updateChannelIds = mappedChannels.Where(x => x.Value.IsUpdatesChannel).Select(x => x.Key).ToList();

    logger.LogTrace("Sending deployment message for RevisionInformation: {revisionInfoId}", revisionInformation.Id);

    // One entry per channel whose initial message was sent successfully. Guarded by lock (callbacks) while sends are in flight.
    var callbacks = new List<Func<string?, string, ValueTask<Func<bool, ValueTask>>>>();

    var task = Task.WhenAll(
        updateChannelIds.Select(
            async x =>
            {
                // The mapping or its provider may have disappeared since the snapshot above; skip silently if so.
                ChannelMapping? channelMapping;
                lock (mappedChannels)
                    if (!mappedChannels.TryGetValue(x, out channelMapping))
                        return;
                IProvider? provider;
                lock (providers)
                    if (!providers.TryGetValue(channelMapping.ProviderId, out provider))
                        return;
                try
                {
                    var callback = await provider.SendUpdateMessage(
                        revisionInformation,
                        engineVersion,
                        estimatedCompletionTime,
                        gitHubOwner,
                        gitHubRepo,
                        channelMapping.ProviderChannelId,
                        localCommitPushed,
                        handlerCts.Token);

                    lock (callbacks)
                        callbacks.Add(callback);
                }
                catch (Exception ex)
                {
                    // A failure in one channel must not prevent the others from being notified.
                    logger.LogWarning(
                        ex,
                        "Error sending deploy message to provider {providerId}!",
                        channelMapping.ProviderId);
                }
            }));

    AddMessageTask(task);

    Task callbackTask;
    Func<bool, Task>? finalUpdateAction = null;

    // Waits for the initial sends, then runs every provider callback and stores the combined finalizer.
    async Task CallbackTask(string? errorMessage, string dreamMakerOutput)
    {
        await task;
        var callbackResults = await ValueTaskExtensions.WhenAll(
            callbacks.Select(
                x => x(
                    errorMessage,
                    dreamMakerOutput)),
            callbacks.Count);

        finalUpdateAction = active => ValueTaskExtensions.WhenAll(callbackResults.Select(finalizerCallback => finalizerCallback(active))).AsTask();
    }

    // Runs the combined finalizer once CallbackTask has succeeded.
    async Task CompletionTask(bool active)
    {
        try
        {
            await callbackTask;
        }
        catch
        {
            // Handled in AddMessageTask
            return;
        }

        AddMessageTask(finalUpdateAction!(active));
    }

    return (errorMessage, dreamMakerOutput) =>
    {
        callbackTask = CallbackTask(errorMessage, dreamMakerOutput);
        AddMessageTask(callbackTask);
        return active => AddMessageTask(CompletionTask(active));
    };
}
469
/// <summary>
/// Registers the built-in commands, applies the settings of every initially active chat bot,
/// then starts the initial-connection aggregate and the message monitoring loop.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for applying the chat bot settings.</param>
/// <returns>A <see cref="Task"/> representing the running operation.</returns>
public async Task StartAsync(CancellationToken cancellationToken)
{
    foreach (var command in commandFactory.GenerateCommands())
    {
        var commandKey = command.Name.ToUpperInvariant();
        builtinCommands.Add(commandKey, command);
    }

    // Work from a copy: ChangeSettings removes and re-adds entries of activeChatBots.
    var botsToStart = activeChatBots.ToList();
    var settingsChanges = botsToStart.Select(bot => ChangeSettings(bot, cancellationToken));
    await ValueTaskExtensions.WhenAll(settingsChanges);

    initialProviderConnectionsTask = InitialConnection();

    // The monitor loop is tied to the handler token, not to the startup token.
    chatHandler = MonitorMessages(handlerCts.Token);
}
480
/// <summary>
/// Stops the message monitoring loop, deletes every provider connection and waits for queued messages to finish.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> passed to each connection deletion.</param>
/// <returns>A <see cref="Task"/> representing the running operation.</returns>
public async Task StopAsync(CancellationToken cancellationToken)
{
    handlerCts.Cancel();
    if (chatHandler != null)
        await chatHandler;

    // Snapshot the IDs under the lock. DeleteConnection removes entries from providers,
    // so the dictionary must not be lazily enumerated while deletions are in progress.
    List<long> connectionIds;
    lock (providers)
        connectionIds = providers.Keys.ToList();

    await Task.WhenAll(connectionIds.Select(connectionId => DeleteConnection(connectionId, cancellationToken)));
    await messageSendTask;
}
490
493 {
494 if (customCommandHandler == null)
495 throw new InvalidOperationException("RegisterCommandHandler() hasn't been called!");
496
497 IChatTrackingContext context = null!;
498 lock (mappedChannels)
499 context = new ChatTrackingContext(
500 customCommandHandler,
501 mappedChannels.Select(y => y.Value.Channel),
502 loggerFactory.CreateLogger<ChatTrackingContext>(),
503 () =>
504 {
505 lock (trackingContexts)
506 trackingContexts.Remove(context);
507 });
508
509 lock (trackingContexts)
510 trackingContexts.Add(context);
511
512 return context;
513 }
514
/// <summary>
/// Pushes the current set of mapped channels to every registered tracking context.
/// If the initial provider connections have not completed yet, waits for them first.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
public async ValueTask UpdateTrackingContexts(CancellationToken cancellationToken)
{
    // Every context must receive the update. The previous run-once guard ended up wrapping the
    // UpdateChannels call itself, so only the first context was ever updated.
    async Task UpdateTrackingContext(IChatTrackingContext channelSink, IEnumerable<ChannelRepresentation> channels)
        => await channelSink.UpdateChannels(channels, cancellationToken);

    var waitingForInitialConnection = !initialProviderConnectionsTask!.IsCompleted;
    if (waitingForInitialConnection)
    {
        logger.LogTrace("Waiting for initial chat bot connections before updating tracking contexts...");
        await initialProviderConnectionsTask.WaitAsync(cancellationToken);
    }

    List<Task> tasks;
    lock (mappedChannels)
    {
        // Materialize while holding the lock so no context enumerates mappedChannels after it is released.
        var channelSnapshot = mappedChannels.Select(y => y.Value.Channel).ToList();
        lock (trackingContexts)
            tasks = trackingContexts.Select(x => UpdateTrackingContext(x, channelSnapshot)).ToList();
    }

    if (waitingForInitialConnection)
    {
        if (tasks.Count > 0)
            logger.LogTrace("Updating chat tracking contexts...");
        else
            logger.LogTrace("No chat tracking contexts to update");
    }

    await Task.WhenAll(tasks);
}
546
/// <summary>
/// Sets the <see cref="ICustomCommandHandler"/> for the <see cref="ChatManager"/>. May only be called once.
/// </summary>
/// <param name="customCommandHandler">The <see cref="ICustomCommandHandler"/> to register.</param>
/// <exception cref="InvalidOperationException">If a handler has already been registered.</exception>
/// <exception cref="ArgumentNullException">If <paramref name="customCommandHandler"/> is <see langword="null"/>.</exception>
public void RegisterCommandHandler(ICustomCommandHandler customCommandHandler)
{
    // The double-registration check intentionally comes before argument validation.
    var alreadyRegistered = this.customCommandHandler != null;
    if (alreadyRegistered)
        throw new InvalidOperationException("RegisterCommandHandler() already called!");

    ArgumentNullException.ThrowIfNull(customCommandHandler);
    this.customCommandHandler = customCommandHandler;
}
554
/// <summary>
/// Removes the provider for <paramref name="connectionId"/> along with its channel mappings, then disconnects and disposes it.
/// </summary>
/// <param name="connectionId">The ID of the connection to delete.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="Task"/> representing the running operation.</returns>
public async Task DeleteConnection(long connectionId, CancellationToken cancellationToken)
{
    logger.LogTrace("DeleteConnection {connectionId}", connectionId);

    // If a change-channel semaphore exists for this connection, take it out of the map, hold it
    // for the duration of the delete, and dispose it afterwards (inner using releases, outer disposes).
    var hasSemaphore = changeChannelSemaphores.TryRemove(connectionId, out var semaphore);
    using (hasSemaphore ? semaphore : null)
    using (hasSemaphore ? await SemaphoreSlimContext.Lock(semaphore!, cancellationToken) : null)
    {
        var provider = await RemoveProviderChannels(connectionId, true, cancellationToken);
        if (provider == null)
        {
            logger.LogTrace("DeleteConnection: ID {connectionId} doesn't exist!", connectionId);
            return;
        }

        var disconnectStarted = DateTimeOffset.UtcNow;
        try
        {
            await provider.Disconnect(cancellationToken);
        }
        catch (Exception ex)
        {
            // A failed disconnect is logged; the provider is still disposed below.
            logger.LogError(ex, "Error disconnecting connection {connectionId}!", connectionId);
        }

        await provider.DisposeAsync();

        var elapsed = DateTimeOffset.UtcNow - disconnectStarted;
        if (elapsed.TotalSeconds > 3)
            logger.LogWarning("Disconnecting a {providerType} took {totalSeconds}s!", provider.GetType().Name, elapsed.TotalSeconds);
    }
}
589
591 public ValueTask HandleRestart(Version? updateVersion, bool handlerMayDelayShutdownWithExtremelyLongRunningTasks, CancellationToken cancellationToken)
592 {
593 var message = updateVersion == null
594 ? $"TGS: {(handlerMayDelayShutdownWithExtremelyLongRunningTasks ? "Graceful shutdown" : "Going down")}..."
595 : $"TGS: Updating to version {updateVersion}...";
596 List<ulong> systemChannels;
597 lock (mappedChannels) // so it doesn't change while we're using it
598 systemChannels = mappedChannels
599 .Where(x => x.Value.IsSystemChannel)
600 .Select(x => x.Key)
601 .ToList();
602
603 return SendMessage(
604 systemChannels,
605 null,
607 {
608 Text = message,
609 },
610 cancellationToken);
611 }
612
/// <summary>
/// Removes every channel mapping that belongs to the provider with the given <paramref name="connectionId"/>.
/// </summary>
/// <param name="connectionId">The ID of the provider whose mappings are removed.</param>
/// <param name="removeProvider">If <see langword="true"/>, the provider is also removed from the provider map and the tracking contexts are sent the remaining channels.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the tracking context updates.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> resulting in the <see cref="IProvider"/> that was found, or <see langword="null"/> if no provider has that ID.</returns>
async ValueTask<IProvider?> RemoveProviderChannels(long connectionId, bool removeProvider, CancellationToken cancellationToken)
{
    logger.LogTrace("RemoveProviderChannels {connectionId}...", connectionId);

    IProvider? foundProvider;
    lock (providers)
    {
        var exists = providers.TryGetValue(connectionId, out foundProvider);
        if (!exists)
        {
            logger.LogTrace("Aborted, no such provider!");
            return null;
        }

        if (removeProvider)
            providers.Remove(connectionId);
    }

    // Only overwritten when the provider is being removed.
    var contextsUpdate = ValueTask.CompletedTask;
    lock (mappedChannels)
    {
        // Collect the keys first so the dictionary isn't modified while it is being queried.
        var staleChannelIds = mappedChannels
            .Where(mapping => mapping.Value.ProviderId == connectionId)
            .Select(mapping => mapping.Key)
            .ToList();
        foreach (var staleChannelId in staleChannelIds)
            mappedChannels.Remove(staleChannelId);

        var remainingChannels = mappedChannels
            .Select(mapping => mapping.Value.Channel)
            .ToList();

        if (removeProvider)
            lock (trackingContexts)
                contextsUpdate = ValueTaskExtensions.WhenAll(
                    trackingContexts.Select(context => context.UpdateChannels(remainingChannels, cancellationToken)));
    }

    // Awaited outside of both locks.
    await contextsUpdate;

    return foundProvider;
}
655
/// <summary>
/// Re-applies the stored channel configuration of a given <paramref name="provider"/> via <c>ChangeChannels</c>.
/// </summary>
/// <param name="provider">The <see cref="IProvider"/> to remap. If it is no longer registered, nothing is done.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
async ValueTask RemapProvider(IProvider provider, CancellationToken cancellationToken)
{
    logger.LogTrace("Remapping channels for provider reconnection...");

    // The provider can be removed between the message being queued and this call;
    // treat that as "nothing to remap" instead of throwing from First().
    long? providerIdNullable;
    lock (providers)
        providerIdNullable = providers
            .Where(x => x.Value == provider)
            .Select(x => (long?)x.Key)
            .FirstOrDefault();

    if (!providerIdNullable.HasValue)
    {
        logger.LogDebug("Unable to remap channels, provider is no longer registered!");
        return;
    }

    var providerId = providerIdNullable.Value;

    IEnumerable<Models.ChatChannel>? channelsToMap;
    lock (activeChatBots)
        channelsToMap = activeChatBots.FirstOrDefault(x => x.Id == providerId)?.Channels;

    // Nothing configured for this bot means there is nothing to map.
    if (channelsToMap?.Any() ?? false)
        await ChangeChannels(providerId, channelsToMap, cancellationToken);
}
676
685#pragma warning disable CA1502
686 async ValueTask ProcessMessage(IProvider provider, Message? message, bool recursed, CancellationToken cancellationToken)
687#pragma warning restore CA1502
688 {
689 if (!provider.Connected)
690 {
691 logger.LogTrace("Abort message processing because provider is disconnected!");
692 return;
693 }
694
695 // provider reconnected, remap channels.
696 if (message == null)
697 {
698 await RemapProvider(provider, cancellationToken);
699 return;
700 }
701
702 // map the channel if it's private and we haven't seen it
703 var providerChannelId = message.User.Channel.RealId;
704 KeyValuePair<ulong, ChannelMapping>? mappedChannel;
705 long providerId;
706 bool hasChannelZero;
707 lock (providers)
708 {
709 // important, otherwise we could end up processing during shutdown
710 cancellationToken.ThrowIfCancellationRequested();
711
712 var providerIdNullable = providers
713 .Where(x => x.Value == provider)
714 .Select(x => (long?)x.Key)
715 .FirstOrDefault();
716
717 if (!providerIdNullable.HasValue)
718 {
719 // possible to have a message queued and then the provider immediately disconnects
720 logger.LogDebug("Unable to process command \"{command}\" due to provider disconnecting", message.Content);
721 return;
722 }
723
724 providerId = providerIdNullable.Value;
725 mappedChannel = mappedChannels
726 .Where(x => x.Value.ProviderId == providerId && x.Value.ProviderChannelId == providerChannelId)
727 .Select(x => (KeyValuePair<ulong, ChannelMapping>?)x)
728 .FirstOrDefault();
729 hasChannelZero = mappedChannels
730 .Where(x => x.Value.ProviderId == providerId && x.Value.ProviderChannelId == 0)
731 .Any();
732 }
733
734 if (!recursed && !mappedChannel.HasValue && !message.User.Channel.IsPrivateChannel && hasChannelZero)
735 {
736 logger.LogInformation("Receieved message from unmapped channel whose provider contains ID 0. Remapping...");
737 await RemapProvider(provider, cancellationToken);
738 logger.LogTrace("Resume processing original message...");
739 await ProcessMessage(provider, message, true, cancellationToken);
740 return;
741 }
742
743 ValueTask TextReply(string reply) => SendMessage(
744 new List<ulong>
745 {
746 message.User.Channel.RealId,
747 },
748 message,
750 {
751 Text = reply,
752 },
753 cancellationToken);
754
755 if (message.User.Channel.IsPrivateChannel)
756 lock (mappedChannels)
757 if (!mappedChannel.HasValue)
758 {
759 ulong newId;
760 lock (synchronizationLock)
761 newId = channelIdCounter++;
762 logger.LogTrace(
763 "Mapping private channel {connectionName}:{channelFriendlyName} as {newId}",
765 message.User.FriendlyName,
766 newId);
767 mappedChannels.Add(newId, new ChannelMapping(message.User.Channel)
768 {
769 ProviderChannelId = message.User.Channel.RealId,
770 ProviderId = providerId,
771 });
772
773 logger.LogTrace(
774 "Mapping DM {connectionName}:{userId} ({userFriendlyName}) as {newId}",
776 message.User.RealId,
777 message.User.FriendlyName,
778 newId);
779 message.User.Channel.RealId = newId;
780 }
781 else
782 message.User.Channel.RealId = mappedChannel.Value.Key;
783 else
784 {
785 if (!mappedChannel.HasValue)
786 {
787 logger.LogError(
788 "Error mapping message: Provider ID: {providerId}, Channel Real ID: {realId}",
789 providerId,
790 message.User.Channel.RealId);
791 logger.LogTrace("message: {messageJson}", JsonConvert.SerializeObject(message));
792 lock (mappedChannels)
793 logger.LogTrace("mappedChannels: {mappedChannelsJson}", JsonConvert.SerializeObject(mappedChannels));
794 await TextReply("TGS: Processing error, check logs!");
795 return;
796 }
797
798 var mappingChannelRepresentation = mappedChannel.Value.Value.Channel;
799
800 message.User.Channel.RealId = mappingChannelRepresentation.RealId;
801 message.User.Channel.Tag = mappingChannelRepresentation.Tag;
802 message.User.Channel.IsAdminChannel = mappingChannelRepresentation.IsAdminChannel;
803 }
804
805 var trimmedMessage = message.Content.Trim();
806 if (trimmedMessage.Length == 0)
807 return;
808
809 var splits = new List<string>(trimmedMessage.Split(' ', StringSplitOptions.RemoveEmptyEntries));
810 var address = splits[0];
811 if (address.Length > 1 && (address.Last() == ':' || address.Last() == ','))
812 address = address[0..^1];
813
814 var addressed =
815 address.Equals(CommonMention, StringComparison.OrdinalIgnoreCase)
816 || address.Equals(provider.BotMention, StringComparison.OrdinalIgnoreCase);
817
818 // no mention
819 if (!addressed && !message.User.Channel.IsPrivateChannel)
820 return;
821
822 logger.LogTrace(
823 "Start processing command: {message}. User (True provider Id): {profiderId}",
824 message.Content,
825 JsonConvert.SerializeObject(message.User));
826 try
827 {
828 if (addressed)
829 splits.RemoveAt(0);
830
831 if (splits.Count == 0)
832 {
833 // just a mention
834 await TextReply("Hi!");
835 return;
836 }
837
838 var command = splits[0];
839 splits.RemoveAt(0);
840 var arguments = String.Join(" ", splits);
841
842 Tuple<ICommand, IChatTrackingContext?>? GetCommand(string command)
843 {
844 if (!builtinCommands.TryGetValue(command, out var handler))
845 return trackingContexts
846 .Where(trackingContext => trackingContext.Active)
847 .SelectMany(trackingContext => trackingContext.CustomCommands.Select(customCommand => Tuple.Create<ICommand, IChatTrackingContext?>(customCommand, trackingContext)))
848 .Where(tuple => tuple.Item1.Name.Equals(command, StringComparison.OrdinalIgnoreCase))
849 .FirstOrDefault();
850
851 return Tuple.Create<ICommand, IChatTrackingContext?>(handler, null);
852 }
853
854 const string UnknownCommandMessage = "TGS: Unknown command! Type '?' or 'help' for available commands.";
855
856 if (command.Equals("help", StringComparison.OrdinalIgnoreCase) || command == "?")
857 {
858 string helpText;
859 if (splits.Count == 0)
860 {
861 var allCommands = builtinCommands.Select(x => x.Value).ToList();
862 allCommands.AddRange(
863 trackingContexts
864 .SelectMany(
865 x => x.CustomCommands));
866 helpText = String.Format(CultureInfo.InvariantCulture, "Available commands (Type '?' or 'help' and then a command name for more details): {0}", String.Join(", ", allCommands.Select(x => x.Name)));
867 }
868 else
869 {
870 var helpTuple = GetCommand(splits[0]);
871 if (helpTuple != default)
872 {
873 var (helpHandler, _) = helpTuple;
874 helpText = String.Format(CultureInfo.InvariantCulture, "{0}: {1}{2}", helpHandler.Name, helpHandler.HelpText, helpHandler.AdminOnly ? " - May only be used in admin channels" : String.Empty);
875 }
876 else
877 helpText = UnknownCommandMessage;
878 }
879
880 await TextReply(helpText);
881 return;
882 }
883
884 var tuple = GetCommand(command);
885
886 if (tuple == default)
887 {
888 await TextReply(UnknownCommandMessage);
889 return;
890 }
891
892 var (commandHandler, trackingContext) = tuple;
893
894 if (trackingContext?.Active == false)
895 {
896 await TextReply("TGS: The server is rebooting, please try again later");
897 return;
898 }
899
900 if (commandHandler.AdminOnly && !message.User.Channel.IsAdminChannel)
901 {
902 await TextReply("TGS: Use this command in an admin channel!");
903 return;
904 }
905
906 var result = await commandHandler.Invoke(arguments, message.User, cancellationToken);
907 if (result != null)
908 await SendMessage(new List<ulong> { message.User.Channel.RealId }, message, result, cancellationToken);
909 }
910 catch (OperationCanceledException ex)
911 {
912 logger.LogTrace(ex, "Command processing canceled!");
913 }
914 catch (Exception e)
915 {
916 // error bc custom commands should reply about why it failed
917 logger.LogError(e, "Error processing chat command");
918 await TextReply("TGS: Internal error processing command! Check server logs!");
919 }
920 finally
921 {
922 logger.LogTrace("Done processing command.");
923 }
924 }
925
/// <summary>
/// Loop that waits for the next message of every registered provider and processes completed ones in arrival order.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> that ends the loop.</param>
/// <returns>A <see cref="Task"/> representing the running operation.</returns>
async Task MonitorMessages(CancellationToken cancellationToken)
{
    logger.LogTrace("Starting processing loop...");

    // One outstanding NextMessage call per provider.
    var pendingReads = new Dictionary<IProvider, Task<Message?>>();

    // Tail of the processing chain; each new message awaits the one queued before it.
    var processingChain = ValueTask.CompletedTask;
    try
    {
        Task? connectionsChanged = null;
        while (!cancellationToken.IsCancellationRequested)
        {
            // Fetch the change signal on the first pass and again each time it has fired.
            if (connectionsChanged == null || connectionsChanged.IsCompleted)
                lock (synchronizationLock)
                    connectionsChanged = connectionsUpdated.Task;

            // prune disconnected providers
            var disposedProviders = pendingReads.Keys
                .Where(knownProvider => knownProvider.Disposed)
                .ToList();
            foreach (var disposedProvider in disposedProviders)
                pendingReads.Remove(disposedProvider);

            // add new ones
            lock (providers)
                foreach (var registeredProvider in providers.Values)
                    if (!pendingReads.ContainsKey(registeredProvider))
                        pendingReads.Add(registeredProvider, registeredProvider.NextMessage(cancellationToken));

            if (pendingReads.Count == 0)
            {
                logger.LogTrace("No providers active, pausing messsage monitoring...");
                await connectionsChanged.WaitAsync(cancellationToken);
                logger.LogTrace("Resuming message monitoring...");
                continue;
            }

            // wait for a message, or for the set of connections to change
            var anyMessageArrived = Task.WhenAny(pendingReads.Values);
            await Task.WhenAny(connectionsChanged, anyMessageArrived);

            // process completed ones
            var completedReads = pendingReads
                .Where(entry => entry.Value.IsCompleted)
                .ToList();
            foreach (var (provider, readTask) in completedReads)
            {
                pendingReads.Remove(provider);

                if (provider.Disposed) // valid to receive one, but don't process it
                    continue;

                var message = await readTask;
                var messageNumber = Interlocked.Increment(ref messagesProcessed);

                async ValueTask WrapProcessMessage()
                {
                    // Captured before processingChain is reassigned below, so this is the previous link.
                    var predecessor = processingChain;
                    using (LogContext.PushProperty(SerilogContextHelper.ChatMessageIterationContextProperty, messageNumber))
                    {
                        try
                        {
                            await ProcessMessage(provider, message, false, cancellationToken);
                        }
                        catch (Exception ex)
                        {
                            logger.LogError(ex, "Error processing message {messageNumber}!", messageNumber);
                        }
                    }

                    await predecessor;
                }

                processingChain = WrapProcessMessage();
            }
        }
    }
    catch (OperationCanceledException ex)
    {
        logger.LogTrace(ex, "Message processing loop cancelled!");
    }
    catch (Exception e)
    {
        logger.LogError(e, "Message loop crashed!");
    }
    finally
    {
        // Don't leave until everything already handed off has finished.
        await processingChain;
    }

    logger.LogTrace("Leaving message processing loop");
}
1015
1024 ValueTask SendMessage(IEnumerable<ulong> channelIds, Message? replyTo, MessageContent message, CancellationToken cancellationToken)
1025 {
1026 var channelIdsList = channelIds.ToList();
1027
1028 logger.LogTrace(
1029 "Chat send \"{message}\"{embed} to channels: [{channelIdsCommaSeperated}]",
1030 message.Text,
1031 message.Embed != null ? " (with embed)" : String.Empty,
1032 String.Join(", ", channelIdsList));
1033
1034 if (channelIdsList.Count == 0)
1035 return ValueTask.CompletedTask;
1036
1038 channelIdsList.Select(x =>
1039 {
1040 ChannelMapping? channelMapping;
1041 lock (mappedChannels)
1042 if (!mappedChannels.TryGetValue(x, out channelMapping))
1043 return ValueTask.CompletedTask;
1044 IProvider? provider;
1045 lock (providers)
1046 if (!providers.TryGetValue(channelMapping.ProviderId, out provider))
1047 return ValueTask.CompletedTask;
1048 return provider.SendMessage(replyTo, message, channelMapping.ProviderChannelId, cancellationToken);
1049 }));
1050 }
1051
1057 {
1058 await Task.WhenAll(providers.Select(x => x.Value.InitialConnectionJob));
1059 logger.LogTrace("Initial provider connection task completed");
1060 }
1061
/// <summary>
/// Appends <paramref name="task"/> to the chain of outgoing message work tracked by <c>messageSendTask</c>.
/// Cancellation and failures of <paramref name="task"/> are logged rather than propagated.
/// </summary>
/// <param name="task">The <see cref="Task"/> to track.</param>
void AddMessageTask(Task task)
{
    // Completes once both the previous tail and the new task are done.
    async Task ChainAfter(Task previousTail)
    {
        await previousTail;
        try
        {
            await task;
        }
        catch (OperationCanceledException cancelled)
        {
            logger.LogDebug(cancelled, "Async chat message cancelled!");
        }
        catch (Exception failure)
        {
            logger.LogError(failure, "Error in asynchronous chat message!");
        }
    }

    // handlerCts doubles as the lock object guarding messageSendTask.
    lock (handlerCts)
    {
        var currentTail = messageSendTask;
        messageSendTask = ChainAfter(currentTail);
    }
}
1088
/// <summary>
/// Adds a given <paramref name="message"/> to the send queue.
/// </summary>
/// <param name="message">The <see cref="MessageContent"/> to send.</param>
/// <param name="channelIdsFactory">Produces the target channel IDs. It is invoked right before sending, after any wait for connections.</param>
/// <param name="waitForConnections">If <see langword="true"/>, the send waits for the initial provider connections to complete.</param>
void QueueMessageInternal(MessageContent message, Func<IEnumerable<ulong>> channelIdsFactory, bool waitForConnections)
{
    async Task SendMessageTask()
    {
        // Read the token up front, before the first await.
        var handlerToken = handlerCts.Token;

        if (waitForConnections)
            await initialProviderConnectionsTask!.WaitAsync(handlerToken);

        var targetChannelIds = channelIdsFactory();
        await SendMessage(targetChannelIds, null, message, handlerToken);
    }

    var queuedSend = SendMessageTask();
    AddMessageTask(queuedSend);
}
1112 }
1113}
Information about an engine installation.
Extension methods for the ValueTask and ValueTask<TResult> classes.
static async ValueTask WhenAll(IEnumerable< ValueTask > tasks)
Fully await a given list of tasks.
Represents a mapping of a ChannelRepresentation.RealId.
string ConnectionName
The name of the connection the ChannelRepresentation belongs to.
bool IsAdminChannel
If this is considered a channel for admin commands.
const string CommonMention
The common bot mention.
long messagesProcessed
The number of Messages processed.
readonly IProviderFactory providerFactory
The IProviderFactory for the ChatManager.
ValueTask SendMessage(IEnumerable< ulong > channelIds, Message? replyTo, MessageContent message, CancellationToken cancellationToken)
Asynchronously send a given message to a set of channelIds .
readonly object synchronizationLock
Used for various lock statements throughout this class.
ChatManager(IProviderFactory providerFactory, ICommandFactory commandFactory, IServerControl serverControl, ILoggerFactory loggerFactory, ILogger< ChatManager > logger, IEnumerable< Models.ChatBot > initialChatBots)
Initializes a new instance of the ChatManager class.
Task? initialProviderConnectionsTask
A Task that represents the IProviders initial connection.
ValueTask HandleRestart(Version? updateVersion, bool handlerMayDelayShutdownWithExtremelyLongRunningTasks, CancellationToken cancellationToken)
Handle a restart of the server. A ValueTask representing the running operation.
void QueueMessageInternal(MessageContent message, Func< IEnumerable< ulong > > channelIdsFactory, bool waitForConnections)
Adds a given message to the send queue.
readonly List< Models.ChatBot > activeChatBots
The active Models.ChatBot for the ChatManager.
async Task InitialConnection()
Aggregate all IProvider.InitialConnectionJobs into one Task.
async ValueTask RemapProvider(IProvider provider, CancellationToken cancellationToken)
Remap the channels for a given provider.
async ValueTask ProcessMessage(IProvider provider, Message? message, bool recursed, CancellationToken cancellationToken)
Processes a message.
readonly Dictionary< long, IProvider > providers
Map of IProviders in use, keyed by ChatBotSettings EntityId.Id.
async Task MonitorMessages(CancellationToken cancellationToken)
Monitors active providers for new Messages.
ICustomCommandHandler? customCommandHandler
The ICustomCommandHandler for the ChangeChannels(long, IEnumerable<Models.ChatChannel>,...
async ValueTask ChangeChannels(long connectionId, IEnumerable< Models.ChatChannel > newChannels, CancellationToken cancellationToken)
readonly Dictionary< string, ICommand > builtinCommands
Unchanging ICommands in the ChatManager mapped by ICommand.Name.
void QueueMessage(MessageContent message, IEnumerable< ulong > channelIds)
Queue a chat message to a given set of channelIds.
readonly ILoggerFactory loggerFactory
The ILoggerFactory for the ChatManager.
Func< string?, string, Action< bool > > QueueDeploymentMessage(Models.RevisionInformation revisionInformation, EngineVersion engineVersion, DateTimeOffset? estimatedCompletionTime, string? gitHubOwner, string? gitHubRepo, bool localCommitPushed)
Send the message for a deployment to configured deployment channels. A Func<T1, T2,...
Task messageSendTask
A Task that represents all sent messages.
Task? chatHandler
The Task that monitors incoming chat messages.
async ValueTask ChangeSettings(Models.ChatBot newSettings, CancellationToken cancellationToken)
Change chat settings. If the Api.Models.EntityId.Id is not currently in use, a new connection will be...
ulong channelIdCounter
Used for remapping ChannelRepresentation.RealIds.
readonly ICommandFactory commandFactory
The ICommandFactory for the ChatManager.
IChatTrackingContext CreateTrackingContext()
Start tracking Commands.CustomCommands and ChannelRepresentations. A new IChatTrackingContext.
void AddMessageTask(Task task)
Adds a given task to messageSendTask.
readonly ILogger< ChatManager > logger
The ILogger for the ChatManager.
readonly ConcurrentDictionary< long, SemaphoreSlim > changeChannelSemaphores
Map of SemaphoreSlims used to guard concurrent access to ChangeChannels(long, IEnumerable<Models....
readonly IRestartRegistration restartRegistration
The IRestartRegistration for the ChatManager.
void QueueWatchdogMessage(string message)
Queue a chat message to configured watchdog channels.
readonly CancellationTokenSource handlerCts
The CancellationTokenSource for chatHandler.
async Task StartAsync(CancellationToken cancellationToken)
void RegisterCommandHandler(ICustomCommandHandler customCommandHandler)
Registers a customCommandHandler to use.
TaskCompletionSource connectionsUpdated
The TaskCompletionSource that completes when ChatBotSettings change.
async ValueTask UpdateTrackingContexts(CancellationToken cancellationToken)
Force an update with the active channels on all active IChatTrackingContexts. A ValueTask representing...
async Task DeleteConnection(long connectionId, CancellationToken cancellationToken)
Disconnects and deletes a given connection. A Task representing the running operation.
async ValueTask< IProvider?> RemoveProviderChannels(long connectionId, bool removeProvider, CancellationToken cancellationToken)
Remove an IProvider from mappedChannels optionally removing the provider itself from providers and upd...
async Task StopAsync(CancellationToken cancellationToken)
readonly List< IChatTrackingContext > trackingContexts
The active IChatTrackingContexts for the ChatManager.
readonly Dictionary< ulong, ChannelMapping > mappedChannels
Map of ChannelRepresentation.RealIds to ChannelMappings.
string FriendlyName
The friendly name of the user.
Definition ChatUser.cs:31
ChannelRepresentation Channel
The ChannelRepresentation the user spoke from.
Definition ChatUser.cs:41
Represents a message received by an IProvider.
Definition Message.cs:9
ChatUser User
The ChatUser who sent the Message.
Definition Message.cs:18
Represents a message to send to a chat provider.
static async ValueTask< SemaphoreSlimContext > Lock(SemaphoreSlim semaphore, CancellationToken cancellationToken, ILogger? logger=null)
Asynchronously locks a semaphore.
Helpers for manipulating the Serilog.Context.LogContext.
const string ChatMessageIterationContextProperty
The Serilog.Context.LogContext property name for the ID of the chat message currently being processed...
Represents a command that can be invoked by talking to chat bots.
Definition ICommand.cs:12
ValueTask UpdateChannels(IEnumerable< ChannelRepresentation > newChannels, CancellationToken cancellationToken)
Called when newChannels are set.
For managing connected chat services.
Represents a tracking of dynamic chat json files.
Handles Commands.ICommands that map to those defined in a IChatTrackingContext.
IProvider CreateProvider(ChatBot settings)
Create an IProvider.
string BotMention
The string that indicates the IProvider was mentioned.
Definition IProvider.cs:30
bool Connected
If the IProvider is currently connected.
Definition IProvider.cs:20
Represents the lifetime of a IRestartHandler registration.
Represents a service that may take an updated Host assembly and run it, stopping the current assembly...
IRestartRegistration RegisterForRestart(IRestartHandler handler)
Register a given handler to run before stopping the server for a restart.