tgstation-server 6.12.0
The /tg/station 13 server suite
Loading...
Searching...
No Matches
JobService.cs
Go to the documentation of this file.
1using System;
2using System.Collections.Generic;
3using System.Diagnostics;
4using System.Linq;
5using System.Threading;
6using System.Threading.Tasks;
7
8using Microsoft.AspNetCore.SignalR;
9using Microsoft.EntityFrameworkCore;
10using Microsoft.Extensions.Logging;
11
12using Serilog.Context;
13
24
26{
29 {
34
39
44
/// <summary>
/// The ILoggerFactory for the JobService.
/// </summary>
48 readonly ILoggerFactory loggerFactory;
49
/// <summary>
/// The ILogger for the JobService.
/// </summary>
53 readonly ILogger<JobService> logger;
54
/// <summary>
/// Dictionary of Job IDs to their running JobHandlers.
/// </summary>
58 readonly Dictionary<long, JobHandler> jobs;
59
/// <summary>
/// Dictionary of running Job IDs to Actions that push an immediate hub update for that job. Also used as its own lock object.
/// </summary>
63 readonly Dictionary<long, Action> hubUpdateActions;
64
/// <summary>
/// TaskCompletionSource used to delay starting jobs until the server is ready; completed by Activate.
/// </summary>
68 readonly TaskCompletionSource<IInstanceCoreProvider> activationTcs;
69
/// <summary>
/// Lock object for various operations.
/// </summary>
73 readonly object synchronizationLock;
74
/// <summary>
/// Prevents a very rare race condition between add and cancel operations.
/// </summary>
78 readonly object addCancelLock;
79
/// <summary>
/// Prevents jobs that are registered after shutdown from activating.
/// </summary>
83 volatile bool noMoreJobsShouldStart;
84
/// <summary>
/// Initializes a new instance of the <see cref="JobService"/> class.
/// </summary>
/// <param name="hub">The value of <see cref="hub"/>.</param>
/// <param name="databaseContextFactory">The value of <see cref="databaseContextFactory"/>.</param>
/// <param name="loggerFactory">The value of <see cref="loggerFactory"/>.</param>
/// <param name="logger">The value of <see cref="logger"/>.</param>
/// <exception cref="ArgumentNullException">Thrown when any argument is <see langword="null"/>.</exception>
public JobService(
    IConnectionMappedHubContext<JobsHub, IJobsHub> hub,
    IDatabaseContextFactory databaseContextFactory,
    ILoggerFactory loggerFactory,
    ILogger<JobService> logger)
{
    this.hub = hub ?? throw new ArgumentNullException(nameof(hub));
    this.databaseContextFactory = databaseContextFactory ?? throw new ArgumentNullException(nameof(databaseContextFactory));
    this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
    this.logger = logger ?? throw new ArgumentNullException(nameof(logger));

    jobs = new Dictionary<long, JobHandler>();
    hubUpdateActions = new Dictionary<long, Action>();

    // Continuations must be queued rather than inlined: otherwise the SetResult call in Activate
    // could synchronously run the continuations of every job that is awaiting activation on the
    // thread that called Activate.
    activationTcs = new TaskCompletionSource<IInstanceCoreProvider>(TaskCreationOptions.RunContinuationsAsynchronously);
    synchronizationLock = new object();
    addCancelLock = new object();
}
109
/// <summary>
/// Disposes every <see cref="JobHandler"/> currently held in <see cref="jobs"/>.
/// </summary>
public void Dispose()
{
    foreach (var handler in jobs.Values)
    {
        handler.Dispose();
    }
}
116
/// <summary>
/// Registers a given Job and begins running it.
/// </summary>
/// <param name="job">The Job to register. Must have an Instance; if StartedBy is set, it must have a Name.</param>
/// <param name="operation">The JobEntrypoint that RunJob will invoke for this job.</param>
/// <param name="cancellationToken">The CancellationToken for the database work done during registration.</param>
/// <returns>A ValueTask representing the running operation.</returns>
/// <exception cref="InvalidOperationException">Thrown when StartedBy has no Name or the job has no Instance.</exception>
118 public async ValueTask RegisterOperation(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
119 {
120 ArgumentNullException.ThrowIfNull(job);
121 ArgumentNullException.ThrowIfNull(operation);
122
123 if (job.StartedBy != null && job.StartedBy.Name == null)
124 throw new InvalidOperationException("StartedBy User associated with job does not have a Name!");
125
126 if (job.Instance == null)
127 throw new InvalidOperationException("No Instance associated with job!");
128
129 job.StartedAt = DateTimeOffset.UtcNow;
130 job.Cancelled = false;
131
// Keep the caller's user object; job.StartedBy is temporarily swapped for an Id-only stub below and restored after the save.
132 var originalStartedBy = job.StartedBy;
// NOTE(review): listing line 133 is missing from this view. The lambda below is presumably passed to an awaited databaseContextFactory.UseContext call - confirm against the original file.
134 async databaseContext =>
135 {
// Replace the instance with an Id-only object and attach it to the context.
136 job.Instance = new Models.Instance
137 {
138 Id = job.Instance.Require(x => x.Id),
139 };
140
141 databaseContext.Instances.Attach(job.Instance);
142
// When the job has no StartedBy, fall back to the user returned by GetTgsUser (presumably the built-in TGS user - confirm).
143 originalStartedBy ??= await databaseContext
144 .Users
145 .GetTgsUser(
146 dbUser => new User
147 {
148 Id = dbUser.Id!.Value,
149 Name = dbUser.Name,
150 },
151 cancellationToken);
152
153 job.StartedBy = new User
154 {
155 Id = originalStartedBy.Require(x => x.Id),
156 };
157
158 databaseContext.Users.Attach(job.StartedBy);
159
160 databaseContext.Jobs.Add(job);
161
162 await databaseContext.Save(cancellationToken);
163 });
164
// Restore the full user object (with Name) now that the job has been saved.
165 job.StartedBy = originalStartedBy;
166
167 logger.LogDebug("Registering job {jobId}: {jobDesc}...", job.Id, job.Description);
168 var jobHandler = new JobHandler(jobCancellationToken => RunJob(job, operation, jobCancellationToken));
169 try
170 {
171 lock (addCancelLock)
172 {
173 bool jobShouldStart;
// NOTE(review): listing line 174 is missing from this view. The block below is presumably guarded by a lock statement (likely on synchronizationLock) - confirm against the original file.
175 {
176 jobs.Add(job.Require(x => x.Id), jobHandler);
177 jobShouldStart = !noMoreJobsShouldStart;
178 }
179
// The handler is tracked in jobs either way, but only started if noMoreJobsShouldStart was not set.
180 if (jobShouldStart)
181 jobHandler.Start();
182 }
183 }
184 catch
185 {
// Do not leak the handler if tracking or starting it failed.
186 jobHandler.Dispose();
187 throw;
188 }
189 }
190
/// <summary>
/// Marks every database job that has no StoppedAt value as cancelled and stopped, then clears <see cref="noMoreJobsShouldStart"/>.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the database operations.</param>
/// <returns>A <see cref="Task"/> representing the running operation.</returns>
public Task StartAsync(CancellationToken cancellationToken)
{
    var startupOperation = databaseContextFactory.UseContext(async databaseContext =>
    {
        // Anything still lacking a stop time is left over from a previous run: mark all of them as cancelled.
        var unfinishedJobIds = await databaseContext
            .Jobs
            .AsQueryable()
            .Where(dbJob => !dbJob.StoppedAt.HasValue)
            .Select(dbJob => dbJob.Id!.Value)
            .ToListAsync(cancellationToken);

        if (unfinishedJobIds.Count > 0)
        {
            logger.LogTrace("Cleaning {unfinishedJobCount} unfinished jobs...", unfinishedJobIds.Count);
            foreach (var unfinishedJobId in unfinishedJobIds)
            {
                // Attach an Id-only entity first, then set the fields on it.
                var staleJob = new Job(unfinishedJobId);
                databaseContext.Jobs.Attach(staleJob);
                staleJob.Cancelled = true;
                staleJob.StoppedAt = DateTimeOffset.UtcNow;
            }

            await databaseContext.Save(cancellationToken);
        }

        noMoreJobsShouldStart = false;
    });

    return startupOperation.AsTask();
}
219
/// <summary>
/// Requests a blocking cancellation of every job currently tracked in jobs and waits for all of them.
/// </summary>
/// <param name="cancellationToken">The CancellationToken passed to each CancelJob call.</param>
/// <returns>A Task that completes once every CancelJob call has completed.</returns>
221 public Task StopAsync(CancellationToken cancellationToken)
222 {
223 List<ValueTask<Job?>> joinTasks;
224 lock (addCancelLock)
// NOTE(review): listing line 225 is missing from this view; presumably a nested lock statement (likely on synchronizationLock) - confirm against the original file.
226 {
// NOTE(review): listing line 227 is missing from this view. No visible code ever sets noMoreJobsShouldStart to true, so it is presumably set here - confirm against the original file.
// ToList forces every CancelJob call to be issued while the lock is held.
228 joinTasks = jobs.Select(x => CancelJob(
229 new Job(x.Key),
230 null,
231 true,
232 cancellationToken))
233 .ToList();
234 }
235
236 return ValueTaskExtensions.WhenAll(joinTasks).AsTask();
237 }
238
/// <summary>
/// Cancels a given job and records who cancelled it.
/// </summary>
/// <param name="job">The Job to cancel; only its Id is used to find the handler.</param>
/// <param name="user">The User cancelling the job. When null, the user returned by GetTgsUser is recorded in the database instead.</param>
/// <param name="blocking">If true, waits for the job's handler to finish before returning.</param>
/// <param name="cancellationToken">The CancellationToken for the database operations and the optional wait.</param>
/// <returns>A ValueTask resulting in job if it was cancelled, or null if no handler is tracked for it.</returns>
240 public async ValueTask<Job?> CancelJob(Job job, User? user, bool blocking, CancellationToken cancellationToken)
241 {
242 ArgumentNullException.ThrowIfNull(job);
243
244 var jid = job.Require(x => x.Id);
245 JobHandler? handler;
246 lock (addCancelLock)
247 {
// NOTE(review): listing line 248 is missing from this view; presumably a lock statement (likely on synchronizationLock) guarding the lookup below - confirm against the original file.
249 if (!jobs.TryGetValue(jid, out handler))
250 return null;
251
252 logger.LogDebug("Cancelling job ID {jobId}...", jid);
253 handler.Cancel(); // this will ensure the db update is only done once
254 }
255
256 await databaseContextFactory.UseContext(async databaseContext =>
257 {
// Attach Id-only stubs, then set CancelledBy on the job stub before saving.
258 var updatedJob = new Job(jid);
259 databaseContext.Jobs.Attach(updatedJob);
260 var attachedUser = user == null
261 ? await databaseContext
262 .Users
263 .GetTgsUser(
264 dbUser => new User
265 {
266 Id = dbUser.Id!.Value,
267 },
268 cancellationToken)
269 : new User
270 {
271 Id = user.Require(x => x.Id),
272 };
273
274 databaseContext.Users.Attach(attachedUser);
275 updatedJob.CancelledBy = attachedUser;
276
277 // let either startup or cancellation set job.cancelled
278 await databaseContext.Save(cancellationToken);
// NOTE(review): this assigns the caller-supplied user, so when user is null the returned job has a null CancelledBy even though the database row references the GetTgsUser result - confirm this is intended.
279 job.CancelledBy = user;
280 });
281
282 if (blocking)
283 {
284 logger.LogTrace("Waiting on cancelled job #{jobId}...", job.Id);
285 await handler.Wait(cancellationToken);
286 logger.LogTrace("Done waiting on job #{jobId}...", job.Id);
287 }
288
289 return job;
290 }
291
/// <summary>
/// Sets JobResponse.Progress and JobResponse.Stage on apiResponse from the tracked handler for that job. Does nothing if the job is not tracked in jobs.
/// </summary>
/// <param name="apiResponse">The JobResponse to update; its Id selects the handler.</param>
293 public void SetJobProgress(JobResponse apiResponse)
294 {
295 ArgumentNullException.ThrowIfNull(apiResponse);
// NOTE(review): listing line 296 is missing from this view; presumably a lock statement (likely on synchronizationLock) guarding the block below - confirm against the original file.
297 {
298 if (!jobs.TryGetValue(apiResponse.Require(x => x.Id), out var handler))
299 return;
300 apiResponse.Progress = handler.Progress;
301 apiResponse.Stage = handler.Stage;
302 }
303 }
304
/// <summary>
/// Waits for a given job to complete, cancelling it if jobCancellationToken fires first.
/// </summary>
/// <param name="job">The Job to wait on.</param>
/// <param name="canceller">The User passed to CancelJob if jobCancellationToken fires.</param>
/// <param name="jobCancellationToken">When cancelled, triggers a blocking CancelJob for the job.</param>
/// <param name="cancellationToken">The CancellationToken for the wait itself. Must be cancellable.</param>
/// <returns>A ValueTask resulting in null if the job is not tracked in jobs, otherwise the result of the handler's Wait.</returns>
/// <exception cref="ArgumentException">Thrown when cancellationToken cannot be cancelled.</exception>
306 public async ValueTask<bool?> WaitForJobCompletion(Job job, User? canceller, CancellationToken jobCancellationToken, CancellationToken cancellationToken)
307 {
308 ArgumentNullException.ThrowIfNull(job);
309
310 if (!cancellationToken.CanBeCanceled)
311 throw new ArgumentException("A cancellable CancellationToken should be provided!", nameof(cancellationToken));
312
313 JobHandler? handler;
// NOTE(review): listing lines 314-315 are missing from this view; presumably they include a lock statement (likely on synchronizationLock) guarding the block below - confirm against the original file.
316 {
317 if (!jobs.TryGetValue(job.Require(x => x.Id), out handler))
318 return null;
319
// NOTE(review): listing line 320 is missing from this view - confirm its content against the original file.
321 }
322
// A job that has not started while no more jobs should start is not waited on directly; this awaits InfiniteTask instead (presumably a task that never completes, so only cancellationToken ends the wait - confirm).
323 if (noMoreJobsShouldStart && !handler.Started)
324 await Extensions.TaskExtensions.InfiniteTask.WaitAsync(cancellationToken);
325
// cancelTask is replaced by the CancelJob call only if jobCancellationToken fires while the registration is active.
326 var cancelTask = ValueTask.FromResult<Job?>(null);
327 bool result;
328 using (jobCancellationToken.Register(() => cancelTask = CancelJob(job, canceller, true, cancellationToken)))
329 result = await handler.Wait(cancellationToken);
330
// Observe the outcome of the cancellation, if one was started.
331 await cancelTask;
332
333 return result;
334 }
335
/// <summary>
/// Activates the job manager by completing <see cref="activationTcs"/> with the given provider.
/// </summary>
/// <param name="instanceCoreProvider">The <see cref="IInstanceCoreProvider"/> handed to jobs awaiting activation.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="instanceCoreProvider"/> is <see langword="null"/>.</exception>
public void Activate(IInstanceCoreProvider instanceCoreProvider)
{
    if (instanceCoreProvider is null)
    {
        throw new ArgumentNullException(nameof(instanceCoreProvider));
    }

    logger.LogTrace("Activating job manager...");
    activationTcs.SetResult(instanceCoreProvider);
}
344
// NOTE(review): the method header (listing line 346) is missing from this view; going by the member list this body presumably belongs to QueueActiveJobUpdates() - confirm against the original file.
// Invokes every action registered in hubUpdateActions while holding the lock on that dictionary.
347 {
348 lock (hubUpdateActions)
349 foreach (var action in hubUpdateActions.Values)
350 action();
351 }
352
/// <summary>
/// Runner for JobHandlers: waits for activation, invokes the operation, records the outcome in the database and sends hub updates for the job.
/// </summary>
/// <param name="job">The Job being run.</param>
/// <param name="operation">The JobEntrypoint to invoke.</param>
/// <param name="cancellationToken">The CancellationToken for the job. It is not used for the final database save or for hub sends.</param>
/// <returns>A Task resulting in true if the operation finished without throwing, false otherwise.</returns>
360#pragma warning disable CA1506 // TODO: Decomplexify
361 async Task<bool> RunJob(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
362#pragma warning restore CA1506
363 {
364 var jid = job.Require(x => x.Id);
365 using (LogContext.PushProperty(SerilogContextHelper.JobIdContextProperty, jid))
366 try
367 {
368 void LogException(Exception ex) => logger.LogDebug(ex, "Job {jobId} exited with error!", jid);
369
370 var hubUpdatesTask = Task.CompletedTask;
371 var result = false;
372 var firstLogHappened = false;
373 var hubGroupName = JobsHub.HubGroupName(job);
374
375 Stopwatch? stopwatch = null;
// Queues a hub send of update. A final update removes this job's entry from hubUpdateActions; any other update re-registers an action that re-sends it with the rate check bypassed.
376 void QueueHubUpdate(JobResponse update, bool final)
377 {
378 void NextUpdate(bool bypassRate)
379 {
380 var currentUpdatesTask = hubUpdatesTask;
381 async Task ChainHubUpdate()
382 {
// Wait for the previously chained send first so updates go out in the order they were queued.
383 await currentUpdatesTask;
384
385 if (!firstLogHappened)
386 {
387 logger.LogTrace("Sending updates for job {id} to hub group {group}", jid, hubGroupName);
388 firstLogHappened = true;
389 }
390
391 // DCT: Cancellation token is for job, operation should always run
392 await hub
393 .Clients
394 .Group(hubGroupName)
395 .ReceiveJobUpdate(update, CancellationToken.None);
396 }
397
// Remember which Stopwatch instance was locked so that the finally block releases the same object even after stopwatch is reassigned below.
398 Stopwatch? enteredLock = null;
399 try
400 {
401 if (!bypassRate && stopwatch != null)
402 {
403 Monitor.Enter(stopwatch);
404 enteredLock = stopwatch;
// NOTE(review): for any MaxHubUpdatesPerSecond >= 1 this product is below 1 only when ElapsedMilliseconds is 0, so updates are dropped only within the same millisecond. If the intent is at most MaxHubUpdatesPerSecond updates per second, the comparison should probably be against 1000 - confirm. The final update also goes through this check (NextUpdate(false) below).
405 if (stopwatch.ElapsedMilliseconds * MaxHubUpdatesPerSecond < 1)
406 return; // don't spam client
407 }
408
// Chain the send and restart the timer used by the rate check above.
409 hubUpdatesTask = ChainHubUpdate();
410 stopwatch = Stopwatch.StartNew();
411 }
412 finally
413 {
414 if (enteredLock != null)
415 Monitor.Exit(enteredLock);
416 }
417 }
418
419 lock (hubUpdateActions)
420 if (final)
421 hubUpdateActions.Remove(jid);
422 else
423 hubUpdateActions[jid] = () => NextUpdate(true);
424
425 NextUpdate(false);
426 }
427
428 try
429 {
// Progress callback handed to the JobProgressReporter: validates the value, stores stage/progress on the handler and queues a hub update.
430 void UpdateProgress(string? stage, double? progress)
431 {
432 if (progress.HasValue
433 && (progress.Value < 0 || progress.Value > 1))
434 {
// The exception is only constructed for logging; it is not thrown and the update is ignored.
435 var exception = new ArgumentOutOfRangeException(nameof(progress), progress, "Progress must be a value from 0-1!");
436 logger.LogError(exception, "Invalid progress value!");
437 return;
438 }
439
// Convert the 0-1 fraction to a whole percentage, rounding down.
440 int? newProgress = progress.HasValue ? (int)Math.Floor(progress.Value * 100) : null;
// NOTE(review): listing line 441 is missing from this view; presumably a lock statement (likely on synchronizationLock) guarding the lookup below - confirm against the original file.
442 if (jobs.TryGetValue(jid, out var handler))
443 {
444 handler.Stage = stage;
445 handler.Progress = newProgress;
446
447 var updatedJob = job.ToApi();
448 updatedJob.Stage = stage;
449 updatedJob.Progress = newProgress;
450 QueueHubUpdate(updatedJob, false);
451 }
452 }
453
454 var activationTask = activationTcs.Task;
455
456 Debug.Assert(activationTask.IsCompleted || job.Require(x => x.JobCode).IsServerStartupJob(), "Non-server startup job registered before activation!");
457
// Jobs do not run until Activate has supplied the IInstanceCoreProvider.
458 var instanceCoreProvider = await activationTask.WaitAsync(cancellationToken);
459
460 QueueHubUpdate(job.ToApi(), false);
461
462 logger.LogTrace("Starting job...");
463 using var progressReporter = new JobProgressReporter(
464 loggerFactory.CreateLogger<JobProgressReporter>(),
465 null,
466 UpdateProgress);
467 using var innerReporter = progressReporter.CreateSection(null, 1.0);
468 await operation(
469 instanceCoreProvider.GetInstance(job.Instance!),
// NOTE(review): listing line 470 is missing from this view. The JobEntrypoint delegate takes an IDatabaseContextFactory as its second parameter, so this line presumably passes databaseContextFactory - confirm against the original file.
471 job,
472 innerReporter,
473 cancellationToken);
474
475 logger.LogDebug("Job {jobId} completed!", job.Id);
476 result = true;
477 }
478 catch (OperationCanceledException ex)
479 {
480 logger.LogDebug(ex, "Job {jobId} cancelled!", job.Id);
481 job.Cancelled = true;
482 }
483 catch (JobException e)
484 {
485 job.ErrorCode = e.ErrorCode;
// Uses only the inner exception's message when the JobException has no message of its own; otherwise appends the inner message to it.
486 job.ExceptionDetails = String.IsNullOrWhiteSpace(e.Message) ? e.InnerException?.Message : e.Message + $" (Inner exception: {e.InnerException?.Message})";
487 LogException(e);
488 }
489 catch (Exception e)
490 {
491 job.ExceptionDetails = e.ToString();
492 LogException(e);
493 }
494
// Persist the outcome, then reload the job from the database to build the final hub update.
495 try
496 {
497 await databaseContextFactory.UseContext(async databaseContext =>
498 {
499 var attachedJob = new Job(jid);
500
501 databaseContext.Jobs.Attach(attachedJob);
502 attachedJob.StoppedAt = DateTimeOffset.UtcNow;
503 attachedJob.ExceptionDetails = job.ExceptionDetails;
504 attachedJob.ErrorCode = job.ErrorCode;
505 attachedJob.Cancelled = job.Cancelled;
506
507 // DCT: Cancellation token is for job, operation should always run
508 await databaseContext.Save(CancellationToken.None);
509 });
510
511 // Resetting the context here because I CBA to worry if the cache is being used
512 await databaseContextFactory.UseContext(async databaseContext =>
513 {
514 // Cancellation might be set in another async context
515 // Also, startedby could have been renamed
516 // forced to reload here for the final hub update
517 // DCT: Cancellation token is for job, operation should always run
518 var finalJob = await databaseContext
519 .Jobs
520 .AsQueryable()
521 .Include(x => x.Instance)
522 .Include(x => x.StartedBy)
523 .Include(x => x.CancelledBy)
524 .Where(dbJob => dbJob.Id == jid)
525 .FirstAsync(CancellationToken.None);
526 QueueHubUpdate(finalJob.ToApi(), true);
527 });
528 }
529 catch
530 {
// The final QueueHubUpdate may never have run, so make sure this job's hub action is unregistered before rethrowing.
531 lock (hubUpdateActions)
532 hubUpdateActions.Remove(jid);
533
534 throw;
535 }
536
// Wait for all queued hub sends; failures here are logged and do not change the job's result.
537 try
538 {
539 await hubUpdatesTask;
540 }
541 catch (Exception ex)
542 {
543 logger.LogError(ex, "Error in hub updates chain task!");
544 }
545
546 return result;
547 }
548 finally
549 {
// NOTE(review): listing line 550 is missing from this view; presumably a lock statement (likely on synchronizationLock) guarding the block below - confirm against the original file.
// Stop tracking the job and dispose its handler regardless of how the run ended.
551 {
552 var handler = jobs[jid];
553 jobs.Remove(jid);
554 handler.Dispose();
555 }
556 }
557 }
558 }
559}
virtual ? long Id
The ID of the entity.
Definition EntityId.cs:13
string? Description
English description of the Job.
Definition Job.cs:24
ErrorCode? ErrorCode
The Models.ErrorCode associated with the Job if any.
Definition Job.cs:30
bool? Cancelled
If the Job was cancelled.
Definition Job.cs:54
string? ExceptionDetails
Details of any exceptions caught during the Job.
Definition Job.cs:36
Represents a long running job on the server. Model is read-only, updates attempt to cancel the job.
Definition JobResponse.cs:7
Extension methods for the ValueTask and ValueTask<TResult> classes.
static async ValueTask WhenAll(IEnumerable< ValueTask > tasks)
Fully await a given list of tasks.
Operation exceptions thrown from the context of a Models.Job.
ErrorCode? ErrorCode
The Api.Models.ErrorCode associated with the JobException.
Class for pairing Tasks with CancellationTokenSources.
Definition JobHandler.cs:11
Task< bool > Wait(CancellationToken cancellationToken)
Wait for task to complete.
Definition JobHandler.cs:60
bool Started
If the job has started.
Definition JobHandler.cs:15
readonly IConnectionMappedHubContext< JobsHub, IJobsHub > hub
The IHubContext for the JobsHub.
Definition JobService.cs:38
async Task< bool > RunJob(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
Runner for JobHandlers.
readonly ILogger< JobService > logger
The ILogger for the JobService.
Definition JobService.cs:53
JobService(IConnectionMappedHubContext< JobsHub, IJobsHub > hub, IDatabaseContextFactory databaseContextFactory, ILoggerFactory loggerFactory, ILogger< JobService > logger)
Initializes a new instance of the JobService class.
Definition JobService.cs:92
readonly object synchronizationLock
lock object for various operations.
Definition JobService.cs:73
void Activate(IInstanceCoreProvider instanceCoreProvider)
Activate the IJobManager.
const int MaxHubUpdatesPerSecond
The maximum rate at which hub clients can receive updates.
Definition JobService.cs:33
Task StopAsync(CancellationToken cancellationToken)
readonly ILoggerFactory loggerFactory
The ILoggerFactory for the JobService.
Definition JobService.cs:48
void QueueActiveJobUpdates()
Queue a message to be sent to all clients with the current state of active jobs.
readonly TaskCompletionSource< IInstanceCoreProvider > activationTcs
TaskCompletionSource<TResult> to delay starting jobs until the server is ready.
Definition JobService.cs:68
readonly object addCancelLock
Prevents a really REALLY rare race condition between add and cancel operations.
Definition JobService.cs:78
readonly Dictionary< long, Action > hubUpdateActions
Dictionary<TKey, TValue> of running Job Api.Models.EntityId.Ids to Actions that will push immediate h...
Definition JobService.cs:63
readonly IDatabaseContextFactory databaseContextFactory
The IDatabaseContextFactory for the JobService.
Definition JobService.cs:43
Task StartAsync(CancellationToken cancellationToken)
async ValueTask< Job?> CancelJob(Job job, User? user, bool blocking, CancellationToken cancellationToken)
Cancels a given job. A ValueTask<TResult> resulting in the updated job if it was cancelled,...
volatile bool noMoreJobsShouldStart
Prevents jobs that are registered after shutdown from activating.
Definition JobService.cs:83
async ValueTask RegisterOperation(Job job, JobEntrypoint operation, CancellationToken cancellationToken)
Registers a given Job and begins running it. A ValueTask representing a running operation.
void SetJobProgress(JobResponse apiResponse)
Set the JobResponse.Progress and JobResponse.Stage for a given apiResponse.
readonly Dictionary< long, JobHandler > jobs
Dictionary<TKey, TValue> of Job Api.Models.EntityId.Ids to running JobHandlers.
Definition JobService.cs:58
async ValueTask< bool?> WaitForJobCompletion(Job job, User? canceller, CancellationToken jobCancellationToken, CancellationToken cancellationToken)
Wait for a given job to complete. A ValueTask<TResult> representing the Job. Results in true if the J...
A SignalR Hub for pushing job updates.
Definition JobsHub.cs:16
static string HubGroupName(long instanceId)
Get the group name for a given instanceId.
Represents an Api.Models.Instance in the database.
Definition Instance.cs:11
User? StartedBy
See JobResponse.StartedBy.
Definition Job.cs:21
Instance? Instance
The Models.Instance the job belongs to if any.
Definition Job.cs:32
Helpers for manipulating the Serilog.Context.LogContext.
const string JobIdContextProperty
The Serilog.Context.LogContext property name for Models.Job Api.Models.EntityId.Ids.
Factory for scoping usage of IDatabaseContexts. Meant for use by Components.
ValueTask UseContext(Func< IDatabaseContext, ValueTask > operation)
Run an operation in the scope of an IDatabaseContext.
The service that manages everything to do with jobs.
Definition IJobService.cs:9
Allows manually triggering jobs hub updates.
A IHubContext<THub> that maps Users to their connection IDs.
delegate ValueTask JobEntrypoint(IInstanceCore? instance, IDatabaseContextFactory databaseContextFactory, Job job, JobProgressReporter progressReporter, CancellationToken cancellationToken)
Entrypoint for running a given job.