tgstation-server 6.17.0
The /tg/station 13 server suite
Loading...
Searching...
No Matches
JobService.cs
Go to the documentation of this file.
1using System;
2using System.Collections.Generic;
3using System.Diagnostics;
4using System.Linq;
5using System.Threading;
6using System.Threading.Tasks;
7
8using Microsoft.AspNetCore.SignalR;
9using Microsoft.EntityFrameworkCore;
10using Microsoft.Extensions.Logging;
11
12using Prometheus;
13
14using Serilog.Context;
15
26
28{
31 {
36
41
46
/// <summary>
/// The <see cref="ILoggerFactory"/> for the <see cref="JobService"/>.
/// </summary>
50 readonly ILoggerFactory loggerFactory;
51
/// <summary>
/// The <see cref="ILogger"/> for the <see cref="JobService"/>.
/// </summary>
55 readonly ILogger<JobService> logger;
56
/// <summary>
/// <see cref="Dictionary{TKey, TValue}"/> of job IDs to their running <see cref="JobHandler"/>s.
/// </summary>
60 readonly Dictionary<long, JobHandler> jobs;
61
/// <summary>
/// <see cref="Dictionary{TKey, TValue}"/> of running job IDs to <see cref="Action"/>s that queue a hub update bypassing the rate check.
/// </summary>
65 readonly Dictionary<long, Action> hubUpdateActions;
66
/// <summary>
/// <see cref="TaskCompletionSource{TResult}"/> used to delay starting jobs until the server is ready.
/// </summary>
70 readonly TaskCompletionSource<IInstanceCoreProvider> activationTcs;
71
/// <summary>
/// Total number of jobs processed.
/// </summary>
75 readonly Counter processedJobs;
76
/// <summary>
/// Jobs currently running.
/// </summary>
80 readonly Gauge runningJobs;
81
/// <summary>
/// <see langword="lock"/> <see cref="object"/> for various operations.
/// </summary>
85 readonly object synchronizationLock;
86
/// <summary>
/// Prevents a rare race condition between add and cancel operations.
/// </summary>
90 readonly object addCancelLock;
91
/// <summary>
/// Prevents jobs that are registered after shutdown from activating.
/// </summary>
95 volatile bool noMoreJobsShouldStart;
96
108 IMetricFactory metricFactory,
109 ILoggerFactory loggerFactory,
110 ILogger<JobService> logger)
111 {
112 this.hub = hub ?? throw new ArgumentNullException(nameof(hub));
113 this.databaseContextFactory = databaseContextFactory ?? throw new ArgumentNullException(nameof(databaseContextFactory));
114 this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
115 ArgumentNullException.ThrowIfNull(metricFactory);
116 this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
117
118 runningJobs = metricFactory.CreateGauge("tgs_jobs_running", "The number of TGS jobs running across all instances");
119 processedJobs = metricFactory.CreateCounter("tgs_jobs_processed", "The number of TGS jobs that have run previously");
120
121 jobs = new Dictionary<long, JobHandler>();
122 hubUpdateActions = new Dictionary<long, Action>();
123 activationTcs = new TaskCompletionSource<IInstanceCoreProvider>();
124 synchronizationLock = new object();
125 addCancelLock = new object();
126 }
127
/// <summary>
/// Disposes every <see cref="JobHandler"/> currently tracked in <see cref="jobs"/>.
/// </summary>
public void Dispose()
{
    foreach (var handler in jobs.Values)
    {
        handler.Dispose();
    }
}
134
/// <summary>
/// Registers a given <paramref name="job"/> and begins running it.
/// The job is saved to the database, wrapped in a <see cref="JobHandler"/> that invokes RunJob, added to <see cref="jobs"/> and started unless <see cref="noMoreJobsShouldStart"/> is set.
/// </summary>
/// <param name="job">The <see cref="Job"/> to register. It must have an Instance, and a non-null StartedBy must have a Name.</param>
/// <param name="operation">The <see cref="JobEntrypoint"/> handed to RunJob for this job.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> passed to the database calls made during registration.</param>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
/// <exception cref="InvalidOperationException">Thrown when StartedBy has no Name or when no Instance is associated with <paramref name="job"/>.</exception>
136 public async ValueTask RegisterOperation(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
137 {
138 ArgumentNullException.ThrowIfNull(job);
139 ArgumentNullException.ThrowIfNull(operation);
140
141 if (job.StartedBy != null && job.StartedBy.Name == null)
142 throw new InvalidOperationException("StartedBy User associated with job does not have a Name!");
143
144 if (job.Instance == null)
145 throw new InvalidOperationException("No Instance associated with job!");
146
147 job.StartedAt = DateTimeOffset.UtcNow;
148 job.Cancelled = false;
149
// Remember the caller-supplied user; job.StartedBy is temporarily replaced by an ID-only stub below and restored after the save.
150 var originalStartedBy = job.StartedBy;
// NOTE(review): listing line 151 is absent here; the lambda below is presumably the argument of an awaited databaseContextFactory.UseContext call - confirm against the real file.
152 async databaseContext =>
153 {
// Swap in an ID-only Instance and attach it before the job is added.
154 job.Instance = new Models.Instance
155 {
156 Id = job.Instance.Require(x => x.Id),
157 };
158
159 databaseContext.Instances.Attach(job.Instance);
160
// When no StartedBy was supplied, fall back to the user returned by GetTgsUser (ID and Name only).
161 originalStartedBy ??= await databaseContext
162 .Users
163 .GetTgsUser(
164 dbUser => new User
165 {
166 Id = dbUser.Id!.Value,
167 Name = dbUser.Name,
168 },
169 cancellationToken);
170
171 job.StartedBy = new User
172 {
173 Id = originalStartedBy.Require(x => x.Id),
174 };
175
176 databaseContext.Users.Attach(job.StartedBy);
177
178 databaseContext.Jobs.Add(job);
179
180 await databaseContext.Save(cancellationToken);
181 });
182
// Restore the user object that carries the Name.
183 job.StartedBy = originalStartedBy;
184
185 logger.LogDebug("Registering job {jobId}: {jobDesc}...", job.Id, job.Description);
186 var jobHandler = new JobHandler(jobCancellationToken => RunJob(job, operation, jobCancellationToken));
187 try
188 {
189 lock (addCancelLock)
190 {
191 bool jobShouldStart;
// NOTE(review): listing line 192 is absent; the braces below presumably belong to a lock (synchronizationLock) statement - confirm.
193 {
194 jobs.Add(job.Require(x => x.Id), jobHandler);
195 jobShouldStart = !noMoreJobsShouldStart;
196 }
197
// A handler registered while noMoreJobsShouldStart is set stays in jobs but is never started here.
198 if (jobShouldStart)
199 jobHandler.Start();
200 }
201 }
202 catch
203 {
// Do not leak the handler if adding or starting it failed.
204 jobHandler.Dispose();
205 throw;
206 }
207 }
208
/// <summary>
/// Marks every database job without a StoppedAt value as cancelled and stopped, then clears <see cref="noMoreJobsShouldStart"/>.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the database query and save.</param>
/// <returns>A <see cref="Task"/> representing the running operation.</returns>
public Task StartAsync(CancellationToken cancellationToken)
{
    var cleanupOperation = databaseContextFactory.UseContext(async databaseContext =>
    {
        // Any job still lacking a stop time at this point is flagged as cancelled.
        var unfinishedJobIds = await databaseContext
            .Jobs
            .AsQueryable()
            .Where(dbJob => !dbJob.StoppedAt.HasValue)
            .Select(dbJob => dbJob.Id!.Value)
            .ToListAsync(cancellationToken);

        if (unfinishedJobIds.Count > 0)
        {
            logger.LogTrace("Cleaning {unfinishedJobCount} unfinished jobs...", unfinishedJobIds.Count);
            foreach (var unfinishedJobId in unfinishedJobIds)
            {
                // Attach an ID-only stub so only the two modified columns are written.
                var staleJob = new Job(unfinishedJobId);
                databaseContext.Jobs.Attach(staleJob);
                staleJob.Cancelled = true;
                staleJob.StoppedAt = DateTimeOffset.UtcNow;
            }

            await databaseContext.Save(cancellationToken);
        }

        noMoreJobsShouldStart = false;
    });

    return cleanupOperation.AsTask();
}
237
/// <summary>
/// Requests a blocking cancellation of every job currently in <see cref="jobs"/> and returns a <see cref="Task"/> that completes when all of those cancellations have finished.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> forwarded to each CancelJob call.</param>
/// <returns>A <see cref="Task"/> representing the running operation.</returns>
239 public Task StopAsync(CancellationToken cancellationToken)
240 {
241 List<ValueTask<Job?>> joinTasks;
242 lock (addCancelLock)
// NOTE(review): listing lines 243 and 245 are absent; presumably a nested lock (synchronizationLock) and the statement setting noMoreJobsShouldStart = true - confirm against the real file.
244 {
// ToList() forces every CancelJob call to be issued while the lock is still held; user is null, blocking is true.
246 joinTasks = jobs.Select(x => CancelJob(
247 new Job(x.Key),
248 null,
249 true,
250 cancellationToken))
251 .ToList();
252 }
253
254 return ValueTaskExtensions.WhenAll(joinTasks).AsTask();
255 }
256
/// <summary>
/// Cancels a given <paramref name="job"/> and records who cancelled it in the database.
/// </summary>
/// <param name="job">The <see cref="Job"/> to cancel; only its ID is used to locate the handler.</param>
/// <param name="user">The <see cref="User"/> cancelling the job. When <see langword="null"/>, the user returned by GetTgsUser is stored as the canceller instead.</param>
/// <param name="blocking">If <see langword="true"/>, the returned task also waits for the job's handler to finish.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the database work and the optional wait.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> resulting in <paramref name="job"/> if it was cancelled, <see langword="null"/> if no handler is registered for its ID.</returns>
258 public async ValueTask<Job?> CancelJob(Job job, User? user, bool blocking, CancellationToken cancellationToken)
259 {
260 ArgumentNullException.ThrowIfNull(job);
261
262 var jid = job.Require(x => x.Id);
263 JobHandler? handler;
264 lock (addCancelLock)
265 {
// NOTE(review): listing line 266 is absent; presumably a lock (synchronizationLock) guarding the lookup below - confirm.
267 if (!jobs.TryGetValue(jid, out handler))
268 return null;
269
270 logger.LogDebug("Cancelling job ID {jobId}...", jid);
271 handler.Cancel(); // this will ensure the db update is only done once
272 }
273
274 await databaseContextFactory.UseContext(async databaseContext =>
275 {
// Attach an ID-only stub so only CancelledBy is written.
276 var updatedJob = new Job(jid);
277 databaseContext.Jobs.Attach(updatedJob);
278 var attachedUser = user == null
279 ? await databaseContext
280 .Users
281 .GetTgsUser(
282 dbUser => new User
283 {
284 Id = dbUser.Id!.Value,
285 },
286 cancellationToken)
287 : new User
288 {
289 Id = user.Require(x => x.Id),
290 };
291
292 databaseContext.Users.Attach(attachedUser);
293 updatedJob.CancelledBy = attachedUser;
294
295 // let either startup or cancellation set job.cancelled
296 await databaseContext.Save(cancellationToken);
// NOTE(review): the caller's job gets the original user argument (possibly null), not attachedUser - confirm this is intended.
297 job.CancelledBy = user;
298 });
299
300 if (blocking)
301 {
302 logger.LogTrace("Waiting on cancelled job #{jobId}...", job.Id);
303 await handler.Wait(cancellationToken);
304 logger.LogTrace("Done waiting on job #{jobId}...", job.Id);
305 }
306
307 return job;
308 }
309
/// <summary>
/// Set the Progress and Stage of a given <paramref name="apiResponse"/> from the <see cref="JobHandler"/> registered for its ID. Does nothing if no handler is registered.
/// </summary>
/// <param name="apiResponse">The <see cref="JobResponse"/> to update in place.</param>
311 public void SetJobProgress(JobResponse apiResponse)
312 {
313 ArgumentNullException.ThrowIfNull(apiResponse);
// NOTE(review): listing line 314 is absent; the braces below presumably belong to a lock (synchronizationLock) statement - confirm.
315 {
316 if (!jobs.TryGetValue(apiResponse.Require(x => x.Id), out var handler))
317 return;
318 apiResponse.Progress = handler.Progress;
319 apiResponse.Stage = handler.Stage;
320 }
321 }
322
/// <summary>
/// Wait for a given <paramref name="job"/> to complete, cancelling it if <paramref name="jobCancellationToken"/> fires first.
/// </summary>
/// <param name="job">The <see cref="Job"/> to wait on.</param>
/// <param name="canceller">The <see cref="User"/> passed to CancelJob if <paramref name="jobCancellationToken"/> is triggered.</param>
/// <param name="jobCancellationToken">A <see cref="CancellationToken"/> that, when triggered, cancels the job itself via CancelJob.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the wait. Must be cancellable.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> resulting in the value produced by the handler's Wait call, or <see langword="null"/> if no handler is registered for the job's ID.</returns>
/// <exception cref="ArgumentException">Thrown when <paramref name="cancellationToken"/> cannot be cancelled.</exception>
324 public async ValueTask<bool?> WaitForJobCompletion(Job job, User? canceller, CancellationToken jobCancellationToken, CancellationToken cancellationToken)
325 {
326 ArgumentNullException.ThrowIfNull(job);
327
328 if (!cancellationToken.CanBeCanceled)
329 throw new ArgumentException("A cancellable CancellationToken should be provided!", nameof(cancellationToken));
330
331 JobHandler? handler;
// NOTE(review): listing lines 332-333 and 338 are absent; the braces below presumably belong to a lock (synchronizationLock) statement - confirm against the real file.
334 {
335 if (!jobs.TryGetValue(job.Require(x => x.Id), out handler))
336 return null;
337
339 }
340
// An unstarted handler is not going to be started by RegisterOperation once noMoreJobsShouldStart is set, so wait on InfiniteTask until cancellationToken fires (assumes InfiniteTask never completes on its own - TODO confirm).
341 if (noMoreJobsShouldStart && !handler.Started)
342 await Extensions.TaskExtensions.InfiniteTask.WaitAsync(cancellationToken);
343
// cancelTask is replaced by the registration callback when jobCancellationToken fires, and awaited after the handler wait returns.
344 var cancelTask = ValueTask.FromResult<Job?>(null);
345 bool result;
346 using (jobCancellationToken.Register(() => cancelTask = CancelJob(job, canceller, true, cancellationToken)))
347 result = await handler.Wait(cancellationToken);
348
349 await cancelTask;
350
351 return result;
352 }
353
/// <summary>
/// Activate the job manager by completing <see cref="activationTcs"/> with the given <paramref name="instanceCoreProvider"/>.
/// </summary>
/// <param name="instanceCoreProvider">The <see cref="IInstanceCoreProvider"/> handed to jobs awaiting activation.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="instanceCoreProvider"/> is <see langword="null"/>.</exception>
public void Activate(IInstanceCoreProvider instanceCoreProvider)
{
    if (instanceCoreProvider is null)
    {
        throw new ArgumentNullException(nameof(instanceCoreProvider));
    }

    activationTcs.SetResult(instanceCoreProvider);
}
361
364 {
365 lock (hubUpdateActions)
366 foreach (var action in hubUpdateActions.Values)
367 action();
368 }
369
/// <summary>
/// Runner for <see cref="JobHandler"/>s. Waits for activation, invokes <paramref name="operation"/>, records the outcome in the database, pushes job updates to the hub group and finally removes the handler from <see cref="jobs"/>.
/// </summary>
/// <param name="job">The <see cref="Job"/> being run.</param>
/// <param name="operation">The <see cref="JobEntrypoint"/> for the job.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the job.</param>
/// <returns>A <see cref="Task{TResult}"/> resulting in <see langword="true"/> if <paramref name="operation"/> completed without throwing, <see langword="false"/> otherwise.</returns>
377#pragma warning disable CA1506 // TODO: Decomplexify
378 async Task<bool> RunJob(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
379#pragma warning restore CA1506
380 {
381 var jid = job.Require(x => x.Id);
382 using (LogContext.PushProperty(SerilogContextHelper.JobIdContextProperty, jid))
383 using (runningJobs.TrackInProgress())
384 try
385 {
386 void LogException(Exception ex) => logger.LogDebug(ex, "Job {jobId} exited with error!", jid);
387
// hubUpdatesTask is the tail of a chain of hub sends; each new send awaits the previous one so updates go out in order.
388 var hubUpdatesTask = Task.CompletedTask;
389 var result = false;
390 var firstLogHappened = false;
391 var hubGroupName = JobsHub.HubGroupName(job);
392
// Started after each queued send; doubles as the lock object for the rate check below.
393 Stopwatch? stopwatch = null;
// Queues `update` for the hub group. A non-final call also (re)registers an action in hubUpdateActions that re-sends this update bypassing the rate check; a final call removes that action.
394 void QueueHubUpdate(JobResponse update, bool final)
395 {
396 void NextUpdate(bool bypassRate)
397 {
398 var currentUpdatesTask = hubUpdatesTask;
399 async Task ChainHubUpdate()
400 {
401 await currentUpdatesTask;
402
403 if (!firstLogHappened)
404 {
405 logger.LogTrace("Sending updates for job {id} to hub group {group}", jid, hubGroupName);
406 firstLogHappened = true;
407 }
408
409 // DCT: Cancellation token is for job, operation should always run
410 await hub
411 .Clients
412 .Group(hubGroupName)
413 .ReceiveJobUpdate(update, CancellationToken.None);
414 }
415
416 Stopwatch? enteredLock = null;
417 try
418 {
419 if (!bypassRate && stopwatch != null)
420 {
421 Monitor.Enter(stopwatch);
422 enteredLock = stopwatch;
// NOTE(review): for any MaxHubUpdatesPerSecond >= 1 this is only true while ElapsedMilliseconds == 0, so updates are dropped only within the same millisecond. A per-second limit would compare against 1000 - confirm the intended threshold.
// NOTE(review): the final update is also issued via NextUpdate(false), so it can be dropped by this check too; keep that in mind if the threshold is changed.
423 if (stopwatch.ElapsedMilliseconds * MaxHubUpdatesPerSecond < 1)
424 return; // don't spam client
425 }
426
427 hubUpdatesTask = ChainHubUpdate();
428 stopwatch = Stopwatch.StartNew();
429 }
430 finally
431 {
// Exit the monitor on the instance that was entered; `stopwatch` may have been reassigned above.
432 if (enteredLock != null)
433 Monitor.Exit(enteredLock);
434 }
435 }
436
437 lock (hubUpdateActions)
438 if (final)
439 hubUpdateActions.Remove(jid);
440 else
441 hubUpdateActions[jid] = () => NextUpdate(true);
442
443 NextUpdate(false);
444 }
445
446 try
447 {
// Progress callback handed to the JobProgressReporter: validates the 0-1 range, stores stage and integer percentage on the handler and queues a hub update.
448 void UpdateProgress(string? stage, double? progress)
449 {
450 if (progress.HasValue
451 && (progress.Value < 0 || progress.Value > 1))
452 {
// The exception is only constructed for logging; it is not thrown.
453 var exception = new ArgumentOutOfRangeException(nameof(progress), progress, "Progress must be a value from 0-1!");
454 logger.LogError(exception, "Invalid progress value!");
455 return;
456 }
457
458 int? newProgress = progress.HasValue ? (int)Math.Floor(progress.Value * 100) : null;
// NOTE(review): listing line 459 is absent; presumably a lock (synchronizationLock) guarding the lookup below - confirm.
460 if (jobs.TryGetValue(jid, out var handler))
461 {
462 handler.Stage = stage;
463 handler.Progress = newProgress;
464
465 var updatedJob = job.ToApi();
466 updatedJob.Stage = stage;
467 updatedJob.Progress = newProgress;
468 QueueHubUpdate(updatedJob, false);
469 }
470 }
471
472 var activationTask = activationTcs.Task;
473
474 Debug.Assert(activationTask.IsCompleted || job.Require(x => x.JobCode).IsServerStartupJob(), "Non-server startup job registered before activation!");
475
// Block until Activate has supplied the IInstanceCoreProvider, or the job is cancelled.
476 var instanceCoreProvider = await activationTask.WaitAsync(cancellationToken);
477
478 QueueHubUpdate(job.ToApi(), false);
479
480 logger.LogTrace("Starting job...");
481 using var progressReporter = new JobProgressReporter(
482 loggerFactory.CreateLogger<JobProgressReporter>(),
483 null,
484 UpdateProgress);
485 using var innerReporter = progressReporter.CreateSection(null, 1.0);
486 await operation(
487 instanceCoreProvider.GetInstance(job.Instance!),
// NOTE(review): listing line 488 is absent; per the JobEntrypoint signature the second argument is an IDatabaseContextFactory, presumably databaseContextFactory - confirm.
489 job,
490 innerReporter,
491 cancellationToken);
492
493 logger.LogDebug("Job {jobId} completed!", job.Id);
494 result = true;
495 }
496 catch (OperationCanceledException ex)
497 {
498 logger.LogDebug(ex, "Job {jobId} cancelled!", job.Id);
499 job.Cancelled = true;
500 }
501 catch (JobException e)
502 {
503 job.ErrorCode = e.ErrorCode;
// NOTE(review): when e.Message is non-blank and there is no inner exception, the stored text ends with " (Inner exception: )" - confirm this is acceptable.
504 job.ExceptionDetails = String.IsNullOrWhiteSpace(e.Message) ? e.InnerException?.Message : e.Message + $" (Inner exception: {e.InnerException?.Message})";
505 LogException(e);
506 }
507 catch (Exception e)
508 {
509 job.ExceptionDetails = e.ToString();
510 LogException(e);
511 }
512
513 try
514 {
// Persist the outcome on an ID-only stub so only the assigned columns are written.
515 await databaseContextFactory.UseContext(async databaseContext =>
516 {
517 var attachedJob = new Job(jid);
518
519 databaseContext.Jobs.Attach(attachedJob);
520 attachedJob.StoppedAt = DateTimeOffset.UtcNow;
521 attachedJob.ExceptionDetails = job.ExceptionDetails;
522 attachedJob.ErrorCode = job.ErrorCode;
523 attachedJob.Cancelled = job.Cancelled;
524
525 // DCT: Cancellation token is for job, operation should always run
526 await databaseContext.Save(CancellationToken.None);
527 });
528
529 // Resetting the context here because I CBA to worry if the cache is being used
530 await databaseContextFactory.UseContext(async databaseContext =>
531 {
532 // Cancellation might be set in another async context
533 // Also, startedby could have been renamed
534 // forced to reload here for the final hub update
535 // DCT: Cancellation token is for job, operation should always run
536 var finalJob = await databaseContext
537 .Jobs
538 .AsQueryable()
539 .Include(x => x.Instance)
540 .Include(x => x.StartedBy)
541 .Include(x => x.CancelledBy)
542 .Where(dbJob => dbJob.Id == jid)
543 .FirstAsync(CancellationToken.None);
544 QueueHubUpdate(finalJob.ToApi(), true);
545 });
546 }
547 catch
548 {
// The final QueueHubUpdate may not have run, so drop the pending update action before rethrowing.
549 lock (hubUpdateActions)
550 hubUpdateActions.Remove(jid);
551
552 throw;
553 }
554
// Drain the chained hub sends; a failure there is logged and does not change the job result.
555 try
556 {
557 await hubUpdatesTask;
558 }
559 catch (Exception ex)
560 {
561 logger.LogError(ex, "Error in hub updates chain task!");
562 }
563
564 return result;
565 }
566 finally
567 {
// NOTE(review): listing line 568 is absent; the braces below presumably belong to a lock (synchronizationLock) statement - confirm.
569 {
570 var handler = jobs[jid];
571 jobs.Remove(jid);
572 handler.Dispose();
573 }
574
575 processedJobs.Inc();
576 }
577 }
578 }
579}
virtual ? long Id
The ID of the entity.
Definition EntityId.cs:14
string? Description
English description of the Job.
Definition Job.cs:25
ErrorCode? ErrorCode
The Models.ErrorCode associated with the Job if any.
Definition Job.cs:31
bool? Cancelled
If the Job was cancelled.
Definition Job.cs:56
string? ExceptionDetails
Details of any exceptions caught during the Job.
Definition Job.cs:37
Represents a long running job on the server. Model is read-only, updates attempt to cancel the job.
Definition JobResponse.cs:7
Extension methods for the ValueTask and ValueTask<TResult> classes.
static async ValueTask WhenAll(IEnumerable< ValueTask > tasks)
Fully await a given list of tasks.
Operation exceptions thrown from the context of a Models.Job.
ErrorCode? ErrorCode
The Api.Models.ErrorCode associated with the JobException.
Class for pairing Tasks with CancellationTokenSources.
Definition JobHandler.cs:11
Task< bool > Wait(CancellationToken cancellationToken)
Wait for task to complete.
Definition JobHandler.cs:60
bool Started
If the job has started.
Definition JobHandler.cs:15
readonly IConnectionMappedHubContext< JobsHub, IJobsHub > hub
The IHubContext for the JobsHub.
Definition JobService.cs:40
async Task< bool > RunJob(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
Runner for JobHandlers.
readonly Counter processedJobs
Total number of jobs processed.
Definition JobService.cs:75
readonly ILogger< JobService > logger
The ILogger for the JobService.
Definition JobService.cs:55
readonly object synchronizationLock
lock object for various operations.
Definition JobService.cs:85
void Activate(IInstanceCoreProvider instanceCoreProvider)
Activate the IJobManager.
readonly Gauge runningJobs
Jobs currently running.
Definition JobService.cs:80
const int MaxHubUpdatesPerSecond
The maximum rate at which hub clients can receive updates.
Definition JobService.cs:35
Task StopAsync(CancellationToken cancellationToken)
readonly ILoggerFactory loggerFactory
The ILoggerFactory for the JobService.
Definition JobService.cs:50
void QueueActiveJobUpdates()
Queue a message to be sent to all clients with the current state of active jobs.
readonly TaskCompletionSource< IInstanceCoreProvider > activationTcs
TaskCompletionSource<TResult> to delay starting jobs until the server is ready.
Definition JobService.cs:70
readonly object addCancelLock
Prevents a really REALLY rare race condition between add and cancel operations.
Definition JobService.cs:90
readonly Dictionary< long, Action > hubUpdateActions
Dictionary<TKey, TValue> of running Job Api.Models.EntityId.Ids to Actions that will push immediate h...
Definition JobService.cs:65
readonly IDatabaseContextFactory databaseContextFactory
The IDatabaseContextFactory for the JobService.
Definition JobService.cs:45
Task StartAsync(CancellationToken cancellationToken)
async ValueTask< Job?> CancelJob(Job job, User? user, bool blocking, CancellationToken cancellationToken)
Cancels a given job. A ValueTask<TResult> resulting in the updated job if it was cancelled,...
volatile bool noMoreJobsShouldStart
Prevents jobs that are registered after shutdown from activating.
Definition JobService.cs:95
JobService(IConnectionMappedHubContext< JobsHub, IJobsHub > hub, IDatabaseContextFactory databaseContextFactory, IMetricFactory metricFactory, ILoggerFactory loggerFactory, ILogger< JobService > logger)
Initializes a new instance of the JobService class.
async ValueTask RegisterOperation(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
Registers a given Job and begins running it. A ValueTask representing a running operation.
void SetJobProgress(JobResponse apiResponse)
Set the JobResponse.Progress and JobResponse.Stage for a given apiResponse.
readonly Dictionary< long, JobHandler > jobs
Dictionary<TKey, TValue> of Job Api.Models.EntityId.Ids to running JobHandlers.
Definition JobService.cs:60
async ValueTask< bool?> WaitForJobCompletion(Job job, User? canceller, CancellationToken jobCancellationToken, CancellationToken cancellationToken)
Wait for a given job to complete. A ValueTask<TResult> representing the Job. Results in true if the J...
A SignalR Hub for pushing job updates.
Definition JobsHub.cs:16
static string HubGroupName(long instanceId)
Get the group name for a given instanceId .
Represents an Api.Models.Instance in the database.
Definition Instance.cs:11
User? StartedBy
See JobResponse.StartedBy.
Definition Job.cs:21
Instance? Instance
The Models.Instance the job belongs to if any.
Definition Job.cs:32
Helpers for manipulating the Serilog.Context.LogContext.
const string JobIdContextProperty
The Serilog.Context.LogContext property name for Models.Job Api.Models.EntityId.Ids.
Factory for scoping usage of IDatabaseContexts. Meant for use by Components.
ValueTask UseContext(Func< IDatabaseContext, ValueTask > operation)
Run an operation in the scope of an IDatabaseContext.
The service that manages everything to do with jobs.
Definition IJobService.cs:9
Allows manually triggering jobs hub updates.
A IHubContext<THub> that maps Users to their connection IDs.
delegate ValueTask JobEntrypoint(IInstanceCore? instance, IDatabaseContextFactory databaseContextFactory, Job job, JobProgressReporter progressReporter, CancellationToken cancellationToken)
Entrypoint for running a given job.