Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous InfluxDB V2 output #15621

Open
LarsStegman opened this issue Jul 15, 2024 · 7 comments · May be fixed by #15869
Open

Asynchronous InfluxDB V2 output #15621

LarsStegman opened this issue Jul 15, 2024 · 7 comments · May be fixed by #15869
Labels
feature request Requests for new plugin and for new features to existing plugins

Comments

@LarsStegman
Copy link
Contributor

Use Case

We want to use Telegraf to parse a large binary package that comes in at 50Hz. The package contains about 34k data points. The problem in our case is that in the amount of time it takes to flush all the data from 1 package, another 150 packages have come in already. The machine that hosts Telegraf also runs the Influx instance, so there is little to no network delay. The machine has 32 cores, but only about 2 are being used at 100%. The others are twindling their thumbs.

What would be the best approach for fixing this. We see two ways forward for ourselves.

  1. Create a custom logging program and do not use Telegraf.
  2. Create a new Influx output plugin/extend the existing Influx output plugin to allow multithreaded writing. The only issue we see with this, is that the Write function will be called synchronously. We would need to immediately return while the results are still being written to the database async. This means, we cannot return an error anymore should the writing fail. In our case, this is not too big of an issue, but in general this is bad.

We understand that Telegraf is completely not designed for this use case, but we would like to discuss this with you anyway.

Expected behavior

All data is written to the database.

Actual behavior

Only 0.6% of all data is written to the database.

Additional info

No response

@LarsStegman LarsStegman added the feature request Requests for new plugin and for new features to existing plugins label Jul 15, 2024
@powersj
Copy link
Contributor

powersj commented Jul 15, 2024

Hi Lars,

After chatting with the team today, we wanted to better understand the situation and the behavior you are seeing.

The problem in our case is that in the amount of time it takes to flush all the data from 1 package, another 150 packages have come in already

Is the parsing and routing of the metrics internally the bottle neck or the actually HTTP request/response of the output the bottleneck?

Is this a situation where you could spread the messages across multiple outputs and route them using tags to send more requests at once?

Is increasing the batch size and reducing flush interval helpful?

In our case, this is not too big of an issue, but in general this is bad.

Agreed, I would not go down this path and would push you to the client libraries and their async clients.

@powersj powersj added the waiting for response waiting for response from contributor label Jul 15, 2024
@LarsStegman
Copy link
Contributor Author

LarsStegman commented Jul 16, 2024

Let me give a bit more context on how we want to use Telegraf in this case. We have the Topside Lift System (TLS) on our vessel the Pioneering Spirit that we use to install/remove offshore installations. We are currently using a custom written logger that writes to an SQL database, but we in the process of moving all our datalogging to InfluxDB.

The TLS control system sends out UDP packets to the logger that contain metrics with the system status/sensor measurements at 50Hz . We were trying to parse these UDP packets using Telegraf, to avoid creating a new custom logger.

Is the parsing and routing of the metrics internally the bottle neck or the actually HTTP request/response of the output the bottleneck?

It appears that the HTTP request/response is really the bottleneck in this case.

Is this a situation where you could spread the messages across multiple outputs and route them using tags to send more requests at once?

That might work, we will try that.

Is increasing the batch size and reducing flush interval helpful?

We tried this, but it didn't give us the performance we need.

Agreed, I would not go down this path and would push you to the client libraries and their async clients.

We are leaning more towards creating a custom logger using the client library.

@telegraf-tiger telegraf-tiger bot removed the waiting for response waiting for response from contributor label Jul 16, 2024
@powersj
Copy link
Contributor

powersj commented Jul 16, 2024

It appears that the HTTP request/response is really the bottleneck in this case.

What is the error condition you get due to this? Are you seeing that you overflow your buffer because this takes so long?

@powersj powersj added the waiting for response waiting for response from contributor label Jul 19, 2024
@LarsStegman
Copy link
Contributor Author

Yeah, the buffer is overflowing because the data cannot be pumped out quickly enough. We suspect this is because the output plugin write synchronously and single threaded. With our custom logger we run the output async and multithreaded and then we're able to pump data into Influx quickly enough to keep up.

@telegraf-tiger telegraf-tiger bot removed the waiting for response waiting for response from contributor label Jul 24, 2024
@srebhan
Copy link
Member

srebhan commented Jul 24, 2024

Hey @LarsStegman! Thanks for sharing the details and for using Telegraf in this super cool project!

Brainstorming a bit more in the team, how about the following, you can achieve the parallelization by using multiple InfluxDB outputs pointing to the same server (assuming the DB can keep up with the amount of data). To ease the splitting to multiple outputs, we can implement a new batching processor (please feel free to suggest a better name!) which gets a batch_size and a max_splitting setting. The processor will then create a new (user-specified) tag which gets a "batch-ID" in the range [0 .. max_splitting) using

    func (p *Batching) Apply(in ...telegraf.Metric) []telegraf.Metric {
        out := make([]telegraf.Metric, 0, len(in))
        for _, m := range in {
            batchID := strconv.FormatUint64((p.count % p.BatchSize) % max_splitting, 10)
            p.count++
            m.AddTag(p.BatchTag, batchID) 
            out = append(out, m)
        }
        return out
    }

So if you e.g. want 16 parallel output workers you setup the config like

[[processors.batching]]
  batch_tag = "_batch"
  batch_size = <your agent setting of batch size>
  max_splitting = 16

[[outputs.influxdb_v2]]
  ...
  [outputs.influxdb_v2.tagpass]
    _batch = ["0"]

...

[[outputs.influxdb_v2]]
  ...
  [outputs.influxdb_v2.tagpass]
    _batch = ["15"]

This has the advantage that you can apply this for all outputs and even processors...

What do you think?

@LarsStegman
Copy link
Contributor Author

Hey @LarsStegman! Thanks for sharing the details and for using Telegraf in this super cool project!

Hi @srebhan thanks for thinking more about this! We really like using Telegraf as it makes connecting to our IoT sensor much easy, scalable and reliable than all the custom connectors we had in the past. We have several custom plugins (that we can't open source, unfortunately), some that we have already open sourced and we make extensive use of the existing plugins.

For this use case, we decided to go with a custom logger because it is a bit more flexible and we had a deadline we needed to make.

Brainstorming a bit more in the team, how about the following, you can achieve the parallelization by using multiple InfluxDB outputs pointing to the same server (assuming the DB can keep up with the amount of data).

The DB is able to keep up with the data easily actually, but that is probably because we are running on dedicated hardware.

To ease the splitting to multiple outputs, we can implement a new batching processor (please feel free to suggest a better name!) which gets a batch_size and a max_splitting setting. The processor will then create a new (user-specified) tag which gets a "batch-ID" in the range [0 .. max_splitting) using

    func (p *Batching) Apply(in ...telegraf.Metric) []telegraf.Metric {
        out := make([]telegraf.Metric, 0, len(in))
        for _, m := range in {
            batchID := strconv.FormatUint64((p.count % p.BatchSize) % max_splitting, 10)
            p.count++
            m.AddTag(p.BatchTag, batchID) 
            out = append(out, m)
        }
        return out
    }

So if you e.g. want 16 parallel output workers you setup the config like

[[processors.batching]]
  batch_tag = "_batch"
  batch_size = <your agent setting of batch size>
  max_splitting = 16

[[outputs.influxdb_v2]]
  ...
  [outputs.influxdb_v2.tagpass]
    _batch = ["0"]

...

[[outputs.influxdb_v2]]
  ...
  [outputs.influxdb_v2.tagpass]
    _batch = ["15"]

This has the advantage that you can apply this for all outputs and even processors...

What do you think?

I think this is a really interesting approach to this problem and I think it might actually work in this use case as well. I like that it solves the problem in a much more generic way. It is a lot of manual configuration, though I think that is manageable since not a lot of people will run into this problem. Also, in our case the amount of data is very predictable, so there is no need to change the configuration often after the initial configuration.

@LarsStegman LarsStegman linked a pull request Sep 11, 2024 that will close this issue
1 task
@LarsStegman
Copy link
Contributor Author

@srebhan this solution works! We are working on the system again and I wanted to try this approach before adding new features to the custom logger we made that just duplicate logic already in Telegraf. Telegraf is now able to keep up quite easily! I made a PR with your suggested processor: #15869

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature request Requests for new plugin and for new features to existing plugins
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants