Skip to content

Commit

Permalink
fix!: gateway: limit stateful calls to websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Aug 5, 2024
1 parent b1c75b4 commit d9ce493
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 24 deletions.
64 changes: 44 additions & 20 deletions gateway/proxy_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (gw *Node) checkEthBlockParam(ctx context.Context, blkParam ethtypes.EthBlo
var num ethtypes.EthUint64
if blkParam.PredefinedBlock != nil {
if *blkParam.PredefinedBlock == "earliest" {
return fmt.Errorf("block param \"earliest\" is not supported")
return xerrors.New("block param \"earliest\" is not supported")
} else if *blkParam.PredefinedBlock == "pending" || *blkParam.PredefinedBlock == "latest" {
// Head is always ok.
if lookback == 0 {
Expand All @@ -127,13 +127,13 @@ func (gw *Node) checkEthBlockParam(ctx context.Context, blkParam ethtypes.EthBlo
return gw.checkBlkHash(ctx, *blkParam.BlockHash)
}

return fmt.Errorf("invalid block param")
return xerrors.New("invalid block param")
}

func (gw *Node) checkBlkParam(ctx context.Context, blkParam string, lookback ethtypes.EthUint64) error {
if blkParam == "earliest" {
// also not supported in node impl
return fmt.Errorf("block param \"earliest\" is not supported")
return xerrors.New("block param \"earliest\" is not supported")
}

head, err := gw.target.ChainHead(ctx)
Expand Down Expand Up @@ -363,7 +363,7 @@ func (gw *Node) EthFeeHistory(ctx context.Context, p jsonrpc.RawParams) (ethtype
}

if params.BlkCount > ethtypes.EthUint64(EthFeeHistoryMaxBlockCount) {
return ethtypes.EthFeeHistory{}, fmt.Errorf("block count too high")
return ethtypes.EthFeeHistory{}, xerrors.New("block count too high")
}

return gw.target.EthFeeHistory(ctx, p)
Expand Down Expand Up @@ -437,14 +437,15 @@ func (gw *Node) EthGetLogs(ctx context.Context, filter *ethtypes.EthFilterSpec)
return gw.target.EthGetLogs(ctx, filter)
}

/* FILTERS: Those are stateful.. figure out how to properly either bind them to users, or time out? */

func (gw *Node) EthGetFilterChanges(ctx context.Context, id ethtypes.EthFilterID) (*ethtypes.EthFilterResult, error) {
if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
return nil, err
}

ft := statefulCallFromContext(ctx)
ft, err := getStatefulTracker(ctx)
if err != nil {
return nil, xerrors.Errorf("EthGetFilterChanges not supported: %w", err)
}
ft.lk.Lock()
_, ok := ft.userFilters[id]
ft.lk.Unlock()
Expand All @@ -461,7 +462,10 @@ func (gw *Node) EthGetFilterLogs(ctx context.Context, id ethtypes.EthFilterID) (
return nil, err
}

ft := statefulCallFromContext(ctx)
ft, err := getStatefulTracker(ctx)
if err != nil {
return nil, xerrors.Errorf("EthGetFilterLogs not supported: %w", err)
}
ft.lk.Lock()
_, ok := ft.userFilters[id]
ft.lk.Unlock()
Expand All @@ -478,7 +482,7 @@ func (gw *Node) EthNewFilter(ctx context.Context, filter *ethtypes.EthFilterSpec
return ethtypes.EthFilterID{}, err
}

return addUserFilterLimited(ctx, func() (ethtypes.EthFilterID, error) {
return addUserFilterLimited(ctx, "EthNewFilter", func() (ethtypes.EthFilterID, error) {
return gw.target.EthNewFilter(ctx, filter)
})
}
Expand All @@ -488,7 +492,7 @@ func (gw *Node) EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, er
return ethtypes.EthFilterID{}, err
}

return addUserFilterLimited(ctx, func() (ethtypes.EthFilterID, error) {
return addUserFilterLimited(ctx, "EthNewBlockFilter", func() (ethtypes.EthFilterID, error) {
return gw.target.EthNewBlockFilter(ctx)
})
}
Expand All @@ -498,7 +502,7 @@ func (gw *Node) EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.Et
return ethtypes.EthFilterID{}, err
}

return addUserFilterLimited(ctx, func() (ethtypes.EthFilterID, error) {
return addUserFilterLimited(ctx, "EthNewPendingTransactionFilter", func() (ethtypes.EthFilterID, error) {
return gw.target.EthNewPendingTransactionFilter(ctx)
})
}
Expand All @@ -509,7 +513,10 @@ func (gw *Node) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID)
}

// check if the filter belongs to this connection
ft := statefulCallFromContext(ctx)
ft, err := getStatefulTracker(ctx)
if err != nil {
return false, xerrors.Errorf("EthUninstallFilter not supported: %w", err)
}
ft.lk.Lock()
defer ft.lk.Unlock()

Expand Down Expand Up @@ -546,12 +553,15 @@ func (gw *Node) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethtypes
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks")
}

ft := statefulCallFromContext(ctx)
ft, err := getStatefulTracker(ctx)
if err != nil {
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("EthSubscribe not supported: %w", err)
}
ft.lk.Lock()
defer ft.lk.Unlock()

if len(ft.userSubscriptions) >= EthMaxFiltersPerConn {
return ethtypes.EthSubscriptionID{}, fmt.Errorf("too many subscriptions")
return ethtypes.EthSubscriptionID{}, xerrors.New("too many subscriptions")
}

sub, err := gw.target.EthSubscribe(ctx, p)
Expand Down Expand Up @@ -582,7 +592,10 @@ func (gw *Node) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionI
}

// check if the filter belongs to this connection
ft := statefulCallFromContext(ctx)
ft, err := getStatefulTracker(ctx)
if err != nil {
return false, xerrors.Errorf("EthUnsubscribe not supported: %w", err)
}
ft.lk.Lock()
defer ft.lk.Unlock()

Expand Down Expand Up @@ -666,13 +679,16 @@ func (gw *Node) EthTraceFilter(ctx context.Context, filter ethtypes.EthTraceFilt

var EthMaxFiltersPerConn = 16 // todo make this configurable

func addUserFilterLimited(ctx context.Context, cb func() (ethtypes.EthFilterID, error)) (ethtypes.EthFilterID, error) {
ft := statefulCallFromContext(ctx)
func addUserFilterLimited(ctx context.Context, callName string, cb func() (ethtypes.EthFilterID, error)) (ethtypes.EthFilterID, error) {
ft, err := getStatefulTracker(ctx)
if err != nil {
return ethtypes.EthFilterID{}, xerrors.Errorf("%s not supported: %w", callName, err)
}
ft.lk.Lock()
defer ft.lk.Unlock()

if len(ft.userFilters) >= EthMaxFiltersPerConn {
return ethtypes.EthFilterID{}, fmt.Errorf("too many filters")
return ethtypes.EthFilterID{}, xerrors.New("too many filters")
}

id, err := cb()
Expand All @@ -685,8 +701,16 @@ func addUserFilterLimited(ctx context.Context, cb func() (ethtypes.EthFilterID,
return id, nil
}

func statefulCallFromContext(ctx context.Context) *statefulCallTracker {
return ctx.Value(statefulCallTrackerKey).(*statefulCallTracker)
func getStatefulTracker(ctx context.Context) (*statefulCallTracker, error) {
if jsonrpc.GetConnectionType(ctx) != jsonrpc.ConnectionTypeWS {
return nil, xerrors.New("stateful tracking is only available for websockets connections")
}

if ct, ok := ctx.Value(statefulCallTrackerKey).(*statefulCallTracker); !ok {
return nil, xerrors.New("stateful tracking is not available for this call")
} else {
return ct, nil
}
}

type statefulCallTracker struct {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ require (
github.com/filecoin-project/go-f3 v0.0.7
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-hamt-ipld/v3 v3.4.0
github.com/filecoin-project/go-jsonrpc v0.3.2
github.com/filecoin-project/go-jsonrpc v0.6.0
github.com/filecoin-project/go-padreader v0.0.1
github.com/filecoin-project/go-paramfetch v0.0.4
github.com/filecoin-project/go-state-types v0.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ github.com/filecoin-project/go-hamt-ipld/v3 v3.0.1/go.mod h1:gXpNmr3oQx8l3o7qkGy
github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0/go.mod h1:bxmzgT8tmeVQA1/gvBwFmYdT8SOFUwB3ovSUfG1Ux0g=
github.com/filecoin-project/go-hamt-ipld/v3 v3.4.0 h1:nYs6OPUF8KbZ3E8o9p9HJnQaE8iugjHR5WYVMcicDJc=
github.com/filecoin-project/go-hamt-ipld/v3 v3.4.0/go.mod h1:s0qiHRhFyrgW0SvdQMSJFQxNa4xEIG5XvqCBZUEgcbc=
github.com/filecoin-project/go-jsonrpc v0.3.2 h1:uuAWTZe6B3AUUta+O26HlycGoej/yiaI1fXp3Du+D3I=
github.com/filecoin-project/go-jsonrpc v0.3.2/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM=
github.com/filecoin-project/go-jsonrpc v0.6.0 h1:/fFJIAN/k6EgY90m7qbyfY28woMwyseZmh2gVs5sYjY=
github.com/filecoin-project/go-jsonrpc v0.6.0/go.mod h1:/n/niXcS4ZQua6i37LcVbY1TmlJR0UIK9mDFQq2ICek=
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak=
github.com/filecoin-project/go-padreader v0.0.1 h1:8h2tVy5HpoNbr2gBRr+WD6zV6VD6XHig+ynSGJg8ZOs=
github.com/filecoin-project/go-padreader v0.0.1/go.mod h1:VYVPJqwpsfmtoHnAmPx6MUwmrK6HIcDqZJiuZhtmfLQ=
Expand Down
39 changes: 38 additions & 1 deletion itests/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/filecoin-project/lotus/api/client"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
"github.com/filecoin-project/lotus/gateway"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/itests/multisig"
Expand Down Expand Up @@ -389,7 +390,6 @@ func TestGatewayRateLimits(t *testing.T) {
response, err := client.Do(request)
req.NoError(err)
defer func() { _ = response.Body.Close() }()
req.NoError(err)
if http.StatusOK == response.StatusCode {
body, err := io.ReadAll(response.Body)
req.NoError(err)
Expand All @@ -409,3 +409,40 @@ func TestGatewayRateLimits(t *testing.T) {
}
req.True(failed, "expected requests to fail due to rate limiting")
}

func TestStatefulCallHandling(t *testing.T) {
req := require.New(t)

kit.QuietMiningLogs()
ctx := context.Background()
nodes := startNodes(ctx, t)

// not available over plain http
client := &http.Client{}
url := fmt.Sprintf("http://%s/rpc/v1", nodes.gatewayAddr)
jsonPayload := []byte(`{"method":"Filecoin.EthNewBlockFilter","params":[],"id":1,"jsonrpc":"2.0"}`)
request, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonPayload))
req.NoError(err)
request.Header.Set("Content-Type", "application/json")
response, err := client.Do(request)
req.NoError(err)
body, err := io.ReadAll(response.Body)
req.NoError(err)
defer func() { _ = response.Body.Close() }()
req.Equal(http.StatusOK, response.StatusCode)
req.Contains(
string(body),
`{"error":{"code":1,"message":"EthNewBlockFilter not supported: stateful tracking is only available for websockets connections"},"id":1,"jsonrpc":"2.0"}`,
)

// available over websocket
for i := 0; i < gateway.EthMaxFiltersPerConn; i++ {
_, err := nodes.lite.EthNewBlockFilter(ctx)
req.NoError(err)
}
// but only up to max
_, err = nodes.lite.EthNewBlockFilter(ctx)
require.ErrorContains(t, err, "too many filters")
_, err = nodes.lite.EthNewFilter(ctx, &ethtypes.EthFilterSpec{})
require.ErrorContains(t, err, "too many filters")
}

0 comments on commit d9ce493

Please sign in to comment.