Skip to content

Commit

Permalink
fix: addressing the comments
Browse files Browse the repository at this point in the history
  • Loading branch information
behzad-mir committed Sep 17, 2024
1 parent 8b42075 commit 2d55822
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 59 deletions.
3 changes: 2 additions & 1 deletion cni/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,7 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
// if stateless CNI fails to get the endpoint from CNS for any reason other than Endpoint Not Found
if err != nil {
if errors.Is(err, network.ErrConnectionFailure) {
logger.Info("failed to connect to CNS", zap.String("containerID", args.ContainerID), zap.Error(err))
addErr := fsnotify.AddFile(args.ContainerID, args.ContainerID, watcherPath)
logger.Info("add containerid file for Asynch delete", zap.String("containerID", args.ContainerID), zap.Error(addErr))
if addErr != nil {
Expand All @@ -1150,7 +1151,7 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {

// for when the endpoint is not created, but the ips are already allocated (only works if single network, single infra)
// this block is not applied to stateless CNI
if len(epInfos) == 0 && !plugin.nm.IsStatelessCNIMode() {
if len(epInfos) == 0 {
endpointID := plugin.nm.GetEndpointID(args.ContainerID, args.IfName)
if !nwCfg.MultiTenancy {
logger.Error("Failed to query endpoint",
Expand Down
31 changes: 11 additions & 20 deletions cns/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,6 @@ func (e *ConnectionFailureErr) Error() string {
return e.cause.Error()
}

type EndpointStateNotFoundErr struct {
cause error
}

func (e *EndpointStateNotFoundErr) Error() string {
return e.cause.Error()
}

// New returns a new CNS client configured with the passed URL and timeout.
func New(baseURL string, requestTimeout time.Duration) (*Client, error) {
if baseURL == "" {
Expand Down Expand Up @@ -1036,34 +1028,33 @@ func (c *Client) GetEndpoint(ctx context.Context, endpointID string) (*restserve
// build the request
u := c.routes[cns.EndpointAPI]
uString := u.String() + endpointID
var response restserver.GetEndpointResponse
req, err := http.NewRequestWithContext(ctx, http.MethodGet, uString, http.NoBody)
if err != nil {
return nil, errors.Wrap(err, "failed to build request")
response.Response.ReturnCode = types.UnexpectedError
return &response, errors.Wrap(err, "failed to build request")
}

req.Header.Set(headerContentType, contentTypeJSON)
res, err := c.client.Do(req)
if err != nil {
return nil, &ConnectionFailureErr{cause: err}
response.Response.ReturnCode = types.ConnectionError
return &response, &ConnectionFailureErr{cause: err}
}

defer res.Body.Close()

if res.StatusCode != http.StatusOK {
return nil, errors.Errorf("http response %d", res.StatusCode)
response.Response.ReturnCode = types.UnexpectedError
return &response, errors.Errorf("http response %d", res.StatusCode)
}

var response restserver.GetEndpointResponse
err = json.NewDecoder(res.Body).Decode(&response)
if err != nil {
return nil, errors.Wrap(err, "failed to decode GetEndpointResponse")
}

if response.Response.ReturnCode == types.NotFound {
return nil, &EndpointStateNotFoundErr{cause: err}
response.Response.ReturnCode = types.UnexpectedError
return &response, errors.Wrap(err, "failed to decode GetEndpointResponse")
}
if response.Response.ReturnCode != 0 {

return nil, errors.New(response.Response.Message)
return &response, errors.New(response.Response.Message)
}

return &response, nil
Expand Down
60 changes: 30 additions & 30 deletions cns/fsnotify/fsnotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,41 +12,38 @@ import (
"github.com/Azure/azure-container-networking/cns"
"github.com/Azure/azure-container-networking/cns/configuration"
"github.com/Azure/azure-container-networking/cns/hnsclient"
"github.com/Azure/azure-container-networking/cns/logger"
"github.com/Azure/azure-container-networking/cns/restserver"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

const DefaultNetworkID = "azure"

type IPAMClient interface {
type endpointManager interface {
ReleaseIPs(ctx context.Context, ipconfig cns.IPConfigsRequest) error
GetEndpoint(ctx context.Context, endpointID string) (*restserver.GetEndpointResponse, error)
}

type watcher struct {
cli IPAMClient
path string
log *zap.Logger

cli endpointManager
path string
pendingDelete map[string]struct{}
lock sync.Mutex
cnsconfig *configuration.CNSConfig
}

// Create the AsyncDelete watcher.
func New(cnsconfig *configuration.CNSConfig, cli IPAMClient, path string, zlogger *zap.Logger) (*watcher, error) { //nolint
func New(cnsconfig *configuration.CNSConfig, cli endpointManager, path string) (*watcher, error) { //nolint
// Add directory where intended deletes are kept
if err := os.Mkdir(path, 0o755); err != nil && !errors.Is(err, fs.ErrExist) { //nolint
zlogger.Error("error making directory", zap.String("path", path), zap.Error(err))
logger.Errorf("error making directory %s , %s", path, err.Error())
return nil, errors.Wrapf(err, "failed to create dir %s", path)
}
return &watcher{
cli: cli,
path: path,
log: zlogger,
pendingDelete: make(map[string]struct{}),
cnsconfig: cnsconfig,
}, nil
Expand All @@ -63,39 +60,39 @@ func New(cnsconfig *configuration.CNSConfig, cli IPAMClient, path string, zlogge
func (w *watcher) releaseAll(ctx context.Context) {
w.lock.Lock()
defer w.lock.Unlock()
w.log.Info("deleting Endpoint asynchronously")
for containerID := range w.pendingDelete {
w.log.Info("deleting Endpoint asynchronously")
logger.Printf("deleting Endpoint asynchronously")
// read file contents
filepath := w.path + "/" + containerID
file, err := os.Open(filepath)
if err != nil {
w.log.Error("failed to open file", zap.Error(err))
logger.Errorf("failed to open file %s", err.Error())
}

data, errReadingFile := io.ReadAll(file)
if errReadingFile != nil {
w.log.Error("failed to read file content", zap.Error(errReadingFile))
logger.Errorf("failed to read file content %s", errReadingFile)
}
file.Close()
podInterfaceID := string(data)
// in case of stateless CNI for Windows, CNS needs to remove HNS endpoints first
if isStalessCNIMode(w.cnsconfig) {
logger.Printf("deleting HNS Endpoint asynchronously")
// remove HNS endpoint
w.log.Info("deleting HNS Endpoint asynchronously")
if err := w.deleteEndpoint(ctx, containerID); err != nil {
w.log.Error("failed to remove HNS endpoint", zap.Error(err))
logger.Errorf("failed to remove HNS endpoint %s", err.Error())
continue
}
}
w.log.Info("releasing IP for missed delete", zap.String("podInterfaceID", podInterfaceID), zap.String("containerID", containerID))
logger.Printf("releasing IP for missed delete: podInterfaceID :%s containerID:%s", podInterfaceID, containerID)
if err := w.releaseIP(ctx, podInterfaceID, containerID); err != nil {
w.log.Error("failed to release IP for missed delete", zap.String("containerID", containerID), zap.Error(err))
logger.Errorf("failed to release IP for missed delete: podInterfaceID :%s containerID:%s", podInterfaceID, containerID)
continue
}
w.log.Info("successfully released IP for missed delete", zap.String("containerID", containerID))
logger.Printf("successfully released IP for missed delete: podInterfaceID :%s containerID:%s", podInterfaceID, containerID)
delete(w.pendingDelete, containerID)
if err := removeFile(containerID, w.path); err != nil {
w.log.Error("failed to remove file for missed delete", zap.Error(err))
logger.Errorf("failed to remove file for missed delete %s", err.Error())
}
}
}
Expand All @@ -114,7 +111,7 @@ func (w *watcher) watchPendingDelete(ctx context.Context) error {
if n == 0 {
continue
}
w.log.Info("processing pending missed deletes", zap.Int("count", n))
logger.Printf("processing pending missed deletes, count: %v", n)
w.releaseAll(ctx)
}
}
Expand All @@ -135,28 +132,28 @@ func (w *watcher) watchFS(ctx context.Context) error {
// Start watching the directory, so that we don't miss any events.
err = watcher.Add(w.path)
if err != nil {
w.log.Error("failed to add path to fsnotify watcher", zap.String("path", w.path), zap.Error(err))
logger.Errorf("failed to add path %s to fsnotify watcher %s", w.path, err.Error())
return errors.Wrap(err, "failed to add path to fsnotify watcher")
}
// List the directory and create synthetic events for any existing items.
w.log.Info("listing directory", zap.String("path", w.path))
logger.Printf("listing directory:%s", w.path)
dirContents, err := os.ReadDir(w.path)
if err != nil {
w.log.Error("error reading deleteID directory", zap.String("path", w.path), zap.Error(err))
logger.Errorf("error reading deleteID directory %s, %s", w.path, err.Error())
return errors.Wrapf(err, "failed to read %s", w.path)
}
if len(dirContents) == 0 {
w.log.Info("no missed deletes found")
logger.Printf("no missed deletes found")
}
w.lock.Lock()
for _, file := range dirContents {
w.log.Info("adding missed delete from file", zap.String("name", file.Name()))
logger.Printf("adding missed delete from file %s", file.Name())
w.pendingDelete[file.Name()] = struct{}{}
}
w.lock.Unlock()

// Start listening for events.
w.log.Info("listening for events from fsnotify watcher")
logger.Printf("listening for events from fsnotify watcher")
for {
select {
case <-ctx.Done():
Expand All @@ -169,12 +166,12 @@ func (w *watcher) watchFS(ctx context.Context) error {
// discard any event that is not a file Create
continue
}
w.log.Info("received create event", zap.String("event", event.Name))
logger.Printf("received create event %s", event.Name)
w.lock.Lock()
w.pendingDelete[event.Name] = struct{}{}
w.lock.Unlock()
case watcherErr := <-watcher.Errors:
w.log.Error("fsnotify watcher error", zap.Error(watcherErr))
logger.Errorf("fsnotify watcher error %s", watcherErr.Error())
}
}
}
Expand Down Expand Up @@ -235,11 +232,14 @@ func (w *watcher) deleteEndpoint(ctx context.Context, containerid string) error
hnsEndpointID := ipInfo.HnsEndpointID
// we need to get the HNSEndpoint via the IP address if the HNSEndpointID is not present in the statefile
if ipInfo.HnsEndpointID == "" {
// TODO: the HNS client for Windows needs to be refactored:
// remove hnsclient_linux.go and hnsclient_windows.go and instead have endpoint_linux.go and endpoint_windows.go
// and abstract hns changes in endpoint_windows.go
if hnsEndpointID, err = hnsclient.GetHNSEndpointbyIP(ipInfo.IPv4, ipInfo.IPv6, DefaultNetworkID); err != nil {
return errors.Wrap(err, "failed to find HNS endpoint with id")
}
}
w.log.Info("deleting HNS Endpoint with id ", zap.String("id", hnsEndpointID))
logger.Printf("deleting HNS Endpoint with id %v", hnsEndpointID)
if err := hnsclient.DeleteHNSEndpointbyID(hnsEndpointID); err != nil {
return errors.Wrap(err, "failed to delete HNS endpoint with id "+ipInfo.HnsEndpointID)
}
Expand All @@ -248,7 +248,7 @@ func (w *watcher) deleteEndpoint(ctx context.Context, containerid string) error
}

// isStalessCNIMode verifies whether the CNI is running in stateless mode
func isStalessCNIWindows(cnsconfig *configuration.CNSConfig) bool {
func isStalessCNIMode(cnsconfig *configuration.CNSConfig) bool {
if !cnsconfig.InitializeFromCNI && cnsconfig.ManageEndpointState && runtime.GOOS == "windows" {
return true
}
Expand Down
2 changes: 1 addition & 1 deletion cns/restserver/ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ func (service *HTTPRestService) GetEndpointHelper(endpointID string) (*EndpointI
} else {
logger.Errorf("[GetEndpointState] Failed to retrieve state, err:%v", err)
}
return nil, errors.Wrap(err, "[GetEndpointState] Failed to retrieve state")
return nil, ErrEndpointStateNotFound
}
if endpointInfo, ok := service.EndpointState[endpointID]; ok {
logger.Warnf("[GetEndpointState] Found existing endpoint state for container %s", endpointID)
Expand Down
2 changes: 1 addition & 1 deletion cns/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ func main() {
_ = retry.Do(func() error {
z.Info("starting fsnotify watcher to process missed Pod deletes")
logger.Printf("starting fsnotify watcher to process missed Pod deletes")
w, err := fsnotify.New(cnsconfig, cnsclient, cnsconfig.AsyncPodDeletePath, z)
w, err := fsnotify.New(cnsconfig, cnsclient, cnsconfig.AsyncPodDeletePath)
if err != nil {
z.Error("failed to create fsnotify watcher", zap.Error(err))
return errors.Wrap(err, "failed to create fsnotify watcher, will retry")
Expand Down
1 change: 1 addition & 0 deletions cns/types/codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
UnsupportedAPI ResponseCode = 43
FailedToAllocateBackendConfig ResponseCode = 44
UnexpectedError ResponseCode = 99
ConnectionError ResponseCode = 404
)

// nolint:gocyclo
Expand Down
11 changes: 5 additions & 6 deletions network/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/Azure/azure-container-networking/cns"
cnsclient "github.com/Azure/azure-container-networking/cns/client"
"github.com/Azure/azure-container-networking/cns/restserver"
"github.com/Azure/azure-container-networking/cns/types"
"github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/netio"
Expand Down Expand Up @@ -453,18 +454,16 @@ func validateUpdateEndpointState(endpointID string, ifNameToIPInfoMap map[string
// In stateless cni, container id is the endpoint id, so you can pass in either
func (nm *networkManager) GetEndpointState(networkID, containerID string) ([]*EndpointInfo, error) {
endpointResponse, err := nm.CnsClient.GetEndpoint(context.TODO(), containerID)

if err != nil {
var connectionErr *cnsclient.ConnectionFailureErr
var EndpointStateNotFoundErr *cnsclient.EndpointStateNotFoundErr
if errors.As(err, &EndpointStateNotFoundErr) {
//var connectionErr *cnsclient.ConnectionFailureErr
//var EndpointStateNotFoundErr *cnsclient.EndpointStateNotFoundErr
if endpointResponse.Response.ReturnCode == types.NotFound {
return nil, ErrEndpointStateNotFound
}
if errors.As(err, &connectionErr) {
if endpointResponse.Response.ReturnCode == types.ConnectionError {
return nil, ErrConnectionFailure
}
return nil, ErrGetEndpointStateFailure

}
epInfos := cnsEndpointInfotoCNIEpInfos(endpointResponse.EndpointInfo, containerID)

Expand Down

0 comments on commit 2d55822

Please sign in to comment.