tgstation-server 6.14.0
The /tg/station 13 server suite
Loading...
Searching...
No Matches
JobService.cs
Go to the documentation of this file.
1using System;
2using System.Collections.Generic;
3using System.Diagnostics;
4using System.Linq;
5using System.Threading;
6using System.Threading.Tasks;
7
8using Microsoft.AspNetCore.SignalR;
9using Microsoft.EntityFrameworkCore;
10using Microsoft.Extensions.Logging;
11
12using Prometheus;
13
14using Serilog.Context;
15
26
28{
31 {
36
41
46
50 readonly ILoggerFactory loggerFactory;
51
55 readonly ILogger<JobService> logger;
56
60 readonly Dictionary<long, JobHandler> jobs;
61
65 readonly Dictionary<long, Action> hubUpdateActions;
66
70 readonly TaskCompletionSource<IInstanceCoreProvider> activationTcs;
71
75 readonly Counter processedJobs;
76
80 readonly Gauge runningJobs;
81
85 readonly object synchronizationLock;
86
90 readonly object addCancelLock;
91
95 volatile bool noMoreJobsShouldStart;
96
108 IMetricFactory metricFactory,
109 ILoggerFactory loggerFactory,
110 ILogger<JobService> logger)
111 {
112 this.hub = hub ?? throw new ArgumentNullException(nameof(hub));
113 this.databaseContextFactory = databaseContextFactory ?? throw new ArgumentNullException(nameof(databaseContextFactory));
114 this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
115 ArgumentNullException.ThrowIfNull(metricFactory);
116 this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
117
118 runningJobs = metricFactory.CreateGauge("tgs_jobs_running", "The number of TGS jobs running across all instances");
119 processedJobs = metricFactory.CreateCounter("tgs_jobs_processed", "The number of TGS jobs that have run previously");
120
121 jobs = new Dictionary<long, JobHandler>();
122 hubUpdateActions = new Dictionary<long, Action>();
123 activationTcs = new TaskCompletionSource<IInstanceCoreProvider>();
124 synchronizationLock = new object();
125 addCancelLock = new object();
126 }
127
/// <summary>
/// Disposes every <see cref="JobHandler"/> currently stored in <see cref="jobs"/>.
/// </summary>
public void Dispose()
{
    // Only the handlers are needed, so walk the value collection directly.
    foreach (var handler in jobs.Values)
        handler.Dispose();
}
134
/// <summary>
/// Persists <paramref name="job"/>, wraps <paramref name="operation"/> in a <see cref="JobHandler"/>, tracks it in <see cref="jobs"/> and starts it unless <see cref="noMoreJobsShouldStart"/> is set.
/// </summary>
/// <param name="job">The <see cref="Job"/> to register. Must have an Instance; if StartedBy is set it must have a Name.</param>
/// <param name="operation">The <see cref="JobEntrypoint"/> handed to <see cref="RunJob"/>.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used for the database calls made here.</param>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
/// <exception cref="InvalidOperationException">StartedBy has no Name, or no Instance is associated with the job.</exception>
136 public async ValueTask RegisterOperation(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
137 {
138 ArgumentNullException.ThrowIfNull(job);
139 ArgumentNullException.ThrowIfNull(operation);
140
141 if (job.StartedBy != null && job.StartedBy.Name == null)
142 throw new InvalidOperationException("StartedBy User associated with job does not have a Name!");
143
144 if (job.Instance == null)
145 throw new InvalidOperationException("No Instance associated with job!");
146
147 job.StartedAt = DateTimeOffset.UtcNow;
148 job.Cancelled = false;
149
// Keep the caller's full StartedBy; the job temporarily carries an Id-only stub while it is saved.
150 var originalStartedBy = job.StartedBy;
// NOTE(review): the embedded numbering skips 151, so the call that receives this lambda is not visible in this listing - confirm against the original file.
152 async databaseContext =>
153 {
// Swap the instance for an Id-only stub and attach it to the context before the job is added.
154 job.Instance = new Models.Instance
155 {
156 Id = job.Instance.Require(x => x.Id),
157 };
158
159 databaseContext.Instances.Attach(job.Instance);
160
// No starting user supplied: fall back to the user returned by GetTgsUser.
161 originalStartedBy ??= await databaseContext
162 .Users
163 .GetTgsUser(
164 dbUser => new User
165 {
166 Id = dbUser.Id!.Value,
167 Name = dbUser.Name,
168 },
169 cancellationToken);
170
171 job.StartedBy = new User
172 {
173 Id = originalStartedBy.Require(x => x.Id),
174 };
175
176 databaseContext.Users.Attach(job.StartedBy);
177
178 databaseContext.Jobs.Add(job);
179
180 await databaseContext.Save(cancellationToken);
181 });
182
// Restore the user object that carries the Name now that the save is done.
183 job.StartedBy = originalStartedBy;
184
185 logger.LogDebug("Registering job {jobId}: {jobDesc}...", job.Id, job.Description);
186 var jobHandler = new JobHandler(jobCancellationToken => RunJob(job, operation, jobCancellationToken));
187 try
188 {
189 lock (addCancelLock)
190 {
191 bool jobShouldStart;
// NOTE(review): the embedded numbering skips 192; the bare block below presumably sat under a lock statement - confirm against the original file.
193 {
194 jobs.Add(job.Require(x => x.Id), jobHandler);
195 jobShouldStart = !noMoreJobsShouldStart;
196 }
197
198 if (jobShouldStart)
199 jobHandler.Start();
200 }
201 }
202 catch
203 {
// Registration or start failed: release the handler and let the exception propagate.
204 jobHandler.Dispose();
205 throw;
206 }
207 }
208
/// <summary>
/// Marks every job row that has no stop time as cancelled and stopped, then clears <see cref="noMoreJobsShouldStart"/>.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the database operations.</param>
/// <returns>A <see cref="Task"/> representing the running operation.</returns>
public Task StartAsync(CancellationToken cancellationToken)
{
    var cleanupOperation = databaseContextFactory.UseContext(async databaseContext =>
    {
        // Only the IDs of rows lacking a StoppedAt are loaded.
        var unfinishedJobIds = await databaseContext
            .Jobs
            .AsQueryable()
            .Where(dbJob => !dbJob.StoppedAt.HasValue)
            .Select(dbJob => dbJob.Id!.Value)
            .ToListAsync(cancellationToken);

        var unfinishedJobCount = unfinishedJobIds.Count;
        if (unfinishedJobCount > 0)
        {
            logger.LogTrace("Cleaning {unfinishedJobCount} unfinished jobs...", unfinishedJobCount);
            foreach (var unfinishedJobId in unfinishedJobIds)
            {
                // Attach an ID-only stub first, then assign the two columns being changed.
                var staleJob = new Job(unfinishedJobId);
                databaseContext.Jobs.Attach(staleJob);
                staleJob.Cancelled = true;
                staleJob.StoppedAt = DateTimeOffset.UtcNow;
            }

            await databaseContext.Save(cancellationToken);
        }

        noMoreJobsShouldStart = false;
    });

    return cleanupOperation.AsTask();
}
237
/// <summary>
/// Issues a blocking <see cref="CancelJob"/> for every entry in <see cref="jobs"/> and returns a task that completes once all of them have.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> passed to each <see cref="CancelJob"/> call.</param>
/// <returns>A <see cref="Task"/> representing the combined cancellations.</returns>
239 public Task StopAsync(CancellationToken cancellationToken)
240 {
241 List<ValueTask<Job?>> joinTasks;
242 lock (addCancelLock)
// NOTE(review): the embedded numbering skips 243 and 245. Nothing visible here sets noMoreJobsShouldStart, and the bare block presumably sat under a second lock - confirm against the original file.
244 {
// ToList() forces every CancelJob call to be issued while still inside the lock.
// A null user is passed, so CancelJob looks up the cancelling user via GetTgsUser.
246 joinTasks = jobs.Select(x => CancelJob(
247 new Job(x.Key),
248 null,
249 true,
250 cancellationToken))
251 .ToList();
252 }
253
254 return ValueTaskExtensions.WhenAll(joinTasks).AsTask();
255 }
256
/// <summary>
/// Cancels the handler tracked for <paramref name="job"/>, records who cancelled it in the database and optionally waits for the job to finish.
/// </summary>
/// <param name="job">The <see cref="Job"/> to cancel; only its ID is used for the lookup.</param>
/// <param name="user">The cancelling <see cref="User"/>. When <see langword="null"/>, the user returned by GetTgsUser is recorded instead.</param>
/// <param name="blocking">If <see langword="true"/>, waits on the handler after the cancellation has been recorded.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the database calls and the optional wait.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> resulting in <paramref name="job"/> with CancelledBy set to <paramref name="user"/>, or <see langword="null"/> if no handler is tracked for its ID.</returns>
258 public async ValueTask<Job?> CancelJob(Job job, User? user, bool blocking, CancellationToken cancellationToken)
259 {
260 ArgumentNullException.ThrowIfNull(job);
261
262 var jid = job.Require(x => x.Id);
263 JobHandler? handler;
264 lock (addCancelLock)
265 {
// NOTE(review): the embedded numbering skips 266 - a line appears to be missing here; confirm against the original file.
267 if (!jobs.TryGetValue(jid, out handler))
268 return null;
269
270 logger.LogDebug("Cancelling job ID {jobId}...", jid);
271 handler.Cancel(); // this will ensure the db update is only done once
272 }
273
274 await databaseContextFactory.UseContext(async databaseContext =>
275 {
// Attach an ID-only stub so that only CancelledBy is written for this job.
276 var updatedJob = new Job(jid);
277 databaseContext.Jobs.Attach(updatedJob);
278 var attachedUser = user == null
279 ? await databaseContext
280 .Users
281 .GetTgsUser(
282 dbUser => new User
283 {
284 Id = dbUser.Id!.Value,
285 },
286 cancellationToken)
287 : new User
288 {
289 Id = user.Require(x => x.Id),
290 };
291
292 databaseContext.Users.Attach(attachedUser);
293 updatedJob.CancelledBy = attachedUser;
294
295 // let either startup or cancellation set job.cancelled
296 await databaseContext.Save(cancellationToken);
// The caller's object receives the original user argument (possibly null), not the attached stub.
297 job.CancelledBy = user;
298 });
299
300 if (blocking)
301 {
302 logger.LogTrace("Waiting on cancelled job #{jobId}...", job.Id);
303 await handler.Wait(cancellationToken);
304 logger.LogTrace("Done waiting on job #{jobId}...", job.Id);
305 }
306
307 return job;
308 }
309
/// <summary>
/// Copies the tracked handler's Progress and Stage onto <paramref name="apiResponse"/>; leaves it untouched when no handler is tracked for its ID.
/// </summary>
/// <param name="apiResponse">The <see cref="JobResponse"/> to update.</param>
311 public void SetJobProgress(JobResponse apiResponse)
312 {
313 ArgumentNullException.ThrowIfNull(apiResponse);
// NOTE(review): the embedded numbering skips 314; the bare block below presumably sat under a lock statement - confirm against the original file.
315 {
316 if (!jobs.TryGetValue(apiResponse.Require(x => x.Id), out var handler))
317 return;
318 apiResponse.Progress = handler.Progress;
319 apiResponse.Stage = handler.Stage;
320 }
321 }
322
/// <summary>
/// Waits for the handler tracked for <paramref name="job"/> to finish, cancelling the job if <paramref name="jobCancellationToken"/> fires while waiting.
/// </summary>
/// <param name="job">The <see cref="Job"/> to wait on.</param>
/// <param name="canceller">The <see cref="User"/> passed to <see cref="CancelJob"/> if the job gets cancelled through <paramref name="jobCancellationToken"/>.</param>
/// <param name="jobCancellationToken">When triggered, starts a blocking <see cref="CancelJob"/> for <paramref name="job"/>.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the waits themselves; must be cancellable.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> resulting in the handler's result, or <see langword="null"/> if no handler is tracked for the job's ID.</returns>
/// <exception cref="ArgumentException"><paramref name="cancellationToken"/> cannot be cancelled.</exception>
324 public async ValueTask<bool?> WaitForJobCompletion(Job job, User? canceller, CancellationToken jobCancellationToken, CancellationToken cancellationToken)
325 {
326 ArgumentNullException.ThrowIfNull(job);
327
328 if (!cancellationToken.CanBeCanceled)
329 throw new ArgumentException("A cancellable CancellationToken should be provided!", nameof(cancellationToken));
330
331 JobHandler? handler;
// NOTE(review): the embedded numbering skips 332, 333 and 338, so statements around and inside this block are missing from the listing - confirm against the original file.
334 {
335 if (!jobs.TryGetValue(job.Require(x => x.Id), out handler))
336 return null;
337
339 }
340
// A handler that has not started while starts are blocked is waited on via InfiniteTask, bounded only by cancellationToken.
341 if (noMoreJobsShouldStart && !handler.Started)
342 await Extensions.TaskExtensions.InfiniteTask.WaitAsync(cancellationToken);
343
// cancelTask is replaced by the registration callback if jobCancellationToken fires during the wait.
344 var cancelTask = ValueTask.FromResult<Job?>(null);
345 bool result;
346 using (jobCancellationToken.Register(() => cancelTask = CancelJob(job, canceller, true, cancellationToken)))
347 result = await handler.Wait(cancellationToken);
348
// Await the (possibly replaced) cancel task before returning.
349 await cancelTask;
350
351 return result;
352 }
353
/// <summary>
/// Completes <see cref="activationTcs"/> with the given provider, which <see cref="RunJob"/> awaits before invoking a job's operation.
/// </summary>
/// <param name="instanceCoreProvider">The <see cref="IInstanceCoreProvider"/> to publish to waiting jobs.</param>
/// <exception cref="ArgumentNullException"><paramref name="instanceCoreProvider"/> is <see langword="null"/>.</exception>
public void Activate(IInstanceCoreProvider instanceCoreProvider)
{
    // Same guard form the constructor uses for its own dependencies.
    var provider = instanceCoreProvider ?? throw new ArgumentNullException(nameof(instanceCoreProvider));

    logger.LogTrace("Activating job manager...");
    activationTcs.SetResult(provider);
}
362
365 {
366 lock (hubUpdateActions)
367 foreach (var action in hubUpdateActions.Values)
368 action();
369 }
370
/// <summary>
/// Runner for <see cref="JobHandler"/>s: waits for activation, invokes <paramref name="operation"/>, records the outcome in the database, pushes hub updates and finally removes the handler from <see cref="jobs"/>.
/// </summary>
/// <param name="job">The <see cref="Job"/> being run.</param>
/// <param name="operation">The <see cref="JobEntrypoint"/> to invoke.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the job itself; the final database writes and hub sends deliberately use <see cref="CancellationToken.None"/>.</param>
/// <returns>A <see cref="Task{TResult}"/> resulting in <see langword="true"/> if <paramref name="operation"/> completed without throwing, <see langword="false"/> otherwise.</returns>
378#pragma warning disable CA1506 // TODO: Decomplexify
379 async Task<bool> RunJob(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
380#pragma warning restore CA1506
381 {
382 var jid = job.Require(x => x.Id);
// The job ID is pushed onto the log context and the running-jobs gauge is tracked for the whole run.
383 using (LogContext.PushProperty(SerilogContextHelper.JobIdContextProperty, jid))
384 using (runningJobs.TrackInProgress())
385 try
386 {
387 void LogException(Exception ex) => logger.LogDebug(ex, "Job {jobId} exited with error!", jid);
388
// hubUpdatesTask is the tail of a chain: each queued send first awaits the previous one.
389 var hubUpdatesTask = Task.CompletedTask;
390 var result = false;
391 var firstLogHappened = false;
392 var hubGroupName = JobsHub.HubGroupName(job);
393
394 Stopwatch? stopwatch = null;
// Queues `update` for the job's hub group. `final` removes this job's entry from hubUpdateActions instead of refreshing it.
395 void QueueHubUpdate(JobResponse update, bool final)
396 {
397 void NextUpdate(bool bypassRate)
398 {
399 var currentUpdatesTask = hubUpdatesTask;
400 async Task ChainHubUpdate()
401 {
402 await currentUpdatesTask;
403
404 if (!firstLogHappened)
405 {
406 logger.LogTrace("Sending updates for job {id} to hub group {group}", jid, hubGroupName);
407 firstLogHappened = true;
408 }
409
410 // DCT: Cancellation token is for job, operation should always run
411 await hub
412 .Clients
413 .Group(hubGroupName)
414 .ReceiveJobUpdate(update, CancellationToken.None);
415 }
416
// Remember which stopwatch instance was locked so the finally block exits the same monitor even after `stopwatch` is reassigned.
417 Stopwatch? enteredLock = null;
418 try
419 {
420 if (!bypassRate && stopwatch != null)
421 {
422 Monitor.Enter(stopwatch);
423 enteredLock = stopwatch;
// NOTE(review): both operands are integers, so this product is below 1 only when one of them is 0 - confirm the intended throttle window.
424 if (stopwatch.ElapsedMilliseconds * MaxHubUpdatesPerSecond < 1)
425 return; // don't spam client
426 }
427
428 hubUpdatesTask = ChainHubUpdate();
429 stopwatch = Stopwatch.StartNew();
430 }
431 finally
432 {
433 if (enteredLock != null)
434 Monitor.Exit(enteredLock);
435 }
436 }
437
438 lock (hubUpdateActions)
439 if (final)
440 hubUpdateActions.Remove(jid);
441 else
// The stored action re-sends this update with the rate limit bypassed.
442 hubUpdateActions[jid] = () => NextUpdate(true);
443
444 NextUpdate(false);
445 }
446
447 try
448 {
// Progress callback handed to the JobProgressReporter below.
449 void UpdateProgress(string? stage, double? progress)
450 {
// Out-of-range progress is logged and dropped rather than thrown.
451 if (progress.HasValue
452 && (progress.Value < 0 || progress.Value > 1))
453 {
454 var exception = new ArgumentOutOfRangeException(nameof(progress), progress, "Progress must be a value from 0-1!");
455 logger.LogError(exception, "Invalid progress value!");
456 return;
457 }
458
// The 0-1 fraction becomes a whole percentage, rounded down.
459 int? newProgress = progress.HasValue ? (int)Math.Floor(progress.Value * 100) : null;
// NOTE(review): the embedded numbering skips 460 - a line appears to be missing before this lookup; confirm against the original file.
461 if (jobs.TryGetValue(jid, out var handler))
462 {
463 handler.Stage = stage;
464 handler.Progress = newProgress;
465
466 var updatedJob = job.ToApi();
467 updatedJob.Stage = stage;
468 updatedJob.Progress = newProgress;
469 QueueHubUpdate(updatedJob, false);
470 }
471 }
472
473 var activationTask = activationTcs.Task;
474
475 Debug.Assert(activationTask.IsCompleted || job.Require(x => x.JobCode).IsServerStartupJob(), "Non-server startup job registered before activation!");
476
// The operation does not run until Activate has supplied the instance core provider.
477 var instanceCoreProvider = await activationTask.WaitAsync(cancellationToken);
478
479 QueueHubUpdate(job.ToApi(), false);
480
481 logger.LogTrace("Starting job...");
482 using var progressReporter = new JobProgressReporter(
483 loggerFactory.CreateLogger<JobProgressReporter>(),
484 null,
485 UpdateProgress);
486 using var innerReporter = progressReporter.CreateSection(null, 1.0);
// NOTE(review): the embedded numbering skips 489, so one argument line of this call is missing from the listing - confirm against the original file.
487 await operation(
488 instanceCoreProvider.GetInstance(job.Instance!),
490 job,
491 innerReporter,
492 cancellationToken);
493
494 logger.LogDebug("Job {jobId} completed!", job.Id);
495 result = true;
496 }
// None of the handlers below rethrow; the outcome is stored on `job` and persisted afterwards.
497 catch (OperationCanceledException ex)
498 {
499 logger.LogDebug(ex, "Job {jobId} cancelled!", job.Id);
500 job.Cancelled = true;
501 }
502 catch (JobException e)
503 {
504 job.ErrorCode = e.ErrorCode;
505 job.ExceptionDetails = String.IsNullOrWhiteSpace(e.Message) ? e.InnerException?.Message : e.Message + $" (Inner exception: {e.InnerException?.Message})";
506 LogException(e);
507 }
508 catch (Exception e)
509 {
510 job.ExceptionDetails = e.ToString();
511 LogException(e);
512 }
513
514 try
515 {
516 await databaseContextFactory.UseContext(async databaseContext =>
517 {
// Attach an ID-only stub so only the completion fields set below are written.
518 var attachedJob = new Job(jid);
519
520 databaseContext.Jobs.Attach(attachedJob);
521 attachedJob.StoppedAt = DateTimeOffset.UtcNow;
522 attachedJob.ExceptionDetails = job.ExceptionDetails;
523 attachedJob.ErrorCode = job.ErrorCode;
524 attachedJob.Cancelled = job.Cancelled;
525
526 // DCT: Cancellation token is for job, operation should always run
527 await databaseContext.Save(CancellationToken.None);
528 });
529
530 // Resetting the context here because I CBA to worry if the cache is being used
531 await databaseContextFactory.UseContext(async databaseContext =>
532 {
533 // Cancellation might be set in another async context
534 // Also, startedby could have been renamed
535 // forced to reload here for the final hub update
536 // DCT: Cancellation token is for job, operation should always run
537 var finalJob = await databaseContext
538 .Jobs
539 .AsQueryable()
540 .Include(x => x.Instance)
541 .Include(x => x.StartedBy)
542 .Include(x => x.CancelledBy)
543 .Where(dbJob => dbJob.Id == jid)
544 .FirstAsync(CancellationToken.None);
545 QueueHubUpdate(finalJob.ToApi(), true);
546 });
547 }
548 catch
549 {
// The final update never got queued, so drop this job's hub action here before rethrowing.
550 lock (hubUpdateActions)
551 hubUpdateActions.Remove(jid);
552
553 throw;
554 }
555
556 try
557 {
558 await hubUpdatesTask;
559 }
560 catch (Exception ex)
561 {
// A failure in the hub update chain is logged and does not change the job's result.
562 logger.LogError(ex, "Error in hub updates chain task!");
563 }
564
565 return result;
566 }
567 finally
568 {
// NOTE(review): the embedded numbering skips 569; the bare block below presumably sat under a lock statement - confirm against the original file.
570 {
571 var handler = jobs[jid];
572 jobs.Remove(jid);
573 handler.Dispose();
574 }
575
576 processedJobs.Inc();
577 }
578 }
579 }
580}
virtual ? long Id
The ID of the entity.
Definition EntityId.cs:14
string? Description
English description of the Job.
Definition Job.cs:25
ErrorCode? ErrorCode
The Models.ErrorCode associated with the Job if any.
Definition Job.cs:31
bool? Cancelled
If the Job was cancelled.
Definition Job.cs:56
string? ExceptionDetails
Details of any exceptions caught during the Job.
Definition Job.cs:37
Represents a long running job on the server. Model is read-only, updates attempt to cancel the job.
Definition JobResponse.cs:7
Extension methods for the ValueTask and ValueTask<TResult> classes.
static async ValueTask WhenAll(IEnumerable< ValueTask > tasks)
Fully await a given list of tasks.
Operation exceptions thrown from the context of a Models.Job.
ErrorCode? ErrorCode
The Api.Models.ErrorCode associated with the JobException.
Class for pairing Tasks with CancellationTokenSources.
Definition JobHandler.cs:11
Task< bool > Wait(CancellationToken cancellationToken)
Wait for task to complete.
Definition JobHandler.cs:60
bool Started
If the job has started.
Definition JobHandler.cs:15
readonly IConnectionMappedHubContext< JobsHub, IJobsHub > hub
The IHubContext for the JobsHub.
Definition JobService.cs:40
async Task< bool > RunJob(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
Runner for JobHandlers.
readonly Counter processedJobs
Total number of jobs processed.
Definition JobService.cs:75
readonly ILogger< JobService > logger
The ILogger for the JobService.
Definition JobService.cs:55
readonly object synchronizationLock
lock object for various operations.
Definition JobService.cs:85
void Activate(IInstanceCoreProvider instanceCoreProvider)
Activate the IJobManager.
readonly Gauge runningJobs
Jobs currently running.
Definition JobService.cs:80
const int MaxHubUpdatesPerSecond
The maximum rate at which hub clients can receive updates.
Definition JobService.cs:35
Task StopAsync(CancellationToken cancellationToken)
readonly ILoggerFactory loggerFactory
The ILoggerFactory for the JobService.
Definition JobService.cs:50
void QueueActiveJobUpdates()
Queue a message to be sent to all clients with the current state of active jobs.
readonly TaskCompletionSource< IInstanceCoreProvider > activationTcs
TaskCompletionSource<TResult> to delay starting jobs until the server is ready.
Definition JobService.cs:70
readonly object addCancelLock
Prevents a really REALLY rare race condition between add and cancel operations.
Definition JobService.cs:90
readonly Dictionary< long, Action > hubUpdateActions
Dictionary<TKey, TValue> of running Job Api.Models.EntityId.Ids to Actions that will push immediate h...
Definition JobService.cs:65
readonly IDatabaseContextFactory databaseContextFactory
The IDatabaseContextFactory for the JobService.
Definition JobService.cs:45
Task StartAsync(CancellationToken cancellationToken)
async ValueTask< Job?> CancelJob(Job job, User? user, bool blocking, CancellationToken cancellationToken)
Cancels a given job. A ValueTask<TResult> resulting in the updated job if it was cancelled,...
volatile bool noMoreJobsShouldStart
Prevents jobs that are registered after shutdown from activating.
Definition JobService.cs:95
JobService(IConnectionMappedHubContext< JobsHub, IJobsHub > hub, IDatabaseContextFactory databaseContextFactory, IMetricFactory metricFactory, ILoggerFactory loggerFactory, ILogger< JobService > logger)
Initializes a new instance of the JobService class.
async ValueTask RegisterOperation(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
Registers a given Job and begins running it. A ValueTask representing a running operation.
void SetJobProgress(JobResponse apiResponse)
Set the JobResponse.Progress and JobResponse.Stage for a given apiResponse.
readonly Dictionary< long, JobHandler > jobs
Dictionary<TKey, TValue> of Job Api.Models.EntityId.Ids to running JobHandlers.
Definition JobService.cs:60
async ValueTask< bool?> WaitForJobCompletion(Job job, User? canceller, CancellationToken jobCancellationToken, CancellationToken cancellationToken)
Wait for a given job to complete. A ValueTask<TResult> representing the Job. Results in true if the J...
A SignalR Hub for pushing job updates.
Definition JobsHub.cs:16
static string HubGroupName(long instanceId)
Get the group name for a given instanceId.
Represents an Api.Models.Instance in the database.
Definition Instance.cs:11
User? StartedBy
See JobResponse.StartedBy.
Definition Job.cs:21
Instance? Instance
The Models.Instance the job belongs to if any.
Definition Job.cs:32
Helpers for manipulating the Serilog.Context.LogContext.
const string JobIdContextProperty
The Serilog.Context.LogContext property name for Models.Job Api.Models.EntityId.Ids.
Factory for scoping usage of IDatabaseContexts. Meant for use by Components.
ValueTask UseContext(Func< IDatabaseContext, ValueTask > operation)
Run an operation in the scope of an IDatabaseContext.
The service that manages everything to do with jobs.
Definition IJobService.cs:9
Allows manually triggering jobs hub updates.
An IHubContext<THub> that maps Users to their connection IDs.
delegate ValueTask JobEntrypoint(IInstanceCore? instance, IDatabaseContextFactory databaseContextFactory, Job job, JobProgressReporter progressReporter, CancellationToken cancellationToken)
Entrypoint for running a given job.