diff --git a/src/Elastic.Transport/Components/Pipeline/DefaultResponseBuilder.cs b/src/Elastic.Transport/Components/Pipeline/DefaultResponseBuilder.cs index 3768370..892bd64 100644 --- a/src/Elastic.Transport/Components/Pipeline/DefaultResponseBuilder.cs +++ b/src/Elastic.Transport/Components/Pipeline/DefaultResponseBuilder.cs @@ -12,8 +12,10 @@ using System.Text.Json; using System.Threading; using System.Threading.Tasks; + using Elastic.Transport.Diagnostics; using Elastic.Transport.Extensions; + using static Elastic.Transport.ResponseBuilderDefaults; namespace Elastic.Transport; @@ -26,7 +28,6 @@ internal static class ResponseBuilderDefaults { typeof(StringResponse), typeof(BytesResponse), typeof(VoidResponse), typeof(DynamicResponse) }; - } /// @@ -225,7 +226,7 @@ private async ValueTask SetBodyCoreAsync(bool isAsync, using (responseStream) { - if (SetSpecialTypes(mimeType, bytes, requestData.MemoryStreamFactory, out var r)) return r; + if (SetSpecialTypes(mimeType, bytes, responseStream, requestData.MemoryStreamFactory, out var r)) return r; if (details.HttpStatusCode.HasValue && requestData.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value)) @@ -288,7 +289,7 @@ private async ValueTask SetBodyCoreAsync(bool isAsync, } } - private static bool SetSpecialTypes(string mimeType, byte[] bytes, + private static bool SetSpecialTypes(string mimeType, byte[] bytes, Stream responseStream, MemoryStreamFactory memoryStreamFactory, out TResponse cs) where TResponse : TransportResponse, new() { @@ -298,6 +299,8 @@ private static bool SetSpecialTypes(string mimeType, byte[] bytes, if (responseType == typeof(StringResponse)) cs = new StringResponse(bytes.Utf8String()) as TResponse; + else if (responseType == typeof(StreamResponse)) + cs = new StreamResponse(responseStream, mimeType) as TResponse; else if (responseType == typeof(BytesResponse)) cs = new BytesResponse(bytes) as TResponse; else if (responseType == typeof(VoidResponse)) diff --git a/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs b/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs index 4e563d6..a466b78 100644 --- a/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs +++ b/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs @@ -118,6 +118,7 @@ private async ValueTask RequestCoreAsync(bool isAsync, Req responseMessage = client.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken).GetAwaiter().GetResult(); #endif + receive = responseMessage; statusCode = (int)responseMessage.StatusCode; } @@ -152,8 +153,11 @@ private async ValueTask RequestCoreAsync(bool isAsync, Req { ex = e; } - using (receive) - using (responseStream ??= Stream.Null) + + var isStreamResponse = typeof(TResponse) == typeof(StreamResponse); + + using (isStreamResponse ? DiagnosticSources.SingletonDisposable : receive) + using (isStreamResponse ? Stream.Null : responseStream ??= Stream.Null) { TResponse response; @@ -165,6 +169,10 @@ private async ValueTask RequestCoreAsync(bool isAsync, Req response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse (requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats); + // Defer disposal of the response message + if (response is StreamResponse sr) + sr.Finalizer = () => receive.Dispose(); + if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false))) return response; diff --git a/src/Elastic.Transport/Responses/Special/StreamResponse.cs b/src/Elastic.Transport/Responses/Special/StreamResponse.cs new file mode 100644 index 0000000..5781f53 --- /dev/null +++ b/src/Elastic.Transport/Responses/Special/StreamResponse.cs @@ -0,0 +1,40 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.IO; + +namespace Elastic.Transport; + +/// +/// A response that exposes the response as . +/// +/// Must be disposed after use. +/// +/// +public sealed class StreamResponse : + TransportResponse, + IDisposable +{ + internal Action? Finalizer { get; set; } + + /// + /// The MIME type of the response, if present. + /// + public string MimeType { get; } + + /// + public StreamResponse(Stream body, string? mimeType) + { + Body = body; + MimeType = mimeType ?? string.Empty; + } + + /// + public void Dispose() + { + Body.Dispose(); + Finalizer?.Invoke(); + } +}