From 19177e8e27691d6d17bf1a4818aad199c1f3f5a2 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Wed, 14 Jun 2023 11:16:09 -0400 Subject: [PATCH 1/9] Add ingressControllerTargetPort to KafkaCluster API --- api/v1beta1/kafkacluster_types.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index 1badd3aa7..73bf8fd16 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -557,10 +557,18 @@ type ExternalListenerConfig struct { // configuring AnyCastPort allows kafka cluster access without specifying the exact broker // If not defined, 29092 will be used for external clients to reach the kafka cluster AnyCastPort *int32 `json:"anyCastPort,omitempty"` +<<<<<<< HEAD // +kubebuilder:validation:Minimum=1024 // +kubebuilder:validation:Maximum=65535 // +optional // IngressControllerTargetPort defines the container port that the ingress controller uses for handling external traffic. +======= + // +kubebuilder:validation:Minimum=0 + // +kubebuilder:validation:Maximum=65535 + // +optional + // IngressControllerTargetPort defines the container port that the ingress controller uses for handling external traffic. + // If defined, IngressControllerTargetPort should be >= 1024 when using IstioIngress as the ingress controller. +>>>>>>> 934237b (Add ingressControllerTargetPort to KafkaCluster API) // If not defined, 29092 will be used as the default IngressControllerTargetPort value. IngressControllerTargetPort *int32 `json:"ingressControllerTargetPort,omitempty"` // +kubebuilder:validation:Enum=LoadBalancer;NodePort From 9a5046e5bf1a67a27da8b835a1ceb71adf285f54 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Wed, 14 Jun 2023 16:36:34 -0400 Subject: [PATCH 2/9] Add validation for ingress controlelr target port --- pkg/webhooks/errors.go | 1 + pkg/webhooks/kafkacluster_validator.go | 26 +++++- pkg/webhooks/kafkacluster_validator_test.go | 87 ++++++++++++++++++++- 3 files changed, 111 insertions(+), 3 deletions(-) diff --git a/pkg/webhooks/errors.go b/pkg/webhooks/errors.go index 33195389e..3fcf6d7a0 100644 --- a/pkg/webhooks/errors.go +++ b/pkg/webhooks/errors.go @@ -28,6 +28,7 @@ const ( outOfRangePartitionsErrMsg = "number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)" unsupportedRemovingStorageMsg = "removing storage from a broker is not supported" invalidExternalListenerStartingPortErrMsg = "invalid external listener starting port number" + invalidExternalListenerTargetPortErrMsg = "invalid external listener ingress controller target port number" // errorDuringValidationMsg is added to infrastructure errors (e.g. failed to connect), but not to field validation errors errorDuringValidationMsg = "error during validation" diff --git a/pkg/webhooks/kafkacluster_validator.go b/pkg/webhooks/kafkacluster_validator.go index d2af7a5fa..54617c309 100644 --- a/pkg/webhooks/kafkacluster_validator.go +++ b/pkg/webhooks/kafkacluster_validator.go @@ -24,6 +24,8 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/validation/field" + "github.com/banzaicloud/koperator/pkg/util/istioingress" + "github.com/go-logr/logr" banzaicloudv1beta1 "github.com/banzaicloud/koperator/api/v1beta1" @@ -99,7 +101,7 @@ func checkBrokerStorageRemoval(kafkaClusterSpecOld, kafkaClusterSpecNew *banzaic if err != nil { return nil, err } - // checking broukerConfigGroup existence + // checking brokerConfigGroup existence if brokerNew.BrokerConfigGroup != "" { if _, exists := kafkaClusterSpecNew.BrokerConfigGroups[brokerNew.BrokerConfigGroup]; !exists { return field.Invalid(field.NewPath("spec").Child("brokers").Index(int(brokerNew.Id)).Child("brokerConfigGroup"), brokerNew.BrokerConfigGroup, unsupportedRemovingStorageMsg+", provided brokerConfigGroup not found"), nil @@ -166,6 +168,8 @@ func checkInternalAndExternalListeners(kafkaClusterSpec *banzaicloudv1beta1.Kafk allErrs = append(allErrs, checkExternalListenerStartingPort(kafkaClusterSpec)...) + allErrs = append(allErrs, checkExternalListenerIngressControllerTargetPort(kafkaClusterSpec)...) + return allErrs } @@ -219,3 +223,23 @@ func checkExternalListenerStartingPort(kafkaClusterSpec *banzaicloudv1beta1.Kafk } return allErrs } + +func checkExternalListenerIngressControllerTargetPort(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList { + // if there are no externalListeners, there is no need to perform the rest of the checks in this function + if kafkaClusterSpec.ListenersConfig.ExternalListeners == nil || kafkaClusterSpec.GetIngressController() != istioingress.IngressControllerName { + return nil + } + + var allErrs field.ErrorList + for i, extListener := range kafkaClusterSpec.ListenersConfig.ExternalListeners { + if extListener.GetIngressControllerTargetPort() < banzaicloudv1beta1.MaxWellKnownPort { + errmsg := invalidExternalListenerTargetPortErrMsg + ": " + fmt.Sprintf( + "ExternalListener '%s' would use a well-known port number '%d'(<%d) as the ingress controller container port, and this is not permitted", + extListener.Name, extListener.GetIngressControllerTargetPort(), banzaicloudv1beta1.MaxWellKnownPort) + fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index( + i).Child("ingressControllerTargetPort"), *extListener.IngressControllerTargetPort, errmsg) + allErrs = append(allErrs, fldErr) + } + } + return allErrs +} diff --git a/pkg/webhooks/kafkacluster_validator_test.go b/pkg/webhooks/kafkacluster_validator_test.go index 927d1f388..380b60ee2 100644 --- a/pkg/webhooks/kafkacluster_validator_test.go +++ b/pkg/webhooks/kafkacluster_validator_test.go @@ -16,12 +16,15 @@ package webhooks import ( "fmt" + "math/rand" "testing" - "github.com/banzaicloud/koperator/api/v1beta1" - "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/util/validation/field" + + "github.com/banzaicloud/koperator/api/v1beta1" + "github.com/banzaicloud/koperator/pkg/util" + "github.com/banzaicloud/koperator/pkg/util/istioingress" ) // nolint: funlen @@ -690,3 +693,83 @@ func TestCheckExternalListenerStartingPort(t *testing.T) { }) } } + +func TestCheckExternalListenerIngressControllerTargetPort(t *testing.T) { + testCases := []struct { + testName string + kafkaClusterSpec v1beta1.KafkaClusterSpec + expected field.ErrorList + }{ + { + testName: "valid config: ingressControllerTargetPort is not defined", + kafkaClusterSpec: v1beta1.KafkaClusterSpec{ + ListenersConfig: v1beta1.ListenersConfig{ + ExternalListeners: []v1beta1.ExternalListenerConfig{ + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external1"}, + }, + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external2"}, + }, + }, + }, + }, + expected: nil, + }, + { + testName: "valid config: ingressControllerTargetPort is defined and is valid for IstioIngress controller", + kafkaClusterSpec: v1beta1.KafkaClusterSpec{ + ListenersConfig: v1beta1.ListenersConfig{ + ExternalListeners: []v1beta1.ExternalListenerConfig{ + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external1"}, + IngressControllerTargetPort: util.Int32Pointer(int32(v1beta1.MaxWellKnownPort + rand.Intn(65536-v1beta1.MaxWellKnownPort))), + }, + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external2"}, + IngressControllerTargetPort: util.Int32Pointer(int32(v1beta1.MaxWellKnownPort + rand.Intn(65536-v1beta1.MaxWellKnownPort))), + }, + }, + }, + }, + expected: nil, + }, + { + testName: "invalid config: ingressControllerTargetPort is defined and is invalid for IstioIngress controller", + kafkaClusterSpec: v1beta1.KafkaClusterSpec{ + IngressController: istioingress.IngressControllerName, + ListenersConfig: v1beta1.ListenersConfig{ + ExternalListeners: []v1beta1.ExternalListenerConfig{ + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external1"}, + IngressControllerTargetPort: util.Int32Pointer(1), + }, + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external2"}, + IngressControllerTargetPort: util.Int32Pointer(1023), + }, + }, + }, + }, + expected: append(field.ErrorList{}, + field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(0). + Child("ingressControllerTargetPort"), int32(1), + invalidExternalListenerTargetPortErrMsg+": "+fmt.Sprintf( + "ExternalListener '%s' would use a well-known port number '%d'(<%d) as the ingress controller container port, and this is not permitted", + "test-external1", int32(1), v1beta1.MaxWellKnownPort)), + field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(1). + Child("ingressControllerTargetPort"), int32(1023), + invalidExternalListenerTargetPortErrMsg+": "+fmt.Sprintf( + "ExternalListener '%s' would use a well-known port number '%d'(<%d) as the ingress controller container port, and this is not permitted", + "test-external2", int32(1023), v1beta1.MaxWellKnownPort)), + ), + }, + } + + for _, testCase := range testCases { + t.Run(testCase.testName, func(t *testing.T) { + got := checkExternalListenerIngressControllerTargetPort(&testCase.kafkaClusterSpec) + require.Equal(t, testCase.expected, got) + }) + } +} From f168d4e3e079f9c567bbe29e31fcb95a84b26320 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Wed, 14 Jun 2023 16:38:53 -0400 Subject: [PATCH 3/9] Use newly introduced ingressControllerTargetPort field for ingress controller resources creation --- pkg/resources/envoy/configmap.go | 2 +- pkg/resources/envoy/deployment.go | 8 ++++++-- pkg/resources/envoy/service.go | 2 +- pkg/resources/istioingress/meshgateway.go | 2 +- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/resources/envoy/configmap.go b/pkg/resources/envoy/configmap.go index dd03ae940..a2e2a325b 100644 --- a/pkg/resources/envoy/configmap.go +++ b/pkg/resources/envoy/configmap.go @@ -341,7 +341,7 @@ func GenerateEnvoyConfig(kc *v1beta1.KafkaCluster, elistener v1beta1.ExternalLis SocketAddress: &envoycore.SocketAddress{ Address: "0.0.0.0", PortSpecifier: &envoycore.SocketAddress_PortValue{ - PortValue: uint32(elistener.GetAnyCastPort()), + PortValue: uint32(elistener.GetIngressControllerTargetPort()), }, }, }, diff --git a/pkg/resources/envoy/deployment.go b/pkg/resources/envoy/deployment.go index e3afb79df..fad72b873 100644 --- a/pkg/resources/envoy/deployment.go +++ b/pkg/resources/envoy/deployment.go @@ -145,8 +145,8 @@ func getExposedContainerPorts(extListener v1beta1.ExternalListenerConfig, broker } } exposedPorts = append(exposedPorts, corev1.ContainerPort{ - Name: fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, "tcp"), - ContainerPort: extListener.GetAnyCastPort(), + Name: getAllBrokerContainerPortName(), + ContainerPort: extListener.GetIngressControllerTargetPort(), Protocol: corev1.ProtocolTCP, }) @@ -163,3 +163,7 @@ func generatePodAnnotations(kafkaCluster *v1beta1.KafkaCluster, } return util.MergeAnnotations(ingressConfig.EnvoyConfig.GetAnnotations(), annotations) } + +func getAllBrokerContainerPortName() string { + return fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, "tcp") +} diff --git a/pkg/resources/envoy/service.go b/pkg/resources/envoy/service.go index 89ca1bda7..09ddc71c9 100644 --- a/pkg/resources/envoy/service.go +++ b/pkg/resources/envoy/service.go @@ -82,7 +82,7 @@ func getExposedServicePorts(extListener v1beta1.ExternalListenerConfig, brokersI // append anycast port exposedPorts = append(exposedPorts, corev1.ServicePort{ Name: fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, "tcp"), - TargetPort: intstr.FromInt(int(extListener.GetAnyCastPort())), + TargetPort: intstr.FromString(getAllBrokerContainerPortName()), Port: extListener.GetAnyCastPort(), Protocol: corev1.ProtocolTCP, }) diff --git a/pkg/resources/istioingress/meshgateway.go b/pkg/resources/istioingress/meshgateway.go index 18510368b..f1a1ec30e 100644 --- a/pkg/resources/istioingress/meshgateway.go +++ b/pkg/resources/istioingress/meshgateway.go @@ -113,7 +113,7 @@ func generateExternalPorts(kc *v1beta1.KafkaCluster, brokerIds []int, Name: fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, "tcp"), Protocol: string(corev1.ProtocolTCP), Port: externalListenerConfig.GetAnyCastPort(), - TargetPort: &istioOperatorApi.IntOrString{IntOrString: intstr.FromInt(int(externalListenerConfig.GetAnyCastPort()))}, + TargetPort: &istioOperatorApi.IntOrString{IntOrString: intstr.FromInt(int(externalListenerConfig.GetIngressControllerTargetPort()))}, }) return generatedPorts From a8926d33c03763650b8235711fadf1089cd877c9 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Thu, 15 Jun 2023 14:16:00 -0400 Subject: [PATCH 4/9] Bump up koperator/api version to v0.28.3 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 801c40f83..7db1a1d59 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/banzaicloud/istio-client-go v0.0.17 github.com/banzaicloud/istio-operator/api/v2 v2.15.1 github.com/banzaicloud/k8s-objectmatcher v1.8.0 - github.com/banzaicloud/koperator/api v0.28.2 + github.com/banzaicloud/koperator/api v0.28.3 github.com/banzaicloud/koperator/properties v0.4.1 github.com/cert-manager/cert-manager v1.11.2 github.com/cisco-open/cluster-registry-controller/api v0.2.5 diff --git a/go.sum b/go.sum index a4e5e6c98..e5a0c882b 100644 --- a/go.sum +++ b/go.sum @@ -64,8 +64,8 @@ github.com/banzaicloud/istio-operator/api/v2 v2.15.1 h1:BZg8COvoOJtfx/dgN7KpoOnc github.com/banzaicloud/istio-operator/api/v2 v2.15.1/go.mod h1:5qCpwWlIfxiLvBfTvT2mD2wp5RlFCDEt8Xql4sYPNBc= github.com/banzaicloud/k8s-objectmatcher v1.8.0 h1:Nugn25elKtPMTA2br+JgHNeSQ04sc05MDPmpJnd1N2A= github.com/banzaicloud/k8s-objectmatcher v1.8.0/go.mod h1:p2LSNAjlECf07fbhDyebTkPUIYnU05G+WfGgkTmgeMg= -github.com/banzaicloud/koperator/api v0.28.2 h1:x3n9dl+zhePgfwXFZlbzqeoI8veF4gL7JJAdKvfc61M= -github.com/banzaicloud/koperator/api v0.28.2/go.mod h1:MpHo+CO1TmVyHooJ3n3+o6ibL/AKlBp7L9bdyKy5Ec0= +github.com/banzaicloud/koperator/api v0.28.3 h1:bD4vBJr0Bsw+oX1kDV+hp/XR5hWp+g3YwIJi7TuHh6o= +github.com/banzaicloud/koperator/api v0.28.3/go.mod h1:fo0y8UdiH9YPE+sIK5LcJWG6hd0pIA13F4li6DOIal4= github.com/banzaicloud/koperator/properties v0.4.1 h1:SB2QgXlcK1Dc7Z1rg65PJifErDa8OQnoWCCJgmC7SGc= github.com/banzaicloud/koperator/properties v0.4.1/go.mod h1:TcL+llxuhW3UeQtVEDYEXGouFLF2P+LuZZVudSb6jyA= github.com/banzaicloud/operator-tools v0.28.0 h1:GSfc0qZr6zo7WrNxdgWZE1LcTChPU8QFYOTDirYVtIM= From e8317a6108e894a8b0e40d0ba90fe2b4bf0b9fe0 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Thu, 15 Jun 2023 14:56:31 -0400 Subject: [PATCH 5/9] Remove unnecessary kafkacluster valdiation due to changes in the latest kafkacluster API --- pkg/webhooks/kafkacluster_validator.go | 24 ------ pkg/webhooks/kafkacluster_validator_test.go | 83 --------------------- 2 files changed, 107 deletions(-) diff --git a/pkg/webhooks/kafkacluster_validator.go b/pkg/webhooks/kafkacluster_validator.go index 54617c309..c30d0f3c6 100644 --- a/pkg/webhooks/kafkacluster_validator.go +++ b/pkg/webhooks/kafkacluster_validator.go @@ -24,8 +24,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/validation/field" - "github.com/banzaicloud/koperator/pkg/util/istioingress" - "github.com/go-logr/logr" banzaicloudv1beta1 "github.com/banzaicloud/koperator/api/v1beta1" @@ -168,8 +166,6 @@ func checkInternalAndExternalListeners(kafkaClusterSpec *banzaicloudv1beta1.Kafk allErrs = append(allErrs, checkExternalListenerStartingPort(kafkaClusterSpec)...) - allErrs = append(allErrs, checkExternalListenerIngressControllerTargetPort(kafkaClusterSpec)...) - return allErrs } @@ -223,23 +219,3 @@ func checkExternalListenerStartingPort(kafkaClusterSpec *banzaicloudv1beta1.Kafk } return allErrs } - -func checkExternalListenerIngressControllerTargetPort(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList { - // if there are no externalListeners, there is no need to perform the rest of the checks in this function - if kafkaClusterSpec.ListenersConfig.ExternalListeners == nil || kafkaClusterSpec.GetIngressController() != istioingress.IngressControllerName { - return nil - } - - var allErrs field.ErrorList - for i, extListener := range kafkaClusterSpec.ListenersConfig.ExternalListeners { - if extListener.GetIngressControllerTargetPort() < banzaicloudv1beta1.MaxWellKnownPort { - errmsg := invalidExternalListenerTargetPortErrMsg + ": " + fmt.Sprintf( - "ExternalListener '%s' would use a well-known port number '%d'(<%d) as the ingress controller container port, and this is not permitted", - extListener.Name, extListener.GetIngressControllerTargetPort(), banzaicloudv1beta1.MaxWellKnownPort) - fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index( - i).Child("ingressControllerTargetPort"), *extListener.IngressControllerTargetPort, errmsg) - allErrs = append(allErrs, fldErr) - } - } - return allErrs -} diff --git a/pkg/webhooks/kafkacluster_validator_test.go b/pkg/webhooks/kafkacluster_validator_test.go index 380b60ee2..36ab3a2cc 100644 --- a/pkg/webhooks/kafkacluster_validator_test.go +++ b/pkg/webhooks/kafkacluster_validator_test.go @@ -16,15 +16,12 @@ package webhooks import ( "fmt" - "math/rand" "testing" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/util/validation/field" "github.com/banzaicloud/koperator/api/v1beta1" - "github.com/banzaicloud/koperator/pkg/util" - "github.com/banzaicloud/koperator/pkg/util/istioingress" ) // nolint: funlen @@ -693,83 +690,3 @@ func TestCheckExternalListenerStartingPort(t *testing.T) { }) } } - -func TestCheckExternalListenerIngressControllerTargetPort(t *testing.T) { - testCases := []struct { - testName string - kafkaClusterSpec v1beta1.KafkaClusterSpec - expected field.ErrorList - }{ - { - testName: "valid config: ingressControllerTargetPort is not defined", - kafkaClusterSpec: v1beta1.KafkaClusterSpec{ - ListenersConfig: v1beta1.ListenersConfig{ - ExternalListeners: []v1beta1.ExternalListenerConfig{ - { - CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external1"}, - }, - { - CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external2"}, - }, - }, - }, - }, - expected: nil, - }, - { - testName: "valid config: ingressControllerTargetPort is defined and is valid for IstioIngress controller", - kafkaClusterSpec: v1beta1.KafkaClusterSpec{ - ListenersConfig: v1beta1.ListenersConfig{ - ExternalListeners: []v1beta1.ExternalListenerConfig{ - { - CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external1"}, - IngressControllerTargetPort: util.Int32Pointer(int32(v1beta1.MaxWellKnownPort + rand.Intn(65536-v1beta1.MaxWellKnownPort))), - }, - { - CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external2"}, - IngressControllerTargetPort: util.Int32Pointer(int32(v1beta1.MaxWellKnownPort + rand.Intn(65536-v1beta1.MaxWellKnownPort))), - }, - }, - }, - }, - expected: nil, - }, - { - testName: "invalid config: ingressControllerTargetPort is defined and is invalid for IstioIngress controller", - kafkaClusterSpec: v1beta1.KafkaClusterSpec{ - IngressController: istioingress.IngressControllerName, - ListenersConfig: v1beta1.ListenersConfig{ - ExternalListeners: []v1beta1.ExternalListenerConfig{ - { - CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external1"}, - IngressControllerTargetPort: util.Int32Pointer(1), - }, - { - CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external2"}, - IngressControllerTargetPort: util.Int32Pointer(1023), - }, - }, - }, - }, - expected: append(field.ErrorList{}, - field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(0). - Child("ingressControllerTargetPort"), int32(1), - invalidExternalListenerTargetPortErrMsg+": "+fmt.Sprintf( - "ExternalListener '%s' would use a well-known port number '%d'(<%d) as the ingress controller container port, and this is not permitted", - "test-external1", int32(1), v1beta1.MaxWellKnownPort)), - field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(1). - Child("ingressControllerTargetPort"), int32(1023), - invalidExternalListenerTargetPortErrMsg+": "+fmt.Sprintf( - "ExternalListener '%s' would use a well-known port number '%d'(<%d) as the ingress controller container port, and this is not permitted", - "test-external2", int32(1023), v1beta1.MaxWellKnownPort)), - ), - }, - } - - for _, testCase := range testCases { - t.Run(testCase.testName, func(t *testing.T) { - got := checkExternalListenerIngressControllerTargetPort(&testCase.kafkaClusterSpec) - require.Equal(t, testCase.expected, got) - }) - } -} From e773b2a6352d3ae3e17afa00996ebc09312accac Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Thu, 15 Jun 2023 16:16:56 -0400 Subject: [PATCH 6/9] Add check in validation webhook for colliding ports while using enovy --- pkg/util/util.go | 4 +++ pkg/webhooks/kafkacluster_validator.go | 40 +++++++++++++++++---- pkg/webhooks/kafkacluster_validator_test.go | 29 +++++++++++++++ 3 files changed, 67 insertions(+), 6 deletions(-) diff --git a/pkg/util/util.go b/pkg/util/util.go index 7f637ecb7..34e7a5912 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -548,3 +548,7 @@ func RetryOnError(backoff wait.Backoff, fn func() error, isRetryableError func(e func RetryOnConflict(backoff wait.Backoff, fn func() error) error { return RetryOnError(backoff, fn, apierrors.IsConflict) } + +func GetExternalPortForBroker(externalStartingPort, brokerId int32) int32 { + return externalStartingPort + brokerId +} diff --git a/pkg/webhooks/kafkacluster_validator.go b/pkg/webhooks/kafkacluster_validator.go index c30d0f3c6..95dd22a7b 100644 --- a/pkg/webhooks/kafkacluster_validator.go +++ b/pkg/webhooks/kafkacluster_validator.go @@ -162,7 +162,19 @@ func getMissingMounthPathLocation(mounthPath string, kafkaClusterSpec *banzaiclo func checkInternalAndExternalListeners(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList { var allErrs field.ErrorList - allErrs = append(allErrs, checkUniqueListenerContainerPort(kafkaClusterSpec.ListenersConfig)...) + allErrs = append(allErrs, checkInternalListeners(kafkaClusterSpec)...) + + allErrs = append(allErrs, checkExternalListeners(kafkaClusterSpec)...) + + return allErrs +} + +func checkInternalListeners(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList { + return checkUniqueListenerContainerPort(kafkaClusterSpec.ListenersConfig) +} + +func checkExternalListeners(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList { + var allErrs field.ErrorList allErrs = append(allErrs, checkExternalListenerStartingPort(kafkaClusterSpec)...) @@ -205,14 +217,30 @@ func checkExternalListenerStartingPort(kafkaClusterSpec *banzaicloudv1beta1.Kafk var allErrs field.ErrorList const maxPort int32 = 65535 for i, extListener := range kafkaClusterSpec.ListenersConfig.ExternalListeners { - var invalidBrokerIDs []int32 + var outOfRangeBrokerIDs, collidingPortsBrokerIDs []int32 for _, broker := range kafkaClusterSpec.Brokers { - if extListener.ExternalStartingPort+broker.Id < 1 || extListener.ExternalStartingPort+broker.Id > maxPort { - invalidBrokerIDs = append(invalidBrokerIDs, broker.Id) + externalPort := util.GetExternalPortForBroker(extListener.ExternalStartingPort, broker.Id) + if externalPort < 1 || externalPort > maxPort { + outOfRangeBrokerIDs = append(outOfRangeBrokerIDs, broker.Id) + } + + if kafkaClusterSpec.GetIngressController() == "envoy" { + if externalPort == kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort() || externalPort == kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort() { + collidingPortsBrokerIDs = append(collidingPortsBrokerIDs, broker.Id) + } } } - if len(invalidBrokerIDs) > 0 { - errmsg := invalidExternalListenerStartingPortErrMsg + ": " + fmt.Sprintf("ExternalListener '%s' would generate external access port numbers (externalStartingPort + Broker ID) that are out of range (not between 1 and 65535) for brokers %v", extListener.Name, invalidBrokerIDs) + + if len(outOfRangeBrokerIDs) > 0 { + errmsg := invalidExternalListenerStartingPortErrMsg + ": " + fmt.Sprintf("ExternalListener '%s' would generate external access port numbers (externalStartingPort + Broker ID) that are out of range (not between 1 and 65535) for brokers %v", extListener.Name, outOfRangeBrokerIDs) + fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("externalStartingPort"), extListener.ExternalStartingPort, errmsg) + allErrs = append(allErrs, fldErr) + } + + if len(collidingPortsBrokerIDs) > 0 { + errmsg := invalidExternalListenerStartingPortErrMsg + ": " + fmt.Sprintf("ExternalListener '%s' would generate external access port numbers ("+ + "externalStartingPort + Broker ID) that are colliding with either the envoy admin port (default '%d') or the envoy health-check port (default '%d') for brokers %v", + extListener.Name, kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort(), kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort(), collidingPortsBrokerIDs) fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("externalStartingPort"), extListener.ExternalStartingPort, errmsg) allErrs = append(allErrs, fldErr) } diff --git a/pkg/webhooks/kafkacluster_validator_test.go b/pkg/webhooks/kafkacluster_validator_test.go index 36ab3a2cc..9e6f03898 100644 --- a/pkg/webhooks/kafkacluster_validator_test.go +++ b/pkg/webhooks/kafkacluster_validator_test.go @@ -681,6 +681,34 @@ func TestCheckExternalListenerStartingPort(t *testing.T) { "test-external2", []int32{102})), ), }, + { + testName: "invalid config: brokers with in-range external port numbers, but they are colliding with the envoy ports", + kafkaClusterSpec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{Id: 0}, {Id: 11}, {Id: 102}}, + ListenersConfig: v1beta1.ListenersConfig{ + ExternalListeners: []v1beta1.ExternalListenerConfig{ + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external1"}, + ExternalStartingPort: 8080, + }, + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external2"}, + ExternalStartingPort: 8070, + }, + }, + }, + }, + expected: append(field.ErrorList{}, + field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(0).Child("externalStartingPort"), int32(8080), + invalidExternalListenerStartingPortErrMsg+": "+fmt.Sprintf("ExternalListener '%s' would generate external access port numbers ("+ + "externalStartingPort + Broker ID) that are colliding with either the envoy admin port (default '%d') or the envoy health-check port (default '%d') for brokers %v", + "test-external1", int32(8081), int32(8080), []int32{0})), + field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(1).Child("externalStartingPort"), int32(8070), + invalidExternalListenerStartingPortErrMsg+": "+fmt.Sprintf("ExternalListener '%s' would generate external access port numbers ("+ + "externalStartingPort + Broker ID) that are colliding with either the envoy admin port (default '%d') or the envoy health-check port (default '%d') for brokers %v", + "test-external2", int32(8081), int32(8080), []int32{11})), + ), + }, } for _, testCase := range testCases { @@ -688,5 +716,6 @@ func TestCheckExternalListenerStartingPort(t *testing.T) { got := checkExternalListenerStartingPort(&testCase.kafkaClusterSpec) require.Equal(t, testCase.expected, got) }) + } } From 45c48e4e4d1418938a2e62ec664d6354d8c9dfd9 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Thu, 15 Jun 2023 22:40:22 -0400 Subject: [PATCH 7/9] Rebase from master to use the latest api tag --- api/v1beta1/kafkacluster_types.go | 8 -- pkg/webhooks/errors.go | 16 +-- pkg/webhooks/kafkacluster_validator.go | 50 +++++++- pkg/webhooks/kafkacluster_validator_test.go | 122 +++++++++++++++++++- 4 files changed, 172 insertions(+), 24 deletions(-) diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index 73bf8fd16..1badd3aa7 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -557,18 +557,10 @@ type ExternalListenerConfig struct { // configuring AnyCastPort allows kafka cluster access without specifying the exact broker // If not defined, 29092 will be used for external clients to reach the kafka cluster AnyCastPort *int32 `json:"anyCastPort,omitempty"` -<<<<<<< HEAD // +kubebuilder:validation:Minimum=1024 // +kubebuilder:validation:Maximum=65535 // +optional // IngressControllerTargetPort defines the container port that the ingress controller uses for handling external traffic. -======= - // +kubebuilder:validation:Minimum=0 - // +kubebuilder:validation:Maximum=65535 - // +optional - // IngressControllerTargetPort defines the container port that the ingress controller uses for handling external traffic. - // If defined, IngressControllerTargetPort should be >= 1024 when using IstioIngress as the ingress controller. ->>>>>>> 934237b (Add ingressControllerTargetPort to KafkaCluster API) // If not defined, 29092 will be used as the default IngressControllerTargetPort value. IngressControllerTargetPort *int32 `json:"ingressControllerTargetPort,omitempty"` // +kubebuilder:validation:Enum=LoadBalancer;NodePort diff --git a/pkg/webhooks/errors.go b/pkg/webhooks/errors.go index 3fcf6d7a0..58f3d9ef1 100644 --- a/pkg/webhooks/errors.go +++ b/pkg/webhooks/errors.go @@ -21,14 +21,14 @@ import ( ) const ( - cantConnectErrorMsg = "failed to connect to kafka cluster" - cantConnectAPIServerMsg = "failed to connect to Kubernetes API server" - invalidReplicationFactorErrMsg = "replication factor is larger than the number of nodes in the kafka cluster" - outOfRangeReplicationFactorErrMsg = "replication factor must be larger than 0 (or set it to be -1 to use the broker's default)" - outOfRangePartitionsErrMsg = "number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)" - unsupportedRemovingStorageMsg = "removing storage from a broker is not supported" - invalidExternalListenerStartingPortErrMsg = "invalid external listener starting port number" - invalidExternalListenerTargetPortErrMsg = "invalid external listener ingress controller target port number" + cantConnectErrorMsg = "failed to connect to kafka cluster" + cantConnectAPIServerMsg = "failed to connect to Kubernetes API server" + invalidReplicationFactorErrMsg = "replication factor is larger than the number of nodes in the kafka cluster" + outOfRangeReplicationFactorErrMsg = "replication factor must be larger than 0 (or set it to be -1 to use the broker's default)" + outOfRangePartitionsErrMsg = "number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)" + unsupportedRemovingStorageMsg = "removing storage from a broker is not supported" + invalidExternalListenerStartingPortErrMsg = "invalid external listener starting port number" + invalidContainerPortForIngressControllerErrMsg = "invalid trarget port number for ingress controller deployment" // errorDuringValidationMsg is added to infrastructure errors (e.g. failed to connect), but not to field validation errors errorDuringValidationMsg = "error during validation" diff --git a/pkg/webhooks/kafkacluster_validator.go b/pkg/webhooks/kafkacluster_validator.go index 95dd22a7b..9a03e1715 100644 --- a/pkg/webhooks/kafkacluster_validator.go +++ b/pkg/webhooks/kafkacluster_validator.go @@ -178,6 +178,8 @@ func checkExternalListeners(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpe allErrs = append(allErrs, checkExternalListenerStartingPort(kafkaClusterSpec)...) + allErrs = append(allErrs, checkTargetPortsCollisionForEnvoy(kafkaClusterSpec)...) + return allErrs } @@ -224,13 +226,17 @@ func checkExternalListenerStartingPort(kafkaClusterSpec *banzaicloudv1beta1.Kafk outOfRangeBrokerIDs = append(outOfRangeBrokerIDs, broker.Id) } + if externalPort == extListener.GetIngressControllerTargetPort() { + collidingPortsBrokerIDs = append(collidingPortsBrokerIDs, broker.Id) + } + if kafkaClusterSpec.GetIngressController() == "envoy" { if externalPort == kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort() || externalPort == kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort() { collidingPortsBrokerIDs = append(collidingPortsBrokerIDs, broker.Id) } } } - + if len(outOfRangeBrokerIDs) > 0 { errmsg := invalidExternalListenerStartingPortErrMsg + ": " + fmt.Sprintf("ExternalListener '%s' would generate external access port numbers (externalStartingPort + Broker ID) that are out of range (not between 1 and 65535) for brokers %v", extListener.Name, outOfRangeBrokerIDs) fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("externalStartingPort"), extListener.ExternalStartingPort, errmsg) @@ -239,11 +245,49 @@ func checkExternalListenerStartingPort(kafkaClusterSpec *banzaicloudv1beta1.Kafk if len(collidingPortsBrokerIDs) > 0 { errmsg := invalidExternalListenerStartingPortErrMsg + ": " + fmt.Sprintf("ExternalListener '%s' would generate external access port numbers ("+ - "externalStartingPort + Broker ID) that are colliding with either the envoy admin port (default '%d') or the envoy health-check port (default '%d') for brokers %v", - extListener.Name, kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort(), kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort(), collidingPortsBrokerIDs) + "externalStartingPort + Broker ID) that collide with either the envoy admin port ('%d'), the envoy health-check port ('%d'), or the ingressControllerTargetPort ('%d') for brokers %v", + extListener.Name, kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort(), kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort(), extListener.GetIngressControllerTargetPort(), collidingPortsBrokerIDs) fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("externalStartingPort"), extListener.ExternalStartingPort, errmsg) allErrs = append(allErrs, fldErr) } } return allErrs } + +// checkTargetPortsCollisionForEnvoy checks if the IngressControllerTargetPort collides with the other container ports for envoy deployment +func checkTargetPortsCollisionForEnvoy(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList { + if kafkaClusterSpec.GetIngressController() != "envoy" { + return nil + } + + var allErrs field.ErrorList + + ap := kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort() + hcp := kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort() + + if ap == hcp { + errmsg := invalidContainerPortForIngressControllerErrMsg + ": The envoy configuration uses an admin port number that collides with the health-check port number" + fldErr := field.Invalid(field.NewPath("spec").Child("envoyConfig").Child("adminPort"), kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort(), errmsg) + allErrs = append(allErrs, fldErr) + } + + if kafkaClusterSpec.ListenersConfig.ExternalListeners != nil { + for i, extListener := range kafkaClusterSpec.ListenersConfig.ExternalListeners { + if extListener.GetIngressControllerTargetPort() == ap { + errmsg := invalidContainerPortForIngressControllerErrMsg + ": " + fmt.Sprintf( + "ExternalListener '%s' uses an ingress controller target port number that collides with the envoy's admin port", extListener.Name) + fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("ingressControllerTargetPort"), extListener.GetIngressControllerTargetPort(), errmsg) + allErrs = append(allErrs, fldErr) + } + + if extListener.GetIngressControllerTargetPort() == hcp { + errmsg := invalidContainerPortForIngressControllerErrMsg + ": " + fmt.Sprintf( + "ExternalListener '%s' uses an ingress controller target port number that collides with the envoy's health-check port", extListener.Name) + fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("ingressControllerTargetPort"), extListener.GetIngressControllerTargetPort(), errmsg) + allErrs = append(allErrs, fldErr) + } + } + } + + return allErrs +} diff --git a/pkg/webhooks/kafkacluster_validator_test.go b/pkg/webhooks/kafkacluster_validator_test.go index 9e6f03898..10fef1b42 100644 --- a/pkg/webhooks/kafkacluster_validator_test.go +++ b/pkg/webhooks/kafkacluster_validator_test.go @@ -21,6 +21,8 @@ import ( "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/util/validation/field" + "github.com/banzaicloud/koperator/pkg/util" + "github.com/banzaicloud/koperator/api/v1beta1" ) @@ -682,7 +684,7 @@ func TestCheckExternalListenerStartingPort(t *testing.T) { ), }, { - testName: "invalid config: brokers with in-range external port numbers, but they are colliding with the envoy ports", + testName: "invalid config: brokers with in-range external port numbers, but they collide with the envoy ports", kafkaClusterSpec: v1beta1.KafkaClusterSpec{ Brokers: []v1beta1.Broker{{Id: 0}, {Id: 11}, {Id: 102}}, ListenersConfig: v1beta1.ListenersConfig{ @@ -701,12 +703,14 @@ func TestCheckExternalListenerStartingPort(t *testing.T) { expected: append(field.ErrorList{}, field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(0).Child("externalStartingPort"), int32(8080), invalidExternalListenerStartingPortErrMsg+": "+fmt.Sprintf("ExternalListener '%s' would generate external access port numbers ("+ - "externalStartingPort + Broker ID) that are colliding with either the envoy admin port (default '%d') or the envoy health-check port (default '%d') for brokers %v", - "test-external1", int32(8081), int32(8080), []int32{0})), + "externalStartingPort + Broker ID) that collide with either the envoy admin port ('%d'), the envoy health-check port ('%d'), "+ + "or the ingressControllerTargetPort ('%d') for brokers %v", + "test-external1", int32(8081), int32(8080), int32(29092), []int32{0})), field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(1).Child("externalStartingPort"), int32(8070), invalidExternalListenerStartingPortErrMsg+": "+fmt.Sprintf("ExternalListener '%s' would generate external access port numbers ("+ - "externalStartingPort + Broker ID) that are colliding with either the envoy admin port (default '%d') or the envoy health-check port (default '%d') for brokers %v", - "test-external2", int32(8081), int32(8080), []int32{11})), + "externalStartingPort + Broker ID) that collide with either the envoy admin port ('%d'), the envoy health-check port ('%d'), "+ + "or the ingressControllerTargetPort ('%d') for brokers %v", + "test-external2", int32(8081), int32(8080), int32(29092), []int32{11})), ), }, } @@ -716,6 +720,114 @@ func TestCheckExternalListenerStartingPort(t *testing.T) { got := checkExternalListenerStartingPort(&testCase.kafkaClusterSpec) require.Equal(t, testCase.expected, got) }) + } +} +func TestCheckTargetPortsCollisionForEnvoy(t *testing.T) { + testCases := []struct { + testName string + kafkaClusterSpec v1beta1.KafkaClusterSpec + expected field.ErrorList + }{ + { + testName: "valid config: envoy admin port, envoy health-check port, and ingress controller target port are not defined", + kafkaClusterSpec: v1beta1.KafkaClusterSpec{ + ListenersConfig: v1beta1.ListenersConfig{ + ExternalListeners: []v1beta1.ExternalListenerConfig{ + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external1"}, + }, + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external2"}, + }, + }, + }, + }, + expected: nil, + }, + { + testName: "invalid config: user-specified envoy admin port collides with default envoy health-check port", + kafkaClusterSpec: v1beta1.KafkaClusterSpec{ + EnvoyConfig: v1beta1.EnvoyConfig{ + AdminPort: util.Int32Pointer(8080), + }, + }, + expected: append(field.ErrorList{}, + field.Invalid(field.NewPath("spec").Child("envoyConfig").Child("adminPort"), int32(8080), + invalidContainerPortForIngressControllerErrMsg+": The envoy configuration uses an admin port number that collides with the health-check port number"), + ), + }, + { + testName: "invalid config: default envoy admin port collides with user-specified envoy health-check port", + kafkaClusterSpec: v1beta1.KafkaClusterSpec{ + EnvoyConfig: v1beta1.EnvoyConfig{ + HealthCheckPort: util.Int32Pointer(8081), + }, + }, + expected: append(field.ErrorList{}, + field.Invalid(field.NewPath("spec").Child("envoyConfig").Child("adminPort"), int32(8081), + invalidContainerPortForIngressControllerErrMsg+": The envoy configuration uses an admin port number that collides with the health-check port number"), + ), + }, + { + testName: "invalid config: user-specified ingress controller target port collided with user-specified envoy admin port and default health-check port", + kafkaClusterSpec: v1beta1.KafkaClusterSpec{ + EnvoyConfig: v1beta1.EnvoyConfig{ + AdminPort: util.Int32Pointer(29000), + }, + ListenersConfig: v1beta1.ListenersConfig{ + ExternalListeners: []v1beta1.ExternalListenerConfig{ + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external1"}, + IngressControllerTargetPort: util.Int32Pointer(29000), + }, + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external2"}, + IngressControllerTargetPort: util.Int32Pointer(8080), + }, + }, + }, + }, + expected: append(field.ErrorList{}, + field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(0).Child("ingressControllerTargetPort"), int32(29000), + invalidContainerPortForIngressControllerErrMsg+": ExternalListener 'test-external1' uses an ingress controller target port number that collides with the envoy's admin port"), + field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(1).Child("ingressControllerTargetPort"), int32(8080), + invalidContainerPortForIngressControllerErrMsg+": ExternalListener 'test-external2' uses an ingress controller target port number that collides with the envoy's health-check"+ + " port"), + ), + }, + { + testName: "invalid config: user-specified ingress controller target port collided with default envoy admin port and user-specified health-check port", + kafkaClusterSpec: v1beta1.KafkaClusterSpec{ + EnvoyConfig: v1beta1.EnvoyConfig{ + HealthCheckPort: util.Int32Pointer(19090), + }, + ListenersConfig: v1beta1.ListenersConfig{ + ExternalListeners: []v1beta1.ExternalListenerConfig{ + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external1"}, + IngressControllerTargetPort: util.Int32Pointer(19090), + }, + { + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external2"}, + IngressControllerTargetPort: util.Int32Pointer(8081), + }, + }, + }, + }, + expected: append(field.ErrorList{}, + field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(0).Child("ingressControllerTargetPort"), int32(19090), + invalidContainerPortForIngressControllerErrMsg+": ExternalListener 'test-external1' uses an ingress controller target port number that collides with the envoy's health-check port"), + field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(1).Child("ingressControllerTargetPort"), int32(8081), + invalidContainerPortForIngressControllerErrMsg+": ExternalListener 'test-external2' uses an ingress controller target port number that collides with the envoy's admin port"), + ), + }, + } + + for _, testCase := range testCases { + t.Run(testCase.testName, func(t *testing.T) { + got := checkTargetPortsCollisionForEnvoy(&testCase.kafkaClusterSpec) + require.Equal(t, testCase.expected, got) + }) } } From 9080d98cfecf40bec7434d96bc1521751d46791e Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Fri, 16 Jun 2023 09:41:46 -0400 Subject: [PATCH 8/9] Update controller test to use StrVal for named target port instead of IntVal --- controllers/tests/kafkacluster_controller_envoy_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/controllers/tests/kafkacluster_controller_envoy_test.go b/controllers/tests/kafkacluster_controller_envoy_test.go index e01b973ca..449959044 100644 --- a/controllers/tests/kafkacluster_controller_envoy_test.go +++ b/controllers/tests/kafkacluster_controller_envoy_test.go @@ -66,7 +66,7 @@ func expectEnvoyLoadBalancer(ctx context.Context, kafkaCluster *v1beta1.KafkaClu Expect(loadBalancer.Spec.Ports[3].Name).To(Equal("tcp-all-broker")) Expect(loadBalancer.Spec.Ports[3].Protocol).To(Equal(corev1.ProtocolTCP)) Expect(loadBalancer.Spec.Ports[3].Port).To(BeEquivalentTo(29092)) - Expect(loadBalancer.Spec.Ports[3].TargetPort.IntVal).To(BeEquivalentTo(29092)) + Expect(loadBalancer.Spec.Ports[3].TargetPort.StrVal).To(BeEquivalentTo("tcp-all-broker")) Expect(loadBalancer.Spec.Ports[4].Name).To(Equal("tcp-health")) Expect(loadBalancer.Spec.Ports[4].Protocol).To(Equal(corev1.ProtocolTCP)) @@ -391,7 +391,7 @@ func expectEnvoyWithConfigAz1(ctx context.Context, kafkaCluster *v1beta1.KafkaCl Expect(loadBalancer.Spec.Ports[1].Name).To(Equal("tcp-all-broker")) Expect(loadBalancer.Spec.Ports[1].Protocol).To(Equal(corev1.ProtocolTCP)) Expect(loadBalancer.Spec.Ports[1].Port).To(BeEquivalentTo(29092)) - Expect(loadBalancer.Spec.Ports[1].TargetPort.IntVal).To(BeEquivalentTo(29092)) + Expect(loadBalancer.Spec.Ports[1].TargetPort.StrVal).To(BeEquivalentTo("tcp-all-broker")) Expect(loadBalancer.Spec.Ports[2].Name).To(Equal("tcp-health")) Expect(loadBalancer.Spec.Ports[2].Protocol).To(Equal(corev1.ProtocolTCP)) @@ -601,7 +601,7 @@ func expectEnvoyWithConfigAz2(ctx context.Context, kafkaCluster *v1beta1.KafkaCl Expect(loadBalancer.Spec.Ports[2].Name).To(Equal("tcp-all-broker")) Expect(loadBalancer.Spec.Ports[2].Protocol).To(Equal(corev1.ProtocolTCP)) Expect(loadBalancer.Spec.Ports[2].Port).To(BeEquivalentTo(29092)) - Expect(loadBalancer.Spec.Ports[2].TargetPort.IntVal).To(BeEquivalentTo(29092)) + Expect(loadBalancer.Spec.Ports[2].TargetPort.StrVal).To(BeEquivalentTo("tcp-all-broker")) Expect(loadBalancer.Spec.Ports[3].Name).To(Equal("tcp-health")) Expect(loadBalancer.Spec.Ports[3].Protocol).To(Equal(corev1.ProtocolTCP)) From 4a44b704eb01e5a4ef45ccc7ec239e4641377c79 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Fri, 16 Jun 2023 14:15:24 -0400 Subject: [PATCH 9/9] Skip target port collision checks for non-LB external access method --- pkg/webhooks/kafkacluster_validator.go | 6 ++++++ pkg/webhooks/kafkacluster_validator_test.go | 20 ++++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/pkg/webhooks/kafkacluster_validator.go b/pkg/webhooks/kafkacluster_validator.go index 9a03e1715..f3395b731 100644 --- a/pkg/webhooks/kafkacluster_validator.go +++ b/pkg/webhooks/kafkacluster_validator.go @@ -20,6 +20,7 @@ import ( "emperror.dev/errors" "golang.org/x/exp/slices" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/validation/field" @@ -273,6 +274,11 @@ func checkTargetPortsCollisionForEnvoy(kafkaClusterSpec *banzaicloudv1beta1.Kafk if kafkaClusterSpec.ListenersConfig.ExternalListeners != nil { for i, extListener := range kafkaClusterSpec.ListenersConfig.ExternalListeners { + // the ingress controller target port only has impact while using LoadBalancer to access the Kafka cluster + if extListener.GetAccessMethod() != corev1.ServiceTypeLoadBalancer { + continue + } + if extListener.GetIngressControllerTargetPort() == ap { errmsg := invalidContainerPortForIngressControllerErrMsg + ": " + fmt.Sprintf( "ExternalListener '%s' uses an ingress controller target port number that collides with the envoy's admin port", extListener.Name) diff --git a/pkg/webhooks/kafkacluster_validator_test.go b/pkg/webhooks/kafkacluster_validator_test.go index 10fef1b42..cbcfe4be0 100644 --- a/pkg/webhooks/kafkacluster_validator_test.go +++ b/pkg/webhooks/kafkacluster_validator_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/validation/field" "github.com/banzaicloud/koperator/pkg/util" @@ -745,6 +746,25 @@ func TestCheckTargetPortsCollisionForEnvoy(t *testing.T) { }, expected: nil, }, + { + testName: "valid config: external listeners use non-LoadBalancer access method", + kafkaClusterSpec: v1beta1.KafkaClusterSpec{ + ListenersConfig: v1beta1.ListenersConfig{ + ExternalListeners: []v1beta1.ExternalListenerConfig{ + { + AccessMethod: corev1.ServiceTypeNodePort, + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external1"}, + IngressControllerTargetPort: util.Int32Pointer(29000), + }, + { + AccessMethod: corev1.ServiceTypeNodePort, + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external2"}, + }, + }, + }, + }, + expected: nil, + }, { testName: "invalid config: user-specified envoy admin port collides with default envoy health-check port", kafkaClusterSpec: v1beta1.KafkaClusterSpec{