tgstation-server 6.18.0
The /tg/station 13 server suite
Loading...
Searching...
No Matches
ChatManager.cs
Go to the documentation of this file.
1using System;
2using System.Collections.Concurrent;
3using System.Collections.Generic;
4using System.Globalization;
5using System.Linq;
6using System.Threading;
7using System.Threading.Tasks;
8
9using Microsoft.Extensions.Logging;
10
11using Newtonsoft.Json;
12
13using Serilog.Context;
14
23
25{
27#pragma warning disable CA1506 // TODO: Decomplexify
29 {
// The common bot mention; a message whose first word equals this (case-insensitively) is treated as addressed to the bot.
33 public const string CommonMention = "!tgs";
34
39
44
49
// Factory used to create loggers for the tracking contexts this class creates.
53 readonly ILoggerFactory loggerFactory;
54
// Logger for this class.
58 readonly ILogger<ChatManager> logger;
59
// Built-in commands keyed by name; the dictionary is created with a case-insensitive comparer.
63 readonly Dictionary<string, ICommand> builtinCommands;
64
// Providers in use, keyed by chat bot ID. Most accesses take lock (providers).
68 readonly Dictionary<long, IProvider> providers;
69
// One semaphore per connection ID, used to serialize channel changes and deletion for that connection.
73 readonly ConcurrentDictionary<long, SemaphoreSlim> changeChannelSemaphores;
74
// Channel mappings keyed by the ID this class assigns (see channelIdCounter). Most accesses take lock (mappedChannels).
78 readonly Dictionary<ulong, ChannelMapping> mappedChannels;
79
// Tracking contexts handed out by this class; each removes itself through its disposal callback.
83 readonly List<IChatTrackingContext> trackingContexts;
84
// Local copies of the chat bot settings currently applied.
88 readonly List<Models.ChatBot> activeChatBots;
89
// Cancelled in StopAsync; its token is given to the message monitor and to queued sends.
93 readonly CancellationTokenSource handlerCts;
94
// Used for various lock statements throughout this class.
98 readonly object synchronizationLock;
99
104
109
114
119
// Completed and replaced whenever settings change so the message monitor re-reads the provider list.
123 TaskCompletionSource connectionsUpdated;
124
129
134
147 IServerControl serverControl,
148 ILoggerFactory loggerFactory,
149 ILogger<ChatManager> logger,
150 IEnumerable<Models.ChatBot> initialChatBots)
151 {
// Every dependency is required; a null argument raises ArgumentNullException.
152 this.providerFactory = providerFactory ?? throw new ArgumentNullException(nameof(providerFactory));
153 this.commandFactory = commandFactory ?? throw new ArgumentNullException(nameof(commandFactory));
154 ArgumentNullException.ThrowIfNull(serverControl);
155 this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
156 this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
// The initial bots are copied into a private list rather than keeping the caller's collection.
157 activeChatBots = initialChatBots?.ToList() ?? throw new ArgumentNullException(nameof(initialChatBots));
158
// Register this instance with the server control; the registration is disposed in DisposeAsync.
159 restartRegistration = serverControl.RegisterForRestart(this);
160
161 synchronizationLock = new object();
162
// Command names are matched case-insensitively.
163 builtinCommands = new Dictionary<string, ICommand>(StringComparer.OrdinalIgnoreCase);
164 providers = new Dictionary<long, IProvider>();
165 changeChannelSemaphores = new ConcurrentDictionary<long, SemaphoreSlim>();
166 mappedChannels = new Dictionary<ulong, ChannelMapping>();
167 trackingContexts = new List<IChatTrackingContext>();
168 handlerCts = new CancellationTokenSource();
169 connectionsUpdated = new TaskCompletionSource();
170
// Nothing has been queued yet, so the send chain starts out already completed.
171 messageSendTask = Task.CompletedTask;
173 }
174
/// <summary>
/// Disposes the restart registration, the handler cancellation source, every provider and every
/// channel-change semaphore, then waits for the queued message chain to finish.
/// </summary>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
public async ValueTask DisposeAsync()
{
    logger.LogTrace("Disposing...");

    restartRegistration.Dispose();
    handlerCts.Dispose();

    // Providers are disposed one at a time, in dictionary order.
    foreach (var (_, chatProvider) in providers)
    {
        await chatProvider.DisposeAsync();
    }

    foreach (var (_, channelSemaphore) in changeChannelSemaphores)
    {
        channelSemaphore.Dispose();
    }

    // Wait for whatever is still in the send chain before returning.
    await messageSendTask;
}
189
/// <summary>
/// Replaces the channel mappings of one chat connection.
/// </summary>
/// <param name="connectionId">ID of the connection whose channels are being changed.</param>
/// <param name="newChannels">The channels to map.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="newChannels"/> is <see langword="null"/>.</exception>
public async ValueTask ChangeChannels(long connectionId, IEnumerable<Models.ChatChannel> newChannels, CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(newChannels);

    logger.LogTrace("ChangeChannels {connectionId}...", connectionId);

    // One semaphore per connection keeps channel changes for the same connection from interleaving.
    var semaphore = changeChannelSemaphores.GetOrAdd(connectionId, _ =>
    {
        logger.LogTrace("Creating ChangeChannels semaphore for connection ID {connectionId}...", connectionId);
        return new SemaphoreSlim(1);
    });
    using (await SemaphoreSlimContext.Lock(semaphore, cancellationToken))
    {
        // Drop the connection's current mappings but keep the provider itself registered.
        var provider = await RemoveProviderChannels(connectionId, false, cancellationToken);
        if (provider == null)
            return;

        if (!provider.Connected)
        {
            logger.LogDebug("Cannot map channels, provider {providerId} disconnected!", connectionId);
            return;
        }

        var results = await provider.MapChannels(newChannels, cancellationToken);
        try
        {
            // Remember the requested channels on our copy of the bot settings.
            lock (activeChatBots)
            {
                var botToUpdate = activeChatBots.FirstOrDefault(bot => bot.Id == connectionId);
                if (botToUpdate != null)
                    botToUpdate.Channels = newChannels
                        .Select(apiModel => new Models.ChatChannel
                        {
                            DiscordChannelId = apiModel.DiscordChannelId,
                            IrcChannel = apiModel.IrcChannel,
                            IsAdminChannel = apiModel.IsAdminChannel,
                            IsUpdatesChannel = apiModel.IsUpdatesChannel,
                            IsSystemChannel = apiModel.IsSystemChannel,
                            IsWatchdogChannel = apiModel.IsWatchdogChannel,
                            Tag = apiModel.Tag,
                        })
                        .ToList();
            }

            // Materialized so the number of mappings is known before IDs are reserved.
            var newMappings = results
                .SelectMany(
                    kvp => kvp.Value.Select(
                        channelRepresentation => new ChannelMapping(channelRepresentation)
                        {
                            IsWatchdogChannel = kvp.Key.IsWatchdogChannel == true,
                            IsUpdatesChannel = kvp.Key.IsUpdatesChannel == true,
                            IsAdminChannel = kvp.Key.IsAdminChannel == true,
                            IsSystemChannel = kvp.Key.IsSystemChannel == true,
                            ProviderChannelId = channelRepresentation.RealId,
                            ProviderId = connectionId,
                        }))
                .ToList();

            // Reserve one ID per mapping. A single requested channel can produce several mappings,
            // so reserving by the number of requested channels could hand the same ID out twice.
            // The counter is shared with private-channel mapping, hence the lock.
            ulong baseId;
            lock (synchronizationLock)
            {
                baseId = channelIdCounter;
                channelIdCounter += (ulong)newMappings.Count;
            }

            lock (mappedChannels)
            {
                lock (providers)
                    if (!providers.TryGetValue(connectionId, out var verify) || verify != provider) // aborted again
                        return;
                foreach (var newMapping in newMappings)
                {
                    var newId = baseId++;
                    logger.LogTrace("Mapping channel {connectionName}:{channelFriendlyName} as {newId}", newMapping.Channel.ConnectionName, newMapping.Channel.FriendlyName, newId);
                    mappedChannels.Add(newId, newMapping);
                    newMapping.Channel.RealId = newId;
                }
            }

            // we only want to update contexts if everything at startup has connected once already
            // otherwise we could send an incomplete channel set to the DMAPI, which will then spout all its queued messages into it instead of all relevant chatbots
            // The watchdog can call this if it needs to after starting up
            if (initialProviderConnectionsTask!.IsCompleted)
                await UpdateTrackingContexts(cancellationToken);
        }
        finally
        {
            // Runs on every exit path once MapChannels has returned, including the early return above.
            provider.InitialMappingComplete();
        }
    }
}
279
/// <summary>
/// Applies new settings for a chat bot by tearing down any existing connection with the same ID and,
/// if the bot is enabled, creating a fresh provider for it.
/// </summary>
/// <param name="newSettings">The new <see cref="Models.ChatBot"/> settings.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="newSettings"/> is <see langword="null"/>.</exception>
public async ValueTask ChangeSettings(Models.ChatBot newSettings, CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(newSettings);

    logger.LogTrace("ChangeSettings...");

    Task disconnectTask;
    IProvider? provider = null;
    var newSettingsId = Models.ModelExtensions.Require(newSettings, x => x.Id);
    var newSettingsEnabled = Models.ModelExtensions.Require(newSettings, x => x.Enabled);
    lock (providers)
    {
        // raw settings changes forces a rebuild of the provider
        if (providers.ContainsKey(newSettingsId))
            disconnectTask = DeleteConnection(newSettingsId, cancellationToken);
        else
            disconnectTask = Task.CompletedTask;
        if (newSettingsEnabled)
        {
            provider = providerFactory.CreateProvider(newSettings);
            providers.Add(newSettingsId, provider);
        }
    }

    // Mappings that belonged to the previous provider are no longer valid.
    lock (mappedChannels)
        foreach (var oldMappedChannelId in mappedChannels.Where(x => x.Value.ProviderId == newSettingsId).Select(x => x.Key).ToList())
            mappedChannels.Remove(oldMappedChannelId);

    await disconnectTask;

    // The message monitor reads connectionsUpdated under this same lock, so the swap is guarded too.
    lock (synchronizationLock)
    {
        // same thread shenanigans: install the replacement before completing the old source,
        // so anything that runs as a result of SetResult() already sees the new one
        var oldOne = connectionsUpdated;
        connectionsUpdated = new TaskCompletionSource();
        oldOne.SetResult();
    }

    var reconnectionUpdateTask = provider?.SetReconnectInterval(
        Models.ModelExtensions.Require(newSettings, x => x.ReconnectionInterval),
        newSettingsEnabled)
        ?? Task.CompletedTask;

    // Replace our stored copy of the settings with a copy of the new ones.
    lock (activeChatBots)
    {
        var originalChatBot = activeChatBots.FirstOrDefault(bot => bot.Id == newSettings.Id);
        if (originalChatBot != null)
            activeChatBots.Remove(originalChatBot);

        activeChatBots.Add(new Models.ChatBot(newSettings.Channels)
        {
            Id = newSettings.Id,
            ConnectionString = newSettings.ConnectionString,
            Enabled = newSettings.Enabled,
            Name = newSettings.Name,
            ReconnectionInterval = newSettings.ReconnectionInterval,
            Provider = newSettings.Provider,
        });
    }

    await reconnectionUpdateTask;
}
342
/// <summary>
/// Queues a chat message for a given set of channel IDs without waiting for initial connections.
/// </summary>
/// <param name="message">The <see cref="MessageContent"/> to send.</param>
/// <param name="channelIds">The IDs of the channels to send to.</param>
/// <exception cref="ArgumentNullException">Either argument is <see langword="null"/>.</exception>
public void QueueMessage(MessageContent message, IEnumerable<ulong> channelIds)
{
    ArgumentNullException.ThrowIfNull(message);
    ArgumentNullException.ThrowIfNull(channelIds);

    IEnumerable<ulong> TargetChannelIds() => channelIds;

    QueueMessageInternal(message, TargetChannelIds, waitForConnections: false);
}
351
/// <summary>
/// Queues a message for every mapped watchdog channel, prefixed with "WD".
/// </summary>
/// <param name="message">The message text.</param>
public void QueueWatchdogMessage(string message)
{
    QueueMessageGeneric(
        channelMapping => channelMapping.IsWatchdogChannel,
        message,
        "WD");
}
355
/// <summary>
/// Queues an unprefixed message for every mapped updates channel.
/// </summary>
/// <param name="message">The message text.</param>
public void QueueRawDeploymentMessage(string message)
{
    QueueMessageGeneric(
        channelMapping => channelMapping.IsUpdatesChannel,
        message,
        null);
}
359
/// <summary>
/// Sends a deployment update message to every mapped updates channel and returns a callback chain
/// for reporting the outcome.
/// </summary>
/// <param name="revisionInformation">The revision being deployed.</param>
/// <param name="previousRevisionInformation">The previously deployed revision, if any.</param>
/// <param name="engineVersion">The engine version used for the deployment.</param>
/// <param name="estimatedCompletionTime">Optional estimated completion time.</param>
/// <param name="gitHubOwner">Optional GitHub repository owner.</param>
/// <param name="gitHubRepo">Optional GitHub repository name.</param>
/// <param name="localCommitPushed">Forwarded unchanged to each provider.</param>
/// <returns>
/// A function taking an optional error message and the compiler output. Calling it queues the
/// providers' follow-up callbacks and returns an <see cref="Action{T}"/> that queues their final
/// update with the given flag.
/// </returns>
public Func<string?, string, Action<bool>> QueueDeploymentMessage(
    Models.RevisionInformation revisionInformation,
    Models.RevisionInformation? previousRevisionInformation,
    EngineVersion engineVersion,
    DateTimeOffset? estimatedCompletionTime,
    string? gitHubOwner,
    string? gitHubRepo,
    bool localCommitPushed)
{
    // Snapshot the target channel IDs so the map can change while we send.
    List<ulong> updateChannelIds;
    lock (mappedChannels)
    {
        updateChannelIds = mappedChannels
            .Where(entry => entry.Value.IsUpdatesChannel)
            .Select(entry => entry.Key)
            .ToList();
    }

    logger.LogTrace("Sending deployment message for RevisionInformation: {revisionInfoId}", revisionInformation.Id);

    var providerCallbacks = new List<Func<string?, string, ValueTask<Func<bool, ValueTask>>>>();

    // Sends the initial message to one channel and records the provider's follow-up callback.
    async Task SendToChannel(ulong channelId)
    {
        ChannelMapping? mapping;
        lock (mappedChannels)
        {
            if (!mappedChannels.TryGetValue(channelId, out mapping))
                return;
        }

        IProvider? channelProvider;
        lock (providers)
        {
            if (!providers.TryGetValue(mapping.ProviderId, out channelProvider))
                return;
        }

        try
        {
            var providerCallback = await channelProvider.SendUpdateMessage(
                revisionInformation,
                previousRevisionInformation,
                engineVersion,
                estimatedCompletionTime,
                gitHubOwner,
                gitHubRepo,
                mapping.ProviderChannelId,
                localCommitPushed,
                handlerCts.Token);

            lock (providerCallbacks)
            {
                providerCallbacks.Add(providerCallback);
            }
        }
        catch (Exception ex)
        {
            logger.LogWarning(
                ex,
                "Error sending deploy message to provider {providerId}!",
                mapping.ProviderId);
        }
    }

    var initialSendTask = Task.WhenAll(updateChannelIds.Select(channelId => SendToChannel(channelId)));

    AddMessageTask(initialSendTask);

    Task outcomeTask;
    Func<bool, Task>? finalUpdateAction = null;

    // Runs once the outcome is known: waits for the initial sends, then invokes every recorded callback.
    async Task ReportOutcome(string? errorMessage, string dreamMakerOutput)
    {
        await initialSendTask;
        var finalizers = await ValueTaskExtensions.WhenAll(
            providerCallbacks.Select(providerCallback => providerCallback(errorMessage, dreamMakerOutput)),
            providerCallbacks.Count);

        finalUpdateAction = active => ValueTaskExtensions.WhenAll(finalizers.Select(finalizer => finalizer(active))).AsTask();
    }

    // Runs the final update after the outcome report has completed.
    async Task Complete(bool active)
    {
        try
        {
            await outcomeTask;
        }
        catch
        {
            // Handled in AddMessageTask
            return;
        }

        AddMessageTask(finalUpdateAction!(active));
    }

    return (errorMessage, dreamMakerOutput) =>
    {
        outcomeTask = ReportOutcome(errorMessage, dreamMakerOutput);
        AddMessageTask(outcomeTask);
        return active => AddMessageTask(Complete(active));
    };
}
454
/// <summary>
/// Registers the built-in commands, applies the settings of every initial chat bot, and starts the
/// initial-connection aggregate and the message monitor.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="Task"/> representing the running operation.</returns>
public async Task StartAsync(CancellationToken cancellationToken)
{
    foreach (var builtinCommand in commandFactory.GenerateCommands())
    {
        var commandKey = builtinCommand.Name.ToUpperInvariant();
        builtinCommands.Add(commandKey, builtinCommand);
    }

    // Work from a copy: applying settings edits activeChatBots.
    var botsToStart = activeChatBots.ToList();
    var settingsTasks = botsToStart.Select(bot => ChangeSettings(bot, cancellationToken));
    await ValueTaskExtensions.WhenAll(settingsTasks);

    initialProviderConnectionsTask = InitialConnection();
    chatHandler = MonitorMessages(handlerCts.Token);
}
465
/// <summary>
/// Cancels message handling, waits for the monitor to exit, deletes every connection and waits for
/// the queued message chain to finish.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="Task"/> representing the running operation.</returns>
public async Task StopAsync(CancellationToken cancellationToken)
{
    handlerCts.Cancel();
    if (chatHandler != null)
        await chatHandler;

    // DeleteConnection removes entries from providers, so take a snapshot of the keys under the
    // lock instead of deleting while lazily enumerating the live dictionary.
    List<long> connectionIds;
    lock (providers)
        connectionIds = providers.Keys.ToList();

    await Task.WhenAll(connectionIds.Select(connectionId => DeleteConnection(connectionId, cancellationToken)));
    await messageSendTask;
}
475
478 {
// A custom command handler must have been registered before a tracking context can be created.
479 if (customCommandHandler == null)
480 throw new InvalidOperationException("RegisterCommandHandler() hasn't been called!");
481
// Declared before construction so the removal callback below can capture the variable.
482 IChatTrackingContext context = null!;
// The context is built from the channels mapped at this moment.
483 lock (mappedChannels)
484 context = new ChatTrackingContext(
485 customCommandHandler,
486 mappedChannels.Select(y => y.Value.Channel),
487 loggerFactory.CreateLogger<ChatTrackingContext>(),
488 () =>
489 {
// Callback given to the context: removes it from the tracked list.
490 lock (trackingContexts)
491 trackingContexts.Remove(context);
492 });
493
494 lock (trackingContexts)
495 trackingContexts.Add(context);
496
497 return context;
498 }
499
/// <summary>
/// Pushes the current set of mapped channels to every tracking context, after waiting for the
/// initial provider connections if they have not completed yet.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
public async ValueTask UpdateTrackingContexts(CancellationToken cancellationToken)
{
    var logMessageSent = 0;
    async Task UpdateTrackingContext(IChatTrackingContext channelSink, IEnumerable<ChannelRepresentation> channels)
    {
        // The flag only limits the trace message to once per call; every context must still be updated.
        if (Interlocked.Exchange(ref logMessageSent, 1) == 0)
            logger.LogTrace("UpdateTrackingContexts");

        await channelSink.UpdateChannels(channels, cancellationToken);
    }

    var waitingForInitialConnection = !initialProviderConnectionsTask!.IsCompleted;
    if (waitingForInitialConnection)
    {
        logger.LogTrace("Waiting for initial chat bot connections before updating tracking contexts...");
        await initialProviderConnectionsTask.WaitAsync(cancellationToken);
    }

    List<Task> tasks;
    lock (mappedChannels)
    {
        // Materialize while the lock is held so contexts never enumerate the live dictionary.
        var currentChannels = mappedChannels.Select(y => y.Value.Channel).ToList();
        lock (trackingContexts)
            tasks = trackingContexts.Select(x => UpdateTrackingContext(x, currentChannels)).ToList();
    }

    if (waitingForInitialConnection)
    {
        if (tasks.Count > 0)
            logger.LogTrace("Updating chat tracking contexts...");
        else
            logger.LogTrace("No chat tracking contexts to update");
    }

    await Task.WhenAll(tasks);
}
531
/// <summary>
/// Registers the custom command handler. May only be called once.
/// </summary>
/// <param name="customCommandHandler">The <see cref="ICustomCommandHandler"/> to register.</param>
/// <exception cref="InvalidOperationException">A handler has already been registered.</exception>
/// <exception cref="ArgumentNullException"><paramref name="customCommandHandler"/> is <see langword="null"/>.</exception>
public void RegisterCommandHandler(ICustomCommandHandler customCommandHandler)
{
    // The "already registered" check comes first, so it wins over the null check.
    var alreadyRegistered = this.customCommandHandler != null;
    if (alreadyRegistered)
    {
        throw new InvalidOperationException("RegisterCommandHandler() already called!");
    }

    ArgumentNullException.ThrowIfNull(customCommandHandler);
    this.customCommandHandler = customCommandHandler;
}
539
/// <summary>
/// Removes a connection: drops its channel mappings, disconnects and disposes its provider, and
/// disposes its channel-change semaphore if one exists.
/// </summary>
/// <param name="connectionId">ID of the connection to delete.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="Task"/> representing the running operation.</returns>
public async Task DeleteConnection(long connectionId, CancellationToken cancellationToken)
{
    logger.LogTrace("DeleteConnection {connectionId}", connectionId);

    var hasSemaphore = changeChannelSemaphores.TryRemove(connectionId, out var semaphore);

    // Declared first so it is disposed last: the semaphore is destroyed only after it is released.
    using var semaphoreLifetime = hasSemaphore
        ? semaphore
        : null;
    using var semaphoreHold = hasSemaphore
        ? await SemaphoreSlimContext.Lock(semaphore!, cancellationToken)
        : null;

    var provider = await RemoveProviderChannels(connectionId, true, cancellationToken);
    if (provider == null)
    {
        logger.LogTrace("DeleteConnection: ID {connectionId} doesn't exist!", connectionId);
        return;
    }

    var startTime = DateTimeOffset.UtcNow;
    try
    {
        await provider.Disconnect(cancellationToken);
    }
    catch (Exception ex)
    {
        // A failed disconnect is logged; the provider is still disposed below.
        logger.LogError(ex, "Error disconnecting connection {connectionId}!", connectionId);
    }

    await provider.DisposeAsync();

    var elapsed = DateTimeOffset.UtcNow - startTime;
    if (elapsed.TotalSeconds > 3)
    {
        logger.LogWarning("Disconnecting a {providerType} took {totalSeconds}s!", provider.GetType().Name, elapsed.TotalSeconds);
    }
}
574
/// <summary>
/// Handles a restart of the server by sending a notice to every mapped system channel.
/// </summary>
/// <param name="updateVersion">The version being updated to, or null when the server is not updating.</param>
/// <param name="handlerMayDelayShutdownWithExtremelyLongRunningTasks">Only used here to choose between the "Graceful shutdown" and "Going down" wording.</param>
/// <param name="cancellationToken">The cancellation token for the send.</param>
/// <returns>A ValueTask representing the running operation.</returns>
576 public ValueTask HandleRestart(Version? updateVersion, bool handlerMayDelayShutdownWithExtremelyLongRunningTasks, CancellationToken cancellationToken)
577 {
// No update version means a plain shutdown notice; otherwise announce the target version.
578 var message = updateVersion == null
579 ? $"TGS: {(handlerMayDelayShutdownWithExtremelyLongRunningTasks ? "Graceful shutdown" : "Going down")}..."
580 : $"TGS: Updating to version {updateVersion}...";
581 List<ulong> systemChannels;
582 lock (mappedChannels) // so it doesn't change while we're using it
583 systemChannels = mappedChannels
584 .Where(x => x.Value.IsSystemChannel)
585 .Select(x => x.Key)
586 .ToList();
587
// Sent directly rather than through the queue; there is no message being replied to.
588 return SendMessage(
589 systemChannels,
590 null,
592 {
593 Text = message,
594 },
595 cancellationToken);
596 }
597
/// <summary>
/// Removes every channel mapping that belongs to a connection and optionally unregisters its provider.
/// </summary>
/// <param name="connectionId">ID of the connection whose mappings are removed.</param>
/// <param name="removeProvider">
/// When <see langword="true"/>, the provider is also removed from <c>providers</c> and the tracking
/// contexts are given the remaining channels.
/// </param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>The provider registered for the connection, or <see langword="null"/> if there was none.</returns>
async ValueTask<IProvider?> RemoveProviderChannels(long connectionId, bool removeProvider, CancellationToken cancellationToken)
{
    logger.LogTrace("RemoveProviderChannels {connectionId}...", connectionId);

    IProvider? targetProvider;
    lock (providers)
    {
        var found = providers.TryGetValue(connectionId, out targetProvider);
        if (!found)
        {
            logger.LogTrace("Aborted, no such provider!");
            return null;
        }

        if (removeProvider)
        {
            providers.Remove(connectionId);
        }
    }

    var contextUpdate = ValueTask.CompletedTask;
    lock (mappedChannels)
    {
        // Collect the keys first; the dictionary cannot be edited while it is being queried.
        var staleChannelIds = mappedChannels
            .Where(entry => entry.Value.ProviderId == connectionId)
            .Select(entry => entry.Key)
            .ToList();
        foreach (var staleChannelId in staleChannelIds)
        {
            mappedChannels.Remove(staleChannelId);
        }

        var remainingChannels = mappedChannels
            .Select(entry => entry.Value.Channel)
            .ToList();

        if (removeProvider)
        {
            lock (trackingContexts)
            {
                contextUpdate = ValueTaskExtensions.WhenAll(
                    trackingContexts.Select(trackingContext => trackingContext.UpdateChannels(remainingChannels, cancellationToken)));
            }
        }
    }

    await contextUpdate;

    return targetProvider;
}
640
/// <summary>
/// Re-applies the stored channel list of a provider, used after the provider reconnects.
/// </summary>
/// <param name="provider">The <see cref="IProvider"/> to remap.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
async ValueTask RemapProvider(IProvider provider, CancellationToken cancellationToken)
{
    logger.LogTrace("Remapping channels for provider reconnection...");

    long? registeredId;
    lock (providers)
        registeredId = providers
            .Where(x => x.Value == provider)
            .Select(x => (long?)x.Key)
            .FirstOrDefault();

    // The provider may have been deleted between signalling us and this lookup. Message
    // processing tolerates that case, so do the same here rather than throwing.
    if (!registeredId.HasValue)
    {
        logger.LogDebug("Unable to remap channels, provider is no longer registered!");
        return;
    }

    var providerId = registeredId.Value;

    IEnumerable<Models.ChatChannel>? channelsToMap;
    lock (activeChatBots)
        channelsToMap = activeChatBots.FirstOrDefault(x => x.Id == providerId)?.Channels;

    // Nothing stored (or an empty list) means there is nothing to remap.
    if (channelsToMap?.Any() ?? false)
        await ChangeChannels(providerId, channelsToMap, cancellationToken);
}
661
/// <summary>
/// Processes one message received from a provider: maps its channel, checks whether the bot was
/// addressed, and runs the requested command.
/// </summary>
/// <param name="provider">The provider the message came from.</param>
/// <param name="message">The message, or null to signal that the provider reconnected and needs remapping.</param>
/// <param name="recursed">True when this call is the retry made after a remap; prevents a second remap.</param>
/// <param name="cancellationToken">The cancellation token for the operation.</param>
/// <returns>A ValueTask representing the running operation.</returns>
670#pragma warning disable CA1502
671 async ValueTask ProcessMessage(IProvider provider, Message? message, bool recursed, CancellationToken cancellationToken)
672#pragma warning restore CA1502
673 {
674 if (!provider.Connected)
675 {
676 logger.LogTrace("Abort message processing because provider is disconnected!");
677 return;
678 }
679
680 // provider reconnected, remap channels.
681 if (message == null)
682 {
683 await RemapProvider(provider, cancellationToken);
684 return;
685 }
686
687 // map the channel if it's private and we haven't seen it
688 var providerChannelId = message.User.Channel.RealId;
689 KeyValuePair<ulong, ChannelMapping>? mappedChannel;
690 long providerId;
691 bool hasChannelZero;
692 lock (providers)
693 {
694 // important, otherwise we could end up processing during shutdown
695 cancellationToken.ThrowIfCancellationRequested();
696
// Find the provider's ID by reference; nullable so "not found" is distinguishable from ID 0.
697 var providerIdNullable = providers
698 .Where(x => x.Value == provider)
699 .Select(x => (long?)x.Key)
700 .FirstOrDefault();
701
702 if (!providerIdNullable.HasValue)
703 {
704 // possible to have a message queued and then the provider immediately disconnects
705 logger.LogDebug("Unable to process command \"{command}\" due to provider disconnecting", message.Content);
706 return;
707 }
708
709 providerId = providerIdNullable.Value;
// Existing mapping for the channel the message arrived on, if any.
710 mappedChannel = mappedChannels
711 .Where(x => x.Value.ProviderId == providerId && x.Value.ProviderChannelId == providerChannelId)
712 .Select(x => (KeyValuePair<ulong, ChannelMapping>?)x)
713 .FirstOrDefault();
// Whether this provider has a mapping whose provider channel ID is 0.
714 hasChannelZero = mappedChannels
715 .Where(x => x.Value.ProviderId == providerId && x.Value.ProviderChannelId == 0)
716 .Any();
717 }
718
// Unmapped public channel on a provider with an ID 0 mapping: remap once, then retry this message.
719 if (!recursed && !mappedChannel.HasValue && !message.User.Channel.IsPrivateChannel && hasChannelZero)
720 {
721 logger.LogInformation("Receieved message from unmapped channel whose provider contains ID 0. Remapping...");
722 await RemapProvider(provider, cancellationToken);
723 logger.LogTrace("Resume processing original message...");
724 await ProcessMessage(provider, message, true, cancellationToken);
725 return;
726 }
727
// Helper that replies with plain text to the channel the message came from.
728 ValueTask TextReply(string reply) => SendMessage(
729 new List<ulong>
730 {
731 message.User.Channel.RealId,
732 },
733 message,
735 {
736 Text = reply,
737 },
738 cancellationToken);
739
740 if (message.User.Channel.IsPrivateChannel)
741 lock (mappedChannels)
742 if (!mappedChannel.HasValue)
743 {
// First message from this private channel: allocate a new ID and map it.
744 ulong newId;
745 lock (synchronizationLock)
746 newId = channelIdCounter++;
747 logger.LogTrace(
748 "Mapping private channel {connectionName}:{channelFriendlyName} as {newId}",
750 message.User.FriendlyName,
751 newId);
752 mappedChannels.Add(newId, new ChannelMapping(message.User.Channel)
753 {
754 ProviderChannelId = message.User.Channel.RealId,
755 ProviderId = providerId,
756 });
757
758 logger.LogTrace(
759 "Mapping DM {connectionName}:{userId} ({userFriendlyName}) as {newId}",
761 message.User.RealId,
762 message.User.FriendlyName,
763 newId);
// From here on the message carries the mapped ID instead of the provider's channel ID.
764 message.User.Channel.RealId = newId;
765 }
766 else
767 message.User.Channel.RealId = mappedChannel.Value.Key;
768 else
769 {
770 if (!mappedChannel.HasValue)
771 {
772 logger.LogError(
773 "Error mapping message: Provider ID: {providerId}, Channel Real ID: {realId}",
774 providerId,
775 message.User.Channel.RealId);
776 logger.LogTrace("message: {messageJson}", JsonConvert.SerializeObject(message));
777 lock (mappedChannels)
778 logger.LogTrace("mappedChannels: {mappedChannelsJson}", JsonConvert.SerializeObject(mappedChannels));
779 await TextReply("TGS: Processing error, check logs!");
780 return;
781 }
782
783 var mappingChannelRepresentation = mappedChannel.Value.Value.Channel;
784
// Copy the mapped ID, tag and admin flag onto the message's channel.
785 message.User.Channel.RealId = mappingChannelRepresentation.RealId;
786 message.User.Channel.Tag = mappingChannelRepresentation.Tag;
787 message.User.Channel.IsAdminChannel = mappingChannelRepresentation.IsAdminChannel;
788 }
789
790 var trimmedMessage = message.Content.Trim();
791 if (trimmedMessage.Length == 0)
792 return;
793
// The first word is the candidate mention; one trailing ':' or ',' is dropped before comparing.
794 var splits = new List<string>(trimmedMessage.Split(' ', StringSplitOptions.RemoveEmptyEntries));
795 var address = splits[0];
796 if (address.Length > 1 && (address.Last() == ':' || address.Last() == ','))
797 address = address[0..^1];
798
799 var addressed =
800 address.Equals(CommonMention, StringComparison.OrdinalIgnoreCase)
801 || address.Equals(provider.BotMention, StringComparison.OrdinalIgnoreCase);
802
803 // no mention
804 if (!addressed && !message.User.Channel.IsPrivateChannel)
805 return;
806
807 logger.LogTrace(
808 "Start processing command: {message}. User (True provider Id): {profiderId}",
809 message.Content,
810 JsonConvert.SerializeObject(message.User));
811 try
812 {
// In a private channel the mention is optional, so it is only stripped when present.
813 if (addressed)
814 splits.RemoveAt(0);
815
816 if (splits.Count == 0)
817 {
818 // just a mention
819 await TextReply("Hi!");
820 return;
821 }
822
823 var command = splits[0];
824 splits.RemoveAt(0);
825 var arguments = String.Join(" ", splits);
826
// Looks a command up by name: built-ins first, then custom commands of active tracking contexts.
// Returns null when nothing matches; built-ins carry a null tracking context.
827 Tuple<ICommand, IChatTrackingContext?>? GetCommand(string command)
828 {
829 if (!builtinCommands.TryGetValue(command, out var handler))
830 return trackingContexts
831 .Where(trackingContext => trackingContext.Active)
832 .SelectMany(trackingContext => trackingContext.CustomCommands.Select(customCommand => Tuple.Create<ICommand, IChatTrackingContext?>(customCommand, trackingContext)))
833 .Where(tuple => tuple.Item1.Name.Equals(command, StringComparison.OrdinalIgnoreCase))
834 .FirstOrDefault();
835
836 return Tuple.Create<ICommand, IChatTrackingContext?>(handler, null);
837 }
838
839 const string UnknownCommandMessage = "TGS: Unknown command! Type '?' or 'help' for available commands.";
840
841 if (command.Equals("help", StringComparison.OrdinalIgnoreCase) || command == "?")
842 {
843 string helpText;
844 if (splits.Count == 0)
845 {
// No argument: list every built-in and custom command name.
846 var allCommands = builtinCommands.Select(x => x.Value).ToList();
847 allCommands.AddRange(
848 trackingContexts
849 .SelectMany(
850 x => x.CustomCommands));
851 helpText = String.Format(CultureInfo.InvariantCulture, "Available commands (Type '?' or 'help' and then a command name for more details): {0}", String.Join(", ", allCommands.Select(x => x.Name)));
852 }
853 else
854 {
// With an argument: show the help text of that one command.
855 var helpTuple = GetCommand(splits[0]);
856 if (helpTuple != default)
857 {
858 var (helpHandler, _) = helpTuple;
859 helpText = String.Format(CultureInfo.InvariantCulture, "{0}: {1}{2}", helpHandler.Name, helpHandler.HelpText, helpHandler.AdminOnly ? " - May only be used in admin channels" : String.Empty);
860 }
861 else
862 helpText = UnknownCommandMessage;
863 }
864
865 await TextReply(helpText);
866 return;
867 }
868
869 var tuple = GetCommand(command);
870
871 if (tuple == default)
872 {
873 await TextReply(UnknownCommandMessage);
874 return;
875 }
876
877 var (commandHandler, trackingContext) = tuple;
878
// Re-checked here; built-in commands have no tracking context and skip this.
879 if (trackingContext?.Active == false)
880 {
881 await TextReply("TGS: The server is rebooting, please try again later");
882 return;
883 }
884
885 if (commandHandler.AdminOnly && !message.User.Channel.IsAdminChannel)
886 {
887 await TextReply("TGS: Use this command in an admin channel!");
888 return;
889 }
890
// A null result means the command produced nothing to send back.
891 var result = await commandHandler.Invoke(arguments, message.User, cancellationToken);
892 if (result != null)
893 await SendMessage(new List<ulong> { message.User.Channel.RealId }, message, result, cancellationToken);
894 }
895 catch (OperationCanceledException ex)
896 {
897 logger.LogTrace(ex, "Command processing canceled!");
898 }
899 catch (Exception e)
900 {
901 // error bc custom commands should reply about why it failed
902 logger.LogError(e, "Error processing chat command");
903 await TextReply("TGS: Internal error processing command! Check server logs!");
904 }
905 finally
906 {
907 logger.LogTrace("Done processing command.");
908 }
909 }
910
/// <summary>
/// Monitors active providers for new messages and hands each one to ProcessMessage until cancelled.
/// </summary>
/// <param name="cancellationToken">The cancellation token that ends the loop.</param>
/// <returns>A Task representing the running operation.</returns>
916 async Task MonitorMessages(CancellationToken cancellationToken)
917 {
918 logger.LogTrace("Starting processing loop...");
// The outstanding NextMessage task for each provider being watched.
919 var messageTasks = new Dictionary<IProvider, Task<Message?>>();
// Most recent link of the processing chain; each link also awaits the link before it.
920 ValueTask activeProcessingTask = ValueTask.CompletedTask;
921 try
922 {
923 Task? updatedTask = null;
924 while (!cancellationToken.IsCancellationRequested)
925 {
// Fetch the current "connections updated" task on the first pass and whenever the previous one has completed.
926 if (updatedTask?.IsCompleted != false)
927 lock (synchronizationLock)
928 updatedTask = connectionsUpdated.Task;
929
930 // prune disconnected providers
931 foreach (var disposedProviderMessageTaskKvp in messageTasks.Where(x => x.Key.Disposed).ToList())
932 messageTasks.Remove(disposedProviderMessageTaskKvp.Key);
933
934 // add new ones
935 lock (providers)
936 foreach (var providerKvp in providers)
937 if (!messageTasks.ContainsKey(providerKvp.Value))
938 messageTasks.Add(
939 providerKvp.Value,
940 providerKvp.Value.NextMessage(cancellationToken));
941
942 if (messageTasks.Count == 0)
943 {
944 logger.LogTrace("No providers active, pausing messsage monitoring...");
945 await updatedTask.WaitAsync(cancellationToken);
946 logger.LogTrace("Resuming message monitoring...");
947 continue;
948 }
949
950 // wait for a message
951 await Task.WhenAny(updatedTask, Task.WhenAny(messageTasks.Select(x => x.Value)));
952
953 // process completed ones
954 foreach (var completedMessageTaskKvp in messageTasks.Where(x => x.Value.IsCompleted).ToList())
955 {
956 var provider = completedMessageTaskKvp.Key;
// Removed here so the next pass requests a fresh NextMessage for this provider.
957 messageTasks.Remove(provider);
958
959 if (provider.Disposed) // valid to receive one, but don't process it
960 continue;
961
962 var message = await completedMessageTaskKvp.Value;
963 var messageNumber = Interlocked.Increment(ref messagesProcessed);
964
// Processes this message, logging any failure, then awaits the previously started link.
965 async ValueTask WrapProcessMessage()
966 {
967 var localActiveProcessingTask = activeProcessingTask;
968 using (LogContext.PushProperty(SerilogContextHelper.ChatMessageIterationContextProperty, messageNumber))
969 try
970 {
971 await ProcessMessage(provider, message, false, cancellationToken);
972 }
973 catch (Exception ex)
974 {
975 logger.LogError(ex, "Error processing message {messageNumber}!", messageNumber);
976 }
977
978 await localActiveProcessingTask;
979 }
980
// Not awaited here, so the loop keeps polling while the message is handled.
981 activeProcessingTask = WrapProcessMessage();
982 }
983 }
984 }
985 catch (OperationCanceledException ex)
986 {
987 logger.LogTrace(ex, "Message processing loop cancelled!");
988 }
989 catch (Exception e)
990 {
991 logger.LogError(e, "Message loop crashed!");
992 }
993 finally
994 {
// Awaiting the newest link waits for every earlier one too.
995 await activeProcessingTask;
996 }
997
998 logger.LogTrace("Leaving message processing loop");
999 }
1000
/// <summary>
/// Asynchronously sends a given message to a set of mapped channel IDs.
/// </summary>
/// <param name="channelIds">The mapped channel IDs to send to.</param>
/// <param name="replyTo">The message being replied to, if any; passed through to the provider.</param>
/// <param name="message">The content to send.</param>
/// <param name="cancellationToken">The cancellation token for the operation.</param>
/// <returns>A ValueTask representing the running operation.</returns>
1009 ValueTask SendMessage(IEnumerable<ulong> channelIds, Message? replyTo, MessageContent message, CancellationToken cancellationToken)
1010 {
// Materialized once: the IDs are logged, counted and then iterated.
1011 var channelIdsList = channelIds.ToList();
1012
1013 logger.LogTrace(
1014 "Chat send \"{message}\"{embed} to channels: [{channelIdsCommaSeperated}]",
1015 message.Text,
1016 message.Embed != null ? " (with embed)" : String.Empty,
1017 String.Join(", ", channelIdsList));
1018
1019 if (channelIdsList.Count == 0)
1020 return ValueTask.CompletedTask;
1021
1023 channelIdsList.Select(x =>
1024 {
// IDs that are no longer mapped, or whose provider is gone, are skipped silently.
1025 ChannelMapping? channelMapping;
1026 lock (mappedChannels)
1027 if (!mappedChannels.TryGetValue(x, out channelMapping))
1028 return ValueTask.CompletedTask;
1029 IProvider? provider;
1030 lock (providers)
1031 if (!providers.TryGetValue(channelMapping.ProviderId, out provider))
1032 return ValueTask.CompletedTask;
// The provider is addressed with its own channel ID, not the mapped one.
1033 return provider.SendMessage(replyTo, message, channelMapping.ProviderChannelId, cancellationToken);
1034 }));
1035 }
1036
1042 {
// Completes once the InitialConnectionJob of every provider registered at this point has completed.
1043 await Task.WhenAll(providers.Select(x => x.Value.InitialConnectionJob));
1044 logger.LogTrace("Initial provider connection task completed");
1045 }
1046
/// <summary>
/// Appends a task to the chain of message sends so that its failures are logged instead of thrown.
/// </summary>
/// <param name="task">The <see cref="Task"/> to append.</param>
void AddMessageTask(Task task)
{
    // Waits for everything queued earlier, then observes the new task, logging any failure.
    async Task RunAfter(Task earlierSends)
    {
        await earlierSends;
        try
        {
            await task;
        }
        catch (OperationCanceledException ex)
        {
            logger.LogDebug(ex, "Async chat message cancelled!");
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Error in asynchronous chat message!");
        }
    }

    lock (handlerCts)
    {
        var earlierSends = messageSendTask;
        messageSendTask = RunAfter(earlierSends);
    }
}
1073
/// <summary>
/// Adds a given message to the send queue.
/// </summary>
/// <param name="message">The <see cref="MessageContent"/> to send.</param>
/// <param name="channelIdsFactory">Invoked at send time to obtain the target channel IDs.</param>
/// <param name="waitForConnections">Whether to wait for the initial provider connections before sending.</param>
void QueueMessageInternal(MessageContent message, Func<IEnumerable<ulong>> channelIdsFactory, bool waitForConnections)
{
    async Task SendWhenReady()
    {
        var sendToken = handlerCts.Token;
        if (waitForConnections)
        {
            await initialProviderConnectionsTask!.WaitAsync(sendToken);
        }

        // Resolved only now, so the targets reflect the mappings at send time.
        var targetChannelIds = channelIdsFactory();
        await SendMessage(targetChannelIds, null, message, sendToken);
    }

    AddMessageTask(SendWhenReady());
}
1097
/// <summary>
/// Queues a text message for every mapped channel accepted by a selector, once the initial provider
/// connections have completed.
/// </summary>
/// <param name="channelSelector">Chooses which <see cref="ChannelMapping"/>s receive the message.</param>
/// <param name="message">The message text.</param>
/// <param name="prefix">Optional prefix, prepended as "prefix: message".</param>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is <see langword="null"/>.</exception>
void QueueMessageGeneric(Predicate<ChannelMapping> channelSelector, string message, string? prefix)
{
    ArgumentNullException.ThrowIfNull(message);

    var text = prefix == null
        ? message
        : $"{prefix}: {message}";

    if (!initialProviderConnectionsTask!.IsCompleted)
    {
        logger.LogTrace("Waiting for initial provider connections before sending chat message...");
    }

    // Evaluated when the message is actually sent; the lock keeps the map stable while it is read.
    IEnumerable<ulong> SelectChannelIds()
    {
        lock (mappedChannels)
        {
            return mappedChannels
                .Where(entry => channelSelector(entry.Value))
                .Select(entry => entry.Key)
                .ToList();
        }
    }

    var content = new MessageContent
    {
        Text = text,
    };

    QueueMessageInternal(content, SelectChannelIds, true);
}
1130 }
1131}
Information about an engine installation.
Extension methods for the ValueTask and ValueTask<TResult> classes.
static async ValueTask WhenAll(IEnumerable< ValueTask > tasks)
Fully await a given list of tasks.
Represents a mapping of a ChannelRepresentation.RealId.
string ConnectionName
The name of the connection the ChannelRepresentation belongs to.
bool IsAdminChannel
If this is considered a channel for admin commands.
const string CommonMention
The common bot mention.
long messagesProcessed
The number of Messages processed.
readonly IProviderFactory providerFactory
The IProviderFactory for the ChatManager.
ValueTask SendMessage(IEnumerable< ulong > channelIds, Message? replyTo, MessageContent message, CancellationToken cancellationToken)
Asynchronously send a given message to a set of channelIds .
readonly object synchronizationLock
Used for various lock statements throughout this class.
ChatManager(IProviderFactory providerFactory, ICommandFactory commandFactory, IServerControl serverControl, ILoggerFactory loggerFactory, ILogger< ChatManager > logger, IEnumerable< Models.ChatBot > initialChatBots)
Initializes a new instance of the ChatManager class.
Task? initialProviderConnectionsTask
A Task that represents the IProviders initial connection.
ValueTask HandleRestart(Version? updateVersion, bool handlerMayDelayShutdownWithExtremelyLongRunningTasks, CancellationToken cancellationToken)
Handle a restart of the server. Returns a ValueTask representing the running operation.
void QueueMessageInternal(MessageContent message, Func< IEnumerable< ulong > > channelIdsFactory, bool waitForConnections)
Adds a given message to the send queue.
readonly List< Models.ChatBot > activeChatBots
The active Models.ChatBot for the ChatManager.
async Task InitialConnection()
Aggregate all IProvider.InitialConnectionJobs into one Task.
async ValueTask RemapProvider(IProvider provider, CancellationToken cancellationToken)
Remap the channels for a given provider.
async ValueTask ProcessMessage(IProvider provider, Message? message, bool recursed, CancellationToken cancellationToken)
Processes a message.
readonly Dictionary< long, IProvider > providers
Map of IProviders in use, keyed by ChatBotSettings EntityId.Id.
async Task MonitorMessages(CancellationToken cancellationToken)
Monitors active providers for new Messages.
ICustomCommandHandler? customCommandHandler
The ICustomCommandHandler for the ChangeChannels(long, IEnumerable<Models.ChatChannel>,...
async ValueTask ChangeChannels(long connectionId, IEnumerable< Models.ChatChannel > newChannels, CancellationToken cancellationToken)
readonly Dictionary< string, ICommand > builtinCommands
Unchanging ICommands in the ChatManager mapped by ICommand.Name.
void QueueMessage(MessageContent message, IEnumerable< ulong > channelIds)
Queue a chat message to a given set of channelIds.
readonly ILoggerFactory loggerFactory
The ILoggerFactory for the ChatManager.
void QueueMessageGeneric(Predicate< ChannelMapping > channelSelector, string message, string? prefix)
Queues a message to a selected set of ChannelMappings.
Task messageSendTask
A Task that represents all sent messages.
Task? chatHandler
The Task that monitors incoming chat messages.
async ValueTask ChangeSettings(Models.ChatBot newSettings, CancellationToken cancellationToken)
Change chat settings. If the Api.Models.EntityId.Id is not currently in use, a new connection will be...
ulong channelIdCounter
Used for remapping ChannelRepresentation.RealIds.
readonly ICommandFactory commandFactory
The ICommandFactory for the ChatManager.
IChatTrackingContext CreateTrackingContext()
Start tracking Commands.CustomCommands and ChannelRepresentations. A new IChatTrackingContext.
void AddMessageTask(Task task)
Adds a given task to messageSendTask.
readonly ILogger< ChatManager > logger
The ILogger for the ChatManager.
readonly ConcurrentDictionary< long, SemaphoreSlim > changeChannelSemaphores
Map of SemaphoreSlims used to guard concurrent access to ChangeChannels(long, IEnumerable<Models....
void QueueRawDeploymentMessage(string message)
Queue a chat message to configured deployment channels.
readonly IRestartRegistration restartRegistration
The IRestartRegistration for the ChatManager.
void QueueWatchdogMessage(string message)
Queue a chat message to configured watchdog channels.
readonly CancellationTokenSource handlerCts
The CancellationTokenSource for chatHandler.
async Task StartAsync(CancellationToken cancellationToken)
void RegisterCommandHandler(ICustomCommandHandler customCommandHandler)
Registers a customCommandHandler to use.
TaskCompletionSource connectionsUpdated
The TaskCompletionSource that completes when ChatBotSettings change.
async ValueTask UpdateTrackingContexts(CancellationToken cancellationToken)
Force an update with the active channels on all active IChatTrackingContexts. A ValueTask representing...
async Task DeleteConnection(long connectionId, CancellationToken cancellationToken)
Disconnects and deletes a given connection. A Task representing the running operation.
Func< string?, string, Action< bool > > QueueDeploymentMessage(Models.RevisionInformation revisionInformation, Models.RevisionInformation? previousRevisionInformation, EngineVersion engineVersion, DateTimeOffset? estimatedCompletionTime, string? gitHubOwner, string? gitHubRepo, bool localCommitPushed)
Send the message for a deployment to configured deployment channels. A Func<T1, T2,...
async ValueTask< IProvider?> RemoveProviderChannels(long connectionId, bool removeProvider, CancellationToken cancellationToken)
Remove an IProvider from mappedChannels optionally removing the provider itself from providers and upd...
async Task StopAsync(CancellationToken cancellationToken)
readonly List< IChatTrackingContext > trackingContexts
The active IChatTrackingContexts for the ChatManager.
readonly Dictionary< ulong, ChannelMapping > mappedChannels
Map of ChannelRepresentation.RealIds to ChannelMappings.
string FriendlyName
The friendly name of the user.
Definition ChatUser.cs:31
ChannelRepresentation Channel
The ChannelRepresentation the user spoke from.
Definition ChatUser.cs:41
Represents a message received by an IProvider.
Definition Message.cs:9
ChatUser User
The ChatUser who sent the Message.
Definition Message.cs:18
Represents a message to send to a chat provider.
static async ValueTask< SemaphoreSlimContext > Lock(SemaphoreSlim semaphore, CancellationToken cancellationToken, ILogger? logger=null)
Asynchronously locks a semaphore.
Helpers for manipulating the Serilog.Context.LogContext.
const string ChatMessageIterationContextProperty
The Serilog.Context.LogContext property name for the ID of the chat message currently being processed...
Represents a command that can be invoked by talking to chat bots.
Definition ICommand.cs:12
ValueTask UpdateChannels(IEnumerable< ChannelRepresentation > newChannels, CancellationToken cancellationToken)
Called when newChannels are set.
For managing connected chat services.
Represents a tracking of dynamic chat json files.
Handles Commands.ICommands that map to those defined in an IChatTrackingContext.
IProvider CreateProvider(ChatBot settings)
Create an IProvider.
string BotMention
The string that indicates the IProvider was mentioned.
Definition IProvider.cs:30
bool Connected
If the IProvider is currently connected.
Definition IProvider.cs:20
Represents the lifetime of an IRestartHandler registration.
Represents a service that may take an updated Host assembly and run it, stopping the current assembly...
IRestartRegistration RegisterForRestart(IRestartHandler handler)
Register a given handler to run before stopping the server for a restart.