Skip to content

Commit

Permalink
[🍒 7325] Fix tracer freeze when CI Visibility is enabled (#7335)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikita-tkachenko-datadog committed Jul 17, 2024
1 parent 1914f03 commit 75639e3
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public static class DDAgentWriterBuilder {
Monitoring monitoring = Monitoring.DISABLED;
boolean traceAgentV05Enabled = Config.get().isTraceAgentV05Enabled();
boolean metricsReportingEnabled = Config.get().isTracerMetricsEnabled();
private int flushTimeout = 1;
private TimeUnit flushTimeoutUnit = TimeUnit.SECONDS;
boolean alwaysFlush = false;

private DDAgentApi agentApi;
Expand Down Expand Up @@ -116,6 +118,12 @@ public DDAgentWriterBuilder featureDiscovery(DDAgentFeaturesDiscovery featureDis
return this;
}

/**
 * Configures the flush timeout the built writer will use.
 *
 * @param flushTimeout the timeout value
 * @param flushTimeoutUnit the unit of {@code flushTimeout}
 * @return this builder, for call chaining
 */
public DDAgentWriterBuilder flushTimeout(int flushTimeout, TimeUnit flushTimeoutUnit) {
  this.flushTimeoutUnit = flushTimeoutUnit;
  this.flushTimeout = flushTimeout;
  return this;
}

public DDAgentWriterBuilder alwaysFlush(boolean alwaysFlush) {
this.alwaysFlush = alwaysFlush;
return this;
Expand Down Expand Up @@ -157,15 +165,23 @@ public DDAgentWriter build() {
singleSpanSampler,
null);

return new DDAgentWriter(traceProcessingWorker, dispatcher, healthMetrics, alwaysFlush);
return new DDAgentWriter(
traceProcessingWorker,
dispatcher,
healthMetrics,
flushTimeout,
flushTimeoutUnit,
alwaysFlush);
}
}

/**
 * Creates a writer wired to the given processing worker and payload dispatcher.
 *
 * @param worker worker that dequeues and serializes finished traces
 * @param dispatcher dispatcher that the serialized payloads are handed to
 * @param healthMetrics sink for writer health counters
 * @param flushTimeout how long a flush may wait before giving up
 * @param flushTimeoutUnit unit of {@code flushTimeout}
 * @param alwaysFlush whether every write triggers an immediate flush
 */
DDAgentWriter(
    TraceProcessingWorker worker,
    PayloadDispatcher dispatcher,
    HealthMetrics healthMetrics,
    int flushTimeout,
    TimeUnit flushTimeoutUnit,
    boolean alwaysFlush) {
  // Removed the stale duplicate super(...) call (a diff-render artifact): a constructor may
  // contain exactly one superclass constructor invocation, and it must forward the new
  // flushTimeout/flushTimeoutUnit parameters.
  super(worker, dispatcher, healthMetrics, flushTimeout, flushTimeoutUnit, alwaysFlush);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,16 @@ public TraceProcessingWorker(
spanSamplingWorker.getSpanSamplingQueue(),
droppingPolicy);

boolean runAsDaemon = !Config.get().isCiVisibilityEnabled();
this.serializingHandler =
runAsDaemon
? new DaemonTraceSerializingHandler(
primaryQueue,
secondaryQueue,
healthMetrics,
dispatcher,
flushInterval,
timeUnit,
spanPostProcessor)
: new NonDaemonTraceSerializingHandler(
primaryQueue,
secondaryQueue,
healthMetrics,
dispatcher,
flushInterval,
timeUnit,
spanPostProcessor);
this.serializerThread = newAgentThread(TRACE_PROCESSOR, serializingHandler, runAsDaemon);
new TraceSerializingHandler(
primaryQueue,
secondaryQueue,
healthMetrics,
dispatcher,
flushInterval,
timeUnit,
spanPostProcessor);
this.serializerThread = newAgentThread(TRACE_PROCESSOR, serializingHandler);
}

public void start() {
Expand Down Expand Up @@ -144,91 +134,7 @@ private static MpscBlockingConsumerArrayQueue<Object> createQueue(int capacity)
return new MpscBlockingConsumerArrayQueue<>(capacity);
}

/** Serializing handler intended to run on a daemon thread: loops until interrupted. */
private static class DaemonTraceSerializingHandler extends TraceSerializingHandler {
  public DaemonTraceSerializingHandler(
      MpscBlockingConsumerArrayQueue<Object> primaryQueue,
      MpscBlockingConsumerArrayQueue<Object> secondaryQueue,
      HealthMetrics healthMetrics,
      PayloadDispatcher payloadDispatcher,
      long flushInterval,
      TimeUnit timeUnit,
      SpanPostProcessor spanPostProcessor) {
    super(
        primaryQueue,
        secondaryQueue,
        healthMetrics,
        payloadDispatcher,
        flushInterval,
        timeUnit,
        spanPostProcessor);
  }

  @Override
  public void run() {
    try {
      // Duty cycle: drain both queues and flush when due, until this thread is interrupted.
      Thread worker = Thread.currentThread();
      while (!worker.isInterrupted()) {
        consumeFromPrimaryQueue();
        consumeFromSecondaryQueue();
        flushIfNecessary();
      }
    } catch (InterruptedException interruption) {
      // Restore the interrupt flag so the thread's interrupted status stays observable.
      Thread.currentThread().interrupt();
    }
    log.debug("Datadog trace processor exited. Publishing traces stopped");
  }
}

/**
 * Serializing handler intended to run on a non-daemon thread. An interrupt is treated as a
 * shutdown signal rather than an immediate stop: after the first interrupt the handler keeps
 * draining the queues until they are empty or {@link #SHUTDOWN_TIMEOUT_MILLIS} has elapsed,
 * so traces enqueued around JVM shutdown still get a chance to be published.
 */
private static class NonDaemonTraceSerializingHandler extends TraceSerializingHandler {
  // Fix: a millisecond timeout is an integral quantity; long avoids the accidental
  // floating-point promotion the previous double constant caused in shouldShutdown().
  private static final long SHUTDOWN_TIMEOUT_MILLIS = 5_000;

  // Wall-clock time of the first observed interrupt; null until a shutdown is requested.
  private Long shutdownSignalTimestamp;

  public NonDaemonTraceSerializingHandler(
      MpscBlockingConsumerArrayQueue<Object> primaryQueue,
      MpscBlockingConsumerArrayQueue<Object> secondaryQueue,
      HealthMetrics healthMetrics,
      PayloadDispatcher payloadDispatcher,
      long flushInterval,
      TimeUnit timeUnit,
      SpanPostProcessor spanPostProcessor) {
    super(
        primaryQueue,
        secondaryQueue,
        healthMetrics,
        payloadDispatcher,
        flushInterval,
        timeUnit,
        spanPostProcessor);
  }

  @Override
  public void run() {
    while (!shouldShutdown()) {
      try {
        consumeFromPrimaryQueue();
        consumeFromSecondaryQueue();
        flushIfNecessary();
      } catch (InterruptedException e) {
        // Deliberately NOT re-interrupting: the interrupt only starts the drain-out period,
        // and the blocking queue polls above must keep working during it.
        if (shutdownSignalTimestamp == null) {
          shutdownSignalTimestamp = System.currentTimeMillis();
        }
      }
    }
    // Parameterized SLF4J call instead of string concatenation.
    log.debug("Datadog trace processor exited. Unpublished traces left: {}", !queuesAreEmpty());
  }

  /** True once a shutdown was signalled and either the grace period expired or nothing is left. */
  private boolean shouldShutdown() {
    return shutdownSignalTimestamp != null
        && (shutdownSignalTimestamp + SHUTDOWN_TIMEOUT_MILLIS <= System.currentTimeMillis()
            || queuesAreEmpty());
  }
}

public abstract static class TraceSerializingHandler implements Runnable {
public static class TraceSerializingHandler implements Runnable {

private final MpscBlockingConsumerArrayQueue<Object> primaryQueue;
private final MpscBlockingConsumerArrayQueue<Object> secondaryQueue;
Expand Down Expand Up @@ -261,6 +167,27 @@ public TraceSerializingHandler(
this.spanPostProcessor = spanPostProcessor;
}

// Entry point for the serializer thread: runs the duty cycle until interrupted, then logs
// whether any traces were left unpublished in the queues.
@Override
public void run() {
try {
runDutyCycle();
} catch (InterruptedException e) {
// Re-assert the interrupt flag so the thread's interrupted status remains observable.
Thread.currentThread().interrupt();
}
log.debug(
"Datadog trace processor exited. Publishing traces stopped. Unpublished traces left: "
+ !queuesAreEmpty());
}

// Core processing loop: drain the primary then the secondary queue and flush when the
// flush interval is due, repeating until the current thread is interrupted.
private void runDutyCycle() throws InterruptedException {
  final Thread self = Thread.currentThread();
  for (; !self.isInterrupted(); ) {
    consumeFromPrimaryQueue();
    consumeFromSecondaryQueue();
    flushIfNecessary();
  }
}

@SuppressWarnings("unchecked")
public void onEvent(Object event) {
// publish an incomplete batch if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import datadog.trace.common.writer.ddintake.DDIntakeTrackTypeResolver;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.util.Strings;
import java.util.concurrent.TimeUnit;
import okhttp3.HttpUrl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -105,6 +106,10 @@ public static Writer createWriter(
.singleSpanSampler(singleSpanSampler)
.flushIntervalMilliseconds(flushIntervalMilliseconds);

if (config.isCiVisibilityEnabled()) {
builder.flushTimeout(5, TimeUnit.SECONDS);
}

if (config.isCiVisibilityCodeCoverageEnabled()) {
final RemoteApi coverageApi =
createDDIntakeRemoteApi(config, commObjects, featuresDiscovery, TrackType.CITESTCOV);
Expand Down Expand Up @@ -140,7 +145,7 @@ public static Writer createWriter(
ddAgentApi.addResponseListener((RemoteResponseListener) sampler);
}

remoteWriter =
DDAgentWriter.DDAgentWriterBuilder builder =
DDAgentWriter.builder()
.agentApi(ddAgentApi)
.featureDiscovery(featuresDiscovery)
Expand All @@ -149,8 +154,13 @@ public static Writer createWriter(
.monitoring(commObjects.monitoring)
.alwaysFlush(alwaysFlush)
.spanSamplingRules(singleSpanSampler)
.flushIntervalMilliseconds(flushIntervalMilliseconds)
.build();
.flushIntervalMilliseconds(flushIntervalMilliseconds);

if (config.isCiVisibilityEnabled()) {
builder.flushTimeout(5, TimeUnit.SECONDS);
}

remoteWriter = builder.build();
}

return remoteWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class DDAgentWriterTest extends DDCoreSpecification {
def dispatcher = new PayloadDispatcherImpl(new DDAgentMapperDiscovery(discovery), api, monitor, monitoring)

@Subject
def writer = new DDAgentWriter(worker, dispatcher, monitor, false)
def writer = new DDAgentWriter(worker, dispatcher, monitor, 1, TimeUnit.SECONDS, false)

// Only used to create spans
def dummyTracer = tracerBuilder().writer(new ListWriter()).build()
Expand Down Expand Up @@ -176,7 +176,7 @@ class DDAgentWriterTest extends DDCoreSpecification {
def worker = Mock(TraceProcessingWorker)
def monitor = Stub(HealthMetrics)
def dispatcher = Mock(PayloadDispatcherImpl)
def writer = new DDAgentWriter(worker, dispatcher, monitor, false)
def writer = new DDAgentWriter(worker, dispatcher, monitor, 1, TimeUnit.SECONDS, false)
def p0 = newSpan()
p0.setSamplingPriority(PrioritySampling.SAMPLER_DROP)
def trace = [p0, newSpan()]
Expand Down

0 comments on commit 75639e3

Please sign in to comment.