tgstation-server 6.12.0
The /tg/station 13 server suite
Loading...
Searching...
No Matches
BufferedFileStreamProvider.cs
Go to the documentation of this file.
1using System;
2using System.IO;
3using System.Threading;
4using System.Threading.Tasks;
5
7
9{
14 {
/// <summary>
/// Whether <see cref="DisposeAsync"/> has already run on this instance.
/// The buffer reference is cleared on disposal, so a missing buffer means "disposed".
/// </summary>
public bool Disposed
{
    get { return buffer == null; }
}
17
// The source stream; its contents are copied into the buffer by GetResultInternal.
21 readonly Stream input;
22
// Used two ways: as an async lock (via SemaphoreSlimContext.Lock) around the one-time copy,
// and as the monitor object for the short lock blocks that read or clear the buffer reference.
26 readonly SemaphoreSlim semaphore;
27
// The in-memory copy of the input. Set to null by DisposeAsync, which is what Disposed checks.
31 volatile MemoryStream? buffer;
32
// True once the input has been fully copied into the buffer. DisposeAsync also sets it to true,
// which sends later callers straight to the buffer null-check instead of attempting a copy.
36 volatile bool buffered;
37
// Body of the BufferedFileStreamProvider(Stream input) constructor; the signature line is absent from this listing.
43 {
// A null input is rejected immediately with ArgumentNullException.
44 this.input = input ?? throw new ArgumentNullException(nameof(input));
45
// Initial count of 1: the semaphore starts out available.
46 semaphore = new SemaphoreSlim(1);
47 try
48 {
49 buffer = new MemoryStream();
50 }
51 catch
52 {
// The semaphore was already created; dispose it before rethrowing so it is not left undisposed.
53 semaphore.Dispose();
54 throw;
55 }
56 }
57
/// <summary>
/// Releases the buffer and the semaphore. Calls after the first one return without doing anything.
/// </summary>
/// <returns>A <see cref="ValueTask"/> representing the running operation.</returns>
public async ValueTask DisposeAsync()
{
    MemoryStream? detachedBuffer;
    lock (semaphore)
    {
        detachedBuffer = buffer;
        if (detachedBuffer == null)
        {
            // Already disposed.
            return;
        }

        // Clear the field so the stream's backing storage becomes collectable;
        // disposing a MemoryStream does not release its internal array on its own.
        buffer = null;

        // Marking as buffered routes later readers to the null-check path instead of a copy attempt.
        buffered = true;
    }

    // Start disposing the buffer, dispose the semaphore, then wait for the buffer disposal to finish.
    ValueTask pendingBufferDisposal = detachedBuffer.DisposeAsync();
    semaphore.Dispose();
    await pendingBufferDisposal;
}
78
/// <summary>
/// Gets the shared buffered <see cref="Stream"/>, copying the input into it first if that has not happened yet.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> resulting in the shared <see cref="Stream"/>.</returns>
public async ValueTask<Stream> GetResult(CancellationToken cancellationToken)
{
    var result = await GetResultInternal(cancellationToken);
    return result.Stream;
}
85
/// <summary>
/// Gets a new read-only <see cref="MemoryStream"/> over the shared buffer's backing array.
/// The returned stream has its own position but does not copy the data.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> resulting in a new <see cref="MemoryStream"/> spanning the buffered bytes.</returns>
public async ValueTask<MemoryStream> GetOwnedResult(CancellationToken cancellationToken)
{
    var result = await GetResultInternal(cancellationToken);

    // Wrap only the populated part of the backing array: not writable, with GetBuffer() allowed.
    return new MemoryStream(
        buffer: result.Stream.GetBuffer(),
        index: 0,
        count: (int)result.StreamLength,
        writable: false,
        publiclyVisible: true);
}
92
/// <summary>
/// Makes sure the input <see cref="Stream"/> has been copied into the buffer.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="Task"/> representing the running operation.</returns>
public Task EnsureBuffered(CancellationToken cancellationToken)
{
    return GetResultInternal(cancellationToken).AsTask();
}
99
/// <summary>
/// Gets the shared <see cref="MemoryStream"/> and its length, copying the input into it on first use.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for the operation.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> resulting in the shared <see cref="MemoryStream"/> and its <see cref="Stream.Length"/>.</returns>
/// <exception cref="ObjectDisposedException">Thrown when the buffer has already been released by <see cref="DisposeAsync"/>.</exception>
async ValueTask<(MemoryStream Stream, long StreamLength)> GetResultInternal(CancellationToken cancellationToken)
{
    // Check the flag before taking the semaphore so callers skip it once the copy is done.
    if (!buffered)
    {
        using (await SemaphoreSlimContext.Lock(semaphore, cancellationToken))
        {
            // Check again: another caller may have completed the copy while this one waited.
            if (!buffered)
            {
                MemoryStream target;
                lock (semaphore)
                {
                    target = buffer ?? throw new ObjectDisposedException(nameof(BufferedFileStreamProvider));
                }

                await input.CopyToAsync(target, cancellationToken);

                // Rewind so consumers of the shared stream start at the beginning.
                target.Seek(0, SeekOrigin.Begin);
                buffered = true;
                return (Stream: target, StreamLength: target.Length);
            }
        }
    }

    // Already populated (or disposed): read the buffer reference under the monitor.
    lock (semaphore)
    {
        var populated = buffer ?? throw new ObjectDisposedException(nameof(BufferedFileStreamProvider));
        return (Stream: populated, StreamLength: populated.Length);
    }
}
129 }
130}
IFileStreamProvider that provides a ISeekableFileStreamProvider from an input Stream.
volatile bool buffered
If buffer has been populated.
async ValueTask< MemoryStream > GetOwnedResult(CancellationToken cancellationToken)
Gets the provided MemoryStream. May be called multiple times, though cancelling any may cause all cal...
async ValueTask<(MemoryStream Stream, long StreamLength)> GetResultInternal(CancellationToken cancellationToken)
Gets the shared MemoryStream and its Stream.Length.
bool Disposed
If the ISeekableFileStreamProvider has had global::System.IAsyncDisposable.DisposeAsync called on it.
async ValueTask< Stream > GetResult(CancellationToken cancellationToken)
Gets the provided Stream. May be called multiple times, though cancelling any may cause all calls to ...
readonly SemaphoreSlim semaphore
The SemaphoreSlim used to synchronize writes to buffer.
readonly Stream input
The original input Stream must remain valid for the lifetime of the BufferedFileStreamProvider or unt...
BufferedFileStreamProvider(Stream input)
Initializes a new instance of the BufferedFileStreamProvider class.
Task EnsureBuffered(CancellationToken cancellationToken)
Ensures the input Stream has been copied to the buffer.
static async ValueTask< SemaphoreSlimContext > Lock(SemaphoreSlim semaphore, CancellationToken cancellationToken, ILogger? logger=null)
Asynchronously locks a semaphore.
IFileStreamProvider that provides MemoryStreams.