Skip to content

Commit

Permalink
Ensure we eagerly return the outbound array to the array pool (#52)
Browse files Browse the repository at this point in the history
* Ensure we eagerly return the outbound array to the array pool

* localize using buffer to ExportBufferAsync

* license header

* remove BOM

* add Jsonpropertynames

* Add better defaults for MaxConcurrency and allow injection of CancellationToken
  • Loading branch information
Mpdreamz committed Apr 9, 2024
1 parent 6b52b7c commit 2cdc188
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 42 deletions.
7 changes: 7 additions & 0 deletions elastic-ingest-dotnet.sln
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Elastic.Ingest.Elasticsearc
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Performance.Common", "benchmarks\Performance.Common\Performance.Common.csproj", "{077B0564-26D1-4FF1-98A5-633EAA9BE051}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "playground", "examples\playground\playground.csproj", "{C0E73713-EA13-403C-BE43-F2164AB6CF73}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -138,6 +140,10 @@ Global
{077B0564-26D1-4FF1-98A5-633EAA9BE051}.Debug|Any CPU.Build.0 = Debug|Any CPU
{077B0564-26D1-4FF1-98A5-633EAA9BE051}.Release|Any CPU.ActiveCfg = Release|Any CPU
{077B0564-26D1-4FF1-98A5-633EAA9BE051}.Release|Any CPU.Build.0 = Release|Any CPU
{C0E73713-EA13-403C-BE43-F2164AB6CF73}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C0E73713-EA13-403C-BE43-F2164AB6CF73}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C0E73713-EA13-403C-BE43-F2164AB6CF73}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C0E73713-EA13-403C-BE43-F2164AB6CF73}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -159,6 +165,7 @@ Global
{1609BC01-FEA6-4818-9C86-9DB7B82FCB48} = {1C174D50-20D2-4006-9681-FDDB39351D99}
{DE05C98F-C410-4ED0-A77B-36B66C784105} = {1C174D50-20D2-4006-9681-FDDB39351D99}
{077B0564-26D1-4FF1-98A5-633EAA9BE051} = {1C174D50-20D2-4006-9681-FDDB39351D99}
{C0E73713-EA13-403C-BE43-F2164AB6CF73} = {B67CBB46-74C1-47EB-9E41-D55C5E0E0D85}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {87AD3256-8FD8-4C94-AB66-5ADBAC939722}
Expand Down
17 changes: 11 additions & 6 deletions examples/Elastic.Channels.Continuous/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
using Elastic.Channels;
using Elastic.Channels.Diagnostics;

var ctxs = new CancellationTokenSource();
Console.CancelKeyPress += (sender, eventArgs) => {
ctxs.Cancel();
eventArgs.Cancel = true;
};

var options = new NoopBufferedChannel.NoopChannelOptions
{
BufferOptions = new BufferOptions()
Expand All @@ -19,19 +25,18 @@

};
var channel = new DiagnosticsBufferedChannel(options);
for (long i = 0; i < long.MaxValue; i++)
await Parallel.ForEachAsync(Enumerable.Range(0, int.MaxValue), new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = ctxs.Token }, async (i, ctx) =>
{
var e = new NoopBufferedChannel.NoopEvent { Id = i };
var written = false;
var ready = await channel.WaitToWriteAsync();
//Console.Write('.');
var ready = await channel.WaitToWriteAsync(ctx);
if (ready) written = channel.TryWrite(e);
if (!written || channel.BufferMismatches > 0)
if (!written)
{
Console.WriteLine();
Console.WriteLine(channel);
Console.WriteLine(i);
Environment.Exit(1);
}

}

});
95 changes: 95 additions & 0 deletions examples/playground/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Text.Json.Serialization;
using Elastic.Channels;
using Elastic.Elasticsearch.Ephemeral;
using Elastic.Ingest.Elasticsearch;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Transport;

// ---------------------------------------------------------------------------
// Playground entry point: starts an ephemeral Elasticsearch cluster, pushes
// `numDocs` documents through a DataStreamChannel and reports managed memory.
// The locals below are captured by the local functions declared further down.
// ---------------------------------------------------------------------------
const int numDocs = 1_000_000;
Random random = new();
CancellationTokenSource ctxs = new();
var parallelOpts = new ParallelOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    CancellationToken = ctxs.Token
};
var bufferOptions = new BufferOptions
{
    InboundBufferMaxSize = numDocs,
    OutboundBufferMaxSize = 10_000
};

// NOTE: `using` declarations are disposed in reverse order of declaration, so keep this order.
using var cluster = new EphemeralCluster(new EphemeralClusterConfiguration("8.13.0"));
using var channel = SetupElasticsearchChannel();

// Ctrl+C: cancel outstanding work and dispose the cluster; setting Cancel = true
// asks the runtime not to terminate the process so the program can unwind itself.
Console.CancelKeyPress += (_, args) =>
{
    ctxs.Cancel();
    cluster.Dispose();
    args.Cancel = true;
};

using var started = cluster.Start();

var bytesBefore = GC.GetTotalMemory(forceFullCollection: false);

await PushToChannel(channel);

// This is not really indicative because the channel is still draining at this point in time
var bytesAfter = GC.GetTotalMemory(forceFullCollection: false);
Console.WriteLine($"Memory before: {bytesBefore} bytes");
Console.WriteLine($"Memory after: {bytesAfter} bytes");
Console.WriteLine($"Memory used: {bytesAfter - bytesBefore} bytes");

Console.WriteLine("Press any key...");
Console.ReadKey();


// Bootstraps the target data stream and then writes `numDocs` documents to the
// channel, using `parallelOpts` to bound the degree of parallelism and to carry
// the cancellation token.
// Throws ArgumentNullException when `c` is null.
async Task PushToChannel(DataStreamChannel<EcsDocument> c)
{
    ArgumentNullException.ThrowIfNull(c);

    await c.BootstrapElasticsearchAsync(BootstrapMethod.Failure);
    await Parallel.ForEachAsync(Enumerable.Range(0, numDocs), parallelOpts, async (i, ctx) =>
    {
        await DoChannelWrite(i, ctx);
    });

    // Builds one document and attempts a single write to the channel.
    // A failed write is only reported and followed by a 5 second pause; the
    // document is not retried.
    async Task DoChannelWrite(int i, CancellationToken cancellationToken)
    {
        // This runs concurrently from Parallel.ForEachAsync. A single System.Random
        // instance is not thread-safe, so use the thread-safe Random.Shared (.NET 6+)
        // rather than a Random captured from the enclosing scope.
        var message = $"Logging information {i} - Random value: {Random.Shared.NextDouble()}";
        var doc = new EcsDocument { Timestamp = DateTimeOffset.UtcNow, Message = message };
        if (await c.WaitToWriteAsync(cancellationToken) && c.TryWrite(doc))
            return;

        Console.WriteLine("Failed To write");
        await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
    }
}

// Creates the DataStreamChannel used by the playground. It talks to
// http://localhost:9200, uses the shared `bufferOptions`, is tied to the
// `ctxs` cancellation token and prints the first item-level error of every
// export response to the console.
DataStreamChannel<EcsDocument> SetupElasticsearchChannel()
{
    var transport = new DistributedTransport(new TransportConfiguration(new Uri("http://localhost:9200")));
    var channelOptions = new DataStreamChannelOptions<EcsDocument>(transport)
    {
        BufferOptions = bufferOptions,
        CancellationToken = ctxs.Token,
        ExportResponseCallback = (response, _) =>
        {
            // Only the first failed item (if any) is reported per response.
            var firstError = response.Items
                .Select(item => item.Error)
                .FirstOrDefault(err => err != null);
            if (firstError != null)
            {
                Console.WriteLine(firstError.ToString());
            }
        }
    };

    return new DataStreamChannel<EcsDocument>(channelOptions);
}

/// <summary>
/// Minimal document written by the playground; serialized with the JSON
/// property names <c>@timestamp</c> and <c>message</c>.
/// </summary>
public class EcsDocument
{
    /// <summary> Event time, emitted as <c>@timestamp</c>. </summary>
    [JsonPropertyName("@timestamp")]
    public DateTimeOffset Timestamp { get; init; }

    /// <summary> Log message, emitted as <c>message</c>. Expected to be set via the object initializer. </summary>
    [JsonPropertyName("message")]
    public string Message { get; init; } = default!;
}

18 changes: 18 additions & 0 deletions examples/playground/playground.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<!-- Project file for the examples/playground console application. -->
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<!-- Built as an executable targeting .NET 6 with implicit usings and nullable reference types enabled. -->
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<!-- References the in-repo Elastic.Ingest.Elasticsearch project from source. -->
<ProjectReference Include="..\..\src\Elastic.Ingest.Elasticsearch\Elastic.Ingest.Elasticsearch.csproj" />
</ItemGroup>

<ItemGroup>
<!-- Package for the Elastic.Elasticsearch.Ephemeral namespace imported by Program.cs (EphemeralCluster). -->
<PackageReference Include="Elastic.Elasticsearch.Ephemeral" Version="0.5.0" />
</ItemGroup>

</Project>
9 changes: 7 additions & 2 deletions src/Elastic.Channels/BufferOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@ public class BufferOptions

/// <summary>
/// The maximum number of consumers allowed to poll for new events on the channel.
/// <para>Defaults to <c>1</c>, increase to introduce concurrency.</para>
/// <para>When not set, defaults to the lesser of:</para>
/// <list type="bullet">
/// <item><see cref="InboundBufferMaxSize"/>/<see cref="OutboundBufferMaxSize"/> (rounded up)</item>
/// <item>OR <see cref="Environment.ProcessorCount"/> * 2</item>
/// </list>
/// <para>Set explicitly to control the export concurrency.</para>
/// </summary>
public int ExportMaxConcurrency { get; set; } = 1;
public int? ExportMaxConcurrency { get; set; }

/// <summary>
/// The times to retry an export if <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.RetryBuffer"/> yields items to retry.
Expand Down
74 changes: 41 additions & 33 deletions src/Elastic.Channels/BufferedChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public abstract class BufferedChannelBase<TChannelOptions, TEvent, TResponse>
{
private readonly Task _inTask;
private readonly Task _outTask;
private readonly int _maxConcurrency;
private readonly SemaphoreSlim _throttleTasks;
private readonly CountdownEvent? _signal;

Expand All @@ -74,7 +75,9 @@ protected BufferedChannelBase(TChannelOptions options) : this(options, null) { }
/// <inheritdoc cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/>
protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallbacks<TEvent, TResponse>>? callbackListeners)
{
TokenSource = new CancellationTokenSource();
TokenSource = options.CancellationToken.HasValue
? CancellationTokenSource.CreateLinkedTokenSource(options.CancellationToken.Value)
: new CancellationTokenSource();
Options = options;

var listeners = callbackListeners == null ? new[] { Options } : callbackListeners.Concat(new[] { Options }).ToArray();
Expand All @@ -90,10 +93,17 @@ protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallb
}
_callbacks = new ChannelCallbackInvoker<TEvent, TResponse>(listeners);

var maxConsumers = Math.Max(1, BufferOptions.ExportMaxConcurrency);
_throttleTasks = new SemaphoreSlim(maxConsumers, maxConsumers);
_signal = options.BufferOptions.WaitHandle;
var maxIn = Math.Max(1, BufferOptions.InboundBufferMaxSize);
// The outbound buffer size is the larger of 1 and OutboundBufferMaxSize, capped so it never exceeds InboundBufferMaxSize
var maxOut = Math.Min(BufferOptions.InboundBufferMaxSize, Math.Max(1, BufferOptions.OutboundBufferMaxSize));
var defaultMaxConcurrency = (int)Math.Ceiling(maxIn / (double)maxOut);
_maxConcurrency =
BufferOptions.ExportMaxConcurrency.HasValue
? BufferOptions.ExportMaxConcurrency.Value
: Math.Min(defaultMaxConcurrency, Environment.ProcessorCount * 2);

_throttleTasks = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);
_signal = options.BufferOptions.WaitHandle;
InChannel = Channel.CreateBounded<TEvent>(new BoundedChannelOptions(maxIn)
{
SingleReader = false,
Expand All @@ -105,8 +115,6 @@ protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallb
// DropWrite will make `TryWrite` always return true, which is not what we want.
FullMode = BoundedChannelFullMode.Wait
});
// The minimum out buffer the max of (1 or OutboundBufferMaxSize) as long as it does not exceed InboundBufferMaxSize
var maxOut = Math.Min(BufferOptions.InboundBufferMaxSize, Math.Max(1, BufferOptions.OutboundBufferMaxSize));
OutChannel = Channel.CreateBounded<IOutboundBuffer<TEvent>>(
new BoundedChannelOptions(maxOut)
{
Expand All @@ -123,16 +131,16 @@ protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallb
InboundBuffer = new InboundBuffer<TEvent>(maxOut, BufferOptions.OutboundBufferMaxLifetime);

_outTask = Task.Factory.StartNew(async () =>
await ConsumeOutboundEventsAsync().ConfigureAwait(false),
CancellationToken.None,
TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness,
TaskScheduler.Default);
await ConsumeOutboundEventsAsync().ConfigureAwait(false),
CancellationToken.None,
TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness,
TaskScheduler.Default);

_inTask = Task.Factory.StartNew(async () =>
await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime).ConfigureAwait(false),
CancellationToken.None,
TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness,
TaskScheduler.Default);
await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime).ConfigureAwait(false),
CancellationToken.None,
TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness,
TaskScheduler.Default);
}

/// <summary>
Expand All @@ -144,7 +152,9 @@ await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime)
/// <summary>The channel options currently in use</summary>
public TChannelOptions Options { get; }

private CancellationTokenSource TokenSource { get; }
/// <summary> An overall cancellation token that may be externally provided </summary>
protected CancellationTokenSource TokenSource { get; }

private Channel<IOutboundBuffer<TEvent>> OutChannel { get; }
private Channel<TEvent> InChannel { get; }
private BufferOptions BufferOptions => Options.BufferOptions;
Expand Down Expand Up @@ -173,6 +183,7 @@ public override bool TryWrite(TEvent item)
/// <inheritdoc cref="IBufferedChannel{TEvent}.WaitToWriteManyAsync"/>
public async Task<bool> WaitToWriteManyAsync(IEnumerable<TEvent> events, CancellationToken ctx = default)
{
ctx = ctx == default ? TokenSource.Token : ctx;
var allWritten = true;
foreach (var e in events)
{
Expand All @@ -187,7 +198,7 @@ public bool TryWriteMany(IEnumerable<TEvent> events)
{
var written = true;

foreach (var @event in events)
foreach (var @event in events)
{
written = TryWrite(@event);
}
Expand Down Expand Up @@ -222,31 +233,27 @@ private async Task ConsumeOutboundEventsAsync()
{
_callbacks.OutboundChannelStartedCallback?.Invoke();

var maxConsumers = Options.BufferOptions.ExportMaxConcurrency;
var taskList = new List<Task>(maxConsumers);
var taskList = new List<Task>(_maxConcurrency);

while (await OutChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
// ReSharper disable once RemoveRedundantBraces
// ReSharper disable once RemoveRedundantBraces
{
if (TokenSource.Token.IsCancellationRequested) break;
if (_signal is { IsSet: true }) break;

while (OutChannel.Reader.TryRead(out var buffer))
{
using (buffer)
var items = buffer.GetArraySegment();
await _throttleTasks.WaitAsync(TokenSource.Token).ConfigureAwait(false);
var t = ExportBufferAsync(items, buffer);
taskList.Add(t);

if (taskList.Count >= _maxConcurrency)
{
var items = buffer.GetArraySegment();
await _throttleTasks.WaitAsync().ConfigureAwait(false);
var t = ExportBufferAsync(items, buffer);
taskList.Add(t);

if (taskList.Count >= maxConsumers)
{
var completedTask = await Task.WhenAny(taskList).ConfigureAwait(false);
taskList.Remove(completedTask);
}
_throttleTasks.Release();
var completedTask = await Task.WhenAny(taskList).ConfigureAwait(false);
taskList.Remove(completedTask);
}
_throttleTasks.Release();
}
}
await Task.WhenAll(taskList).ConfigureAwait(false);
Expand All @@ -255,6 +262,7 @@ private async Task ConsumeOutboundEventsAsync()

private async Task ExportBufferAsync(ArraySegment<TEvent> items, IOutboundBuffer<TEvent> buffer)
{
using var outboundBuffer = buffer;
var maxRetries = Options.BufferOptions.ExportMaxRetries;
for (var i = 0; i <= maxRetries && items.Count > 0; i++)
{
Expand All @@ -269,7 +277,7 @@ private async Task ExportBufferAsync(ArraySegment<TEvent> items, IOutboundBuffer
try
{
response = await ExportAsync(items, TokenSource.Token).ConfigureAwait(false);
_callbacks.ExportResponseCallback?.Invoke(response, buffer);
_callbacks.ExportResponseCallback?.Invoke(response, outboundBuffer);
}
catch (Exception e)
{
Expand All @@ -280,7 +288,7 @@ private async Task ExportBufferAsync(ArraySegment<TEvent> items, IOutboundBuffer

items = response == null
? EmptyArraySegments<TEvent>.Empty
: RetryBuffer(response, items, buffer);
: RetryBuffer(response, items, outboundBuffer);
if (items.Count > 0 && i == 0)
_callbacks.ExportRetryableCountCallback?.Invoke(items.Count);

Expand Down
3 changes: 3 additions & 0 deletions src/Elastic.Channels/ChannelOptionsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public abstract class ChannelOptionsBase<TEvent, TResponse> : IChannelCallbacks<
/// </summary>
public bool DisableDiagnostics { get; set; }

/// <summary> Provide an external cancellation token </summary>
public CancellationToken? CancellationToken { get; set; }

/// <summary>
/// Optionally provides a custom write implementation to a channel. Concrete channel implementations are not required to adhere to this config
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public virtual async Task<bool> BootstrapElasticsearchAsync(BootstrapMethod boot
{
if (bootstrapMethod == BootstrapMethod.None) return true;

ctx = ctx == default ? TokenSource.Token : ctx;

var name = TemplateName;
var match = TemplateWildcard;
if (await IndexTemplateExistsAsync(name, ctx).ConfigureAwait(false)) return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ protected override bool RejectEvent((TEvent, BulkResponseItem) @event) =>
/// <inheritdoc cref="TransportChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.ExportAsync(Elastic.Transport.ITransport,System.ArraySegment{TEvent},System.Threading.CancellationToken)"/>
protected override Task<BulkResponse> ExportAsync(ITransport transport, ArraySegment<TEvent> page, CancellationToken ctx = default)
{
ctx = ctx == default ? TokenSource.Token : ctx;
#if NETSTANDARD2_1
// Option is obsolete to prevent external users to set it.
#pragma warning disable CS0618
Expand Down
Loading

0 comments on commit 2cdc188

Please sign in to comment.