2using System.Collections.Generic;
3using System.Diagnostics;
6using System.Threading.Tasks;
8using Microsoft.AspNetCore.SignalR;
9using Microsoft.EntityFrameworkCore;
10using Microsoft.Extensions.Logging;
60 readonly Dictionary<long, JobHandler>
jobs;
108 IMetricFactory metricFactory,
110 ILogger<JobService>
logger)
112 this.hub =
hub ??
throw new ArgumentNullException(nameof(
hub));
115 ArgumentNullException.ThrowIfNull(metricFactory);
116 this.logger =
logger ??
throw new ArgumentNullException(nameof(
logger));
118 runningJobs = metricFactory.CreateGauge(
"tgs_jobs_running",
"The number of TGS jobs running across all instances");
119 processedJobs = metricFactory.CreateCounter(
"tgs_jobs_processed",
"The number of TGS jobs that have run previously");
121 jobs =
new Dictionary<long, JobHandler>();
123 activationTcs =
new TaskCompletionSource<IInstanceCoreProvider>();
131 foreach (var job
in jobs)
138 ArgumentNullException.ThrowIfNull(job);
139 ArgumentNullException.ThrowIfNull(operation);
142 throw new InvalidOperationException(
"StartedBy User associated with job does not have a Name!");
145 throw new InvalidOperationException(
"No Instance associated with job!");
147 job.StartedAt = DateTimeOffset.UtcNow;
148 job.Cancelled =
false;
152 async databaseContext =>
156 Id = job.
Instance.Require(x => x.Id),
159 databaseContext.Instances.Attach(job.
Instance);
161 originalStartedBy ??= await databaseContext
166 Id = dbUser.Id!.Value,
171 job.StartedBy =
new User
173 Id = originalStartedBy.Require(x => x.Id),
176 databaseContext.Users.Attach(job.
StartedBy);
178 databaseContext.Jobs.Add(job);
180 await databaseContext.Save(cancellationToken);
183 job.StartedBy = originalStartedBy;
186 var jobHandler =
new JobHandler(jobCancellationToken =>
RunJob(job, operation, jobCancellationToken));
194 jobs.Add(job.Require(x => x.Id), jobHandler);
204 jobHandler.Dispose();
214 var badJobIds = await databaseContext
216 .Where(y => !y.StoppedAt.HasValue)
217 .Select(y => y.Id!.Value)
218 .ToListAsync(cancellationToken);
219 if (badJobIds.Count > 0)
221 logger.LogTrace(
"Cleaning {unfinishedJobCount} unfinished jobs...", badJobIds.Count);
222 foreach (var badJobId
in badJobIds)
224 var job =
new Job(badJobId);
225 databaseContext.Jobs.Attach(job);
226 job.Cancelled =
true;
227 job.StoppedAt = DateTimeOffset.UtcNow;
230 await databaseContext.Save(cancellationToken);
238 public Task
StopAsync(CancellationToken cancellationToken)
240 List<ValueTask<Job?>> joinTasks;
257 public async ValueTask<Job?>
CancelJob(
Job job,
User? user,
bool blocking, CancellationToken cancellationToken)
259 ArgumentNullException.ThrowIfNull(job);
261 var jid = job.Require(x => x.Id);
266 if (!
jobs.TryGetValue(jid, out handler))
269 logger.LogDebug(
"Cancelling job ID {jobId}...", jid);
275 var updatedJob =
new Job(jid);
276 databaseContext.Jobs.Attach(updatedJob);
277 var attachedUser = user ==
null
278 ? await databaseContext
283 Id = dbUser.Id!.Value,
288 Id = user.Require(x => x.Id),
291 databaseContext.Users.Attach(attachedUser);
292 updatedJob.CancelledBy = attachedUser;
295 await databaseContext.Save(cancellationToken);
296 job.CancelledBy = user;
301 logger.LogTrace(
"Waiting on cancelled job #{jobId}...", job.
Id);
302 await handler.
Wait(cancellationToken);
303 logger.LogTrace(
"Done waiting on job #{jobId}...", job.
Id);
312 ArgumentNullException.ThrowIfNull(apiResponse);
315 if (!
jobs.TryGetValue(apiResponse.Require(x => x.Id), out var handler))
317 apiResponse.Progress = handler.Progress;
318 apiResponse.Stage = handler.Stage;
323 public async ValueTask<bool?>
WaitForJobCompletion(
Job job,
User? canceller, CancellationToken jobCancellationToken, CancellationToken cancellationToken)
325 ArgumentNullException.ThrowIfNull(job);
327 if (!cancellationToken.CanBeCanceled)
328 throw new ArgumentException(
"A cancellable CancellationToken should be provided!", nameof(cancellationToken));
334 if (!
jobs.TryGetValue(job.Require(x => x.Id), out handler))
340 if (noMoreJobsShouldStart && !handler.
Started)
341 await Extensions.TaskExtensions.InfiniteTask.WaitAsync(cancellationToken);
343 var cancelTask = ValueTask.FromResult<
Job?>(
null);
345 using (jobCancellationToken.Register(() => cancelTask =
CancelJob(job, canceller,
true, cancellationToken)))
346 result = await handler.
Wait(cancellationToken);
356 ArgumentNullException.ThrowIfNull(instanceCoreProvider);
376#pragma warning disable CA1506
378#pragma warning restore CA1506
380 var jid = job.Require(x => x.Id);
385 void LogException(
Exception ex) =>
logger.LogDebug(ex,
"Job {jobId} exited with error!", jid);
387 var hubUpdatesTask = Task.CompletedTask;
389 var firstLogHappened =
false;
392 Stopwatch? stopwatch =
null;
393 void QueueHubUpdate(
JobResponse update,
bool final)
395 void NextUpdate(
bool bypassRate)
397 var currentUpdatesTask = hubUpdatesTask;
398 async Task ChainHubUpdate()
400 await currentUpdatesTask;
402 if (!firstLogHappened)
404 logger.LogTrace(
"Sending updates for job {id} to hub group {group}", jid, hubGroupName);
405 firstLogHappened =
true;
412 .ReceiveJobUpdate(update, CancellationToken.None);
415 Stopwatch? enteredLock =
null;
418 if (!bypassRate && stopwatch !=
null)
420 Monitor.Enter(stopwatch);
421 enteredLock = stopwatch;
426 hubUpdatesTask = ChainHubUpdate();
427 stopwatch = Stopwatch.StartNew();
431 if (enteredLock !=
null)
432 Monitor.Exit(enteredLock);
447 void UpdateProgress(
string? stage,
double? progress)
449 if (progress.HasValue
450 && (progress.Value < 0 || progress.Value > 1))
452 var exception =
new ArgumentOutOfRangeException(nameof(progress), progress,
"Progress must be a value from 0-1!");
453 logger.LogError(exception,
"Invalid progress value!");
457 int? newProgress = progress.HasValue ? (int)Math.Floor(progress.Value * 100) :
null;
459 if (
jobs.TryGetValue(jid, out var handler))
461 handler.Stage = stage;
462 handler.Progress = newProgress;
464 var updatedJob = job.
ToApi();
465 updatedJob.Stage = stage;
466 updatedJob.Progress = newProgress;
467 QueueHubUpdate(updatedJob,
false);
473 Debug.Assert(activationTask.IsCompleted || job.Require(x => x.JobCode).IsServerStartupJob(),
"Non-server startup job registered before activation!");
475 var instanceCoreProvider = await activationTask.WaitAsync(cancellationToken);
477 QueueHubUpdate(job.
ToApi(),
false);
479 logger.LogTrace(
"Starting job...");
484 using var innerReporter = progressReporter.CreateSection(
null, 1.0);
486 instanceCoreProvider.GetInstance(job.
Instance!),
492 logger.LogDebug(
"Job {jobId} completed!", job.
Id);
495 catch (OperationCanceledException ex)
497 logger.LogDebug(ex,
"Job {jobId} cancelled!", job.
Id);
498 job.Cancelled =
true;
503 job.ExceptionDetails = String.IsNullOrWhiteSpace(e.Message) ? e.InnerException?.Message : e.Message + $
" (Inner exception: {e.InnerException?.Message})";
508 job.ExceptionDetails = e.ToString();
516 var attachedJob =
new Job(jid);
518 databaseContext.Jobs.Attach(attachedJob);
519 attachedJob.StoppedAt = DateTimeOffset.UtcNow;
525 await databaseContext.Save(CancellationToken.None);
535 var finalJob = await databaseContext
537 .Include(x => x.Instance)
538 .Include(x => x.StartedBy)
539 .Include(x => x.CancelledBy)
540 .Where(dbJob => dbJob.Id == jid)
541 .FirstAsync(CancellationToken.None);
542 QueueHubUpdate(finalJob.ToApi(),
true);
555 await hubUpdatesTask;
559 logger.LogError(ex,
"Error in hub updates chain task!");
568 var handler =
jobs[jid];
virtual long? Id
The ID of the entity.
string? Description
English description of the Job.
ErrorCode? ErrorCode
The Models.ErrorCode associated with the Job if any.
bool? Cancelled
If the Job was cancelled.
string? ExceptionDetails
Details of any exceptions caught during the Job.
Represents a long-running job on the server. The model is read-only; updates attempt to cancel the job.
Extension methods for the ValueTask and ValueTask<TResult> classes.
static async ValueTask WhenAll(IEnumerable< ValueTask > tasks)
Fully await a given list of tasks.
Operation exceptions thrown from the context of a Models.Job.
ErrorCode? ErrorCode
The Api.Models.ErrorCode associated with the JobException.
Class for pairing Tasks with CancellationTokenSources.
void Cancel()
Cancels task.
Task< bool > Wait(CancellationToken cancellationToken)
Wait for task to complete.
bool Started
If the job has started.
Progress reporter for a Job.
readonly IConnectionMappedHubContext< JobsHub, IJobsHub > hub
The IHubContext for the JobsHub.
async Task< bool > RunJob(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
Runner for JobHandlers.
readonly Counter processedJobs
Total number of jobs processed.
readonly ILogger< JobService > logger
The ILogger for the JobService.
readonly object synchronizationLock
Lock object for various operations.
void Activate(IInstanceCoreProvider instanceCoreProvider)
Activate the IJobManager.
readonly Gauge runningJobs
Jobs currently running.
const int MaxHubUpdatesPerSecond
The maximum rate at which hub clients can receive updates.
Task StopAsync(CancellationToken cancellationToken)
readonly ILoggerFactory loggerFactory
The ILoggerFactory for the JobService.
void QueueActiveJobUpdates()
Queue a message to be sent to all clients with the current state of active jobs.
readonly TaskCompletionSource< IInstanceCoreProvider > activationTcs
TaskCompletionSource<TResult> to delay starting jobs until the server is ready.
readonly object addCancelLock
Prevents a really REALLY rare race condition between add and cancel operations.
readonly Dictionary< long, Action > hubUpdateActions
Dictionary<TKey, TValue> of running Job Api.Models.EntityId.Ids to Actions that will push immediate h...
readonly IDatabaseContextFactory databaseContextFactory
The IDatabaseContextFactory for the JobService.
Task StartAsync(CancellationToken cancellationToken)
async ValueTask< Job?> CancelJob(Job job, User? user, bool blocking, CancellationToken cancellationToken)
Cancels a given job. A ValueTask<TResult> resulting in the updated job if it was cancelled,...
volatile bool noMoreJobsShouldStart
Prevents jobs that are registered after shutdown from activating.
JobService(IConnectionMappedHubContext< JobsHub, IJobsHub > hub, IDatabaseContextFactory databaseContextFactory, IMetricFactory metricFactory, ILoggerFactory loggerFactory, ILogger< JobService > logger)
Initializes a new instance of the JobService class.
async ValueTask RegisterOperation(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
Registers a given Job and begins running it. A ValueTask representing a running operation.
void SetJobProgress(JobResponse apiResponse)
Set the JobResponse.Progress and JobResponse.Stage for a given apiResponse.
readonly Dictionary< long, JobHandler > jobs
Dictionary<TKey, TValue> of Job Api.Models.EntityId.Ids to running JobHandlers.
async ValueTask< bool?> WaitForJobCompletion(Job job, User? canceller, CancellationToken jobCancellationToken, CancellationToken cancellationToken)
Wait for a given job to complete. A ValueTask<TResult> representing the Job. Results in true if the J...
A SignalR Hub for pushing job updates.
static string HubGroupName(long instanceId)
Get the group name for a given instanceId.
Represents an Api.Models.Instance in the database.
User? StartedBy
See JobResponse.StartedBy.
Instance? Instance
The Models.Instance the job belongs to if any.
Helpers for manipulating the Serilog.Context.LogContext.
const string JobIdContextProperty
The Serilog.Context.LogContext property name for Models.Job Api.Models.EntityId.Ids.
Provider for IInstanceCores.
Factory for scoping usage of IDatabaseContexts. Meant for use by Components.
ValueTask UseContext(Func< IDatabaseContext, ValueTask > operation)
Run an operation in the scope of an IDatabaseContext.
The service that manages everything to do with jobs.
Allows manually triggering jobs hub updates.
An IHubContext<THub> that maps Users to their connection IDs.
delegate ValueTask JobEntrypoint(IInstanceCore? instance, IDatabaseContextFactory databaseContextFactory, Job job, JobProgressReporter progressReporter, CancellationToken cancellationToken)
Entrypoint for running a given job.