Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement StreamResponse #110

Merged
merged 1 commit into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

using Elastic.Transport.Diagnostics;
using Elastic.Transport.Extensions;

using static Elastic.Transport.ResponseBuilderDefaults;

namespace Elastic.Transport;
Expand All @@ -26,7 +28,6 @@ internal static class ResponseBuilderDefaults
{
typeof(StringResponse), typeof(BytesResponse), typeof(VoidResponse), typeof(DynamicResponse)
};

}

/// <summary>
Expand Down Expand Up @@ -225,7 +226,7 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,

using (responseStream)
{
if (SetSpecialTypes<TResponse>(mimeType, bytes, requestData.MemoryStreamFactory, out var r)) return r;
if (SetSpecialTypes<TResponse>(mimeType, bytes, responseStream, requestData.MemoryStreamFactory, out var r)) return r;

if (details.HttpStatusCode.HasValue &&
requestData.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
Expand Down Expand Up @@ -288,7 +289,7 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
}
}

private static bool SetSpecialTypes<TResponse>(string mimeType, byte[] bytes,
private static bool SetSpecialTypes<TResponse>(string mimeType, byte[] bytes, Stream responseStream,
MemoryStreamFactory memoryStreamFactory, out TResponse cs)
where TResponse : TransportResponse, new()
{
Expand All @@ -298,6 +299,8 @@ private static bool SetSpecialTypes<TResponse>(string mimeType, byte[] bytes,

if (responseType == typeof(StringResponse))
cs = new StringResponse(bytes.Utf8String()) as TResponse;
else if (responseType == typeof(StreamResponse))
cs = new StreamResponse(responseStream, mimeType) as TResponse;
else if (responseType == typeof(BytesResponse))
cs = new BytesResponse(bytes) as TResponse;
else if (responseType == typeof(VoidResponse))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
responseMessage = client.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken).GetAwaiter().GetResult();
#endif

receive = responseMessage;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

statusCode = (int)responseMessage.StatusCode;
}

Expand Down Expand Up @@ -152,8 +153,11 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
{
ex = e;
}
using (receive)
using (responseStream ??= Stream.Null)

var isStreamResponse = typeof(TResponse) == typeof(StreamResponse);

using (isStreamResponse ? DiagnosticSources.SingletonDisposable : receive)
using (isStreamResponse ? Stream.Null : responseStream ??= Stream.Null)
{
TResponse response;

Expand All @@ -165,6 +169,10 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);

// Defer disposal of the response message
if (response is StreamResponse sr)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugly but i think its better to pass receive (as IDisposable) to ResponseBuilder.ToResponse. To ensure we inject to to StreamResponse's constructor.

If an exception happens in ToResponse() we might never set the finalizer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine for me as well! The whole thing is a little bit "spaghetti" unfortunately due to the fact that the HttpRequestInvoker must know about StreamResponse etc. Will change this part later 🙂

sr.Finalizer = () => receive.Dispose();

if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false)))
return response;

Expand Down
40 changes: 40 additions & 0 deletions src/Elastic.Transport/Responses/Special/StreamResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.IO;

namespace Elastic.Transport;

/// <summary>
/// A response that exposes the response <see cref="TransportResponse{T}.Body"/> as <see cref="Stream"/>.
/// <para>
/// Must be disposed after use.
/// </para>
/// </summary>
public sealed class StreamResponse :
TransportResponse<Stream>,
IDisposable
{
internal Action? Finalizer { get; set; }

/// <summary>
/// The MIME type of the response, if present.
/// </summary>
public string MimeType { get; }

/// <inheritdoc cref="StreamResponse"/>
public StreamResponse(Stream body, string? mimeType)
{
Body = body;
MimeType = mimeType ?? string.Empty;
}

/// <inheritdoc cref="IDisposable.Dispose"/>
public void Dispose()
{
Body.Dispose();
Finalizer?.Invoke();
}
}
Loading