
Got unexpected type byte[], expected type ByteBuffer when deserialising byte array with bytes Codec #594

Open
onepintwig opened this issue Jun 11, 2024 · 0 comments


onepintwig commented Jun 11, 2024

Hey!

Discovered while working with the fs2-kafka-vulcan module (see fd4s/fs2-kafka#1330).

Using the provided Codec.bytes, I get the following error when trying to deserialize a byte array:

vulcan.AvroException$$anon$1: Error decoding Array[Byte]: Got unexpected type byte[], expected type ByteBuffer
  at vulcan.AvroException$.apply(AvroException.scala:18)
  at vulcan.AvroError$ErrorDecodingType.throwable(AvroError.scala:93)
  at fs2.kafka.vulcan.AvroDeserializer$.$anonfun$createDeserializer$extension$2(AvroDeserializer.scala:57)
  at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$createDeserializer$3(AvroDeserializer.scala:40)
  at defer @ fs2.kafka.vulcan.AvroSerializer$.$anonfun$create$3(AvroSerializer.scala:38)
  at product$extension @ fs2.kafka.KafkaProducer$.serializeToBytes(KafkaProducer.scala:242)
  at map @ fs2.kafka.ConsumerRecord$.fromJava(ConsumerRecord.scala:208)
  at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$2(KafkaConsumerActor.scala:275)
  at traverse @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:272)
  at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:277)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$28(KafkaConsumer.scala:280)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$28(KafkaConsumer.scala:280)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$35(KafkaConsumer.scala:317)
  at map @ fs2.kafka.internal.KafkaConsumerActor.records(KafkaConsumerActor.scala:279)
  at delay @ fs2.kafka.internal.Blocking$$anon$2.apply(Blocking.scala:28)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.pollConsumer$1(KafkaConsumerActor.scala:303)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$poll$12(KafkaConsumerActor.scala:423)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.<init>(KafkaConsumerActor.scala:420)

I believe the decoded value is normally expected to be a ByteBuffer; however, I think there is an optimisation in the schema-registry Avro deserializer that returns the raw bytes when it detects that the schema type is just bytes. This causes an Array[Byte] to reach the codec's decoder, which does not match any of the available patterns.
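
For illustration, a minimal sketch of what I think is happening, assuming vulcan's `Codec.bytes` and its `decode(value, schema)` signature (the object name here is just for the example):

```scala
import java.nio.ByteBuffer
import vulcan.Codec

// Sketch: decoding succeeds when the value is a ByteBuffer but fails when it
// is a raw Array[Byte], which appears to be what the schema-registry
// deserializer hands back for a plain bytes schema.
object BytesDecodeRepro extends App {
  val codec  = Codec.bytes
  val schema = codec.schema.fold(err => throw err.throwable, identity)

  val raw: Array[Byte] = Array[Byte](1, 2, 3)

  // Right(...) as expected
  println(codec.decode(ByteBuffer.wrap(raw), schema))

  // Left(...): "Got unexpected type byte[], expected type ByteBuffer"
  println(codec.decode(raw, schema))
}
```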

I have added an extra match case to the codecs where Avro bytes are expected to handle this, and it seems to resolve the error; a sketch of the change is below. I am keen to raise a PR if this is confirmed to be an issue.
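
Roughly what I mean, as a standalone sketch rather than the actual vulcan source (the function name and error type below are illustrative only):

```scala
import java.nio.ByteBuffer

// Where the bytes decoder currently only pattern-matches ByteBuffer,
// also accept a raw Array[Byte].
def decodeBytes(value: Any): Either[String, Array[Byte]] =
  value match {
    case buffer: ByteBuffer =>
      // existing case: copy the remaining bytes out of the buffer
      val bytes = new Array[Byte](buffer.remaining())
      buffer.get(bytes)
      Right(bytes)
    case bytes: Array[Byte] =>
      // proposed extra case: the schema-registry deserializer can return
      // the raw byte array directly when the schema type is just bytes
      Right(bytes)
    case other =>
      Left(s"Got unexpected type ${other.getClass.getName}, expected type ByteBuffer")
  }
```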
