[Spring, Kafka] Spring-Kafka Deserialization

Deserialization

1. Byte array (converted from a JSON-formatted string) to a Java object with a custom deserializer

  • Object class (a no-argument constructor is needed so that Jackson can instantiate it by default)
public class MyObject {
    ...
    
    MyObject() {
    }
    
    ...
}
  • Custom deserializer
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

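public class MyDeserializer implements Deserializer<MyObject> {
    // ObjectMapper is thread-safe once configured, so a single instance is reused
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public MyObject deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // records with a null value carry nothing to parse
        }
        try {
            // Parse the JSON bytes into a MyObject instance
            return mapper.readValue(data, MyObject.class);
        } catch (Exception e) {
            // Keep the original cause so the parsing failure is not lost
            throw new SerializationException("Failed to deserialize value from topic " + topic, e);
        }
    }
}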
  • application.properties
spring.kafka.consumer.value-deserializer=com.example.MyDeserializer
  • Consumer
@KafkaListener(
        topics = "my-topic",
        groupId = "my-consumer-group-id"
)
public void recordListener(ConsumerRecord<String, MyObject> record) {
    repository.save(record.value());
}
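
To make the input of the deserializer concrete, here is a minimal sketch of what the `byte[] data` argument holds: the bytes of a JSON-formatted string. It uses only the JDK, with no Kafka or Jackson. The class name `JsonPayloadDemo`, its helper methods, and the JSON fields are hypothetical, and UTF-8 is an assumption about how the producer encoded the string.

```java
import java.nio.charset.StandardCharsets;

public class JsonPayloadDemo {
    // Producer side: encode a JSON-formatted string to bytes (UTF-8 is assumed here).
    public static byte[] toBytes(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: decode the same bytes back to the JSON text a parser would read.
    public static String toText(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical payload; the real fields depend on MyObject.
        String json = "{\"id\":1,\"name\":\"example\"}";
        byte[] data = toBytes(json);
        // Prints the JSON text recovered from the byte array.
        System.out.println(toText(data));
    }
}
```

In the custom deserializer, the decoding and parsing steps are both handled by `mapper.readValue(data, MyObject.class)`, so no manual conversion to a string is needed there.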

2.
