Kafka Flow

Persistence

Persistence modes

TBD.

Compression

Kafka-flow has built-in support for compressing the application's state when it is persisted. To enable it, create an instance of Compressor and use it to enhance a user-defined instance of ToBytes[F, State] via a syntax extension. You also need to provide instances of ToBytes and FromBytes to encode and decode a Header, which contains meta-information about the compressed data.

The example below illustrates the approach. Note that it encodes both the state and the headers in a simplified way; you may want to encode them differently (as JSON, for example).

import cats.effect.IO
import com.evolutiongaming.kafka.flow.persistence.compression.{Compressor, Header}
import com.evolutiongaming.skafka.{FromBytes, ToBytes}
import com.evolutiongaming.kafka.flow.persistence.compression.CompressorSyntax._
import scodec.bits.BitVector
import scodec.codecs.{bool, int32}

// Application's state
final case class State(int: Int)

// Encoder of the application's state
val toBytes: ToBytes[IO, State] = (state, _) =>
  IO.fromTry(int32.encode(state.int).map(_.toByteArray).toTry)

// Encoder/decoder of the meta-information header
implicit val headerToBytes: ToBytes[IO, Header] =
  (header, _) => IO.fromTry(bool.encode(header.compressed).map(_.toByteArray).toTry)
implicit val headerFromBytes: FromBytes[IO, Header] =
  (bytes, _) => IO.fromTry(bool.decode(BitVector(bytes)).map(result => Header(result.value)).toTry)
  
// Resulting instance can be passed to other parts of kafka-flow's API
for {
  compressor <- Compressor.of[IO](compressionThreshold = 10000)
  toBytesWithCompression = toBytes.withCompression(compressor)
} yield ()
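
The role of compressionThreshold and of the Header flag can be pictured with a small standalone sketch. It is not kafka-flow's implementation: the Encoded type and the gzip and encode helpers are hypothetical names, the choice of GZIP is an assumption, and so is the reading of the threshold as a size in bytes at or above which data gets compressed.

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

object ThresholdSketch {
  // Hypothetical stand-in for a header flag plus payload (not a kafka-flow type)
  final case class Encoded(compressed: Boolean, payload: Array[Byte])

  // Compress a byte array with GZIP (assumed algorithm, for illustration only)
  def gzip(bytes: Array[Byte]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out    = new GZIPOutputStream(buffer)
    try out.write(bytes)
    finally out.close()
    buffer.toByteArray
  }

  // Assumption: payloads of at least `threshold` bytes are compressed,
  // smaller ones are stored as-is and flagged as uncompressed
  def encode(bytes: Array[Byte], threshold: Int): Encoded =
    if (bytes.length >= threshold) Encoded(compressed = true, payload = gzip(bytes))
    else Encoded(compressed = false, payload = bytes)
}
```

Under these assumptions, a small state skips the cost of compression, and the flag tells the reader whether the payload needs decompressing.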

Compression metrics

Compressor can report the size of the data before and after compression. Metrics support is available as part of the FlowMetrics API from the kafka-flow-metrics module, in the form of FlowMetrics#compressorMetrics(component), where component is the name of the label that will be used for the metrics of this compressor.
The following metrics are reported:

  • compressor_raw_bytes - the size of state before compressing
  • compressor_compressed_bytes - the size of compressed state (including library-added meta-information)

Note: these metrics had a _total suffix in earlier versions. Starting with prometheus-metrics v1.0.0, this suffix is no longer allowed and has therefore been removed. Users of the forked simpleclient version 0.9.999-evo1 will see a change in the metric names, since the _total suffix is not added automatically in that version.

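import cats.effect.IO
import cats.effect.syntax.resource._
import com.evolutiongaming.kafka.flow.FlowMetrics
import com.evolutiongaming.kafka.flow.metrics.syntax._
import com.evolutiongaming.kafka.flow.persistence.compression.Compressor
import com.evolutiongaming.kafka.flow.persistence.compression.CompressorSyntax._
import com.evolutiongaming.smetrics.CollectorRegistry

// `toBytes` is the ToBytes[IO, State] instance defined in the compression example above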

val registry: CollectorRegistry[IO] = CollectorRegistry.empty[IO]
for {
  flowMetrics <- FlowMetrics.of(registry)
  compressor <-
    Compressor
      .of[IO](compressionThreshold = 10000)
      .map(_.withMetrics(flowMetrics.compressorMetrics("settlement")))
      .toResource
  toBytesWithCompression = toBytes.withCompression(compressor)
} yield ()

Backward compatibility

To support a smooth transition from raw state to compressed state, the library implementation of Compressor checks whether the byte array it is asked to decompress starts with an opening curly bracket ({). If it does, the Compressor assumes that you keep the state as JSON and that this particular byte array is in raw format (without compression), so it does not attempt to decompress it and returns it as-is.
Please note that this works only if the application's state was encoded as JSON before!
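
The detection rule described above can be sketched in a few lines. This is only an illustration of the heuristic, not the library's code: looksLikeRawJson and readState are hypothetical names, and decompress stands in for whatever decompression routine is in use.

```scala
object RawJsonDetection {
  // Hypothetical helper: true when the first byte is an opening curly bracket
  def looksLikeRawJson(bytes: Array[Byte]): Boolean =
    bytes.headOption.contains('{'.toByte)

  // Return raw JSON untouched; hand everything else to `decompress`
  def readState(
    bytes: Array[Byte],
    decompress: Array[Byte] => Array[Byte]
  ): Array[Byte] =
    if (looksLikeRawJson(bytes)) bytes else decompress(bytes)
}
```

The sketch also shows why the caveat matters: a previously stored raw state in a non-JSON format would not start with { and would be passed to the decompression routine.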

Copyright © 2025 Evolution Gaming