KLogic
Message Inspection Guide

Debugging Data Pipelines with Kafka Message Inspector

When a data pipeline misbehaves, the fastest path to root cause is inspecting the actual messages on the wire. This guide covers browsing raw and schema-encoded messages, filtering techniques, and the scenarios where message inspection saves hours of debugging.

Published: January 3, 2025 • 9 min read • Debugging & Observability

When Message Inspection Saves the Day

Metrics tell you something is wrong. Message inspection tells you exactly what is wrong. These are the scenarios where browsing actual Kafka messages cuts debugging time from hours to minutes:

Consumer deserialization errors

A consumer crashes with SerializationException. Inspect the raw bytes and decode with the correct schema to identify whether a field type changed or the wrong serializer is configured.

Missing or duplicate messages

A downstream system reports missing orders. Browse the Kafka topic to verify whether the producer wrote them, then check which offset the consumer last committed.

Unexpected null fields

A downstream database has null in a required column. Inspect the Kafka messages to determine whether the null originated at the producer or was introduced by a transform.

Schema version mismatch

A Kafka Connect sink is failing with schema compatibility errors. Read the schema ID from the wire-format prefix of the message value and look it up in the Schema Registry to find the conflicting version.

Message Browsing via CLI

Basic Message Consumption

# Read from the beginning of a topic (JSON messages)
kafka-console-consumer.sh \
  --bootstrap-server broker:9092 \
  --topic orders.v1 \
  --from-beginning \
  --max-messages 20 \
  --property print.key=true \
  --property key.separator=" | " \
  --property print.timestamp=true \
  --property print.headers=true

# Sample output (timestamp | headers | key | value; NO_HEADERS is printed when a record has none):
# CreateTime:1706745600000 | NO_HEADERS | order-uuid-123 | {"id":"order-uuid-123","amount":99.99,"status":"pending"}

# Read a specific offset range
kafka-console-consumer.sh \
  --bootstrap-server broker:9092 \
  --topic orders.v1 \
  --partition 3 \
  --offset 42000 \
  --max-messages 100
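Captured output is easier to work with once each line is split back into fields. The following is a minimal sketch, assuming the " | " separator and the timestamp, headers, key, value order produced by the first command above; `parse_line` is a hypothetical helper, and the field list must match whichever print.* properties you enabled.

```python
import json

def parse_line(line, fields=("timestamp", "headers", "key", "value"), sep=" | "):
    """Split one console-consumer line into named fields and decode the JSON value."""
    # Limit the split so a separator inside the value (the last field) is left alone
    parts = line.rstrip("\n").split(sep, len(fields) - 1)
    if len(parts) != len(fields):
        raise ValueError(f"expected {len(fields)} fields, got {len(parts)}")
    record = dict(zip(fields, parts))
    # Timestamps are printed as "<type>:<ms>", e.g. "CreateTime:1706745600000"
    if "timestamp" in record:
        ts_type, _, ms = record["timestamp"].partition(":")
        record["timestamp_type"] = ts_type
        record["timestamp"] = int(ms)
    # Assumption: the value is JSON, as in the orders.v1 examples
    record["value"] = json.loads(record["value"])
    return record

sample = ('CreateTime:1706745600000 | NO_HEADERS | order-uuid-123 | '
          '{"id":"order-uuid-123","amount":99.99,"status":"pending"}')
rec = parse_line(sample)
print(rec["key"], rec["timestamp"], rec["value"]["status"])
```

If headers are not printed, pass `fields=("timestamp", "key", "value")` instead.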

Decoding Avro Messages

# Confluent Schema Registry Avro consumer
kafka-avro-console-consumer \
  --bootstrap-server broker:9092 \
  --property schema.registry.url=http://schema-registry:8081 \
  --topic orders.v1 \
  --from-beginning \
  --max-messages 5 \
  --property print.key=true \
  --property key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

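# Project selected fields with jq for readability.
# The consumer emits one JSON object per line (not an array), so apply the
# filter to each object directly and omit print.key to keep every line valid JSON.
kafka-avro-console-consumer ... | jq -c '{id, amount, status}'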
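Avro's JSON encoding wraps non-null union values in a single-key object named after the branch type, for example {"string": "pending"}, which makes nullable fields harder to scan. A rough sketch that flattens those wrappers follows; `unwrap_unions` and the sample record are illustrative only, and the heuristic assumes no real record has a single field named after an Avro primitive type.

```python
import json

# Branch names that Avro's JSON encoding uses for primitive union members
PRIMITIVES = {"string", "int", "long", "float", "double", "boolean", "bytes"}

def unwrap_unions(node):
    """Recursively replace {"<primitive>": x} wrappers with x."""
    if isinstance(node, dict):
        if len(node) == 1:
            (name, inner), = node.items()
            if name in PRIMITIVES:
                return unwrap_unions(inner)
        return {k: unwrap_unions(v) for k, v in node.items()}
    if isinstance(node, list):
        return [unwrap_unions(v) for v in node]
    return node

# Hypothetical decoded line with one nullable field set and one null
line = '{"id":"order-uuid-123","amount":99.99,"status":{"string":"pending"},"coupon":null}'
flat = unwrap_unions(json.loads(line))
print(flat["status"], flat["coupon"])
```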

Decoding Protobuf Messages

# Using kafka-protobuf-console-consumer (Confluent tools)
kafka-protobuf-console-consumer \
  --bootstrap-server broker:9092 \
  --property schema.registry.url=http://schema-registry:8081 \
  --topic orders.v1 \
  --property print.key=true \
  --from-beginning \
  --max-messages 10

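# Using protoc to manually decode a single message
# 1. Save the raw value bytes to message.bin
# 2. Skip the Confluent prefix: magic byte, 4-byte schema ID, and message-index bytes
#    (6 bytes in total when the message is the first type defined in the schema)
# 3. tail -c +7 message.bin | protoc --decode=Order order.proto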
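The prefix on a Confluent-framed Protobuf value is not always the same length, because the message-index list after the schema ID is a sequence of zig-zag varints. The sketch below shows one way to split the prefix from the payload before handing the payload to protoc. The helper names are hypothetical, and the framing rules are my reading of the Confluent wire format rather than anything taken from this guide.

```python
import struct

def read_zigzag_varint(buf: bytes, pos: int):
    """Decode one zig-zag varint starting at pos; return (value, next_pos)."""
    shift = raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos

def split_confluent_protobuf(raw: bytes):
    """Return (schema_id, message_indexes, payload) for a framed Protobuf value."""
    if len(raw) < 6 or raw[0] != 0x00:
        raise ValueError("Not a Confluent-encoded message")
    schema_id = struct.unpack(">I", raw[1:5])[0]
    count, pos = read_zigzag_varint(raw, 5)
    indexes = []
    for _ in range(count):
        idx, pos = read_zigzag_varint(raw, pos)
        indexes.append(idx)
    # A count of 0 is shorthand for [0]: the first message type in the schema
    return schema_id, indexes or [0], raw[pos:]

# Hypothetical framed value: schema ID 42, first message type, 5 payload bytes
framed = b"\x00" + struct.pack(">I", 42) + b"\x00" + b"\x0a\x03abc"
schema_id, indexes, payload = split_confluent_protobuf(framed)
print(schema_id, indexes, len(payload))
```

Writing `payload` to a file gives protoc an input with no prefix left to strip.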

Advanced Filtering Techniques

Filter by Key

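To find all messages for a specific entity, such as one order ID, pipe the console consumer output to grep. On high-throughput topics, first compute the partition the key hashes to and read only that partition. The Java producer's default partitioner hashes the serialized key with murmur2, so the calculation below holds only when the producer uses that partitioner and the partition count has not changed since the messages were written:

# Find all messages for a specific key
kafka-console-consumer.sh \
  --bootstrap-server broker:9092 \
  --topic orders.v1 \
  --from-beginning \
  --property print.key=true | grep "order-uuid-123"

# Calculate which partition a key lands on (avoids full scan)
# Mirrors the Java client: toPositive(murmur2(keyBytes)) % numPartitions
python3 - <<'EOF'
def murmur2(data: bytes) -> int:
    m, mask = 0x5bd1e995, 0xFFFFFFFF
    h = (0x9747b28c ^ len(data)) & mask
    body = len(data) - len(data) % 4
    for i in range(0, body, 4):
        k = (int.from_bytes(data[i:i + 4], 'little') * m) & mask
        k = ((k ^ (k >> 24)) * m) & mask
        h = ((h * m) & mask) ^ k
    tail = data[body:]
    if tail:
        h ^= int.from_bytes(tail, 'little')
        h = (h * m) & mask
    h = ((h ^ (h >> 13)) * m) & mask
    return h ^ (h >> 15)

key = b'order-uuid-123'
num_partitions = 12
partition = (murmur2(key) & 0x7FFFFFFF) % num_partitions
print(f'Key routes to partition: {partition}')
EOF
# Then read only that partition (replace 7 with the number printed above)
kafka-console-consumer.sh ... --partition 7 --from-beginning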
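grep matches the ID anywhere on the line, so it also returns keys that merely start with the ID and records that mention it inside the value. Where that matters, an exact-match filter on the key field is one option. This is a small sketch, assuming the default tab separator that the console consumer uses when key.separator is not set; `filter_by_key` and the sample lines are hypothetical.

```python
def filter_by_key(lines, wanted_key, sep="\t"):
    """Yield (key, value) pairs whose key equals wanted_key exactly."""
    for line in lines:
        key, found, value = line.rstrip("\n").partition(sep)
        if found and key == wanted_key:
            yield key, value

captured = [
    'order-uuid-123\t{"id":"order-uuid-123","status":"pending"}',
    'order-uuid-1234\t{"id":"order-uuid-1234","status":"paid"}',
    'order-uuid-999\t{"id":"order-uuid-999","parent":"order-uuid-123"}',
]
matches = list(filter_by_key(captured, "order-uuid-123"))
print(len(matches))  # grep would have returned all three lines
```

Passing `sys.stdin` as `lines` lets the same function sit at the end of a consumer pipeline.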

Filter by Header

Message headers carry metadata like correlation IDs, source service names, and schema versions. Filtering by header quickly isolates messages from a specific producer version or geographic region.

# Print headers and filter by source service
# (headers are printed as comma-separated key:value pairs)
kafka-console-consumer.sh \
  --bootstrap-server broker:9092 \
  --topic orders.v1 \
  --property print.headers=true \
  --property print.key=true \
  --property key.separator=" | " \
  --from-beginning | grep "source-service:payments-api-v2"

# Sample message with headers:
# source-service:payments-api-v2,region:us-east-1 | order-123 | {json...}
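When a grep pattern is too loose (for example, it also matches payments-api-v20), the header field can be parsed and compared exactly. The sketch below assumes headers are the first field on the line, in the key:value,key:value form shown in the sample, and that header values contain no commas or colons. `parse_headers` and `has_header` are hypothetical helpers.

```python
def parse_headers(field: str) -> dict:
    """Turn 'k1:v1,k2:v2' (or 'NO_HEADERS') into a dict."""
    if field == "NO_HEADERS":
        return {}
    headers = {}
    for pair in field.split(","):
        name, _, value = pair.partition(":")
        headers[name] = value
    return headers

def has_header(line: str, name: str, value: str, sep: str = " | ") -> bool:
    """True when the line's header field carries name with exactly this value."""
    header_field = line.split(sep, 1)[0]
    return parse_headers(header_field).get(name) == value

line = 'source-service:payments-api-v2,region:us-east-1 | order-123 | {"id":"order-123"}'
print(has_header(line, "source-service", "payments-api-v2"))
print(has_header(line, "source-service", "payments-api"))
```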

Time-Based Range Queries

# Find the offset at a specific timestamp using kafka-get-offsets
# --time takes a Unix timestamp in milliseconds
kafka-get-offsets.sh \
  --bootstrap-server broker:9092 \
  --topic orders.v1 \
  --partitions 0 \
  --time 1706745600000
# Output format is topic:partition:offset, e.g. orders.v1:0:38420

# Then seek to that offset (38420 is the offset returned above)
kafka-console-consumer.sh \
  --bootstrap-server broker:9092 \
  --topic orders.v1 \
  --partition 0 \
  --offset 38420 \
  --max-messages 500
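Incident timelines are usually written as wall-clock times, while --time expects Unix milliseconds. A small conversion sketch follows, using integer arithmetic to avoid float rounding. `to_epoch_ms` is a hypothetical helper, and treating timestamps without an offset as UTC is an assumption of this example.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_epoch_ms(iso_timestamp: str) -> int:
    """Convert an ISO 8601 timestamp to Unix milliseconds (naive input is read as UTC)."""
    dt = datetime.fromisoformat(iso_timestamp)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return (dt - EPOCH) // timedelta(milliseconds=1)

# Bounds of a hypothetical 15-minute incident window
start_ms = to_epoch_ms("2024-02-01T00:00:00")
end_ms = to_epoch_ms("2024-02-01T00:15:00")
print(start_ms, end_ms)
```

Looking up both bounds with kafka-get-offsets gives a start offset for --offset, and the difference between the two offsets gives a value for --max-messages.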

Inspecting Schema Versions in Messages

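Confluent-compatible Avro and Protobuf messages start with a 5-byte prefix: a magic byte (0x00) followed by the Schema Registry schema ID as a 4-byte big-endian integer. Protobuf messages add a short message-index list after the ID. Decoding the ID tells you exactly which registered schema produced a given message, and the Schema Registry can then map that ID to a subject and version.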

Decode Schema ID from Raw Bytes

# Wire format: [magic byte (0x00)] [4-byte schema ID] [avro/proto bytes]

# Extract schema ID from first 5 bytes (Python)
import struct

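def extract_schema_id(raw_bytes: bytes) -> int:
    # Raise instead of assert so the check still runs under python -O
    if len(raw_bytes) < 5 or raw_bytes[0] != 0x00:
        raise ValueError("Not a Confluent-encoded message")
    schema_id = struct.unpack(">I", raw_bytes[1:5])[0]
    return schema_id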

# Then look up the schema
curl http://schema-registry:8081/schemas/ids/42
# Returns: {"schema": "{\"type\":\"record\",...}"}

# Full schema history for a subject
curl http://schema-registry:8081/subjects/orders.v1-value/versions
# Returns: [1, 2, 3, 4]  # all registered versions
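The schema in a /schemas/ids response is itself a JSON string, so it has to be parsed twice before two versions can be compared. A rough sketch for Avro record schemas follows. The helper names and the two sample response bodies are made up for illustration, and only top-level fields are compared.

```python
import json

def summarize_schema(response_body: str) -> dict:
    """Map field name -> type from a /schemas/ids/{id} response for an Avro record."""
    outer = json.loads(response_body)
    schema = json.loads(outer["schema"])  # the schema is a JSON string inside JSON
    return {f["name"]: f["type"] for f in schema.get("fields", [])}

def diff_fields(old: dict, new: dict) -> dict:
    """List fields that were added, removed, or changed type between two schemas."""
    return {
        "added": sorted(new.keys() - old.keys()),
        "removed": sorted(old.keys() - new.keys()),
        "retyped": sorted(k for k in old.keys() & new.keys() if old[k] != new[k]),
    }

def fake_response(fields):
    # Builds a hypothetical response body so the example needs no network access
    schema = {"type": "record", "name": "Order", "fields": fields}
    return json.dumps({"schema": json.dumps(schema)})

body_old = fake_response([{"name": "id", "type": "string"},
                          {"name": "amount", "type": "double"}])
body_new = fake_response([{"name": "id", "type": "string"},
                          {"name": "amount", "type": "string"},
                          {"name": "status", "type": "string"}])
print(diff_fields(summarize_schema(body_old), summarize_schema(body_new)))
```

A type change like the one on `amount` is the kind of difference that typically surfaces downstream as a deserialization error.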

Message Inspection in KLogic

KLogic's built-in message browser eliminates the need to install CLI tools or manage Schema Registry credentials locally. Every team member can inspect messages directly in the browser with their SSO credentials.

One-click decoding

Avro, Protobuf, and JSON messages are decoded automatically using the connected Schema Registry.

Visual filters

Filter by key prefix, header value, timestamp range, or partition — no grep required.

Partition seek

Jump to any offset or timestamp directly. No manual offset calculation needed.

Schema diff view

Compare the schema version in a message against the current Schema Registry version side by side.

Key Takeaways

Message inspection is the fastest path from alert to root cause for data pipeline bugs.
Calculate which partition an entity's key hashes to so you can read that one partition instead of scanning the whole topic.
Schema ID bytes are embedded in every Confluent-encoded message — decode them to identify schema version mismatches without consumer error logs.
Headers carry correlation IDs and source metadata — filter by them to isolate messages from a specific producer deployment.
Time-based offset queries let you inspect exactly what happened during a specific incident window.
