tgstation-server 6.19.0
The /tg/station 13 server suite
Loading...
Searching...
No Matches
JobService.cs
Go to the documentation of this file.
1using System;
2using System.Collections.Generic;
3using System.Diagnostics;
4using System.Linq;
5using System.Threading;
6using System.Threading.Tasks;
7
8using Microsoft.AspNetCore.SignalR;
9using Microsoft.EntityFrameworkCore;
10using Microsoft.Extensions.Logging;
11
12using Prometheus;
13
14using Serilog.Context;
15
26
28{
31 {
36
41
46
// Creates the loggers handed to the JobProgressReporter built for each job in RunJob.
50 readonly ILoggerFactory loggerFactory;
51
// Logger for this service.
55 readonly ILogger<JobService> logger;
56
// Registered jobs keyed by job ID. Entries are added in RegisterOperation and removed in RunJob's finally block.
60 readonly Dictionary<long, JobHandler> jobs;
61
// Per-running-job callbacks keyed by job ID. Each pushes an immediate hub update that bypasses rate limiting; QueueActiveJobUpdates invokes them.
65 readonly Dictionary<long, Action> hubUpdateActions;
66
// Completed by Activate. RunJob awaits it so jobs don't run before the instance core provider is available.
70 readonly TaskCompletionSource<IInstanceCoreProvider> activationTcs;
71
// Prometheus counter, incremented once per job when RunJob exits.
75 readonly Counter processedJobs;
76
// Prometheus gauge of jobs currently executing inside RunJob.
80 readonly Gauge runningJobs;
81
// General-purpose lock object. NOTE(review): presumably guards `jobs`; the lock statements using it are not visible in this listing — confirm.
85 readonly object synchronizationLock;
86
// Serializes RegisterOperation's add-and-start against CancelJob and StopAsync.
90 readonly object addCancelLock;
91
// When true, newly registered jobs are added to `jobs` but not started. StartAsync resets it to false.
95 volatile bool noMoreJobsShouldStart;
96
108 IMetricFactory metricFactory,
109 ILoggerFactory loggerFactory,
110 ILogger<JobService> logger)
111 {
// Validate and store injected dependencies. metricFactory is only used to create the metrics below, so it is null-checked but not kept.
112 this.hub = hub ?? throw new ArgumentNullException(nameof(hub));
113 this.databaseContextFactory = databaseContextFactory ?? throw new ArgumentNullException(nameof(databaseContextFactory));
114 this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
115 ArgumentNullException.ThrowIfNull(metricFactory);
116 this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
117
// Create the Prometheus metrics that RunJob updates.
118 runningJobs = metricFactory.CreateGauge("tgs_jobs_running", "The number of TGS jobs running across all instances");
119 processedJobs = metricFactory.CreateCounter("tgs_jobs_processed", "The number of TGS jobs that have run previously");
120
// NOTE(review): activationTcs uses default options, so continuations may run synchronously inside Activate's SetResult call.
// Consider TaskCreationOptions.RunContinuationsAsynchronously — confirm whether that matters here.
121 jobs = new Dictionary<long, JobHandler>();
122 hubUpdateActions = new Dictionary<long, Action>();
123 activationTcs = new TaskCompletionSource<IInstanceCoreProvider>();
124 synchronizationLock = new object();
125 addCancelLock = new object();
126 }
127
/// <summary>
/// Disposes every <see cref="JobHandler"/> that is still registered.
/// </summary>
public void Dispose()
{
    // Take a snapshot under the lock. RunJob's finally block removes entries from `jobs` concurrently,
    // and enumerating a Dictionary while it is being mutated throws InvalidOperationException.
    List<JobHandler> handlers;
    lock (synchronizationLock)
        handlers = jobs.Values.ToList();

    // Dispose outside the lock so handler disposal cannot contend with RunJob's cleanup, which also takes the lock.
    foreach (var handler in handlers)
        handler.Dispose();
}
134
/// <summary>
/// Saves <paramref name="job"/> to the database and registers a <see cref="JobHandler"/> that runs <paramref name="operation"/> through RunJob.
/// The handler is started immediately unless noMoreJobsShouldStart is set; in that case the job is registered but not started.
/// </summary>
/// <param name="job">The job to register. It must have an Instance. If StartedBy is set, it must have a Name; if StartedBy is null, the TGS user is recorded.</param>
/// <param name="operation">The entrypoint the job will execute.</param>
/// <param name="cancellationToken">Cancels the database work done during registration. The job itself runs on the handler's own token.</param>
/// <returns>A ValueTask that completes once the job is saved and registered, not when it finishes running.</returns>
/// <exception cref="InvalidOperationException">Thrown if StartedBy has no Name or the job has no Instance.</exception>
136 public async ValueTask RegisterOperation(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
137 {
138 ArgumentNullException.ThrowIfNull(job);
139 ArgumentNullException.ThrowIfNull(operation);
140
141 if (job.StartedBy != null && job.StartedBy.Name == null)
142 throw new InvalidOperationException("StartedBy User associated with job does not have a Name!");
143
144 if (job.Instance == null)
145 throw new InvalidOperationException("No Instance associated with job!");
146
147 job.StartedAt = DateTimeOffset.UtcNow;
148 job.Cancelled = false;
149
// Remember the caller's (fully populated) user. The database work below swaps job.StartedBy for an ID-only stub.
150 var originalStartedBy = job.StartedBy;
152 async databaseContext =>
153 {
// Replace Instance with an ID-only stub and Attach it, so EF tracks it as an existing row instead of inserting a new one.
154 job.Instance = new Models.Instance
155 {
156 Id = job.Instance.Require(x => x.Id),
157 };
158
159 databaseContext.Instances.Attach(job.Instance);
160
// Default to the TGS user when no user started the job. Name is projected so the in-memory job can show it afterwards.
161 originalStartedBy ??= await databaseContext
162 .Users
163 .GetTgsUser(
164 dbUser => new User
165 {
166 Id = dbUser.Id!.Value,
167 Name = dbUser.Name,
168 },
169 cancellationToken);
170
// Use the same ID-only stub + Attach approach for the starting user.
171 job.StartedBy = new User
172 {
173 Id = originalStartedBy.Require(x => x.Id),
174 };
175
176 databaseContext.Users.Attach(job.StartedBy);
177
178 databaseContext.Jobs.Add(job);
179
180 await databaseContext.Save(cancellationToken);
181 });
182
// Put back the full user object (with Name) that the stub replaced.
183 job.StartedBy = originalStartedBy;
184
185 logger.LogDebug("Registering job {jobId}: {jobDesc}...", job.Id, job.Description);
186 var jobHandler = new JobHandler(jobCancellationToken => RunJob(job, operation, jobCancellationToken));
187 try
188 {
189 lock (addCancelLock)
190 {
// The job is always added to `jobs`, but only started when shutdown has not been signalled.
191 bool jobShouldStart;
193 {
194 jobs.Add(job.Require(x => x.Id), jobHandler);
195 jobShouldStart = !noMoreJobsShouldStart;
196 }
197
198 if (jobShouldStart)
199 jobHandler.Start();
200 }
201 }
202 catch
203 {
// Registration failed after the handler was created: release it and propagate the error.
204 jobHandler.Dispose();
205 throw;
206 }
207 }
208
/// <summary>
/// Marks every job stored without a StoppedAt time as cancelled, then allows newly registered jobs to start.
/// </summary>
/// <param name="cancellationToken">Cancels the database query and save.</param>
/// <returns>A Task that completes when the cleanup has been saved.</returns>
public Task StartAsync(CancellationToken cancellationToken)
{
    var cleanup = databaseContextFactory.UseContext(async databaseContext =>
    {
        // Presumably these jobs were left unfinished by a previous run, so none of them can still be executing.
        var unfinishedJobIds = await databaseContext
            .Jobs
            .Where(dbJob => !dbJob.StoppedAt.HasValue)
            .Select(dbJob => dbJob.Id!.Value)
            .ToListAsync(cancellationToken);

        if (unfinishedJobIds.Count != 0)
        {
            logger.LogTrace("Cleaning {unfinishedJobCount} unfinished jobs...", unfinishedJobIds.Count);

            foreach (var unfinishedJobId in unfinishedJobIds)
            {
                // Attach an ID-only stub so only the modified columns are written.
                var stub = new Job(unfinishedJobId);
                databaseContext.Jobs.Attach(stub);
                stub.Cancelled = true;
                stub.StoppedAt = DateTimeOffset.UtcNow;
            }

            await databaseContext.Save(cancellationToken);
        }

        noMoreJobsShouldStart = false;
    });

    return cleanup.AsTask();
}
236
/// <summary>
/// Requests a blocking cancellation of every registered job and returns a Task that completes once all of them have finished.
/// Because no user is passed to CancelJob, the cancellations are attributed to the TGS user.
/// </summary>
/// <param name="cancellationToken">Passed to each CancelJob call, covering both its database update and its wait.</param>
/// <returns>A Task that completes when every CancelJob call has completed.</returns>
238 public Task StopAsync(CancellationToken cancellationToken)
239 {
240 List<ValueTask<Job?>> joinTasks;
// NOTE(review): RegisterOperation and WaitForJobCompletion read noMoreJobsShouldStart, but no visible code sets it to true.
// Shutdown presumably sets it here — confirm.
241 lock (addCancelLock)
243 {
// ToList() runs the synchronous start of every CancelJob call while addCancelLock is held.
// CancelJob takes addCancelLock again on the same thread, which is allowed because Monitor locks are reentrant.
245 joinTasks = jobs.Select(x => CancelJob(
246 new Job(x.Key),
247 null,
248 true,
249 cancellationToken))
250 .ToList();
251 }
252
253 return ValueTaskExtensions.WhenAll(joinTasks).AsTask();
254 }
255
/// <summary>
/// Requests cancellation of a registered job and records the cancelling user in the database.
/// </summary>
/// <param name="job">The job to cancel. It must have an ID.</param>
/// <param name="user">The user cancelling the job, or null to record the TGS user.</param>
/// <param name="blocking">If true, also waits for the job's handler to finish before returning.</param>
/// <param name="cancellationToken">Cancels the database update and, when <paramref name="blocking"/> is true, the wait.</param>
/// <returns>null if the job is not registered; otherwise <paramref name="job"/> with CancelledBy set to <paramref name="user"/>.</returns>
257 public async ValueTask<Job?> CancelJob(Job job, User? user, bool blocking, CancellationToken cancellationToken)
258 {
259 ArgumentNullException.ThrowIfNull(job);
260
261 var jid = job.Require(x => x.Id);
262 JobHandler? handler;
263 lock (addCancelLock)
264 {
266 if (!jobs.TryGetValue(jid, out handler))
267 return null;
268
269 logger.LogDebug("Cancelling job ID {jobId}...", jid);
270 handler.Cancel(); // this will ensure the db update is only done once
271 }
272
// Record CancelledBy on an attached ID-only job stub. The Cancelled flag itself is not written here.
273 await databaseContextFactory.UseContext(async databaseContext =>
274 {
275 var updatedJob = new Job(jid);
276 databaseContext.Jobs.Attach(updatedJob);
277 var attachedUser = user == null
278 ? await databaseContext
279 .Users
280 .GetTgsUser(
281 dbUser => new User
282 {
283 Id = dbUser.Id!.Value,
284 },
285 cancellationToken)
286 : new User
287 {
288 Id = user.Require(x => x.Id),
289 };
290
291 databaseContext.Users.Attach(attachedUser);
292 updatedJob.CancelledBy = attachedUser;
293
294 // let either startup or cancellation set job.cancelled
295 await databaseContext.Save(cancellationToken);
// NOTE(review): when user is null, the database records the TGS user as canceller, but the returned job.CancelledBy is null.
// Confirm this asymmetry is intended.
296 job.CancelledBy = user;
297 });
298
299 if (blocking)
300 {
301 logger.LogTrace("Waiting on cancelled job #{jobId}...", job.Id);
302 await handler.Wait(cancellationToken);
303 logger.LogTrace("Done waiting on job #{jobId}...", job.Id);
304 }
305
306 return job;
307 }
308
/// <summary>
/// Copies the Progress and Stage of a registered job's <see cref="JobHandler"/> onto <paramref name="apiResponse"/>.
/// Does nothing if the job is not registered, for example because RunJob has already removed it.
/// </summary>
/// <param name="apiResponse">The response to populate. Its ID selects the job.</param>
310 public void SetJobProgress(JobResponse apiResponse)
311 {
312 ArgumentNullException.ThrowIfNull(apiResponse);
314 {
315 if (!jobs.TryGetValue(apiResponse.Require(x => x.Id), out var handler))
316 return;
317 apiResponse.Progress = handler.Progress;
318 apiResponse.Stage = handler.Stage;
319 }
320 }
321
/// <summary>
/// Waits for a registered job's handler to finish. If <paramref name="jobCancellationToken"/> fires first, the job is cancelled.
/// </summary>
/// <param name="job">The job to wait on. It must have an ID.</param>
/// <param name="canceller">The user recorded as the canceller if <paramref name="jobCancellationToken"/> triggers cancellation.</param>
/// <param name="jobCancellationToken">When triggered, requests a blocking CancelJob on the job.</param>
/// <param name="cancellationToken">Cancels the wait itself. It must be cancellable, because waiting on a job that will never start cannot otherwise end.</param>
/// <returns>null if the job is not registered; otherwise the result of the handler's Wait (presumably RunJob's result: true on success — confirm).</returns>
/// <exception cref="ArgumentException">Thrown if <paramref name="cancellationToken"/> cannot be cancelled.</exception>
323 public async ValueTask<bool?> WaitForJobCompletion(Job job, User? canceller, CancellationToken jobCancellationToken, CancellationToken cancellationToken)
324 {
325 ArgumentNullException.ThrowIfNull(job);
326
327 if (!cancellationToken.CanBeCanceled)
328 throw new ArgumentException("A cancellable CancellationToken should be provided!", nameof(cancellationToken));
329
330 JobHandler? handler;
333 {
334 if (!jobs.TryGetValue(job.Require(x => x.Id), out handler))
335 return null;
336
338 }
339
// A job registered after shutdown began is never started, so there is nothing to wait for.
// Block until the caller's token gives up instead.
340 if (noMoreJobsShouldStart && !handler.Started)
341 await Extensions.TaskExtensions.InfiniteTask.WaitAsync(cancellationToken);
342
// If jobCancellationToken fires, start a blocking cancellation and capture its task so it can be awaited below.
// The registration is disposed before cancelTask is awaited, so no new callback can start afterwards.
343 var cancelTask = ValueTask.FromResult<Job?>(null);
344 bool result;
345 using (jobCancellationToken.Register(() => cancelTask = CancelJob(job, canceller, true, cancellationToken)))
346 result = await handler.Wait(cancellationToken);
347
348 await cancelTask;
349
350 return result;
351 }
352
/// <summary>
/// Supplies the <see cref="IInstanceCoreProvider"/> that RunJob waits for, allowing queued jobs to proceed.
/// Must be called at most once: a second call throws InvalidOperationException from TaskCompletionSource.SetResult.
/// </summary>
/// <param name="instanceCoreProvider">The provider handed to running jobs. Must not be null.</param>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="instanceCoreProvider"/> is null.</exception>
public void Activate(IInstanceCoreProvider instanceCoreProvider)
    => activationTcs.SetResult(
        instanceCoreProvider ?? throw new ArgumentNullException(nameof(instanceCoreProvider)));
360
363 {
// Run each running job's registered hub update action while holding the hubUpdateActions lock.
// RunJob registers these as NextUpdate(true), which bypasses its rate limit.
364 lock (hubUpdateActions)
365 foreach (var action in hubUpdateActions.Values)
366 action();
367 }
368
/// <summary>
/// Body executed by each job's <see cref="JobHandler"/>. It:
/// - waits for activation;
/// - runs <paramref name="operation"/>;
/// - records the outcome (success, cancellation or error) in the database;
/// - sends progress and the final state to the job's hub group.
/// </summary>
/// <param name="job">The registered job. It must have an ID.</param>
/// <param name="operation">The job entrypoint to invoke.</param>
/// <param name="cancellationToken">The job's cancellation token. It is used for the activation wait and the operation, but not for the final database update or hub sends.</param>
/// <returns>true if <paramref name="operation"/> completed without throwing; false if it was cancelled or threw. Exceptions from the final database update are rethrown.</returns>
376#pragma warning disable CA1506 // TODO: Decomplexify
377 async Task<bool> RunJob(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
378#pragma warning restore CA1506
379 {
380 var jid = job.Require(x => x.Id);
381 using (LogContext.PushProperty(SerilogContextHelper.JobIdContextProperty, jid))
382 using (runningJobs.TrackInProgress())
383 try
384 {
385 void LogException(Exception ex) => logger.LogDebug(ex, "Job {jobId} exited with error!", jid);
386
// Each hub send is chained onto hubUpdatesTask so updates reach clients in order. The chain's tail is awaited before returning.
387 var hubUpdatesTask = Task.CompletedTask;
388 var result = false;
389 var firstLogHappened = false;
390 var hubGroupName = JobsHub.HubGroupName(job);
391
392 Stopwatch? stopwatch = null;
393 void QueueHubUpdate(JobResponse update, bool final)
394 {
395 void NextUpdate(bool bypassRate)
396 {
397 var currentUpdatesTask = hubUpdatesTask;
398 async Task ChainHubUpdate()
399 {
400 await currentUpdatesTask;
401
402 if (!firstLogHappened)
403 {
404 logger.LogTrace("Sending updates for job {id} to hub group {group}", jid, hubGroupName);
405 firstLogHappened = true;
406 }
407
408 // DCT: Cancellation token is for job, operation should always run
409 await hub
410 .Clients
411 .Group(hubGroupName)
412 .ReceiveJobUpdate(update, CancellationToken.None);
413 }
414
// Rate-limit updates that don't bypass it, based on the time since the last queued update. Skipped updates are dropped.
// NOTE(review): ElapsedMilliseconds * MaxHubUpdatesPerSecond < 1 is only true when 0 ms have elapsed, so this barely throttles.
// A limit of MaxHubUpdatesPerSecond per second would be ElapsedMilliseconds * MaxHubUpdatesPerSecond < 1000 — confirm intent.
// NOTE(review): the lock is taken on the current stopwatch instance, which is then replaced below, so concurrent callers may lock different objects.
415 Stopwatch? enteredLock = null;
416 try
417 {
418 if (!bypassRate && stopwatch != null)
419 {
420 Monitor.Enter(stopwatch);
421 enteredLock = stopwatch;
422 if (stopwatch.ElapsedMilliseconds * MaxHubUpdatesPerSecond < 1)
423 return; // don't spam client
424 }
425
426 hubUpdatesTask = ChainHubUpdate();
427 stopwatch = Stopwatch.StartNew();
428 }
429 finally
430 {
431 if (enteredLock != null)
432 Monitor.Exit(enteredLock);
433 }
434 }
435
// Register a callback that lets QueueActiveJobUpdates force an immediate update for this job.
// For the final update, remove the callback instead.
436 lock (hubUpdateActions)
437 if (final)
438 hubUpdateActions.Remove(jid);
439 else
440 hubUpdateActions[jid] = () => NextUpdate(true);
441
442 NextUpdate(false);
443 }
444
445 try
446 {
// Progress callback for the JobProgressReporter. Values outside 0-1 are logged and ignored.
447 void UpdateProgress(string? stage, double? progress)
448 {
449 if (progress.HasValue
450 && (progress.Value < 0 || progress.Value > 1))
451 {
452 var exception = new ArgumentOutOfRangeException(nameof(progress), progress, "Progress must be a value from 0-1!");
453 logger.LogError(exception, "Invalid progress value!");
454 return;
455 }
456
// Convert the 0-1 fraction to a whole percentage, rounding down.
457 int? newProgress = progress.HasValue ? (int)Math.Floor(progress.Value * 100) : null;
459 if (jobs.TryGetValue(jid, out var handler))
460 {
461 handler.Stage = stage;
462 handler.Progress = newProgress;
463
464 var updatedJob = job.ToApi();
465 updatedJob.Stage = stage;
466 updatedJob.Progress = newProgress;
467 QueueHubUpdate(updatedJob, false);
468 }
469 }
470
// Wait for Activate to supply the instance core provider. Only server startup jobs are expected to run before activation.
471 var activationTask = activationTcs.Task;
472
473 Debug.Assert(activationTask.IsCompleted || job.Require(x => x.JobCode).IsServerStartupJob(), "Non-server startup job registered before activation!");
474
475 var instanceCoreProvider = await activationTask.WaitAsync(cancellationToken);
476
477 QueueHubUpdate(job.ToApi(), false);
478
479 logger.LogTrace("Starting job...");
480 using var progressReporter = new JobProgressReporter(
481 loggerFactory.CreateLogger<JobProgressReporter>(),
482 null,
483 UpdateProgress);
484 using var innerReporter = progressReporter.CreateSection(null, 1.0);
485 await operation(
486 instanceCoreProvider.GetInstance(job.Instance!),
488 job,
489 innerReporter,
490 cancellationToken);
491
492 logger.LogDebug("Job {jobId} completed!", job.Id);
493 result = true;
494 }
495 catch (OperationCanceledException ex)
496 {
497 logger.LogDebug(ex, "Job {jobId} cancelled!", job.Id);
498 job.Cancelled = true;
499 }
500 catch (JobException e)
501 {
502 job.ErrorCode = e.ErrorCode;
// NOTE(review): when Message is non-empty but InnerException is null, this still appends " (Inner exception: )".
503 job.ExceptionDetails = String.IsNullOrWhiteSpace(e.Message) ? e.InnerException?.Message : e.Message + $" (Inner exception: {e.InnerException?.Message})";
504 LogException(e);
505 }
506 catch (Exception e)
507 {
508 job.ExceptionDetails = e.ToString();
509 LogException(e);
510 }
511
// Persist the outcome with CancellationToken.None so it is recorded even when the job itself was cancelled.
512 try
513 {
514 await databaseContextFactory.UseContext(async databaseContext =>
515 {
516 var attachedJob = new Job(jid);
517
518 databaseContext.Jobs.Attach(attachedJob);
519 attachedJob.StoppedAt = DateTimeOffset.UtcNow;
520 attachedJob.ExceptionDetails = job.ExceptionDetails;
521 attachedJob.ErrorCode = job.ErrorCode;
522 attachedJob.Cancelled = job.Cancelled;
523
524 // DCT: Cancellation token is for job, operation should always run
525 await databaseContext.Save(CancellationToken.None);
526 });
527
528 // Resetting the context here because I CBA to worry if the cache is being used
529 await databaseContextFactory.UseContext(async databaseContext =>
530 {
531 // Cancellation might be set in another async context
532 // Also, startedby could have been renamed
533 // forced to reload here for the final hub update
534 // DCT: Cancellation token is for job, operation should always run
535 var finalJob = await databaseContext
536 .Jobs
537 .Include(x => x.Instance)
538 .Include(x => x.StartedBy)
539 .Include(x => x.CancelledBy)
540 .Where(dbJob => dbJob.Id == jid)
541 .FirstAsync(CancellationToken.None);
542 QueueHubUpdate(finalJob.ToApi(), true);
543 });
544 }
545 catch
546 {
// Persisting failed: unregister this job's update callback so QueueActiveJobUpdates stops calling it, then rethrow.
547 lock (hubUpdateActions)
548 hubUpdateActions.Remove(jid);
549
550 throw;
551 }
552
// Let queued hub sends drain. Failures are logged rather than failing the job.
553 try
554 {
555 await hubUpdatesTask;
556 }
557 catch (Exception ex)
558 {
559 logger.LogError(ex, "Error in hub updates chain task!");
560 }
561
562 return result;
563 }
564 finally
565 {
// Unregister and dispose this job's handler, then count the job as processed.
567 {
568 var handler = jobs[jid];
569 jobs.Remove(jid);
570 handler.Dispose();
571 }
572
573 processedJobs.Inc();
574 }
575 }
576 }
577}
virtual ? long Id
The ID of the entity.
Definition EntityId.cs:14
string? Description
English description of the Job.
Definition Job.cs:25
ErrorCode? ErrorCode
The Models.ErrorCode associated with the Job if any.
Definition Job.cs:31
bool? Cancelled
If the Job was cancelled.
Definition Job.cs:56
string? ExceptionDetails
Details of any exceptions caught during the Job.
Definition Job.cs:37
Represents a long running job on the server. Model is read-only, updates attempt to cancel the job.
Definition JobResponse.cs:7
Extension methods for the ValueTask and ValueTask<TResult> classes.
static async ValueTask WhenAll(IEnumerable< ValueTask > tasks)
Fully await a given list of tasks.
Operation exceptions thrown from the context of a Models.Job.
ErrorCode? ErrorCode
The Api.Models.ErrorCode associated with the JobException.
Class for pairing Tasks with CancellationTokenSources.
Definition JobHandler.cs:11
Task< bool > Wait(CancellationToken cancellationToken)
Wait for task to complete.
Definition JobHandler.cs:60
bool Started
If the job has started.
Definition JobHandler.cs:15
readonly IConnectionMappedHubContext< JobsHub, IJobsHub > hub
The IHubContext for the JobsHub.
Definition JobService.cs:40
async Task< bool > RunJob(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
Runner for JobHandlers.
readonly Counter processedJobs
Total number of jobs processed.
Definition JobService.cs:75
readonly ILogger< JobService > logger
The ILogger for the JobService.
Definition JobService.cs:55
readonly object synchronizationLock
Lock object for various operations.
Definition JobService.cs:85
void Activate(IInstanceCoreProvider instanceCoreProvider)
Activate the IJobManager.
readonly Gauge runningJobs
Jobs currently running.
Definition JobService.cs:80
const int MaxHubUpdatesPerSecond
The maximum rate at which hub clients can receive updates.
Definition JobService.cs:35
Task StopAsync(CancellationToken cancellationToken)
readonly ILoggerFactory loggerFactory
The ILoggerFactory for the JobService.
Definition JobService.cs:50
void QueueActiveJobUpdates()
Queue a message to be sent to all clients with the current state of active jobs.
readonly TaskCompletionSource< IInstanceCoreProvider > activationTcs
TaskCompletionSource<TResult> to delay starting jobs until the server is ready.
Definition JobService.cs:70
readonly object addCancelLock
Prevents a really REALLY rare race condition between add and cancel operations.
Definition JobService.cs:90
readonly Dictionary< long, Action > hubUpdateActions
Dictionary<TKey, TValue> of running Job Api.Models.EntityId.Ids to Actions that will push immediate h...
Definition JobService.cs:65
readonly IDatabaseContextFactory databaseContextFactory
The IDatabaseContextFactory for the JobService.
Definition JobService.cs:45
Task StartAsync(CancellationToken cancellationToken)
async ValueTask< Job?> CancelJob(Job job, User? user, bool blocking, CancellationToken cancellationToken)
Cancels a given job. A ValueTask<TResult> resulting in the updated job if it was cancelled,...
volatile bool noMoreJobsShouldStart
Prevents jobs that are registered after shutdown from activating.
Definition JobService.cs:95
JobService(IConnectionMappedHubContext< JobsHub, IJobsHub > hub, IDatabaseContextFactory databaseContextFactory, IMetricFactory metricFactory, ILoggerFactory loggerFactory, ILogger< JobService > logger)
Initializes a new instance of the JobService class.
async ValueTask RegisterOperation(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
Registers a given Job and begins running it. A ValueTask representing a running operation.
void SetJobProgress(JobResponse apiResponse)
Set the JobResponse.Progress and JobResponse.Stage for a given apiResponse.
readonly Dictionary< long, JobHandler > jobs
Dictionary<TKey, TValue> of Job Api.Models.EntityId.Ids to running JobHandlers.
Definition JobService.cs:60
async ValueTask< bool?> WaitForJobCompletion(Job job, User? canceller, CancellationToken jobCancellationToken, CancellationToken cancellationToken)
Wait for a given job to complete. A ValueTask<TResult> representing the Job. Results in true if the J...
A SignalR Hub for pushing job updates.
Definition JobsHub.cs:16
static string HubGroupName(long instanceId)
Get the group name for a given instanceId.
Represents an Api.Models.Instance in the database.
Definition Instance.cs:11
User? StartedBy
See JobResponse.StartedBy.
Definition Job.cs:21
Instance? Instance
The Models.Instance the job belongs to if any.
Definition Job.cs:32
Helpers for manipulating the Serilog.Context.LogContext.
const string JobIdContextProperty
The Serilog.Context.LogContext property name for Models.Job Api.Models.EntityId.Ids.
Factory for scoping usage of IDatabaseContexts. Meant for use by Components.
ValueTask UseContext(Func< IDatabaseContext, ValueTask > operation)
Run an operation in the scope of an IDatabaseContext.
The service that manages everything to do with jobs.
Definition IJobService.cs:9
Allows manually triggering jobs hub updates.
A IHubContext<THub> that maps Users to their connection IDs.
delegate ValueTask JobEntrypoint(IInstanceCore? instance, IDatabaseContextFactory databaseContextFactory, Job job, JobProgressReporter progressReporter, CancellationToken cancellationToken)
Entrypoint for running a given job.