Skip to content

Commit

Permalink
implement test framework
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Sep 19, 2024
1 parent 2b8def0 commit 78dc640
Show file tree
Hide file tree
Showing 17 changed files with 799 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,8 @@ class BufferingOutputConsumer(
messagesIndex = messages.size
newMessages
}

/**
 * Sets `messagesIndex` back to 0 while holding this object's monitor.
 *
 * NOTE(review): presumably this makes the next "new messages" read return every buffered
 * message again, since that read advances `messagesIndex` to `messages.size` — confirm.
 */
fun resetNewMessagesCursor(): Unit = synchronized(this) { messagesIndex = 0 }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.data.AirbyteValue
import io.airbyte.cdk.data.AirbyteValueToJson
import io.airbyte.cdk.data.JsonToAirbyteValue
import io.airbyte.cdk.data.ObjectTypeWithoutSchema
import io.airbyte.cdk.message.CheckpointMessage.Checkpoint
import io.airbyte.cdk.message.CheckpointMessage.Stats
import io.airbyte.protocol.models.Jsons
import io.airbyte.protocol.models.v0.AirbyteGlobalState
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
Expand Down Expand Up @@ -45,6 +47,21 @@ data class DestinationRecord(
val meta: Meta?,
val serialized: String,
) : DestinationStreamAffinedMessage {
/**
 * Convenience constructor, primarily intended for use in tests.
 *
 * Builds the record from plain values instead of pre-built objects:
 * - [namespace] and [name] are combined into a [DestinationStream.Descriptor].
 * - [data] is a JSON string; it is deserialized and converted to an [AirbyteValue] using
 *   [ObjectTypeWithoutSchema].
 * - [changes] is wrapped in a [Meta], so `meta` is never null here even when [changes] is null.
 * - `serialized` is set to the empty string.
 */
constructor(
namespace: String?,
name: String,
data: String,
emittedAtMs: Long,
changes: List<Change>? = null,
) : this(
stream = DestinationStream.Descriptor(namespace, name),
data = JsonToAirbyteValue().convert(Jsons.deserialize(data), ObjectTypeWithoutSchema),
emittedAtMs = emittedAtMs,
meta = Meta(changes),
serialized = "",
)

data class Meta(val changes: List<Change>?) {
fun asProtocolObject(): AirbyteRecordMessageMeta =
AirbyteRecordMessageMeta().also {
Expand Down Expand Up @@ -141,6 +158,23 @@ data class StreamCheckpoint(
override val destinationStats: Stats? = null,
val additionalProperties: Map<String, Any>
) : CheckpointMessage {
/**
 * Convenience constructor, intended for use in tests.
 *
 * - [streamNamespace] and [streamName] are combined into a [DestinationStream.Descriptor].
 * - [blob] is a JSON string that is deserialized and used as the checkpoint's `state`.
 * - [sourceRecordCount] is wrapped in a [Stats].
 * - [destinationRecordCount] is wrapped in a [Stats] when non-null; otherwise null is passed.
 * - `additionalProperties` starts out as an empty mutable map.
 */
constructor(
streamNamespace: String?,
streamName: String,
blob: String,
sourceRecordCount: Long,
destinationRecordCount: Long? = null,
) : this(
Checkpoint(
DestinationStream.Descriptor(streamNamespace, streamName),
state = Jsons.deserialize(blob)
),
Stats(sourceRecordCount),
// Only build destination stats when a destination count was actually supplied.
destinationRecordCount?.let { Stats(it) },
additionalProperties = mutableMapOf(),
)

/**
 * Builds a new [StreamCheckpoint] that reuses this instance's checkpoint, source stats and
 * additional properties, with [stats] passed in the destination-stats position.
 */
override fun withDestinationStats(stats: Stats): StreamCheckpoint {
    return StreamCheckpoint(checkpoint, sourceStats, stats, additionalProperties)
}

Expand Down Expand Up @@ -170,6 +204,11 @@ data class GlobalCheckpoint(
val checkpoints: List<Checkpoint> = emptyList(),
val additionalProperties: MutableMap<String, Any> = mutableMapOf()
) : CheckpointMessage {
/**
 * Convenience constructor, primarily intended for use in tests.
 *
 * [blob] is a JSON string that is deserialized and used as `state`; [sourceRecordCount] is
 * wrapped in a [Stats]. All remaining primary-constructor parameters keep their defaults.
 */
constructor(
blob: String,
sourceRecordCount: Long,
) : this(state = Jsons.deserialize(blob), Stats(sourceRecordCount))
/**
 * Builds a new [GlobalCheckpoint] that reuses this instance's state, source stats, checkpoints
 * and additional properties, with [stats] passed in the destination-stats position.
 */
override fun withDestinationStats(stats: Stats): GlobalCheckpoint {
    return GlobalCheckpoint(state, sourceStats, stats, checkpoints, additionalProperties)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.test.check

import io.airbyte.cdk.command.ConfigurationJsonObjectBase
import io.airbyte.cdk.command.ValidatedJsonUtils
import io.airbyte.cdk.test.util.FakeDataDumper
import io.airbyte.cdk.test.util.IntegrationTest
import io.airbyte.cdk.test.util.NoopDestinationCleaner
import io.airbyte.cdk.test.util.NoopExpectedRecordMapper
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Path
import java.util.regex.Pattern
import kotlin.test.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll

/**
 * Reusable integration test for a destination connector's `check` command.
 *
 * For every config file it runs a `check` destination process, requires exactly one
 * `CONNECTION_STATUS` message in the output, and then verifies the reported status.
 *
 * @param configurationClass the class that config file contents are parsed into.
 * @param successConfigFilenames config filenames, resolved relative to the `secrets` directory,
 * for which `check` is expected to succeed.
 * @param failConfigFilenamesAndFailureReasons config file paths for which `check` is expected to
 * fail, mapped to a pattern that the whole failure message must match.
 */
open class CheckIntegrationTest<T : ConfigurationJsonObjectBase>(
    val configurationClass: Class<T>,
    val successConfigFilenames: List<String>,
    val failConfigFilenamesAndFailureReasons: Map<String, Pattern>,
) :
    IntegrationTest(
        FakeDataDumper,
        NoopDestinationCleaner,
        NoopExpectedRecordMapper,
    ) {
    /** Every config in [successConfigFilenames] must yield a single SUCCEEDED status. */
    @Test
    open fun testSuccessConfigs() {
        for (path in successConfigFilenames) {
            val fileContents = Files.readString(Path.of("secrets", path), StandardCharsets.UTF_8)
            val connectionStatus = runCheckAndGetConnectionStatus(fileContents)

            assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, connectionStatus.status)
        }
    }

    /**
     * Every config in [failConfigFilenamesAndFailureReasons] must yield a single FAILED status
     * whose message fully matches the associated pattern.
     */
    @Test
    open fun testFailConfigs() {
        for ((path, failurePattern) in failConfigFilenamesAndFailureReasons) {
            // NOTE(review): unlike testSuccessConfigs, this path is not resolved under "secrets"
            // — confirm that the difference is intentional.
            val fileContents = Files.readString(Path.of(path))
            val connectionStatus = runCheckAndGetConnectionStatus(fileContents)

            // assertAll reports both the status mismatch and the message mismatch together.
            assertAll(
                { assertEquals(AirbyteConnectionStatus.Status.FAILED, connectionStatus.status) },
                {
                    assertTrue(
                        failurePattern.matcher(connectionStatus.message).matches(),
                        "Expected to match ${failurePattern.pattern()}, but got ${connectionStatus.message}"
                    )
                }
            )
        }
    }

    /** Runs `check` with the given config contents and returns its single connection status. */
    private fun runCheckAndGetConnectionStatus(fileContents: String): AirbyteConnectionStatus {
        val config = ValidatedJsonUtils.parseOne(configurationClass, fileContents)
        val process =
            destinationProcessFactory.createDestinationProcess("check", config = config)
        process.run()
        val messages = process.readMessages()
        val checkMessages = messages.filter { it.type == AirbyteMessage.Type.CONNECTION_STATUS }

        // kotlin.test's assertEquals takes (expected, actual, message): the expected count goes first.
        assertEquals(
            1,
            checkMessages.size,
            "Expected to receive exactly one connection status message, but got ${checkMessages.size}: $checkMessages"
        )
        return checkMessages.first().connectionStatus
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.test.util

/**
 * Hook for removing stale test data from the destination used by integration tests. Declared as
 * a `fun interface`, so an implementation can be supplied as a lambda.
 */
fun interface DestinationCleaner {
/**
 * Search the test destination for old test data and delete it. This should leave recent data
 * (e.g. from the last week) untouched, to avoid causing failures in actively-running tests.
 */
fun cleanup()
}

/** A [DestinationCleaner] whose [cleanup] deliberately does nothing. */
object NoopDestinationCleaner : DestinationCleaner {
    override fun cleanup() = Unit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.test.util

/**
 * Reads back the records of a stream so that tests can inspect them. Declared as a
 * `fun interface`, so an implementation can be supplied as a lambda.
 */
fun interface DestinationDataDumper {
/**
 * Returns the records for the stream identified by [streamName] and the optional
 * [streamNamespace].
 */
fun dumpRecords(
streamName: String,
streamNamespace: String?,
): List<OutputRecord>
}

/**
 * Placeholder [DestinationDataDumper] for integration tests that never read records back from the
 * destination and only need some implementation to satisfy the compiler.
 *
 * Calling [dumpRecords] always throws [NotImplementedError].
 */
object FakeDataDumper : DestinationDataDumper {
    override fun dumpRecords(streamName: String, streamNamespace: String?): List<OutputRecord> =
        throw NotImplementedError()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.test.util

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.command.CliRunnable
import io.airbyte.cdk.command.CliRunner
import io.airbyte.cdk.command.ConfigurationJsonObjectBase
import io.airbyte.protocol.models.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.github.oshai.kotlinlogging.KotlinLogging
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.PipedInputStream
import java.io.PipedOutputStream
import java.io.PrintWriter
import javax.inject.Singleton

// File-private logger for the destination process test utilities declared in this file.
private val logger = KotlinLogging.logger {}

/**
 * Represents a destination process, whether running in-JVM via micronaut, or as a separate Docker
 * container. The general lifecycle is:
 * 1. `val dest = DestinationProcessFactory.createDestinationProcess(...)`
 * 2. `launch { dest.run() }`
 * 3. [sendMessage] as many times as you want
 * 4. [readMessages] as needed (e.g. to check that state messages are emitted during the sync)
 * 5. [shutdown] once you have no more messages to send to the destination
 */
interface DestinationProcess {
/**
 * Run the destination process. Callers who want to interact with the destination should
 * `launch` this method.
 */
fun run()

/** Send a single [message] to the destination's input. */
fun sendMessage(message: AirbyteMessage)

/** Return all messages the destination emitted since the last call to [readMessages]. */
fun readMessages(): List<AirbyteMessage>

/**
 * Signal that there are no more messages to send to the destination. This method returns
 * nothing; call [readMessages] afterwards to collect any remaining output.
 *
 * NOTE(review): the original comment said this waits for the destination to terminate, but
 * [NonDockerizedDestination.shutdown] only closes the stdin pipe — confirm the intended
 * contract.
 */
fun shutdown()
}

/** Creates [DestinationProcess] instances; each call is expected to yield a separate process. */
interface DestinationProcessFactory {
/**
 * Creates a destination process for the given connector [command] (e.g. `"check"`), optionally
 * supplying a [config] and a configured [catalog]. Both default to null.
 */
fun createDestinationProcess(
command: String,
config: ConfigurationJsonObjectBase? = null,
catalog: ConfiguredAirbyteCatalog? = null,
): DestinationProcess
}

/**
 * A [DestinationProcess] that runs the destination in the current JVM through [CliRunner].
 *
 * Messages passed to [sendMessage] are written, one serialized JSON document per line, into a pipe
 * whose read end is handed to the destination as its input stream.
 *
 * @param command the connector command to run.
 * @param config optional connector configuration.
 * @param catalog optional configured catalog.
 */
class NonDockerizedDestination(
    command: String,
    config: ConfigurationJsonObjectBase?,
    catalog: ConfiguredAirbyteCatalog?,
) : DestinationProcess {
    private val destinationStdinPipe: PrintWriter
    private val destination: CliRunnable

    init {
        val destinationStdin = PipedInputStream()
        // This could probably be a channel, somehow. But given the current structure,
        // it's easier to just use the pipe stuff.
        destinationStdinPipe = PrintWriter(PipedOutputStream(destinationStdin))
        destination =
            CliRunner.destination(
                command,
                config = config,
                catalog = catalog,
                inputStream = destinationStdin,
            )
    }

    /** Runs the destination; delegates to the underlying [CliRunnable]. */
    override fun run() {
        destination.run()
    }

    /** Writes [message] as one line of JSON to the destination's input and flushes it. */
    override fun sendMessage(message: AirbyteMessage) {
        destinationStdinPipe.println(Jsons.serialize(message))
        // The PrintWriter was created without auto-flush, so without this the message would sit
        // in the writer's buffer until shutdown() closes the pipe.
        destinationStdinPipe.flush()
    }

    /** Returns the messages emitted by the destination since the previous call. */
    override fun readMessages(): List<AirbyteMessage> = destination.results.newMessages()

    /** Closes the input pipe, which flushes pending output and signals end of input. */
    override fun shutdown() {
        destinationStdinPipe.close()
    }
}

// Notably, not actually a Micronaut factory. We want to inject the actual
// factory into our tests, not a pre-instantiated destination, because we want
// to run multiple destination processes per test.
// TODO only inject this when not running in CI, a la @Requires(notEnv = "CI_master_merge")
/** [DestinationProcessFactory] that hands out a fresh [NonDockerizedDestination] per call. */
@Singleton
class NonDockerizedDestinationFactory : DestinationProcessFactory {
    override fun createDestinationProcess(
        command: String,
        config: ConfigurationJsonObjectBase?,
        catalog: ConfiguredAirbyteCatalog?
    ): DestinationProcess = NonDockerizedDestination(command, config, catalog)
}

// TODO define a factory for this class + @Require(env = CI_master_merge)
/**
 * Stub for a [DestinationProcess] backed by a Docker container. Nothing is implemented yet:
 * every method throws [NotImplementedError] via [TODO].
 */
class DockerizedDestination(
    command: String,
    config: JsonNode?,
    catalog: ConfiguredAirbyteCatalog?,
) : DestinationProcess {
    override fun run(): Unit = TODO("launch a docker container")

    // Planned: push a message to the docker process' stdin.
    override fun sendMessage(message: AirbyteMessage): Unit = TODO("Not yet implemented")

    // Planned: read everything from the process' stdout.
    override fun readMessages(): List<AirbyteMessage> = TODO("Not yet implemented")

    // Planned: close stdin, wait until process exits.
    override fun shutdown(): Unit = TODO("Not yet implemented")
}

// This is currently unused, but we'll need it for the Docker version.
// it exists right now b/c I wrote it prior to the CliRunner retooling.
/**
 * Non-blocking line reader over an [InputStream].
 *
 * There doesn't seem to be a built-in equivalent to this? `Scanner.hasNextLine` and
 * `BufferedReader.readLine` both block until the stream has data to read, which we don't want to
 * do.
 *
 * This class simply buffers the next line in-memory until it reaches a newline or EOF. Bytes are
 * only decoded (as UTF-8) once a full line is available, so multi-byte characters split across
 * calls are handled.
 *
 * NOTE(review): data is only consumed while `input.available()` is non-zero, so end-of-stream is
 * only noticed for streams that still report available bytes at EOF — confirm this is acceptable
 * for the intended Docker use.
 */
private class LazyInputStreamReader(private val input: InputStream) {
    private val buffer: ByteArrayOutputStream = ByteArrayOutputStream()
    private var eof = false

    /**
     * Returns the next line of data without blocking when the stream has nothing to read.
     *
     * @return a [Line] holding the next line (without its trailing `\n`), [NoLine.NOT_YET_AVAILABLE]
     * if a complete line has not arrived yet, or [NoLine.EOF] once the stream has ended. A final
     * line without a trailing newline is returned as a [Line] before [NoLine.EOF] is reported.
     */
    fun nextLine(): MaybeLine {
        if (eof) {
            return NoLine.EOF
        }
        while (input.available() != 0) {
            when (val read = input.read()) {
                -1 -> {
                    eof = true
                    // Nothing buffered means the stream ended right after a newline (or was
                    // empty): report EOF directly instead of inventing an empty line.
                    return if (buffer.size() == 0) NoLine.EOF else takeBufferedLine()
                }
                '\n'.code -> return takeBufferedLine()
                else -> buffer.write(read)
            }
        }
        return NoLine.NOT_YET_AVAILABLE
    }

    /** Decodes the buffered bytes as a UTF-8 line and clears the buffer. */
    private fun takeBufferedLine(): Line {
        val bytes = buffer.toByteArray()
        buffer.reset()
        return Line(bytes.toString(Charsets.UTF_8))
    }

    companion object {
        /** Result of [nextLine]: either a [Line] or one of the [NoLine] sentinels. */
        interface MaybeLine
        enum class NoLine : MaybeLine {
            EOF,
            NOT_YET_AVAILABLE
        }
        data class Line(val line: String) : MaybeLine
    }
}
Loading

0 comments on commit 78dc640

Please sign in to comment.