Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more information for LZ4Exception during compression #293

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import akka.annotation.InternalApi

import javax.crypto.Cipher
import javax.crypto.spec.{GCMParameterSpec, SecretKeySpec}
import net.jpountz.lz4.LZ4Factory
import net.jpountz.lz4.{LZ4Exception, LZ4Factory}

import scala.collection.mutable

Expand Down Expand Up @@ -90,10 +90,14 @@ class LZ4KryoCompressor extends Transformer {
override def toBinary(inputBuff: Array[Byte], outputBuff: ByteBuffer): Unit = {
val inputSize = inputBuff.length
val lz4 = lz4factory.fastCompressor
lz4.maxCompressedLength(inputSize)
// encode 32 bit length in the first bytes
outputBuff.order(ByteOrder.LITTLE_ENDIAN).putInt(inputSize)
lz4.compress(ByteBuffer.wrap(inputBuff), outputBuff)
try {
lz4.compress(ByteBuffer.wrap(inputBuff), outputBuff)
} catch {
case e: LZ4Exception =>
throw new RuntimeException(s"Compression failed for input buffer size: ${inputBuff.length} and output buffer size: ${outputBuff.capacity()}", e)
}
}

override def fromBinary(inputBuff: Array[Byte]): Array[Byte] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ class ActorRefSerializer(val system: ExtendedActorSystem) extends Serializer[Act

override def read(kryo: Kryo, input: Input, typ: Class[_ <: ActorRef]): ActorRef = {
val path = input.readString()
println("deserializing actor ref " + typ + "for path: " + path)
system.provider.resolveActorRef(path)
}

override def write(kryo: Kryo, output: Output, obj: ActorRef): Unit = {
println("serializing actor ref " + obj)
output.writeAscii(Serialization.serializedActorPath(obj))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.altoo.akka.serialization.kryo

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.nio.ByteBuffer

class LZ4KryoCompressorTest extends AnyFlatSpec with Matchers {

it should "serialize short message" in {
// arrange
val compressor = new LZ4KryoCompressor()
val data = Seq[Byte](0, 1, 2, 3, 4, 5, 6, 7, 8, 9).toArray
val buffer = ByteBuffer.allocateDirect(2* data.length)

// act & assert
compressor.toBinary(data, buffer)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.altoo.testing

import akka.actor.ActorRef

case class SampleMessage(actorRef: ActorRef) extends Serializable