tgstation-server 6.12.0
The /tg/station 13 server suite
Loading...
Searching...
No Matches
ProcessExecutor.cs
Go to the documentation of this file.
1using System;
2using System.Collections.Generic;
3using System.Diagnostics;
4using System.IO;
5using System.Linq;
6using System.Text;
7using System.Threading;
8using System.Threading.Channels;
9using System.Threading.Tasks;
10
11using Microsoft.Extensions.Logging;
12
14
16{
19 {
23 static readonly ReaderWriterLockSlim ExclusiveProcessLaunchLock = new();
24
29
34
38 readonly ILogger<ProcessExecutor> logger;
39
43 readonly ILoggerFactory loggerFactory;
44
/// <summary>
/// Runs a given <paramref name="action"/> while holding the process launch lock in write mode.
/// </summary>
/// <remarks>
/// <c>LaunchProcess</c> takes the same lock in read mode around its call to start the process,
/// so no process is started through it while <paramref name="action"/> is executing.
/// </remarks>
/// <param name="action">The <see cref="Action"/> to run exclusively.</param>
/// <exception cref="ArgumentNullException">If <paramref name="action"/> is <see langword="null"/>.</exception>
public static void WithProcessLaunchExclusivity(Action action)
{
    // Validate before taking the write lock so a bad argument never blocks launches.
    ArgumentNullException.ThrowIfNull(action);

    ExclusiveProcessLaunchLock.EnterWriteLock();
    try
    {
        action();
    }
    finally
    {
        // Always release, even if the action throws.
        ExclusiveProcessLaunchLock.ExitWriteLock();
    }
}
61
72 ILogger<ProcessExecutor> logger,
73 ILoggerFactory loggerFactory)
74 {
75 this.processFeatures = processFeatures ?? throw new ArgumentNullException(nameof(processFeatures));
76 this.ioManager = ioManager ?? throw new ArgumentNullException(nameof(ioManager));
77 this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
78 this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
79 }
80
/// <summary>
/// Attach to a running process by its identifier.
/// </summary>
/// <param name="id">The process ID to look up.</param>
/// <returns>An <see cref="IProcess"/> wrapping the process, or <see langword="null"/> if the lookup threw.</returns>
public IProcess? GetProcess(int id)
{
    logger.LogDebug("Attaching to process {pid}...", id);

    global::System.Diagnostics.Process? located = null;
    try
    {
        located = global::System.Diagnostics.Process.GetProcessById(id);
    }
    catch (Exception lookupFailure)
    {
        // Any lookup failure is reported to the caller as "not found".
        logger.LogDebug(lookupFailure, "Unable to get process {pid}!", id);
    }

    // Wrapping happens outside the try so its exceptions are not swallowed.
    return located is null
        ? null
        : CreateFromExistingHandle(located);
}
98
101 {
102 logger.LogTrace("Getting current process...");
103 var handle = global::System.Diagnostics.Process.GetCurrentProcess();
104 return CreateFromExistingHandle(handle);
105 }
106
/// <summary>
/// Launch a new process and wrap it in an <see cref="IProcess"/>.
/// </summary>
/// <param name="fileName">The executable (or shell target) to start.</param>
/// <param name="workingDirectory">The working directory for the new process.</param>
/// <param name="arguments">The command line arguments.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> passed to the post-start handling.</param>
/// <param name="environment">Optional environment variables to set for the process. Entries override any variable of the same name already present in the start info.</param>
/// <param name="fileRedirect">Optional path that the standard output/error is written to. Only used when <paramref name="readStandardHandles"/> is <see langword="true"/>.</param>
/// <param name="readStandardHandles">If the standard output and error streams should be redirected and consumed.</param>
/// <param name="noShellExecute">If <see langword="true"/>, <c>UseShellExecute</c> is disabled on the start info.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> resulting in the new <see cref="IProcess"/>.</returns>
/// <exception cref="ArgumentNullException">If <paramref name="fileName"/>, <paramref name="workingDirectory"/> or <paramref name="arguments"/> is <see langword="null"/>.</exception>
public async ValueTask<IProcess> LaunchProcess(
    string fileName,
    string workingDirectory,
    string arguments,
    CancellationToken cancellationToken,
    IReadOnlyDictionary<string, string>? environment,
    string? fileRedirect,
    bool readStandardHandles,
    bool noShellExecute)
{
    ArgumentNullException.ThrowIfNull(fileName);
    ArgumentNullException.ThrowIfNull(workingDirectory);
    ArgumentNullException.ThrowIfNull(arguments);

    // One "\t- KEY=VALUE" line per variable, appended to the launch log message.
    var enviromentLogLines = environment == null
        ? String.Empty
        : String.Concat(environment.Select(kvp => $"{Environment.NewLine}\t- {kvp.Key}={kvp.Value}"));
    if (noShellExecute)
        logger.LogDebug(
            "Launching process in {workingDirectory}: {exe} {arguments}{environment}",
            workingDirectory,
            fileName,
            arguments,
            enviromentLogLines);
    else
        logger.LogDebug(
            "Shell launching process in {workingDirectory}: {exe} {arguments}{environment}",
            workingDirectory,
            fileName,
            arguments,
            enviromentLogLines);

    var handle = new global::System.Diagnostics.Process();
    try
    {
        handle.StartInfo.FileName = fileName;
        handle.StartInfo.Arguments = arguments;
        if (environment != null)
            foreach (var kvp in environment)
            {
                // StartInfo.Environment starts out populated with the current process's variables,
                // so Add() would throw on any key that is already inherited. Assign via the indexer to override.
                handle.StartInfo.Environment[kvp.Key] = kvp.Value;
            }

        handle.StartInfo.WorkingDirectory = workingDirectory;

        handle.StartInfo.UseShellExecute = !noShellExecute;

        Task<string?>? readTask = null;
        CancellationTokenSource? disposeCts = null;
        try
        {
            TaskCompletionSource<int>? processStartTcs = null;
            if (readStandardHandles)
            {
                // ConsumeReaders configures stream redirection immediately, then waits on this
                // task for the PID before it begins reading.
                processStartTcs = new TaskCompletionSource<int>();
                disposeCts = new CancellationTokenSource();
                readTask = ConsumeReaders(handle, processStartTcs.Task, fileRedirect, disposeCts.Token);
            }

            int pid;
            try
            {
                // Read mode: launches may overlap each other but not a WithProcessLaunchExclusivity action.
                ExclusiveProcessLaunchLock.EnterReadLock();
                try
                {
                    handle.Start();
                }
                finally
                {
                    ExclusiveProcessLaunchLock.ExitReadLock();
                }

                try
                {
                    pid = await processFeatures.HandleProcessStart(handle, cancellationToken);
                }
                catch
                {
                    // The process is already running at this point; don't leave it orphaned.
                    handle.Kill();
                    throw;
                }

                processStartTcs?.SetResult(pid);
            }
            catch (Exception ex)
            {
                // Unblock the reader task so it does not wait forever on a failed start.
                processStartTcs?.SetException(ex);
                throw;
            }

            // NOTE(review): the listing this was taken from skips a numbered line directly after
            // "new Process(" - a leading argument may be missing here. Confirm against the Process constructor.
            var process = new Process(
                handle,
                disposeCts,
                readTask,
                loggerFactory.CreateLogger<Process>(),
                false);

            return process;
        }
        catch
        {
            disposeCts?.Dispose();
            throw;
        }
    }
    catch
    {
        handle.Dispose();
        throw;
    }
}
218
/// <summary>
/// Find a running process by name.
/// </summary>
/// <param name="name">The process name to search for.</param>
/// <returns>An <see cref="IProcess"/> wrapping the first match, or <see langword="null"/> if nothing matched.</returns>
/// <exception cref="ArgumentNullException">If <paramref name="name"/> is <see langword="null"/>.</exception>
public IProcess? GetProcessByName(string name)
{
    ArgumentNullException.ThrowIfNull(name);
    logger.LogTrace("GetProcessByName: {processName}...", name);

    var matches = global::System.Diagnostics.Process.GetProcessesByName(name);
    if (matches.Length == 0)
        return null;

    // Only the first match is kept; release every other handle straight away.
    for (var index = 1; index < matches.Length; ++index)
    {
        var surplus = matches[index];
        logger.LogTrace("Disposing extra found PID: {pid}...", surplus.Id);
        surplus.Dispose();
    }

    return CreateFromExistingHandle(matches[0]);
}
239
/// <summary>
/// Consume the stdout/stderr streams of a process into a <see cref="Task"/>.
/// </summary>
/// <param name="handle">The <see cref="global::System.Diagnostics.Process"/> to read. Redirection is enabled on its start info here, so this must be called before the process is started.</param>
/// <param name="startupAndPid">A <see cref="Task{TResult}"/> that completes with the process ID once the process has started. Reading begins only after it completes.</param>
/// <param name="fileRedirect">Optional path to write the output to instead of buffering it in memory.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> that interrupts reading.</param>
/// <returns>A <see cref="Task{TResult}"/> resulting in the combined output text, or <see langword="null"/> when the output was written to <paramref name="fileRedirect"/>.</returns>
async Task<string?> ConsumeReaders(global::System.Diagnostics.Process handle, Task<int> startupAndPid, string? fileRedirect, CancellationToken cancellationToken)
{
    handle.StartInfo.RedirectStandardOutput = true;
    handle.StartInfo.RedirectStandardError = true;

    bool writingToFile;
    await using var fileStream = (writingToFile = fileRedirect != null) ? ioManager.CreateAsyncSequentialWriteStream(fileRedirect!) : null;
    await using var fileWriter = fileStream != null ? new StreamWriter(fileStream) : null;

    // Output is buffered in memory only when it is not going to a file.
    var stringBuilder = fileStream == null ? new StringBuilder() : null;

    // Both stream handlers write into one channel; a single consumer drains it.
    var dataChannel = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions
        {
            AllowSynchronousContinuations = !writingToFile,
            SingleReader = true,
            SingleWriter = false,
        });

    // stdout + stderr. Each reports end-of-stream with a null line.
    var handlesOpen = 2;
    async void DataReceivedHandler(object sender, DataReceivedEventArgs eventArgs)
    {
        var line = eventArgs.Data;
        if (line == null)
        {
            var handlesRemaining = Interlocked.Decrement(ref handlesOpen);
            if (handlesRemaining == 0)
            {
                // TryComplete, not Complete: the cancellation callback registered below may already
                // have completed the channel, and Complete() would then throw out of this async void handler.
                dataChannel.Writer.TryComplete();
            }

            return;
        }

        try
        {
            await dataChannel.Writer.WriteAsync(line, cancellationToken);
        }
        catch (OperationCanceledException ex)
        {
            logger.LogWarning(ex, "Handle channel write interrupted!");
        }
    }

    handle.OutputDataReceived += DataReceivedHandler;
    handle.ErrorDataReceived += DataReceivedHandler;

    async ValueTask OutputWriter()
    {
        var enumerable = dataChannel.Reader.ReadAllAsync(cancellationToken);
        if (writingToFile)
        {
            var enumerator = enumerable.GetAsyncEnumerator(cancellationToken);
            var nextEnumeration = enumerator.MoveNextAsync();
            while (await nextEnumeration)
            {
                var text = enumerator.Current;

                // Request the next line before writing so we know whether more data is already waiting.
                nextEnumeration = enumerator.MoveNextAsync();
                await fileWriter!.WriteLineAsync(text.AsMemory(), cancellationToken);

                // Only flush when no further line is immediately available.
                if (!nextEnumeration.IsCompleted)
                    await fileWriter.FlushAsync(cancellationToken);
            }
        }
        else
            await foreach (var text in enumerable)
                stringBuilder!.AppendLine(text);
    }

    // Wait for the process to actually start before touching its streams.
    var pid = await startupAndPid;
    logger.LogTrace("Starting read for PID {pid}...", pid);

    // On cancellation: complete the channel and cancel both asynchronous reads.
    using (cancellationToken.Register(() => dataChannel.Writer.TryComplete()))
    {
        handle.BeginOutputReadLine();
        using (cancellationToken.Register(handle.CancelOutputRead))
        {
            handle.BeginErrorReadLine();
            using (cancellationToken.Register(handle.CancelErrorRead))
            {
                try
                {
                    await OutputWriter();

                    logger.LogTrace("Finished read for PID {pid}", pid);
                }
                catch (OperationCanceledException ex)
                {
                    logger.LogWarning(ex, "PID {pid} stream reading interrupted!", pid);
                    if (writingToFile)
                        await fileWriter!.WriteLineAsync("-- Process detached, log truncated. This is likely due a to TGS restart --");
                }
            }
        }
    }

    return stringBuilder?.ToString();
}
344
/// <summary>
/// Create an <see cref="IProcess"/> given an existing <paramref name="handle"/>.
/// </summary>
/// <param name="handle">The <see cref="global::System.Diagnostics.Process"/> to wrap. It is disposed if wrapping fails.</param>
/// <returns>The <see cref="Process"/> wrapping <paramref name="handle"/>.</returns>
Process CreateFromExistingHandle(global::System.Diagnostics.Process handle)
{
    try
    {
        // The value is unused; presumably read so an unusable handle fails here - TODO confirm.
        _ = handle.Id;

        var processLogger = loggerFactory.CreateLogger<Process>();

        // NOTE(review): the listing this was taken from skips a numbered line directly after
        // "new Process(" - a leading argument may be missing here. Confirm against the Process constructor.
        return new Process(
            handle,
            null,
            null,
            processLogger,
            true);
    }
    catch
    {
        // Don't leak the OS handle when wrapping fails.
        handle.Dispose();
        throw;
    }
}
369 }
370}
async ValueTask< IProcess > LaunchProcess(string fileName, string workingDirectory, string arguments, CancellationToken cancellationToken, IReadOnlyDictionary< string, string >? environment, string? fileRedirect, bool readStandardHandles, bool noShellExecute)
Launch an IProcess. Returns a ValueTask<TResult> resulting in the new IProcess.
async Task< string?> ConsumeReaders(global::System.Diagnostics.Process handle, Task< int > startupAndPid, string? fileRedirect, CancellationToken cancellationToken)
Consume the stdout/stderr streams into a Task.
Process CreateFromExistingHandle(global::System.Diagnostics.Process handle)
Create an IProcess given an existing handle.
readonly IProcessFeatures processFeatures
The IProcessFeatures for the ProcessExecutor.
readonly IIOManager ioManager
The IIOManager for the ProcessExecutor.
static void WithProcessLaunchExclusivity(Action action)
Runs a given action, making sure not to launch any processes while it's running.
IProcess? GetProcess(int id)
Get an IProcess by id. Returns the IProcess represented by id on success, null on failure.
IProcess? GetProcessByName(string name)
Get an IProcess with a given name. Returns the IProcess represented by name on success, null on failure.
readonly ILogger< ProcessExecutor > logger
The ILogger for the ProcessExecutor.
IProcess GetCurrentProcess()
Get an IProcess representing the running executable. Returns the current IProcess.
readonly ILoggerFactory loggerFactory
The ILoggerFactory for the ProcessExecutor.
ProcessExecutor(IProcessFeatures processFeatures, IIOManager ioManager, ILogger< ProcessExecutor > logger, ILoggerFactory loggerFactory)
Initializes a new instance of the ProcessExecutor class.
static readonly ReaderWriterLockSlim ExclusiveProcessLaunchLock
ReaderWriterLockSlim for WithProcessLaunchExclusivity(Action).
Interface for using filesystems.
Definition IIOManager.cs:13
FileStream CreateAsyncSequentialWriteStream(string path)
Creates an asynchronous FileStream for sequential writing.
Abstraction for suspending and resuming processes.
ValueTask< int > HandleProcessStart(global::System.Diagnostics.Process process, CancellationToken cancellationToken)
Run events on starting a process.
Abstraction over a global::System.Diagnostics.Process.
Definition IProcess.cs:11