Skip to content

Commit

Permalink
Merge branch 'release/v1.7.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
jirenius committed Oct 11, 2021
2 parents 9521e6e + 54db0ac commit 3da6238
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 35 deletions.
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ resgate [options]
| <code>&nbsp;&nbsp;&nbsp;&nbsp;--putmethod &lt;methodName&gt;</code> | Call method name mapped to HTTP PUT requests |
| <code>&nbsp;&nbsp;&nbsp;&nbsp;--deletemethod &lt;methodName&gt;</code> | Call method name mapped to HTTP DELETE requests |
| <code>&nbsp;&nbsp;&nbsp;&nbsp;--patchmethod &lt;methodName&gt;</code> | Call method name mapped to HTTP PATCH requests |
| <code>&nbsp;&nbsp;&nbsp;&nbsp;--wscompression</code> | Enable WebSocket per message compression |
| <code>&nbsp;&nbsp;&nbsp;&nbsp;--resetthrottle &lt;limit&gt;</coce> | Limit on parallel requests sent in response to a system reset |
| <code>&nbsp;&nbsp;&nbsp;&nbsp;--referencethrottle &lt;limit&gt;</coce> | Limit on parallel requests sent when following resource references |
| <code>-c, --config &lt;file&gt;</code> | Configuration file in JSON format |

### Security options
Expand Down Expand Up @@ -134,71 +137,100 @@ Configuration is a JSON encoded file. If no config file is found at the given pa
{
// URL to the NATS server.
"natsUrl": "nats://127.0.0.1:4222",

// Bind to HOST IPv4 or IPv6 address.
// Empty string ("") means all IPv4 and IPv6 addresses.
// Invalid or missing IP address defaults to 0.0.0.0.
"addr": "0.0.0.0",

// Port for the http server to listen on.
// If the port value is missing or 0, standard http(s) port is used.
"port": 8080,

// Path for accessing the RES API WebSocket.
"wsPath": "/",

// Path prefix for accessing web resources.
"apiPath": "/api",

// Timeout in milliseconds for NATS requests.
"requestTimeout": 3000,

// Header authentication resource method for web resources.
// Prior to accessing the resource, this resource method will be
// called, allowing an auth service to set a token using
// information such as the request headers.
// Missing value or null will disable header authentication.
// Eg. "authService.headerLogin"
"headerAuth": null,

// Encoding for web resources.
// Available encodings are:
// * json - JSON encoding with resource reference meta data.
// * jsonflat - JSON encoding without resource reference meta data.
"apiEncoding": "json",

// Call method name to map HTTP PUT method requests to.
// Eg. "put"
"putMethod": null,

// Call method name to map HTTP DELETE method requests to.
// Eg. "delete"
"deleteMethod": null,

// Call method name to map HTTP PATCH method requests to.
// Eg. "patch"
"patchMethod": null,

// Flag enabling WebSocket per message compression (RFC 7692).
"wsCompression": false,

// Throttle on how many requests are sent in response to a system reset.
// Once that the number of requests are sent, the server will await
// responses before sending more requests. Zero (0) means no throttling.
// Eg. 32
"resetThrottle": 0,

// Throttle on how many requests are sent when recursively following
// resource references for a subscription.
// Once that the number of requests are sent, the server will await
// responses before sending more requests. Zero (0) means no throttling.
// Eg. 32
"referenceThrottle": 0,

// Flag enabling tls encryption.
"tls": false,

// Certificate file path for tls encryption.
"tlsCert": "",

// Key file path for tls encryption.
"tlsKey": "",

// NATS User Credentials file.
// Eg. "ngs.creds"
"natsCreds": "",

// NATS Client certificate file.
// Eg. "client-cert.pem"
"natsCert": "",

// NATS Client certificate key file.
// Eg. "client-key.pem"
"natsKey": "",

// NATS Root CA files.
// Eg. ["rootCA.pem"]
"natsRootCAs": [],

// Allowed origin for CORS requests, or * to allow all origins.
// Multiple origins are separated by semicolon.
// Eg. "https://example.com;https://api.example.com"
"allowOrigin": "*",

// Flag enabling debug logging.
"debug": false,

// Flag enabling trace logging.
"trace": false
}
Expand Down
27 changes: 13 additions & 14 deletions docs/res-service-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -553,42 +553,41 @@ May be omitted.
}
```

### Resource name pattern
A resource name pattern is a string used for matching resource names.
The pattern may use the following wild cards:

* The asterisk (`*`) matches any part at any level of the resource name.
Eg. `userService.user.*.roles` - Pattern that matches the roles collection of all users.
* The greater than symbol (`>`) matches one or more parts at the end of a resource name, and must be the last part.
Eg. `messageService.>` - Pattern that matches all resources owned by *messageService*.

## System token reset event

**Subject**
**Subject**
`system.tokenReset`

Signals that tokens matching one or more *token IDs* (tid) are to be considered out of date.
A service MUST immediately send an [auth request](#auth-request) to the provided subject for each connection with a token matching any of the token IDs. The *auth request* should not contain any params, and the response may be discarded by the service.
The event payload has the following parameters:

**tids**
**tids**
An array of token ID (tid) strings.
MUST be an array of strings.

**subject**
**subject**
A subject string to which the [auth requests](#auth-request) should be sent.
May have the subject pattern of an *auth request*, but it is not required.
MUST be a string.

**Example payload**
**Example payload**
```json
{
"tids": [ "12", "42" ],
"subject": "auth.authentication.renewToken"
}
```

### Resource name pattern
A resource name pattern is a string used for matching resource names.
The pattern may use the following wild cards:

* The asterisk (`*`) matches any part at any level of the resource name.
Eg. `userService.user.*.roles` - Pattern that matches the roles collection of all users.
* The greater than symbol (`>`) matches one or more parts at the end of a resource name, and must be the last part.
Eg. `messageService.>` - Pattern that matches all resources owned by *messageService*.


# Query resources

A query resource is a resource where its model properties or collection values may vary based on the query. It is used to request partial or filtered resources, such as for searches, sorting, or pagination.
Expand Down
6 changes: 6 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ Server Options:
--putmethod <methodName> Call method name mapped to HTTP PUT requests
--deletemethod <methodName> Call method name mapped to HTTP DELETE requests
--patchmethod <methodName> Call method name mapped to HTTP PATCH requests
--wscompression Enable WebSocket per message compression
--resetthrottle <limit> Limit on parallel requests sent in response to a system reset
--referencethrottle <limit> Limit on parallel requests sent when following resource references
-c, --config <file> Configuration file
Security Options:
Expand Down Expand Up @@ -158,6 +161,9 @@ func (c *Config) Init(fs *flag.FlagSet, args []string) {
fs.StringVar(&putMethod, "putmethod", "", "Call method name mapped to HTTP PUT requests.")
fs.StringVar(&deleteMethod, "deletemethod", "", "Call method name mapped to HTTP DELETE requests.")
fs.StringVar(&patchMethod, "patchmethod", "", "Call method name mapped to HTTP PATCH requests.")
fs.BoolVar(&c.WSCompression, "wscompression", false, "Enable WebSocket per message compression.")
fs.IntVar(&c.ResetThrottle, "resetthrottle", 0, "Limit on parallel requests sent in response to a system reset.")
fs.IntVar(&c.ReferenceThrottle, "referencethrottle", 0, "Limit on parallel requests sent when following resource references.")
fs.BoolVar(&c.Debug, "D", false, "Enable debugging output.")
fs.BoolVar(&c.Debug, "debug", false, "Enable debugging output.")
fs.BoolVar(&c.Trace, "V", false, "Enable trace logging.")
Expand Down
3 changes: 2 additions & 1 deletion server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ type Config struct {

WSCompression bool `json:"wsCompression"`

ResetThrottle int `json:"resetThrottle"`
ResetThrottle int `json:"resetThrottle"`
ReferenceThrottle int `json:"referenceThrottle"`

NoHTTP bool `json:"-"` // Disable start of the HTTP server. Used for testing

Expand Down
2 changes: 1 addition & 1 deletion server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "time"

const (
// Version is the current version for the server.
Version = "1.7.0"
Version = "1.7.1"

// ProtocolVersion is the implemented RES protocol version.
ProtocolVersion = "1.2.2"
Expand Down
18 changes: 14 additions & 4 deletions server/rescache/eventSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (e *EventSubscription) getResourceSubscription(q string) (rs *ResourceSubsc
return
}

func (e *EventSubscription) addSubscriber(sub Subscriber) {
func (e *EventSubscription) addSubscriber(sub Subscriber, t *Throttle) {
e.Enqueue(func() {
var rs *ResourceSubscription
q := sub.ResourceQuery()
Expand All @@ -85,9 +85,19 @@ func (e *EventSubscription) addSubscriber(sub Subscriber) {
// Create request
subj := "get." + e.ResourceName
payload := codec.CreateGetRequest(q)
e.cache.mq.SendRequest(subj, payload, func(_ string, data []byte, err error) {
rs.enqueueGetResponse(data, err)
})
// Request directly if we don't throttle, or else add to throttle
if t == nil {
e.cache.mq.SendRequest(subj, payload, func(_ string, data []byte, err error) {
rs.enqueueGetResponse(data, err)
})
} else {
t.Add(func() {
e.cache.mq.SendRequest(subj, payload, func(_ string, data []byte, err error) {
rs.enqueueGetResponse(data, err)
t.Done()
})
})
}

// If a request has already been sent
// In that case the subscriber will be handled
Expand Down
4 changes: 2 additions & 2 deletions server/rescache/rescache.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,14 @@ func (c *Cache) Errorf(format string, v ...interface{}) {

// Subscribe fetches a resource from the cache, and if it is
// not cached, starts subscribing to the resource and sends a get request
func (c *Cache) Subscribe(sub Subscriber) {
func (c *Cache) Subscribe(sub Subscriber, t *Throttle) {
eventSub, err := c.getSubscription(sub.ResourceName(), true)
if err != nil {
sub.Loaded(nil, err)
return
}

eventSub.addSubscriber(sub)
eventSub.addSubscriber(sub, t)
}

// Access sends an access request
Expand Down
10 changes: 7 additions & 3 deletions server/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type ConnSubscriber interface {
Errorf(format string, v ...interface{})
CID() string
Token() json.RawMessage
Subscribe(rid string, direct bool) (*Subscription, error)
Subscribe(rid string, direct bool, throttle *rescache.Throttle) (*Subscription, error)
Unsubscribe(sub *Subscription, direct bool, count int, tryDelete bool)
Access(sub *Subscription, callback func(*rescache.Access))
Send(data []byte)
Expand Down Expand Up @@ -53,6 +53,7 @@ type Subscription struct {
access *rescache.Access
accessCallbacks []func(*rescache.Access)
flags uint8
throttle *rescache.Throttle

// Protected by conn
direct int // Number of direct subscriptions
Expand Down Expand Up @@ -96,7 +97,7 @@ var (
)

// NewSubscription creates a new Subscription
func NewSubscription(c ConnSubscriber, rid string) *Subscription {
func NewSubscription(c ConnSubscriber, rid string, throttle *rescache.Throttle) *Subscription {
name, query := parseRID(c.ExpandCID(rid))

sub := &Subscription{
Expand All @@ -106,6 +107,7 @@ func NewSubscription(c ConnSubscriber, rid string) *Subscription {
c: c,
state: stateLoading,
queueFlag: queueReasonLoading,
throttle: throttle,
}

return sub
Expand Down Expand Up @@ -509,7 +511,7 @@ func (s *Subscription) addReference(rid string) (*Subscription, error) {
}

if ref == nil {
sub, err := s.c.Subscribe(rid, false)
sub, err := s.c.Subscribe(rid, false, s.throttle)

if err != nil {
return nil, err
Expand Down Expand Up @@ -776,6 +778,7 @@ func (s *Subscription) Dispose() {
s.state = stateDisposed
s.readyCallbacks = nil
s.eventQueue = nil
s.throttle = nil

if s.resourceSub != nil {
s.unsubscribeRefs()
Expand All @@ -792,6 +795,7 @@ func (s *Subscription) doneLoading() {
s.state = stateReady
rcbs := s.readyCallbacks
s.readyCallbacks = nil
s.throttle = nil

for _, rcb := range rcbs {
rcb.loading--
Expand Down
Loading

0 comments on commit 3da6238

Please sign in to comment.