tgstation-server 6.12.0
The /tg/station 13 server suite
Loading...
Searching...
No Matches
SwarmService.cs
Go to the documentation of this file.
1using System;
2using System.Collections.Generic;
3using System.Diagnostics.CodeAnalysis;
4using System.Linq;
5using System.Net;
6using System.Net.Http;
7using System.Net.Http.Headers;
8using System.Net.Mime;
9using System.Text;
10using System.Threading;
11using System.Threading.Tasks;
12using System.Web;
13
14using Microsoft.Extensions.Logging;
15using Microsoft.Extensions.Options;
16
17using Newtonsoft.Json;
18
32
34{
38#pragma warning disable CA1506 // TODO: Decomplexify
40#pragma warning restore CA1506
41 {
44 {
45 get
46 {
47 if (!SwarmMode)
48 return true;
49
50 lock (swarmServers)
51 return swarmServers.Count - 1 >= swarmConfiguration.UpdateRequiredNodeCount;
52 }
53 }
54
/// <summary>
/// Whether this server takes part in a swarm. Swarm mode is on exactly when a swarm private key is configured.
/// </summary>
[MemberNotNullWhen(true, nameof(serverHealthCheckTask), nameof(forceHealthCheckTcs), nameof(serverHealthCheckCancellationTokenSource), nameof(swarmServers))]
bool SwarmMode => swarmConfiguration.PrivateKey is not null;
60
65
70
75
80
85
90
95
100
/// <summary>
/// The <see cref="ILogger"/> for the <see cref="SwarmService"/>.
/// </summary>
104 readonly ILogger<SwarmService> logger;
105
110
/// <summary>
/// Cancellation source created in the constructor only when swarm mode is enabled; <see langword="null"/> otherwise.
/// Presumably used to stop the server health check work — confirm.
/// </summary>
114 readonly CancellationTokenSource? serverHealthCheckCancellationTokenSource;
115
/// <summary>
/// The known swarm servers. Created only in swarm mode, initially containing just this server.
/// Also serves as the lock object under which <see cref="registrationIdsAndTimes"/> is mutated.
/// </summary>
119 readonly List<SwarmServerInformation>? swarmServers;
120
/// <summary>
/// Used on the swarm controller: maps each registered node's identifier to the registration ID it used and the time it registered.
/// All mutations happen while holding the <see cref="swarmServers"/> lock.
/// </summary>
124 readonly Dictionary<string, (Guid RegistrationId, DateTimeOffset RegisteredAt)>? registrationIdsAndTimes;
125
/// <summary>
/// Whether this server is the swarm controller (no ControllerAddress configured). Also <see langword="true"/> when swarm mode is disabled.
/// </summary>
129 readonly bool swarmController;
130
135
/// <summary>
/// Completed to force an early health check. <see cref="TriggerHealthCheck"/> atomically swaps in a fresh instance and completes the old one.
/// Created only in swarm mode.
/// </summary>
139 volatile TaskCompletionSource? forceHealthCheckTcs;
140
145
150
155
160
183 IOptions<SwarmConfiguration> swarmConfigurationOptions,
184 ILogger<SwarmService> logger)
185 {
186 this.databaseContextFactory = databaseContextFactory ?? throw new ArgumentNullException(nameof(databaseContextFactory));
187 this.databaseSeeder = databaseSeeder ?? throw new ArgumentNullException(nameof(databaseSeeder));
188 this.assemblyInformationProvider = assemblyInformationProvider ?? throw new ArgumentNullException(nameof(assemblyInformationProvider));
189 this.httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
190 this.asyncDelayer = asyncDelayer ?? throw new ArgumentNullException(nameof(asyncDelayer));
191 this.serverUpdater = serverUpdater ?? throw new ArgumentNullException(nameof(serverUpdater));
192 this.transferService = transferService ?? throw new ArgumentNullException(nameof(transferService));
193 this.tokenFactory = tokenFactory ?? throw new ArgumentNullException(nameof(tokenFactory));
194 swarmConfiguration = swarmConfigurationOptions?.Value ?? throw new ArgumentNullException(nameof(swarmConfigurationOptions));
195 this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
196
197 if (SwarmMode)
198 {
199 if (swarmConfiguration.Address == null)
200 throw new InvalidOperationException("Swarm configuration missing Address!");
201
202 if (String.IsNullOrWhiteSpace(swarmConfiguration.Identifier))
203 throw new InvalidOperationException("Swarm configuration missing Identifier!");
204
205 swarmController = swarmConfiguration.ControllerAddress == null;
206 if (swarmController)
208
209 serverHealthCheckCancellationTokenSource = new CancellationTokenSource();
210 forceHealthCheckTcs = new TaskCompletionSource();
211
212 swarmServers = new List<SwarmServerInformation>
213 {
214 new()
215 {
216 Address = swarmConfiguration.Address,
217 PublicAddress = swarmConfiguration.PublicAddress,
219 Identifier = swarmConfiguration.Identifier,
220 },
221 };
222 }
223 else
224 swarmController = true;
225 }
226
229
/// <summary>
/// Aborts the current swarm update operation, if there is one, and notifies the rest of the swarm when this call
/// is the one that performed the abort. Does nothing outside swarm mode.
/// </summary>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
/// <exception cref="InvalidOperationException">The operation's Abort() returned an unrecognized value.</exception>
public async ValueTask AbortUpdate()
{
    if (!SwarmMode)
        return;

    // Detach the operation atomically so concurrent callers can't both take the same instance from updateOperation.
    var detachedOperation = Interlocked.Exchange(ref updateOperation, null);
    var outcome = detachedOperation?.Abort();

    if (outcome is null)
    {
        logger.LogTrace("Attempted update abort but no operation was found!");
        return;
    }

    if (outcome == SwarmUpdateAbortResult.AlreadyAborted)
    {
        logger.LogDebug("Another context already aborted this update.");
        return;
    }

    if (outcome == SwarmUpdateAbortResult.CantAbortCommitted)
    {
        logger.LogDebug("Not aborting update because we have committed!");
        return;
    }

    if (outcome != SwarmUpdateAbortResult.Aborted)
        throw new InvalidOperationException($"Invalid return value for SwarmUpdateOperation.Abort(): {outcome}");

    // We performed the abort ourselves, so the other swarm servers must be told.
    await RemoteAbortUpdate();
}
257
/// <summary>
/// Waits until the pending swarm update may be committed and tells the caller what to do next.
/// A node first sends a ready-commit notice to the controller and then waits for the go-ahead. The controller waits,
/// bounded by SwarmConstants.UpdateCommitTimeoutMinutes, for the commit gate and then forwards the commit to every node.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>
/// <see cref="SwarmCommitResult.ContinueUpdateNonCommitted"/> outside swarm mode.
/// <see cref="SwarmCommitResult.AbortUpdate"/> if there is no pending operation, the ready-commit could not be sent,
/// or the commit gate did not open in favour of this operation.
/// <see cref="SwarmCommitResult.MustCommitUpdate"/> otherwise.
/// </returns>
259 public async ValueTask<SwarmCommitResult> CommitUpdate(CancellationToken cancellationToken)
260 {
261 if (!SwarmMode)
262 return SwarmCommitResult.ContinueUpdateNonCommitted;
263
264 // wait for the update commit TCS
265 var localUpdateOperation = updateOperation;
266 if (localUpdateOperation == null)
267 {
268 logger.LogDebug("Update commit failed, no pending operation!");
269 await AbortUpdate(); // unnecessary, but can never be too safe
270 return SwarmCommitResult.AbortUpdate;
271 }
272
273 logger.LogInformation("Waiting to commit update...");
274 using var httpClient = httpClientFactory.CreateClient();
275 if (!swarmController)
276 {
277 // let the controller know we're ready
278 logger.LogTrace("Sending ready-commit to swarm controller...");
279 using var commitReadyRequest = PrepareSwarmRequest(
280 null,
281 HttpMethod.Post,
283 null);
284
285 try
286 {
287 using var commitReadyResponse = await httpClient.SendAsync(commitReadyRequest, HttpCompletionOption.ResponseContentRead, cancellationToken);
288 commitReadyResponse.EnsureSuccessStatusCode();
289 }
290 catch (Exception ex)
291 {
292 logger.LogWarning(ex, "Unable to send ready-commit to swarm controller!");
293 await AbortUpdate();
294 return SwarmCommitResult.AbortUpdate;
295 }
296 }
297
// The controller waits at most UpdateCommitTimeoutMinutes for the nodes.
// A node has no time limit and only stops waiting for the controller's go-ahead on cancellation.
298 var timeoutTask = swarmController
300 TimeSpan.FromMinutes(SwarmConstants.UpdateCommitTimeoutMinutes),
301 cancellationToken).AsTask()
302 : Extensions.TaskExtensions.InfiniteTask.WaitAsync(cancellationToken);
303
304 var commitTask = Task.WhenAny(localUpdateOperation.CommitGate, timeoutTask);
305
306 await commitTask;
307
// Commit only if the gate completed with true AND this operation is still the current one.
// Another context may have aborted or replaced it while we were waiting.
308 var commitGoAhead = localUpdateOperation.CommitGate.IsCompleted
309 && localUpdateOperation.CommitGate.Result
310 && localUpdateOperation == updateOperation;
311 if (!commitGoAhead)
312 {
// NOTE(review): cancelling cancellationToken also completes timeoutTask (e.g. WaitAsync above),
// so a cancellation is reported as " Timed out!" too.
313 logger.LogDebug(
314 "Update commit failed!{maybeTimeout}",
315 timeoutTask.IsCompleted
316 ? " Timed out!"
317 : String.Empty);
318 await AbortUpdate();
319 return SwarmCommitResult.AbortUpdate;
320 }
321
322 logger.LogTrace("Update commit task complete");
323
324 // on nodes, it means we can go straight ahead
325 if (!swarmController)
326 return SwarmCommitResult.MustCommitUpdate;
327
328 // on the controller, we first need to signal for nodes to go ahead
329 // if anything fails at this point, there's nothing we can do
330 logger.LogDebug("Sending remote commit message to nodes...");
// Sends the commit go-ahead to one node. Failures are logged as critical and otherwise ignored.
331 async ValueTask SendRemoteCommitUpdate(SwarmServerInformation swarmServer)
332 {
333 using var request = PrepareSwarmRequest(
334 swarmServer,
335 HttpMethod.Post,
337 null);
338
339 try
340 {
341 // I know using the cancellationToken after this point doesn't seem very sane
342 // It's the token for Ctrl+C on server's console though, so we must respect it
343 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
344 response.EnsureSuccessStatusCode();
345 }
346 catch (Exception ex)
347 {
348 logger.LogCritical(ex, "Failed to send update commit request to node {nodeId}!", swarmServer.Identifier);
349 }
350 }
351
// Start a commit send for every non-controller node while holding the lock, then await them all outside the lock.
352 ValueTask task;
353 lock (swarmServers)
356 .Where(x => !x.Controller)
357 .Select(SendRemoteCommitUpdate)
358 .ToList());
359
360 await task;
361 return SwarmCommitResult.MustCommitUpdate;
362 }
363
/// <summary>
/// Gets a snapshot of the known swarm servers.
/// </summary>
/// <returns>A new copy of the swarm server list, or <see langword="null"/> when swarm mode is disabled.</returns>
public List<SwarmServerInformation>? GetSwarmServers()
{
    if (SwarmMode)
    {
        // Copy under the lock so callers never observe concurrent modifications.
        lock (swarmServers)
            return new List<SwarmServerInformation>(swarmServers);
    }

    return null;
}
373
/// <summary>
/// Begins preparing a swarm update that originates on this server.
/// </summary>
/// <param name="fileStreamProvider">Provider of the update package; download tickets for the other swarm servers are created from it.</param>
/// <param name="version">The version being updated to.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>The <see cref="SwarmPrepareResult"/> produced by PrepareUpdateImpl.</returns>
/// <exception cref="ArgumentNullException"><paramref name="fileStreamProvider"/> or <paramref name="version"/> is <see langword="null"/>.</exception>
375 public ValueTask<SwarmPrepareResult> PrepareUpdate(ISeekableFileStreamProvider fileStreamProvider, Version version, CancellationToken cancellationToken)
376 {
377 ArgumentNullException.ThrowIfNull(fileStreamProvider);
378
379 ArgumentNullException.ThrowIfNull(version);
380
381 logger.LogTrace("Begin PrepareUpdate...");
// Passing a non-null provider is what marks this server as the initiator of the update.
382 return PrepareUpdateImpl(
383 fileStreamProvider,
385 {
386 UpdateVersion = version,
387 },
388 cancellationToken);
389 }
390
/// <summary>
/// Handles an update preparation request received from another swarm server.
/// </summary>
/// <param name="updateRequest">The received <see cref="SwarmUpdateRequest"/>.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns><see langword="true"/> unless preparation resulted in <see cref="SwarmPrepareResult.Failure"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="updateRequest"/> is <see langword="null"/>.</exception>
public async ValueTask<bool> PrepareUpdateFromController(SwarmUpdateRequest updateRequest, CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(updateRequest);

    // A node only hears from the controller; the controller only hears from nodes.
    var requesterType = swarmController ? "node" : "controller";
    logger.LogInformation("Received remote update request from {nodeType}", requesterType);

    // No local package provider: this server is not the initiator.
    var prepareResult = await PrepareUpdateImpl(null, updateRequest, cancellationToken);
    return prepareResult != SwarmPrepareResult.Failure;
}
404
/// <summary>
/// Initializes the service. It logs the swarm role, then does one of two things:
/// the controller (including the non-swarm case, where swarmController is true) runs databaseSeeder.Initialize;
/// a node registers with the swarm controller.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>
/// On the controller, <see cref="SwarmRegistrationResult.Success"/> once database initialization completes.
/// On a node, the result of RegisterWithController.
/// </returns>
406 public async ValueTask<SwarmRegistrationResult> Initialize(CancellationToken cancellationToken)
407 {
408 if (SwarmMode)
409 logger.LogInformation(
410 "Swarm mode enabled: {nodeType} {nodeId}",
412 ? "Controller"
413 : "Node",
415 else
416 logger.LogTrace("Swarm mode disabled");
417
// Only the controller (or a standalone server) owns database initialization.
419 if (swarmController)
420 {
422 logger.LogInformation("Expecting connections from {expectedNodeCount} nodes", swarmConfiguration.UpdateRequiredNodeCount);
423
425 databaseContext => databaseSeeder.Initialize(databaseContext, cancellationToken));
426
427 result = SwarmRegistrationResult.Success;
428 }
429 else
430 result = await RegisterWithController(cancellationToken);
431
432 if (SwarmMode && result == SwarmRegistrationResult.Success)
434
435 return result;
436 }
437
/// <summary>
/// Shuts down swarm participation.
/// A node unregisters from the controller if it currently holds a registration.
/// The controller may downgrade the database to a pending update's TargetVersion. When no update is pending,
/// it unregisters every node and trims its server list back to the first entry.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
439 public async ValueTask Shutdown(CancellationToken cancellationToken)
440 {
441 logger.LogTrace("Begin Shutdown");
442
// Sends a DELETE unregistration to swarmServer, or to the controller when swarmServer is null.
// Failures are logged as warnings and otherwise ignored.
443 async ValueTask SendUnregistrationRequest(SwarmServerInformation? swarmServer)
444 {
445 using var httpClient = httpClientFactory.CreateClient();
446 using var request = PrepareSwarmRequest(
447 swarmServer,
448 HttpMethod.Delete,
450 null);
451
452 try
453 {
454 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
455 response.EnsureSuccessStatusCode();
456 }
457 catch (Exception ex)
458 {
459 logger.LogWarning(
460 ex,
461 "Error unregistering {nodeType}!",
462 swarmServer != null
463 ? $"node {swarmServer.Identifier}"
464 : "from controller");
465 }
466 }
467
468 if (SwarmMode && serverHealthCheckTask != null)
469 {
472 }
473
// Nodes: only unregister if we currently hold a registration with the controller.
474 if (!swarmController)
475 {
476 if (controllerRegistration != null)
477 {
478 logger.LogInformation("Unregistering from swarm controller...");
479 await SendUnregistrationRequest(null);
480 }
481
482 return;
483 }
484
485 // We're in a single-threaded-like context now so touching updateOperation directly is fine
486
487 // downgrade the db if necessary
488 if (updateOperation != null
491 db => databaseSeeder.Downgrade(db, updateOperation.TargetVersion, cancellationToken));
492
493 if (SwarmMode)
494 {
495 // Put the nodes into a reconnecting state
// Nodes are only unregistered when no update operation is pending.
496 if (updateOperation == null)
497 {
498 logger.LogInformation("Unregistering nodes...");
499 ValueTask task;
500 lock (swarmServers)
501 {
504 .Where(x => !x.Controller)
505 .Select(SendUnregistrationRequest)
506 .ToList());
// Keep only the first entry, which the constructor populated with this server.
507 swarmServers.RemoveRange(1, swarmServers.Count - 1);
509 }
510
511 await task;
512 }
513
514 logger.LogTrace("Swarm controller shutdown");
515 }
516 }
517
/// <summary>
/// Replaces this node's view of the swarm with the server list sent by the controller.
/// </summary>
/// <param name="swarmServers">The complete new list of swarm servers.</param>
/// <exception cref="ArgumentNullException"><paramref name="swarmServers"/> is <see langword="null"/>.</exception>
/// <exception cref="InvalidOperationException">Swarm mode is disabled, or this server is the swarm controller.</exception>
public void UpdateSwarmServersList(IEnumerable<SwarmServerInformation> swarmServers)
{
    ArgumentNullException.ThrowIfNull(swarmServers);

    if (!SwarmMode)
        throw new InvalidOperationException("Swarm mode not enabled!");

    if (swarmController)
        throw new InvalidOperationException("Cannot UpdateSwarmServersList on swarm controller!");

    // The parameter shadows the field, so take an explicit alias of the field.
    var knownServers = this.swarmServers;
    lock (knownServers)
    {
        knownServers.Clear();
        knownServers.AddRange(swarmServers);
        logger.LogDebug("Updated swarm server list with {nodeCount} total nodes", knownServers.Count);
    }
}
536
/// <summary>
/// Checks whether a registration ID presented by another swarm server is valid.
/// The controller accepts any registration ID it has handed to a node.
/// A node accepts only its controller registration, and on success refreshes lastControllerHealthCheck.
/// </summary>
/// <param name="registrationId">The registration ID to check.</param>
/// <returns><see langword="true"/> if the registration ID is valid, <see langword="false"/> otherwise.</returns>
/// <exception cref="InvalidOperationException">Swarm mode is disabled.</exception>
public bool ValidateRegistration(Guid registrationId)
{
    if (!SwarmMode)
        throw new InvalidOperationException("Swarm mode not enabled!");

    if (!swarmController)
    {
        if (registrationId != controllerRegistration)
            return false;

        // Record that the controller just contacted us.
        lastControllerHealthCheck = DateTimeOffset.UtcNow;
        return true;
    }

    lock (swarmServers)
        return registrationIdsAndTimes!.Values.Any(entry => entry.RegistrationId == registrationId);
}
553
/// <summary>
/// Controller-side handling of a node registration. Any in-progress update is aborted first.
/// </summary>
/// <param name="node">The registering node; must have an Identifier and an Address.</param>
/// <param name="registrationId">The registration ID the node chose.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>
/// A <see cref="SwarmRegistrationResponse"/> carrying the base64 token signing key, or <see langword="null"/>
/// if <paramref name="registrationId"/> is already used by a different node.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="node"/> is <see langword="null"/>.</exception>
/// <exception cref="ArgumentException"><paramref name="node"/> is missing its Identifier or Address.</exception>
/// <exception cref="InvalidOperationException">Swarm mode is disabled, or this server is not the swarm controller.</exception>
555 public async ValueTask<SwarmRegistrationResponse?> RegisterNode(SwarmServer node, Guid registrationId, CancellationToken cancellationToken)
556 {
557 ArgumentNullException.ThrowIfNull(node);
558
559 if (node.Identifier == null)
560 throw new ArgumentException("Node missing Identifier!", nameof(node));
561
562 if (node.Address == null)
563 throw new ArgumentException("Node missing Address!", nameof(node));
564
565 if (!SwarmMode)
566 throw new InvalidOperationException("Swarm mode not enabled!");
567
568 if (!swarmController)
569 throw new InvalidOperationException("Cannot RegisterNode on swarm node!");
570
571 logger.LogTrace("RegisterNode");
572
573 await AbortUpdate();
574
575 SwarmRegistrationResponse CreateResponse() => new()
576 {
577 TokenSigningKeyBase64 = Convert.ToBase64String(tokenFactory.SigningKeyBytes),
578 };
579
// registrationIdsAndTimes is only mutated while holding the swarmServers lock.
580 var registrationIdsAndTimes = this.registrationIdsAndTimes!;
581 lock (swarmServers)
582 {
// Registration ID already known. Either the same node is retrying (answered idempotently)
// or a different node collided with it (rejected with null).
583 if (registrationIdsAndTimes.Any(x => x.Value.RegistrationId == registrationId))
584 {
585 var preExistingRegistrationKvp = registrationIdsAndTimes.FirstOrDefault(x => x.Value.RegistrationId == registrationId);
586 if (preExistingRegistrationKvp.Key == node.Identifier)
587 {
588 logger.LogWarning("Node {nodeId} has already registered!", node.Identifier);
589 return CreateResponse();
590 }
591
592 logger.LogWarning(
593 "Registration ID collision! Node {nodeId} tried to register with {otherNodeId}'s registration ID: {registrationId}",
594 node.Identifier,
595 preExistingRegistrationKvp.Key,
596 registrationId);
597 return null;
598 }
599
// Known identifier with a new registration ID: the node restarted, so drop its stale server entry first.
600 if (registrationIdsAndTimes.TryGetValue(node.Identifier, out var oldRegistration))
601 {
602 logger.LogInformation("Node {nodeId} is re-registering without first unregistering. Indicative of restart.", node.Identifier);
603 swarmServers.RemoveAll(x => x.Identifier == node.Identifier);
605 }
606
608 {
609 PublicAddress = node.PublicAddress,
610 Address = node.Address,
611 Identifier = node.Identifier,
612 Controller = false,
613 });
614 registrationIdsAndTimes.Add(node.Identifier, (RegistrationId: registrationId, DateTimeOffset.UtcNow));
615 }
616
617 logger.LogInformation("Registered node {nodeId} ({nodeIP}) with ID {registrationId}", node.Identifier, node.Address, registrationId);
619 return CreateResponse();
620 }
621
/// <summary>
/// Handles a remote commit message.
/// On a node it is the controller's go-ahead, and the current update operation is committed.
/// On the controller it is a node reporting that it is ready to commit.
/// </summary>
/// <param name="registrationId">The registration ID of the sender.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns><see langword="true"/> if the message was accepted, <see langword="false"/> otherwise.</returns>
public async ValueTask<bool> RemoteCommitReceived(Guid registrationId, CancellationToken cancellationToken)
{
    // Snapshot once so every decision below refers to the same operation instance.
    var operationSnapshot = updateOperation;

    if (!swarmController)
    {
        logger.LogDebug("Received remote commit go ahead");
        return operationSnapshot?.Commit() == true;
    }

    var senderId = NodeIdentifierFromRegistration(registrationId);
    if (senderId is null)
    {
        // Unknown sender: take no chances and abort whatever is in progress.
        logger.LogError("Aborting update due to unforseen circumstances!");
        await AbortUpdate();
        return false;
    }

    if (operationSnapshot is null)
    {
        logger.LogDebug("Ignoring ready-commit from node {nodeId} as the update appears to have been aborted.", senderId);
        return false;
    }

    if (operationSnapshot.MarkNodeReady(senderId))
    {
        logger.LogDebug("Node {nodeId} is ready to commit.", senderId);
        return true;
    }

    logger.LogError(
        "Attempting to mark {nodeId} as ready to commit resulted in the update being aborted!",
        senderId);

    // The snapshot has already aborted itself.
    // If it is still current, clear it and tell the swarm.
    // Otherwise updateOperation has changed underneath us (cleared or replaced), so abort whatever is current now.
    if (Interlocked.CompareExchange(ref updateOperation, null, operationSnapshot) == operationSnapshot)
    {
        await RemoteAbortUpdate();
    }
    else
    {
        logger.LogError("Aborting new update due to unforseen consequences!");
        await AbortUpdate();
    }

    return false;
}
670
/// <summary>
/// Handles an unregistration request. Any in-progress update is aborted first.
/// On a node, the sender is the controller going away: per the inline comment, a health check is triggered so
/// re-registration is attempted.
/// On the controller, the sending node is removed from the server list and registration map.
/// Unknown registration IDs are ignored.
/// </summary>
/// <param name="registrationId">The registration ID of the sender.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
/// <exception cref="InvalidOperationException">Swarm mode is disabled.</exception>
672 public async ValueTask UnregisterNode(Guid registrationId, CancellationToken cancellationToken)
673 {
674 if (!SwarmMode)
675 throw new InvalidOperationException("Swarm mode not enabled!");
676
677 logger.LogTrace("UnregisterNode {registrationId}", registrationId);
678 await AbortUpdate();
679
680 if (!swarmController)
681 {
682 // immediately trigger a health check
683 logger.LogInformation("Controller unregistering, will attempt re-registration...");
686 return;
687 }
688
689 var nodeIdentifier = NodeIdentifierFromRegistration(registrationId);
690 if (nodeIdentifier == null)
691 return;
692
693 logger.LogInformation("Unregistering node {nodeId}...", nodeIdentifier);
694
// Both collections are kept consistent under the swarmServers lock.
695 lock (swarmServers)
696 {
697 swarmServers.RemoveAll(x => x.Identifier == nodeIdentifier);
698 registrationIdsAndTimes!.Remove(nodeIdentifier);
699 }
700
702 }
703
710 {
// Sends DELETE abort notices with CancellationToken.None.
// A node notifies only the controller; the controller notifies every non-controller node.
// Send failures are logged as warnings and otherwise ignored.
711 logger.LogInformation("Aborting swarm update!");
712
// NOTE(review): this method is not async. It returns the pending ValueTask directly (see `return SendRemoteAbort(...)`),
// so this using declaration disposes httpClient as soon as the method returns, while the abort requests may still
// be in flight. HttpClient.Dispose cancels pending requests, so the aborts can fail with a TaskCanceledException
// (only logged below), defeating the "should not be cancelled" intent.
// Consider making the method async and awaiting the sends before httpClient goes out of scope.
713 using var httpClient = httpClientFactory.CreateClient();
714 async ValueTask SendRemoteAbort(SwarmServerInformation swarmServer)
715 {
716 using var request = PrepareSwarmRequest(
717 swarmServer,
718 HttpMethod.Delete,
720 null);
721
722 try
723 {
724 // DCT: Intentionally should not be cancelled
725 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, CancellationToken.None);
726 response.EnsureSuccessStatusCode();
727 }
728 catch (Exception ex)
729 {
730 logger.LogWarning(
731 ex,
732 "Unable to send remote abort to {nodeOrController}!",
734 ? $"node {swarmServer.Identifier}"
735 : "controller");
736 }
737 }
738
// Nodes only need to tell the controller.
739 if (!swarmController)
740 return SendRemoteAbort(new SwarmServerInformation
741 {
743 });
744
745 lock (swarmServers!)
748 .Where(x => !x.Controller)
749 .Select(SendRemoteAbort)
750 .ToList());
751 }
752
760 {
// Builds a provider that will GET the update package from sourceNode using this server's URL-encoded download ticket.
// On success, httpClient and request are not disposed here; they are handed to the returned RequestFileStreamProvider
// (presumably it owns them from then on — confirm).
// On failure, each catch disposes whatever was created before rethrowing.
761 var httpClient = httpClientFactory.CreateClient();
762 try
763 {
764 var request = PrepareSwarmRequest(
765 sourceNode,
766 HttpMethod.Get,
767 $"{SwarmConstants.UpdateRoute}?ticket={HttpUtility.UrlEncode(ticket.FileTicket)}",
768 null);
769
770 try
771 {
772 request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue(MediaTypeNames.Application.Octet));
773 return new RequestFileStreamProvider(httpClient, request);
774 }
775 catch
776 {
777 request.Dispose();
778 throw;
779 }
780 }
781 catch
782 {
783 httpClient.Dispose();
784 throw;
785 }
786 }
787
/// <summary>
/// Shared implementation of update preparation for both locally initiated and remotely requested updates.
/// It creates a <see cref="SwarmUpdateOperation"/> and publishes it in updateOperation. Then:
/// a node that initiated the update creates download tickets and forwards the request to the controller;
/// a server that did not initiate the update downloads the package from the source node and begins the update locally;
/// the controller finally hands off to ControllerDistributedPrepareUpdate.
/// </summary>
/// <param name="initiatorProvider">The update package provider if this server initiated the update, <see langword="null"/> otherwise.</param>
/// <param name="updateRequest">The <see cref="SwarmUpdateRequest"/>; its UpdateVersion must be set.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>The resulting <see cref="SwarmPrepareResult"/>. Most failure paths also abort the update via AbortUpdate.</returns>
795 async ValueTask<SwarmPrepareResult> PrepareUpdateImpl(ISeekableFileStreamProvider? initiatorProvider, SwarmUpdateRequest updateRequest, CancellationToken cancellationToken)
796 {
797 var version = updateRequest.UpdateVersion!;
798 if (!SwarmMode)
799 {
800 // we still need an active update operation for the TargetVersion
802 return SwarmPrepareResult.SuccessProviderNotRequired;
803 }
804
805 var initiator = initiatorProvider != null;
806 logger.LogTrace("PrepareUpdateImpl {version}...", version);
807
// When set, the finally block below calls AbortUpdate().
808 var shouldAbort = false;
809 SwarmUpdateOperation localUpdateOperation;
810 try
811 {
812 SwarmServerInformation? sourceNode = null;
813 List<SwarmServerInformation> currentNodes;
// Snapshot the server list, capturing the entry for the update's source node along the way.
814 lock (swarmServers)
815 {
816 currentNodes = swarmServers
817 .Select(node =>
818 {
819 if (node.Identifier == updateRequest.SourceNode)
820 sourceNode = node;
821
822 return node;
823 })
824 .ToList();
825 if (swarmController)
826 localUpdateOperation = new SwarmUpdateOperation(
827 version,
828 currentNodes);
829 else
830 localUpdateOperation = new SwarmUpdateOperation(version);
831 }
832
// Publish our operation only if none is active.
833 var existingUpdateOperation = Interlocked.CompareExchange(ref updateOperation, localUpdateOperation, null);
// NOTE(review): when a different version is already active, shouldAbort makes the finally block call AbortUpdate(),
// which aborts that existing operation as well — confirm this is intended.
834 if (existingUpdateOperation != null && existingUpdateOperation.TargetVersion != version)
835 {
836 logger.LogWarning("Aborting update preparation, version {targetUpdateVersion} already prepared!", existingUpdateOperation.TargetVersion);
837 shouldAbort = true;
838 return SwarmPrepareResult.Failure;
839 }
840
// The same version is already prepared (e.g. a duplicate request): nothing more to do.
841 if (existingUpdateOperation?.TargetVersion == version)
842 {
843 logger.LogTrace("PrepareUpdateImpl early out, already prepared!");
844 return SwarmPrepareResult.SuccessProviderNotRequired;
845 }
846
// A node that initiated the update hands out download tickets and lets the controller drive the rest.
847 if (!swarmController && initiator)
848 {
849 var downloadTickets = await CreateDownloadTickets(initiatorProvider!, currentNodes, cancellationToken); // condition of initiator
850
851 logger.LogInformation("Forwarding update request to swarm controller...");
852 using var httpClient = httpClientFactory.CreateClient();
853 using var request = PrepareSwarmRequest(
854 null,
855 HttpMethod.Put,
858 {
859 UpdateVersion = version,
860 SourceNode = swarmConfiguration.Identifier,
861 DownloadTickets = downloadTickets,
862 });
863
864 // File transfer service will hold the necessary streams
865 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
866 if (response.IsSuccessStatusCode)
867 return SwarmPrepareResult.SuccessHoldProviderUntilCommit;
868
869 shouldAbort = true;
870 return SwarmPrepareResult.Failure;
871 }
872
// Not the initiator: download the package from the source node and start the update locally.
873 if (!initiator)
874 {
875 logger.LogTrace("Beginning local update process...");
876
877 if (sourceNode == null)
878 {
879 logger.Log(
881 ? LogLevel.Error
882 : LogLevel.Warning,
883 "Missing local node entry for update source node: {sourceNode}",
884 updateRequest.SourceNode);
885 shouldAbort = true;
886 return SwarmPrepareResult.Failure;
887 }
888
889 if (updateRequest.DownloadTickets == null)
890 {
// NOTE(review): unlike every other failure path in this method, this one does not set shouldAbort.
// The update operation published above is therefore left in place. This likely should set shouldAbort = true.
891 logger.LogError("Missing download tickets in update request!");
892 return SwarmPrepareResult.Failure;
893 }
894
895 if (!updateRequest.DownloadTickets.TryGetValue(swarmConfiguration.Identifier!, out var ticket))
896 {
897 logger.Log(
899 ? LogLevel.Error
900 : LogLevel.Warning,
901 "Missing node entry for download ticket in update request!");
902 shouldAbort = true;
903 return SwarmPrepareResult.Failure;
904 }
905
906 ServerUpdateResult updateApplyResult;
907 var downloaderStream = CreateUpdateStreamProvider(sourceNode, ticket);
908 try
909 {
910 updateApplyResult = await serverUpdater.BeginUpdate(
911 this,
912 downloaderStream,
913 version,
914 cancellationToken);
915 }
916 catch
917 {
918 await downloaderStream.DisposeAsync();
919 throw;
920 }
921
922 if (updateApplyResult != ServerUpdateResult.Started)
923 {
924 logger.LogWarning("Failed to prepare update! Result: {serverUpdateResult}", updateApplyResult);
925 shouldAbort = true;
926 return SwarmPrepareResult.Failure;
927 }
928 }
929 else
930 logger.LogTrace("No need to re-initiate update as it originated here on the swarm controller");
931
932 logger.LogDebug("Local node prepared for update to version {version}", version);
933 }
934 catch (Exception ex)
935 {
936 logger.LogWarning(ex, "Failed to prepare update!");
937 shouldAbort = true;
938 return SwarmPrepareResult.Failure;
939 }
940 finally
941 {
942 if (shouldAbort)
943 await AbortUpdate();
944 }
945
// Nodes are done here; the controller still has to distribute the prepare request to its nodes.
946 if (!swarmController)
947 return SwarmPrepareResult.SuccessProviderNotRequired;
948
950 initiatorProvider,
951 updateRequest,
952 localUpdateOperation,
953 cancellationToken);
954 }
955
/// <summary>
/// Controller-only second phase of update preparation.
/// It first checks that enough nodes are involved. It then sends each non-controller node a prepare request holding
/// only that node's own download ticket (the source node gets none) and waits for every response.
/// </summary>
/// <param name="initiatorProvider">The package provider if the controller initiated the update, <see langword="null"/> otherwise.</param>
/// <param name="updateRequest">The <see cref="SwarmUpdateRequest"/> being distributed.</param>
/// <param name="currentUpdateOperation">The <see cref="SwarmUpdateOperation"/> created for this update.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>
/// On success, <see cref="SwarmPrepareResult.SuccessHoldProviderUntilCommit"/> when the controller initiated the update,
/// and <see cref="SwarmPrepareResult.SuccessProviderNotRequired"/> otherwise (including when there are no nodes).
/// <see cref="SwarmPrepareResult.Failure"/> on failure.
/// </returns>
964 async ValueTask<SwarmPrepareResult> ControllerDistributedPrepareUpdate(
965 ISeekableFileStreamProvider? initiatorProvider,
966 SwarmUpdateRequest updateRequest,
967 SwarmUpdateOperation currentUpdateOperation,
968 CancellationToken cancellationToken)
969 {
// When set, the finally block below calls AbortUpdate().
970 bool abortUpdate = false;
971 try
972 {
973 logger.LogInformation("Sending remote prepare to nodes...");
974
// InvolvedServers includes the controller itself, hence the "- 1".
975 if (currentUpdateOperation.InvolvedServers.Count - 1 < swarmConfiguration.UpdateRequiredNodeCount)
976 {
977 logger.LogWarning(
978 "Aborting update, controller expects to be in sync with {requiredNodeCount} nodes but currently only has {currentNodeCount}!",
980 currentUpdateOperation.InvolvedServers.Count - 1);
981 abortUpdate = true;
982 return SwarmPrepareResult.Failure;
983 }
984
985 var weAreInitiator = initiatorProvider != null;
986 if (currentUpdateOperation.InvolvedServers.Count == 1)
987 {
988 logger.LogDebug("Controller has no nodes, setting commit-ready.");
// NOTE(review): this commits whatever updateOperation currently is, not currentUpdateOperation.
// If the operation was replaced in the meantime, a different one could be committed.
// Consider currentUpdateOperation.Commit() — confirm.
989 if (updateOperation?.Commit() == true)
990 return SwarmPrepareResult.SuccessProviderNotRequired;
991
992 logger.LogDebug("Update appears to have been aborted");
993 return SwarmPrepareResult.Failure;
994 }
995
996 // The initiator node obviously doesn't create a ticket for itself
997 else if (!weAreInitiator && updateRequest.DownloadTickets!.Count != currentUpdateOperation.InvolvedServers.Count - 1)
998 {
// NOTE(review): the comparison above uses InvolvedServers.Count - 1, but {nodesToUpdate} below logs InvolvedServers.Count.
999 logger.LogWarning(
1000 "Aborting update, {receivedTickets} download tickets were provided but there are {nodesToUpdate} nodes in the swarm that require the package!",
1001 updateRequest.DownloadTickets.Count,
1002 currentUpdateOperation.InvolvedServers.Count);
1003 abortUpdate = true;
1004 return SwarmPrepareResult.Failure;
1005 }
1006
1007 var downloadTicketDictionary = weAreInitiator
1008 ? await CreateDownloadTickets(initiatorProvider!, currentUpdateOperation.InvolvedServers, cancellationToken)
1009 : updateRequest.DownloadTickets!;
1010
1011 var sourceNode = weAreInitiator
1012 ? swarmConfiguration.Identifier
1013 : updateRequest.SourceNode;
1014
1015 using var httpClient = httpClientFactory.CreateClient();
// NOTE(review): transferSemaphore is never used below.
1016 using var transferSemaphore = new SemaphoreSlim(1);
1017
// Set inside the Select lambda below. ToList() forces the lambda to run before anyFailed is read.
1018 bool anyFailed = false;
1019 var updateRequests = currentUpdateOperation
1021 .Where(node => !node.Controller)
1022 .Select(node =>
1023 {
1024 // only send the necessary ticket to each node from the controller
1025 Dictionary<string, FileTicketResponse>? localTicketDictionary;
1026 var nodeId = node.Identifier!;
1027 if (nodeId == sourceNode)
1028 localTicketDictionary = null;
1029 else if (!downloadTicketDictionary.TryGetValue(nodeId, out var ticket))
1030 {
1031 logger.LogError("Missing download ticket for node {missingNodeId}!", nodeId);
1032 anyFailed = true;
1033 return null;
1034 }
1035 else
1036 localTicketDictionary = new Dictionary<string, FileTicketResponse>
1037 {
1038 { nodeId, ticket },
1039 };
1040
1041 var request = new SwarmUpdateRequest
1042 {
1043 UpdateVersion = updateRequest.UpdateVersion,
1044 SourceNode = sourceNode,
1045 DownloadTickets = localTicketDictionary,
1046 };
1047
1048 return Tuple.Create(node, request);
1049 })
1050 .ToList();
1051
// NOTE(review): unlike the other failure paths, this does not set abortUpdate, so the update operation is left in place.
1052 if (anyFailed)
1053 return SwarmPrepareResult.Failure;
1054
// Send all prepare requests concurrently; each task yields whether its node accepted.
1055 var tasks = updateRequests
1056 .Select(async tuple =>
1057 {
1058 var node = tuple!.Item1;
1059 var body = tuple.Item2;
1060
1061 using var request = PrepareSwarmRequest(
1062 node,
1063 HttpMethod.Put,
1065 body);
1066
1067 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
1068 return response.IsSuccessStatusCode;
1069 })
1070 .ToList();
1071
1072 await Task.WhenAll(tasks);
1073
1074 // if all succeeds...
1075 if (tasks.All(x => x.Result))
1076 {
1077 logger.LogInformation("Distributed prepare for update to version {version} complete.", updateRequest.UpdateVersion);
1078 return weAreInitiator
1079 ? SwarmPrepareResult.SuccessHoldProviderUntilCommit
1080 : SwarmPrepareResult.SuccessProviderNotRequired;
1081 }
1082
1083 abortUpdate = true;
1084 logger.LogDebug("Distrubuted prepare failed!");
1085 }
1086 catch (Exception ex)
1087 {
// NOTE(review): an exception here (e.g. a node request throwing out of Task.WhenAll) does not set abortUpdate,
// unlike the catch in PrepareUpdateImpl. Confirm that callers abort on Failure.
1088 logger.LogWarning(ex, "Error remotely preparing updates!");
1089 }
1090 finally
1091 {
1092 if (abortUpdate)
1093 await AbortUpdate();
1094 }
1095
1096 return SwarmPrepareResult.Failure;
1097 }
1098
/// <summary>
/// Creates one file transfer download ticket for every involved server other than this one.
/// All tickets share a single provider backed by <paramref name="initiatorProvider"/>.
/// </summary>
/// <param name="initiatorProvider">The <see cref="ISeekableFileStreamProvider"/> holding the update package.</param>
/// <param name="involvedServers">The servers taking part in the update; this server is skipped.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A map from server identifier to that server's download ticket, returned once the package has been loaded.</returns>
async ValueTask<Dictionary<string, FileTicketResponse>> CreateDownloadTickets(
    ISeekableFileStreamProvider initiatorProvider,
    IReadOnlyCollection<SwarmServerInformation> involvedServers,
    CancellationToken cancellationToken)
{
    // Start loading the package right away so it is ready by the time other servers begin downloading.
    var packageLoadTask = initiatorProvider.GetResult(cancellationToken);

    var packageProvider = new FileDownloadProvider(
        () => initiatorProvider.Disposed ? Api.Models.ErrorCode.ResourceNotPresent : null,
        async downloadToken => await initiatorProvider.GetOwnedResult(downloadToken), // throws if disposed, which shouldn't happen
        "<Swarm Update Package Provider>",
        false);

    var otherServers = involvedServers
        .Where(server => server.Identifier != swarmConfiguration.Identifier)
        .ToList();
    logger.LogTrace("Creating {n} download tickets for other nodes...", otherServers.Count);

    var ticketsByServer = new Dictionary<string, FileTicketResponse>(otherServers.Count);
    otherServers.ForEach(server => ticketsByServer.Add(server.Identifier!, transferService.CreateDownload(packageProvider)));

    await packageLoadTask;
    return ticketsByServer;
}
1137
/// <summary>
/// Controller-side health check of the registered nodes.
/// Every non-controller node registered longer than ControllerHealthCheckIntervalMinutes ago is pinged.
/// A node whose check fails (other than by cancellation) is removed from the server list and registration map.
/// If the list changed or was already dirty, the updated server list is sent to the nodes.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
1143 async ValueTask HealthCheckNodes(CancellationToken cancellationToken)
1144 {
1145 using var httpClient = httpClientFactory.CreateClient();
1146
1147 List<SwarmServerInformation> currentSwarmServers;
1148 lock (swarmServers!)
1149 currentSwarmServers = swarmServers.ToList();
1150
// NOTE(review): the Where filter further down calls registrationIdsAndTimes.TryGetValue without holding the swarmServers lock.
// Meanwhile, HealthRequestForServer continuations, RegisterNode and UnregisterNode can mutate this Dictionary
// under the lock. Concurrent read/write of a Dictionary is not safe; consider snapshotting it under the lock above.
1151 var registrationIdsAndTimes = this.registrationIdsAndTimes!;
// Pings one node. On failure it is dropped from the swarm.
// NOTE(review): the filter lets OperationCanceledException escape. HttpClient timeouts also surface as that type
// (TaskCanceledException) even when cancellationToken is not cancelled, so a timed-out node is not unregistered here
// and the exception propagates instead.
1152 async ValueTask HealthRequestForServer(SwarmServerInformation swarmServer)
1153 {
1154 using var request = PrepareSwarmRequest(
1155 swarmServer,
1156 HttpMethod.Get,
1157 String.Empty,
1158 null);
1159
1160 try
1161 {
1162 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
1163 response.EnsureSuccessStatusCode();
1164 return;
1165 }
1166 catch (Exception ex) when (ex is not OperationCanceledException)
1167 {
1168 logger.LogWarning(
1169 ex,
1170 "Error during swarm server health check on node '{nodeId}'! Unregistering...",
1171 swarmServer.Identifier);
1172 }
1173
1174 lock (swarmServers)
1175 {
1176 swarmServers.Remove(swarmServer);
1177 registrationIdsAndTimes.Remove(swarmServer.Identifier!);
1178 }
1179 }
1180
1182 currentSwarmServers
1183 .Where(node => !node.Controller
1184 && registrationIdsAndTimes.TryGetValue(node.Identifier!, out var registrationAndTime)
1185 && registrationAndTime.RegisteredAt.AddMinutes(SwarmConstants.ControllerHealthCheckIntervalMinutes) < DateTimeOffset.UtcNow)
1186 .Select(HealthRequestForServer));
1187
// A changed server count means nodes were dropped (or added) while checking.
1188 lock (swarmServers)
1189 if (swarmServers.Count != currentSwarmServers.Count)
1190 MarkServersDirty();
1191
1192 if (serversDirty)
1193 await SendUpdatedServerListToNodes(cancellationToken);
1194 }
1195
1200 {
// Flag the server list as changed and force an early health check.
// On the controller, HealthCheckNodes sends the updated list to the nodes while serversDirty is set.
// The trace is logged only when TriggerHealthCheck reports that it completed the pending health check signal.
1201 serversDirty = true;
1202 if (TriggerHealthCheck())
1203 logger.LogTrace("Server list is dirty!");
1204 }
1205
1211 {
// Atomically swap in a fresh TaskCompletionSource and complete the previous one, waking anything awaiting it.
// Returns TrySetResult's result, i.e. whether this call is the one that completed the previous instance.
// NOTE(review): the new TCS is created without TaskCreationOptions.RunContinuationsAsynchronously,
// so awaiting continuations may run inline on the caller's thread — confirm that is acceptable.
1212 var currentTcs = Interlocked.Exchange(ref forceHealthCheckTcs, new TaskCompletionSource());
1213 return currentTcs!.TrySetResult();
1214 }
1215
/// <summary>
/// Node-side health check of the swarm controller.
/// If this node holds a controller registration, the controller is pinged. If that fails, or if there is no
/// registration, re-registration is retried every 5 seconds until it succeeds or the controller answers
/// <see cref="SwarmRegistrationResult.Unauthorized"/> or <see cref="SwarmRegistrationResult.VersionMismatch"/>.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
/// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
async ValueTask HealthCheckController(CancellationToken cancellationToken)
{
    using var httpClient = httpClientFactory.CreateClient();

    if (controllerRegistration.HasValue)
    {
        try
        {
            using var request = PrepareSwarmRequest(
                null,
                HttpMethod.Get,
                String.Empty,
                null);
            using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
            response.EnsureSuccessStatusCode();
            logger.LogTrace("Controller health check successful");
            return;
        }
        catch (Exception ex) when (ex is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
        {
            // Cancellation of our own token means we are shutting down, not that the controller is unhealthy.
            // We let it propagate (as HealthCheckNodes does) instead of discarding controllerRegistration,
            // which Shutdown needs in order to unregister from the controller.
            // An HttpClient timeout also surfaces as an OperationCanceledException, but with our token still live,
            // so it is still treated as a failed health check.
            logger.LogWarning(ex, "Error during swarm controller health check! Attempting to re-register...");
            controllerRegistration = null;
            await AbortUpdate();
        }
    }

    for (var registrationAttempt = 1UL; ; ++registrationAttempt)
    {
        logger.LogInformation("Swarm re-registration attempt {attemptNumber}...", registrationAttempt);
        var registrationResult = await RegisterWithController(cancellationToken);

        if (registrationResult == SwarmRegistrationResult.Success)
            return;

        // These rejections won't be fixed by retrying right away; leave it to the next health check.
        if (registrationResult == SwarmRegistrationResult.Unauthorized)
        {
            logger.LogError("Swarm re-registration failed, controller's private key has changed!");
            break;
        }

        if (registrationResult == SwarmRegistrationResult.VersionMismatch)
        {
            logger.LogError("Swarm re-registration failed, controller's TGS version has changed!");
            break;
        }

        await asyncDelayer.Delay(TimeSpan.FromSeconds(5), cancellationToken);
    }

    // we could do something here... but what?
    // best to just let the health check loop keep retrying... we won't be able to update at least
}
1272
1278 async ValueTask<SwarmRegistrationResult> RegisterWithController(CancellationToken cancellationToken)
1279 {
1280 logger.LogInformation("Attempting to register with swarm controller at {controllerAddress}...", swarmConfiguration.ControllerAddress);
1281 var requestedRegistrationId = Guid.NewGuid();
1282
1283 using var httpClient = httpClientFactory.CreateClient();
1284 using var registrationRequest = PrepareSwarmRequest(
1285 null,
1286 HttpMethod.Post,
1289 {
1290 Identifier = swarmConfiguration.Identifier,
1291 Address = swarmConfiguration.Address,
1292 PublicAddress = swarmConfiguration.PublicAddress,
1293 },
1294 requestedRegistrationId);
1295
1296 try
1297 {
1298 using var response = await httpClient.SendAsync(registrationRequest, HttpCompletionOption.ResponseContentRead, cancellationToken);
1299 if (response.IsSuccessStatusCode)
1300 {
1301 try
1302 {
1303 var json = await response.Content.ReadAsStringAsync(cancellationToken);
1304 if (json == null)
1305 {
1306 logger.LogDebug("Error reading registration response content stream! Text was null!");
1307 return SwarmRegistrationResult.PayloadFailure;
1308 }
1309
1310 var registrationResponse = JsonConvert.DeserializeObject<SwarmRegistrationResponse>(json);
1311 if (registrationResponse == null)
1312 {
1313 logger.LogDebug("Error reading registration response content stream! Payload was null!");
1314 return SwarmRegistrationResult.PayloadFailure;
1315 }
1316
1317 if (registrationResponse.TokenSigningKeyBase64 == null)
1318 {
1319 logger.LogDebug("Error reading registration response content stream! SigningKey was null!");
1320 return SwarmRegistrationResult.PayloadFailure;
1321 }
1322
1323 tokenFactory.SigningKeyBytes = Convert.FromBase64String(registrationResponse.TokenSigningKeyBase64);
1324 }
1325 catch (Exception ex)
1326 {
1327 logger.LogDebug(ex, "Error reading registration response content stream!");
1328 return SwarmRegistrationResult.PayloadFailure;
1329 }
1330
1331 logger.LogInformation("Sucessfully registered with ID {registrationId}", requestedRegistrationId);
1332 controllerRegistration = requestedRegistrationId;
1333 lastControllerHealthCheck = DateTimeOffset.UtcNow;
1334 return SwarmRegistrationResult.Success;
1335 }
1336
1337 logger.LogWarning("Unable to register with swarm: HTTP {statusCode}!", response.StatusCode);
1338
1339 if (response.StatusCode == HttpStatusCode.Unauthorized)
1340 return SwarmRegistrationResult.Unauthorized;
1341
1342 if (response.StatusCode == HttpStatusCode.UpgradeRequired)
1343 return SwarmRegistrationResult.VersionMismatch;
1344
1345 try
1346 {
1347 var responseData = await response.Content.ReadAsStringAsync(cancellationToken);
1348 if (!String.IsNullOrWhiteSpace(responseData))
1349 logger.LogDebug("Response:{newLine}{responseData}", Environment.NewLine, responseData);
1350 }
1351 catch (Exception ex)
1352 {
1353 logger.LogDebug(ex, "Error reading registration response content stream!");
1354 }
1355 }
1356 catch (Exception ex)
1357 {
1358 logger.LogWarning(ex, "Error sending registration request!");
1359 }
1360
1361 return SwarmRegistrationResult.CommunicationFailure;
1362 }
1363
1369 async ValueTask SendUpdatedServerListToNodes(CancellationToken cancellationToken)
1370 {
1371 List<SwarmServerInformation> currentSwarmServers;
1372 lock (swarmServers!)
1373 {
1374 serversDirty = false;
1375 currentSwarmServers = swarmServers.ToList();
1376 }
1377
1378 if (currentSwarmServers.Count == 1)
1379 {
1380 logger.LogTrace("Skipping server list broadcast as no nodes are connected!");
1381 return;
1382 }
1383
1384 logger.LogDebug("Sending updated server list to all {nodeCount} nodes...", currentSwarmServers.Count - 1);
1385
1386 using var httpClient = httpClientFactory.CreateClient();
1387 async ValueTask UpdateRequestForServer(SwarmServerInformation swarmServer)
1388 {
1389 using var request = PrepareSwarmRequest(
1390 swarmServer,
1391 HttpMethod.Post,
1392 String.Empty,
1394 {
1395 SwarmServers = currentSwarmServers,
1396 });
1397
1398 try
1399 {
1400 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
1401 response.EnsureSuccessStatusCode();
1402 }
1403 catch (Exception ex) when (ex is not OperationCanceledException)
1404 {
1405 logger.LogWarning(ex, "Error during swarm server list update for node '{nodeId}'! Unregistering...", swarmServer.Identifier);
1406
1407 lock (swarmServers)
1408 {
1409 swarmServers.Remove(swarmServer);
1410 registrationIdsAndTimes!.Remove(swarmServer.Identifier!);
1411 }
1412 }
1413 }
1414
1416 currentSwarmServers
1417 .Where(x => !x.Controller)
1418 .Select(UpdateRequestForServer)
1419 .ToList());
1420 }
1421
/// <summary>
/// Builds an <see cref="HttpRequestMessage"/> for communicating with another swarm server.
/// </summary>
/// <param name="swarmServer">The target server, or <see langword="null"/> to target the configured controller address.</param>
/// <param name="httpMethod">The <see cref="HttpMethod"/> of the request.</param>
/// <param name="route">The route appended beneath the swarm controller route.</param>
/// <param name="body">An optional object serialized as the JSON request body.</param>
/// <param name="registrationIdOverride">When set, sent as the registration ID header instead of the one normally looked up.</param>
/// <returns>A new <see cref="HttpRequestMessage"/>; callers dispose it.</returns>
HttpRequestMessage PrepareSwarmRequest(
    SwarmServerInformation? swarmServer,
    HttpMethod httpMethod,
    string route,
    object? body,
    Guid? registrationIdOverride = null)
{
    // No explicit target means the request is bound for the swarm controller.
    var target = swarmServer ?? new SwarmServerInformation
    {
        Address = swarmConfiguration.ControllerAddress,
    };

    var fullRoute = $"{SwarmConstants.ControllerRoute}/{route}";
    var targetDescription = target.Identifier ?? target.Address!.ToString();
    logger.LogTrace(
        "{method} {route} to swarm server {nodeIdOrAddress}",
        httpMethod,
        fullRoute,
        targetDescription);

    // fullRoute starts with the separator the address already ends with, so drop its first character.
    var requestUri = target.Address + fullRoute[1..];
    var request = new HttpRequestMessage(httpMethod, requestUri);
    try
    {
        var headers = request.Headers;
        headers.Accept.Clear();
        headers.Accept.Add(new MediaTypeWithQualityHeaderValue(MediaTypeNames.Application.Json));
        headers.Add(SwarmConstants.ApiKeyHeader, swarmConfiguration.PrivateKey);

        // Precedence: explicit override, then (controller) the target's stored registration, then (node) our own registration.
        var registrationId = registrationIdOverride;
        if (!registrationId.HasValue)
        {
            if (swarmController)
            {
                lock (swarmServers!)
                    if (registrationIdsAndTimes!.TryGetValue(target.Identifier!, out var storedRegistration))
                        registrationId = storedRegistration.RegistrationId;
            }
            else
                registrationId = controllerRegistration;
        }

        if (registrationId.HasValue)
            headers.Add(SwarmConstants.RegistrationIdHeader, registrationId.Value.ToString());

        if (body != null)
        {
            var serializedBody = JsonConvert.SerializeObject(body, SwarmConstants.SerializerSettings);
            request.Content = new StringContent(serializedBody, Encoding.UTF8, MediaTypeNames.Application.Json);
        }

        return request;
    }
    catch
    {
        // Don't leak the half-built request if header or body setup fails.
        request.Dispose();
        throw;
    }
}
1484
/// <summary>
/// Timed loop that runs swarm health checks: <see cref="HealthCheckNodes(CancellationToken)"/> on the controller,
/// <see cref="HealthCheckController(CancellationToken)"/> on nodes. Completing <see cref="forceHealthCheckTcs"/> wakes it early.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> that stops the loop.</param>
/// <returns>A <see cref="Task"/> representing the running operation.</returns>
async Task HealthCheckLoop(CancellationToken cancellationToken)
{
    logger.LogTrace("Starting HealthCheckLoop...");
    try
    {
        var nextForceHealthCheckTask = forceHealthCheckTcs!.Task;
        while (!cancellationToken.IsCancellationRequested)
        {
            TimeSpan delay;
            if (swarmController)
                delay = TimeSpan.FromMinutes(SwarmConstants.ControllerHealthCheckIntervalMinutes);
            else
            {
                delay = TimeSpan.FromMinutes(SwarmConstants.NodeHealthCheckIntervalMinutes);

                // Snapshot the field and the clock once. Reading the clock twice could produce a negative
                // remainder if time passes recommendedTimeOfNextCheck between the reads, which a
                // Task.Delay-style delayer rejects with an exception that would end this loop.
                var lastCheck = lastControllerHealthCheck;
                if (lastCheck.HasValue)
                {
                    var now = DateTimeOffset.UtcNow;
                    var recommendedTimeOfNextCheck = lastCheck.Value + delay;
                    if (recommendedTimeOfNextCheck > now)
                        delay = recommendedTimeOfNextCheck - now;
                }
            }

            var delayTask = asyncDelayer.Delay(
                delay,
                cancellationToken)
                .AsTask();

            await Task.WhenAny(
                delayTask,
                nextForceHealthCheckTask);

            if (nextForceHealthCheckTask.IsCompleted && swarmController)
            {
                // Intentionally wait a few seconds for the other server to start up before interrogating it
                logger.LogTrace("Next health check triggering in {delaySeconds}s...", SwarmConstants.SecondsToDelayForcedHealthChecks);
                await asyncDelayer.Delay(TimeSpan.FromSeconds(SwarmConstants.SecondsToDelayForcedHealthChecks), cancellationToken);
            }
            else if (!swarmController && !nextForceHealthCheckTask.IsCompleted)
            {
                var lastCheck = lastControllerHealthCheck;
                if (!lastCheck.HasValue)
                {
                    logger.LogTrace("Not initially registered with controller, skipping health check.");
                    continue; // unregistered
                }

                if ((DateTimeOffset.UtcNow - lastCheck.Value).TotalMinutes < SwarmConstants.NodeHealthCheckIntervalMinutes)
                {
                    logger.LogTrace("Controller seems to be active, skipping health check.");
                    continue;
                }
            }

            nextForceHealthCheckTask = forceHealthCheckTcs.Task;

            logger.LogTrace("Performing swarm health check...");
            try
            {
                if (swarmController)
                    await HealthCheckNodes(cancellationToken);
                else
                    await HealthCheckController(cancellationToken);
            }
            catch (Exception ex) when (ex is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
            {
                // Only cancellation of this loop's own token may stop it. Any other OperationCanceledException
                // (e.g. an HTTP timeout) is a failed check, not a shutdown.
                logger.LogError(ex, "Health check error!");
            }
        }
    }
    catch (OperationCanceledException ex)
    {
        logger.LogTrace(ex, "Health check loop cancelled!");
    }

    logger.LogTrace("Stopped HealthCheckLoop");
}
1568
/// <summary>
/// Gets the identifier of the node that holds a given <paramref name="registrationId"/>. Controller only.
/// </summary>
/// <param name="registrationId">The registration <see cref="Guid"/> to look up.</param>
/// <returns>The matching node identifier, or <see langword="null"/> if no node holds that registration.</returns>
/// <exception cref="InvalidOperationException">Thrown when called on a node rather than the controller.</exception>
string? NodeIdentifierFromRegistration(Guid registrationId)
{
    if (!swarmController)
        throw new InvalidOperationException("NodeIdentifierFromRegistration on node!");

    lock (swarmServers!)
    {
        // Single pass over the table (previously Any() followed by First()); the first match in enumeration order wins.
        foreach (var (identifier, registrationIdAndTime) in registrationIdsAndTimes!)
            if (registrationIdAndTime.RegistrationId == registrationId)
                return identifier;
    }

    logger.LogWarning("A node that was to be looked up ({registrationId}) disappeared from our records!", registrationId);
    return null;
}
1592 }
1593}
Information about a server in the swarm.
virtual ? Uri PublicAddress
The address at which the swarm server can be publicly accessed.
virtual ? Uri Address
The address other swarm servers use to reach the server.
string? Identifier
The server's identifier.
Represents information about a running SwarmServer.
Response for when file transfers are necessary.
Extension methods for the ValueTask and ValueTask<TResult> classes.
static async ValueTask WhenAll(IEnumerable< ValueTask > tasks)
Fully await a given list of tasks.
Configuration for the server swarm system.
uint UpdateRequiredNodeCount
The number of nodes in addition to the controller required to be connected a server swarm before perf...
Uri? ControllerAddress
The SwarmServer.Address of the swarm controller. If null, the current server is considered the contro...
A IFileStreamProvider that represents the response of HttpRequestMessages.
Attribute for bringing in the master versions list from MSBuild that aren't embedded into assemblies ...
string RawSwarmProtocolVersion
The Version string of the TGS swarm protocol.
static MasterVersionsAttribute Instance
Return the Assembly's instance of the MasterVersionsAttribute.
Constants used by the swarm system.
const int ControllerHealthCheckIntervalMinutes
Interval at which the swarm controller makes health checks on nodes.
static JsonSerializerSettings SerializerSettings
See JsonSerializerSettings for the swarm system.
const string ApiKeyHeader
The header used to pass in the Configuration.SwarmConfiguration.PrivateKey.
const string UpdateRoute
The route used for swarm updates.
const string RegisterRoute
The route used for swarm registration.
const int NodeHealthCheckIntervalMinutes
Interval at which the node makes health checks on the controller if it has not received one.
const string RegistrationIdHeader
The header used to pass in swarm registration IDs.
const int UpdateCommitTimeoutMinutes
Number of minutes the controller waits to receive a ready-commit from all nodes before aborting an up...
const int SecondsToDelayForcedHealthChecks
Number of seconds the controller waits between a forced health check being triggered (via its TaskCompletionSource) and performing that health check.
A request to register with a swarm controller.
A request to update a nodes list of SwarmServers.
Helps keep servers connected to the same database in sync by coordinating updates.
volatile? TaskCompletionSource forceHealthCheckTcs
A TaskCompletionSource that is used to force a health check.
HttpRequestMessage PrepareSwarmRequest(SwarmServerInformation? swarmServer, HttpMethod httpMethod, string route, object? body, Guid? registrationIdOverride=null)
Prepares a HttpRequestMessage for swarm communication.
async ValueTask< SwarmPrepareResult > ControllerDistributedPrepareUpdate(ISeekableFileStreamProvider? initiatorProvider, SwarmUpdateRequest updateRequest, SwarmUpdateOperation currentUpdateOperation, CancellationToken cancellationToken)
Send a given updateRequest out to nodes from the swarm controller.
void UpdateSwarmServersList(IEnumerable< SwarmServerInformation > swarmServers)
Pass in an updated list of swarmServers to the node.
string? NodeIdentifierFromRegistration(Guid registrationId)
Gets the SwarmServer.Identifier for a given registrationId.
readonly bool swarmController
If the current server is the swarm controller.
async ValueTask UnregisterNode(Guid registrationId, CancellationToken cancellationToken)
Attempt to unregister a node with a given registrationId with the controller.A ValueTask representin...
bool ValidateRegistration(Guid registrationId)
Validate a given registrationId. Returns true if the registration is valid, false otherwise.
readonly IFileTransferTicketProvider transferService
The IFileTransferTicketProvider for the SwarmService.
readonly? List< SwarmServerInformation > swarmServers
List<T> of connected SwarmServerInformations.
readonly IAsyncDelayer asyncDelayer
The IAsyncDelayer for the SwarmService.
ValueTask RemoteAbortUpdate()
Sends out remote abort update requests.
async ValueTask< SwarmCommitResult > CommitUpdate(CancellationToken cancellationToken)
Signal to the swarm that an update is ready to be applied.A ValueTask<TResult> resulting in the Swarm...
bool SwarmMode
If the swarm system is enabled.
async ValueTask< bool > RemoteCommitReceived(Guid registrationId, CancellationToken cancellationToken)
Notify the controller that the node with the given registrationId is ready to commit or notify the n...
SwarmService(IDatabaseContextFactory databaseContextFactory, IDatabaseSeeder databaseSeeder, IAssemblyInformationProvider assemblyInformationProvider, IAbstractHttpClientFactory httpClientFactory, IAsyncDelayer asyncDelayer, IServerUpdater serverUpdater, IFileTransferTicketProvider transferService, ITokenFactory tokenFactory, IOptions< SwarmConfiguration > swarmConfigurationOptions, ILogger< SwarmService > logger)
Initializes a new instance of the SwarmService class.
async ValueTask< SwarmRegistrationResponse?> RegisterNode(SwarmServer node, Guid registrationId, CancellationToken cancellationToken)
Attempt to register a given node with the controller.A ValueTask<TResult> resulting in a SwarmRegist...
bool ExpectedNumberOfNodesConnected
Gets a value indicating if the expected amount of nodes are connected to the swarm.
async Task HealthCheckLoop(CancellationToken cancellationToken)
Timed loop for calling HealthCheckNodes(CancellationToken) on the controller, or HealthCheckController(CancellationToken) on nodes.
async ValueTask< SwarmPrepareResult > PrepareUpdateImpl(ISeekableFileStreamProvider? initiatorProvider, SwarmUpdateRequest updateRequest, CancellationToken cancellationToken)
Implementation of PrepareUpdate(ISeekableFileStreamProvider, Version, CancellationToken),...
async ValueTask< SwarmRegistrationResult > RegisterWithController(CancellationToken cancellationToken)
Attempt to register the node with the controller.
async ValueTask Shutdown(CancellationToken cancellationToken)
Deregister with the swarm controller or put clients into querying state.A ValueTask representing the ...
readonly IDatabaseContextFactory databaseContextFactory
The IDatabaseContextFactory for the SwarmService.
async ValueTask HealthCheckController(CancellationToken cancellationToken)
Ping the swarm controller to see that it is still running. If need be, reregister.
readonly ITokenFactory tokenFactory
The ITokenFactory for the SwarmService.
async ValueTask< bool > PrepareUpdateFromController(SwarmUpdateRequest updateRequest, CancellationToken cancellationToken)
Notify the node of an update request from the controller.A ValueTask<TResult> resulting in true if th...
readonly? CancellationTokenSource serverHealthCheckCancellationTokenSource
The CancellationTokenSource for serverHealthCheckTask.
async ValueTask< SwarmRegistrationResult > Initialize(CancellationToken cancellationToken)
Attempt to register with the swarm controller if not one, sets up the database otherwise....
RequestFileStreamProvider CreateUpdateStreamProvider(SwarmServerInformation sourceNode, FileTicketResponse ticket)
Create the RequestFileStreamProvider for retrieving an update package from a given sourceNode.
readonly IAbstractHttpClientFactory httpClientFactory
The IAbstractHttpClientFactory for the SwarmService.
async ValueTask SendUpdatedServerListToNodes(CancellationToken cancellationToken)
Sends the controllers list of nodes to all nodes.
readonly? Dictionary< string,(Guid RegistrationId, DateTimeOffset RegisteredAt)> registrationIdsAndTimes
Dictionary<TKey, TValue> of SwarmServer.Identifiers to registration Guids and when they were created.
ValueTask< SwarmPrepareResult > PrepareUpdate(ISeekableFileStreamProvider fileStreamProvider, Version version, CancellationToken cancellationToken)
Signal to the swarm that an update is requested.A ValueTask<TResult> resulting in true if the update ...
async ValueTask< Dictionary< string, FileTicketResponse > > CreateDownloadTickets(ISeekableFileStreamProvider initiatorProvider, IReadOnlyCollection< SwarmServerInformation > involvedServers, CancellationToken cancellationToken)
Create a FileTicketResponse for downloading the content of a given initiatorProvider for the rest of...
void MarkServersDirty()
Set serversDirty and complete the current forceHealthCheckTcs.
bool serversDirty
If the swarmServers list has been updated and needs to be resent to clients.
readonly ILogger< SwarmService > logger
The ILogger for the SwarmService.
readonly SwarmConfiguration swarmConfiguration
The SwarmConfiguration for the SwarmService.
List< SwarmServerInformation >? GetSwarmServers()
Gets the list of SwarmServerInformations in the swarm, including the current one.A List<T> of SwarmSe...
readonly IAssemblyInformationProvider assemblyInformationProvider
The IAssemblyInformationProvider for the SwarmService.
Task? serverHealthCheckTask
The Task for the HealthCheckLoop(CancellationToken).
async ValueTask AbortUpdate()
Attempt to abort an uncommitted update.A ValueTask representing the running operation....
volatile? SwarmUpdateOperation updateOperation
A SwarmUpdateOperation that is currently in progress.
Guid? controllerRegistration
The registration Guid provided by the swarm controller.
DateTimeOffset? lastControllerHealthCheck
The last DateTimeOffset when the controller checked on this node.
readonly IServerUpdater serverUpdater
The IServerUpdater for the SwarmService.
readonly IDatabaseSeeder databaseSeeder
The IDatabaseSeeder for the SwarmService.
bool TriggerHealthCheck()
Complete the current forceHealthCheckTcs.
async ValueTask HealthCheckNodes(CancellationToken cancellationToken)
Ping each node to see that they are still running.
Represents the state of a distributed swarm update.
Version TargetVersion
The Version being updated to.
IReadOnlyList< SwarmServerInformation > InvolvedServers
All of the SwarmServers that are involved in the updates.
A request to update the swarm's TGS version.
Dictionary< string, FileTicketResponse >? DownloadTickets
The map of Api.Models.Internal.SwarmServer.Identifiers to FileTicketResponses for retrieving the upda...
string? SourceNode
The Api.Models.Internal.SwarmServer.Identifier of the node to download the update package from.
Version? UpdateVersion
The TGS Version to update to.
Represents a file on disk to be downloaded.
IHttpClient CreateClient()
Create a IHttpClient.
ValueTask< ServerUpdateResult > BeginUpdate(ISwarmService swarmService, IFileStreamProvider? fileStreamProvider, Version version, CancellationToken cancellationToken)
Start the process of downloading and applying an update to a new server version.
Factory for scoping usage of IDatabaseContexts. Meant for use by Components.
ValueTask UseContext(Func< IDatabaseContext, ValueTask > operation)
Run an operation in the scope of an IDatabaseContext.
For initially setting up a database.
ValueTask Downgrade(IDatabaseContext databaseContext, Version downgradeVersion, CancellationToken cancellationToken)
Migrate a given databaseContext down.
ValueTask Initialize(IDatabaseContext databaseContext, CancellationToken cancellationToken)
Set up a given databaseContext.
ValueTask< Stream > GetResult(CancellationToken cancellationToken)
Gets the provided Stream. May be called multiple times, though cancelling any may cause all calls to ...
IFileStreamProvider that provides MemoryStreams.
bool Disposed
If the ISeekableFileStreamProvider has had global::System.IAsyncDisposable.DisposeAsync called on it.
ValueTask< MemoryStream > GetOwnedResult(CancellationToken cancellationToken)
Gets the provided MemoryStream. May be called multiple times, though cancelling any may cause all cal...
ReadOnlySpan< byte > SigningKeyBytes
Gets or sets the ITokenFactory's signing key bytes.
Swarm service operations for the Controllers.SwarmController.
Start and stop controllers for a swarm service.
Used for swarm operations. Functions may be no-op based on configuration.
Service for temporarily storing files to be downloaded or uploaded.
ValueTask Delay(TimeSpan timeSpan, CancellationToken cancellationToken)
Create a Task that completes after a given timeSpan.
ServerUpdateResult
The result of a call to start a server update.
SwarmUpdateAbortResult
Result of attempting to abort a SwarmUpdateOperation.
SwarmCommitResult
How to proceed on the commit step of an update.
@ AbortUpdate
The update should be aborted.
SwarmRegistrationResult
Result of attempting to register with a swarm controller.
SwarmPrepareResult
Indicates the result of a swarm update prepare operation.