Writing a Simple Producer and Consumer Using ZIO Workflows

Pramod Shehan
4 min read · Dec 19, 2022


Here we use the zio-kafka library to build a Kafka producer and consumer.

  "dev.zio" %% "zio-kafka" % "2.0.1"

Serialize and Deserialize Data

In ZIO Kafka, a Serde serializes our data to a byte array and deserializes byte arrays back to our data types. The library ships with built-in Serdes for common primitive types, such as Serde.string, Serde.int, and Serde.long.
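To make the idea concrete, here is a minimal sketch in plain Scala of what a string Serde does conceptually. It does not call the zio-kafka API: StringRoundTrip and its two methods are hypothetical names used only for illustration, and UTF-8 is assumed as the encoding (the default for Kafka's string serializer).

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helper, not part of zio-kafka: shows the two directions a Serde covers.
object StringRoundTrip {
  // Serialize: String -> Array[Byte], the form in which a record is sent to Kafka.
  def serialize(value: String): Array[Byte] = value.getBytes(UTF_8)

  // Deserialize: Array[Byte] -> String, the form the consumer works with.
  def deserialize(bytes: Array[Byte]): String = new String(bytes, UTF_8)
}
```

Serializing a value and then deserializing the result should give back the original value; that round-trip property is what any Serde, built-in or custom, is expected to satisfy.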

Producer

ZIO Kafka supports several produce functions. We can use these functions for different purposes.

Here both the key and the value are Strings, which is why I pass Serde.string as the keySerializer and the valueSerializer. The producer runs every 1 second using a ZIO Schedule.

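  import org.apache.kafka.clients.producer.RecordMetadata
  import zio.RIO
  import zio.kafka.producer.Producer
  import zio.kafka.serde.Serde

  // Sends one record with a String key and a String value to the given topic.
  private def produce(topic: String,
                      key: String,
                      value: String): RIO[Any with Producer, RecordMetadata] =
    Producer.produce[Any, String, String](
      topic = topic,
      key = key,
      value = value,
      keySerializer = Serde.string,
      valueSerializer = Serde.string
    )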

KAFKA_HOST and KAFKA_TOPIC are defined as environment variables.
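The pieces above (environment variables, the Producer layer, and the 1-second schedule) could be wired together roughly as follows. This is a sketch under assumptions, not the code from the project: it targets the zio-kafka 2.0.x API, ProducerApp, sendOne, the fixed key "demo-key", and the fallback values for the two environment variables are all hypothetical.

```scala
import org.apache.kafka.clients.producer.RecordMetadata
import zio._
import zio.kafka.producer.{Producer, ProducerSettings}
import zio.kafka.serde.Serde

object ProducerApp extends ZIOAppDefault {
  // Read the two settings from the environment; the fallbacks are assumptions for local runs.
  private val kafkaHost: String  = sys.env.getOrElse("KAFKA_HOST", "localhost:49092")
  private val kafkaTopic: String = sys.env.getOrElse("KAFKA_TOPIC", "Test")

  // The Producer is a scoped resource, so it is closed when the app shuts down.
  private val producerLayer: ZLayer[Any, Throwable, Producer] =
    ZLayer.scoped(Producer.make(ProducerSettings(List(kafkaHost))))

  // Sends one record; the key is a placeholder and the value is the current timestamp.
  private val sendOne: RIO[Producer, RecordMetadata] =
    for {
      now <- Clock.instant
      metadata <- Producer.produce[Any, String, String](
                    topic = kafkaTopic,
                    key = "demo-key",
                    value = now.toString,
                    keySerializer = Serde.string,
                    valueSerializer = Serde.string
                  )
    } yield metadata

  // Runs sendOne once immediately, then repeats it with a 1-second pause between runs.
  override def run =
    sendOne
      .repeat(Schedule.spaced(1.second))
      .provide(producerLayer)
}
```

Schedule.spaced waits for the given duration after each run finishes, so the interval between records is one second plus the time a send takes.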

I am using a random key and a random value here. makeString comes from an implicit class and generates a random string of the given length.
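The original implicit class is not shown here, so the following is only one way such a helper could look. It assumes makeString is added to scala.util.Random and produces alphanumeric characters; the names RandomStrings and RandomStringOps are hypothetical.

```scala
import scala.util.Random

object RandomStrings {
  // Hypothetical extension: adds makeString(length) to scala.util.Random.
  implicit class RandomStringOps(private val random: Random) extends AnyVal {
    // Takes `length` characters from the generator's endless alphanumeric stream.
    def makeString(length: Int): String =
      random.alphanumeric.take(length).mkString
  }
}
```

With import RandomStrings._ in scope, new Random().makeString(10) would yield a 10-character key and makeString(5) a 5-character suffix for the value.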

Consumer

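ZIO Kafka also offers several consumer functions. Here I am using the consumeWith function, which subscribes to the topic, keeps polling it, and runs the supplied effect for every record it receives. It also accepts a ZIO Schedule (commitRetryPolicy) that controls how failed offset commits are retried.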
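A consumer built on consumeWith might look like the sketch below. It assumes the zio-kafka 2.0.x signature, where the handler receives the key and the value as two arguments (newer releases pass a ConsumerRecord instead). ConsumerApp, the group id "zio-kafka-demo", and the fallback values for the environment variables are hypothetical.

```scala
import zio._
import zio.kafka.consumer.{Consumer, ConsumerSettings, Subscription}
import zio.kafka.serde.Serde

object ConsumerApp extends ZIOAppDefault {
  // Same environment variables as the producer; the fallbacks are assumptions for local runs.
  private val kafkaHost: String  = sys.env.getOrElse("KAFKA_HOST", "localhost:49092")
  private val kafkaTopic: String = sys.env.getOrElse("KAFKA_TOPIC", "Test")

  // Prints every record received from the topic until the app is stopped.
  override def run =
    Consumer.consumeWith(
      settings = ConsumerSettings(List(kafkaHost)).withGroupId("zio-kafka-demo"),
      subscription = Subscription.topics(kafkaTopic),
      keyDeserializer = Serde.string,
      valueDeserializer = Serde.string
    )((key, value) => Console.printLine(s"key=$key value=$value").orDie)
}
```

The handler must not fail, which is why the possible IOException from Console.printLine is turned into a defect with orDie.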

Here I am using a Kafka cluster with 3 brokers, together with Kafdrop. Kafdrop displays information such as brokers, topics, partitions, and consumers, and lets you view messages.

[Figure: Kafka cluster details]
  • I create the topic Test with 3 partitions and a replication factor of 3, using the built-in kafka-topics script:
docker exec kafka1 kafka-topics --create --bootstrap-server kafka1:9092 --partitions 3 --replication-factor 3 --topic Test
  • Once the topic is created, it shows up in the Kafdrop view.
[Figure: Kafka cluster details]
  • The partition details look like this.
[Figure: Test topic details]

Dockerize the Consumer and Producer Services

  • Add sbt-native-packager to plugin.sbt (in the project/ directory):
addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.9")
  • Enable these two plugins in build.sbt:
enablePlugins(JavaAppPackaging)
enablePlugins(DockerPlugin)
  • Run the sbt command below to build the Docker image. It generates the Dockerfile automatically and publishes the image to the local Docker daemon:
sbt clean compile docker:publishLocal
  • This is one of the generated Dockerfiles, the one for the producer.
[Figure: generated Dockerfile]
  • After building the Docker images for the consumer and producer, use Docker Compose to manage all the containers, including the Kafka cluster and Kafdrop:
version: '3'
services:
  kafka1:
    image: confluentinc/cp-kafka:7.2.1
    container_name: kafka1
    ports:
      - 49092:49092
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://kafka1:9092,CONTROLLER://kafka1:9093,PLAINTEXT_HOST://0.0.0.0:49092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:49092
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    volumes:
      - ./run_workaround.sh:/tmp/run_workaround.sh
    command: "bash -c '/tmp/run_workaround.sh && /etc/confluent/docker/run'"
  kafka2:
    image: confluentinc/cp-kafka:7.2.1
    container_name: kafka2
    ports:
      - 29092:29092
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://kafka2:9092,CONTROLLER://kafka2:9093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    volumes:
      - ./run_workaround.sh:/tmp/run_workaround.sh
    command: "bash -c '/tmp/run_workaround.sh && /etc/confluent/docker/run'"
  kafka3:
    image: confluentinc/cp-kafka:7.2.1
    container_name: kafka3
    ports:
      - 39092:39092
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://kafka3:9092,CONTROLLER://kafka3:9093,PLAINTEXT_HOST://0.0.0.0:39092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092,PLAINTEXT_HOST://localhost:39092
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    volumes:
      - ./run_workaround.sh:/tmp/run_workaround.sh
    command: "bash -c '/tmp/run_workaround.sh && /etc/confluent/docker/run'"
  kafdrop:
    image: obsidiandynamics/kafdrop
    ports:
      - 9001:9000
    environment:
      KAFKA_BROKERCONNECT: kafka3:9092
      JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
  zio-kafka-consumer:
    image: zio-kafka-consumer:0.1.0-SNAPSHOT
    container_name: consumer
    environment:
      KAFKA_HOST: kafka1:9092
      KAFKA_TOPIC: Test
  zio-kafka-producer:
    image: zio-kafka-producer:0.1.0-SNAPSHOT
    container_name: producer
    environment:
      KAFKA_HOST: kafka1:9092
      KAFKA_TOPIC: Test

  • These are the running Docker containers after executing docker compose up -d.
[Figure: docker ps command]
[Figure: Docker Desktop view]
  • These are the consumer's Docker logs. The consumer prints every event produced by the producer.
Key - random string(10)
Value - time --- random string(5)
[Figure: docker logs consumer]
  • We can also view all the events in Kafdrop.

GitHub project: https://github.com/pramodShehan5/zio-kafka-demo1

References

https://github.com/obsidiandynamics/kafdrop

https://zio.dev/guides/tutorials/producing-consuming-data-from-kafka-topics
