tgstation-server 6.16.0
The /tg/station 13 server suite
Loading...
Searching...
No Matches
ChatManager.cs
Go to the documentation of this file.
1using System;
2using System.Collections.Concurrent;
3using System.Collections.Generic;
4using System.Globalization;
5using System.Linq;
6using System.Threading;
7using System.Threading.Tasks;
8
9using Microsoft.Extensions.Logging;
10
11using Newtonsoft.Json;
12
13using Serilog.Context;
14
23
25{
27#pragma warning disable CA1506 // TODO: Decomplexify
29 {
33 public const string CommonMention = "!tgs";
34
39
44
49
53 readonly ILoggerFactory loggerFactory;
54
58 readonly ILogger<ChatManager> logger;
59
63 readonly Dictionary<string, ICommand> builtinCommands;
64
68 readonly Dictionary<long, IProvider> providers;
69
73 readonly ConcurrentDictionary<long, SemaphoreSlim> changeChannelSemaphores;
74
78 readonly Dictionary<ulong, ChannelMapping> mappedChannels;
79
83 readonly List<IChatTrackingContext> trackingContexts;
84
88 readonly List<Models.ChatBot> activeChatBots;
89
93 readonly CancellationTokenSource handlerCts;
94
98 readonly object synchronizationLock;
99
104
109
114
119
123 TaskCompletionSource connectionsUpdated;
124
129
134
147 IServerControl serverControl,
148 ILoggerFactory loggerFactory,
149 ILogger<ChatManager> logger,
150 IEnumerable<Models.ChatBot> initialChatBots)
151 {
152 this.providerFactory = providerFactory ?? throw new ArgumentNullException(nameof(providerFactory));
153 this.commandFactory = commandFactory ?? throw new ArgumentNullException(nameof(commandFactory));
154 ArgumentNullException.ThrowIfNull(serverControl);
155 this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
156 this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
157 activeChatBots = initialChatBots?.ToList() ?? throw new ArgumentNullException(nameof(initialChatBots));
158
159 restartRegistration = serverControl.RegisterForRestart(this);
160
161 synchronizationLock = new object();
162
163 builtinCommands = new Dictionary<string, ICommand>(StringComparer.OrdinalIgnoreCase);
164 providers = new Dictionary<long, IProvider>();
165 changeChannelSemaphores = new ConcurrentDictionary<long, SemaphoreSlim>();
166 mappedChannels = new Dictionary<ulong, ChannelMapping>();
167 trackingContexts = new List<IChatTrackingContext>();
168 handlerCts = new CancellationTokenSource();
169 connectionsUpdated = new TaskCompletionSource();
170
171 messageSendTask = Task.CompletedTask;
173 }
174
176 public async ValueTask DisposeAsync()
177 {
178 logger.LogTrace("Disposing...");
179 restartRegistration.Dispose();
180 handlerCts.Dispose();
181 foreach (var providerKvp in providers)
182 await providerKvp.Value.DisposeAsync();
183
184 foreach (var providerKvp in changeChannelSemaphores)
185 providerKvp.Value.Dispose();
186
187 await messageSendTask;
188 }
189
191 public async ValueTask ChangeChannels(long connectionId, IEnumerable<Models.ChatChannel> newChannels, CancellationToken cancellationToken)
192 {
193 ArgumentNullException.ThrowIfNull(newChannels);
194
195 logger.LogTrace("ChangeChannels {connectionId}...", connectionId);
196 var semaphore = changeChannelSemaphores.GetOrAdd(connectionId, _ =>
197 {
198 logger.LogTrace("Creating ChangeChannels semaphore for connection ID {connectionId}...", connectionId);
199 return new SemaphoreSlim(1);
200 });
201 using (await SemaphoreSlimContext.Lock(semaphore, cancellationToken))
202 {
203 var provider = await RemoveProviderChannels(connectionId, false, cancellationToken);
204 if (provider == null)
205 return;
206
207 if (!provider.Connected)
208 {
209 logger.LogDebug("Cannot map channels, provider {providerId} disconnected!", connectionId);
210 return;
211 }
212
213 var results = await provider.MapChannels(newChannels, cancellationToken);
214 try
215 {
216 lock (activeChatBots)
217 {
218 var botToUpdate = activeChatBots.FirstOrDefault(bot => bot.Id == connectionId);
219 if (botToUpdate != null)
220 botToUpdate.Channels = newChannels
221 .Select(apiModel => new Models.ChatChannel
222 {
223 DiscordChannelId = apiModel.DiscordChannelId,
224 IrcChannel = apiModel.IrcChannel,
225 IsAdminChannel = apiModel.IsAdminChannel,
226 IsUpdatesChannel = apiModel.IsUpdatesChannel,
227 IsSystemChannel = apiModel.IsSystemChannel,
228 IsWatchdogChannel = apiModel.IsWatchdogChannel,
229 Tag = apiModel.Tag,
230 })
231 .ToList();
232 }
233
234 var newMappings = results.SelectMany(
235 kvp => kvp.Value.Select(
236 channelRepresentation => new ChannelMapping(channelRepresentation)
237 {
238 IsWatchdogChannel = kvp.Key.IsWatchdogChannel == true,
239 IsUpdatesChannel = kvp.Key.IsUpdatesChannel == true,
240 IsAdminChannel = kvp.Key.IsAdminChannel == true,
241 IsSystemChannel = kvp.Key.IsSystemChannel == true,
242 ProviderChannelId = channelRepresentation.RealId,
243 ProviderId = connectionId,
244 }));
245
246 ulong baseId;
248 {
249 baseId = channelIdCounter;
250 channelIdCounter += (ulong)results.Count;
251 }
252
253 lock (mappedChannels)
254 {
255 lock (providers)
256 if (!providers.TryGetValue(connectionId, out var verify) || verify != provider) // aborted again
257 return;
258 foreach (var newMapping in newMappings)
259 {
260 var newId = baseId++;
261 logger.LogTrace("Mapping channel {connectionName}:{channelFriendlyName} as {newId}", newMapping.Channel.ConnectionName, newMapping.Channel.FriendlyName, newId);
262 mappedChannels.Add(newId, newMapping);
263 newMapping.Channel.RealId = newId;
264 }
265 }
266
267 // we only want to update contexts if everything at startup has connected once already
268 // otherwise we could send an incomplete channel set to the DMAPI, which will then spout all its queued messages into it instead of all relevant chatbots
269 // The watchdog can call this if it needs to after starting up
270 if (initialProviderConnectionsTask!.IsCompleted)
271 await UpdateTrackingContexts(cancellationToken);
272 }
273 finally
274 {
275 provider.InitialMappingComplete();
276 }
277 }
278 }
279
281 public async ValueTask ChangeSettings(Models.ChatBot newSettings, CancellationToken cancellationToken)
282 {
283 ArgumentNullException.ThrowIfNull(newSettings);
284
285 logger.LogTrace("ChangeSettings...");
286
287 Task disconnectTask;
288 IProvider? provider = null;
289 var newSettingsId = Models.ModelExtensions.Require(newSettings, x => x.Id);
290 var newSettingsEnabled = Models.ModelExtensions.Require(newSettings, x => x.Enabled);
291 lock (providers)
292 {
293 // raw settings changes forces a rebuild of the provider
294 if (providers.ContainsKey(newSettingsId))
295 disconnectTask = DeleteConnection(newSettingsId, cancellationToken);
296 else
297 disconnectTask = Task.CompletedTask;
298 if (newSettingsEnabled)
299 {
300 provider = providerFactory.CreateProvider(newSettings);
301 providers.Add(newSettingsId, provider);
302 }
303 }
304
305 lock (mappedChannels)
306 foreach (var oldMappedChannelId in mappedChannels.Where(x => x.Value.ProviderId == newSettingsId).Select(x => x.Key).ToList())
307 mappedChannels.Remove(oldMappedChannelId);
308
309 await disconnectTask;
310
312 {
313 // same thread shennanigans
314 var oldOne = connectionsUpdated;
315 connectionsUpdated = new TaskCompletionSource();
316 oldOne.SetResult();
317 }
318
319 var reconnectionUpdateTask = provider?.SetReconnectInterval(
320 Models.ModelExtensions.Require(newSettings, x => x.ReconnectionInterval),
321 newSettingsEnabled)
322 ?? Task.CompletedTask;
323 lock (activeChatBots)
324 {
325 var originalChatBot = activeChatBots.FirstOrDefault(bot => bot.Id == newSettings.Id);
326 if (originalChatBot != null)
327 activeChatBots.Remove(originalChatBot);
328
329 activeChatBots.Add(new Models.ChatBot(newSettings.Channels)
330 {
331 Id = newSettings.Id,
332 ConnectionString = newSettings.ConnectionString,
333 Enabled = newSettings.Enabled,
334 Name = newSettings.Name,
335 ReconnectionInterval = newSettings.ReconnectionInterval,
336 Provider = newSettings.Provider,
337 });
338 }
339
340 await reconnectionUpdateTask;
341 }
342
344 public void QueueMessage(MessageContent message, IEnumerable<ulong> channelIds)
345 {
346 ArgumentNullException.ThrowIfNull(message);
347 ArgumentNullException.ThrowIfNull(channelIds);
348
349 QueueMessageInternal(message, () => channelIds, false);
350 }
351
353 public void QueueWatchdogMessage(string message)
354 {
355 ArgumentNullException.ThrowIfNull(message);
356
357 message = String.Format(CultureInfo.InvariantCulture, "WD: {0}", message);
358
359 if (!initialProviderConnectionsTask!.IsCompleted)
360 logger.LogTrace("Waiting for initial provider connections before sending watchdog message...");
361
362 // Reimplementing QueueMessage
365 {
366 Text = message,
367 },
368 () =>
369 {
370 // so it doesn't change while we're using it
371 lock (mappedChannels)
372 return mappedChannels.Where(x => x.Value.IsWatchdogChannel).Select(x => x.Key).ToList();
373 },
374 true);
375 }
376
378 public Func<string?, string, Action<bool>> QueueDeploymentMessage(
379 Models.RevisionInformation revisionInformation,
380 Models.RevisionInformation? previousRevisionInformation,
381 EngineVersion engineVersion,
382 DateTimeOffset? estimatedCompletionTime,
383 string? gitHubOwner,
384 string? gitHubRepo,
385 bool localCommitPushed)
386 {
387 List<ulong> wdChannels;
388 lock (mappedChannels) // so it doesn't change while we're using it
389 wdChannels = mappedChannels.Where(x => x.Value.IsUpdatesChannel).Select(x => x.Key).ToList();
390
391 logger.LogTrace("Sending deployment message for RevisionInformation: {revisionInfoId}", revisionInformation.Id);
392
393 var callbacks = new List<Func<string?, string, ValueTask<Func<bool, ValueTask>>>>();
394
395 var task = Task.WhenAll(
396 wdChannels.Select(
397 async x =>
398 {
399 ChannelMapping? channelMapping;
400 lock (mappedChannels)
401 if (!mappedChannels.TryGetValue(x, out channelMapping))
402 return;
403 IProvider? provider;
404 lock (providers)
405 if (!providers.TryGetValue(channelMapping.ProviderId, out provider))
406 return;
407 try
408 {
409 var callback = await provider.SendUpdateMessage(
410 revisionInformation,
411 previousRevisionInformation,
412 engineVersion,
413 estimatedCompletionTime,
414 gitHubOwner,
415 gitHubRepo,
416 channelMapping.ProviderChannelId,
417 localCommitPushed,
418 handlerCts.Token);
419
420 lock (callbacks)
421 callbacks.Add(callback);
422 }
423 catch (Exception ex)
424 {
425 logger.LogWarning(
426 ex,
427 "Error sending deploy message to provider {providerId}!",
428 channelMapping.ProviderId);
429 }
430 }));
431
432 AddMessageTask(task);
433
434 Task callbackTask;
435 Func<bool, Task>? finalUpdateAction = null;
436 async Task CallbackTask(string? errorMessage, string dreamMakerOutput)
437 {
438 await task;
439 var callbackResults = await ValueTaskExtensions.WhenAll(
440 callbacks.Select(
441 x => x(
442 errorMessage,
443 dreamMakerOutput)),
444 callbacks.Count);
445
446 finalUpdateAction = active => ValueTaskExtensions.WhenAll(callbackResults.Select(finalizerCallback => finalizerCallback(active))).AsTask();
447 }
448
449 async Task CompletionTask(bool active)
450 {
451 try
452 {
453 await callbackTask;
454 }
455 catch
456 {
457 // Handled in AddMessageTask
458 return;
459 }
460
461 AddMessageTask(finalUpdateAction!(active));
462 }
463
464 return (errorMessage, dreamMakerOutput) =>
465 {
466 callbackTask = CallbackTask(errorMessage, dreamMakerOutput);
467 AddMessageTask(callbackTask);
468 return active => AddMessageTask(CompletionTask(active));
469 };
470 }
471
473 public async Task StartAsync(CancellationToken cancellationToken)
474 {
475 foreach (var tgsCommand in commandFactory.GenerateCommands())
476 builtinCommands.Add(tgsCommand.Name.ToUpperInvariant(), tgsCommand);
477 var initialChatBots = activeChatBots.ToList();
478 await ValueTaskExtensions.WhenAll(initialChatBots.Select(x => ChangeSettings(x, cancellationToken)));
479 initialProviderConnectionsTask = InitialConnection();
480 chatHandler = MonitorMessages(handlerCts.Token);
481 }
482
484 public async Task StopAsync(CancellationToken cancellationToken)
485 {
486 handlerCts.Cancel();
487 if (chatHandler != null)
488 await chatHandler;
489 await Task.WhenAll(providers.Select(x => x.Key).Select(x => DeleteConnection(x, cancellationToken)));
490 await messageSendTask;
491 }
492
495 {
496 if (customCommandHandler == null)
497 throw new InvalidOperationException("RegisterCommandHandler() hasn't been called!");
498
499 IChatTrackingContext context = null!;
500 lock (mappedChannels)
501 context = new ChatTrackingContext(
502 customCommandHandler,
503 mappedChannels.Select(y => y.Value.Channel),
504 loggerFactory.CreateLogger<ChatTrackingContext>(),
505 () =>
506 {
507 lock (trackingContexts)
508 trackingContexts.Remove(context);
509 });
510
511 lock (trackingContexts)
512 trackingContexts.Add(context);
513
514 return context;
515 }
516
518 public async ValueTask UpdateTrackingContexts(CancellationToken cancellationToken)
519 {
520 var logMessageSent = 0;
521 async Task UpdateTrackingContext(IChatTrackingContext channelSink, IEnumerable<ChannelRepresentation> channels)
522 {
523 if (Interlocked.Exchange(ref logMessageSent, 1) == 0)
524
525 await channelSink.UpdateChannels(channels, cancellationToken);
526 }
527
528 var waitingForInitialConnection = !initialProviderConnectionsTask!.IsCompleted;
529 if (waitingForInitialConnection)
530 {
531 logger.LogTrace("Waiting for initial chat bot connections before updating tracking contexts...");
532 await initialProviderConnectionsTask.WaitAsync(cancellationToken);
533 }
534
535 List<Task> tasks;
536 lock (mappedChannels)
537 lock (trackingContexts)
538 tasks = trackingContexts.Select(x => UpdateTrackingContext(x, mappedChannels.Select(y => y.Value.Channel))).ToList();
539
540 if (waitingForInitialConnection)
541 if (tasks.Count > 0)
542 logger.LogTrace("Updating chat tracking contexts...");
543 else
544 logger.LogTrace("No chat tracking contexts to update");
545
546 await Task.WhenAll(tasks);
547 }
548
550 public void RegisterCommandHandler(ICustomCommandHandler customCommandHandler)
551 {
552 if (this.customCommandHandler != null)
553 throw new InvalidOperationException("RegisterCommandHandler() already called!");
554 this.customCommandHandler = customCommandHandler ?? throw new ArgumentNullException(nameof(customCommandHandler));
555 }
556
558 public async Task DeleteConnection(long connectionId, CancellationToken cancellationToken)
559 {
560 logger.LogTrace("DeleteConnection {connectionId}", connectionId);
561 var hasSemaphore = changeChannelSemaphores.TryRemove(connectionId, out var semaphore);
562 using (hasSemaphore
563 ? semaphore
564 : null)
565 using (hasSemaphore
566 ? await SemaphoreSlimContext.Lock(semaphore!, cancellationToken)
567 : null)
568 {
569 var provider = await RemoveProviderChannels(connectionId, true, cancellationToken);
570 if (provider != null)
571 {
572 var startTime = DateTimeOffset.UtcNow;
573 try
574 {
575 await provider.Disconnect(cancellationToken);
576 }
577 catch (Exception ex)
578 {
579 logger.LogError(ex, "Error disconnecting connection {connectionId}!", connectionId);
580 }
581
582 await provider.DisposeAsync();
583 var duration = DateTimeOffset.UtcNow - startTime;
584 if (duration.TotalSeconds > 3)
585 logger.LogWarning("Disconnecting a {providerType} took {totalSeconds}s!", provider.GetType().Name, duration.TotalSeconds);
586 }
587 else
588 logger.LogTrace("DeleteConnection: ID {connectionId} doesn't exist!", connectionId);
589 }
590 }
591
593 public ValueTask HandleRestart(Version? updateVersion, bool handlerMayDelayShutdownWithExtremelyLongRunningTasks, CancellationToken cancellationToken)
594 {
595 var message = updateVersion == null
596 ? $"TGS: {(handlerMayDelayShutdownWithExtremelyLongRunningTasks ? "Graceful shutdown" : "Going down")}..."
597 : $"TGS: Updating to version {updateVersion}...";
598 List<ulong> systemChannels;
599 lock (mappedChannels) // so it doesn't change while we're using it
600 systemChannels = mappedChannels
601 .Where(x => x.Value.IsSystemChannel)
602 .Select(x => x.Key)
603 .ToList();
604
605 return SendMessage(
606 systemChannels,
607 null,
609 {
610 Text = message,
611 },
612 cancellationToken);
613 }
614
622 async ValueTask<IProvider?> RemoveProviderChannels(long connectionId, bool removeProvider, CancellationToken cancellationToken)
623 {
624 logger.LogTrace("RemoveProviderChannels {connectionId}...", connectionId);
625 IProvider? provider;
626 lock (providers)
627 {
628 if (!providers.TryGetValue(connectionId, out provider))
629 {
630 logger.LogTrace("Aborted, no such provider!");
631 return null;
632 }
633
634 if (removeProvider)
635 providers.Remove(connectionId);
636 }
637
638 ValueTask trackingContextsUpdateTask;
639 lock (mappedChannels)
640 {
641 foreach (var mappedConnectionChannel in mappedChannels.Where(x => x.Value.ProviderId == connectionId).Select(x => x.Key).ToList())
642 mappedChannels.Remove(mappedConnectionChannel);
643
644 var newMappedChannels = mappedChannels.Select(y => y.Value.Channel).ToList();
645
646 if (removeProvider)
647 lock (trackingContexts)
648 trackingContextsUpdateTask = ValueTaskExtensions.WhenAll(trackingContexts.Select(x => x.UpdateChannels(newMappedChannels, cancellationToken)));
649 else
650 trackingContextsUpdateTask = ValueTask.CompletedTask;
651 }
652
653 await trackingContextsUpdateTask;
654
655 return provider;
656 }
657
664 async ValueTask RemapProvider(IProvider provider, CancellationToken cancellationToken)
665 {
666 logger.LogTrace("Remapping channels for provider reconnection...");
667 IEnumerable<Models.ChatChannel>? channelsToMap;
668 long providerId;
669 lock (providers)
670 providerId = providers.Where(x => x.Value == provider).Select(x => x.Key).First();
671
672 lock (activeChatBots)
673 channelsToMap = activeChatBots.FirstOrDefault(x => x.Id == providerId)?.Channels;
674
675 if (channelsToMap?.Any() ?? false)
676 await ChangeChannels(providerId, channelsToMap, cancellationToken);
677 }
678
687#pragma warning disable CA1502
688 async ValueTask ProcessMessage(IProvider provider, Message? message, bool recursed, CancellationToken cancellationToken)
689#pragma warning restore CA1502
690 {
691 if (!provider.Connected)
692 {
693 logger.LogTrace("Abort message processing because provider is disconnected!");
694 return;
695 }
696
697 // provider reconnected, remap channels.
698 if (message == null)
699 {
700 await RemapProvider(provider, cancellationToken);
701 return;
702 }
703
704 // map the channel if it's private and we haven't seen it
705 var providerChannelId = message.User.Channel.RealId;
706 KeyValuePair<ulong, ChannelMapping>? mappedChannel;
707 long providerId;
708 bool hasChannelZero;
709 lock (providers)
710 {
711 // important, otherwise we could end up processing during shutdown
712 cancellationToken.ThrowIfCancellationRequested();
713
714 var providerIdNullable = providers
715 .Where(x => x.Value == provider)
716 .Select(x => (long?)x.Key)
717 .FirstOrDefault();
718
719 if (!providerIdNullable.HasValue)
720 {
721 // possible to have a message queued and then the provider immediately disconnects
722 logger.LogDebug("Unable to process command \"{command}\" due to provider disconnecting", message.Content);
723 return;
724 }
725
726 providerId = providerIdNullable.Value;
727 mappedChannel = mappedChannels
728 .Where(x => x.Value.ProviderId == providerId && x.Value.ProviderChannelId == providerChannelId)
729 .Select(x => (KeyValuePair<ulong, ChannelMapping>?)x)
730 .FirstOrDefault();
731 hasChannelZero = mappedChannels
732 .Where(x => x.Value.ProviderId == providerId && x.Value.ProviderChannelId == 0)
733 .Any();
734 }
735
736 if (!recursed && !mappedChannel.HasValue && !message.User.Channel.IsPrivateChannel && hasChannelZero)
737 {
738 logger.LogInformation("Receieved message from unmapped channel whose provider contains ID 0. Remapping...");
739 await RemapProvider(provider, cancellationToken);
740 logger.LogTrace("Resume processing original message...");
741 await ProcessMessage(provider, message, true, cancellationToken);
742 return;
743 }
744
745 ValueTask TextReply(string reply) => SendMessage(
746 new List<ulong>
747 {
748 message.User.Channel.RealId,
749 },
750 message,
752 {
753 Text = reply,
754 },
755 cancellationToken);
756
757 if (message.User.Channel.IsPrivateChannel)
758 lock (mappedChannels)
759 if (!mappedChannel.HasValue)
760 {
761 ulong newId;
762 lock (synchronizationLock)
763 newId = channelIdCounter++;
764 logger.LogTrace(
765 "Mapping private channel {connectionName}:{channelFriendlyName} as {newId}",
767 message.User.FriendlyName,
768 newId);
769 mappedChannels.Add(newId, new ChannelMapping(message.User.Channel)
770 {
771 ProviderChannelId = message.User.Channel.RealId,
772 ProviderId = providerId,
773 });
774
775 logger.LogTrace(
776 "Mapping DM {connectionName}:{userId} ({userFriendlyName}) as {newId}",
778 message.User.RealId,
779 message.User.FriendlyName,
780 newId);
781 message.User.Channel.RealId = newId;
782 }
783 else
784 message.User.Channel.RealId = mappedChannel.Value.Key;
785 else
786 {
787 if (!mappedChannel.HasValue)
788 {
789 logger.LogError(
790 "Error mapping message: Provider ID: {providerId}, Channel Real ID: {realId}",
791 providerId,
792 message.User.Channel.RealId);
793 logger.LogTrace("message: {messageJson}", JsonConvert.SerializeObject(message));
794 lock (mappedChannels)
795 logger.LogTrace("mappedChannels: {mappedChannelsJson}", JsonConvert.SerializeObject(mappedChannels));
796 await TextReply("TGS: Processing error, check logs!");
797 return;
798 }
799
800 var mappingChannelRepresentation = mappedChannel.Value.Value.Channel;
801
802 message.User.Channel.RealId = mappingChannelRepresentation.RealId;
803 message.User.Channel.Tag = mappingChannelRepresentation.Tag;
804 message.User.Channel.IsAdminChannel = mappingChannelRepresentation.IsAdminChannel;
805 }
806
807 var trimmedMessage = message.Content.Trim();
808 if (trimmedMessage.Length == 0)
809 return;
810
811 var splits = new List<string>(trimmedMessage.Split(' ', StringSplitOptions.RemoveEmptyEntries));
812 var address = splits[0];
813 if (address.Length > 1 && (address.Last() == ':' || address.Last() == ','))
814 address = address[0..^1];
815
816 var addressed =
817 address.Equals(CommonMention, StringComparison.OrdinalIgnoreCase)
818 || address.Equals(provider.BotMention, StringComparison.OrdinalIgnoreCase);
819
820 // no mention
821 if (!addressed && !message.User.Channel.IsPrivateChannel)
822 return;
823
824 logger.LogTrace(
825 "Start processing command: {message}. User (True provider Id): {profiderId}",
826 message.Content,
827 JsonConvert.SerializeObject(message.User));
828 try
829 {
830 if (addressed)
831 splits.RemoveAt(0);
832
833 if (splits.Count == 0)
834 {
835 // just a mention
836 await TextReply("Hi!");
837 return;
838 }
839
840 var command = splits[0];
841 splits.RemoveAt(0);
842 var arguments = String.Join(" ", splits);
843
844 Tuple<ICommand, IChatTrackingContext?>? GetCommand(string command)
845 {
846 if (!builtinCommands.TryGetValue(command, out var handler))
847 return trackingContexts
848 .Where(trackingContext => trackingContext.Active)
849 .SelectMany(trackingContext => trackingContext.CustomCommands.Select(customCommand => Tuple.Create<ICommand, IChatTrackingContext?>(customCommand, trackingContext)))
850 .Where(tuple => tuple.Item1.Name.Equals(command, StringComparison.OrdinalIgnoreCase))
851 .FirstOrDefault();
852
853 return Tuple.Create<ICommand, IChatTrackingContext?>(handler, null);
854 }
855
856 const string UnknownCommandMessage = "TGS: Unknown command! Type '?' or 'help' for available commands.";
857
858 if (command.Equals("help", StringComparison.OrdinalIgnoreCase) || command == "?")
859 {
860 string helpText;
861 if (splits.Count == 0)
862 {
863 var allCommands = builtinCommands.Select(x => x.Value).ToList();
864 allCommands.AddRange(
865 trackingContexts
866 .SelectMany(
867 x => x.CustomCommands));
868 helpText = String.Format(CultureInfo.InvariantCulture, "Available commands (Type '?' or 'help' and then a command name for more details): {0}", String.Join(", ", allCommands.Select(x => x.Name)));
869 }
870 else
871 {
872 var helpTuple = GetCommand(splits[0]);
873 if (helpTuple != default)
874 {
875 var (helpHandler, _) = helpTuple;
876 helpText = String.Format(CultureInfo.InvariantCulture, "{0}: {1}{2}", helpHandler.Name, helpHandler.HelpText, helpHandler.AdminOnly ? " - May only be used in admin channels" : String.Empty);
877 }
878 else
879 helpText = UnknownCommandMessage;
880 }
881
882 await TextReply(helpText);
883 return;
884 }
885
886 var tuple = GetCommand(command);
887
888 if (tuple == default)
889 {
890 await TextReply(UnknownCommandMessage);
891 return;
892 }
893
894 var (commandHandler, trackingContext) = tuple;
895
896 if (trackingContext?.Active == false)
897 {
898 await TextReply("TGS: The server is rebooting, please try again later");
899 return;
900 }
901
902 if (commandHandler.AdminOnly && !message.User.Channel.IsAdminChannel)
903 {
904 await TextReply("TGS: Use this command in an admin channel!");
905 return;
906 }
907
908 var result = await commandHandler.Invoke(arguments, message.User, cancellationToken);
909 if (result != null)
910 await SendMessage(new List<ulong> { message.User.Channel.RealId }, message, result, cancellationToken);
911 }
912 catch (OperationCanceledException ex)
913 {
914 logger.LogTrace(ex, "Command processing canceled!");
915 }
916 catch (Exception e)
917 {
918 // error bc custom commands should reply about why it failed
919 logger.LogError(e, "Error processing chat command");
920 await TextReply("TGS: Internal error processing command! Check server logs!");
921 }
922 finally
923 {
924 logger.LogTrace("Done processing command.");
925 }
926 }
927
933 async Task MonitorMessages(CancellationToken cancellationToken)
934 {
935 logger.LogTrace("Starting processing loop...");
936 var messageTasks = new Dictionary<IProvider, Task<Message?>>();
937 ValueTask activeProcessingTask = ValueTask.CompletedTask;
938 try
939 {
940 Task? updatedTask = null;
941 while (!cancellationToken.IsCancellationRequested)
942 {
943 if (updatedTask?.IsCompleted != false)
944 lock (synchronizationLock)
945 updatedTask = connectionsUpdated.Task;
946
947 // prune disconnected providers
948 foreach (var disposedProviderMessageTaskKvp in messageTasks.Where(x => x.Key.Disposed).ToList())
949 messageTasks.Remove(disposedProviderMessageTaskKvp.Key);
950
951 // add new ones
952 lock (providers)
953 foreach (var providerKvp in providers)
954 if (!messageTasks.ContainsKey(providerKvp.Value))
955 messageTasks.Add(
956 providerKvp.Value,
957 providerKvp.Value.NextMessage(cancellationToken));
958
959 if (messageTasks.Count == 0)
960 {
961 logger.LogTrace("No providers active, pausing messsage monitoring...");
962 await updatedTask.WaitAsync(cancellationToken);
963 logger.LogTrace("Resuming message monitoring...");
964 continue;
965 }
966
967 // wait for a message
968 await Task.WhenAny(updatedTask, Task.WhenAny(messageTasks.Select(x => x.Value)));
969
970 // process completed ones
971 foreach (var completedMessageTaskKvp in messageTasks.Where(x => x.Value.IsCompleted).ToList())
972 {
973 var provider = completedMessageTaskKvp.Key;
974 messageTasks.Remove(provider);
975
976 if (provider.Disposed) // valid to receive one, but don't process it
977 continue;
978
979 var message = await completedMessageTaskKvp.Value;
980 var messageNumber = Interlocked.Increment(ref messagesProcessed);
981
982 async ValueTask WrapProcessMessage()
983 {
984 var localActiveProcessingTask = activeProcessingTask;
985 using (LogContext.PushProperty(SerilogContextHelper.ChatMessageIterationContextProperty, messageNumber))
986 try
987 {
988 await ProcessMessage(provider, message, false, cancellationToken);
989 }
990 catch (Exception ex)
991 {
992 logger.LogError(ex, "Error processing message {messageNumber}!", messageNumber);
993 }
994
995 await localActiveProcessingTask;
996 }
997
998 activeProcessingTask = WrapProcessMessage();
999 }
1000 }
1001 }
1002 catch (OperationCanceledException ex)
1003 {
1004 logger.LogTrace(ex, "Message processing loop cancelled!");
1005 }
1006 catch (Exception e)
1007 {
1008 logger.LogError(e, "Message loop crashed!");
1009 }
1010 finally
1011 {
1012 await activeProcessingTask;
1013 }
1014
1015 logger.LogTrace("Leaving message processing loop");
1016 }
1017
1026 ValueTask SendMessage(IEnumerable<ulong> channelIds, Message? replyTo, MessageContent message, CancellationToken cancellationToken)
1027 {
1028 var channelIdsList = channelIds.ToList();
1029
1030 logger.LogTrace(
1031 "Chat send \"{message}\"{embed} to channels: [{channelIdsCommaSeperated}]",
1032 message.Text,
1033 message.Embed != null ? " (with embed)" : String.Empty,
1034 String.Join(", ", channelIdsList));
1035
1036 if (channelIdsList.Count == 0)
1037 return ValueTask.CompletedTask;
1038
1040 channelIdsList.Select(x =>
1041 {
1042 ChannelMapping? channelMapping;
1043 lock (mappedChannels)
1044 if (!mappedChannels.TryGetValue(x, out channelMapping))
1045 return ValueTask.CompletedTask;
1046 IProvider? provider;
1047 lock (providers)
1048 if (!providers.TryGetValue(channelMapping.ProviderId, out provider))
1049 return ValueTask.CompletedTask;
1050 return provider.SendMessage(replyTo, message, channelMapping.ProviderChannelId, cancellationToken);
1051 }));
1052 }
1053
1059 {
1060 await Task.WhenAll(providers.Select(x => x.Value.InitialConnectionJob));
1061 logger.LogTrace("Initial provider connection task completed");
1062 }
1063
1068 void AddMessageTask(Task task)
1069 {
1070 async Task Wrap(Task originalTask)
1071 {
1072 await originalTask;
1073 try
1074 {
1075 await task;
1076 }
1077 catch (OperationCanceledException ex)
1078 {
1079 logger.LogDebug(ex, "Async chat message cancelled!");
1080 }
1081 catch (Exception ex)
1082 {
1083 logger.LogError(ex, "Error in asynchronous chat message!");
1084 }
1085 }
1086
1087 lock (handlerCts)
1088 messageSendTask = Wrap(messageSendTask);
1089 }
1090
1097 void QueueMessageInternal(MessageContent message, Func<IEnumerable<ulong>> channelIdsFactory, bool waitForConnections)
1098 {
1099 async Task SendMessageTask()
1100 {
1101 var cancellationToken = handlerCts.Token;
1102 if (waitForConnections)
1103 await initialProviderConnectionsTask!.WaitAsync(cancellationToken);
1104
1105 await SendMessage(
1106 channelIdsFactory(),
1107 null,
1108 message,
1109 cancellationToken);
1110 }
1111
1112 AddMessageTask(SendMessageTask());
1113 }
1114 }
1115}
Information about an engine installation.
Extension methods for the ValueTask and ValueTask<TResult> classes.
static async ValueTask WhenAll(IEnumerable< ValueTask > tasks)
Fully await a given list of tasks .
Represents a mapping of a ChannelRepresentation.RealId.
string ConnectionName
The name of the connection the ChannelRepresentation belongs to.
bool IsAdminChannel
If this is considered a channel for admin commands.
const string CommonMention
The common bot mention.
long messagesProcessed
The number of Messages processed.
readonly IProviderFactory providerFactory
The IProviderFactory for the ChatManager.
ValueTask SendMessage(IEnumerable< ulong > channelIds, Message? replyTo, MessageContent message, CancellationToken cancellationToken)
Asynchronously send a given message to a set of channelIds .
readonly object synchronizationLock
Used for various lock statements throughout this class.
ChatManager(IProviderFactory providerFactory, ICommandFactory commandFactory, IServerControl serverControl, ILoggerFactory loggerFactory, ILogger< ChatManager > logger, IEnumerable< Models.ChatBot > initialChatBots)
Initializes a new instance of the ChatManager class.
Task? initialProviderConnectionsTask
A Task that represents the IProviders initial connection.
ValueTask HandleRestart(Version? updateVersion, bool handlerMayDelayShutdownWithExtremelyLongRunningTasks, CancellationToken cancellationToken)
Handle a restart of the server.A ValueTask representing the running operation.
void QueueMessageInternal(MessageContent message, Func< IEnumerable< ulong > > channelIdsFactory, bool waitForConnections)
Adds a given message to the send queue.
readonly List< Models.ChatBot > activeChatBots
The active Models.ChatBot for the ChatManager.
async Task InitialConnection()
Aggregate all IProvider.InitialConnectionJobs into one <sse cref="Task">.
async ValueTask RemapProvider(IProvider provider, CancellationToken cancellationToken)
Remap the channels for a given provider .
async ValueTask ProcessMessage(IProvider provider, Message? message, bool recursed, CancellationToken cancellationToken)
Processes a message .
readonly Dictionary< long, IProvider > providers
Map of IProviders in use, keyed by ChatBotSettings EntityId.Id.
async Task MonitorMessages(CancellationToken cancellationToken)
Monitors active providers for new Messages.
ICustomCommandHandler? customCommandHandler
The ICustomCommandHandler for the ChangeChannels(long, IEnumerable<Models.ChatChannel>,...
async ValueTask ChangeChannels(long connectionId, IEnumerable< Models.ChatChannel > newChannels, CancellationToken cancellationToken)
readonly Dictionary< string, ICommand > builtinCommands
Unchanging ICommands in the ChatManager mapped by ICommand.Name.
void QueueMessage(MessageContent message, IEnumerable< ulong > channelIds)
Queue a chat message to a given set of channelIds .
readonly ILoggerFactory loggerFactory
The ILoggerFactory for the ChatManager.
Task messageSendTask
A Task that represents all sent messages.
Task? chatHandler
The Task that monitors incoming chat messages.
async ValueTask ChangeSettings(Models.ChatBot newSettings, CancellationToken cancellationToken)
Change chat settings. If the Api.Models.EntityId.Id is not currently in use, a new connection will be...
ulong channelIdCounter
Used for remapping ChannelRepresentation.RealIds.
readonly ICommandFactory commandFactory
The ICommandFactory for the ChatManager.
IChatTrackingContext CreateTrackingContext()
Start tracking Commands.CustomCommands and ChannelRepresentations.A new IChatTrackingContext.
void AddMessageTask(Task task)
Adds a given task to messageSendTask.
readonly ILogger< ChatManager > logger
The ILogger for the ChatManager.
readonly ConcurrentDictionary< long, SemaphoreSlim > changeChannelSemaphores
Map of SemaphoreSlims used to guard concurrent access to ChangeChannels(long, IEnumerable<Models....
readonly IRestartRegistration restartRegistration
The IRestartRegistration for the ChatManager.
void QueueWatchdogMessage(string message)
Queue a chat message to configured watchdog channels.
readonly CancellationTokenSource handlerCts
The CancellationTokenSource for chatHandler.
async Task StartAsync(CancellationToken cancellationToken)
void RegisterCommandHandler(ICustomCommandHandler customCommandHandler)
Registers a customCommandHandler to use.
TaskCompletionSource connectionsUpdated
The TaskCompletionSource that completes when ChatBotSettingss change.
async ValueTask UpdateTrackingContexts(CancellationToken cancellationToken)
Force an update with the active channels on all active IChatTrackingContexts.A ValueTask representing...
async Task DeleteConnection(long connectionId, CancellationToken cancellationToken)
Disconnects and deletes a given connection.A Task representing the running operation.
Func< string?, string, Action< bool > > QueueDeploymentMessage(Models.RevisionInformation revisionInformation, Models.RevisionInformation? previousRevisionInformation, EngineVersion engineVersion, DateTimeOffset? estimatedCompletionTime, string? gitHubOwner, string? gitHubRepo, bool localCommitPushed)
Send the message for a deployment to configured deployment channels.A Func<T1, T2,...
async ValueTask< IProvider?> RemoveProviderChannels(long connectionId, bool removeProvider, CancellationToken cancellationToken)
Remove a IProvider from mappedChannels optionally removing the provider itself from providers and upd...
async Task StopAsync(CancellationToken cancellationToken)
readonly List< IChatTrackingContext > trackingContexts
The active IChatTrackingContexts for the ChatManager.
readonly Dictionary< ulong, ChannelMapping > mappedChannels
Map of ChannelRepresentation.RealIds to ChannelMappings.
string FriendlyName
The friendly name of the user.
Definition ChatUser.cs:31
ChannelRepresentation Channel
The ChannelRepresentation the user spoke from.
Definition ChatUser.cs:41
Represents a message received by a IProvider.
Definition Message.cs:9
ChatUser User
The ChatUser who sent the Message.
Definition Message.cs:18
Represents a message to send to a chat provider.
static async ValueTask< SemaphoreSlimContext > Lock(SemaphoreSlim semaphore, CancellationToken cancellationToken, ILogger? logger=null)
Asyncronously locks a semaphore .
Helpers for manipulating the Serilog.Context.LogContext.
const string ChatMessageIterationContextProperty
The Serilog.Context.LogContext property name for the ID of the chat message currently being processed...
Represents a command that can be invoked by talking to chat bots.
Definition ICommand.cs:12
ValueTask UpdateChannels(IEnumerable< ChannelRepresentation > newChannels, CancellationToken cancellationToken)
Called when newChannels are set.
For managing connected chat services.
Represents a tracking of dynamic chat json files.
Handles Commands.ICommands that map to those defined in a IChatTrackingContext.
IProvider CreateProvider(ChatBot settings)
Create a IProvider.
string BotMention
The string that indicates the IProvider was mentioned.
Definition IProvider.cs:30
bool Connected
If the IProvider is currently connected.
Definition IProvider.cs:20
Represents the lifetime of a IRestartHandler registration.
Represents a service that may take an updated Host assembly and run it, stopping the current assembly...
IRestartRegistration RegisterForRestart(IRestartHandler handler)
Register a given handler to run before stopping the server for a restart.