From fc40df37cfb7744a1a9143a41ae74e5a7bfb9c07 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Thu, 15 Jun 2023 22:27:11 -0400 Subject: [PATCH] Add checks in validation webhook to prevent container ports from colliding with others --- pkg/webhooks/errors.go | 16 +-- pkg/webhooks/kafkacluster_validator.go | 50 +++++++- pkg/webhooks/kafkacluster_validator_test.go | 122 +++++++++++++++++++- 3 files changed, 172 insertions(+), 16 deletions(-) diff --git a/pkg/webhooks/errors.go b/pkg/webhooks/errors.go index 3fcf6d7a07..58f3d9ef16 100644 --- a/pkg/webhooks/errors.go +++ b/pkg/webhooks/errors.go @@ -21,14 +21,14 @@ import ( ) const ( - cantConnectErrorMsg = "failed to connect to kafka cluster" - cantConnectAPIServerMsg = "failed to connect to Kubernetes API server" - invalidReplicationFactorErrMsg = "replication factor is larger than the number of nodes in the kafka cluster" - outOfRangeReplicationFactorErrMsg = "replication factor must be larger than 0 (or set it to be -1 to use the broker's default)" - outOfRangePartitionsErrMsg = "number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)" - unsupportedRemovingStorageMsg = "removing storage from a broker is not supported" - invalidExternalListenerStartingPortErrMsg = "invalid external listener starting port number" - invalidExternalListenerTargetPortErrMsg = "invalid external listener ingress controller target port number" + cantConnectErrorMsg = "failed to connect to kafka cluster" + cantConnectAPIServerMsg = "failed to connect to Kubernetes API server" + invalidReplicationFactorErrMsg = "replication factor is larger than the number of nodes in the kafka cluster" + outOfRangeReplicationFactorErrMsg = "replication factor must be larger than 0 (or set it to be -1 to use the broker's default)" + outOfRangePartitionsErrMsg = "number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)" + unsupportedRemovingStorageMsg = "removing storage from a broker is not supported" + invalidExternalListenerStartingPortErrMsg = "invalid external listener starting port number" + invalidContainerPortForIngressControllerErrMsg = "invalid trarget port number for ingress controller deployment" // errorDuringValidationMsg is added to infrastructure errors (e.g. failed to connect), but not to field validation errors errorDuringValidationMsg = "error during validation" diff --git a/pkg/webhooks/kafkacluster_validator.go b/pkg/webhooks/kafkacluster_validator.go index 95dd22a7bc..9a03e17152 100644 --- a/pkg/webhooks/kafkacluster_validator.go +++ b/pkg/webhooks/kafkacluster_validator.go @@ -178,6 +178,8 @@ func checkExternalListeners(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpe allErrs = append(allErrs, checkExternalListenerStartingPort(kafkaClusterSpec)...) + allErrs = append(allErrs, checkTargetPortsCollisionForEnvoy(kafkaClusterSpec)...) + return allErrs } @@ -224,13 +226,17 @@ func checkExternalListenerStartingPort(kafkaClusterSpec *banzaicloudv1beta1.Kafk outOfRangeBrokerIDs = append(outOfRangeBrokerIDs, broker.Id) } + if externalPort == extListener.GetIngressControllerTargetPort() { + collidingPortsBrokerIDs = append(collidingPortsBrokerIDs, broker.Id) + } + if kafkaClusterSpec.GetIngressController() == "envoy" { if externalPort == kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort() || externalPort == kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort() { collidingPortsBrokerIDs = append(collidingPortsBrokerIDs, broker.Id) } } } - + if len(outOfRangeBrokerIDs) > 0 { errmsg := invalidExternalListenerStartingPortErrMsg + ": " + fmt.Sprintf("ExternalListener '%s' would generate external access port numbers (externalStartingPort + Broker ID) that are out of range (not between 1 and 65535) for brokers %v", extListener.Name, outOfRangeBrokerIDs) fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("externalStartingPort"), extListener.ExternalStartingPort, errmsg) @@ -239,11 +245,49 @@ func checkExternalListenerStartingPort(kafkaClusterSpec *banzaicloudv1beta1.Kafk if len(collidingPortsBrokerIDs) > 0 { errmsg := invalidExternalListenerStartingPortErrMsg + ": " + fmt.Sprintf("ExternalListener '%s' would generate external access port numbers ("+ - "externalStartingPort + Broker ID) that are colliding with either the envoy admin port (default '%d') or the envoy health-check port (default '%d') for brokers %v", - extListener.Name, kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort(), kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort(), collidingPortsBrokerIDs) + "externalStartingPort + Broker ID) that collide with either the envoy admin port ('%d'), the envoy health-check port ('%d'), or the ingressControllerTargetPort ('%d') for brokers %v", + extListener.Name, kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort(), kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort(), extListener.GetIngressControllerTargetPort(), collidingPortsBrokerIDs) fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("externalStartingPort"), extListener.ExternalStartingPort, errmsg) allErrs = append(allErrs, fldErr) } } return allErrs } + +// checkTargetPortsCollisionForEnvoy checks if the IngressControllerTargetPort collides with the other container ports for envoy deployment +func checkTargetPortsCollisionForEnvoy(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList { + if kafkaClusterSpec.GetIngressController() != "envoy" { + return nil + } + + var allErrs field.ErrorList + + ap := kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort() + hcp := kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort() + + if ap == hcp { + errmsg := invalidContainerPortForIngressControllerErrMsg + ": The envoy configuration uses an admin port number that collides with the health-check port number" + fldErr := field.Invalid(field.NewPath("spec").Child("envoyConfig").Child("adminPort"), kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort(), errmsg) + allErrs = append(allErrs, fldErr) + } + + if kafkaClusterSpec.ListenersConfig.ExternalListeners != nil { + for i, extListener := range kafkaClusterSpec.ListenersConfig.ExternalListeners { + if extListener.GetIngressControllerTargetPort() == ap { + errmsg := invalidContainerPortForIngressControllerErrMsg + ": " + fmt.Sprintf( + "ExternalListener '%s' uses an ingress controller target port number that collides with the envoy's admin port", extListener.Name) + fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("ingressControllerTargetPort"), extListener.GetIngressControllerTargetPort(), errmsg) + allErrs = append(allErrs, fldErr) + } + + if extListener.GetIngressControllerTargetPort() == hcp { + errmsg := invalidContainerPortForIngressControllerErrMsg + ": " + fmt.Sprintf( + "ExternalListener '%s' uses an ingress controller target port number that collides with the envoy's health-check port", extListener.Name) + fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("ingressControllerTargetPort"), extListener.GetIngressControllerTargetPort(), errmsg) + allErrs = append(allErrs, fldErr) + } + } + } + + return allErrs +} diff --git a/pkg/webhooks/kafkacluster_validator_test.go b/pkg/webhooks/kafkacluster_validator_test.go index 9e6f03898d..0d27bd2558 100644 --- a/pkg/webhooks/kafkacluster_validator_test.go +++ b/pkg/webhooks/kafkacluster_validator_test.go @@ -18,6 +18,7 @@ import ( "fmt" "testing" + "github.com/banzaicloud/koperator/pkg/util" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/util/validation/field" @@ -682,7 +683,7 @@ func TestCheckExternalListenerStartingPort(t *testing.T) { ), }, { - testName: "invalid config: brokers with in-range external port numbers, but they are colliding with the envoy ports", + testName: "invalid config: brokers with in-range external port numbers, but they collide with the envoy ports", kafkaClusterSpec: v1beta1.KafkaClusterSpec{ Brokers: []v1beta1.Broker{{Id: 0}, {Id: 11}, {Id: 102}}, ListenersConfig: v1beta1.ListenersConfig{ @@ -701,12 +702,14 @@ func TestCheckExternalListenerStartingPort(t *testing.T) { expected: append(field.ErrorList{}, field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(0).Child("externalStartingPort"), int32(8080), invalidExternalListenerStartingPortErrMsg+": "+fmt.Sprintf("ExternalListener '%s' would generate external access port numbers ("+ - "externalStartingPort + Broker ID) that are colliding with either the envoy admin port (default '%d') or the envoy health-check port (default '%d') for brokers %v", - "test-external1", int32(8081), int32(8080), []int32{0})), + "externalStartingPort + Broker ID) that collide with either the envoy admin port ('%d'), the envoy health-check port ('%d'), "+ + "or the ingressControllerTargetPort ('%d') for brokers %v", + "test-external1", int32(8081), int32(8080), int32(29092), []int32{0})), field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(1).Child("externalStartingPort"), int32(8070), invalidExternalListenerStartingPortErrMsg+": "+fmt.Sprintf("ExternalListener '%s' would generate external access port numbers ("+ - "externalStartingPort + Broker ID) that are colliding with either the envoy admin port (default '%d') or the envoy health-check port (default '%d') for brokers %v", - "test-external2", int32(8081), int32(8080), []int32{11})), + "externalStartingPort + Broker ID) that collide with either the envoy admin port ('%d'), the envoy health-check port ('%d'), "+ + "or the ingressControllerTargetPort ('%d') for brokers %v", + "test-external2", int32(8081), int32(8080), int32(29092), []int32{11})), ), }, } @@ -719,3 +722,112 @@ func TestCheckExternalListenerStartingPort(t *testing.T) { } } + +func TestCheckTargetPortsCollisionForEnvoy(t *testing.T) { + testCases := []struct { + testName string + kafkaClusterSpec v1beta1.KafkaClusterSpec + expected field.ErrorList + }{ + { + testName: "valid config: envoy admin port, envoy health-check port, and ingress controller target port are not defined", + kafkaClusterSpec: v1beta1.KafkaClusterSpec{ + ListenersConfig: v1beta1.ListenersConfig{ + ExternalListeners: []v1beta1.ExternalListenerConfig{ + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external1"}, + }, + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external2"}, + }, + }, + }, + }, + expected: nil, + }, + { + testName: "invalid config: user-specified envoy admin port collides with default envoy health-check port", + kafkaClusterSpec: v1beta1.KafkaClusterSpec{ + EnvoyConfig: v1beta1.EnvoyConfig{ + AdminPort: util.Int32Pointer(8080), + }, + }, + expected: append(field.ErrorList{}, + field.Invalid(field.NewPath("spec").Child("envoyConfig").Child("adminPort"), int32(8080), + invalidContainerPortForIngressControllerErrMsg+": The envoy configuration uses an admin port number that collides with the health-check port number"), + ), + }, + { + testName: "invalid config: default envoy admin port collides with user-specified envoy health-check port", + kafkaClusterSpec: v1beta1.KafkaClusterSpec{ + EnvoyConfig: v1beta1.EnvoyConfig{ + HealthCheckPort: util.Int32Pointer(8081), + }, + }, + expected: append(field.ErrorList{}, + field.Invalid(field.NewPath("spec").Child("envoyConfig").Child("adminPort"), int32(8081), + invalidContainerPortForIngressControllerErrMsg+": The envoy configuration uses an admin port number that collides with the health-check port number"), + ), + }, + { + testName: "invalid config: user-specified ingress controller target port collided with user-specified envoy admin port and default health-check port", + kafkaClusterSpec: v1beta1.KafkaClusterSpec{ + EnvoyConfig: v1beta1.EnvoyConfig{ + AdminPort: util.Int32Pointer(29000), + }, + ListenersConfig: v1beta1.ListenersConfig{ + ExternalListeners: []v1beta1.ExternalListenerConfig{ + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external1"}, + IngressControllerTargetPort: util.Int32Pointer(29000), + }, + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external2"}, + IngressControllerTargetPort: util.Int32Pointer(8080), + }, + }, + }, + }, + expected: append(field.ErrorList{}, + field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(0).Child("ingressControllerTargetPort"), int32(29000), + invalidContainerPortForIngressControllerErrMsg+": ExternalListener 'test-external1' uses an ingress controller target port number that collides with the envoy's admin port"), + field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(1).Child("ingressControllerTargetPort"), int32(8080), + invalidContainerPortForIngressControllerErrMsg+": ExternalListener 'test-external2' uses an ingress controller target port number that collides with the envoy's health-check"+ + " port"), + ), + }, + { + testName: "invalid config: user-specified ingress controller target port collided with default envoy admin port and user-specified health-check port", + kafkaClusterSpec: v1beta1.KafkaClusterSpec{ + EnvoyConfig: v1beta1.EnvoyConfig{ + HealthCheckPort: util.Int32Pointer(19090), + }, + ListenersConfig: v1beta1.ListenersConfig{ + ExternalListeners: []v1beta1.ExternalListenerConfig{ + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external1"}, + IngressControllerTargetPort: util.Int32Pointer(19090), + }, + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external2"}, + IngressControllerTargetPort: util.Int32Pointer(8081), + }, + }, + }, + }, + expected: append(field.ErrorList{}, + field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(0).Child("ingressControllerTargetPort"), int32(19090), + invalidContainerPortForIngressControllerErrMsg+": ExternalListener 'test-external1' uses an ingress controller target port number that collides with the envoy's health-check port"), + field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(1).Child("ingressControllerTargetPort"), int32(8081), + invalidContainerPortForIngressControllerErrMsg+": ExternalListener 'test-external2' uses an ingress controller target port number that collides with the envoy's admin port"), + ), + }, + } + + for _, testCase := range testCases { + t.Run(testCase.testName, func(t *testing.T) { + got := checkTargetPortsCollisionForEnvoy(&testCase.kafkaClusterSpec) + require.Equal(t, testCase.expected, got) + }) + } +}