Skip to content

Commit

Permalink
feat(stagger): add rollback flow for edge agent
Browse files Browse the repository at this point in the history
  • Loading branch information
oscarzhou-portainer committed Jul 20, 2023
1 parent 15f18b6 commit f4015c6
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 25 deletions.
4 changes: 2 additions & 2 deletions edge/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
type PortainerClient interface {
GetEnvironmentID() (portainer.EndpointID, error)
GetEnvironmentStatus(flags ...string) (*PollStatusResponse, error)
GetEdgeStackConfig(edgeStackID int) (*edge.StackPayload, error)
SetEdgeStackStatus(edgeStackID int, edgeStackStatus portainer.EdgeStackStatusType, error string) error
GetEdgeStackConfig(edgeStackID int, version *int) (*edge.StackPayload, error)
SetEdgeStackStatus(edgeStackID int, edgeStackStatus portainer.EdgeStackStatusType, rollbackTo int, error string) error
SetEdgeJobStatus(edgeJobStatus agent.EdgeJobStatus) error
SetTimeout(t time.Duration)
SetLastCommandTimestamp(timestamp time.Time)
Expand Down
7 changes: 4 additions & 3 deletions edge/client/portainer_edge_async_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (client *PortainerAsyncClient) executeAsyncRequest(payload AsyncRequest, po
}

// SetEdgeStackStatus updates the status of an Edge stack on the Portainer server
func (client *PortainerAsyncClient) SetEdgeStackStatus(edgeStackID int, edgeStackStatus portainer.EdgeStackStatusType, err string) error {
func (client *PortainerAsyncClient) SetEdgeStackStatus(edgeStackID int, edgeStackStatus portainer.EdgeStackStatusType, rollbackTo int, err string) error {
client.nextSnapshotMutex.Lock()
defer client.nextSnapshotMutex.Unlock()

Expand All @@ -447,7 +447,8 @@ func (client *PortainerAsyncClient) SetEdgeStackStatus(edgeStackID int, edgeStac
status = append(status, portainer.EdgeStackDeploymentStatus{
Type: edgeStackStatus,
Error: err,
Time: time.Now().Unix(),
// todo: should add rollbackTo for async agent?
Time: time.Now().Unix(),
})

client.nextSnapshot.StackStatus[portainer.EdgeStackID(edgeStackID)] = status
Expand Down Expand Up @@ -478,7 +479,7 @@ func (client *PortainerAsyncClient) DeleteEdgeStackStatus(edgeStackID int) error
}

// GetEdgeStackConfig retrieves the configuration associated to an Edge stack
func (client *PortainerAsyncClient) GetEdgeStackConfig(edgeStackID int) (*edge.StackPayload, error) {
func (client *PortainerAsyncClient) GetEdgeStackConfig(edgeStackID int, version *int) (*edge.StackPayload, error) {
// Async mode MUST NOT make any extra requests to Portainer, all the
// information exchange needs to happen via the async polling loop, which
// uses /endpoints/edge/async. This is a strict requirement.
Expand Down
9 changes: 8 additions & 1 deletion edge/client/portainer_edge_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,13 @@ func (client *PortainerEdgeClient) GetEnvironmentStatus(flags ...string) (*PollS
}

// GetEdgeStackConfig retrieves the configuration associated to an Edge stack
func (client *PortainerEdgeClient) GetEdgeStackConfig(edgeStackID int) (*edge.StackPayload, error) {
func (client *PortainerEdgeClient) GetEdgeStackConfig(edgeStackID int, version *int) (*edge.StackPayload, error) {
requestURL := fmt.Sprintf("%s/api/endpoints/%d/edge/stacks/%d", client.serverAddress, client.getEndpointIDFn(), edgeStackID)

if version != nil {
requestURL += fmt.Sprintf("?version=%d", *version)
}

req, err := http.NewRequest(http.MethodGet, requestURL, nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -201,19 +205,22 @@ type setEdgeStackStatusPayload struct {
Error string
Status portainer.EdgeStackStatusType
EndpointID portainer.EndpointID
RollbackTo int `json:",omitempty"`
Time int64
}

// SetEdgeStackStatus updates the status of an Edge stack on the Portainer server
func (client *PortainerEdgeClient) SetEdgeStackStatus(
edgeStackID int,
edgeStackStatus portainer.EdgeStackStatusType,
rollbackTo int,
error string,
) error {
payload := setEdgeStackStatusPayload{
Error: error,
Status: edgeStackStatus,
EndpointID: client.getEndpointIDFn(),
RollbackTo: rollbackTo,
Time: time.Now().Unix(),
}

Expand Down
14 changes: 7 additions & 7 deletions edge/poll_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,41 +234,41 @@ func (service *PollService) processStackCommand(ctx context.Context, command cli
return newOperationError("stack", command.Operation, err)
}

err = service.portainerClient.SetEdgeStackStatus(stackData.ID, portainer.EdgeStackStatusAcknowledged, "")
err = service.portainerClient.SetEdgeStackStatus(stackData.ID, portainer.EdgeStackStatusAcknowledged, stackData.Version, "")
if err != nil {
return newOperationError("stack", command.Operation, err)
}

switch command.Operation {
case "add", "replace":
err = service.portainerClient.SetEdgeStackStatus(stackData.ID, portainer.EdgeStackStatusDeploymentReceived, "")
err = service.portainerClient.SetEdgeStackStatus(stackData.ID, portainer.EdgeStackStatusDeploymentReceived, stackData.Version, "")
if err != nil {
return newOperationError("stack", command.Operation, err)
}

err = service.edgeStackManager.DeployStack(ctx, stackData)

if err != nil {
return service.portainerClient.SetEdgeStackStatus(stackData.ID, portainer.EdgeStackStatusError, err.Error())
return service.portainerClient.SetEdgeStackStatus(stackData.ID, portainer.EdgeStackStatusError, stackData.Version, err.Error())
}

err = service.portainerClient.SetEdgeStackStatus(stackData.ID, portainer.EdgeStackStatusDeploying, "")
err = service.portainerClient.SetEdgeStackStatus(stackData.ID, portainer.EdgeStackStatusDeploying, stackData.Version, "")
if err != nil {
return newOperationError("stack", command.Operation, err)
}

case "remove":
err = service.portainerClient.SetEdgeStackStatus(stackData.ID, portainer.EdgeStackStatusRemoving, "")
err = service.portainerClient.SetEdgeStackStatus(stackData.ID, portainer.EdgeStackStatusRemoving, stackData.Version, "")
if err != nil {
return newOperationError("stack", command.Operation, err)
}

err = service.edgeStackManager.DeleteStack(ctx, stackData)
if err != nil {
return service.portainerClient.SetEdgeStackStatus(stackData.ID, portainer.EdgeStackStatusError, err.Error())
return service.portainerClient.SetEdgeStackStatus(stackData.ID, portainer.EdgeStackStatusError, stackData.Version, err.Error())
}

err = service.portainerClient.SetEdgeStackStatus(stackData.ID, portainer.EdgeStackStatusRemoved, "")
err = service.portainerClient.SetEdgeStackStatus(stackData.ID, portainer.EdgeStackStatusRemoved, stackData.Version, "")
if err != nil {
return newOperationError("stack", command.Operation, err)
}
Expand Down
24 changes: 12 additions & 12 deletions edge/stack/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (manager *StackManager) processStack(stackID int, version int) error {
}
}

stackPayload, err := manager.portainerClient.GetEdgeStackConfig(stackID)
stackPayload, err := manager.portainerClient.GetEdgeStackConfig(stackID, &version)
if err != nil {
return err
}
Expand Down Expand Up @@ -246,7 +246,7 @@ func (manager *StackManager) processStack(stackID int, version int) error {
Str("namespace", stack.Namespace).
Msg("stack acknowledged")

return manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusAcknowledged, "")
return manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusAcknowledged, stack.Version, "")
}

func (manager *StackManager) processRemovedStacks(pollResponseStacks map[int]int) {
Expand Down Expand Up @@ -425,17 +425,17 @@ func (manager *StackManager) checkStackStatus(ctx context.Context, stackName str

if status == libstack.StatusError {
stack.Status = StatusError
return manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusError, statusMessage)
return manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusError, stack.Version, statusMessage)
}

if status == libstack.StatusRunning {
stack.Status = StatusDeployed
return manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusRunning, "")
return manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusRunning, stack.Version, "")
}

if status == libstack.StatusRemoved {
delete(manager.stacks, edgeStackID(stack.ID))
return manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusRemoved, "")
return manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusRemoved, stack.Version, "")
}

return nil
Expand Down Expand Up @@ -479,7 +479,7 @@ func (manager *StackManager) validateStackFile(ctx context.Context, stack *edgeS
log.Error().Int("stack_identifier", int(stack.ID)).Err(err).Msg("stack validation failed")
stack.Status = StatusError

statusUpdateErr := manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusError, err.Error())
statusUpdateErr := manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusError, stack.Version, err.Error())
if statusUpdateErr != nil {
log.Error().Err(statusUpdateErr).Msg("unable to update Edge stack status")
}
Expand Down Expand Up @@ -514,7 +514,7 @@ func (manager *StackManager) pullImages(ctx context.Context, stack *edgeStack, s

log.Debug().Int("stack_identifier", int(stack.ID)).Int("stack_version", stack.Version).Msg("stack images pulled")

statusUpdateErr := manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusImagesPulled, "")
statusUpdateErr := manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusImagesPulled, stack.Version, "")
if statusUpdateErr != nil {
log.Error().Err(statusUpdateErr).Msg("unable to update Edge stack status")
}
Expand All @@ -525,7 +525,7 @@ func (manager *StackManager) pullImages(ctx context.Context, stack *edgeStack, s
} else {
stack.Status = StatusError

statusUpdateErr := manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusError, err.Error())
statusUpdateErr := manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusError, stack.Version, err.Error())
if statusUpdateErr != nil {
log.Error().Err(statusUpdateErr).Msg("unable to update Edge stack status")
}
Expand All @@ -547,7 +547,7 @@ func (manager *StackManager) deployStack(ctx context.Context, stack *edgeStack,

stack.DeployCount += 1

err := manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusDeploying, "")
err := manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusDeploying, stack.Version, "")
if err != nil {
log.Error().Err(err).Msg("unable to update Edge stack status")
}
Expand Down Expand Up @@ -580,7 +580,7 @@ func (manager *StackManager) deployStack(ctx context.Context, stack *edgeStack,

log.Debug().Int("stack_identifier", int(stack.ID)).Int("stack_version", stack.Version).Msg("stack deployed")

err = manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusDeploymentReceived, "")
err = manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusDeploymentReceived, stack.Version, "")
if err != nil {
log.Error().Err(err).Msg("unable to update Edge stack status")
}
Expand All @@ -595,7 +595,7 @@ func (manager *StackManager) deployStack(ctx context.Context, stack *edgeStack,
} else {
stack.Status = StatusError

err = manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusError, "failed to redeploy stack")
err = manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusError, stack.Version, "failed to redeploy stack")
if err != nil {
log.Error().Err(err).Msg("unable to update Edge stack status")
}
Expand Down Expand Up @@ -637,7 +637,7 @@ func (manager *StackManager) deleteStack(ctx context.Context, stack *edgeStack,
return
}

err = manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusRemoving, "")
err = manager.portainerClient.SetEdgeStackStatus(int(stack.ID), portainer.EdgeStackStatusRemoving, stack.Version, "")
if err != nil {
log.Error().Err(err).Msg("unable to delete Edge stack status")

Expand Down

0 comments on commit f4015c6

Please sign in to comment.