2using System.Collections.Generic;
3using System.Diagnostics.CodeAnalysis;
7using System.Net.Http.Headers;
10using System.Threading;
11using System.Threading.Tasks;
14using Microsoft.Extensions.Logging;
15using Microsoft.Extensions.Options;
38#pragma warning disable CA1506
40#pragma warning restore CA1506
59 bool SwarmMode => swarmConfiguration.PrivateKey !=
null;
183 IOptions<SwarmConfiguration> swarmConfigurationOptions,
184 ILogger<SwarmService>
logger)
194 swarmConfiguration = swarmConfigurationOptions?.Value ??
throw new ArgumentNullException(nameof(swarmConfigurationOptions));
195 this.logger =
logger ??
throw new ArgumentNullException(nameof(
logger));
200 throw new InvalidOperationException(
"Swarm configuration missing Address!");
203 throw new InvalidOperationException(
"Swarm configuration missing Identifier!");
236 var localUpdateOperation = Interlocked.Exchange(ref
updateOperation,
null);
237 var abortResult = localUpdateOperation?.Abort();
243 logger.LogDebug(
"Another context already aborted this update.");
246 logger.LogDebug(
"Not aborting update because we have committed!");
249 logger.LogTrace(
"Attempted update abort but no operation was found!");
252 throw new InvalidOperationException($
"Invalid return value for SwarmUpdateOperation.Abort(): {abortResult}");
259 public async ValueTask<SwarmCommitResult>
CommitUpdate(CancellationToken cancellationToken)
266 if (localUpdateOperation ==
null)
268 logger.LogDebug(
"Update commit failed, no pending operation!");
273 logger.LogInformation(
"Waiting to commit update...");
278 logger.LogTrace(
"Sending ready-commit to swarm controller...");
287 using var commitReadyResponse = await httpClient.SendAsync(commitReadyRequest, HttpCompletionOption.ResponseContentRead, cancellationToken);
288 commitReadyResponse.EnsureSuccessStatusCode();
292 logger.LogWarning(ex,
"Unable to send ready-commit to swarm controller!");
301 cancellationToken).AsTask()
302 : Extensions.TaskExtensions.InfiniteTask.WaitAsync(cancellationToken);
304 var commitTask = Task.WhenAny(localUpdateOperation.CommitGate, timeoutTask);
308 var commitGoAhead = localUpdateOperation.CommitGate.IsCompleted
309 && localUpdateOperation.CommitGate.Result
314 "Update commit failed!{maybeTimeout}",
315 timeoutTask.IsCompleted
322 logger.LogTrace(
"Update commit task complete");
330 logger.LogDebug(
"Sending remote commit message to nodes...");
343 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
344 response.EnsureSuccessStatusCode();
348 logger.LogCritical(ex,
"Failed to send update commit request to node {nodeId}!", swarmServer.
Identifier);
356 .Where(x => !x.Controller)
357 .Select(SendRemoteCommitUpdate)
377 ArgumentNullException.ThrowIfNull(fileStreamProvider);
379 ArgumentNullException.ThrowIfNull(version);
381 logger.LogTrace(
"Begin PrepareUpdate...");
386 UpdateVersion = version,
394 ArgumentNullException.ThrowIfNull(updateRequest);
396 logger.LogInformation(
"Received remote update request from {nodeType}", !
swarmController ?
"controller" :
"node");
406 public async ValueTask<SwarmRegistrationResult>
Initialize(CancellationToken cancellationToken)
410 "Swarm mode enabled: {nodeType} {nodeId}",
416 logger.LogTrace(
"Swarm mode disabled");
439 public async ValueTask
Shutdown(CancellationToken cancellationToken)
441 logger.LogTrace(
"Begin Shutdown");
454 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
455 response.EnsureSuccessStatusCode();
461 "Error unregistering {nodeType}!",
463 ? $
"node {swarmServer.Identifier}"
464 :
"from controller");
478 logger.LogInformation(
"Unregistering from swarm controller...");
479 await SendUnregistrationRequest(
null);
498 logger.LogInformation(
"Unregistering nodes...");
504 .Where(x => !x.Controller)
505 .Select(SendUnregistrationRequest)
514 logger.LogTrace(
"Swarm controller shutdown");
524 throw new InvalidOperationException(
"Swarm mode not enabled!");
527 throw new InvalidOperationException(
"Cannot UpdateSwarmServersList on swarm controller!");
529 lock (this.swarmServers)
531 this.swarmServers.Clear();
533 logger.LogDebug(
"Updated swarm server list with {nodeCount} total nodes", this.swarmServers.Count);
541 throw new InvalidOperationException(
"Swarm mode not enabled!");
555 public async ValueTask<SwarmRegistrationResponse?>
RegisterNode(
SwarmServer node, Guid registrationId, CancellationToken cancellationToken)
557 ArgumentNullException.ThrowIfNull(node);
560 throw new ArgumentException(
"Node missing Identifier!", nameof(node));
563 throw new ArgumentException(
"Node missing Address!", nameof(node));
566 throw new InvalidOperationException(
"Swarm mode not enabled!");
569 throw new InvalidOperationException(
"Cannot RegisterNode on swarm node!");
571 logger.LogTrace(
"RegisterNode");
585 var preExistingRegistrationKvp =
registrationIdsAndTimes.FirstOrDefault(x => x.Value.RegistrationId == registrationId);
586 if (preExistingRegistrationKvp.Key == node.
Identifier)
588 logger.LogWarning(
"Node {nodeId} has already registered!", node.
Identifier);
589 return CreateResponse();
593 "Registration ID collision! Node {nodeId} tried to register with {otherNodeId}'s registration ID: {registrationId}",
595 preExistingRegistrationKvp.Key,
602 logger.LogInformation(
"Node {nodeId} is re-registering without first unregistering. Indicative of restart.", node.
Identifier);
617 logger.LogInformation(
"Registered node {nodeId} ({nodeIP}) with ID {registrationId}", node.
Identifier, node.
Address, registrationId);
619 return CreateResponse();
628 logger.LogDebug(
"Received remote commit go ahead");
629 return localUpdateOperation?.
Commit() ==
true;
633 if (nodeIdentifier ==
null)
636 logger.LogError(
"Aborting update due to unforseen circumstances!");
641 if (localUpdateOperation ==
null)
643 logger.LogDebug(
"Ignoring ready-commit from node {nodeId} as the update appears to have been aborted.", nodeIdentifier);
647 if (!localUpdateOperation.MarkNodeReady(nodeIdentifier))
650 "Attempting to mark {nodeId} as ready to commit resulted in the update being aborted!",
655 if (Interlocked.CompareExchange(ref
updateOperation,
null, localUpdateOperation) == localUpdateOperation)
660 logger.LogError(
"Aborting new update due to unforseen consequences!");
667 logger.LogDebug(
"Node {nodeId} is ready to commit.", nodeIdentifier);
672 public async ValueTask
UnregisterNode(Guid registrationId, CancellationToken cancellationToken)
675 throw new InvalidOperationException(
"Swarm mode not enabled!");
677 logger.LogTrace(
"UnregisterNode {registrationId}", registrationId);
683 logger.LogInformation(
"Controller unregistering, will attempt re-registration...");
690 if (nodeIdentifier ==
null)
693 logger.LogInformation(
"Unregistering node {nodeId}...", nodeIdentifier);
697 swarmServers.RemoveAll(x => x.Identifier == nodeIdentifier);
711 logger.LogInformation(
"Aborting swarm update!");
725 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, CancellationToken.None);
726 response.EnsureSuccessStatusCode();
732 "Unable to send remote abort to {nodeOrController}!",
734 ? $
"node {swarmServer.Identifier}"
748 .Where(x => !x.Controller)
749 .Select(SendRemoteAbort)
767 $
"{SwarmConstants.UpdateRoute}?ticket={HttpUtility.UrlEncode(ticket.FileTicket)}",
772 request.Headers.Accept.Add(
new MediaTypeWithQualityHeaderValue(MediaTypeNames.Application.Octet));
783 httpClient.Dispose();
805 var initiator = initiatorProvider !=
null;
806 logger.LogTrace(
"PrepareUpdateImpl {version}...", version);
808 var shouldAbort =
false;
813 List<SwarmServerInformation> currentNodes;
819 if (node.Identifier == updateRequest.
SourceNode)
833 var existingUpdateOperation = Interlocked.CompareExchange(ref
updateOperation, localUpdateOperation,
null);
834 if (existingUpdateOperation !=
null && existingUpdateOperation.TargetVersion != version)
836 logger.LogWarning(
"Aborting update preparation, version {targetUpdateVersion} already prepared!", existingUpdateOperation.TargetVersion);
841 if (existingUpdateOperation?.TargetVersion == version)
843 logger.LogTrace(
"PrepareUpdateImpl early out, already prepared!");
849 var downloadTickets = await
CreateDownloadTickets(initiatorProvider!, currentNodes, cancellationToken);
851 logger.LogInformation(
"Forwarding update request to swarm controller...");
859 UpdateVersion = version,
860 SourceNode = swarmConfiguration.Identifier,
861 DownloadTickets = downloadTickets,
865 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
866 if (response.IsSuccessStatusCode)
875 logger.LogTrace(
"Beginning local update process...");
877 if (sourceNode ==
null)
883 "Missing local node entry for update source node: {sourceNode}",
891 logger.LogError(
"Missing download tickets in update request!");
901 "Missing node entry for download ticket in update request!");
918 await downloaderStream.DisposeAsync();
924 logger.LogWarning(
"Failed to prepare update! Result: {serverUpdateResult}", updateApplyResult);
930 logger.LogTrace(
"No need to re-initiate update as it originated here on the swarm controller");
932 logger.LogDebug(
"Local node prepared for update to version {version}", version);
936 logger.LogWarning(ex,
"Failed to prepare update!");
952 localUpdateOperation,
968 CancellationToken cancellationToken)
970 bool abortUpdate =
false;
973 logger.LogInformation(
"Sending remote prepare to nodes...");
978 "Aborting update, controller expects to be in sync with {requiredNodeCount} nodes but currently only has {currentNodeCount}!",
985 var weAreInitiator = initiatorProvider !=
null;
988 logger.LogDebug(
"Controller has no nodes, setting commit-ready.");
992 logger.LogDebug(
"Update appears to have been aborted");
1000 "Aborting update, {receivedTickets} download tickets were provided but there are {nodesToUpdate} nodes in the swarm that require the package!",
1007 var downloadTicketDictionary = weAreInitiator
1011 var sourceNode = weAreInitiator
1012 ? swarmConfiguration.Identifier
1016 using var transferSemaphore =
new SemaphoreSlim(1);
1018 bool anyFailed =
false;
1019 var updateRequests = currentUpdateOperation
1021 .Where(node => !node.Controller)
1025 Dictionary<string, FileTicketResponse>? localTicketDictionary;
1026 var nodeId = node.Identifier!;
1027 if (nodeId == sourceNode)
1028 localTicketDictionary =
null;
1029 else if (!downloadTicketDictionary.TryGetValue(nodeId, out var ticket))
1031 logger.LogError(
"Missing download ticket for node {missingNodeId}!", nodeId);
1036 localTicketDictionary =
new Dictionary<string, FileTicketResponse>
1044 SourceNode = sourceNode,
1045 DownloadTickets = localTicketDictionary,
1048 return Tuple.Create(node, request);
1055 var tasks = updateRequests
1056 .Select(async tuple =>
1058 var node = tuple!.Item1;
1059 var body = tuple.Item2;
1067 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
1068 return response.IsSuccessStatusCode;
1072 await Task.WhenAll(tasks);
1075 if (tasks.All(x => x.Result))
1077 logger.LogInformation(
"Distributed prepare for update to version {version} complete.", updateRequest.
UpdateVersion);
1078 return weAreInitiator
1079 ? SwarmPrepareResult.SuccessHoldProviderUntilCommit
1084 logger.LogDebug(
"Distrubuted prepare failed!");
1088 logger.LogWarning(ex,
"Error remotely preparing updates!");
1108 IReadOnlyCollection<SwarmServerInformation> involvedServers,
1109 CancellationToken cancellationToken)
1112 var streamRetrievalTask = initiatorProvider.
GetResult(cancellationToken);
1116 ? Api.Models.ErrorCode.ResourceNotPresent
1118 async downloadToken => await initiatorProvider.
GetOwnedResult(downloadToken),
1119 "<Swarm Update Package Provider>",
1122 var serversRequiringTickets = involvedServers
1123 .Where(node => node.Identifier != swarmConfiguration.Identifier)
1126 logger.LogTrace(
"Creating {n} download tickets for other nodes...", serversRequiringTickets.Count);
1128 var downloadTickets =
new Dictionary<string, FileTicketResponse>(serversRequiringTickets.Count);
1129 foreach (var node
in serversRequiringTickets)
1130 downloadTickets.Add(
1132 transferService.CreateDownload(downloadProvider));
1134 await streamRetrievalTask;
1135 return downloadTickets;
1145 using var httpClient = httpClientFactory.CreateClient();
1147 List<SwarmServerInformation> currentSwarmServers;
1148 lock (swarmServers!)
1149 currentSwarmServers = swarmServers.ToList();
1151 var registrationIdsAndTimes = this.registrationIdsAndTimes!;
1154 using var request = PrepareSwarmRequest(
1162 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
1163 response.EnsureSuccessStatusCode();
1166 catch (
Exception ex) when (ex is not OperationCanceledException)
1170 "Error during swarm server health check on node '{nodeId}'! Unregistering...",
1176 swarmServers.Remove(swarmServer);
1177 registrationIdsAndTimes.Remove(swarmServer.
Identifier!);
1183 .Where(node => !node.Controller
1184 && registrationIdsAndTimes.TryGetValue(node.Identifier!, out var registrationAndTime)
1186 .Select(HealthRequestForServer));
1189 if (swarmServers.Count != currentSwarmServers.Count)
1193 await SendUpdatedServerListToNodes(cancellationToken);
1201 serversDirty =
true;
1202 if (TriggerHealthCheck())
1203 logger.LogTrace(
"Server list is dirty!");
1212 var currentTcs = Interlocked.Exchange(ref forceHealthCheckTcs,
new TaskCompletionSource());
1213 return currentTcs!.TrySetResult();
1223 using var httpClient = httpClientFactory.CreateClient();
1225 if (controllerRegistration.HasValue)
1228 using var request = PrepareSwarmRequest(
1233 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
1234 response.EnsureSuccessStatusCode();
1235 logger.LogTrace(
"Controller health check successful");
1240 logger.LogWarning(ex,
"Error during swarm controller health check! Attempting to re-register...");
1241 controllerRegistration =
null;
1246 for (var registrationAttempt = 1UL; ; ++registrationAttempt)
1248 logger.LogInformation(
"Swarm re-registration attempt {attemptNumber}...", registrationAttempt);
1249 registrationResult = await RegisterWithController(cancellationToken);
1256 logger.LogError(
"Swarm re-registration failed, controller's private key has changed!");
1262 logger.LogError(
"Swarm re-registration failed, controller's TGS version has changed!");
1266 await asyncDelayer.Delay(TimeSpan.FromSeconds(5), cancellationToken);
1280 logger.LogInformation(
"Attempting to register with swarm controller at {controllerAddress}...", swarmConfiguration.ControllerAddress);
1281 var requestedRegistrationId = Guid.NewGuid();
1283 using var httpClient = httpClientFactory.CreateClient();
1284 using var registrationRequest = PrepareSwarmRequest(
1290 Identifier = swarmConfiguration.Identifier,
1291 Address = swarmConfiguration.Address,
1292 PublicAddress = swarmConfiguration.PublicAddress,
1294 requestedRegistrationId);
1298 using var response = await httpClient.SendAsync(registrationRequest, HttpCompletionOption.ResponseContentRead, cancellationToken);
1299 if (response.IsSuccessStatusCode)
1303 var json = await response.Content.ReadAsStringAsync(cancellationToken);
1306 logger.LogDebug(
"Error reading registration response content stream! Text was null!");
1311 if (registrationResponse ==
null)
1313 logger.LogDebug(
"Error reading registration response content stream! Payload was null!");
1317 if (registrationResponse.TokenSigningKeyBase64 ==
null)
1319 logger.LogDebug(
"Error reading registration response content stream! SigningKey was null!");
1323 tokenFactory.SigningKeyBytes = Convert.FromBase64String(registrationResponse.TokenSigningKeyBase64);
1327 logger.LogDebug(ex,
"Error reading registration response content stream!");
1331 logger.LogInformation(
"Sucessfully registered with ID {registrationId}", requestedRegistrationId);
1332 controllerRegistration = requestedRegistrationId;
1333 lastControllerHealthCheck = DateTimeOffset.UtcNow;
1337 logger.LogWarning(
"Unable to register with swarm: HTTP {statusCode}!", response.StatusCode);
1339 if (response.StatusCode == HttpStatusCode.Unauthorized)
1342 if (response.StatusCode == HttpStatusCode.UpgradeRequired)
1347 var responseData = await response.Content.ReadAsStringAsync(cancellationToken);
1348 if (!String.IsNullOrWhiteSpace(responseData))
1349 logger.LogDebug(
"Response:{newLine}{responseData}", Environment.NewLine, responseData);
1353 logger.LogDebug(ex,
"Error reading registration response content stream!");
1358 logger.LogWarning(ex,
"Error sending registration request!");
1371 List<SwarmServerInformation> currentSwarmServers;
1372 lock (swarmServers!)
1374 serversDirty =
false;
1375 currentSwarmServers = swarmServers.ToList();
1378 if (currentSwarmServers.Count == 1)
1380 logger.LogTrace(
"Skipping server list broadcast as no nodes are connected!");
1384 logger.LogDebug(
"Sending updated server list to all {nodeCount} nodes...", currentSwarmServers.Count - 1);
1386 using var httpClient = httpClientFactory.CreateClient();
1389 using var request = PrepareSwarmRequest(
1395 SwarmServers = currentSwarmServers,
1400 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
1401 response.EnsureSuccessStatusCode();
1403 catch (
Exception ex) when (ex is not OperationCanceledException)
1405 logger.LogWarning(ex,
"Error during swarm server list update for node '{nodeId}'! Unregistering...", swarmServer.
Identifier);
1409 swarmServers.Remove(swarmServer);
1410 registrationIdsAndTimes!.Remove(swarmServer.
Identifier!);
1417 .Where(x => !x.Controller)
1418 .Select(UpdateRequestForServer)
1433 HttpMethod httpMethod,
1436 Guid? registrationIdOverride =
null)
1440 Address = swarmConfiguration.ControllerAddress,
1443 var fullRoute = $
"{SwarmConstants.ControllerRoute}/{route}";
1445 "{method} {route} to swarm server {nodeIdOrAddress}",
1450 var request =
new HttpRequestMessage(
1452 swarmServer.
Address + fullRoute[1..]);
1455 request.Headers.Accept.Clear();
1456 request.Headers.Accept.Add(
new MediaTypeWithQualityHeaderValue(MediaTypeNames.Application.Json));
1459 if (registrationIdOverride.HasValue)
1461 else if (swarmController)
1463 lock (swarmServers!)
1464 if (registrationIdsAndTimes!.TryGetValue(swarmServer.
Identifier!, out var registrationIdAndTime))
1467 else if (controllerRegistration.HasValue)
1471 request.Content =
new StringContent(
1474 MediaTypeNames.Application.Json);
1492 logger.LogTrace(
"Starting HealthCheckLoop...");
1495 var nextForceHealthCheckTask = forceHealthCheckTcs!.Task;
1496 while (!cancellationToken.IsCancellationRequested)
1499 if (swarmController)
1504 if (lastControllerHealthCheck.HasValue)
1506 var recommendedTimeOfNextCheck = lastControllerHealthCheck.Value + delay;
1508 if (recommendedTimeOfNextCheck > DateTimeOffset.UtcNow)
1509 delay = recommendedTimeOfNextCheck - DateTimeOffset.UtcNow;
1513 var delayTask = asyncDelayer.Delay(
1518 var awakeningTask = Task.WhenAny(
1520 nextForceHealthCheckTask);
1522 await awakeningTask;
1524 if (nextForceHealthCheckTask.IsCompleted && swarmController)
1530 else if (!swarmController && !nextForceHealthCheckTask.IsCompleted)
1532 if (!lastControllerHealthCheck.HasValue)
1534 logger.LogTrace(
"Not initially registered with controller, skipping health check.");
1540 logger.LogTrace(
"Controller seems to be active, skipping health check.");
1545 nextForceHealthCheckTask = forceHealthCheckTcs.Task;
1547 logger.LogTrace(
"Performing swarm health check...");
1550 if (swarmController)
1551 await HealthCheckNodes(cancellationToken);
1553 await HealthCheckController(cancellationToken);
1555 catch (
Exception ex) when (ex is not OperationCanceledException)
1557 logger.LogError(ex,
"Health check error!");
1561 catch (OperationCanceledException ex)
1563 logger.LogTrace(ex,
"Health check loop cancelled!");
1566 logger.LogTrace(
"Stopped HealthCheckLoop");
1576 if (!swarmController)
1577 throw new InvalidOperationException(
"NodeIdentifierFromRegistration on node!");
1579 lock (swarmServers!)
1581 var registrationIdsAndTimes = this.registrationIdsAndTimes!;
1582 var exists = registrationIdsAndTimes.Any(x => x.Value.RegistrationId == registrationId);
1585 logger.LogWarning(
"A node that was to be looked up ({registrationId}) disappeared from our records!", registrationId);
1589 return registrationIdsAndTimes.First(x => x.Value.RegistrationId == registrationId).Key;
Information about a server in the swarm.
virtual ? Uri PublicAddress
The address at which the swarm server can be publicly accessed.
virtual ? Uri Address
The address of the server within the swarm (used for server-to-server requests).
string? Identifier
The server's identifier.
Response for when file transfers are necessary.
Extension methods for the ValueTask and ValueTask<TResult> classes.
static async ValueTask WhenAll(IEnumerable< ValueTask > tasks)
Fully await a given list of tasks.
Configuration for the server swarm system.
uint UpdateRequiredNodeCount
The number of nodes in addition to the controller required to be connected to a server swarm before perf...
Uri? ControllerAddress
The SwarmServer.Address of the swarm controller. If null, the current server is considered the contro...
override? Uri PublicAddress
An IFileStreamProvider that represents the response of HttpRequestMessages.
Attribute for bringing in the master versions list from MSBuild that aren't embedded into assemblies ...
string RawSwarmProtocolVersion
The Version string of the TGS swarm protocol.
static MasterVersionsAttribute Instance
Return the Assembly's instance of the MasterVersionsAttribute.
Constants used by the swarm system.
const int ControllerHealthCheckIntervalMinutes
Interval at which the swarm controller makes health checks on nodes.
static JsonSerializerSettings SerializerSettings
See JsonSerializerSettings for the swarm system.
const string ApiKeyHeader
The header used to pass in the Configuration.SwarmConfiguration.PrivateKey.
const string UpdateRoute
The route used for swarm updates.
const string RegisterRoute
The route used for swarm registration.
const int NodeHealthCheckIntervalMinutes
Interval at which the node makes health checks on the controller if it has not received one.
const string RegistrationIdHeader
The header used to pass in swarm registration IDs.
const int UpdateCommitTimeoutMinutes
Number of minutes the controller waits to receive a ready-commit from all nodes before aborting an up...
const int SecondsToDelayForcedHealthChecks
Number of seconds between a health check TaskCompletionSource triggeri...
A request to register with a swarm controller.
Response for a SwarmRegistrationRequest.
A request to update a nodes list of SwarmServers.
Helps keep servers connected to the same database in sync by coordinating updates.
volatile? TaskCompletionSource forceHealthCheckTcs
A TaskCompletionSource that is used to force a health check.
HttpRequestMessage PrepareSwarmRequest(SwarmServerInformation? swarmServer, HttpMethod httpMethod, string route, object? body, Guid? registrationIdOverride=null)
Prepares a HttpRequestMessage for swarm communication.
async ValueTask< SwarmPrepareResult > ControllerDistributedPrepareUpdate(ISeekableFileStreamProvider? initiatorProvider, SwarmUpdateRequest updateRequest, SwarmUpdateOperation currentUpdateOperation, CancellationToken cancellationToken)
Send a given updateRequest out to nodes from the swarm controller.
void UpdateSwarmServersList(IEnumerable< SwarmServerInformation > swarmServers)
Pass in an updated list of swarmServers to the node.
string? NodeIdentifierFromRegistration(Guid registrationId)
Gets the SwarmServer.Identifier from a given registrationId.
readonly bool swarmController
If the current server is the swarm controller.
async ValueTask UnregisterNode(Guid registrationId, CancellationToken cancellationToken)
Attempt to unregister a node with a given registrationId with the controller. A ValueTask representin...
bool ValidateRegistration(Guid registrationId)
Validate a given registrationId. Returns true if the registration is valid, false otherwise.
readonly IFileTransferTicketProvider transferService
The IFileTransferTicketProvider for the SwarmService.
readonly? List< SwarmServerInformation > swarmServers
List<T> of connected SwarmServerInformations.
readonly IAsyncDelayer asyncDelayer
The IAsyncDelayer for the SwarmService.
ValueTask RemoteAbortUpdate()
Sends out remote abort update requests.
async ValueTask< SwarmCommitResult > CommitUpdate(CancellationToken cancellationToken)
Signal to the swarm that an update is ready to be applied. A ValueTask<TResult> resulting in the Swarm...
bool SwarmMode
If the swarm system is enabled.
async ValueTask< bool > RemoteCommitReceived(Guid registrationId, CancellationToken cancellationToken)
Notify the controller that the node with the given registrationId is ready to commit or notify the n...
SwarmService(IDatabaseContextFactory databaseContextFactory, IDatabaseSeeder databaseSeeder, IAssemblyInformationProvider assemblyInformationProvider, IAbstractHttpClientFactory httpClientFactory, IAsyncDelayer asyncDelayer, IServerUpdater serverUpdater, IFileTransferTicketProvider transferService, ITokenFactory tokenFactory, IOptions< SwarmConfiguration > swarmConfigurationOptions, ILogger< SwarmService > logger)
Initializes a new instance of the SwarmService class.
async ValueTask< SwarmRegistrationResponse?> RegisterNode(SwarmServer node, Guid registrationId, CancellationToken cancellationToken)
Attempt to register a given node with the controller. A ValueTask<TResult> resulting in a SwarmRegist...
bool ExpectedNumberOfNodesConnected
Gets a value indicating if the expected amount of nodes are connected to the swarm.
async Task HealthCheckLoop(CancellationToken cancellationToken)
Timed loop for calling HealthCheckNodes(CancellationToken).
async ValueTask< SwarmPrepareResult > PrepareUpdateImpl(ISeekableFileStreamProvider? initiatorProvider, SwarmUpdateRequest updateRequest, CancellationToken cancellationToken)
Implementation of PrepareUpdate(ISeekableFileStreamProvider, Version, CancellationToken),...
async ValueTask< SwarmRegistrationResult > RegisterWithController(CancellationToken cancellationToken)
Attempt to register the node with the controller.
async ValueTask Shutdown(CancellationToken cancellationToken)
Deregister with the swarm controller or put clients into querying state. A ValueTask representing the ...
readonly IDatabaseContextFactory databaseContextFactory
The IDatabaseContextFactory for the SwarmService.
async ValueTask HealthCheckController(CancellationToken cancellationToken)
Ping the swarm controller to see that it is still running. If need be, reregister.
readonly ITokenFactory tokenFactory
The ITokenFactory for the SwarmService.
async ValueTask< bool > PrepareUpdateFromController(SwarmUpdateRequest updateRequest, CancellationToken cancellationToken)
Notify the node of an update request from the controller. A ValueTask<TResult> resulting in true if th...
readonly? CancellationTokenSource serverHealthCheckCancellationTokenSource
The CancellationTokenSource for serverHealthCheckTask.
async ValueTask< SwarmRegistrationResult > Initialize(CancellationToken cancellationToken)
Attempt to register with the swarm controller if not one, sets up the database otherwise....
RequestFileStreamProvider CreateUpdateStreamProvider(SwarmServerInformation sourceNode, FileTicketResponse ticket)
Create the RequestFileStreamProvider for an update package retrieval from a given sourceNode.
readonly IAbstractHttpClientFactory httpClientFactory
The IAbstractHttpClientFactory for the SwarmService.
async ValueTask SendUpdatedServerListToNodes(CancellationToken cancellationToken)
Sends the controllers list of nodes to all nodes.
readonly? Dictionary< string,(Guid RegistrationId, DateTimeOffset RegisteredAt)> registrationIdsAndTimes
Dictionary<TKey, TValue> of SwarmServer.Identifiers to registration Guids and when they were created.
ValueTask< SwarmPrepareResult > PrepareUpdate(ISeekableFileStreamProvider fileStreamProvider, Version version, CancellationToken cancellationToken)
Signal to the swarm that an update is requested. A ValueTask<TResult> resulting in true if the update ...
async ValueTask< Dictionary< string, FileTicketResponse > > CreateDownloadTickets(ISeekableFileStreamProvider initiatorProvider, IReadOnlyCollection< SwarmServerInformation > involvedServers, CancellationToken cancellationToken)
Create a FileTicketResponse for downloading the content of a given initiatorProvider for the rest of...
void MarkServersDirty()
Set serversDirty and complete the current forceHealthCheckTcs.
bool serversDirty
If the swarmServers list has been updated and needs to be resent to clients.
readonly ILogger< SwarmService > logger
The ILogger for the SwarmService.
readonly SwarmConfiguration swarmConfiguration
The SwarmConfiguration for the SwarmService.
List< SwarmServerInformation >? GetSwarmServers()
Gets the list of SwarmServerInformations in the swarm, including the current one. A List<T> of SwarmSe...
readonly IAssemblyInformationProvider assemblyInformationProvider
The IAssemblyInformationProvider for the SwarmService.
Task? serverHealthCheckTask
The Task for the HealthCheckLoop(CancellationToken).
async ValueTask AbortUpdate()
Attempt to abort an uncommitted update. A ValueTask representing the running operation....
volatile? SwarmUpdateOperation updateOperation
A SwarmUpdateOperation that is currently in progress.
Guid? controllerRegistration
The registration Guid provided by the swarm controller.
DateTimeOffset? lastControllerHealthCheck
The last DateTimeOffset when the controller checked on this node.
readonly IServerUpdater serverUpdater
The IServerUpdater for the SwarmService.
readonly IDatabaseSeeder databaseSeeder
The IDatabaseSeeder for the SwarmService.
bool TriggerHealthCheck()
Complete the current forceHealthCheckTcs.
async ValueTask HealthCheckNodes(CancellationToken cancellationToken)
Ping each node to see that they are still running.
Represents the state of a distributed swarm update.
Version TargetVersion
The Version being updated to.
IReadOnlyList< SwarmServerInformation > InvolvedServers
All of the SwarmServers that are involved in the updates.
bool Commit()
Attempt to commit the update.
A request to update the swarm's TGS version.
Dictionary< string, FileTicketResponse >? DownloadTickets
The map of Api.Models.Internal.SwarmServer.Identifiers to FileTicketResponses for retrieving the upda...
string? SourceNode
The Api.Models.Internal.SwarmServer.Identifier of the node to download the update package from.
Version? UpdateVersion
The TGS Version to update to.
Represents a file on disk to be downloaded.
IHttpClient CreateClient()
Create an IHttpClient.
ValueTask< ServerUpdateResult > BeginUpdate(ISwarmService swarmService, IFileStreamProvider? fileStreamProvider, Version version, CancellationToken cancellationToken)
Start the process of downloading and applying an update to a new server version.
Factory for scoping usage of IDatabaseContexts. Meant for use by Components.
ValueTask UseContext(Func< IDatabaseContext, ValueTask > operation)
Run an operation in the scope of an IDatabaseContext.
For initially setting up a database.
ValueTask Downgrade(IDatabaseContext databaseContext, Version downgradeVersion, CancellationToken cancellationToken)
Migrate a given databaseContext down.
ValueTask Initialize(IDatabaseContext databaseContext, CancellationToken cancellationToken)
Set up a given databaseContext.
ValueTask< Stream > GetResult(CancellationToken cancellationToken)
Gets the provided Stream. May be called multiple times, though cancelling any may cause all calls to ...
IFileStreamProvider that provides MemoryStreams.
bool Disposed
If the ISeekableFileStreamProvider has had IAsyncDisposable.DisposeAsync called on it.
ValueTask< MemoryStream > GetOwnedResult(CancellationToken cancellationToken)
Gets the provided MemoryStream. May be called multiple times, though cancelling any may cause all cal...
For creating TokenResponses.
ReadOnlySpan< byte > SigningKeyBytes
Gets or sets the ITokenFactory's signing key bytes.
Swarm service operations for the Controllers.SwarmController.
Start and stop controllers for a swarm service.
Used for swarm operations. Functions may be no-op based on configuration.
Service for temporarily storing files to be downloaded or uploaded.
For waiting asynchronously.
ValueTask Delay(TimeSpan timeSpan, CancellationToken cancellationToken)
Create a Task that completes after a given timeSpan.
ServerUpdateResult
The result of a call to start a server update.
SwarmUpdateAbortResult
Result of attempting to abort a SwarmUpdateOperation.
SwarmCommitResult
How to proceed on the commit step of an update.
@ AbortUpdate
The update should be aborted.
SwarmRegistrationResult
Result of attempting to register with a swarm controller.
SwarmPrepareResult
Indicates the result of a swarm update prepare operation.