2using System.Collections.Generic;
3using System.Diagnostics;
6using System.Threading.Tasks;
8using Microsoft.AspNetCore.SignalR;
9using Microsoft.EntityFrameworkCore;
10using Microsoft.Extensions.Logging;
58 readonly Dictionary<long, JobHandler>
jobs;
96 ILogger<JobService>
logger)
98 this.hub =
hub ??
throw new ArgumentNullException(nameof(
hub));
101 this.logger =
logger ??
throw new ArgumentNullException(nameof(
logger));
103 jobs =
new Dictionary<long, JobHandler>();
105 activationTcs =
new TaskCompletionSource<IInstanceCoreProvider>();
113 foreach (var job
in jobs)
120 ArgumentNullException.ThrowIfNull(job);
121 ArgumentNullException.ThrowIfNull(operation);
124 throw new InvalidOperationException(
"StartedBy User associated with job does not have a Name!");
127 throw new InvalidOperationException(
"No Instance associated with job!");
129 job.StartedAt = DateTimeOffset.UtcNow;
130 job.Cancelled =
false;
134 async databaseContext =>
138 Id = job.
Instance.Require(x => x.Id),
141 databaseContext.Instances.Attach(job.
Instance);
143 originalStartedBy ??= await databaseContext
148 Id = dbUser.Id!.Value,
153 job.StartedBy =
new User
155 Id = originalStartedBy.Require(x => x.Id),
158 databaseContext.Users.Attach(job.
StartedBy);
160 databaseContext.Jobs.Add(job);
162 await databaseContext.Save(cancellationToken);
165 job.StartedBy = originalStartedBy;
168 var jobHandler =
new JobHandler(jobCancellationToken =>
RunJob(job, operation, jobCancellationToken));
176 jobs.Add(job.Require(x => x.Id), jobHandler);
186 jobHandler.Dispose();
196 var badJobIds = await databaseContext
199 .Where(y => !y.StoppedAt.HasValue)
200 .Select(y => y.Id!.Value)
201 .ToListAsync(cancellationToken);
202 if (badJobIds.Count > 0)
204 logger.LogTrace(
"Cleaning {unfinishedJobCount} unfinished jobs...", badJobIds.Count);
205 foreach (var badJobId
in badJobIds)
207 var job =
new Job(badJobId);
208 databaseContext.Jobs.Attach(job);
209 job.Cancelled =
true;
210 job.StoppedAt = DateTimeOffset.UtcNow;
213 await databaseContext.Save(cancellationToken);
221 public Task
StopAsync(CancellationToken cancellationToken)
223 List<ValueTask<Job?>> joinTasks;
240 public async ValueTask<Job?>
CancelJob(
Job job,
User? user,
bool blocking, CancellationToken cancellationToken)
242 ArgumentNullException.ThrowIfNull(job);
244 var jid = job.Require(x => x.Id);
249 if (!
jobs.TryGetValue(jid, out handler))
252 logger.LogDebug(
"Cancelling job ID {jobId}...", jid);
258 var updatedJob =
new Job(jid);
259 databaseContext.Jobs.Attach(updatedJob);
260 var attachedUser = user ==
null
261 ? await databaseContext
266 Id = dbUser.Id!.Value,
271 Id = user.Require(x => x.Id),
274 databaseContext.Users.Attach(attachedUser);
275 updatedJob.CancelledBy = attachedUser;
278 await databaseContext.Save(cancellationToken);
279 job.CancelledBy = user;
284 logger.LogTrace(
"Waiting on cancelled job #{jobId}...", job.
Id);
285 await handler.
Wait(cancellationToken);
286 logger.LogTrace(
"Done waiting on job #{jobId}...", job.
Id);
295 ArgumentNullException.ThrowIfNull(apiResponse);
298 if (!
jobs.TryGetValue(apiResponse.Require(x => x.Id), out var handler))
300 apiResponse.Progress = handler.Progress;
301 apiResponse.Stage = handler.Stage;
306 public async ValueTask<bool?>
WaitForJobCompletion(
Job job,
User? canceller, CancellationToken jobCancellationToken, CancellationToken cancellationToken)
308 ArgumentNullException.ThrowIfNull(job);
310 if (!cancellationToken.CanBeCanceled)
311 throw new ArgumentException(
"A cancellable CancellationToken should be provided!", nameof(cancellationToken));
317 if (!
jobs.TryGetValue(job.Require(x => x.Id), out handler))
323 if (noMoreJobsShouldStart && !handler.
Started)
324 await Extensions.TaskExtensions.InfiniteTask.WaitAsync(cancellationToken);
326 var cancelTask = ValueTask.FromResult<
Job?>(
null);
328 using (jobCancellationToken.Register(() => cancelTask =
CancelJob(job, canceller,
true, cancellationToken)))
329 result = await handler.
Wait(cancellationToken);
339 ArgumentNullException.ThrowIfNull(instanceCoreProvider);
341 logger.LogTrace(
"Activating job manager...");
360#pragma warning disable CA1506
362#pragma warning restore CA1506
364 var jid = job.Require(x => x.Id);
368 void LogException(
Exception ex) =>
logger.LogDebug(ex,
"Job {jobId} exited with error!", jid);
370 var hubUpdatesTask = Task.CompletedTask;
372 var firstLogHappened =
false;
375 Stopwatch? stopwatch =
null;
376 void QueueHubUpdate(
JobResponse update,
bool final)
378 void NextUpdate(
bool bypassRate)
380 var currentUpdatesTask = hubUpdatesTask;
381 async Task ChainHubUpdate()
383 await currentUpdatesTask;
385 if (!firstLogHappened)
387 logger.LogTrace(
"Sending updates for job {id} to hub group {group}", jid, hubGroupName);
388 firstLogHappened =
true;
395 .ReceiveJobUpdate(update, CancellationToken.None);
398 Stopwatch? enteredLock =
null;
401 if (!bypassRate && stopwatch !=
null)
403 Monitor.Enter(stopwatch);
404 enteredLock = stopwatch;
409 hubUpdatesTask = ChainHubUpdate();
410 stopwatch = Stopwatch.StartNew();
414 if (enteredLock !=
null)
415 Monitor.Exit(enteredLock);
430 void UpdateProgress(
string? stage,
double? progress)
432 if (progress.HasValue
433 && (progress.Value < 0 || progress.Value > 1))
435 var exception =
new ArgumentOutOfRangeException(nameof(progress), progress,
"Progress must be a value from 0-1!");
436 logger.LogError(exception,
"Invalid progress value!");
440 int? newProgress = progress.HasValue ? (int)Math.Floor(progress.Value * 100) :
null;
442 if (
jobs.TryGetValue(jid, out var handler))
444 handler.Stage = stage;
445 handler.Progress = newProgress;
447 var updatedJob = job.
ToApi();
448 updatedJob.Stage = stage;
449 updatedJob.Progress = newProgress;
450 QueueHubUpdate(updatedJob,
false);
456 Debug.Assert(activationTask.IsCompleted || job.Require(x => x.JobCode).IsServerStartupJob(),
"Non-server startup job registered before activation!");
458 var instanceCoreProvider = await activationTask.WaitAsync(cancellationToken);
460 QueueHubUpdate(job.
ToApi(),
false);
462 logger.LogTrace(
"Starting job...");
467 using var innerReporter = progressReporter.CreateSection(
null, 1.0);
469 instanceCoreProvider.GetInstance(job.
Instance!),
475 logger.LogDebug(
"Job {jobId} completed!", job.
Id);
478 catch (OperationCanceledException ex)
480 logger.LogDebug(ex,
"Job {jobId} cancelled!", job.
Id);
481 job.Cancelled =
true;
486 job.ExceptionDetails = String.IsNullOrWhiteSpace(e.Message) ? e.InnerException?.Message : e.Message + $
" (Inner exception: {e.InnerException?.Message})";
491 job.ExceptionDetails = e.ToString();
499 var attachedJob =
new Job(jid);
501 databaseContext.Jobs.Attach(attachedJob);
502 attachedJob.StoppedAt = DateTimeOffset.UtcNow;
508 await databaseContext.Save(CancellationToken.None);
518 var finalJob = await databaseContext
521 .Include(x => x.Instance)
522 .Include(x => x.StartedBy)
523 .Include(x => x.CancelledBy)
524 .Where(dbJob => dbJob.Id == jid)
525 .FirstAsync(CancellationToken.None);
526 QueueHubUpdate(finalJob.ToApi(),
true);
539 await hubUpdatesTask;
543 logger.LogError(ex,
"Error in hub updates chain task!");
552 var handler =
jobs[jid];
virtual long? Id
The ID of the entity.
string? Description
English description of the Job.
ErrorCode? ErrorCode
The Models.ErrorCode associated with the Job if any.
bool? Cancelled
If the Job was cancelled.
string? ExceptionDetails
Details of any exceptions caught during the Job.
Represents a long running job on the server. Model is read-only, updates attempt to cancel the job.
Extension methods for the ValueTask and ValueTask<TResult> classes.
static async ValueTask WhenAll(IEnumerable< ValueTask > tasks)
Fully await a given list of tasks.
Operation exceptions thrown from the context of a Models.Job.
ErrorCode? ErrorCode
The Api.Models.ErrorCode associated with the JobException.
Class for pairing Tasks with CancellationTokenSources.
void Cancel()
Cancels task.
Task< bool > Wait(CancellationToken cancellationToken)
Wait for task to complete.
bool Started
If the job has started.
Progress reporter for a Job.
readonly IConnectionMappedHubContext< JobsHub, IJobsHub > hub
The IHubContext for the JobsHub.
async Task< bool > RunJob(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
Runner for JobHandlers.
readonly ILogger< JobService > logger
The ILogger for the JobService.
JobService(IConnectionMappedHubContext< JobsHub, IJobsHub > hub, IDatabaseContextFactory databaseContextFactory, ILoggerFactory loggerFactory, ILogger< JobService > logger)
Initializes a new instance of the JobService class.
readonly object synchronizationLock
Lock object for various operations.
void Activate(IInstanceCoreProvider instanceCoreProvider)
Activate the IJobManager.
const int MaxHubUpdatesPerSecond
The maximum rate at which hub clients can receive updates.
Task StopAsync(CancellationToken cancellationToken)
readonly ILoggerFactory loggerFactory
The ILoggerFactory for the JobService.
void QueueActiveJobUpdates()
Queue a message to be sent to all clients with the current state of active jobs.
readonly TaskCompletionSource< IInstanceCoreProvider > activationTcs
TaskCompletionSource<TResult> to delay starting jobs until the server is ready.
readonly object addCancelLock
Prevents a really REALLY rare race condition between add and cancel operations.
readonly Dictionary< long, Action > hubUpdateActions
Dictionary<TKey, TValue> of running Job Api.Models.EntityId.Ids to Actions that will push immediate h...
readonly IDatabaseContextFactory databaseContextFactory
The IDatabaseContextFactory for the JobService.
Task StartAsync(CancellationToken cancellationToken)
async ValueTask< Job?> CancelJob(Job job, User? user, bool blocking, CancellationToken cancellationToken)
Cancels a given job. A ValueTask<TResult> resulting in the updated job if it was cancelled,...
volatile bool noMoreJobsShouldStart
Prevents jobs that are registered after shutdown from activating.
async ValueTask RegisterOperation(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
Registers a given Job and begins running it. A ValueTask representing a running operation.
void SetJobProgress(JobResponse apiResponse)
Set the JobResponse.Progress and JobResponse.Stage for a given apiResponse.
readonly Dictionary< long, JobHandler > jobs
Dictionary<TKey, TValue> of Job Api.Models.EntityId.Ids to running JobHandlers.
async ValueTask< bool?> WaitForJobCompletion(Job job, User? canceller, CancellationToken jobCancellationToken, CancellationToken cancellationToken)
Wait for a given job to complete. A ValueTask<TResult> representing the Job. Results in true if the J...
A SignalR Hub for pushing job updates.
static string HubGroupName(long instanceId)
Get the group name for a given instanceId.
Represents an Api.Models.Instance in the database.
User? StartedBy
See JobResponse.StartedBy.
Instance? Instance
The Models.Instance the job belongs to if any.
Helpers for manipulating the Serilog.Context.LogContext.
const string JobIdContextProperty
The Serilog.Context.LogContext property name for Models.Job Api.Models.EntityId.Ids.
Provider for IInstanceCores.
Factory for scoping usage of IDatabaseContexts. Meant for use by Components.
ValueTask UseContext(Func< IDatabaseContext, ValueTask > operation)
Run an operation in the scope of an IDatabaseContext.
The service that manages everything to do with jobs.
Allows manually triggering jobs hub updates.
An IHubContext<THub> that maps Users to their connection IDs.
delegate ValueTask JobEntrypoint(IInstanceCore? instance, IDatabaseContextFactory databaseContextFactory, Job job, JobProgressReporter progressReporter, CancellationToken cancellationToken)
Entrypoint for running a given job.