[Spring, Kafka] Spring-Kafka Deserialization
Deserialization
- Object class (a no-argument constructor is required so that Jackson can instantiate it)
/**
 * Payload type that the record bytes are mapped onto via
 * {@code ObjectMapper.readValue(data, MyObject.class)}.
 */
public class MyObject {
...
// No-argument constructor: Jackson creates the instance through it before
// populating the fields. Package-private visibility is enough with Jackson's
// default settings -- TODO confirm if access-modifier overriding is disabled.
MyObject() {
}
...
}
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
/**
 * Kafka value deserializer that turns a JSON payload into a {@link MyObject}
 * using Jackson.
 */
public class MyDeserializer implements Deserializer<MyObject> {

    // ObjectMapper is expensive to build and safe to share once configured,
    // so one instance is reused for every record instead of one per call.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Deserializes the raw record value into a {@link MyObject}.
     *
     * @param topic topic the record was read from; used only in the error message
     * @param data  serialized JSON bytes, or {@code null} for a record without a value
     * @return the parsed object, or {@code null} when {@code data} is {@code null}
     * @throws SerializationException if the bytes cannot be mapped to {@link MyObject};
     *                                the Jackson failure is attached as the cause
     */
    @Override
    public MyObject deserialize(String topic, byte[] data) {
        // Kafka passes null for records that carry no value (e.g. tombstones);
        // the Deserializer contract recommends returning null rather than throwing.
        if (data == null) {
            return null;
        }
        try {
            return MAPPER.readValue(data, MyObject.class);
        } catch (IOException e) {
            // Surface the failure with its cause instead of printing the stack
            // trace and handing a null value to the listener.
            throw new SerializationException(
                    "Failed to deserialize MyObject from topic '" + topic + "'", e);
        }
    }
}
spring.kafka.consumer.value-deserializer=com.example.MyDeserializer
/**
 * Receives records from {@code my-topic} and saves each deserialized value
 * through {@code repository}.
 *
 * @param record consumed record whose value has been deserialized to {@link MyObject}
 */
@KafkaListener(
        topics = "my-topic",
        groupId = "my-consumer-group-id"
)
public void recordListener(ConsumerRecord<String, MyObject> record) {
    MyObject payload = record.value();
    // A record may carry no value (tombstone) or the deserializer may have
    // produced null; there is nothing to persist in that case.
    // NOTE(review): repository is declared outside this snippet -- presumably a
    // Spring Data repository, whose save(null) would be rejected; confirm.
    if (payload == null) {
        return;
    }
    repository.save(payload);
}
2.
Comments