2using System.Collections.Generic;
3using System.Diagnostics.CodeAnalysis;
7using System.Net.Http.Headers;
10using System.Threading;
11using System.Threading.Tasks;
14using Microsoft.Extensions.Logging;
15using Microsoft.Extensions.Options;
37#pragma warning disable CA1506
39#pragma warning restore CA1506
58 bool SwarmMode => swarmConfiguration.PrivateKey !=
null;
182 IOptions<SwarmConfiguration> swarmConfigurationOptions,
183 ILogger<SwarmService>
logger)
193 swarmConfiguration = swarmConfigurationOptions?.Value ??
throw new ArgumentNullException(nameof(swarmConfigurationOptions));
194 this.logger =
logger ??
throw new ArgumentNullException(nameof(
logger));
199 throw new InvalidOperationException(
"Swarm configuration missing Address!");
202 throw new InvalidOperationException(
"Swarm configuration missing Identifier!");
235 var localUpdateOperation = Interlocked.Exchange(ref
updateOperation,
null);
236 var abortResult = localUpdateOperation?.Abort();
242 logger.LogDebug(
"Another context already aborted this update.");
245 logger.LogDebug(
"Not aborting update because we have committed!");
248 logger.LogTrace(
"Attempted update abort but no operation was found!");
251 throw new InvalidOperationException($
"Invalid return value for SwarmUpdateOperation.Abort(): {abortResult}");
258 public async ValueTask<SwarmCommitResult>
CommitUpdate(CancellationToken cancellationToken)
265 if (localUpdateOperation ==
null)
267 logger.LogDebug(
"Update commit failed, no pending operation!");
272 logger.LogInformation(
"Waiting to commit update...");
277 logger.LogTrace(
"Sending ready-commit to swarm controller...");
286 using var commitReadyResponse = await httpClient.SendAsync(commitReadyRequest, HttpCompletionOption.ResponseContentRead, cancellationToken);
287 commitReadyResponse.EnsureSuccessStatusCode();
291 logger.LogWarning(ex,
"Unable to send ready-commit to swarm controller!");
300 cancellationToken).AsTask()
301 : Extensions.TaskExtensions.InfiniteTask.WaitAsync(cancellationToken);
303 var commitTask = Task.WhenAny(localUpdateOperation.CommitGate, timeoutTask);
307 var commitGoAhead = localUpdateOperation.CommitGate.IsCompleted
308 && localUpdateOperation.CommitGate.Result
313 "Update commit failed!{maybeTimeout}",
314 timeoutTask.IsCompleted
321 logger.LogTrace(
"Update commit task complete");
329 logger.LogDebug(
"Sending remote commit message to nodes...");
342 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
343 response.EnsureSuccessStatusCode();
347 logger.LogCritical(ex,
"Failed to send update commit request to node {nodeId}!", swarmServer.
Identifier);
355 .Where(x => !x.Controller)
356 .Select(SendRemoteCommitUpdate)
376 ArgumentNullException.ThrowIfNull(fileStreamProvider);
378 ArgumentNullException.ThrowIfNull(version);
380 logger.LogTrace(
"Begin PrepareUpdate...");
385 UpdateVersion = version,
393 ArgumentNullException.ThrowIfNull(updateRequest);
395 logger.LogInformation(
"Received remote update request from {nodeType}", !
swarmController ?
"controller" :
"node");
405 public async ValueTask<SwarmRegistrationResult>
Initialize(CancellationToken cancellationToken)
409 "Swarm mode enabled: {nodeType} {nodeId}",
415 logger.LogTrace(
"Swarm mode disabled");
438 public async ValueTask
Shutdown(CancellationToken cancellationToken)
440 logger.LogTrace(
"Begin Shutdown");
453 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
454 response.EnsureSuccessStatusCode();
460 "Error unregistering {nodeType}!",
462 ? $
"node {swarmServer.Identifier}"
463 :
"from controller");
477 logger.LogInformation(
"Unregistering from swarm controller...");
478 await SendUnregistrationRequest(
null);
497 logger.LogInformation(
"Unregistering nodes...");
503 .Where(x => !x.Controller)
504 .Select(SendUnregistrationRequest)
513 logger.LogTrace(
"Swarm controller shutdown");
523 throw new InvalidOperationException(
"Swarm mode not enabled!");
526 throw new InvalidOperationException(
"Cannot UpdateSwarmServersList on swarm controller!");
528 lock (this.swarmServers)
530 this.swarmServers.Clear();
532 logger.LogDebug(
"Updated swarm server list with {nodeCount} total nodes", this.swarmServers.Count);
540 throw new InvalidOperationException(
"Swarm mode not enabled!");
554 public async ValueTask<SwarmRegistrationResponse?>
RegisterNode(
SwarmServer node, Guid registrationId, CancellationToken cancellationToken)
556 ArgumentNullException.ThrowIfNull(node);
559 throw new ArgumentException(
"Node missing Identifier!", nameof(node));
562 throw new ArgumentException(
"Node missing Address!", nameof(node));
565 throw new InvalidOperationException(
"Swarm mode not enabled!");
568 throw new InvalidOperationException(
"Cannot RegisterNode on swarm node!");
570 logger.LogTrace(
"RegisterNode");
584 var preExistingRegistrationKvp =
registrationIdsAndTimes.FirstOrDefault(x => x.Value.RegistrationId == registrationId);
585 if (preExistingRegistrationKvp.Key == node.
Identifier)
587 logger.LogWarning(
"Node {nodeId} has already registered!", node.
Identifier);
588 return CreateResponse();
592 "Registration ID collision! Node {nodeId} tried to register with {otherNodeId}'s registration ID: {registrationId}",
594 preExistingRegistrationKvp.Key,
601 logger.LogInformation(
"Node {nodeId} is re-registering without first unregistering. Indicative of restart.", node.
Identifier);
616 logger.LogInformation(
"Registered node {nodeId} ({nodeIP}) with ID {registrationId}", node.
Identifier, node.
Address, registrationId);
618 return CreateResponse();
627 logger.LogDebug(
"Received remote commit go ahead");
628 return localUpdateOperation?.
Commit() ==
true;
632 if (nodeIdentifier ==
null)
635 logger.LogError(
"Aborting update due to unforseen circumstances!");
640 if (localUpdateOperation ==
null)
642 logger.LogDebug(
"Ignoring ready-commit from node {nodeId} as the update appears to have been aborted.", nodeIdentifier);
646 if (!localUpdateOperation.MarkNodeReady(nodeIdentifier))
649 "Attempting to mark {nodeId} as ready to commit resulted in the update being aborted!",
654 if (Interlocked.CompareExchange(ref
updateOperation,
null, localUpdateOperation) == localUpdateOperation)
659 logger.LogError(
"Aborting new update due to unforseen consequences!");
666 logger.LogDebug(
"Node {nodeId} is ready to commit.", nodeIdentifier);
671 public async ValueTask
UnregisterNode(Guid registrationId, CancellationToken cancellationToken)
674 throw new InvalidOperationException(
"Swarm mode not enabled!");
676 logger.LogTrace(
"UnregisterNode {registrationId}", registrationId);
682 logger.LogInformation(
"Controller unregistering, will attempt re-registration...");
689 if (nodeIdentifier ==
null)
692 logger.LogInformation(
"Unregistering node {nodeId}...", nodeIdentifier);
696 swarmServers.RemoveAll(x => x.Identifier == nodeIdentifier);
710 logger.LogInformation(
"Aborting swarm update!");
724 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, CancellationToken.None);
725 response.EnsureSuccessStatusCode();
731 "Unable to send remote abort to {nodeOrController}!",
733 ? $
"node {swarmServer.Identifier}"
747 .Where(x => !x.Controller)
748 .Select(SendRemoteAbort)
766 $
"{SwarmConstants.UpdateRoute}?ticket={HttpUtility.UrlEncode(ticket.FileTicket)}",
771 request.Headers.Accept.Add(
new MediaTypeWithQualityHeaderValue(MediaTypeNames.Application.Octet));
782 httpClient.Dispose();
804 var initiator = initiatorProvider !=
null;
805 logger.LogTrace(
"PrepareUpdateImpl {version}...", version);
807 var shouldAbort =
false;
812 List<SwarmServerInformation> currentNodes;
818 if (node.Identifier == updateRequest.
SourceNode)
832 var existingUpdateOperation = Interlocked.CompareExchange(ref
updateOperation, localUpdateOperation,
null);
833 if (existingUpdateOperation !=
null && existingUpdateOperation.TargetVersion != version)
835 logger.LogWarning(
"Aborting update preparation, version {targetUpdateVersion} already prepared!", existingUpdateOperation.TargetVersion);
840 if (existingUpdateOperation?.TargetVersion == version)
842 logger.LogTrace(
"PrepareUpdateImpl early out, already prepared!");
848 var downloadTickets = await
CreateDownloadTickets(initiatorProvider!, currentNodes, cancellationToken);
850 logger.LogInformation(
"Forwarding update request to swarm controller...");
858 UpdateVersion = version,
859 SourceNode = swarmConfiguration.Identifier,
860 DownloadTickets = downloadTickets,
864 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
865 if (response.IsSuccessStatusCode)
874 logger.LogTrace(
"Beginning local update process...");
876 if (sourceNode ==
null)
882 "Missing local node entry for update source node: {sourceNode}",
890 logger.LogError(
"Missing download tickets in update request!");
900 "Missing node entry for download ticket in update request!");
917 await downloaderStream.DisposeAsync();
923 logger.LogWarning(
"Failed to prepare update! Result: {serverUpdateResult}", updateApplyResult);
929 logger.LogTrace(
"No need to re-initiate update as it originated here on the swarm controller");
931 logger.LogDebug(
"Local node prepared for update to version {version}", version);
935 logger.LogWarning(ex,
"Failed to prepare update!");
951 localUpdateOperation,
967 CancellationToken cancellationToken)
969 bool abortUpdate =
false;
972 logger.LogInformation(
"Sending remote prepare to nodes...");
977 "Aborting update, controller expects to be in sync with {requiredNodeCount} nodes but currently only has {currentNodeCount}!",
984 var weAreInitiator = initiatorProvider !=
null;
987 logger.LogDebug(
"Controller has no nodes, setting commit-ready.");
991 logger.LogDebug(
"Update appears to have been aborted");
999 "Aborting update, {receivedTickets} download tickets were provided but there are {nodesToUpdate} nodes in the swarm that require the package!",
1006 var downloadTicketDictionary = weAreInitiator
1010 var sourceNode = weAreInitiator
1011 ? swarmConfiguration.Identifier
1015 using var transferSemaphore =
new SemaphoreSlim(1);
1017 bool anyFailed =
false;
1018 var updateRequests = currentUpdateOperation
1020 .Where(node => !node.Controller)
1024 Dictionary<string, FileTicketResponse>? localTicketDictionary;
1025 var nodeId = node.Identifier!;
1026 if (nodeId == sourceNode)
1027 localTicketDictionary =
null;
1028 else if (!downloadTicketDictionary.TryGetValue(nodeId, out var ticket))
1030 logger.LogError(
"Missing download ticket for node {missingNodeId}!", nodeId);
1035 localTicketDictionary =
new Dictionary<string, FileTicketResponse>
1043 SourceNode = sourceNode,
1044 DownloadTickets = localTicketDictionary,
1047 return Tuple.Create(node, request);
1054 var tasks = updateRequests
1055 .Select(async tuple =>
1057 var node = tuple!.Item1;
1058 var body = tuple.Item2;
1066 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
1067 return response.IsSuccessStatusCode;
1071 await Task.WhenAll(tasks);
1074 if (tasks.All(x => x.Result))
1076 logger.LogInformation(
"Distributed prepare for update to version {version} complete.", updateRequest.
UpdateVersion);
1077 return weAreInitiator
1078 ? SwarmPrepareResult.SuccessHoldProviderUntilCommit
1083 logger.LogDebug(
"Distrubuted prepare failed!");
1087 logger.LogWarning(ex,
"Error remotely preparing updates!");
1107 IReadOnlyCollection<SwarmServerInformation> involvedServers,
1108 CancellationToken cancellationToken)
1111 var streamRetrievalTask = initiatorProvider.
GetResult(cancellationToken);
1115 ? Api.Models.ErrorCode.ResourceNotPresent
1117 async downloadToken => await initiatorProvider.
GetOwnedResult(downloadToken),
1118 "<Swarm Update Package Provider>",
1121 var serversRequiringTickets = involvedServers
1122 .Where(node => node.Identifier != swarmConfiguration.Identifier)
1125 logger.LogTrace(
"Creating {n} download tickets for other nodes...", serversRequiringTickets.Count);
1127 var downloadTickets =
new Dictionary<string, FileTicketResponse>(serversRequiringTickets.Count);
1128 foreach (var node
in serversRequiringTickets)
1129 downloadTickets.Add(
1131 transferService.CreateDownload(downloadProvider));
1133 await streamRetrievalTask;
1134 return downloadTickets;
1144 using var httpClient = httpClientFactory.CreateClient();
1146 List<SwarmServerInformation> currentSwarmServers;
1147 lock (swarmServers!)
1148 currentSwarmServers = swarmServers.ToList();
1150 var registrationIdsAndTimes = this.registrationIdsAndTimes!;
1153 using var request = PrepareSwarmRequest(
1161 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
1162 response.EnsureSuccessStatusCode();
1165 catch (
Exception ex) when (ex is not OperationCanceledException)
1169 "Error during swarm server health check on node '{nodeId}'! Unregistering...",
1175 swarmServers.Remove(swarmServer);
1176 registrationIdsAndTimes.Remove(swarmServer.
Identifier!);
1182 .Where(node => !node.Controller
1183 && registrationIdsAndTimes.TryGetValue(node.Identifier!, out var registrationAndTime)
1185 .Select(HealthRequestForServer));
1188 if (swarmServers.Count != currentSwarmServers.Count)
1192 await SendUpdatedServerListToNodes(cancellationToken);
1200 serversDirty =
true;
1201 if (TriggerHealthCheck())
1202 logger.LogTrace(
"Server list is dirty!");
1211 var currentTcs = Interlocked.Exchange(ref forceHealthCheckTcs,
new TaskCompletionSource());
1212 return currentTcs!.TrySetResult();
1222 using var httpClient = httpClientFactory.CreateClient();
1224 if (controllerRegistration.HasValue)
1227 using var request = PrepareSwarmRequest(
1232 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
1233 response.EnsureSuccessStatusCode();
1234 logger.LogTrace(
"Controller health check successful");
1239 logger.LogWarning(ex,
"Error during swarm controller health check! Attempting to re-register...");
1240 controllerRegistration =
null;
1245 for (var registrationAttempt = 1UL; ; ++registrationAttempt)
1247 logger.LogInformation(
"Swarm re-registration attempt {attemptNumber}...", registrationAttempt);
1248 registrationResult = await RegisterWithController(cancellationToken);
1255 logger.LogError(
"Swarm re-registration failed, controller's private key has changed!");
1261 logger.LogError(
"Swarm re-registration failed, controller's TGS version has changed!");
1265 await asyncDelayer.Delay(TimeSpan.FromSeconds(5), cancellationToken);
1279 logger.LogInformation(
"Attempting to register with swarm controller at {controllerAddress}...", swarmConfiguration.ControllerAddress);
1280 var requestedRegistrationId = Guid.NewGuid();
1282 using var httpClient = httpClientFactory.CreateClient();
1283 using var registrationRequest = PrepareSwarmRequest(
1289 Identifier = swarmConfiguration.Identifier,
1290 Address = swarmConfiguration.Address,
1291 PublicAddress = swarmConfiguration.PublicAddress,
1293 requestedRegistrationId);
1297 using var response = await httpClient.SendAsync(registrationRequest, HttpCompletionOption.ResponseContentRead, cancellationToken);
1298 if (response.IsSuccessStatusCode)
1302 var json = await response.Content.ReadAsStringAsync(cancellationToken);
1305 logger.LogDebug(
"Error reading registration response content stream! Text was null!");
1310 if (registrationResponse ==
null)
1312 logger.LogDebug(
"Error reading registration response content stream! Payload was null!");
1316 if (registrationResponse.TokenSigningKeyBase64 ==
null)
1318 logger.LogDebug(
"Error reading registration response content stream! SigningKey was null!");
1322 tokenFactory.SigningKeyBytes = Convert.FromBase64String(registrationResponse.TokenSigningKeyBase64);
1326 logger.LogDebug(ex,
"Error reading registration response content stream!");
1330 logger.LogInformation(
"Sucessfully registered with ID {registrationId}", requestedRegistrationId);
1331 controllerRegistration = requestedRegistrationId;
1332 lastControllerHealthCheck = DateTimeOffset.UtcNow;
1336 logger.LogWarning(
"Unable to register with swarm: HTTP {statusCode}!", response.StatusCode);
1338 if (response.StatusCode == HttpStatusCode.Unauthorized)
1341 if (response.StatusCode == HttpStatusCode.UpgradeRequired)
1346 var responseData = await response.Content.ReadAsStringAsync(cancellationToken);
1347 if (!String.IsNullOrWhiteSpace(responseData))
1348 logger.LogDebug(
"Response:{newLine}{responseData}", Environment.NewLine, responseData);
1352 logger.LogDebug(ex,
"Error reading registration response content stream!");
1357 logger.LogWarning(ex,
"Error sending registration request!");
1370 List<SwarmServerInformation> currentSwarmServers;
1371 lock (swarmServers!)
1373 serversDirty =
false;
1374 currentSwarmServers = swarmServers.ToList();
1377 if (currentSwarmServers.Count == 1)
1379 logger.LogTrace(
"Skipping server list broadcast as no nodes are connected!");
1383 logger.LogDebug(
"Sending updated server list to all {nodeCount} nodes...", currentSwarmServers.Count - 1);
1385 using var httpClient = httpClientFactory.CreateClient();
1388 using var request = PrepareSwarmRequest(
1394 SwarmServers = currentSwarmServers,
1399 using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken);
1400 response.EnsureSuccessStatusCode();
1402 catch (
Exception ex) when (ex is not OperationCanceledException)
1404 logger.LogWarning(ex,
"Error during swarm server list update for node '{nodeId}'! Unregistering...", swarmServer.
Identifier);
1408 swarmServers.Remove(swarmServer);
1409 registrationIdsAndTimes!.Remove(swarmServer.
Identifier!);
1416 .Where(x => !x.Controller)
1417 .Select(UpdateRequestForServer)
1432 HttpMethod httpMethod,
1435 Guid? registrationIdOverride =
null)
1439 Address = swarmConfiguration.ControllerAddress,
1442 var fullRoute = $
"{SwarmConstants.ControllerRoute}/{route}";
1444 "{method} {route} to swarm server {nodeIdOrAddress}",
1449 var request =
new HttpRequestMessage(
1451 swarmServer.
Address + fullRoute[1..]);
1454 request.Headers.Accept.Clear();
1455 request.Headers.Accept.Add(
new MediaTypeWithQualityHeaderValue(MediaTypeNames.Application.Json));
1458 if (registrationIdOverride.HasValue)
1460 else if (swarmController)
1462 lock (swarmServers!)
1463 if (registrationIdsAndTimes!.TryGetValue(swarmServer.
Identifier!, out var registrationIdAndTime))
1466 else if (controllerRegistration.HasValue)
1470 request.Content =
new StringContent(
1473 MediaTypeNames.Application.Json);
1491 logger.LogTrace(
"Starting HealthCheckLoop...");
1494 var nextForceHealthCheckTask = forceHealthCheckTcs!.Task;
1495 while (!cancellationToken.IsCancellationRequested)
1498 if (swarmController)
1503 if (lastControllerHealthCheck.HasValue)
1505 var recommendedTimeOfNextCheck = lastControllerHealthCheck.Value + delay;
1507 if (recommendedTimeOfNextCheck > DateTimeOffset.UtcNow)
1508 delay = recommendedTimeOfNextCheck - DateTimeOffset.UtcNow;
1512 var delayTask = asyncDelayer.Delay(
1517 var awakeningTask = Task.WhenAny(
1519 nextForceHealthCheckTask);
1521 await awakeningTask;
1523 if (nextForceHealthCheckTask.IsCompleted && swarmController)
1529 else if (!swarmController && !nextForceHealthCheckTask.IsCompleted)
1531 if (!lastControllerHealthCheck.HasValue)
1533 logger.LogTrace(
"Not initially registered with controller, skipping health check.");
1539 logger.LogTrace(
"Controller seems to be active, skipping health check.");
1544 nextForceHealthCheckTask = forceHealthCheckTcs.Task;
1546 logger.LogTrace(
"Performing swarm health check...");
1549 if (swarmController)
1550 await HealthCheckNodes(cancellationToken);
1552 await HealthCheckController(cancellationToken);
1554 catch (
Exception ex) when (ex is not OperationCanceledException)
1556 logger.LogError(ex,
"Health check error!");
1560 catch (OperationCanceledException ex)
1562 logger.LogTrace(ex,
"Health check loop cancelled!");
1565 logger.LogTrace(
"Stopped HealthCheckLoop");
1575 if (!swarmController)
1576 throw new InvalidOperationException(
"NodeIdentifierFromRegistration on node!");
1578 lock (swarmServers!)
1580 var registrationIdsAndTimes = this.registrationIdsAndTimes!;
1581 var exists = registrationIdsAndTimes.Any(x => x.Value.RegistrationId == registrationId);
1584 logger.LogWarning(
"A node that was to be looked up ({registrationId}) disappeared from our records!", registrationId);
1588 return registrationIdsAndTimes.First(x => x.Value.RegistrationId == registrationId).Key;
Information about a server in the swarm.
virtual ? Uri PublicAddress
The address at which the swarm server can be publicly accessed.
virtual ? Uri Address
The public address of the server.
string? Identifier
The server's identifier.
Response for when file transfers are necessary.
Extension methods for the ValueTask and ValueTask<TResult> classes.
static async ValueTask WhenAll(IEnumerable< ValueTask > tasks)
Fully await a given list of tasks.
Configuration for the server swarm system.
uint UpdateRequiredNodeCount
The number of nodes in addition to the controller required to be connected to a server swarm before perf...
Uri? ControllerAddress
The SwarmServer.Address of the swarm controller. If null, the current server is considered the contro...
override? Uri PublicAddress
A IFileStreamProvider that represents the response of HttpRequestMessages.
Attribute for bringing in the master versions list from MSBuild that aren't embedded into assemblies ...
string RawSwarmProtocolVersion
The Version string of the TGS swarm protocol.
static MasterVersionsAttribute Instance
Return the Assembly's instance of the MasterVersionsAttribute.
Constants used by the swarm system.
const int ControllerHealthCheckIntervalMinutes
Interval at which the swarm controller makes health checks on nodes.
static JsonSerializerSettings SerializerSettings
See JsonSerializerSettings for the swarm system.
const string ApiKeyHeader
The header used to pass in the Configuration.SwarmConfiguration.PrivateKey.
const string UpdateRoute
The route used for swarm updates.
const string RegisterRoute
The route used for swarm registration.
const int NodeHealthCheckIntervalMinutes
Interval at which the node makes health checks on the controller if it has not received one.
const string RegistrationIdHeader
The header used to pass in swarm registration IDs.
const int UpdateCommitTimeoutMinutes
Number of minutes the controller waits to receive a ready-commit from all nodes before aborting an up...
const int SecondsToDelayForcedHealthChecks
Number of seconds between a health check global::System.Threading.Tasks.TaskCompletionSource triggeri...
A request to register with a swarm controller.
Response for a SwarmRegistrationRequest.
A request to update a node's list of SwarmServers.
Helps keep servers connected to the same database in sync by coordinating updates.
volatile? TaskCompletionSource forceHealthCheckTcs
A TaskCompletionSource that is used to force a health check.
HttpRequestMessage PrepareSwarmRequest(SwarmServerInformation? swarmServer, HttpMethod httpMethod, string route, object? body, Guid? registrationIdOverride=null)
Prepares a HttpRequestMessage for swarm communication.
async ValueTask< SwarmPrepareResult > ControllerDistributedPrepareUpdate(ISeekableFileStreamProvider? initiatorProvider, SwarmUpdateRequest updateRequest, SwarmUpdateOperation currentUpdateOperation, CancellationToken cancellationToken)
Send a given updateRequest out to nodes from the swarm controller.
void UpdateSwarmServersList(IEnumerable< SwarmServerInformation > swarmServers)
Pass in an updated list of swarmServers to the node.
string? NodeIdentifierFromRegistration(Guid registrationId)
Gets the SwarmServer.Identifier from a given registrationId.
readonly bool swarmController
If the current server is the swarm controller.
async ValueTask UnregisterNode(Guid registrationId, CancellationToken cancellationToken)
Attempt to unregister a node with a given registrationId with the controller. A ValueTask representin...
bool ValidateRegistration(Guid registrationId)
Validate a given registrationId. Returns true if the registration is valid, false otherwise.
readonly IFileTransferTicketProvider transferService
The IFileTransferTicketProvider for the SwarmService.
readonly? List< SwarmServerInformation > swarmServers
List<T> of connected SwarmServerInformations.
readonly IAsyncDelayer asyncDelayer
The IAsyncDelayer for the SwarmService.
ValueTask RemoteAbortUpdate()
Sends out remote abort update requests.
async ValueTask< SwarmCommitResult > CommitUpdate(CancellationToken cancellationToken)
Signal to the swarm that an update is ready to be applied. A ValueTask<TResult> resulting in the Swarm...
bool SwarmMode
If the swarm system is enabled.
async ValueTask< bool > RemoteCommitReceived(Guid registrationId, CancellationToken cancellationToken)
Notify the controller that the node with the given registrationId is ready to commit or notify the n...
async ValueTask< SwarmRegistrationResponse?> RegisterNode(SwarmServer node, Guid registrationId, CancellationToken cancellationToken)
Attempt to register a given node with the controller. A ValueTask<TResult> resulting in a SwarmRegist...
bool ExpectedNumberOfNodesConnected
Gets a value indicating whether the expected number of nodes is connected to the swarm.
async Task HealthCheckLoop(CancellationToken cancellationToken)
Timed loop for calling HealthCheckNodes(CancellationToken).
async ValueTask< SwarmPrepareResult > PrepareUpdateImpl(ISeekableFileStreamProvider? initiatorProvider, SwarmUpdateRequest updateRequest, CancellationToken cancellationToken)
Implementation of PrepareUpdate(ISeekableFileStreamProvider, Version, CancellationToken),...
async ValueTask< SwarmRegistrationResult > RegisterWithController(CancellationToken cancellationToken)
Attempt to register the node with the controller.
async ValueTask Shutdown(CancellationToken cancellationToken)
Deregister from the swarm controller or put clients into querying state. A ValueTask representing the ...
readonly IDatabaseContextFactory databaseContextFactory
The IDatabaseContextFactory for the SwarmService.
async ValueTask HealthCheckController(CancellationToken cancellationToken)
Ping the swarm controller to see that it is still running. If need be, reregister.
readonly ITokenFactory tokenFactory
The ITokenFactory for the SwarmService.
async ValueTask< bool > PrepareUpdateFromController(SwarmUpdateRequest updateRequest, CancellationToken cancellationToken)
Notify the node of an update request from the controller. A ValueTask<TResult> resulting in true if th...
readonly? CancellationTokenSource serverHealthCheckCancellationTokenSource
The CancellationTokenSource for serverHealthCheckTask.
async ValueTask< SwarmRegistrationResult > Initialize(CancellationToken cancellationToken)
Attempt to register with the swarm controller if this server is not the controller; otherwise, sets up the database....
RequestFileStreamProvider CreateUpdateStreamProvider(SwarmServerInformation sourceNode, FileTicketResponse ticket)
Create the RequestFileStreamProvider for an update package retrieval from a given sourceNode.
async ValueTask SendUpdatedServerListToNodes(CancellationToken cancellationToken)
Sends the controller's list of nodes to all nodes.
readonly? Dictionary< string,(Guid RegistrationId, DateTimeOffset RegisteredAt)> registrationIdsAndTimes
Dictionary<TKey, TValue> of SwarmServer.Identifiers to registration Guids and when they were created.
ValueTask< SwarmPrepareResult > PrepareUpdate(ISeekableFileStreamProvider fileStreamProvider, Version version, CancellationToken cancellationToken)
Signal to the swarm that an update is requested. A ValueTask<TResult> resulting in true if the update ...
async ValueTask< Dictionary< string, FileTicketResponse > > CreateDownloadTickets(ISeekableFileStreamProvider initiatorProvider, IReadOnlyCollection< SwarmServerInformation > involvedServers, CancellationToken cancellationToken)
Create a FileTicketResponse for downloading the content of a given initiatorProvider for the rest of...
void MarkServersDirty()
Set serversDirty and complete the current forceHealthCheckTcs.
bool serversDirty
If the swarmServers list has been updated and needs to be resent to clients.
readonly ILogger< SwarmService > logger
The ILogger for the SwarmService.
SwarmService(IDatabaseContextFactory databaseContextFactory, IDatabaseSeeder databaseSeeder, IAssemblyInformationProvider assemblyInformationProvider, IHttpClientFactory httpClientFactory, IAsyncDelayer asyncDelayer, IServerUpdater serverUpdater, IFileTransferTicketProvider transferService, ITokenFactory tokenFactory, IOptions< SwarmConfiguration > swarmConfigurationOptions, ILogger< SwarmService > logger)
Initializes a new instance of the SwarmService class.
readonly SwarmConfiguration swarmConfiguration
The SwarmConfiguration for the SwarmService.
List< SwarmServerInformation >? GetSwarmServers()
Gets the list of SwarmServerInformations in the swarm, including the current one. A List<T> of SwarmSe...
readonly IAssemblyInformationProvider assemblyInformationProvider
The IAssemblyInformationProvider for the SwarmService.
Task? serverHealthCheckTask
The Task for the HealthCheckLoop(CancellationToken).
async ValueTask AbortUpdate()
Attempt to abort an uncommitted update. A ValueTask representing the running operation....
volatile? SwarmUpdateOperation updateOperation
A SwarmUpdateOperation that is currently in progress.
Guid? controllerRegistration
The registration Guid provided by the swarm controller.
DateTimeOffset? lastControllerHealthCheck
The last DateTimeOffset when the controller checked on this node.
readonly IHttpClientFactory httpClientFactory
The IHttpClientFactory for the SwarmService.
readonly IServerUpdater serverUpdater
The IServerUpdater for the SwarmService.
readonly IDatabaseSeeder databaseSeeder
The IDatabaseSeeder for the SwarmService.
bool TriggerHealthCheck()
Complete the current forceHealthCheckTcs.
async ValueTask HealthCheckNodes(CancellationToken cancellationToken)
Ping each node to see that they are still running.
Represents the state of a distributed swarm update.
Version TargetVersion
The Version being updated to.
IReadOnlyList< SwarmServerInformation > InvolvedServers
All of the SwarmServers that are involved in the updates.
bool Commit()
Attempt to commit the update.
A request to update the swarm's TGS version.
Dictionary< string, FileTicketResponse >? DownloadTickets
The map of Api.Models.Internal.SwarmServer.Identifiers to FileTicketResponses for retrieving the upda...
string? SourceNode
The Api.Models.Internal.SwarmServer.Identifier of the node to download the update package from.
Version? UpdateVersion
The TGS Version to update to.
Represents a file on disk to be downloaded.
ValueTask< ServerUpdateResult > BeginUpdate(ISwarmService swarmService, IFileStreamProvider? fileStreamProvider, Version version, CancellationToken cancellationToken)
Start the process of downloading and applying an update to a new server version.
Factory for scoping usage of IDatabaseContexts. Meant for use by Components.
ValueTask UseContext(Func< IDatabaseContext, ValueTask > operation)
Run an operation in the scope of an IDatabaseContext.
For initially setting up a database.
ValueTask Downgrade(IDatabaseContext databaseContext, Version downgradeVersion, CancellationToken cancellationToken)
Migrate a given databaseContext down.
ValueTask Initialize(IDatabaseContext databaseContext, CancellationToken cancellationToken)
Set up a given databaseContext.
ValueTask< Stream > GetResult(CancellationToken cancellationToken)
Gets the provided Stream. May be called multiple times, though cancelling any may cause all calls to ...
IFileStreamProvider that provides MemoryStreams.
bool Disposed
If the ISeekableFileStreamProvider has had global::System.IAsyncDisposable.DisposeAsync called on it.
ValueTask< MemoryStream > GetOwnedResult(CancellationToken cancellationToken)
Gets the provided MemoryStream. May be called multiple times, though cancelling any may cause all cal...
For creating TokenResponses.
ReadOnlySpan< byte > SigningKeyBytes
Gets or sets the ITokenFactory's signing key bytes.
Swarm service operations for the Controllers.SwarmController.
Start and stop controllers for a swarm service.
Used for swarm operations. Functions may be no-op based on configuration.
Service for temporarily storing files to be downloaded or uploaded.
For waiting asynchronously.
ValueTask Delay(TimeSpan timeSpan, CancellationToken cancellationToken)
Create a Task that completes after a given timeSpan.
ServerUpdateResult
The result of a call to start a server update.
SwarmUpdateAbortResult
Result of attempting to abort a SwarmUpdateOperation.
SwarmCommitResult
How to proceed on the commit step of an update.
@ AbortUpdate
The update should be aborted.
SwarmRegistrationResult
Result of attempting to register with a swarm controller.
SwarmPrepareResult
Indicates the result of a swarm update prepare operation.