Skip to content

Commit

Permalink
stream: slot and FES should not be created if the publication creatio…
Browse files Browse the repository at this point in the history
…n fails (#2704)

* slot should not be created if the publication creation fails
* not create FES resource when slot doesn't exist
  • Loading branch information
idanovinda committed Aug 2, 2024
1 parent 31f474a commit 94d3632
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 15 deletions.
26 changes: 26 additions & 0 deletions e2e/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -2041,6 +2041,20 @@ def test_stream_resources(self):
"recoveryEventType": "test-event-dlq"
}
}
},
{
"applicationId": "test-app2",
"batchSize": 100,
"database": "foo",
"enableRecovery": True,
"tables": {
"test_non_exist_table": {
"eventType": "test-event",
"idColumn": "id",
"payloadColumn": "payload",
"recoveryEventType": "test-event-dlq"
}
}
}
]
}
Expand All @@ -2064,6 +2078,18 @@ def test_stream_resources(self):
"zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1,
"Could not find Fabric Event Stream resource", 10, 5)

# check if the non-existing table in the stream section does not create a publication and slot
get_publication_query_not_exist_table = """
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app2';
"""
get_slot_query_not_exist_table = """
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app2';
"""
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query_not_exist_table)), 0,
"Publication is created for non-existing tables", 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query_not_exist_table)), 0,
"Replication slot is created for non-existing tables", 10, 5)

# grant create and ownership of test_table to foo_user, reset search path to default
grant_permission_foo_user = """
GRANT CREATE ON DATABASE foo TO foo_user;
Expand Down
29 changes: 20 additions & 9 deletions pkg/cluster/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
} else if currentTables != tableList {
alterPublications[slotName] = tableList
}
(*slotsToSync)[slotName] = slotAndPublication.Slot
}

// check if there is any deletion
Expand All @@ -148,24 +147,30 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
return nil
}

var errorMessage error = nil
for publicationName, tables := range createPublications {
if err = c.executeCreatePublication(publicationName, tables); err != nil {
return fmt.Errorf("creation of publication %q failed: %v", publicationName, err)
errorMessage = fmt.Errorf("creation of publication %q failed: %v", publicationName, err)
continue
}
(*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot
}
for publicationName, tables := range alterPublications {
if err = c.executeAlterPublication(publicationName, tables); err != nil {
return fmt.Errorf("update of publication %q failed: %v", publicationName, err)
errorMessage = fmt.Errorf("update of publication %q failed: %v", publicationName, err)
continue
}
(*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot
}
for _, publicationName := range deletePublications {
(*slotsToSync)[publicationName] = nil
if err = c.executeDropPublication(publicationName); err != nil {
return fmt.Errorf("deletion of publication %q failed: %v", publicationName, err)
errorMessage = fmt.Errorf("deletion of publication %q failed: %v", publicationName, err)
continue
}
(*slotsToSync)[publicationName] = nil
}

return nil
return errorMessage
}

func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream {
Expand Down Expand Up @@ -390,15 +395,15 @@ func (c *Cluster) syncStreams() error {
}

// finally sync stream CRDs
err = c.createOrUpdateStreams()
err = c.createOrUpdateStreams(slotsToSync)
if err != nil {
return err
}

return nil
}

func (c *Cluster) createOrUpdateStreams() error {
func (c *Cluster) createOrUpdateStreams(createdSlots map[string]map[string]string) error {

// fetch different application IDs from streams section
// there will be a separate event stream resource for each ID
Expand All @@ -413,7 +418,7 @@ func (c *Cluster) createOrUpdateStreams() error {
return fmt.Errorf("could not list of FabricEventStreams: %v", err)
}

for _, appId := range appIds {
for idx, appId := range appIds {
streamExists := false

// update stream when it exists and EventStreams array differs
Expand All @@ -435,6 +440,12 @@ func (c *Cluster) createOrUpdateStreams() error {
}

if !streamExists {
// check if there is any slot with the applicationId
slotName := getSlotName(c.Spec.Streams[idx].Database, appId)
if _, exists := createdSlots[slotName]; !exists {
c.logger.Warningf("no slot %s with applicationId %s exists, skipping event stream creation", slotName, appId)
continue
}
c.logger.Infof("event streams with applicationId %s do not exist, create it", appId)
streamCRD, err := c.createStreams(appId)
if err != nil {
Expand Down
16 changes: 10 additions & 6 deletions pkg/cluster/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ var (
fesUser string = fmt.Sprintf("%s%s", constants.EventStreamSourceSlotPrefix, constants.UserRoleNameSuffix)
slotName string = fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1))

fakeCreatedSlots map[string]map[string]string = map[string]map[string]string{
slotName: {},
}

pg = acidv1.Postgresql{
TypeMeta: metav1.TypeMeta{
Kind: "Postgresql",
Expand Down Expand Up @@ -222,7 +226,7 @@ func TestGenerateFabricEventStream(t *testing.T) {
assert.NoError(t, err)

// create the streams
err = cluster.createOrUpdateStreams()
err = cluster.createOrUpdateStreams(fakeCreatedSlots)
assert.NoError(t, err)

// compare generated stream with expected stream
Expand All @@ -248,7 +252,7 @@ func TestGenerateFabricEventStream(t *testing.T) {
}

// sync streams once again
err = cluster.createOrUpdateStreams()
err = cluster.createOrUpdateStreams(fakeCreatedSlots)
assert.NoError(t, err)

streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
Expand Down Expand Up @@ -397,7 +401,7 @@ func TestUpdateFabricEventStream(t *testing.T) {
assert.NoError(t, err)

// now create the stream
err = cluster.createOrUpdateStreams()
err = cluster.createOrUpdateStreams(fakeCreatedSlots)
assert.NoError(t, err)

// change specs of streams and patch CRD
Expand All @@ -419,7 +423,7 @@ func TestUpdateFabricEventStream(t *testing.T) {
assert.NoError(t, err)

cluster.Postgresql.Spec = pgPatched.Spec
err = cluster.createOrUpdateStreams()
err = cluster.createOrUpdateStreams(fakeCreatedSlots)
assert.NoError(t, err)

// compare stream returned from API with expected stream
Expand Down Expand Up @@ -448,7 +452,7 @@ func TestUpdateFabricEventStream(t *testing.T) {
assert.NoError(t, err)

cluster.Postgresql.Spec = pgPatched.Spec
err = cluster.createOrUpdateStreams()
err = cluster.createOrUpdateStreams(fakeCreatedSlots)
assert.NoError(t, err)

result = cluster.generateFabricEventStream(appId)
Expand All @@ -466,7 +470,7 @@ func TestUpdateFabricEventStream(t *testing.T) {
assert.NoError(t, err)

cluster.Postgresql.Spec = pgUpdated.Spec
cluster.createOrUpdateStreams()
cluster.createOrUpdateStreams(fakeCreatedSlots)

streamList, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
if len(streamList.Items) > 0 || err != nil {
Expand Down

0 comments on commit 94d3632

Please sign in to comment.