Skip to content

Commit

Permalink
Add checks in validation webhook to prevent container ports from coll…
Browse files Browse the repository at this point in the history
…iding with others
  • Loading branch information
panyuenlau committed Jun 16, 2023
1 parent e773b2a commit fc40df3
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 16 deletions.
16 changes: 8 additions & 8 deletions pkg/webhooks/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
)

const (
cantConnectErrorMsg = "failed to connect to kafka cluster"
cantConnectAPIServerMsg = "failed to connect to Kubernetes API server"
invalidReplicationFactorErrMsg = "replication factor is larger than the number of nodes in the kafka cluster"
outOfRangeReplicationFactorErrMsg = "replication factor must be larger than 0 (or set it to be -1 to use the broker's default)"
outOfRangePartitionsErrMsg = "number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)"
unsupportedRemovingStorageMsg = "removing storage from a broker is not supported"
invalidExternalListenerStartingPortErrMsg = "invalid external listener starting port number"
invalidExternalListenerTargetPortErrMsg = "invalid external listener ingress controller target port number"
cantConnectErrorMsg = "failed to connect to kafka cluster"
cantConnectAPIServerMsg = "failed to connect to Kubernetes API server"
invalidReplicationFactorErrMsg = "replication factor is larger than the number of nodes in the kafka cluster"
outOfRangeReplicationFactorErrMsg = "replication factor must be larger than 0 (or set it to be -1 to use the broker's default)"
outOfRangePartitionsErrMsg = "number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)"
unsupportedRemovingStorageMsg = "removing storage from a broker is not supported"
invalidExternalListenerStartingPortErrMsg = "invalid external listener starting port number"
invalidContainerPortForIngressControllerErrMsg = "invalid trarget port number for ingress controller deployment"

// errorDuringValidationMsg is added to infrastructure errors (e.g. failed to connect), but not to field validation errors
errorDuringValidationMsg = "error during validation"
Expand Down
50 changes: 47 additions & 3 deletions pkg/webhooks/kafkacluster_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ func checkExternalListeners(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpe

allErrs = append(allErrs, checkExternalListenerStartingPort(kafkaClusterSpec)...)

allErrs = append(allErrs, checkTargetPortsCollisionForEnvoy(kafkaClusterSpec)...)

return allErrs
}

Expand Down Expand Up @@ -224,13 +226,17 @@ func checkExternalListenerStartingPort(kafkaClusterSpec *banzaicloudv1beta1.Kafk
outOfRangeBrokerIDs = append(outOfRangeBrokerIDs, broker.Id)
}

if externalPort == extListener.GetIngressControllerTargetPort() {
collidingPortsBrokerIDs = append(collidingPortsBrokerIDs, broker.Id)
}

if kafkaClusterSpec.GetIngressController() == "envoy" {
if externalPort == kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort() || externalPort == kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort() {
collidingPortsBrokerIDs = append(collidingPortsBrokerIDs, broker.Id)
}
}
}

if len(outOfRangeBrokerIDs) > 0 {
errmsg := invalidExternalListenerStartingPortErrMsg + ": " + fmt.Sprintf("ExternalListener '%s' would generate external access port numbers (externalStartingPort + Broker ID) that are out of range (not between 1 and 65535) for brokers %v", extListener.Name, outOfRangeBrokerIDs)
fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("externalStartingPort"), extListener.ExternalStartingPort, errmsg)
Expand All @@ -239,11 +245,49 @@ func checkExternalListenerStartingPort(kafkaClusterSpec *banzaicloudv1beta1.Kafk

if len(collidingPortsBrokerIDs) > 0 {
errmsg := invalidExternalListenerStartingPortErrMsg + ": " + fmt.Sprintf("ExternalListener '%s' would generate external access port numbers ("+
"externalStartingPort + Broker ID) that are colliding with either the envoy admin port (default '%d') or the envoy health-check port (default '%d') for brokers %v",
extListener.Name, kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort(), kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort(), collidingPortsBrokerIDs)
"externalStartingPort + Broker ID) that collide with either the envoy admin port ('%d'), the envoy health-check port ('%d'), or the ingressControllerTargetPort ('%d') for brokers %v",
extListener.Name, kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort(), kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort(), extListener.GetIngressControllerTargetPort(), collidingPortsBrokerIDs)
fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("externalStartingPort"), extListener.ExternalStartingPort, errmsg)
allErrs = append(allErrs, fldErr)
}
}
return allErrs
}

// checkTargetPortsCollisionForEnvoy checks if the IngressControllerTargetPort collides with the other container ports for envoy deployment
func checkTargetPortsCollisionForEnvoy(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList {
if kafkaClusterSpec.GetIngressController() != "envoy" {
return nil
}

var allErrs field.ErrorList

ap := kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort()
hcp := kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort()

if ap == hcp {
errmsg := invalidContainerPortForIngressControllerErrMsg + ": The envoy configuration uses an admin port number that collides with the health-check port number"
fldErr := field.Invalid(field.NewPath("spec").Child("envoyConfig").Child("adminPort"), kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort(), errmsg)
allErrs = append(allErrs, fldErr)
}

if kafkaClusterSpec.ListenersConfig.ExternalListeners != nil {
for i, extListener := range kafkaClusterSpec.ListenersConfig.ExternalListeners {
if extListener.GetIngressControllerTargetPort() == ap {
errmsg := invalidContainerPortForIngressControllerErrMsg + ": " + fmt.Sprintf(
"ExternalListener '%s' uses an ingress controller target port number that collides with the envoy's admin port", extListener.Name)
fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("ingressControllerTargetPort"), extListener.GetIngressControllerTargetPort(), errmsg)
allErrs = append(allErrs, fldErr)
}

if extListener.GetIngressControllerTargetPort() == hcp {
errmsg := invalidContainerPortForIngressControllerErrMsg + ": " + fmt.Sprintf(
"ExternalListener '%s' uses an ingress controller target port number that collides with the envoy's health-check port", extListener.Name)
fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("ingressControllerTargetPort"), extListener.GetIngressControllerTargetPort(), errmsg)
allErrs = append(allErrs, fldErr)
}
}
}

return allErrs
}
122 changes: 117 additions & 5 deletions pkg/webhooks/kafkacluster_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"testing"

"github.com/banzaicloud/koperator/pkg/util"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/validation/field"

Expand Down Expand Up @@ -682,7 +683,7 @@ func TestCheckExternalListenerStartingPort(t *testing.T) {
),
},
{
testName: "invalid config: brokers with in-range external port numbers, but they are colliding with the envoy ports",
testName: "invalid config: brokers with in-range external port numbers, but they collide with the envoy ports",
kafkaClusterSpec: v1beta1.KafkaClusterSpec{
Brokers: []v1beta1.Broker{{Id: 0}, {Id: 11}, {Id: 102}},
ListenersConfig: v1beta1.ListenersConfig{
Expand All @@ -701,12 +702,14 @@ func TestCheckExternalListenerStartingPort(t *testing.T) {
expected: append(field.ErrorList{},
field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(0).Child("externalStartingPort"), int32(8080),
invalidExternalListenerStartingPortErrMsg+": "+fmt.Sprintf("ExternalListener '%s' would generate external access port numbers ("+
"externalStartingPort + Broker ID) that are colliding with either the envoy admin port (default '%d') or the envoy health-check port (default '%d') for brokers %v",
"test-external1", int32(8081), int32(8080), []int32{0})),
"externalStartingPort + Broker ID) that collide with either the envoy admin port ('%d'), the envoy health-check port ('%d'), "+
"or the ingressControllerTargetPort ('%d') for brokers %v",
"test-external1", int32(8081), int32(8080), int32(29092), []int32{0})),
field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(1).Child("externalStartingPort"), int32(8070),
invalidExternalListenerStartingPortErrMsg+": "+fmt.Sprintf("ExternalListener '%s' would generate external access port numbers ("+
"externalStartingPort + Broker ID) that are colliding with either the envoy admin port (default '%d') or the envoy health-check port (default '%d') for brokers %v",
"test-external2", int32(8081), int32(8080), []int32{11})),
"externalStartingPort + Broker ID) that collide with either the envoy admin port ('%d'), the envoy health-check port ('%d'), "+
"or the ingressControllerTargetPort ('%d') for brokers %v",
"test-external2", int32(8081), int32(8080), int32(29092), []int32{11})),
),
},
}
Expand All @@ -719,3 +722,112 @@ func TestCheckExternalListenerStartingPort(t *testing.T) {

}
}

func TestCheckTargetPortsCollisionForEnvoy(t *testing.T) {
testCases := []struct {
testName string
kafkaClusterSpec v1beta1.KafkaClusterSpec
expected field.ErrorList
}{
{
testName: "valid config: envoy admin port, envoy health-check port, and ingress controller target port are not defined",
kafkaClusterSpec: v1beta1.KafkaClusterSpec{
ListenersConfig: v1beta1.ListenersConfig{
ExternalListeners: []v1beta1.ExternalListenerConfig{
{
CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external1"},
},
{
CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external2"},
},
},
},
},
expected: nil,
},
{
testName: "invalid config: user-specified envoy admin port collides with default envoy health-check port",
kafkaClusterSpec: v1beta1.KafkaClusterSpec{
EnvoyConfig: v1beta1.EnvoyConfig{
AdminPort: util.Int32Pointer(8080),
},
},
expected: append(field.ErrorList{},
field.Invalid(field.NewPath("spec").Child("envoyConfig").Child("adminPort"), int32(8080),
invalidContainerPortForIngressControllerErrMsg+": The envoy configuration uses an admin port number that collides with the health-check port number"),
),
},
{
testName: "invalid config: default envoy admin port collides with user-specified envoy health-check port",
kafkaClusterSpec: v1beta1.KafkaClusterSpec{
EnvoyConfig: v1beta1.EnvoyConfig{
HealthCheckPort: util.Int32Pointer(8081),
},
},
expected: append(field.ErrorList{},
field.Invalid(field.NewPath("spec").Child("envoyConfig").Child("adminPort"), int32(8081),
invalidContainerPortForIngressControllerErrMsg+": The envoy configuration uses an admin port number that collides with the health-check port number"),
),
},
{
testName: "invalid config: user-specified ingress controller target port collided with user-specified envoy admin port and default health-check port",
kafkaClusterSpec: v1beta1.KafkaClusterSpec{
EnvoyConfig: v1beta1.EnvoyConfig{
AdminPort: util.Int32Pointer(29000),
},
ListenersConfig: v1beta1.ListenersConfig{
ExternalListeners: []v1beta1.ExternalListenerConfig{
{
CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external1"},
IngressControllerTargetPort: util.Int32Pointer(29000),
},
{
CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external2"},
IngressControllerTargetPort: util.Int32Pointer(8080),
},
},
},
},
expected: append(field.ErrorList{},
field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(0).Child("ingressControllerTargetPort"), int32(29000),
invalidContainerPortForIngressControllerErrMsg+": ExternalListener 'test-external1' uses an ingress controller target port number that collides with the envoy's admin port"),
field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(1).Child("ingressControllerTargetPort"), int32(8080),
invalidContainerPortForIngressControllerErrMsg+": ExternalListener 'test-external2' uses an ingress controller target port number that collides with the envoy's health-check"+
" port"),
),
},
{
testName: "invalid config: user-specified ingress controller target port collided with default envoy admin port and user-specified health-check port",
kafkaClusterSpec: v1beta1.KafkaClusterSpec{
EnvoyConfig: v1beta1.EnvoyConfig{
HealthCheckPort: util.Int32Pointer(19090),
},
ListenersConfig: v1beta1.ListenersConfig{
ExternalListeners: []v1beta1.ExternalListenerConfig{
{
CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external1"},
IngressControllerTargetPort: util.Int32Pointer(19090),
},
{
CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "test-external2"},
IngressControllerTargetPort: util.Int32Pointer(8081),
},
},
},
},
expected: append(field.ErrorList{},
field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(0).Child("ingressControllerTargetPort"), int32(19090),
invalidContainerPortForIngressControllerErrMsg+": ExternalListener 'test-external1' uses an ingress controller target port number that collides with the envoy's health-check port"),
field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(1).Child("ingressControllerTargetPort"), int32(8081),
invalidContainerPortForIngressControllerErrMsg+": ExternalListener 'test-external2' uses an ingress controller target port number that collides with the envoy's admin port"),
),
},
}

for _, testCase := range testCases {
t.Run(testCase.testName, func(t *testing.T) {
got := checkTargetPortsCollisionForEnvoy(&testCase.kafkaClusterSpec)
require.Equal(t, testCase.expected, got)
})
}
}

0 comments on commit fc40df3

Please sign in to comment.