In the realm of real-time data streaming and processing, Apache Kafka has emerged as a powerful tool. When combined with Avro, a compact binary format for serializing data, Kafka becomes even more efficient for handling complex data structures, like files. In this guide, we’ll explore how to use Kafka and Avro to seamlessly produce and consume files, and we’ll also integrate a REST API to trigger the Kafka producer.
Apache Kafka is widely known for its ability to handle high-throughput, fault-tolerant, and scalable message streaming. However, the format in which data is serialized plays a critical role in performance and flexibility, and this is where Apache Avro comes in.
Avro is a binary serialization format developed within the Apache Hadoop project. It provides a compact, fast, schema-driven serialization system that integrates well with Kafka, and its schema evolution capabilities make it ideal for scenarios where data schemas change over time without breaking compatibility.
Before diving into producing and consuming files with Kafka and Avro, make sure a Kafka broker and a Schema Registry are running (the examples below assume localhost:9092 and http://localhost:8081), and add the required dependencies to your project. The examples also assume a Spring Boot project with spring-kafka and spring-boot-starter-web on the classpath.
Include the following dependencies in your project. Note that the io.confluent artifacts are published in Confluent's Maven repository (https://packages.confluent.io/maven/), so you may need to add that repository to your build:
<dependencies>
    <!-- Kafka dependencies -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.1</version>
    </dependency>
    <!-- Avro dependencies -->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-compiler</artifactId>
        <version>1.11.0</version>
    </dependency>
    <!-- Confluent Avro serializer for Kafka -->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>6.1.1</version>
    </dependency>
    <!-- Confluent Schema Registry client -->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-schema-registry-client</artifactId>
        <version>6.1.1</version>
    </dependency>
</dependencies>
// To generate an Avro schema (.avsc) from a Java class, use Jackson's AvroMapper
// (provided by the jackson-dataformat-avro module):
public class AvroSchemaFileGenerator {

    public static void main(String[] args) throws IOException {
        AvroMapper avroMapper = AvroMapper.builder()
                .disable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY)
                .addModule(new AvroJavaTimeModule())
                .build();
        createAvroSchemaFromClass(DocumentRecord.class, avroMapper);
        createAvroSchemaFromClass(Earth.class, avroMapper);
        createAvroSchemaFromClass(Mars.class, avroMapper);
    }

    private static void createAvroSchemaFromClass(Class<?> clazz, AvroMapper avroMapper) throws IOException {
        AvroSchemaGenerator gen = new AvroSchemaGenerator();
        gen.enableLogicalTypes();
        avroMapper.acceptJsonFormatVisitor(clazz, gen);
        AvroSchema schemaWrapper = gen.getGeneratedSchema();
        org.apache.avro.Schema avroSchema = schemaWrapper.getAvroSchema();
        String avroSchemaInJSON = avroSchema.toString(true);

        // Write the pretty-printed schema to <ClassName>.avsc
        Path fileName = Path.of(clazz.getSimpleName() + ".avsc");
        Files.writeString(fileName, avroSchemaInJSON);
    }
}
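For illustration, a hypothetical source class like the following could drive the schema generation above; the exact schema produced depends on your Jackson annotations and version. Note that this plain class is only used to produce the .avsc file. The Kafka producer and consumer later in this guide work with the DocumentRecord class that the Avro compiler generates from that file.
// Hypothetical source class used only for schema generation (not the Avro-generated record)
public class DocumentRecord {

    private String title;
    private String body;

    // Getters and setters so Jackson can discover the properties
    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }
    public String getBody() { return body; }
    public void setBody(String body) { this.body = body; }
}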
An Avro schema is written in JSON and defines the structure of your data. For example, here is a simple schema for a document with title and body fields:
{
  "type": "record",
  "name": "DocumentRecord",
  "namespace": "com.example.document",
  "fields": [
    {
      "name": "title",
      "type": ["null", "string"],
      "default": null,
      "doc": "The title of the document"
    },
    {
      "name": "body",
      "type": ["null", "string"],
      "default": null,
      "doc": "The content of the document"
    }
  ]
}
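To see what this schema buys you, here is a minimal, Kafka-independent sketch that loads the .avsc file, builds a record, and serializes it to Avro's compact binary form. The DocumentRecord.avsc path is an assumption; adjust it to wherever you saved the generated schema.
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;

public class AvroSerializationDemo {

    public static void main(String[] args) throws IOException {
        // Load the schema generated earlier (path is an assumption)
        Schema schema = new Schema.Parser().parse(new File("DocumentRecord.avsc"));

        // Build a record that conforms to the schema
        GenericRecord document = new GenericData.Record(schema);
        document.put("title", "Sample Document");
        document.put("body", "This is the content of the sample document.");

        // Serialize to Avro's compact binary encoding; field names are not part of the
        // payload, which is why both producer and consumer need the schema
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(document, encoder);
        encoder.flush();

        System.out.println("Serialized document is " + out.size() + " bytes");
    }
}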
Ensure your application.yml file is configured correctly for Kafka and Avro:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema.registry.url: http://localhost:8081
    consumer:
      group-id: my-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        schema.registry.url: http://localhost:8081
        # Deserialize into generated classes such as DocumentRecord instead of GenericRecord
        specific.avro.reader: true
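Spring Boot builds the ProducerFactory and KafkaTemplate from these properties automatically. If you prefer explicit Java configuration, or are not using Spring Boot's auto-configuration, a roughly equivalent sketch looks like this; the values simply mirror the YAML above:
import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, SpecificRecordBase> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        // The Confluent serializer registers/looks up schemas in the Schema Registry
        props.put("schema.registry.url", "http://localhost:8081");
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, SpecificRecordBase> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}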
To produce Avro-serialized documents to Kafka, use Spring's KafkaTemplate together with the Avro serializer configured above. The log field below comes from Lombok's @Slf4j; also note that this example assumes Spring Kafka 3.x, where KafkaTemplate.send() returns a CompletableFuture (older versions return a ListenableFuture):
@Slf4j
@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, SpecificRecordBase> kafkaTemplate;

    public CompletableFuture<SendResult<String, SpecificRecordBase>> sendEvent(String topic,
            String key, SpecificRecordBase documentRecord) {
        return kafkaTemplate.send(topic, key, documentRecord)
                .whenComplete((result, ex) -> {
                    if (ex != null) {
                        handleFailure(key, topic, ex);
                    } else {
                        handleSuccess(key, topic, result);
                    }
                });
    }

    private void handleFailure(String key, String topic, Throwable ex) {
        // Add error handling logic here (logging, retries, dead-letter topic, etc.)
        log.error("Failed to send message to Kafka - Topic: {}, Key: {}", topic, key, ex);
    }

    private void handleSuccess(String key, String topic, SendResult<String, SpecificRecordBase> result) {
        // Add success handling logic here (e.g., logging, metrics)
        log.info("Message sent to Kafka - Topic: {}, Partition: {}, Key: {}",
                topic, result.getRecordMetadata().partition(), key);
    }
}
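Assuming DocumentRecord has been generated from the .avsc schema by the Avro compiler (generated classes extend SpecificRecordBase and come with a builder), sending a document could look like the following sketch; the topic name matches the controller below:
// Sketch: build a generated DocumentRecord and hand it to the producer service
DocumentRecord documentRecord = DocumentRecord.newBuilder()
        .setTitle("Sample Document")
        .setBody("This is the content of the sample document.")
        .build();

kafkaProducerService.sendEvent("documents-event-topic", documentRecord.getTitle(), documentRecord);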
To provide a way to trigger the Kafka producer through a REST API, you can create a simple Spring Boot controller.
This controller allows you to send a DocumentRecord object to Kafka via a POST request:
@RestController
@RequestMapping("/api/documents")
public class DocumentController {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    @PostMapping("/send")
    public ResponseEntity<String> sendDocumentToKafka(@RequestBody DocumentRecord documentRecord) {
        String topic = "documents-event-topic";
        String key = documentRecord.getTitle(); // Using the title as the key
        try {
            // Note: sendEvent is asynchronous; this catch only covers failures thrown
            // before the send is handed off to Kafka (e.g., serialization errors).
            kafkaProducerService.sendEvent(topic, key, documentRecord);
            return ResponseEntity.ok("Document sent to Kafka successfully.");
        } catch (Exception e) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("Failed to send document to Kafka: " + e.getMessage());
        }
    }
}
Here’s an example of the JSON payload you can send via the POST API:
{
  "title": "Sample Document",
  "body": "This is the content of the sample document."
}
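For a quick test, you can post this payload with any HTTP client. As a sketch, assuming the application runs on Spring Boot's default port 8080, Java's built-in HttpClient (Java 11+; the text block needs Java 15+) would do it like this:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SendDocumentExample {

    public static void main(String[] args) throws Exception {
        String json = """
                {
                  "title": "Sample Document",
                  "body": "This is the content of the sample document."
                }
                """;

        // POST the JSON payload to the controller's /api/documents/send endpoint
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/api/documents/send"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        System.out.println(response.statusCode() + " " + response.body());
    }
}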
To consume documents from Kafka, configure a listener that deserializes the Avro data back into DocumentRecord (this relies on specific.avro.reader: true in the consumer configuration above). Note that the topic matches the one used by the producer:
@Slf4j
@Service
public class ConsumerService {

    @KafkaListener(topics = "documents-event-topic", groupId = "document.event.consumer.group-id")
    public void consumeDocumentEvent(ConsumerRecord<String, DocumentRecord> rec) {
        log.info("Consumed Document event from topic {}: partition {}, key = {}, title = {}",
                rec.topic(), rec.partition(), rec.key(), rec.value().getTitle());
    }
}
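As with the producer, Spring Boot wires the consumer from application.yml. For completeness, a rough programmatic equivalent looks like the following sketch; it is not required if you use the YAML configuration above:
import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        // Deserialize into generated SpecificRecord classes (e.g. DocumentRecord)
        props.put("specific.avro.reader", true);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}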
Apache Kafka and Avro together provide a robust solution for handling files in real-time data pipelines. Avro's efficient serialization and schema evolution capabilities complement Kafka's scalability and fault-tolerant message delivery, making them ideal for building modern data architectures.
By integrating Kafka with Avro, organizations can achieve faster data processing, better scalability, and seamless integration across diverse data sources and applications. The added REST API makes it simple to send Avro-encoded messages to Kafka from any application.