6.5 Kafka Streams Deep Dive
Master KStream, KTable, and GlobalKTable with advanced joins, aggregations, and state management. Includes concurrency and scaling strategies.
Overview
This session explores three key Kafka Streams components: KStream, KTable, and GlobalKTable. Each plays a distinct role in stream processing with different use cases and characteristics.
Setting Up Kafka with Docker
Docker Compose Configuration
```yaml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka1:
    image: confluentinc/cp-kafka:latest
    container_name: kafka1
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  # needed for a single-broker cluster
    ports:
      - "9092:9092"
```
Starting Services
```bash
# Start Kafka and Zookeeper
docker-compose up -d

# Verify services are running
docker ps
```
Creating Topics
All topics are created with 2 partitions for parallel processing and a replication factor of 1 (local setup).
```bash
# Enter Kafka container
docker exec -it kafka1 bash

# Create topics (Confluent images ship the CLI tools without the .sh suffix)
kafka-topics --create --topic user-clicks \
  --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1

kafka-topics --create --topic user-profiles \
  --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1

kafka-topics --create --topic country-codes \
  --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1

kafka-topics --create --topic user-transactions \
  --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1
```
Topic Purposes
- user-clicks: Tracking user interactions
- user-profiles: User-related information
- country-codes: Country identifier mappings
- user-transactions: User purchase data
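Before producing data, the creation can be verified by listing topics; a quick check, with kafka1 being the container from the compose file above:
```bash
# List topics on the local broker
docker exec -it kafka1 kafka-topics --list --bootstrap-server localhost:9092
```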
Producing Sample Data
User Clicks Topic
```bash
# --bootstrap-server replaces the deprecated --broker-list flag
kafka-console-producer --bootstrap-server localhost:9092 \
  --topic user-clicks --property parse.key=true --property key.separator=:
```
Messages (key:value format):
```text
Alice:clicked on /home page
Bob:visited cart page
Alice:navigated to product page
```
User Profiles Topic
```bash
kafka-console-producer --bootstrap-server localhost:9092 \
  --topic user-profiles --property parse.key=true --property key.separator=:
```
Messages:
```text
Alice:silver
Bob:silver
Alice:gold
```
Profile updates are tracked as a changelog.
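To see that changelog as Kafka stores it, the topic can be replayed with keys printed; a quick sketch using the standard console consumer:
```bash
# Replay user-profiles with keys to observe the per-user update history
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic user-profiles --from-beginning \
  --property print.key=true --property key.separator=:
```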
Country Codes Topic
```bash
kafka-console-producer --bootstrap-server localhost:9092 \
  --topic country-codes --property parse.key=true --property key.separator=:
```
Messages:
```text
US:United States
DE:Germany
IN:India
```
User Transactions Topic
```bash
kafka-console-producer --bootstrap-server localhost:9092 \
  --topic user-transactions --property parse.key=true --property key.separator=:
```
Messages (user:user,amount format):
```text
Alice:Alice,200
Bob:Bob,150
Alice:Alice,300
```
Project Setup
Initialize Gradle Project
```bash
mkdir kafka-streams-app
cd kafka-streams-app
gradle init --type java-application
```
Project Structure
```text
kafka-streams-app/
├── build.gradle
├── src/
│   ├── main/
│   │   └── java/
│   │       └── example/
│   │           ├── KStreamExample.java
│   │           ├── KTableExample.java
│   │           └── GlobalKTableExample.java
│   └── test/
│       └── java/
├── gradlew
├── gradlew.bat
└── settings.gradle
```
Add Kafka Streams Dependency
build.gradle:
```groovy
dependencies {
    implementation 'org.apache.kafka:kafka-streams:3.4.0'
}
```
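Kafka Streams logs through SLF4J, so without a binding on the classpath the examples run silently. If log output is wanted, one common (and optional) choice is slf4j-simple; the version below is an assumption, not part of the original setup:
```groovy
dependencies {
    implementation 'org.apache.kafka:kafka-streams:3.4.0'
    // Optional SLF4J binding so Streams lifecycle logs are visible (assumed version)
    runtimeOnly 'org.slf4j:slf4j-simple:2.0.9'
}
```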
KStream - Unbounded Event Stream
What is KStream?
KStream represents an unbounded, continuously updating stream of records. Each event is processed as it arrives in real-time.
Example: User Clicks Stream
```text
Input Topic: user-clicks

time | user  | page
---------------------
  1  | Alice | /home
  2  | Bob   | /cart
  3  | Alice | /product
```
Each line is an independent event with timestamp, user, and page.
KStream Example Code
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

public class KStreamExample {
    public static void main(String[] args) {
        // Configure application
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Default serdes so the topology can (de)serialize String keys and values
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Build topology
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("user-clicks");

        // Process each event independently
        stream.foreach((key, value) ->
            System.out.println("User: " + key + ", Action: " + value)
        );

        // Start streams
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
```
Characteristics
- Independent Processing: Each event processed one at a time
- No State: No memory of past events
- Use Cases: Filtering, transforming, mapping values (see the sketch below)
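As a minimal sketch of those stateless use cases, the foreach step in KStreamExample above could be swapped for a filter-and-transform chain; the output topic home-page-clicks is hypothetical:
```java
// Stateless pipeline: each event is filtered and transformed independently,
// no state store is created (replaces the foreach step in KStreamExample)
stream
    .filter((user, action) -> action.contains("/home")) // keep home-page events only
    .mapValues(action -> action.toUpperCase())          // transform each value
    .to("home-page-clicks");                            // hypothetical output topic
```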
Aggregation with KStream
Question: Can we count visits per day with KStream?
Answer: Yes, but with caveats (see the sketch after this list):
- Requires grouping by key and applying time window
- Aggregation transforms KStream → KTable
- Result is running count (stateful)
- State stores maintain counts per user
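A minimal sketch of that windowed count, assuming the builder and default String serdes from the surrounding examples; grouping plus a one-day tumbling window turns the KStream into a windowed KTable:
```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.*;

// Tumbling one-day windows: the count is keyed by (user, window) and kept in a state store
KTable<Windowed<String>, Long> dailyVisits = builder
    .stream("user-clicks", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(1)))
    .count();
```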
KTable - Changelog Stream
What is KTable?
KTable keeps only the latest value for each key, acting like a changelog. When a key is updated, the previous value is replaced.
Two Types of KTable
#### 1. Computed KTable (from aggregation)
Built from KStream aggregation. Output topic may contain multiple updates before compaction.
Example: Page visit counts
```text
Topic: page-visit-count (before compaction)
Alice → 2
Alice → 3

KTable in-memory state:
Alice → 3 (latest only)
```
#### 2. Direct Changelog KTable
Built directly from changelog topic. Topic stores all updates.
Example: User profiles
```text
Topic: user-profiles
Alice → silver
Alice → gold

KTable in-memory state:
Alice → gold (latest only)
```
Important: Log Compaction
Without compaction (cleanup.policy=delete):
- All events retained indefinitely
- Topic grows with every update
- Inefficient for long-term storage
With compaction (cleanup.policy=compact; see the command after this list):
- Kafka eventually keeps only latest value per key
- Aligns with KTable semantics
- Efficient state restoration
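Compaction is set per topic via cleanup.policy. A sketch creating a compacted topic with the same local setup (the topic name is illustrative):
```bash
kafka-topics --create --topic user-profiles-compacted \
  --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1 \
  --config cleanup.policy=compact
```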
KTable Examples
#### Example 1: Computed KTable (Count Aggregation)
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

public class KTableExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-count-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Start with KStream
        KStream<String, String> clickStream = builder.stream("user-clicks");

        // Group and count → produces KTable
        KTable<String, Long> clickCounts = clickStream
            .groupByKey()
            .count();

        // Write to output topic; counts are Longs, so override the value serde
        clickCounts.toStream().to("page-visit-count",
            Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
```
Result: Running count per user (e.g., Alice → 3)
#### Example 2: Direct KTable (Changelog)
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KTable;
import java.util.Properties;

public class KTableExample2 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-profile-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Build KTable directly from topic
        KTable<String, String> profiles = builder.table("user-profiles");

        // KTable in-memory state has only latest values
        profiles.toStream().foreach((key, value) ->
            System.out.println("User: " + key + ", Profile: " + value)
        );

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
```
Result: Latest profile per user (e.g., Alice → gold)
Complex Aggregation Example
Computing Total Spent Per User
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

public class AggregationExample {
    public static void main(String[] args) {
        // Configure application
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-spending-aggregation");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Read transactions stream
        KStream<String, String> transactions = builder.stream("user-transactions");

        // Process and aggregate
        KTable<String, Integer> totalSpent = transactions
            // Filter: keep only high-value transactions
            .filter((key, value) -> {
                String[] parts = value.split(",");
                int amount = Integer.parseInt(parts[1]);
                return amount > 100;
            })
            // Transform: extract amount as integer
            .mapValues(value -> {
                String[] parts = value.split(",");
                return Integer.parseInt(parts[1]);
            })
            // Aggregate: values are now Integers, so tell the grouping which serdes to use
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
            .reduce((agg, newValue) -> agg + newValue);

        // Write results to output topic with an Integer value serde
        totalSpent.toStream().to("user-total-spent",
            Produced.with(Serdes.String(), Serdes.Integer()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
```
Why KTable for Aggregation?
- State Required: Must track running total per user
- Latest Value: KTable holds current sum
- Incremental Updates: Each new transaction updates existing total
- State Store: Kafka Streams maintains state automatically (queried directly in the sketch below)
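Beyond writing to an output topic, the running totals can be read back with interactive queries, assuming the reduce step above is materialized under a name, e.g. .reduce((agg, v) -> agg + v, Materialized.as("user-total-spent-store")); the store name is an assumption, not part of the original example:
```java
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// After streams.start(), once the instance reaches RUNNING:
ReadOnlyKeyValueStore<String, Integer> store = streams.store(
    StoreQueryParameters.fromNameAndType(
        "user-total-spent-store",              // assumed store name, see lead-in
        QueryableStoreTypes.keyValueStore()));

// Current running total for one user
System.out.println("Alice total: " + store.get("Alice"));
```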
KTable Join Example
Joining KStream with KTable
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

public class StreamTableJoin {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-join");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Read click events as KStream
        KStream<String, String> clicks = builder.stream("user-clicks");

        // Read profiles as KTable (latest value per user)
        KTable<String, String> profiles = builder.table("user-profiles");

        // Left join: enrich clicks with profile data
        KStream<String, String> enrichedClicks = clicks.leftJoin(
            profiles,
            (clickValue, profileValue) ->
                "Click: " + clickValue + ", Profile: " + profileValue
        );

        // Print results
        enrichedClicks.foreach((key, value) ->
            System.out.println(value)
        );

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
```
Join Timeline Example
```text
Timeline:
T1: Alice → silver (profile update)
T2: Bob → silver (profile update)
T3: Alice clicks /home → joined with "silver"
T4: Bob clicks /cart → joined with "silver"
T5: Alice → gold (profile update)
T6: Alice clicks /product → joined with "gold"
```
State Management in Joins
Two Types of State:
- KTable State: Current user profiles (backed by RocksDB + changelog)
- Join State: A stream-table join like this one consults only the table's store; stream records are not buffered. Windowed stream-stream joins, by contrast, buffer records from both sides in windowed stores.
Both automatically managed by Kafka Streams with fault tolerance.
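Concretely, each store is backed by an internal changelog topic named &lt;application.id&gt;-&lt;store-name&gt;-changelog that Kafka Streams creates on its own. These can be listed on the broker (the example name below is illustrative):
```bash
# Internal changelog topics are what restore state after a failure or rebalance
kafka-topics --bootstrap-server localhost:9092 --list | grep changelog
# e.g. stream-table-join-user-profiles-STATE-STORE-0000000000-changelog
```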
Concurrency and Partitioning
Two Instance Setup (2 Partitions)
```text
Topic Partitions: 2
Kafka Streams Instances: 2

Instance 1: Partition 0
Instance 2: Partition 1

Requirement: Both topics co-partitioned (same key and partition count)
Example: User ID as key → Alice always on same partition
```
Key Points:
- Partition-based parallelism
- Local joins (no cross-partition shuffling)
- Automatic workload distribution
- Fault tolerance with rebalancing
Single Instance Setup (2 Partitions)
```text
Topic Partitions: 2
Kafka Streams Instances: 1

Instance 1: Partitions 0 and 1

No parallelism, but still works
Each partition has isolated state stores
```
Scaling Rules
- Max instances = Number of partitions
- More instances than partitions → extras idle (standby)
- Same application.id required for grouping (see the config sketch below)
- Automatic rebalancing on instance changes
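Two settings govern this behavior. A minimal sketch (property values reused from the join example; the thread count is an assumption):
```java
Properties props = new Properties();
// Instances that share this application.id form one group and split the partitions
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-join");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Threads add parallelism within one instance, up to the partition count
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
```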
Running Multiple Instances
```bash
# Terminal 1
java -jar kafka-streams-app.jar

# Terminal 2 (same JAR, same application.id)
java -jar kafka-streams-app.jar
```
Kafka automatically distributes partitions across instances.
GlobalKTable - Fully Replicated Table
What is GlobalKTable?
GlobalKTable stores the latest value for each key and is fully replicated on every instance.
Characteristics
- Full Replication: Every instance has complete dataset
- No Partitioning: Unlike KTable, not partition-based
- Local Access: All keys available locally
- Use Cases: Small reference data (country codes, product categories)
GlobalKTable Example
```text
Topic: country-codes (small reference data)
US → United States
DE → Germany
IN → India

Instance 1: Full copy (US, DE, IN)
Instance 2: Full copy (US, DE, IN)
```
Each instance can resolve any country code locally without cross-instance communication.
GlobalKTable Join Code
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

public class GlobalKTableJoin {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "globalktable-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Stream of user clicks (may include country code in URL)
        KStream<String, String> clicks = builder.stream("user-clicks");

        // GlobalKTable: fully replicated country codes
        GlobalKTable<String, String> countryCodes =
            builder.globalTable("country-codes");

        // Left join: extract country code from click and enrich
        KStream<String, String> enrichedClicks = clicks.leftJoin(
            countryCodes,
            // Extract join key from stream value
            (key, value) -> {
                // Parse country code from URL (e.g., "/home?cc=US")
                String[] parts = value.split("cc=");
                return parts.length > 1 ? parts[1] : null;
            },
            // Combine stream and table values
            (clickValue, countryName) ->
                "Click: " + clickValue + ", Country: " + countryName
        );

        enrichedClicks.foreach((key, value) ->
            System.out.println(value)
        );

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
```
Example Flow
```text
Clicks:
  Instance 1: Alice → /home?cc=US, Carol → /product?cc=IN
  Instance 2: Bob → /cart?cc=DE, Dave → /home?cc=US

Both instances have full country-codes table:
  US → United States
  DE → Germany
  IN → India

Results:
  Instance 1:
    Click: /home?cc=US, Country: United States
    Click: /product?cc=IN, Country: India
  Instance 2:
    Click: /cart?cc=DE, Country: Germany
    Click: /home?cc=US, Country: United States
```
GlobalKTable vs KTable
| Feature | KTable | GlobalKTable |
|---------|--------|--------------|
| Partitioning | Yes (distributed) | No (full replication) |
| Data Size | Large datasets | Small reference data |
| Join Key | Must match stream key | Can extract from value |
| Local Access | Only assigned partitions | All keys available |
Summary
KStream
- Purpose: Process every event as it arrives
- State: Stateless (no memory)
- Use Cases: Filtering, transforming, real-time reactions
KTable
- Purpose: Maintain current state per key
- State: Stateful (latest value)
- Use Cases: Aggregations, counting, deduplication
GlobalKTable
- Purpose: Replicated reference data
- State: Stateful (fully replicated)
- Use Cases: Lookups, small datasets, flexible joins
Scalability
- Add topic partitions for parallelism (command sketch below)
- Run more instances (up to partition count)
- Kafka handles work distribution automatically
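A sketch of the first point with the local setup from this session; note that adding partitions changes which partition new records for an existing key land on, so keyed state built before the change may need to be rebuilt:
```bash
# Grow user-clicks from 2 to 4 partitions (partition counts can only increase)
docker exec -it kafka1 kafka-topics --alter --topic user-clicks \
  --bootstrap-server localhost:9092 --partitions 4
```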