68 Func<TCommunication, CancellationToken, ValueTask<TResponse?>> completionCallback,
69 Func<string, TResponse?> chunkErrorCallback,
71 CancellationToken cancellationToken)
72 where TCommunication :
class
76 return chunkErrorCallback(
"Missing chunk!");
78 if (!chunk.PayloadId.HasValue)
79 return chunkErrorCallback(
"Missing chunk payloadId!");
81 if (!chunk.SequenceId.HasValue)
82 return chunkErrorCallback(
"Missing chunk sequenceId!");
84 if (chunk.Payload ==
null)
85 return chunkErrorCallback(
"Missing chunk payload!");
92 if (!
chunkSets.TryGetValue(chunk.PayloadId.Value, out var tuple))
95 if (chunk.TotalChunks == 0)
96 return chunkErrorCallback(
"Receieved chunked request with 0 totalChunks!");
98 tuple = Tuple.Create<
ChunkSetInfo,
string[]>(chunk,
new string[chunk.TotalChunks]);
99 chunkSets.Add(chunk.PayloadId.Value, tuple);
102 requestInfo = tuple.Item1;
103 payloads = tuple.Item2;
105 Logger.LogTrace(
"Received chunk P{payloadId}: {sequenceId}/{totalChunks}", requestInfo.
PayloadId, chunk.SequenceId + 1, requestInfo.
TotalChunks);
110 return chunkErrorCallback(
"Received differing total chunks for same payloadId! Invalidating payloadId!");
113 if (payloads[chunk.SequenceId.Value] !=
null && payloads[chunk.SequenceId.Value] != chunk.Payload)
116 return chunkErrorCallback(
"Received differing payload for same sequenceId! Invalidating payloadId!");
119 payloads[chunk.SequenceId.Value] = chunk.Payload;
120 var missingPayloads =
new List<uint>();
121 for (var i = 0U; i < payloads.Length; ++i)
122 if (payloads[i] ==
null)
123 missingPayloads.Add(i);
125 if (missingPayloads.Count > 0)
128 MissingChunks = missingPayloads,
131 Logger.LogTrace(
"Received all chunks for P{payloadId}, processing request...", requestInfo.
PayloadId);
135 TCommunication? completedCommunication;
136 var fullCommunicationJson = String.Concat(payloads);
143 Logger.LogDebug(ex,
"Bad chunked communication for payload {payloadId}!", requestInfo.
PayloadId);
144 completedCommunication =
null;
147 if (completedCommunication ==
null)
148 return chunkErrorCallback(
"Chunked request completed with bad JSON!");
150 return await completionCallback(completedCommunication, cancellationToken);