
Kafka Fundamentals for Beginners

6.4 Running Kafka Streams Locally

Build and run a Kafka Streams application locally using Java and Docker, filtering and transforming transaction data in real time with hands-on examples.

Overview

This session sets up a local Kafka Streams environment to process real-time data efficiently using Docker, Java, and Gradle.

Objectives

  1. Run a Kafka instance (using Docker)
  2. Write a simple Kafka Streams application
  3. Verify stream processing with test messages

Exercise Overview

Processing Logic

Input Stream: Transactions with a user name and an amount

json
{"user": "Alice", "amount": 100}
{"user": "Bob", "amount": 200}

(In the hands-on exercise, these records are sent as plain comma-separated values such as Alice,100, so the application can parse them without a JSON library.)

Processing Steps:

  1. Filter out transactions where amount ≤ 100
  2. Transform user name to uppercase

Output Stream:

json
{"user": "BOB", "amount": 200}

Alice's transaction is filtered out (100 is not greater than 100), and Bob's name is converted to uppercase.
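
Stripped of Kafka, the processing logic is just a predicate plus a mapping over the comma-separated records used in the exercise. A minimal plain-Java sketch of that core logic:

java
import java.util.List;
import java.util.stream.Collectors;

public class LogicSketch {
    public static void main(String[] args) {
        List<String> input = List.of("Alice,100", "Bob,200");

        List<String> output = input.stream()
            .filter(v -> Integer.parseInt(v.split(",")[1]) > 100) // keep amount > 100
            .map(v -> {
                String[] parts = v.split(",");
                return parts[0].toUpperCase() + "," + parts[1];   // uppercase the name
            })
            .collect(Collectors.toList());

        System.out.println(output); // prints [BOB,200]
    }
}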

Prerequisites

Required Software

  • Docker: quickly spin up a local Kafka environment
  • Java 11+: Kafka Streams is a Java library
  • Gradle: manage dependencies and build the project

Setting Up Kafka with Docker

Docker Compose Configuration

yaml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka1:
    image: confluentinc/cp-kafka:latest
    container_name: kafka1
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      # Single broker: internal topics cannot use the default replication factor of 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"
Configuration Details

Zookeeper

  • Manages Kafka brokers
  • Port: 2181 (default)
  • Confluent image for Kafka compatibility

Kafka Broker

  • ID: 1 (useful for multiple brokers)
  • Port: 9092 (standard Kafka port)
  • Advertised listeners: localhost:9092 for client connections
  • Security: Plaintext (no encryption/authentication)
  • Depends on Zookeeper

Starting Kafka

bash
# Start services in background
docker-compose up -d

# Check running containers
docker ps
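
With the broker running, it helps to confirm it is reachable from the host before writing any Streams code. Below is a minimal sketch using the Kafka AdminClient (the kafka-clients library arrives transitively with the kafka-streams dependency added later; the class name is illustrative):

java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;

public class BrokerCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Connects through the advertised listener configured in docker-compose
        try (AdminClient admin = AdminClient.create(props)) {
            System.out.println("Cluster id: " + admin.describeCluster().clusterId().get());
            System.out.println("Topics: " + admin.listTopics().names().get());
        }
    }
}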

Setting Up Kafka Streams Project

Initialize Project

bash
# Create project directory
mkdir kafka-streams-app
cd kafka-streams-app

# Initialize Gradle project
gradle init --type java-application

Project Structure

text
kafka-streams-app/
├── build.gradle              # Build configuration and dependencies
├── src/
│   ├── main/
│   │   └── java/            # Application code
│   └── test/
│       └── java/            # Test code
├── gradlew                   # Gradle wrapper (Unix)
├── gradlew.bat              # Gradle wrapper (Windows)
└── settings.gradle          # Project settings

Add Dependencies

build.gradle:

gradle
dependencies {
    implementation 'org.apache.kafka:kafka-streams:3.4.0'
}

// gradle init applied the 'application' plugin; point it at the main class
application {
    mainClass = 'StreamsApp'  // adjust to match your class and package
}

Application Properties

properties
bootstrap.servers=localhost:9092
application.id=streams-app
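
These values can be hardcoded (as in the application below) or kept in a properties file and loaded at startup. A small loading sketch; the app.properties filename is an assumption:

java
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class ConfigLoader {
    // Read Streams settings from a file instead of hardcoding them
    static Properties load(String path) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties props = load("app.properties"); // hypothetical filename
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}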

Running the Application

bash
# Run via the Gradle application plugin
./gradlew run

# Or, once packaged as a runnable jar
java -jar my-streams-app.jar

Creating Kafka Topics

Create Input Topic

bash
docker exec -it kafka1 kafka-topics \
  --create --topic input-topic \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1

Create Output Topic

bash
docker exec -it kafka1 kafka-topics \
  --create --topic output-topic \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1

Command Breakdown

  • docker exec -it kafka1: Run command inside the Kafka container
  • kafka-topics --create: Create a new topic (the Confluent images ship the Kafka CLI tools without the .sh suffix)
  • --topic <name>: Topic name
  • --bootstrap-server localhost:9092: Kafka broker address
  • --partitions 1: Single partition (simple setup)
  • --replication-factor 1: No replication (single broker)
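
The same two topics can also be created programmatically, which is convenient in tests and automation. A sketch with the AdminClient (same connection assumptions as the CLI commands above; the class name is illustrative):

java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Properties;

public class TopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1, mirroring the CLI commands above
            admin.createTopics(List.of(
                new NewTopic("input-topic", 1, (short) 1),
                new NewTopic("output-topic", 1, (short) 1)
            )).all().get(); // block until both topics exist
        }
    }
}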

Writing Kafka Streams Application

java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.common.serialization.Serdes;

import java.util.Properties;

public class StreamsApp {
    public static void main(String[] args) {
        // 1. Configure Kafka Streams
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass());

        // 2. Build topology
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input-topic");

        // 3. Process data
        stream
            .filter((key, value) -> {
                String[] parts = value.split(",");
                int amount = Integer.parseInt(parts[1]);
                return amount > 100;  // Filter: keep amount > 100
            })
            .mapValues(value -> {
                String[] parts = value.split(",");
                String user = parts[0].toUpperCase();  // Transform: uppercase
                return user + "," + parts[1];
            })
            .to("output-topic");  // Write to output

        // 4. Start streams
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // 5. Graceful shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Configuration Details

  • APPLICATION_ID_CONFIG: Unique identifier for Streams app
  • BOOTSTRAP_SERVERS_CONFIG: Kafka broker address
  • DEFAULT_KEY_SERDE_CLASS_CONFIG: Key serialization (String)
  • DEFAULT_VALUE_SERDE_CLASS_CONFIG: Value serialization (String)

Processing Logic

  1. Read: Stream from input-topic
  2. Filter: Keep transactions where amount > 100
  3. Transform: Convert username to uppercase
  4. Write: Send to output-topic

Sending Test Messages

bash
docker exec -it kafka1 kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic input-topic

Type messages (press Enter after each):

text
Alice,100
Bob,200

Command Breakdown

  • docker exec -it kafka1: Interactive command in container
  • kafka-console-producer: Producer tool
  • --bootstrap-server localhost:9092: Kafka broker address (replaces the deprecated --broker-list flag)
  • --topic input-topic: Target topic
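
The same records can be sent from Java once you move beyond manual testing. A minimal producer sketch using the kafka-clients API (class name illustrative):

java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Same two test records as the console producer session above
            producer.send(new ProducerRecord<>("input-topic", "Alice,100"));
            producer.send(new ProducerRecord<>("input-topic", "Bob,200"));
            producer.flush(); // ensure both records reach the broker before exit
        }
    }
}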

Consuming Transformed Messages

bash
docker exec -it kafka1 kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic output-topic \
  --from-beginning

Expected Output:

text
BOB,200

Verification

  • Alice's transaction (amount = 100) was filtered out
  • Bob's name was converted to uppercase
  • Confirms the stream processing works correctly
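
The same check can be automated without a running broker. Kafka Streams ships a test harness, TopologyTestDriver, in the kafka-streams-test-utils artifact (add it as a testImplementation dependency; the sketch below rebuilds the topology inline, though in practice you would factor it into a method shared with StreamsApp):

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

import java.util.Properties;

public class StreamsAppTest {
    public static void main(String[] args) {
        // Same topology as StreamsApp
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")
            .filter((key, value) -> Integer.parseInt(value.split(",")[1]) > 100)
            .mapValues(value -> {
                String[] parts = value.split(",");
                return parts[0].toUpperCase() + "," + parts[1];
            })
            .to("output-topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                "input-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                "output-topic", new StringDeserializer(), new StringDeserializer());

            in.pipeInput("Alice,100");
            in.pipeInput("Bob,200");

            System.out.println(out.readValuesToList()); // expected: [BOB,200]
        }
    }
}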

Scalability in Kafka Streams

Horizontal Scaling

Increase Partitions

  • More partitions → better data distribution
  • Enables parallel processing across instances

Multiple Application Instances

  • Instances behave like a consumer group
  • Automatic workload balancing
  • Example: 3 servers → run one app instance on each
  • Kafka dynamically assigns partitions to instances

Scaling Limits

  • Maximum instances = number of partitions
  • Topic with 2 partitions → max 2 instances
  • Additional instances remain idle (standby)

Key Requirement: Partition Alignment

  • Same keying for KStream and KTable
  • Ensures data for same key goes to same partition
  • Enables local joins without data shuffling
  • Example: User ID as key → Alice's data always on same partition
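
To make the alignment requirement concrete, here is a sketch of a KStream-KTable join. The topic names (transactions-by-user, user-profiles) are hypothetical; the join works locally only because both topics are keyed by user ID and have the same partition count:

java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class JoinSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Both topics must be keyed by user ID with identical partition counts
        KStream<String, String> transactions = builder.stream("transactions-by-user");
        KTable<String, String> profiles = builder.table("user-profiles");

        // With aligned partitions, each instance joins its share of the stream
        // against its local slice of the table, with no network shuffling
        transactions
            .join(profiles, (txn, profile) -> txn + " | " + profile)
            .to("enriched-transactions");

        System.out.println(builder.build().describe()); // inspect the topology
    }
}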

Summary

  • Real-Time Processing: Kafka Streams transforms and filters data as it flows
  • Example Operations:
    - .filter(): Remove unwanted records
    - .mapValues(): Transform data
  • Standard Java Application: No external processing frameworks needed
  • Horizontal Scaling: Like regular Kafka consumers
  • High-Throughput: Suitable for demanding data pipelines