Skip to content

Commit

Permalink
BOC can track message consumption
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Sep 16, 2024
1 parent 10cea0b commit 50234f4
Showing 1 changed file with 8 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class BufferingOutputConsumer(
private val catalogs = mutableListOf<AirbyteCatalog>()
private val traces = mutableListOf<AirbyteTraceMessage>()
private val messages = mutableListOf<AirbyteMessage>()
private var messagesIndex: Int = 0

var callback: (AirbyteMessage) -> Unit = {}
set(value) {
Expand Down Expand Up @@ -79,4 +80,11 @@ class BufferingOutputConsumer(
fun traces(): List<AirbyteTraceMessage> = synchronized(this) { listOf(*traces.toTypedArray()) }

fun messages(): List<AirbyteMessage> = synchronized(this) { listOf(*messages.toTypedArray()) }

fun newMessages(): List<AirbyteMessage> =
synchronized(this) {
val newMessages = messages.subList(messagesIndex, messages.size)
messagesIndex = messages.size
newMessages
}
}

0 comments on commit 50234f4

Please sign in to comment.