Skip to content

Commit

Permalink
[source-mysql/mssql] Fix state manager on determining non-resumable s…
Browse files Browse the repository at this point in the history
…treams (#45181)
  • Loading branch information
xiaohansong committed Sep 10, 2024
1 parent b6825ee commit 03584d5
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 11 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.1.12
dockerImageTag: 4.1.13
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class MssqlInitialLoadGlobalStateManager extends MssqlInitialLoadStateMan
// No special handling for resumable full refresh streams. We will report the cursor as it is.
private Set<AirbyteStreamNameNamespacePair> resumableFullRefreshStreams;
private Set<AirbyteStreamNameNamespacePair> nonResumableFullRefreshStreams;
private Set<AirbyteStreamNameNamespacePair> completedNonResumableFullRefreshStreams;

public MssqlInitialLoadGlobalStateManager(final InitialLoadStreams initialLoadStreams,
final Map<AirbyteStreamNameNamespacePair, OrderedColumnInfo> pairToOrderedColInfo,
Expand Down Expand Up @@ -61,6 +62,7 @@ private void initStreams(final InitialLoadStreams initialLoadStreams,
this.streamsThatHaveCompletedSnapshot = new HashSet<>();
this.resumableFullRefreshStreams = new HashSet<>();
this.nonResumableFullRefreshStreams = new HashSet<>();
this.completedNonResumableFullRefreshStreams = new HashSet<>();

catalog.getStreams().forEach(configuredAirbyteStream -> {
var pairInStream =
Expand All @@ -70,7 +72,8 @@ private void initStreams(final InitialLoadStreams initialLoadStreams,
this.streamsThatHaveCompletedSnapshot.add(pairInStream);
}
if (configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
if (initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)) {
if (configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey() != null
&& !configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey().isEmpty()) {
this.resumableFullRefreshStreams.add(pairInStream);
} else {
this.nonResumableFullRefreshStreams.add(pairInStream);
Expand All @@ -94,6 +97,12 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb
}
});

completedNonResumableFullRefreshStreams.forEach(stream -> {
streamStates.add(new AirbyteStreamState()
.withStreamDescriptor(
new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace())));
});

if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
AirbyteStreamNameNamespacePair pair =
new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
Expand All @@ -119,10 +128,13 @@ private AirbyteStreamState getAirbyteStreamState(final AirbyteStreamNameNamespac

@Override
public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream airbyteStream) {

final io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair pair = new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair(
airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair pair = new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair(
airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
streamsThatHaveCompletedSnapshot.add(pair);
} else if (nonResumableFullRefreshStreams.contains(pair)) {
completedNonResumableFullRefreshStreams.add(pair);
}
final List<AirbyteStreamState> streamStates = new ArrayList<>();
streamsThatHaveCompletedSnapshot.forEach(stream -> {
Expand All @@ -135,7 +147,7 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(ocStatus)));
});

nonResumableFullRefreshStreams.forEach(stream -> {
completedNonResumableFullRefreshStreams.forEach(stream -> {
streamStates.add(new AirbyteStreamState()
.withStreamDescriptor(
new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace())));
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.7.1
dockerImageTag: 3.7.2
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlInitialLoadGlobalStateManager extends MySqlInitialLoadStateManager {

private static final Logger LOGGER = LoggerFactory.getLogger(MySqlInitialLoadGlobalStateManager.class);
protected StateManager stateManager;

// Only one global state is emitted, which is fanned out into many entries in the DB by platform. As
Expand All @@ -42,6 +45,7 @@ public class MySqlInitialLoadGlobalStateManager extends MySqlInitialLoadStateMan

// non ResumableFullRefreshStreams do not have any state. We only report count for them.
private Set<AirbyteStreamNameNamespacePair> nonResumableFullRefreshStreams;
private Set<AirbyteStreamNameNamespacePair> completedNonResumableFullRefreshStreams;

private final boolean savedOffsetStillPresentOnServer;
private final ConfiguredAirbyteCatalog catalog;
Expand Down Expand Up @@ -69,6 +73,7 @@ private void initStreams(final InitialLoadStreams initialLoadStreams,
this.streamsThatHaveCompletedSnapshot = new HashSet<>();
this.resumableFullRefreshStreams = new HashSet<>();
this.nonResumableFullRefreshStreams = new HashSet<>();
this.completedNonResumableFullRefreshStreams = new HashSet<>();

catalog.getStreams().forEach(configuredAirbyteStream -> {
var pairInStream =
Expand All @@ -78,7 +83,8 @@ private void initStreams(final InitialLoadStreams initialLoadStreams,
this.streamsThatHaveCompletedSnapshot.add(pairInStream);
}
if (configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
if (initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)) {
if (configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey() != null
&& !configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey().isEmpty()) {
this.resumableFullRefreshStreams.add(pairInStream);
} else {
this.nonResumableFullRefreshStreams.add(pairInStream);
Expand Down Expand Up @@ -115,6 +121,13 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb
streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(pkStatus))));
}
});

completedNonResumableFullRefreshStreams.forEach(stream -> {
streamStates.add(new AirbyteStreamState()
.withStreamDescriptor(
new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace())));
});

if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
AirbyteStreamNameNamespacePair pair =
new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
Expand All @@ -129,10 +142,12 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb

@Override
public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream airbyteStream) {
final AirbyteStreamNameNamespacePair pair =
new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
AirbyteStreamNameNamespacePair pair =
new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
streamsThatHaveCompletedSnapshot.add(pair);
} else if (nonResumableFullRefreshStreams.contains(pair)) {
completedNonResumableFullRefreshStreams.add(pair);
}
final List<AirbyteStreamState> streamStates = new ArrayList<>();

Expand All @@ -146,7 +161,7 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(pkStatus))));
});

nonResumableFullRefreshStreams.forEach(stream -> {
completedNonResumableFullRefreshStreams.forEach(stream -> {
streamStates.add(new AirbyteStreamState()
.withStreamDescriptor(
new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace())));
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.1.13 | 2024-09-05 | [45181](https://github.com/airbytehq/airbyte/pull/45181) | Fix incorrect categorizing resumable/nonresumable full refresh streams. |
| 4.1.12 | 2024-09-10 | [45368](https://github.com/airbytehq/airbyte/pull/45368) | Remove excessive debezium logging. |
| 4.1.11 | 2024-09-04 | [45142](https://github.com/airbytehq/airbyte/pull/45142) | Fix incorrect datetimeoffset format in cursor state. |
| 4.1.10 | 2024-08-27 | [44759](https://github.com/airbytehq/airbyte/pull/44759) | Improve null safety in parsing debezium change events. |
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.7.1 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
| 3.7.2 | 2024-09-05 | [45181](https://github.com/airbytehq/airbyte/pull/45181) | Fix incorrect categorizing resumable/nonresumable full refresh streams. |
| 3.7.1 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
| 3.7.0 | 2024-08-13 | [44013](https://github.com/airbytehq/airbyte/pull/44013) | Upgrading to Debezium 2.7.1.Final |
| 3.6.9 | 2024-08-08 | [43410](https://github.com/airbytehq/airbyte/pull/43410) | Adopt latest CDK. |
| 3.6.8 | 2024-07-30 | [42869](https://github.com/airbytehq/airbyte/pull/42869) | Adopt latest CDK. |
Expand Down

0 comments on commit 03584d5

Please sign in to comment.