-
Notifications
You must be signed in to change notification settings - Fork 4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Destinations CDK: better integration tests #45113
base: edgao/buffering_output_consumer_tracks_message_consumption
Are you sure you want to change the base?
Destinations CDK: better integration tests #45113
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Skipped Deployment
|
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking. |
da9d896
to
7e3fa25
Compare
b0b856d
to
8d9871f
Compare
7e3fa25
to
48ca85d
Compare
8d9871f
to
228765c
Compare
236be41
to
70b9a9c
Compare
@@ -55,16 +55,16 @@ allprojects { | |||
sourceCompatibility = JavaVersion.VERSION_21 | |||
targetCompatibility = JavaVersion.VERSION_21 | |||
compileJava { | |||
options.compilerArgs += ["-Werror", "-Xlint:all,-serial,-processing"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll revert this before merging, this was just driving me nuts locally
import org.junit.jupiter.api.Disabled; | ||
import org.junit.jupiter.api.Test; | ||
|
||
public class TestingSilentDestinationAcceptanceTest extends DestinationAcceptanceTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't check what these two classes are doing, but will revive these tests before merging this PR
70b9a9c
to
6384e77
Compare
command, | ||
config = config, | ||
catalog = catalog, | ||
// TODO is this really the right way to achieve this? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@johnny-schmidt before I go too far down this path - this (+the related plumbing code in CliRunner + AirbyteConnectorRunner) seems like a nicer way to inject custom beans to the destination? I.e. this way, test authors don't need to define a new InputStreamFactory + do property dancing
but lmk if you have a different idea for doing this
(this does have the slightly unfortunate result that in E2EDestination, we have to do .run(args = args)
instead of .run(*args)
, b/c of varargs being a whiny child)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the idea is that we pass in config & catalog explicitly, would it make more sense to provide the input data explicitly also? Just wrap it in an serializing InputStream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maaaybe, though I'm leaning towards no. There are tests that want to interleave messages and test logic, a la
sendSomeRecords()
verifySomething()
sendMoreRecords()
endSync()
verifySomethingElse()
At which point I'd rather wrap it in the destination process via the sendMessage
function, rather than exposing the pipes to callers? I'm imagining something along these lines, which doesn't feel great?
val pipe = PipedOutputStream()
val pipeInputStream = PipedInputStream(pipe)
val dest = DestinationProcess(pipeInputStream)
messages1.each { pipe.write(serialize(it)) }
verifySomething()
messages2.each { pipe.write(serialize(it)) }
// flush all messages
pipe.close()
// this will magically cause the destination to shutdown
pipeInputStream.close()
plus I think we get most of the benefit by providing a utility method wrapping the destination process?
fun runSync(factory, config, catalog, messages) {
val dest = factory.start(config, catalog)
messages.each { dest.sendMessage(it) }
return dest.readMessages()
}
48ca85d
to
26b418a
Compare
e476fcc
to
617d533
Compare
26b418a
to
1930d44
Compare
ab66980
to
70220ef
Compare
1930d44
to
e0b98f1
Compare
70220ef
to
bde7d82
Compare
e0b98f1
to
0feb717
Compare
0b18a1a
to
7eedfd8
Compare
0250ca4
to
50234f4
Compare
dfd289a
to
cc112ac
Compare
50234f4
to
28207e7
Compare
f8e49cf
to
4cdec03
Compare
} | ||
|
||
@Test | ||
open fun testBasicWrite() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test case is mostly here to (a) be a quick smoke test on our state handling, and (b) show the typical interaction with DestinationProcess
. Most tests we'll actually write don't need any of the state stuff, and would look more like:
runSync(config, stream, listOf(records...))
dumpAndDiffRecords(...)
// (potentially more syncs, if we're e.g. testing refreshes)
28207e7
to
1508fa6
Compare
c2f4b23
to
30b24fb
Compare
@@ -1,4 +1,5 @@ | |||
plugins { | |||
id 'airbyte-java-connector' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a hack. We get the integrationTestJava
task from this plugin. https://github.com/airbytehq/airbyte-internal-issues/issues/9864 will fix.
30b24fb
to
43cbd5c
Compare
1508fa6
to
a30248e
Compare
c36924e
to
cd7a833
Compare
a30248e
to
2b8def0
Compare
cd7a833
to
78dc640
Compare
2b8def0
to
e905aed
Compare
78dc640
to
2d3af61
Compare
LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC) | ||
.format(DateTimeFormatter.ofPattern("YYYYMMDD")) | ||
// stream name doesn't need to be randomized, only the namespace. | ||
val randomizedNamespace = "test$timestampString$randomSuffix" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this mean that we never test the null namespace case?
|
||
package io.airbyte.cdk.test.util | ||
|
||
fun interface DestinationCleaner { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dq, why is the cleaner its own thing and not an entry point into the process?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks solid to me. If you want to merge it down into e2e I'll start working on the final productionalization.
I think overall we'll want to iterate on these interfaces (mine and yours) and work out how we want to handle configs, injection in general, mapping to and from schemas and records, all that. But we can learn as we go.
Implement the test framework, write a minimal set of base tests, and implement those tests for destination-e2e-test. I stuck with our existing abstract class + concrete per-destination implementation strategy, because:
override testFoo() { super.testFoo() }
declarations. But even without that, we still get the test case.general review guide: