Kafka Fundamentals for Beginners

6.5 Kafka Streams Deep Dive

Master KStream, KTable, and GlobalKTable with advanced joins, aggregations, and state management. Includes concurrency and scaling strategies.


Overview

This session explores three key Kafka Streams components: KStream, KTable, and GlobalKTable. Each plays a distinct role in stream processing with different use cases and characteristics.

Setting Up Kafka with Docker

Docker Compose Configuration

yaml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka1:
    image: confluentinc/cp-kafka:latest
    container_name: kafka1
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1   # needed for a single-broker setup
    ports:
      - "9092:9092"

Starting Services

bash
# Start Kafka and Zookeeper
docker-compose up -d

# Verify services are running
docker ps

Creating Topics

All topics are created with 2 partitions to allow parallel processing, and with replication factor 1 (single-broker local setup).

bash
# Enter the Kafka container
docker exec -it kafka1 bash

# Create topics (Confluent images ship the CLI tools without the .sh suffix)
kafka-topics --create --topic user-clicks \
  --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1

kafka-topics --create --topic user-profiles \
  --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1

kafka-topics --create --topic country-codes \
  --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1

kafka-topics --create --topic user-transactions \
  --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1

Topic Purposes

  • user-clicks: Tracking user interactions
  • user-profiles: User-related information
  • country-codes: Country identifier mappings
  • user-transactions: User purchase data

Producing Sample Data

User Clicks Topic

bash
kafka-console-producer --bootstrap-server localhost:9092 \
  --topic user-clicks --property parse.key=true --property key.separator=:

Messages (key:value format):

text
Alice:clicked on /home page
Bob:visited cart page
Alice:navigated to product page

User Profiles Topic

bash
kafka-console-producer --bootstrap-server localhost:9092 \
  --topic user-profiles --property parse.key=true --property key.separator=:

Messages:

text
Alice:silver
Bob:silver
Alice:gold

Profile updates are tracked as a changelog: a later value for the same key supersedes the earlier one.

Country Codes Topic

bash
kafka-console-producer --bootstrap-server localhost:9092 \
  --topic country-codes --property parse.key=true --property key.separator=:

Messages:

text
US:United States
DE:Germany
IN:India

User Transactions Topic

bash
kafka-console-producer --bootstrap-server localhost:9092 \
  --topic user-transactions --property parse.key=true --property key.separator=:

Messages (key is the user; value is user,amount):

text
Alice:Alice,200
Bob:Bob,150
Alice:Alice,300

Project Setup

Initialize Gradle Project

bash
mkdir kafka-streams-app
cd kafka-streams-app
gradle init --type java-application

Project Structure

text
kafka-streams-app/
├── build.gradle
├── src/
│   ├── main/
│   │   └── java/
│   │       └── example/
│   │           ├── KStreamExample.java
│   │           ├── KTableExample.java
│   │           └── GlobalKTableExample.java
│   └── test/
│       └── java/
├── gradlew
├── gradlew.bat
└── settings.gradle

Add Kafka Streams Dependency

build.gradle:

gradle
dependencies {
    implementation 'org.apache.kafka:kafka-streams:3.4.0'
}

KStream - Unbounded Event Stream

What is KStream?

A KStream represents an unbounded, continuously updating stream of records. Each record is processed as it arrives, in real time.

Example: User Clicks Stream

text
Input Topic: user-clicks

time | user   | page
---------------------
  1  | Alice  | /home
  2  | Bob    | /cart
  3  | Alice  | /product

Each line is an independent event with timestamp, user, and page.

KStream Example Code

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

public class KStreamExample {
    public static void main(String[] args) {
        // Configure application
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Default serdes so String keys and values can be (de)serialized
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Build topology
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("user-clicks");

        // Process each event independently
        stream.foreach((key, value) ->
            System.out.println("User: " + key + ", Action: " + value)
        );

        // Start streams
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Close cleanly on shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Characteristics

  • Independent Processing: Each event is processed one at a time
  • No State: No memory of past events
  • Use Cases: Filtering, transforming, and mapping values (see the sketch below)
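
To make the stateless style concrete, here is a minimal sketch (not part of the session's code) that filters the click stream and transforms each value; the output topic product-clicks is a hypothetical name:

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

public class KStreamFilterMapExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-filter-map");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> clicks = builder.stream("user-clicks");

        // Stateless, per-event operations: no state store is created
        clicks
            .filter((user, action) -> action.contains("product")) // keep product-page events
            .mapValues(action -> action.toUpperCase())            // transform each value
            .to("product-clicks");                                // hypothetical output topic

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}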

Aggregation with KStream

Question: Can we count visits per day with KStream?

Answer: Yes, but with caveats:

  • Requires grouping by key and applying a time window (see the sketch after this list)
  • The aggregation transforms the KStream into a KTable
  • The result is a running count (stateful)
  • State stores maintain the counts per user
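
A minimal sketch of such a windowed count, assuming the same user-clicks topic and default String serdes; the one-day window size and the printed format are illustrative only:

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;
import java.util.Properties;

public class WindowedClickCount {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-click-count");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> clicks = builder.stream("user-clicks");

        // Group by user and count clicks per one-day tumbling window.
        // The aggregation turns the KStream into a (windowed) KTable.
        KTable<Windowed<String>, Long> dailyCounts = clicks
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(1)))
            .count();

        // Print each update: window start + user -> running count for that day
        dailyCounts.toStream().foreach((windowedKey, count) ->
            System.out.println(windowedKey.window().startTime() + " "
                + windowedKey.key() + " -> " + count));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}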

KTable - Changelog Stream

What is KTable?

A KTable keeps only the latest value for each key, acting like a changelog. When a key is updated, the previous value is replaced.

Two Types of KTable

1. Computed KTable (from Aggregation)

Built from a KStream aggregation. The output topic may contain multiple updates per key before compaction.

Example: Page visit counts

text
Topic: page-visit-count (before compaction)
Alice → 2
Alice → 3

KTable in-memory state:
Alice → 3 (latest only)

2. Direct Changelog KTable

Built directly from a changelog topic. The topic stores all updates.

Example: User profiles

text
Topic: user-profiles
Alice → silver
Alice → gold

KTable in-memory state:
Alice → gold (latest only)

Important: Log Compaction

Without compaction (cleanup.policy=delete):

  • Every update is retained until the retention period expires
  • The topic grows with every update
  • Inefficient for long-term storage of the latest state

With compaction (cleanup.policy=compact):

  • Kafka eventually keeps only the latest value per key
  • Aligns with KTable semantics
  • Efficient state restoration (see the topic-creation sketch below)
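
If you want the user-profiles topic to behave this way, one option is to create it with cleanup.policy=compact. A minimal sketch using the Kafka AdminClient (from the kafka-clients library that kafka-streams pulls in); the class name is hypothetical and the partition/replication settings mirror the ones used earlier:

java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.Collections;
import java.util.Properties;

public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // user-profiles as a compacted changelog topic: Kafka eventually keeps
            // only the latest value per key, matching KTable semantics
            NewTopic topic = new NewTopic("user-profiles", 2, (short) 1)
                .configs(Collections.singletonMap(
                    TopicConfig.CLEANUP_POLICY_CONFIG,
                    TopicConfig.CLEANUP_POLICY_COMPACT));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}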

KTable Examples

Example 1: Computed KTable (Count Aggregation)

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

public class KTableExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-count-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Start with KStream
        KStream<String, String> clickStream = builder.stream("user-clicks");

        // Group and count → produces KTable
        KTable<String, Long> clickCounts = clickStream
            .groupByKey()
            .count();

        // Write to output topic (values are now Long, so pass explicit serdes)
        clickCounts.toStream().to("page-visit-count",
            Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

Result: Running count per user (e.g., Alice → 3)

Example 2: Direct KTable (Changelog)

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KTable;
import java.util.Properties;

public class KTableExample2 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-profile-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Build KTable directly from the topic
        KTable<String, String> profiles = builder.table("user-profiles");

        // The KTable's state holds only the latest value per key
        profiles.toStream().foreach((key, value) ->
            System.out.println("User: " + key + ", Profile: " + value)
        );

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

Result: Latest profile per user (e.g., Alice → gold)

Complex Aggregation Example

Computing Total Spent Per User

java
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Properties;

public class AggregationExample {
    public static void main(String[] args) {
        // Configure application
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-spending-aggregation");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Read transactions stream
        KStream<String, String> transactions = builder.stream("user-transactions");

        // Process and aggregate
        KTable<String, Integer> totalSpent = transactions
            // Filter: keep only high-value transactions
            .filter((key, value) -> {
                String[] parts = value.split(",");
                int amount = Integer.parseInt(parts[1]);
                return amount > 100;
            })
            // Transform: extract amount as integer
            .mapValues(value -> {
                String[] parts = value.split(",");
                return Integer.parseInt(parts[1]);
            })
            // Aggregate: sum amounts per user
            // (values are now Integer, so pass explicit serdes for the state store)
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
            .reduce((agg, newValue) -> agg + newValue);

        // Write results to output topic (Integer values need an explicit serde)
        totalSpent.toStream().to("user-total-spent",
            Produced.with(Serdes.String(), Serdes.Integer()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

Why KTable for Aggregation?

  • State Required: Must track a running total per user
  • Latest Value: The KTable holds the current sum
  • Incremental Updates: Each new transaction updates the existing total
  • State Store: Kafka Streams maintains the state automatically (and it can be queried locally, as sketched below)
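
To look into that state store, one option is to name it via Materialized and read it back through Kafka Streams' interactive queries. A minimal sketch under those assumptions (the store name user-total-spent-store and the class name are made up for this example); it re-uses the transaction parsing from above without the filter:

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.Properties;

public class QueryableSpendingExample {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-spending-queryable");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("user-transactions")
            .mapValues(value -> Integer.parseInt(value.split(",")[1]))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
            // Name the state store so it can be queried from outside the topology
            .reduce((agg, amount) -> agg + amount,
                    Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("user-total-spent-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Integer()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Wait until the instance is RUNNING, then read the local store directly
        while (streams.state() != KafkaStreams.State.RUNNING) {
            Thread.sleep(100);
        }
        ReadOnlyKeyValueStore<String, Integer> store = streams.store(
            StoreQueryParameters.fromNameAndType("user-total-spent-store",
                QueryableStoreTypes.keyValueStore()));
        System.out.println("Total spent by Alice so far: " + store.get("Alice"));
    }
}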

KTable Join Example

Joining KStream with KTable

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

public class StreamTableJoin {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-join");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Read click events as KStream
        KStream<String, String> clicks = builder.stream("user-clicks");

        // Read profiles as KTable (latest value per user)
        KTable<String, String> profiles = builder.table("user-profiles");

        // Left join: enrich clicks with profile data
        KStream<String, String> enrichedClicks = clicks.leftJoin(
            profiles,
            (clickValue, profileValue) ->
                "Click: " + clickValue + ", Profile: " + profileValue
        );

        // Print results
        enrichedClicks.foreach((key, value) ->
            System.out.println(value)
        );

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

Join Timeline Example

text
Timeline:
T1: Alice → silver (profile update)
T2: Bob → silver (profile update)
T3: Alice clicks /home → joined with "silver"
T4: Bob clicks /cart → joined with "silver"
T5: Alice → gold (profile update)
T6: Alice clicks /product → joined with "gold"
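
One way to verify this timeline without a running broker is Kafka Streams' TopologyTestDriver. A minimal sketch, assuming the test utility dependency org.apache.kafka:kafka-streams-test-utils:3.4.0 has been added; the output topic enriched-clicks and the class name are made up for this example:

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.util.Properties;

public class StreamTableJoinTimelineCheck {
    public static void main(String[] args) {
        // Same topology as StreamTableJoin, but writing to an output topic
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> clicks = builder.stream("user-clicks");
        KTable<String, String> profiles = builder.table("user-profiles");
        clicks.leftJoin(profiles,
                (click, profile) -> "Click: " + click + ", Profile: " + profile)
              .to("enriched-clicks");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-timeline-check");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> profileIn = driver.createInputTopic(
                "user-profiles", new StringSerializer(), new StringSerializer());
            TestInputTopic<String, String> clickIn = driver.createInputTopic(
                "user-clicks", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                "enriched-clicks", new StringDeserializer(), new StringDeserializer());

            profileIn.pipeInput("Alice", "silver");   // T1
            clickIn.pipeInput("Alice", "/home");      // T3 -> joined with "silver"
            profileIn.pipeInput("Alice", "gold");     // T5
            clickIn.pipeInput("Alice", "/product");   // T6 -> joined with "gold"

            out.readKeyValuesToList().forEach(System.out::println);
        }
    }
}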

State Management in Joins

Two Types of State:

  1. KTable State: Current user profiles (backed by RocksDB + changelog)
  2. Join State: Temporary buffer for matching stream records with table values

Both are managed automatically by Kafka Streams, with fault tolerance.

Concurrency and Partitioning

Two Instance Setup (2 Partitions)

text
Topic Partitions: 2
Kafka Streams Instances: 2

Instance 1: Partition 0
Instance 2: Partition 1

Requirement: both topics keyed the same way (co-partitioned; see the re-keying sketch below)
Example: User ID as key → Alice always lands on the same partition

Key Points:

  • Partition-based parallelism
  • Local joins (no cross-partition shuffling)
  • Automatic workload distribution
  • Fault tolerance with rebalancing
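
When the stream is not already keyed by the join key, it can be re-keyed before the join; Kafka Streams then repartitions it so matching keys land on the same partition as the table. A minimal sketch, assuming a hypothetical topic clicks-by-session keyed by session ID whose value starts with the user name (e.g. "Alice,/home"):

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

public class RekeyedStreamTableJoin {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rekeyed-join");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> sessionClicks = builder.stream("clicks-by-session");
        KTable<String, String> profiles = builder.table("user-profiles");

        KStream<String, String> enriched = sessionClicks
            // Re-key by user name so the stream is co-partitioned with user-profiles;
            // selectKey marks the stream for repartitioning and the join triggers it
            .selectKey((sessionId, value) -> value.split(",")[0])
            .leftJoin(profiles, (click, profile) -> click + " [" + profile + "]");

        enriched.foreach((key, value) -> System.out.println(key + ": " + value));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}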

Single Instance Setup (2 Partitions)

text
Topic Partitions: 2
Kafka Streams Instances: 1

Instance 1: Partitions 0 and 1

No instance-level parallelism, but it still works (stream threads can help; see the sketch below)
Each partition keeps its own isolated state stores
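
A single instance can still work through both partitions concurrently by running more than one stream thread. A minimal sketch (the class name is made up; the topology just prints which thread handles each record):

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

public class SingleInstanceTwoThreads {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "single-instance-two-threads");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Two stream threads in one JVM: each thread takes one partition's tasks
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> clicks = builder.stream("user-clicks");
        clicks.foreach((user, action) ->
            System.out.println(Thread.currentThread().getName() + " - " + user + ": " + action));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}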

Scaling Rules

  • Max instances = Number of partitions
  • More instances than partitions → the extra instances sit idle
  • Same application.id required for grouping
  • Automatic rebalancing on instance changes

Running Multiple Instances

bash
# Terminal 1
java -jar kafka-streams-app.jar

# Terminal 2 (same JAR, same application.id)
java -jar kafka-streams-app.jar

Kafka Streams automatically distributes the partitions across the instances.

GlobalKTable - Fully Replicated Table

What is GlobalKTable?

A GlobalKTable stores the latest value for each key and is fully replicated on every instance.

Characteristics

  • Full Replication: Every instance has complete dataset
  • No Partitioning: Unlike KTable, not partition-based
  • Local Access: All keys available locally
  • Use Cases: Small reference data (country codes, product categories)

GlobalKTable Example

text
Topic: country-codes (small reference data)
US → United States
DE → Germany
IN → India

Instance 1: Full copy (US, DE, IN)
Instance 2: Full copy (US, DE, IN)

Each instance can resolve any country code locally without cross-instance communication.

GlobalKTable Join Code

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

public class GlobalKTableJoin {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "globalktable-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Stream of user clicks (may include country code in URL)
        KStream<String, String> clicks = builder.stream("user-clicks");

        // GlobalKTable: fully replicated country codes
        GlobalKTable<String, String> countryCodes =
            builder.globalTable("country-codes");

        // Left join: extract country code from click and enrich
        KStream<String, String> enrichedClicks = clicks.leftJoin(
            countryCodes,
            // Extract join key from stream value
            (key, value) -> {
                // Parse country code from URL (e.g., "/home?cc=US")
                String[] parts = value.split("cc=");
                return parts.length > 1 ? parts[1] : null;
            },
            // Combine stream and table values
            (clickValue, countryName) ->
                "Click: " + clickValue + ", Country: " + countryName
        );

        enrichedClicks.foreach((key, value) ->
            System.out.println(value)
        );

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

Example Flow

text
Clicks:
  Instance 1: Alice → /home?cc=US, Carol → /product?cc=IN
  Instance 2: Bob → /cart?cc=DE, Dave → /home?cc=US

Both instances have the full country-codes table:
  US → United States
  DE → Germany
  IN → India

Results:
  Instance 1:
    Click: /home?cc=US, Country: United States
    Click: /product?cc=IN, Country: India
  Instance 2:
    Click: /cart?cc=DE, Country: Germany
    Click: /home?cc=US, Country: United States

GlobalKTable vs KTable

| Feature       | KTable                    | GlobalKTable            |
|---------------|---------------------------|-------------------------|
| Partitioning  | Yes (distributed)         | No (full replication)   |
| Data Size     | Large datasets            | Small reference data    |
| Join Key      | Must match stream key     | Can extract from value  |
| Local Access  | Only assigned partitions  | All keys available      |

Summary

KStream

  • Purpose: Process every event as it arrives
  • State: Stateless (no memory)
  • Use Cases: Filtering, transforming, real-time reactions

KTable

  • Purpose: Maintain current state per key
  • State: Stateful (latest value)
  • Use Cases: Aggregations, counting, deduplication

GlobalKTable

  • Purpose: Replicated reference data
  • State: Stateful (fully replicated)
  • Use Cases: Lookups, small datasets, flexible joins

Scalability

  • Add topic partitions for parallelism
  • Run more instances (up to partition count)
  • Kafka handles work distribution automatically