2using System.Collections.Generic;
3using System.Diagnostics;
6using System.Threading.Tasks;
8using Microsoft.AspNetCore.SignalR;
9using Microsoft.EntityFrameworkCore;
10using Microsoft.Extensions.Logging;
60 readonly Dictionary<long, JobHandler>
jobs;
108 IMetricFactory metricFactory,
110 ILogger<JobService>
logger)
112 this.hub =
hub ??
throw new ArgumentNullException(nameof(
hub));
115 ArgumentNullException.ThrowIfNull(metricFactory);
116 this.logger =
logger ??
throw new ArgumentNullException(nameof(
logger));
118 runningJobs = metricFactory.CreateGauge(
"tgs_jobs_running",
"The number of TGS jobs running across all instances");
119 processedJobs = metricFactory.CreateCounter(
"tgs_jobs_processed",
"The number of TGS jobs that have run previously");
121 jobs =
new Dictionary<long, JobHandler>();
123 activationTcs =
new TaskCompletionSource<IInstanceCoreProvider>();
131 foreach (var job
in jobs)
138 ArgumentNullException.ThrowIfNull(job);
139 ArgumentNullException.ThrowIfNull(operation);
142 throw new InvalidOperationException(
"StartedBy User associated with job does not have a Name!");
145 throw new InvalidOperationException(
"No Instance associated with job!");
147 job.StartedAt = DateTimeOffset.UtcNow;
148 job.Cancelled =
false;
152 async databaseContext =>
156 Id = job.
Instance.Require(x => x.Id),
159 databaseContext.Instances.Attach(job.
Instance);
161 originalStartedBy ??= await databaseContext
166 Id = dbUser.Id!.Value,
171 job.StartedBy =
new User
173 Id = originalStartedBy.Require(x => x.Id),
176 databaseContext.Users.Attach(job.
StartedBy);
178 databaseContext.Jobs.Add(job);
180 await databaseContext.Save(cancellationToken);
183 job.StartedBy = originalStartedBy;
186 var jobHandler =
new JobHandler(jobCancellationToken =>
RunJob(job, operation, jobCancellationToken));
194 jobs.Add(job.Require(x => x.Id), jobHandler);
204 jobHandler.Dispose();
214 var badJobIds = await databaseContext
217 .Where(y => !y.StoppedAt.HasValue)
218 .Select(y => y.Id!.Value)
219 .ToListAsync(cancellationToken);
220 if (badJobIds.Count > 0)
222 logger.LogTrace(
"Cleaning {unfinishedJobCount} unfinished jobs...", badJobIds.Count);
223 foreach (var badJobId
in badJobIds)
225 var job =
new Job(badJobId);
226 databaseContext.Jobs.Attach(job);
227 job.Cancelled =
true;
228 job.StoppedAt = DateTimeOffset.UtcNow;
231 await databaseContext.Save(cancellationToken);
239 public Task
StopAsync(CancellationToken cancellationToken)
241 List<ValueTask<Job?>> joinTasks;
258 public async ValueTask<Job?>
CancelJob(
Job job,
User? user,
bool blocking, CancellationToken cancellationToken)
260 ArgumentNullException.ThrowIfNull(job);
262 var jid = job.Require(x => x.Id);
267 if (!
jobs.TryGetValue(jid, out handler))
270 logger.LogDebug(
"Cancelling job ID {jobId}...", jid);
276 var updatedJob =
new Job(jid);
277 databaseContext.Jobs.Attach(updatedJob);
278 var attachedUser = user ==
null
279 ? await databaseContext
284 Id = dbUser.Id!.Value,
289 Id = user.Require(x => x.Id),
292 databaseContext.Users.Attach(attachedUser);
293 updatedJob.CancelledBy = attachedUser;
296 await databaseContext.Save(cancellationToken);
297 job.CancelledBy = user;
302 logger.LogTrace(
"Waiting on cancelled job #{jobId}...", job.
Id);
303 await handler.
Wait(cancellationToken);
304 logger.LogTrace(
"Done waiting on job #{jobId}...", job.
Id);
313 ArgumentNullException.ThrowIfNull(apiResponse);
316 if (!
jobs.TryGetValue(apiResponse.Require(x => x.Id), out var handler))
318 apiResponse.Progress = handler.Progress;
319 apiResponse.Stage = handler.Stage;
324 public async ValueTask<bool?>
WaitForJobCompletion(
Job job,
User? canceller, CancellationToken jobCancellationToken, CancellationToken cancellationToken)
326 ArgumentNullException.ThrowIfNull(job);
328 if (!cancellationToken.CanBeCanceled)
329 throw new ArgumentException(
"A cancellable CancellationToken should be provided!", nameof(cancellationToken));
335 if (!
jobs.TryGetValue(job.Require(x => x.Id), out handler))
341 if (noMoreJobsShouldStart && !handler.
Started)
342 await Extensions.TaskExtensions.InfiniteTask.WaitAsync(cancellationToken);
344 var cancelTask = ValueTask.FromResult<
Job?>(
null);
346 using (jobCancellationToken.Register(() => cancelTask =
CancelJob(job, canceller,
true, cancellationToken)))
347 result = await handler.
Wait(cancellationToken);
357 ArgumentNullException.ThrowIfNull(instanceCoreProvider);
359 logger.LogTrace(
"Activating job manager...");
378#pragma warning disable CA1506
380#pragma warning restore CA1506
382 var jid = job.Require(x => x.Id);
387 void LogException(
Exception ex) =>
logger.LogDebug(ex,
"Job {jobId} exited with error!", jid);
389 var hubUpdatesTask = Task.CompletedTask;
391 var firstLogHappened =
false;
394 Stopwatch? stopwatch =
null;
395 void QueueHubUpdate(
JobResponse update,
bool final)
397 void NextUpdate(
bool bypassRate)
399 var currentUpdatesTask = hubUpdatesTask;
400 async Task ChainHubUpdate()
402 await currentUpdatesTask;
404 if (!firstLogHappened)
406 logger.LogTrace(
"Sending updates for job {id} to hub group {group}", jid, hubGroupName);
407 firstLogHappened =
true;
414 .ReceiveJobUpdate(update, CancellationToken.None);
417 Stopwatch? enteredLock =
null;
420 if (!bypassRate && stopwatch !=
null)
422 Monitor.Enter(stopwatch);
423 enteredLock = stopwatch;
428 hubUpdatesTask = ChainHubUpdate();
429 stopwatch = Stopwatch.StartNew();
433 if (enteredLock !=
null)
434 Monitor.Exit(enteredLock);
449 void UpdateProgress(
string? stage,
double? progress)
451 if (progress.HasValue
452 && (progress.Value < 0 || progress.Value > 1))
454 var exception =
new ArgumentOutOfRangeException(nameof(progress), progress,
"Progress must be a value from 0-1!");
455 logger.LogError(exception,
"Invalid progress value!");
459 int? newProgress = progress.HasValue ? (int)Math.Floor(progress.Value * 100) :
null;
461 if (
jobs.TryGetValue(jid, out var handler))
463 handler.Stage = stage;
464 handler.Progress = newProgress;
466 var updatedJob = job.
ToApi();
467 updatedJob.Stage = stage;
468 updatedJob.Progress = newProgress;
469 QueueHubUpdate(updatedJob,
false);
475 Debug.Assert(activationTask.IsCompleted || job.Require(x => x.JobCode).IsServerStartupJob(),
"Non-server startup job registered before activation!");
477 var instanceCoreProvider = await activationTask.WaitAsync(cancellationToken);
479 QueueHubUpdate(job.
ToApi(),
false);
481 logger.LogTrace(
"Starting job...");
486 using var innerReporter = progressReporter.CreateSection(
null, 1.0);
488 instanceCoreProvider.GetInstance(job.
Instance!),
494 logger.LogDebug(
"Job {jobId} completed!", job.
Id);
497 catch (OperationCanceledException ex)
499 logger.LogDebug(ex,
"Job {jobId} cancelled!", job.
Id);
500 job.Cancelled =
true;
505 job.ExceptionDetails = String.IsNullOrWhiteSpace(e.Message) ? e.InnerException?.Message : e.Message + $
" (Inner exception: {e.InnerException?.Message})";
510 job.ExceptionDetails = e.ToString();
518 var attachedJob =
new Job(jid);
520 databaseContext.Jobs.Attach(attachedJob);
521 attachedJob.StoppedAt = DateTimeOffset.UtcNow;
527 await databaseContext.Save(CancellationToken.None);
537 var finalJob = await databaseContext
540 .Include(x => x.Instance)
541 .Include(x => x.StartedBy)
542 .Include(x => x.CancelledBy)
543 .Where(dbJob => dbJob.Id == jid)
544 .FirstAsync(CancellationToken.None);
545 QueueHubUpdate(finalJob.ToApi(),
true);
558 await hubUpdatesTask;
562 logger.LogError(ex,
"Error in hub updates chain task!");
571 var handler =
jobs[jid];
virtual ? long Id
The ID of the entity.
string? Description
English description of the Job.
ErrorCode? ErrorCode
The Models.ErrorCode associated with the Job if any.
bool? Cancelled
If the Job was cancelled.
string? ExceptionDetails
Details of any exceptions caught during the Job.
Represents a long running job on the server. Model is read-only, updates attempt to cancel the job.
Extension methods for the ValueTask and ValueTask<TResult> classes.
static async ValueTask WhenAll(IEnumerable< ValueTask > tasks)
Fully await a given list of tasks.
Operation exceptions thrown from the context of a Models.Job.
ErrorCode? ErrorCode
The Api.Models.ErrorCode associated with the JobException.
Class for pairing Tasks with CancellationTokenSources.
void Cancel()
Cancels task.
Task< bool > Wait(CancellationToken cancellationToken)
Wait for task to complete.
bool Started
If the job has started.
Progress reporter for a Job.
readonly IConnectionMappedHubContext< JobsHub, IJobsHub > hub
The IHubContext for the JobsHub.
async Task< bool > RunJob(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
Runner for JobHandlers.
readonly Counter processedJobs
Total number of jobs processed.
readonly ILogger< JobService > logger
The ILogger for the JobService.
readonly object synchronizationLock
Lock object for various operations.
void Activate(IInstanceCoreProvider instanceCoreProvider)
Activate the IJobManager.
readonly Gauge runningJobs
Jobs currently running.
const int MaxHubUpdatesPerSecond
The maximum rate at which hub clients can receive updates.
Task StopAsync(CancellationToken cancellationToken)
readonly ILoggerFactory loggerFactory
The ILoggerFactory for the JobService.
void QueueActiveJobUpdates()
Queue a message to be sent to all clients with the current state of active jobs.
readonly TaskCompletionSource< IInstanceCoreProvider > activationTcs
TaskCompletionSource<TResult> to delay starting jobs until the server is ready.
readonly object addCancelLock
Prevents a really REALLY rare race condition between add and cancel operations.
readonly Dictionary< long, Action > hubUpdateActions
Dictionary<TKey, TValue> of running Job Api.Models.EntityId.Ids to Actions that will push immediate h...
readonly IDatabaseContextFactory databaseContextFactory
The IDatabaseContextFactory for the JobService.
Task StartAsync(CancellationToken cancellationToken)
async ValueTask< Job?> CancelJob(Job job, User? user, bool blocking, CancellationToken cancellationToken)
Cancels a given job. A ValueTask<TResult> resulting in the updated job if it was cancelled,...
volatile bool noMoreJobsShouldStart
Prevents jobs that are registered after shutdown from activating.
JobService(IConnectionMappedHubContext< JobsHub, IJobsHub > hub, IDatabaseContextFactory databaseContextFactory, IMetricFactory metricFactory, ILoggerFactory loggerFactory, ILogger< JobService > logger)
Initializes a new instance of the JobService class.
async ValueTask RegisterOperation(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
Registers a given Job and begins running it. A ValueTask representing a running operation.
void SetJobProgress(JobResponse apiResponse)
Set the JobResponse.Progress and JobResponse.Stage for a given apiResponse.
readonly Dictionary< long, JobHandler > jobs
Dictionary<TKey, TValue> of Job Api.Models.EntityId.Ids to running JobHandlers.
async ValueTask< bool?> WaitForJobCompletion(Job job, User? canceller, CancellationToken jobCancellationToken, CancellationToken cancellationToken)
Wait for a given job to complete. A ValueTask<TResult> representing the Job. Results in true if the J...
A SignalR Hub for pushing job updates.
static string HubGroupName(long instanceId)
Get the group name for a given instanceId.
Represents an Api.Models.Instance in the database.
User? StartedBy
See JobResponse.StartedBy.
Instance? Instance
The Models.Instance the job belongs to if any.
Helpers for manipulating the Serilog.Context.LogContext.
const string JobIdContextProperty
The Serilog.Context.LogContext property name for Models.Job Api.Models.EntityId.Ids.
Provider for IInstanceCores.
Factory for scoping usage of IDatabaseContexts. Meant for use by Components.
ValueTask UseContext(Func< IDatabaseContext, ValueTask > operation)
Run an operation in the scope of an IDatabaseContext.
The service that manages everything to do with jobs.
Allows manually triggering jobs hub updates.
An IHubContext<THub> that maps Users to their connection IDs.
delegate ValueTask JobEntrypoint(IInstanceCore? instance, IDatabaseContextFactory databaseContextFactory, Job job, JobProgressReporter progressReporter, CancellationToken cancellationToken)
Entrypoint for running a given job.