tgstation-server 6.19.0
The /tg/station 13 server suite
Loading...
Searching...
No Matches
SwarmService.cs
Go to the documentation of this file.
1using System;
2using System.Collections.Generic;
3using System.Diagnostics.CodeAnalysis;
4using System.Linq;
5using System.Net;
6using System.Net.Http;
7using System.Net.Http.Headers;
8using System.Net.Mime;
9using System.Text;
10using System.Threading;
11using System.Threading.Tasks;
12using System.Web;
13
14using Microsoft.Extensions.Logging;
15using Microsoft.Extensions.Options;
16
17using Newtonsoft.Json;
18
31
33{
37#pragma warning disable CA1506 // TODO: Decomplexify
39#pragma warning restore CA1506
40 {
43 {
44 get
45 {
46 if (!SwarmMode)
47 return true;
48
49 lock (swarmServers)
50 return swarmServers.Count - 1 >= swarmConfiguration.UpdateRequiredNodeCount;
51 }
52 }
53
/// <summary>
/// Whether swarm mode is enabled, which is the case when the swarm configuration has a private key set.
/// The attribute tells the compiler that the listed members are non-null whenever this returns <see langword="true"/>.
/// </summary>
[MemberNotNullWhen(true, nameof(serverHealthCheckTask), nameof(forceHealthCheckTcs), nameof(serverHealthCheckCancellationTokenSource), nameof(swarmServers))]
bool SwarmMode
{
    get
    {
        return swarmConfiguration.PrivateKey != null;
    }
}
59
64
69
74
78 readonly IHttpClientFactory httpClientFactory;
79
84
89
94
99
103 readonly ILogger<SwarmService> logger;
104
109
113 readonly CancellationTokenSource? serverHealthCheckCancellationTokenSource;
114
118 readonly List<SwarmServerInformation>? swarmServers;
119
123 readonly Dictionary<string, (Guid RegistrationId, DateTimeOffset RegisteredAt)>? registrationIdsAndTimes;
124
128 readonly bool swarmController;
129
134
138 volatile TaskCompletionSource? forceHealthCheckTcs;
139
144
149
154
159
177 IHttpClientFactory httpClientFactory,
182 IOptions<SwarmConfiguration> swarmConfigurationOptions,
183 ILogger<SwarmService> logger)
184 {
185 this.databaseContextFactory = databaseContextFactory ?? throw new ArgumentNullException(nameof(databaseContextFactory));
186 this.databaseSeeder = databaseSeeder ?? throw new ArgumentNullException(nameof(databaseSeeder));
187 this.assemblyInformationProvider = assemblyInformationProvider ?? throw new ArgumentNullException(nameof(assemblyInformationProvider));
188 this.httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
189 this.asyncDelayer = asyncDelayer ?? throw new ArgumentNullException(nameof(asyncDelayer));
190 this.serverUpdater = serverUpdater ?? throw new ArgumentNullException(nameof(serverUpdater));
191 this.transferService = transferService ?? throw new ArgumentNullException(nameof(transferService));
192 this.tokenFactory = tokenFactory ?? throw new ArgumentNullException(nameof(tokenFactory));
193 swarmConfiguration = swarmConfigurationOptions?.Value ?? throw new ArgumentNullException(nameof(swarmConfigurationOptions));
194 this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
195
196 if (SwarmMode)
197 {
198 if (swarmConfiguration.Address == null)
199 throw new InvalidOperationException("Swarm configuration missing Address!");
200
201 if (String.IsNullOrWhiteSpace(swarmConfiguration.Identifier))
202 throw new InvalidOperationException("Swarm configuration missing Identifier!");
203
204 swarmController = swarmConfiguration.ControllerAddress == null;
205 if (swarmController)
207
208 serverHealthCheckCancellationTokenSource = new CancellationTokenSource();
209 forceHealthCheckTcs = new TaskCompletionSource();
210
211 swarmServers = new List<SwarmServerInformation>
212 {
213 new()
214 {
215 Address = swarmConfiguration.Address,
216 PublicAddress = swarmConfiguration.PublicAddress,
218 Identifier = swarmConfiguration.Identifier,
219 },
220 };
221 }
222 else
223 swarmController = true;
224 }
225
228
/// <summary>
/// Aborts the pending swarm update operation, if there is one, and then propagates the abort via <c>RemoteAbortUpdate</c>.
/// Does nothing when swarm mode is disabled.
/// </summary>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
/// <exception cref="InvalidOperationException">If the operation's abort call reports a value this method does not recognise.</exception>
public async ValueTask AbortUpdate()
{
    if (!SwarmMode)
        return;

    // Atomically detach the current operation so that only one caller gets to act on it.
    var detachedOperation = Interlocked.Exchange(ref updateOperation, null);
    var outcome = detachedOperation?.Abort();

    if (outcome is null)
    {
        // There was no operation to detach.
        logger.LogTrace("Attempted update abort but no operation was found!");
        return;
    }

    if (outcome == SwarmUpdateAbortResult.AlreadyAborted)
    {
        logger.LogDebug("Another context already aborted this update.");
        return;
    }

    if (outcome == SwarmUpdateAbortResult.CantAbortCommitted)
    {
        logger.LogDebug("Not aborting update because we have committed!");
        return;
    }

    if (outcome != SwarmUpdateAbortResult.Aborted)
        throw new InvalidOperationException($"Invalid return value for SwarmUpdateOperation.Abort(): {outcome}");

    // Only the caller that actually performed the abort notifies the other servers.
    await RemoteAbortUpdate();
}
256
258 public async ValueTask<SwarmCommitResult> CommitUpdate(CancellationToken cancellationToken)
259 {
260 if (!SwarmMode)
261 return SwarmCommitResult.ContinueUpdateNonCommitted;
262
263 // wait for the update commit TCS
264 var localUpdateOperation = updateOperation;
265 if (localUpdateOperation == null)
266 {
267 logger.LogDebug("Update commit failed, no pending operation!");
268 await AbortUpdate(); // unnecessary, but can never be too safe
269 return SwarmCommitResult.AbortUpdate;
270 }
271
272 logger.LogInformation("Waiting to commit update...");
273 using var httpClient = httpClientFactory.CreateClient();
274 if (!swarmController)
275 {
276 // let the controller know we're ready
277 logger.LogTrace("Sending ready-commit to swarm controller...");
278 using var commitReadyRequest = PrepareSwarmRequest(
279 null,
280 HttpMethod.Post,
282 null);
283
284 try
285 {
286 using var commitReadyResponse = await httpClient.SendAsync(commitReadyRequest, HttpCompletionOption.ResponseContentRead, cancellationToken);
287 commitReadyResponse.EnsureSuccessStatusCode();
288 }
289 catch (Exception ex)
290 {
291 logger.LogWarning(ex, "Unable to send ready-commit to swarm controller!");
292 await AbortUpdate();
293 return SwarmCommitResult.AbortUpdate;
294 }
295 }
296
297 var timeoutTask = swarmController
299 TimeSpan.FromMinutes(SwarmConstants.UpdateCommitTimeoutMinutes),
300 cancellationToken).AsTask()
301 : Extensions.TaskExtensions.InfiniteTask.WaitAsync(cancellationToken);
302
303 var commitTask = Task.WhenAny(localUpdateOperation.CommitGate, timeoutTask);
304
305 await commitTask;
306
307 var commitGoAhead = localUpdateOperation.CommitGate.IsCompleted
308 && localUpdateOperation.CommitGate.Result
309 && localUpdateOperation == updateOperation;
310 if (!commitGoAhead)
311 {
312 logger.LogDebug(
313 "Update commit failed!{maybeTimeout}",
314 timeoutTask.IsCompleted
315 ? " Timed out!"
316 : String.Empty);
317 await AbortUpdate();
318 return SwarmCommitResult.AbortUpdate;
319 }
320
321 logger.LogTrace("Update commit task complete");
322
323 // on nodes, it means we can go straight ahead
324 if (!swarmController)
325 return SwarmCommitResult.MustCommitUpdate;
326
327 // on the controller, we first need to signal for nodes to go ahead
328 // if anything fails at this point, there's nothing we can do
329 logger.LogDebug("Sending remote commit message to nodes...");
330 async ValueTask SendRemoteCommitUpdate(SwarmServerInformation swarmServer)
331 {
332 using var request = PrepareSwarmRequest(
333 swarmServer,
334 HttpMethod.Post,
336 null);
337
338 try
339 {
340 // I know using the cancellationToken after this point doesn't seem very sane
341 // It's the token for Ctrl+C on server's console though, so we must respect it
342 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
343 response.EnsureSuccessStatusCode();
344 }
345 catch (Exception ex)
346 {
347 logger.LogCritical(ex, "Failed to send update commit request to node {nodeId}!", swarmServer.Identifier);
348 }
349 }
350
351 ValueTask task;
352 lock (swarmServers)
355 .Where(x => !x.Controller)
356 .Select(SendRemoteCommitUpdate)
357 .ToList());
358
359 await task;
360 return SwarmCommitResult.MustCommitUpdate;
361 }
362
/// <summary>
/// Gets a snapshot of the currently known swarm servers.
/// </summary>
/// <returns>A new <see cref="List{T}"/> copied from the internal list under its lock, or <see langword="null"/> when swarm mode is disabled.</returns>
public List<SwarmServerInformation>? GetSwarmServers()
{
    if (SwarmMode)
    {
        lock (swarmServers)
        {
            // Copy so callers never observe later mutations of the internal list.
            return new List<SwarmServerInformation>(swarmServers);
        }
    }

    return null;
}
372
// Begins preparing a swarm update using a package supplied by the caller.
//   fileStreamProvider: the package stream provider; must not be null (ArgumentNullException otherwise).
//   version: the target version; must not be null (ArgumentNullException otherwise).
//   cancellationToken: forwarded unchanged to PrepareUpdateImpl.
// Returns the ValueTask<SwarmPrepareResult> produced by PrepareUpdateImpl.
374 public ValueTask<SwarmPrepareResult> PrepareUpdate(ISeekableFileStreamProvider fileStreamProvider, Version version, CancellationToken cancellationToken)
375 {
376 ArgumentNullException.ThrowIfNull(fileStreamProvider);
377
378 ArgumentNullException.ThrowIfNull(version);
379
380 logger.LogTrace("Begin PrepareUpdate...");
// Delegate to the shared implementation; a non-null provider is what PrepareUpdateImpl uses to treat this server as the initiator.
381 return PrepareUpdateImpl(
382 fileStreamProvider,
// NOTE(review): listing line 383 is absent from this extract; presumably it opens the SwarmUpdateRequest object creation that the initializer below belongs to — confirm against the real file.
384 {
385 UpdateVersion = version,
386 },
387 cancellationToken);
388 }
389
/// <summary>
/// Handles an update request received from another swarm server by running the shared prepare logic without a local package provider.
/// </summary>
/// <param name="updateRequest">The received <see cref="SwarmUpdateRequest"/>. Must not be <see langword="null"/>.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns><see langword="true"/> unless the prepare result was <see cref="SwarmPrepareResult.Failure"/>.</returns>
public async ValueTask<bool> PrepareUpdateFromController(SwarmUpdateRequest updateRequest, CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(updateRequest);

    // A controller receives these from nodes; a node receives them from the controller.
    var senderType = swarmController ? "node" : "controller";
    logger.LogInformation("Received remote update request from {nodeType}", senderType);

    var prepareResult = await PrepareUpdateImpl(null, updateRequest, cancellationToken);
    var failed = prepareResult == SwarmPrepareResult.Failure;
    return !failed;
}
403
405 public async ValueTask<SwarmRegistrationResult> Initialize(CancellationToken cancellationToken)
406 {
407 if (SwarmMode)
408 logger.LogInformation(
409 "Swarm mode enabled: {nodeType} {nodeId}",
411 ? "Controller"
412 : "Node",
414 else
415 logger.LogTrace("Swarm mode disabled");
416
418 if (swarmController)
419 {
421 logger.LogInformation("Expecting connections from {expectedNodeCount} nodes", swarmConfiguration.UpdateRequiredNodeCount);
422
424 databaseContext => databaseSeeder.Initialize(databaseContext, cancellationToken));
425
426 result = SwarmRegistrationResult.Success;
427 }
428 else
429 result = await RegisterWithController(cancellationToken);
430
431 if (SwarmMode && result == SwarmRegistrationResult.Success)
433
434 return result;
435 }
436
438 public async ValueTask Shutdown(CancellationToken cancellationToken)
439 {
440 logger.LogTrace("Begin Shutdown");
441
442 async ValueTask SendUnregistrationRequest(SwarmServerInformation? swarmServer)
443 {
444 using var httpClient = httpClientFactory.CreateClient();
445 using var request = PrepareSwarmRequest(
446 swarmServer,
447 HttpMethod.Delete,
449 null);
450
451 try
452 {
453 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
454 response.EnsureSuccessStatusCode();
455 }
456 catch (Exception ex)
457 {
458 logger.LogWarning(
459 ex,
460 "Error unregistering {nodeType}!",
461 swarmServer != null
462 ? $"node {swarmServer.Identifier}"
463 : "from controller");
464 }
465 }
466
467 if (SwarmMode && serverHealthCheckTask != null)
468 {
471 }
472
473 if (!swarmController)
474 {
475 if (controllerRegistration != null)
476 {
477 logger.LogInformation("Unregistering from swarm controller...");
478 await SendUnregistrationRequest(null);
479 }
480
481 return;
482 }
483
484 // We're in a single-threaded-like context now so touching updateOperation directly is fine
485
486 // downgrade the db if necessary
487 if (updateOperation != null
490 db => databaseSeeder.Downgrade(db, updateOperation.TargetVersion, cancellationToken));
491
492 if (SwarmMode)
493 {
494 // Put the nodes into a reconnecting state
495 if (updateOperation == null)
496 {
497 logger.LogInformation("Unregistering nodes...");
498 ValueTask task;
499 lock (swarmServers)
500 {
503 .Where(x => !x.Controller)
504 .Select(SendUnregistrationRequest)
505 .ToList());
506 swarmServers.RemoveRange(1, swarmServers.Count - 1);
508 }
509
510 await task;
511 }
512
513 logger.LogTrace("Swarm controller shutdown");
514 }
515 }
516
/// <summary>
/// Replaces the locally stored list of swarm servers with the supplied one. Only valid on a non-controller server in swarm mode.
/// </summary>
/// <param name="swarmServers">The new server list. Must not be <see langword="null"/>.</param>
/// <exception cref="InvalidOperationException">If swarm mode is disabled or this server is the swarm controller.</exception>
public void UpdateSwarmServersList(IEnumerable<SwarmServerInformation> swarmServers)
{
    ArgumentNullException.ThrowIfNull(swarmServers);

    if (!SwarmMode)
    {
        throw new InvalidOperationException("Swarm mode not enabled!");
    }

    if (swarmController)
    {
        throw new InvalidOperationException("Cannot UpdateSwarmServersList on swarm controller!");
    }

    // The parameter shadows the field, so take an explicit reference to the field's list.
    var trackedServers = this.swarmServers;
    lock (trackedServers)
    {
        trackedServers.Clear();
        trackedServers.AddRange(swarmServers);
        logger.LogDebug("Updated swarm server list with {nodeCount} total nodes", trackedServers.Count);
    }
}
535
/// <summary>
/// Checks whether a registration <see cref="Guid"/> is known to this server.
/// On the controller it is compared against the registered entries; otherwise it is compared against this server's
/// controller registration, and a match also refreshes the last controller health check time.
/// </summary>
/// <param name="registrationId">The registration <see cref="Guid"/> to validate.</param>
/// <returns><see langword="true"/> if the registration matched, <see langword="false"/> otherwise.</returns>
/// <exception cref="InvalidOperationException">If swarm mode is disabled.</exception>
public bool ValidateRegistration(Guid registrationId)
{
    if (!SwarmMode)
        throw new InvalidOperationException("Swarm mode not enabled!");

    if (swarmController)
    {
        lock (swarmServers)
        {
            foreach (var registration in registrationIdsAndTimes!.Values)
            {
                if (registration.RegistrationId == registrationId)
                    return true;
            }

            return false;
        }
    }

    var matchesController = registrationId == controllerRegistration;
    if (matchesController)
        lastControllerHealthCheck = DateTimeOffset.UtcNow;

    return matchesController;
}
552
554 public async ValueTask<SwarmRegistrationResponse?> RegisterNode(SwarmServer node, Guid registrationId, CancellationToken cancellationToken)
555 {
556 ArgumentNullException.ThrowIfNull(node);
557
558 if (node.Identifier == null)
559 throw new ArgumentException("Node missing Identifier!", nameof(node));
560
561 if (node.Address == null)
562 throw new ArgumentException("Node missing Address!", nameof(node));
563
564 if (!SwarmMode)
565 throw new InvalidOperationException("Swarm mode not enabled!");
566
567 if (!swarmController)
568 throw new InvalidOperationException("Cannot RegisterNode on swarm node!");
569
570 logger.LogTrace("RegisterNode");
571
572 await AbortUpdate();
573
574 SwarmRegistrationResponse CreateResponse() => new()
575 {
576 TokenSigningKeyBase64 = Convert.ToBase64String(tokenFactory.SigningKeyBytes),
577 };
578
579 var registrationIdsAndTimes = this.registrationIdsAndTimes!;
580 lock (swarmServers)
581 {
582 if (registrationIdsAndTimes.Any(x => x.Value.RegistrationId == registrationId))
583 {
584 var preExistingRegistrationKvp = registrationIdsAndTimes.FirstOrDefault(x => x.Value.RegistrationId == registrationId);
585 if (preExistingRegistrationKvp.Key == node.Identifier)
586 {
587 logger.LogWarning("Node {nodeId} has already registered!", node.Identifier);
588 return CreateResponse();
589 }
590
591 logger.LogWarning(
592 "Registration ID collision! Node {nodeId} tried to register with {otherNodeId}'s registration ID: {registrationId}",
593 node.Identifier,
594 preExistingRegistrationKvp.Key,
595 registrationId);
596 return null;
597 }
598
599 if (registrationIdsAndTimes.TryGetValue(node.Identifier, out var oldRegistration))
600 {
601 logger.LogInformation("Node {nodeId} is re-registering without first unregistering. Indicative of restart.", node.Identifier);
602 swarmServers.RemoveAll(x => x.Identifier == node.Identifier);
604 }
605
607 {
608 PublicAddress = node.PublicAddress,
609 Address = node.Address,
610 Identifier = node.Identifier,
611 Controller = false,
612 });
613 registrationIdsAndTimes.Add(node.Identifier, (RegistrationId: registrationId, DateTimeOffset.UtcNow));
614 }
615
616 logger.LogInformation("Registered node {nodeId} ({nodeIP}) with ID {registrationId}", node.Identifier, node.Address, registrationId);
618 return CreateResponse();
619 }
620
/// <summary>
/// Handles a commit message received from another swarm server.
/// On a non-controller server this commits the pending update operation.
/// On the controller it marks the sending node as ready to commit.
/// </summary>
/// <param name="registrationId">The registration <see cref="Guid"/> of the sender. Only consulted on the controller.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation. Not used by this method.</param>
/// <returns><see langword="true"/> if the commit was accepted or the node was marked ready, <see langword="false"/> otherwise.</returns>
public async ValueTask<bool> RemoteCommitReceived(Guid registrationId, CancellationToken cancellationToken)
{
    // Read the field once so every check below refers to the same operation.
    var pendingOperation = updateOperation;

    if (!swarmController)
    {
        logger.LogDebug("Received remote commit go ahead");
        return pendingOperation?.Commit() ?? false;
    }

    var senderId = NodeIdentifierFromRegistration(registrationId);
    if (senderId == null)
    {
        // The registration could not be resolved to a node identifier, so abort rather than continue.
        logger.LogError("Aborting update due to unforseen circumstances!");
        await AbortUpdate();
        return false;
    }

    if (pendingOperation == null)
    {
        logger.LogDebug("Ignoring ready-commit from node {nodeId} as the update appears to have been aborted.", senderId);
        return false;
    }

    if (pendingOperation.MarkNodeReady(senderId))
    {
        logger.LogDebug("Node {nodeId} is ready to commit.", senderId);
        return true;
    }

    logger.LogError(
        "Attempting to mark {nodeId} as ready to commit resulted in the update being aborted!",
        senderId);

    // Clear the field only if it still holds the operation we snapshotted.
    var previousOperation = Interlocked.CompareExchange(ref updateOperation, null, pendingOperation);
    if (previousOperation != pendingOperation)
    {
        // A different operation is installed by now, so that one is aborted as well.
        logger.LogError("Aborting new update due to unforseen consequences!");
        await AbortUpdate();
    }
    else
    {
        await RemoteAbortUpdate();
    }

    return false;
}
669
671 public async ValueTask UnregisterNode(Guid registrationId, CancellationToken cancellationToken)
672 {
673 if (!SwarmMode)
674 throw new InvalidOperationException("Swarm mode not enabled!");
675
676 logger.LogTrace("UnregisterNode {registrationId}", registrationId);
677 await AbortUpdate();
678
679 if (!swarmController)
680 {
681 // immediately trigger a health check
682 logger.LogInformation("Controller unregistering, will attempt re-registration...");
685 return;
686 }
687
688 var nodeIdentifier = NodeIdentifierFromRegistration(registrationId);
689 if (nodeIdentifier == null)
690 return;
691
692 logger.LogInformation("Unregistering node {nodeId}...", nodeIdentifier);
693
694 lock (swarmServers)
695 {
696 swarmServers.RemoveAll(x => x.Identifier == nodeIdentifier);
697 registrationIdsAndTimes!.Remove(nodeIdentifier);
698 }
699
701 }
702
709 {
710 logger.LogInformation("Aborting swarm update!");
711
712 using var httpClient = httpClientFactory.CreateClient();
713 async ValueTask SendRemoteAbort(SwarmServerInformation swarmServer)
714 {
715 using var request = PrepareSwarmRequest(
716 swarmServer,
717 HttpMethod.Delete,
719 null);
720
721 try
722 {
723 // DCT: Intentionally should not be cancelled
724 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, CancellationToken.None);
725 response.EnsureSuccessStatusCode();
726 }
727 catch (Exception ex)
728 {
729 logger.LogWarning(
730 ex,
731 "Unable to send remote abort to {nodeOrController}!",
733 ? $"node {swarmServer.Identifier}"
734 : "controller");
735 }
736 }
737
738 if (!swarmController)
739 return SendRemoteAbort(new SwarmServerInformation
740 {
742 });
743
744 lock (swarmServers!)
747 .Where(x => !x.Controller)
748 .Select(SendRemoteAbort)
749 .ToList());
750 }
751
759 {
760 var httpClient = httpClientFactory.CreateClient();
761 try
762 {
763 var request = PrepareSwarmRequest(
764 sourceNode,
765 HttpMethod.Get,
766 $"{SwarmConstants.UpdateRoute}?ticket={HttpUtility.UrlEncode(ticket.FileTicket)}",
767 null);
768
769 try
770 {
771 request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue(MediaTypeNames.Application.Octet));
772 return new RequestFileStreamProvider(httpClient, request);
773 }
774 catch
775 {
776 request.Dispose();
777 throw;
778 }
779 }
780 catch
781 {
782 httpClient.Dispose();
783 throw;
784 }
785 }
786
794 async ValueTask<SwarmPrepareResult> PrepareUpdateImpl(ISeekableFileStreamProvider? initiatorProvider, SwarmUpdateRequest updateRequest, CancellationToken cancellationToken)
795 {
796 var version = updateRequest.UpdateVersion!;
797 if (!SwarmMode)
798 {
799 // we still need an active update operation for the TargetVersion
801 return SwarmPrepareResult.SuccessProviderNotRequired;
802 }
803
804 var initiator = initiatorProvider != null;
805 logger.LogTrace("PrepareUpdateImpl {version}...", version);
806
807 var shouldAbort = false;
808 SwarmUpdateOperation localUpdateOperation;
809 try
810 {
811 SwarmServerInformation? sourceNode = null;
812 List<SwarmServerInformation> currentNodes;
813 lock (swarmServers)
814 {
815 currentNodes = swarmServers
816 .Select(node =>
817 {
818 if (node.Identifier == updateRequest.SourceNode)
819 sourceNode = node;
820
821 return node;
822 })
823 .ToList();
824 if (swarmController)
825 localUpdateOperation = new SwarmUpdateOperation(
826 version,
827 currentNodes);
828 else
829 localUpdateOperation = new SwarmUpdateOperation(version);
830 }
831
832 var existingUpdateOperation = Interlocked.CompareExchange(ref updateOperation, localUpdateOperation, null);
833 if (existingUpdateOperation != null && existingUpdateOperation.TargetVersion != version)
834 {
835 logger.LogWarning("Aborting update preparation, version {targetUpdateVersion} already prepared!", existingUpdateOperation.TargetVersion);
836 shouldAbort = true;
837 return SwarmPrepareResult.Failure;
838 }
839
840 if (existingUpdateOperation?.TargetVersion == version)
841 {
842 logger.LogTrace("PrepareUpdateImpl early out, already prepared!");
843 return SwarmPrepareResult.SuccessProviderNotRequired;
844 }
845
846 if (!swarmController && initiator)
847 {
848 var downloadTickets = await CreateDownloadTickets(initiatorProvider!, currentNodes, cancellationToken); // condition of initiator
849
850 logger.LogInformation("Forwarding update request to swarm controller...");
851 using var httpClient = httpClientFactory.CreateClient();
852 using var request = PrepareSwarmRequest(
853 null,
854 HttpMethod.Put,
857 {
858 UpdateVersion = version,
859 SourceNode = swarmConfiguration.Identifier,
860 DownloadTickets = downloadTickets,
861 });
862
863 // File transfer service will hold the necessary streams
864 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
865 if (response.IsSuccessStatusCode)
866 return SwarmPrepareResult.SuccessHoldProviderUntilCommit;
867
868 shouldAbort = true;
869 return SwarmPrepareResult.Failure;
870 }
871
872 if (!initiator)
873 {
874 logger.LogTrace("Beginning local update process...");
875
876 if (sourceNode == null)
877 {
878 logger.Log(
880 ? LogLevel.Error
881 : LogLevel.Warning,
882 "Missing local node entry for update source node: {sourceNode}",
883 updateRequest.SourceNode);
884 shouldAbort = true;
885 return SwarmPrepareResult.Failure;
886 }
887
888 if (updateRequest.DownloadTickets == null)
889 {
890 logger.LogError("Missing download tickets in update request!");
891 return SwarmPrepareResult.Failure;
892 }
893
894 if (!updateRequest.DownloadTickets.TryGetValue(swarmConfiguration.Identifier!, out var ticket))
895 {
896 logger.Log(
898 ? LogLevel.Error
899 : LogLevel.Warning,
900 "Missing node entry for download ticket in update request!");
901 shouldAbort = true;
902 return SwarmPrepareResult.Failure;
903 }
904
905 ServerUpdateResult updateApplyResult;
906 var downloaderStream = CreateUpdateStreamProvider(sourceNode, ticket);
907 try
908 {
909 updateApplyResult = await serverUpdater.BeginUpdate(
910 this,
911 downloaderStream,
912 version,
913 cancellationToken);
914 }
915 catch
916 {
917 await downloaderStream.DisposeAsync();
918 throw;
919 }
920
921 if (updateApplyResult != ServerUpdateResult.Started)
922 {
923 logger.LogWarning("Failed to prepare update! Result: {serverUpdateResult}", updateApplyResult);
924 shouldAbort = true;
925 return SwarmPrepareResult.Failure;
926 }
927 }
928 else
929 logger.LogTrace("No need to re-initiate update as it originated here on the swarm controller");
930
931 logger.LogDebug("Local node prepared for update to version {version}", version);
932 }
933 catch (Exception ex)
934 {
935 logger.LogWarning(ex, "Failed to prepare update!");
936 shouldAbort = true;
937 return SwarmPrepareResult.Failure;
938 }
939 finally
940 {
941 if (shouldAbort)
942 await AbortUpdate();
943 }
944
945 if (!swarmController)
946 return SwarmPrepareResult.SuccessProviderNotRequired;
947
949 initiatorProvider,
950 updateRequest,
951 localUpdateOperation,
952 cancellationToken);
953 }
954
963 async ValueTask<SwarmPrepareResult> ControllerDistributedPrepareUpdate(
964 ISeekableFileStreamProvider? initiatorProvider,
965 SwarmUpdateRequest updateRequest,
966 SwarmUpdateOperation currentUpdateOperation,
967 CancellationToken cancellationToken)
968 {
969 bool abortUpdate = false;
970 try
971 {
972 logger.LogInformation("Sending remote prepare to nodes...");
973
974 if (currentUpdateOperation.InvolvedServers.Count - 1 < swarmConfiguration.UpdateRequiredNodeCount)
975 {
976 logger.LogWarning(
977 "Aborting update, controller expects to be in sync with {requiredNodeCount} nodes but currently only has {currentNodeCount}!",
979 currentUpdateOperation.InvolvedServers.Count - 1);
980 abortUpdate = true;
981 return SwarmPrepareResult.Failure;
982 }
983
984 var weAreInitiator = initiatorProvider != null;
985 if (currentUpdateOperation.InvolvedServers.Count == 1)
986 {
987 logger.LogDebug("Controller has no nodes, setting commit-ready.");
988 if (updateOperation?.Commit() == true)
989 return SwarmPrepareResult.SuccessProviderNotRequired;
990
991 logger.LogDebug("Update appears to have been aborted");
992 return SwarmPrepareResult.Failure;
993 }
994
995 // The initiator node obviously doesn't create a ticket for itself
996 else if (!weAreInitiator && updateRequest.DownloadTickets!.Count != currentUpdateOperation.InvolvedServers.Count - 1)
997 {
998 logger.LogWarning(
999 "Aborting update, {receivedTickets} download tickets were provided but there are {nodesToUpdate} nodes in the swarm that require the package!",
1000 updateRequest.DownloadTickets.Count,
1001 currentUpdateOperation.InvolvedServers.Count);
1002 abortUpdate = true;
1003 return SwarmPrepareResult.Failure;
1004 }
1005
1006 var downloadTicketDictionary = weAreInitiator
1007 ? await CreateDownloadTickets(initiatorProvider!, currentUpdateOperation.InvolvedServers, cancellationToken)
1008 : updateRequest.DownloadTickets!;
1009
1010 var sourceNode = weAreInitiator
1011 ? swarmConfiguration.Identifier
1012 : updateRequest.SourceNode;
1013
1014 using var httpClient = httpClientFactory.CreateClient();
1015 using var transferSemaphore = new SemaphoreSlim(1);
1016
1017 bool anyFailed = false;
1018 var updateRequests = currentUpdateOperation
1020 .Where(node => !node.Controller)
1021 .Select(node =>
1022 {
1023 // only send the necessary ticket to each node from the controller
1024 Dictionary<string, FileTicketResponse>? localTicketDictionary;
1025 var nodeId = node.Identifier!;
1026 if (nodeId == sourceNode)
1027 localTicketDictionary = null;
1028 else if (!downloadTicketDictionary.TryGetValue(nodeId, out var ticket))
1029 {
1030 logger.LogError("Missing download ticket for node {missingNodeId}!", nodeId);
1031 anyFailed = true;
1032 return null;
1033 }
1034 else
1035 localTicketDictionary = new Dictionary<string, FileTicketResponse>
1036 {
1037 { nodeId, ticket },
1038 };
1039
1040 var request = new SwarmUpdateRequest
1041 {
1042 UpdateVersion = updateRequest.UpdateVersion,
1043 SourceNode = sourceNode,
1044 DownloadTickets = localTicketDictionary,
1045 };
1046
1047 return Tuple.Create(node, request);
1048 })
1049 .ToList();
1050
1051 if (anyFailed)
1052 return SwarmPrepareResult.Failure;
1053
1054 var tasks = updateRequests
1055 .Select(async tuple =>
1056 {
1057 var node = tuple!.Item1;
1058 var body = tuple.Item2;
1059
1060 using var request = PrepareSwarmRequest(
1061 node,
1062 HttpMethod.Put,
1064 body);
1065
1066 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
1067 return response.IsSuccessStatusCode;
1068 })
1069 .ToList();
1070
1071 await Task.WhenAll(tasks);
1072
1073 // if all succeeds...
1074 if (tasks.All(x => x.Result))
1075 {
1076 logger.LogInformation("Distributed prepare for update to version {version} complete.", updateRequest.UpdateVersion);
1077 return weAreInitiator
1078 ? SwarmPrepareResult.SuccessHoldProviderUntilCommit
1079 : SwarmPrepareResult.SuccessProviderNotRequired;
1080 }
1081
1082 abortUpdate = true;
1083 logger.LogDebug("Distrubuted prepare failed!");
1084 }
1085 catch (Exception ex)
1086 {
1087 logger.LogWarning(ex, "Error remotely preparing updates!");
1088 }
1089 finally
1090 {
1091 if (abortUpdate)
1092 await AbortUpdate();
1093 }
1094
1095 return SwarmPrepareResult.Failure;
1096 }
1097
/// <summary>
/// Creates one download ticket per involved server other than this one, all backed by the initiator's package provider.
/// </summary>
/// <param name="initiatorProvider">The <see cref="ISeekableFileStreamProvider"/> holding the update package.</param>
/// <param name="involvedServers">The servers taking part in the update.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used when loading the package stream.</param>
/// <returns>A <see cref="Dictionary{TKey, TValue}"/> of server identifiers to their <see cref="FileTicketResponse"/>s.</returns>
async ValueTask<Dictionary<string, FileTicketResponse>> CreateDownloadTickets(
    ISeekableFileStreamProvider initiatorProvider,
    IReadOnlyCollection<SwarmServerInformation> involvedServers,
    CancellationToken cancellationToken)
{
    // Start loading the package up front so it is ready before any download is served; awaited at the end.
    var packageLoadTask = initiatorProvider.GetResult(cancellationToken);

    var packageDownloadProvider = new FileDownloadProvider(
        () => initiatorProvider.Disposed
            ? Api.Models.ErrorCode.ResourceNotPresent
            : null,
        async downloadToken => await initiatorProvider.GetOwnedResult(downloadToken), // throws if the provider was disposed
        "<Swarm Update Package Provider>",
        false);

    // This server already has the package, so it gets no ticket.
    var ticketRecipients = new List<SwarmServerInformation>();
    foreach (var server in involvedServers)
    {
        if (server.Identifier != swarmConfiguration.Identifier)
            ticketRecipients.Add(server);
    }

    logger.LogTrace("Creating {n} download tickets for other nodes...", ticketRecipients.Count);

    var ticketsByNode = new Dictionary<string, FileTicketResponse>(ticketRecipients.Count);
    foreach (var recipient in ticketRecipients)
    {
        var recipientId = recipient.Identifier!;
        var ticket = transferService.CreateDownload(packageDownloadProvider);
        ticketsByNode.Add(recipientId, ticket);
    }

    await packageLoadTask;
    return ticketsByNode;
}
1136
// Health checks the registered servers from this server's list and drops the ones that fail.
//   cancellationToken: passed to every HTTP request and to SendUpdatedServerListToNodes.
// Afterwards, if the list size changed the servers are marked dirty, and if serversDirty is set the updated list is sent out.
1142 async ValueTask HealthCheckNodes(CancellationToken cancellationToken)
1143 {
1144 using var httpClient = httpClientFactory.CreateClient();
1145
// Work from a snapshot taken under the lock so the list can change while requests are in flight.
1146 List<SwarmServerInformation> currentSwarmServers;
1147 lock (swarmServers!)
1148 currentSwarmServers = swarmServers.ToList();
1149
1150 var registrationIdsAndTimes = this.registrationIdsAndTimes!;
// Sends one health request to a server; on any failure other than cancellation the server is removed from both collections.
1151 async ValueTask HealthRequestForServer(SwarmServerInformation swarmServer)
1152 {
1153 using var request = PrepareSwarmRequest(
1154 swarmServer,
1155 HttpMethod.Get,
1156 String.Empty,
1157 null);
1158
1159 try
1160 {
1161 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
1162 response.EnsureSuccessStatusCode();
// Healthy: skip the removal below.
1163 return;
1164 }
// OperationCanceledException is not caught here, so cancellation propagates to the caller instead of unregistering the server.
1165 catch (Exception ex) when (ex is not OperationCanceledException)
1166 {
1167 logger.LogWarning(
1168 ex,
1169 "Error during swarm server health check on node '{nodeId}'! Unregistering...",
1170 swarmServer.Identifier);
1171 }
1172
// Only reached after a logged failure.
1173 lock (swarmServers)
1174 {
1175 swarmServers.Remove(swarmServer);
1176 registrationIdsAndTimes.Remove(swarmServer.Identifier!);
1177 }
1178 }
1179
// NOTE(review): listing line 1180 is absent from this extract; it is the call that consumes the sequence below (presumably awaiting all requests) — confirm against the real file.
// Only non-controller servers whose registration time plus ControllerHealthCheckIntervalMinutes is already in the past are checked.
// NOTE(review): registrationIdsAndTimes is read in this filter without holding the swarmServers lock that the removals above take — confirm this is safe.
1181 currentSwarmServers
1182 .Where(node => !node.Controller
1183 && registrationIdsAndTimes.TryGetValue(node.Identifier!, out var registrationAndTime)
1184 && registrationAndTime.RegisteredAt.AddMinutes(SwarmConstants.ControllerHealthCheckIntervalMinutes) < DateTimeOffset.UtcNow)
1185 .Select(HealthRequestForServer));
1186
// A count that differs from the snapshot means the list changed during the checks.
1187 lock (swarmServers)
1188 if (swarmServers.Count != currentSwarmServers.Count)
1189 MarkServersDirty();
1190
1191 if (serversDirty)
1192 await SendUpdatedServerListToNodes(cancellationToken);
1193 }
1194
1199 {
1200 serversDirty = true;
1201 if (TriggerHealthCheck())
1202 logger.LogTrace("Server list is dirty!");
1203 }
1204
1210 {
1211 var currentTcs = Interlocked.Exchange(ref forceHealthCheckTcs, new TaskCompletionSource());
1212 return currentTcs!.TrySetResult();
1213 }
1214
/// <summary>
/// Health checks the swarm controller when a controller registration exists. If there is no registration, or the
/// check fails, repeatedly attempts to register with the controller until that succeeds or is rejected outright.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
async ValueTask HealthCheckController(CancellationToken cancellationToken)
{
    using var httpClient = httpClientFactory.CreateClient();

    if (controllerRegistration != null)
    {
        try
        {
            using var request = PrepareSwarmRequest(null, HttpMethod.Get, String.Empty, null);
            using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
            response.EnsureSuccessStatusCode();
            logger.LogTrace("Controller health check successful");
            return;
        }
        catch (Exception ex)
        {
            // Drop the registration and abort any update before falling through to re-registration.
            logger.LogWarning(ex, "Error during swarm controller health check! Attempting to re-register...");
            controllerRegistration = null;
            await AbortUpdate();
        }
    }

    var attempt = 0UL;
    while (true)
    {
        ++attempt;
        logger.LogInformation("Swarm re-registration attempt {attemptNumber}...", attempt);

        var outcome = await RegisterWithController(cancellationToken);
        switch (outcome)
        {
            case SwarmRegistrationResult.Success:
                return;
            case SwarmRegistrationResult.Unauthorized:
                // Retrying cannot fix this; give up and leave it to the next health check.
                logger.LogError("Swarm re-registration failed, controller's private key has changed!");
                return;
            case SwarmRegistrationResult.VersionMismatch:
                // Retrying cannot fix this either.
                logger.LogError("Swarm re-registration failed, controller's TGS version has changed!");
                return;
        }

        // Any other result: wait a little and try again.
        await asyncDelayer.Delay(TimeSpan.FromSeconds(5), cancellationToken);
    }
}
1271
/// <summary>
/// Attempt to register this node with the swarm controller at swarmConfiguration.ControllerAddress.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> passed to the HTTP send and content reads.</param>
/// <returns>
/// A <see cref="ValueTask{TResult}"/> resulting in Success once the response's signing key has been applied and the registration recorded;
/// PayloadFailure when a successful response cannot be read or lacks a signing key; Unauthorized for HTTP 401;
/// VersionMismatch for HTTP 426 (UpgradeRequired); CommunicationFailure for any other status or any exception while sending.
/// </returns>
1277 async ValueTask<SwarmRegistrationResult> RegisterWithController(CancellationToken cancellationToken)
1278 {
1279 logger.LogInformation("Attempting to register with swarm controller at {controllerAddress}...", swarmConfiguration.ControllerAddress);
// The node proposes its own registration ID; it is sent as the registrationIdOverride argument of PrepareSwarmRequest.
1280 var requestedRegistrationId = Guid.NewGuid();
1281
1282 using var httpClient = httpClientFactory.CreateClient();
// NOTE(review): the route and request-body constructor arguments are not visible in this listing; only the body's object initializer is.
1283 using var registrationRequest = PrepareSwarmRequest(
1284 null,
1285 HttpMethod.Post,
1288 {
1289 Identifier = swarmConfiguration.Identifier,
1290 Address = swarmConfiguration.Address,
1291 PublicAddress = swarmConfiguration.PublicAddress,
1292 },
1293 requestedRegistrationId);
1294
1295 try
1296 {
1297 using var response = await httpClient.SendAsync(registrationRequest, HttpCompletionOption.ResponseContentRead, cancellationToken);
1298 if (response.IsSuccessStatusCode)
1299 {
// Any failure while reading or parsing the success payload is reported as PayloadFailure rather than thrown.
1300 try
1301 {
1302 var json = await response.Content.ReadAsStringAsync(cancellationToken);
1303 if (json == null)
1304 {
1305 logger.LogDebug("Error reading registration response content stream! Text was null!");
1306 return SwarmRegistrationResult.PayloadFailure;
1307 }
1308
1309 var registrationResponse = JsonConvert.DeserializeObject<SwarmRegistrationResponse>(json);
1310 if (registrationResponse == null)
1311 {
1312 logger.LogDebug("Error reading registration response content stream! Payload was null!");
1313 return SwarmRegistrationResult.PayloadFailure;
1314 }
1315
1316 if (registrationResponse.TokenSigningKeyBase64 == null)
1317 {
1318 logger.LogDebug("Error reading registration response content stream! SigningKey was null!");
1319 return SwarmRegistrationResult.PayloadFailure;
1320 }
1321
// Adopt the signing key supplied by the controller (decoded from Base64) for this node's token factory.
1322 tokenFactory.SigningKeyBytes = Convert.FromBase64String(registrationResponse.TokenSigningKeyBase64);
1323 }
1324 catch (Exception ex)
1325 {
1326 logger.LogDebug(ex, "Error reading registration response content stream!");
1327 return SwarmRegistrationResult.PayloadFailure;
1328 }
1329
// Registration state is only committed after the payload has been fully validated and applied.
1330 logger.LogInformation("Sucessfully registered with ID {registrationId}", requestedRegistrationId);
1331 controllerRegistration = requestedRegistrationId;
1332 lastControllerHealthCheck = DateTimeOffset.UtcNow;
1333 return SwarmRegistrationResult.Success;
1334 }
1335
1336 logger.LogWarning("Unable to register with swarm: HTTP {statusCode}!", response.StatusCode);
1337
// These two statuses get dedicated results; HealthCheckController stops retrying when it sees them.
1338 if (response.StatusCode == HttpStatusCode.Unauthorized)
1339 return SwarmRegistrationResult.Unauthorized;
1340
1341 if (response.StatusCode == HttpStatusCode.UpgradeRequired)
1342 return SwarmRegistrationResult.VersionMismatch;
1343
// Best effort: log whatever body accompanied the failure to aid debugging, then fall through to CommunicationFailure.
1344 try
1345 {
1346 var responseData = await response.Content.ReadAsStringAsync(cancellationToken);
1347 if (!String.IsNullOrWhiteSpace(responseData))
1348 logger.LogDebug("Response:{newLine}{responseData}", Environment.NewLine, responseData);
1349 }
1350 catch (Exception ex)
1351 {
1352 logger.LogDebug(ex, "Error reading registration response content stream!");
1353 }
1354 }
// NOTE(review): this catch is unfiltered, so an OperationCanceledException from SendAsync is also logged and turned into CommunicationFailure instead of propagating.
1355 catch (Exception ex)
1356 {
1357 logger.LogWarning(ex, "Error sending registration request!");
1358 }
1359
1360 return SwarmRegistrationResult.CommunicationFailure;
1361 }
1362
/// <summary>
/// Sends the controller's current list of swarm servers to every non-controller entry in that list.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> passed to each HTTP send.</param>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
1368 async ValueTask SendUpdatedServerListToNodes(CancellationToken cancellationToken)
1369 {
1370 List<SwarmServerInformation> currentSwarmServers;
// Clear the dirty flag and snapshot the list under the same lock, so the broadcast works on a stable copy.
1371 lock (swarmServers!)
1372 {
1373 serversDirty = false;
1374 currentSwarmServers = swarmServers.ToList();
1375 }
1376
// The log message treats a single entry as "no nodes connected" - presumably the lone entry is this controller; verify.
1377 if (currentSwarmServers.Count == 1)
1378 {
1379 logger.LogTrace("Skipping server list broadcast as no nodes are connected!");
1380 return;
1381 }
1382
1383 logger.LogDebug("Sending updated server list to all {nodeCount} nodes...", currentSwarmServers.Count - 1);
1384
1385 using var httpClient = httpClientFactory.CreateClient();
// Posts the snapshot to one server; a failed request (other than cancellation) unregisters that server.
1386 async ValueTask UpdateRequestForServer(SwarmServerInformation swarmServer)
1387 {
// NOTE(review): the constructor of the request body is not visible in this listing; only its object initializer is.
1388 using var request = PrepareSwarmRequest(
1389 swarmServer,
1390 HttpMethod.Post,
1391 String.Empty,
1393 {
1394 SwarmServers = currentSwarmServers,
1395 });
1396
1397 try
1398 {
1399 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
1400 response.EnsureSuccessStatusCode();
1401 }
1402 catch (Exception ex) when (ex is not OperationCanceledException)
1403 {
1404 logger.LogWarning(ex, "Error during swarm server list update for node '{nodeId}'! Unregistering...", swarmServer.Identifier);
1405
// Remove the server from both the live list and the registration map under the list lock.
1406 lock (swarmServers)
1407 {
1408 swarmServers.Remove(swarmServer);
1409 registrationIdsAndTimes!.Remove(swarmServer.Identifier!);
1410 }
1411 }
1412 }
1413
// Start one request per non-controller server; ToList() forces the deferred Select to run.
// NOTE(review): the call that opens this expression is not visible in this listing - presumably the requests are awaited together; confirm.
1415 currentSwarmServers
1416 .Where(x => !x.Controller)
1417 .Select(UpdateRequestForServer)
1418 .ToList());
1419 }
1420
/// <summary>
/// Builds the <see cref="HttpRequestMessage"/> used to talk to another server in the swarm.
/// </summary>
/// <param name="swarmServer">The server to address. When <see langword="null"/>, the request targets swarmConfiguration.ControllerAddress.</param>
/// <param name="httpMethod">The <see cref="HttpMethod"/> of the request.</param>
/// <param name="route">The route, appended to <see cref="SwarmConstants.ControllerRoute"/> after a '/'.</param>
/// <param name="body">Optional object serialized to JSON as the request content.</param>
/// <param name="registrationIdOverride">When set, sent as the registration ID header instead of any stored registration.</param>
/// <returns>A new <see cref="HttpRequestMessage"/>. It is disposed here if preparation throws; otherwise the caller owns it.</returns>
HttpRequestMessage PrepareSwarmRequest(
    SwarmServerInformation? swarmServer,
    HttpMethod httpMethod,
    string route,
    object? body,
    Guid? registrationIdOverride = null)
{
    var target = swarmServer ?? new SwarmServerInformation
    {
        Address = swarmConfiguration.ControllerAddress,
    };

    var fullRoute = $"{SwarmConstants.ControllerRoute}/{route}";
    var targetDescription = target.Identifier ?? target.Address!.ToString();
    logger.LogTrace(
        "{method} {route} to swarm server {nodeIdOrAddress}",
        httpMethod,
        fullRoute,
        targetDescription);

    // The first character of the route is dropped before it is appended to the target address.
    var request = new HttpRequestMessage(
        httpMethod,
        target.Address + fullRoute[1..]);
    try
    {
        var acceptHeaders = request.Headers.Accept;
        acceptHeaders.Clear();
        acceptHeaders.Add(new MediaTypeWithQualityHeaderValue(MediaTypeNames.Application.Json));

        request.Headers.Add(SwarmConstants.ApiKeyHeader, swarmConfiguration.PrivateKey);

        // Pick the registration ID to send: an explicit override wins, then the controller's record
        // for the target node, then this node's own registration with the controller.
        Guid? registrationId = null;
        if (registrationIdOverride.HasValue)
            registrationId = registrationIdOverride;
        else if (!swarmController)
            registrationId = controllerRegistration;
        else
            lock (swarmServers!)
                if (registrationIdsAndTimes!.TryGetValue(target.Identifier!, out var registrationIdAndTime))
                    registrationId = registrationIdAndTime.RegistrationId;

        if (registrationId.HasValue)
            request.Headers.Add(SwarmConstants.RegistrationIdHeader, registrationId.Value.ToString());

        if (body != null)
        {
            var json = JsonConvert.SerializeObject(body, SwarmConstants.SerializerSettings);
            request.Content = new StringContent(json, Encoding.UTF8, MediaTypeNames.Application.Json);
        }

        return request;
    }
    catch
    {
        // Do not leak the message if header or content setup fails.
        request.Dispose();
        throw;
    }
}
1483
/// <summary>
/// Timed loop that performs swarm health checks until <paramref name="cancellationToken"/> is cancelled.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> that stops the loop.</param>
/// <returns>A <see cref="Task"/> representing the running loop.</returns>
/// <remarks>
/// On the controller this calls <see cref="HealthCheckNodes(CancellationToken)"/>; on a node it calls <see cref="HealthCheckController(CancellationToken)"/>.
/// Each iteration wakes on whichever comes first: the interval delay or the forced-health-check task.
/// </remarks>
async Task HealthCheckLoop(CancellationToken cancellationToken)
{
    logger.LogTrace("Starting HealthCheckLoop...");
    try
    {
        var nextForceHealthCheckTask = forceHealthCheckTcs!.Task;
        while (!cancellationToken.IsCancellationRequested)
        {
            TimeSpan delay;
            if (swarmController)
                delay = TimeSpan.FromMinutes(SwarmConstants.ControllerHealthCheckIntervalMinutes);
            else
            {
                delay = TimeSpan.FromMinutes(SwarmConstants.NodeHealthCheckIntervalMinutes);
                if (lastControllerHealthCheck.HasValue)
                {
                    // Read the clock once: comparing and subtracting against two separate reads could
                    // produce a negative delay if the clock passes the target in between.
                    var now = DateTimeOffset.UtcNow;
                    var recommendedTimeOfNextCheck = lastControllerHealthCheck.Value + delay;

                    if (recommendedTimeOfNextCheck > now)
                        delay = recommendedTimeOfNextCheck - now;
                }
            }

            var delayTask = asyncDelayer.Delay(
                delay,
                cancellationToken)
                .AsTask();

            var awakeningTask = Task.WhenAny(
                delayTask,
                nextForceHealthCheckTask);

            await awakeningTask;

            if (nextForceHealthCheckTask.IsCompleted && swarmController)
            {
                // Intentionally wait a few seconds for the other server to start up before interrogating it
                logger.LogTrace("Next health check triggering in {delaySeconds}s...", SwarmConstants.SecondsToDelayForcedHealthChecks);
                await asyncDelayer.Delay(TimeSpan.FromSeconds(SwarmConstants.SecondsToDelayForcedHealthChecks), cancellationToken);
            }
            else if (!swarmController && !nextForceHealthCheckTask.IsCompleted)
            {
                // Woken by the timer on a node: only check the controller if it has gone quiet.
                if (!lastControllerHealthCheck.HasValue)
                {
                    logger.LogTrace("Not initially registered with controller, skipping health check.");
                    continue; // unregistered
                }

                if ((DateTimeOffset.UtcNow - lastControllerHealthCheck.Value).TotalMinutes < SwarmConstants.NodeHealthCheckIntervalMinutes)
                {
                    logger.LogTrace("Controller seems to be active, skipping health check.");
                    continue;
                }
            }

            // Re-arm against the current source before checking, so a trigger raised during the check is seen next iteration.
            nextForceHealthCheckTask = forceHealthCheckTcs.Task;

            logger.LogTrace("Performing swarm health check...");
            try
            {
                if (swarmController)
                    await HealthCheckNodes(cancellationToken);
                else
                    await HealthCheckController(cancellationToken);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // A failed check must not end the loop; log it and try again next interval.
                logger.LogError(ex, "Health check error!");
            }
        }
    }
    catch (OperationCanceledException ex)
    {
        logger.LogTrace(ex, "Health check loop cancelled!");
    }

    logger.LogTrace("Stopped HealthCheckLoop");
}
1567
/// <summary>
/// Looks up the identifier of the node that holds a given <paramref name="registrationId"/>.
/// </summary>
/// <param name="registrationId">The registration <see cref="Guid"/> to search for.</param>
/// <returns>The matching node identifier, or <see langword="null"/> (after logging a warning) if no registration matches.</returns>
/// <exception cref="InvalidOperationException">Thrown when called on a server that is not the swarm controller.</exception>
string? NodeIdentifierFromRegistration(Guid registrationId)
{
    if (!swarmController)
        throw new InvalidOperationException("NodeIdentifierFromRegistration on node!");

    lock (swarmServers!)
    {
        // Single pass over the registrations; the first match wins.
        foreach (var (nodeIdentifier, registration) in registrationIdsAndTimes!)
            if (registration.RegistrationId == registrationId)
                return nodeIdentifier;

        logger.LogWarning("A node that was to be looked up ({registrationId}) disappeared from our records!", registrationId);
        return null;
    }
}
1591 }
1592}
Information about a server in the swarm.
virtual ? Uri PublicAddress
The address at which the swarm server can be publicly accessed.
virtual ? Uri Address
The public address of the server.
string? Identifier
The server's identifier.
Represents information about a running SwarmServer.
Response for when file transfers are necessary.
Extension methods for the ValueTask and ValueTask<TResult> classes.
static async ValueTask WhenAll(IEnumerable< ValueTask > tasks)
Fully await a given list of tasks.
Configuration for the server swarm system.
uint UpdateRequiredNodeCount
The number of nodes in addition to the controller required to be connected to a server swarm before perf...
Uri? ControllerAddress
The SwarmServer.Address of the swarm controller. If null, the current server is considered the contro...
An IFileStreamProvider that represents the response of HttpRequestMessages.
Attribute for bringing in the master versions list from MSBuild that aren't embedded into assemblies ...
string RawSwarmProtocolVersion
The Version string of the TGS swarm protocol.
static MasterVersionsAttribute Instance
Return the Assembly's instance of the MasterVersionsAttribute.
Constants used by the swarm system.
const int ControllerHealthCheckIntervalMinutes
Interval at which the swarm controller makes health checks on nodes.
static JsonSerializerSettings SerializerSettings
See JsonSerializerSettings for the swarm system.
const string ApiKeyHeader
The header used to pass in the Configuration.SwarmConfiguration.PrivateKey.
const string UpdateRoute
The route used for swarm updates.
const string RegisterRoute
The route used for swarm registration.
const int NodeHealthCheckIntervalMinutes
Interval at which the node makes health checks on the controller if it has not received one.
const string RegistrationIdHeader
The header used to pass in swarm registration IDs.
const int UpdateCommitTimeoutMinutes
Number of minutes the controller waits to receive a ready-commit from all nodes before aborting an up...
const int SecondsToDelayForcedHealthChecks
Number of seconds between a health check global::System.Threading.Tasks.TaskCompletionSource triggeri...
A request to register with a swarm controller.
A request to update a nodes list of SwarmServers.
Helps keep servers connected to the same database in sync by coordinating updates.
volatile? TaskCompletionSource forceHealthCheckTcs
A TaskCompletionSource that is used to force a health check.
HttpRequestMessage PrepareSwarmRequest(SwarmServerInformation? swarmServer, HttpMethod httpMethod, string route, object? body, Guid? registrationIdOverride=null)
Prepares a HttpRequestMessage for swarm communication.
async ValueTask< SwarmPrepareResult > ControllerDistributedPrepareUpdate(ISeekableFileStreamProvider? initiatorProvider, SwarmUpdateRequest updateRequest, SwarmUpdateOperation currentUpdateOperation, CancellationToken cancellationToken)
Send a given updateRequest out to nodes from the swarm controller.
void UpdateSwarmServersList(IEnumerable< SwarmServerInformation > swarmServers)
Pass in an updated list of swarmServers to the node.
string? NodeIdentifierFromRegistration(Guid registrationId)
Gets the SwarmServer.Identifier from a given registrationId.
readonly bool swarmController
If the current server is the swarm controller.
async ValueTask UnregisterNode(Guid registrationId, CancellationToken cancellationToken)
Attempt to unregister a node with a given registrationId with the controller. A ValueTask representin...
bool ValidateRegistration(Guid registrationId)
Validate a given registrationId. Returns true if the registration is valid, false otherwise.
readonly IFileTransferTicketProvider transferService
The IFileTransferTicketProvider for the SwarmService.
readonly? List< SwarmServerInformation > swarmServers
List<T> of connected SwarmServerInformations.
readonly IAsyncDelayer asyncDelayer
The IAsyncDelayer for the SwarmService.
ValueTask RemoteAbortUpdate()
Sends out remote abort update requests.
async ValueTask< SwarmCommitResult > CommitUpdate(CancellationToken cancellationToken)
Signal to the swarm that an update is ready to be applied. A ValueTask<TResult> resulting in the Swarm...
bool SwarmMode
If the swarm system is enabled.
async ValueTask< bool > RemoteCommitReceived(Guid registrationId, CancellationToken cancellationToken)
Notify the controller that the node with the given registrationId is ready to commit or notify the n...
async ValueTask< SwarmRegistrationResponse?> RegisterNode(SwarmServer node, Guid registrationId, CancellationToken cancellationToken)
Attempt to register a given node with the controller.A ValueTask<TResult> resulting in a SwarmRegist...
bool ExpectedNumberOfNodesConnected
Gets a value indicating if the expected amount of nodes are connected to the swarm.
async Task HealthCheckLoop(CancellationToken cancellationToken)
Timed loop for calling HealthCheckNodes(CancellationToken).
async ValueTask< SwarmPrepareResult > PrepareUpdateImpl(ISeekableFileStreamProvider? initiatorProvider, SwarmUpdateRequest updateRequest, CancellationToken cancellationToken)
Implementation of PrepareUpdate(ISeekableFileStreamProvider, Version, CancellationToken),...
async ValueTask< SwarmRegistrationResult > RegisterWithController(CancellationToken cancellationToken)
Attempt to register the node with the controller.
async ValueTask Shutdown(CancellationToken cancellationToken)
Deregister with the swarm controller or put clients into querying state.A ValueTask representing the ...
readonly IDatabaseContextFactory databaseContextFactory
The IDatabaseContextFactory for the SwarmService.
async ValueTask HealthCheckController(CancellationToken cancellationToken)
Ping the swarm controller to see that it is still running. If need be, reregister.
readonly ITokenFactory tokenFactory
The ITokenFactory for the SwarmService.
async ValueTask< bool > PrepareUpdateFromController(SwarmUpdateRequest updateRequest, CancellationToken cancellationToken)
Notify the node of an update request from the controller.A ValueTask<TResult> resulting in true if th...
readonly? CancellationTokenSource serverHealthCheckCancellationTokenSource
The CancellationTokenSource for serverHealthCheckTask.
async ValueTask< SwarmRegistrationResult > Initialize(CancellationToken cancellationToken)
Attempt to register with the swarm controller if not one, sets up the database otherwise....
RequestFileStreamProvider CreateUpdateStreamProvider(SwarmServerInformation sourceNode, FileTicketResponse ticket)
Create the RequestFileStreamProvider for an update package retrieval from a given sourceNode .
async ValueTask SendUpdatedServerListToNodes(CancellationToken cancellationToken)
Sends the controllers list of nodes to all nodes.
readonly? Dictionary< string,(Guid RegistrationId, DateTimeOffset RegisteredAt)> registrationIdsAndTimes
Dictionary<TKey, TValue> of SwarmServer.Identifiers to registration Guids and when they were created.
ValueTask< SwarmPrepareResult > PrepareUpdate(ISeekableFileStreamProvider fileStreamProvider, Version version, CancellationToken cancellationToken)
Signal to the swarm that an update is requested.A ValueTask<TResult> resulting in true if the update ...
async ValueTask< Dictionary< string, FileTicketResponse > > CreateDownloadTickets(ISeekableFileStreamProvider initiatorProvider, IReadOnlyCollection< SwarmServerInformation > involvedServers, CancellationToken cancellationToken)
Create a FileTicketResponse for downloading the content of a given initiatorProvider for the rest of...
void MarkServersDirty()
Set serversDirty and complete the current forceHealthCheckTcs.
bool serversDirty
If the swarmServers list has been updated and needs to be resent to clients.
readonly ILogger< SwarmService > logger
The ILogger for the SwarmService.
SwarmService(IDatabaseContextFactory databaseContextFactory, IDatabaseSeeder databaseSeeder, IAssemblyInformationProvider assemblyInformationProvider, IHttpClientFactory httpClientFactory, IAsyncDelayer asyncDelayer, IServerUpdater serverUpdater, IFileTransferTicketProvider transferService, ITokenFactory tokenFactory, IOptions< SwarmConfiguration > swarmConfigurationOptions, ILogger< SwarmService > logger)
Initializes a new instance of the SwarmService class.
readonly SwarmConfiguration swarmConfiguration
The SwarmConfiguration for the SwarmService.
List< SwarmServerInformation >? GetSwarmServers()
Gets the list of SwarmServerInformations in the swarm, including the current one.A List<T> of SwarmSe...
readonly IAssemblyInformationProvider assemblyInformationProvider
The IAssemblyInformationProvider for the SwarmService.
Task? serverHealthCheckTask
The Task for the HealthCheckLoop(CancellationToken).
async ValueTask AbortUpdate()
Attempt to abort an uncommitted update.A ValueTask representing the running operation....
volatile? SwarmUpdateOperation updateOperation
A SwarmUpdateOperation that is currently in progress.
Guid? controllerRegistration
The registration Guid provided by the swarm controller.
DateTimeOffset? lastControllerHealthCheck
The last DateTimeOffset when the controller checked on this node.
readonly IHttpClientFactory httpClientFactory
The IHttpClientFactory for the SwarmService.
readonly IServerUpdater serverUpdater
The IServerUpdater for the SwarmService.
readonly IDatabaseSeeder databaseSeeder
The IDatabaseSeeder for the SwarmService.
bool TriggerHealthCheck()
Complete the current forceHealthCheckTcs.
async ValueTask HealthCheckNodes(CancellationToken cancellationToken)
Ping each node to see that they are still running.
Represents the state of a distributed swarm update.
Version TargetVersion
The Version being updated to.
IReadOnlyList< SwarmServerInformation > InvolvedServers
All of the SwarmServers that are involved in the updates.
A request to update the swarm's TGS version.
Dictionary< string, FileTicketResponse >? DownloadTickets
The map of Api.Models.Internal.SwarmServer.Identifiers to FileTicketResponses for retrieving the upda...
string? SourceNode
The Api.Models.Internal.SwarmServer.Identifier of the node to download the update package from.
Version? UpdateVersion
The TGS Version to update to.
Represents a file on disk to be downloaded.
ValueTask< ServerUpdateResult > BeginUpdate(ISwarmService swarmService, IFileStreamProvider? fileStreamProvider, Version version, CancellationToken cancellationToken)
Start the process of downloading and applying an update to a new server version .
Factory for scoping usage of IDatabaseContexts. Meant for use by Components.
ValueTask UseContext(Func< IDatabaseContext, ValueTask > operation)
Run an operation in the scope of an IDatabaseContext.
For initially setting up a database.
ValueTask Downgrade(IDatabaseContext databaseContext, Version downgradeVersion, CancellationToken cancellationToken)
Migrate a given databaseContext down.
ValueTask Initialize(IDatabaseContext databaseContext, CancellationToken cancellationToken)
Set up a given databaseContext.
ValueTask< Stream > GetResult(CancellationToken cancellationToken)
Gets the provided Stream. May be called multiple times, though cancelling any may cause all calls to ...
IFileStreamProvider that provides MemoryStreams.
bool Disposed
If the ISeekableFileStreamProvider has had global::System.IAsyncDisposable.DisposeAsync called on it.
ValueTask< MemoryStream > GetOwnedResult(CancellationToken cancellationToken)
Gets the provided MemoryStream. May be called multiple times, though cancelling any may cause all cal...
ReadOnlySpan< byte > SigningKeyBytes
Gets or sets the ITokenFactory's signing key bytes.
Swarm service operations for the Controllers.SwarmController.
Start and stop controllers for a swarm service.
Used for swarm operations. Functions may be no-op based on configuration.
Service for temporarily storing files to be downloaded or uploaded.
ValueTask Delay(TimeSpan timeSpan, CancellationToken cancellationToken)
Create a Task that completes after a given timeSpan .
ServerUpdateResult
The result of a call to start a server update.
SwarmUpdateAbortResult
Result of attempting to abort a SwarmUpdateOperation.
SwarmCommitResult
How to proceed on the commit step of an update.
@ AbortUpdate
The update should be aborted.
SwarmRegistrationResult
Result of attempting to register with a swarm controller.
SwarmPrepareResult
Indicates the result of a swarm update prepare operation.