Persistence
Persistence modes
TBD.
Compression
Kafka-flow has a built-in support for compressing application's state
when it's being persisted. This can be achieved by creating an instance of Compressor
and enhancing a user-defined instance of ToBytes[F, State]
with it
via a syntax extension. Additionally, you need to provide instances of
ToBytes
and FromBytes
to encode/decode a Header
which contains
meta-information about compressed data.
The example below illustrates the approach. Note that it's using a simplified approach towards encoding both state and headers, and you may want to encode them differently (as JSON for example).
import cats.effect.IO
import com.evolutiongaming.kafka.flow.persistence.compression.{Compressor, Header}
import com.evolutiongaming.skafka.{FromBytes, ToBytes}
import com.evolutiongaming.kafka.flow.persistence.compression.CompressorSyntax._
import scodec.bits.BitVector
import scodec.codecs.{bool, int32}
// Application's state
final case class State(int: Int)
// Encoder of the application's state
val toBytes: ToBytes[IO, State] = (state, _) =>
IO.fromTry(int32.encode(state.int).map(_.toByteArray).toTry)
// Encoder/decoder of metainformation header
implicit val headerToBytes: ToBytes[IO, Header] =
(header, _) => IO.fromTry(bool.encode(header.compressed).map(_.toByteArray).toTry)
implicit val headerFromBytes: FromBytes[IO, Header] =
(bytes, _) => IO.fromTry(bool.decode(BitVector(bytes)).map(result => Header(result.value)).toTry)
// Resulting instance can be passed to other parts of kafka-flow's API
for {
compressor <- Compressor.of[IO](compressionThreshold = 10000)
toBytesWithCompression = toBytes.withCompression(compressor)
} yield ()
Compression metrics
Compressor
can report metrics of a size of data before and after compression. Metrics support is available as a part
of FlowMetrics
API from kafka-flow-metrics
module in form of FlowMetrics#compressorMetrics(component)
where component
is the name of the label that will be used for metrics of this compressor.
The following metrics are reported:
compressor_raw_bytes
- the size of state before compressingcompressor_compressed_bytes
- the size of compressed state (including library-added meta-information)
Note: these metrics had a _total
suffix in earlier versions.
Starting with prometheus-metrics
v1.0.0 this suffix is no longer allowed and has therefore been removed.
Users of simpleclient
forked version 0.9.999-evo1
will see a change in the metric name, since the _total
suffix is not automatically added in that version.
import cats.effect.syntax.resource._
import com.evolutiongaming.kafka.flow.FlowMetrics
import com.evolutiongaming.kafka.flow.metrics.syntax._
import com.evolutiongaming.smetrics.CollectorRegistry
val registry: CollectorRegistry[IO] = CollectorRegistry.empty[IO]
for {
flowMetrics <- FlowMetrics.of(registry)
compressor <-
Compressor
.of[IO](compressionThreshold = 10000)
.map(_.withMetrics(flowMetrics.compressorMetrics("settlement")))
.toResource
toBytesWithCompression = toBytes.withCompression(compressor)
} yield ()
Backward compatibility
To support smooth transition from raw state to using compression, the library implementation of Compressor
tries
to detect if the byte array it tries to decompress starts with an opening curly bracket({
). In this case it makes
an assumption that you keep the state in JSON and the particular byte array is in a raw format (without compression)
and doesn't attempt to decompress the byte array, returning it as-is.
Please note that it's going to work only if the application's state was encoded as JSON before!