Skip to content

Commit

Permalink
fix(inputs.modbus): Fix optimization of overlapping requests and add …
Browse files Browse the repository at this point in the history
…warning (influxdata#13486)
  • Loading branch information
srebhan committed Jun 23, 2023
1 parent c459d7b commit 56aac4f
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 4 deletions.
4 changes: 4 additions & 0 deletions plugins/inputs/modbus/configuration_register.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package modbus
import (
_ "embed"
"fmt"

"github.com/influxdata/telegraf"
)

//go:embed sample_register.conf
Expand All @@ -24,6 +26,7 @@ type ConfigurationOriginal struct {
HoldingRegisters []fieldDefinition `toml:"holding_registers"`
InputRegisters []fieldDefinition `toml:"input_registers"`
workarounds ModbusWorkarounds
logger telegraf.Logger
}

func (c *ConfigurationOriginal) SampleConfigPart() string {
Expand Down Expand Up @@ -99,6 +102,7 @@ func (c *ConfigurationOriginal) initRequests(fieldDefs []fieldDefinition, maxQua
MaxBatchSize: maxQuantity,
Optimization: "none",
EnforceFromZero: c.workarounds.ReadCoilsStartingAtZero,
Log: c.logger,
}

return groupFieldsToRequests(fields, params), nil
Expand Down
4 changes: 4 additions & 0 deletions plugins/inputs/modbus/configuration_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"errors"
"fmt"
"hash/maphash"

"github.com/influxdata/telegraf"
)

//go:embed sample_request.conf
Expand Down Expand Up @@ -34,6 +36,7 @@ type requestDefinition struct {
type ConfigurationPerRequest struct {
Requests []requestDefinition `toml:"request"`
workarounds ModbusWorkarounds
logger telegraf.Logger
}

func (c *ConfigurationPerRequest) SampleConfigPart() string {
Expand Down Expand Up @@ -191,6 +194,7 @@ func (c *ConfigurationPerRequest) Process() (map[byte]requestSet, error) {
MaxExtraRegisters: def.MaxExtraRegisters,
Optimization: def.Optimization,
Tags: def.Tags,
Log: c.logger,
}
switch def.RegisterType {
case "coil":
Expand Down
2 changes: 2 additions & 0 deletions plugins/inputs/modbus/modbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,11 @@ func (m *Modbus) Init() error {
switch m.ConfigurationType {
case "", "register":
m.ConfigurationOriginal.workarounds = m.Workarounds
m.ConfigurationOriginal.logger = m.Log
cfg = &m.ConfigurationOriginal
case "request":
m.ConfigurationPerRequest.workarounds = m.Workarounds
m.ConfigurationPerRequest.logger = m.Log
cfg = &m.ConfigurationPerRequest
default:
return fmt.Errorf("unknown configuration type %q", m.ConfigurationType)
Expand Down
59 changes: 59 additions & 0 deletions plugins/inputs/modbus/modbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -4937,3 +4938,61 @@ func TestRequestsWorkaroundsReadCoilsStartingAtZeroRegister(t *testing.T) {
require.Equal(t, maxQuantityCoils, plugin.requests[1].coil[1].address)
require.Equal(t, uint16(1), plugin.requests[1].coil[1].length)
}

func TestRequestsOverlap(t *testing.T) {
logger := &testutil.CaptureLogger{}
plugin := Modbus{
Name: "Test",
Controller: "tcp://localhost:1502",
ConfigurationType: "request",
Log: logger,
Workarounds: ModbusWorkarounds{ReadCoilsStartingAtZero: true},
}
plugin.Requests = []requestDefinition{
{
SlaveID: 1,
RegisterType: "holding",
Optimization: "max_insert",
MaxExtraRegisters: 16,
Fields: []requestFieldDefinition{
{
Name: "field-1",
InputType: "UINT32",
Address: uint16(1),
},
{
Name: "field-2",
InputType: "UINT64",
Address: uint16(3),
},
{
Name: "field-3",
InputType: "UINT32",
Address: uint16(5),
},
{
Name: "field-4",
InputType: "UINT32",
Address: uint16(7),
},
},
},
}
require.NoError(t, plugin.Init())

require.Eventually(t, func() bool {
return len(logger.Warnings()) > 0
}, 3*time.Second, 100*time.Millisecond)

var found bool
for _, w := range logger.Warnings() {
if strings.Contains(w, "Request at 3 with length 4 overlaps with next request at 5") {
found = true
break
}
}
require.True(t, found, "Overlap warning not found!")

require.Len(t, plugin.requests, 1)
require.Len(t, plugin.requests[1].holding, 1)
}
19 changes: 15 additions & 4 deletions plugins/inputs/modbus/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package modbus

import (
"sort"

"github.com/influxdata/telegraf"
)

type request struct {
Expand Down Expand Up @@ -125,7 +127,7 @@ func optimizeGroup(g request, maxBatchSize uint16) []request {
return requests
}

func optimitzeGroupWithinLimits(g request, maxBatchSize uint16, maxExtraRegisters uint16) []request {
func optimitzeGroupWithinLimits(g request, params groupingParams) []request {
if len(g.fields) == 0 {
return nil
}
Expand All @@ -139,8 +141,15 @@ func optimitzeGroupWithinLimits(g request, maxBatchSize uint16, maxExtraRegister
for i := 1; i <= len(g.fields)-1; i++ {
// Check if we need to interrupt the current chunk and require a new one
holeSize := g.fields[i].address - (g.fields[i-1].address + g.fields[i-1].length)
needInterrupt := holeSize > maxExtraRegisters // too far apart
needInterrupt = needInterrupt || currentRequest.length+holeSize+g.fields[i].length > maxBatchSize // too large
if g.fields[i].address < g.fields[i-1].address+g.fields[i-1].length {
params.Log.Warnf(
"Request at %d with length %d overlaps with next request at %d",
g.fields[i-1].address, g.fields[i-1].length, g.fields[i].address,
)
holeSize = 0
}
needInterrupt := holeSize > params.MaxExtraRegisters // too far apart
needInterrupt = needInterrupt || currentRequest.length+holeSize+g.fields[i].length > params.MaxBatchSize // too large
if !needInterrupt {
// Still safe to add the field to the current request
currentRequest.length = g.fields[i].address + g.fields[i].length - currentRequest.address
Expand Down Expand Up @@ -171,6 +180,8 @@ type groupingParams struct {
EnforceFromZero bool
// Tags to add for the requests
Tags map[string]string
// Log facility to inform the user
Log telegraf.Logger
}

func groupFieldsToRequests(fields []field, params groupingParams) []request {
Expand Down Expand Up @@ -264,7 +275,7 @@ func groupFieldsToRequests(fields []field, params groupingParams) []request {
total.fields = append(total.fields, g.fields...)
}
}
requests = optimitzeGroupWithinLimits(total, params.MaxBatchSize, params.MaxExtraRegisters)
requests = optimitzeGroupWithinLimits(total, params)
default:
// no optimization
for _, g := range groups {
Expand Down

0 comments on commit 56aac4f

Please sign in to comment.