Previously, data was usually not processed in real time. For example, when payments were made, we would save them in the database and process them later, overnight or in multiple batches, so the system reflected the new data only after this offline run.
Nowadays we need to process data in real time, as it arrives in a continuous stream. For example, most modern apps such as Netflix and Prime use recommendation engines: based on our watch and search history, the engine predicts what we might want to watch next, and it does so immediately. This provides a better user experience and business value as well. Another example is credit card or UPI transactions, which complete instantly.
Stream processing is the continuous processing of data in real time. Let’s see how we can achieve simple real-time stream processing using Redis Streams with Spring Boot.
Let us take the use case of a movie database. Subscribed or anonymous users watch a movie and can like, dislike, and rate it. Our task is to record the number of views, likes, and dislikes, as well as the ratings. I will create a producer/publisher service that randomizes the likes, dislikes, and ratings. These events are published to a Redis stream, and the consumers subscribed to that stream update a database or any other subsystem (here I have used Redis sorted sets for simplicity).
Technology Stack
Java 8
Spring Boot 2.3.7.RELEASE
Maven for the build
Docker for simplicity (Redis and the applications all run in Docker)
Let us look at the code base
Publisher Configuration
For simplicity, I am using a scheduler that publishes data to the stream every 2 seconds. The subscribed consumers then write to Redis sorted sets with details such as movie name, likes count, dislikes count, and summed rating.
First, let us see the DTO classes that are published and consumed.
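The DTO listing itself is not reproduced in this text, so here is a minimal sketch of what the two classes might look like, inferred only from how the later code uses them (the three-argument Movie constructor, and getMovie(), getLikes(), getDisLike(), and getRating() on MovieDetails). The field name productionHouse and the use of hand-written getters are assumptions; the original project may well generate these with Lombok and may include a no-argument constructor.

```java
// Hypothetical DTOs, reconstructed from the way the article's code calls them.
class Movie {
    private final int id;
    private final String name;
    private final String productionHouse; // assumed field name, not from the source

    Movie(int id, String name, String productionHouse) {
        this.id = id;
        this.name = name;
        this.productionHouse = productionHouse;
    }

    int getId() { return id; }
    String getName() { return name; }
    String getProductionHouse() { return productionHouse; }
}

class MovieDetails {
    private final Movie movie;
    private final Boolean likes;   // true when the viewer liked the movie
    private final Boolean disLike; // true when the viewer disliked the movie
    private final Double rating;   // rating given for this single view

    MovieDetails(Movie movie, Boolean likes, Boolean disLike, Double rating) {
        this.movie = movie;
        this.likes = likes;
        this.disLike = disLike;
        this.rating = rating;
    }

    Movie getMovie() { return movie; }
    Boolean getLikes() { return likes; }
    Boolean getDisLike() { return disLike; }
    Double getRating() { return rating; }

    @Override
    public String toString() {
        return "MovieDetails(movie=" + movie.getName() + ", likes=" + likes
                + ", disLike=" + disLike + ", rating=" + rating + ")";
    }
}
```

Each published event therefore carries one movie plus the like/dislike flags and the rating for a single view.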
I created a repository of sample movies, from which a random entry is published every 2 seconds.
package com.spring.redis.streams.repository;
import com.spring.redis.streams.dto.Movie;
import com.spring.redis.streams.dto.MovieDetails;
import org.springframework.stereotype.Repository;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Created on 21/December/2020 By Author Eresh, Gorantla
**/
@Repository
public class MovieRepository {

    public static final List<Movie> MOVIE_LIST = Stream.of(
            new Movie(1, "Avengers End Game", "Marvel Studios"),
            new Movie(2, "Avengers Infinity War", "Marvel Studios"),
            new Movie(3, "Dark Knight", "Warner Bros"),
            new Movie(4, "Pulp Fiction", "MiraMax"),
            new Movie(5, "Fight Club", "Warner Bros"),
            new Movie(6, "Good Fellas", "Warner Bros"),
            new Movie(7, "Seven", "Warner Bros"),
            new Movie(8, "Cast Away", "ImageMovers Playtone"),
            new Movie(9, "Forest Gump", "The Tisch Company"),
            new Movie(10, "King Kong", "Warner Bros"),
            new Movie(11, "The Silence Of Lambs", "Strong Heart Productions"),
            new Movie(12, "Usual Suspects", "PolyGram Filmed Entertainment"),
            new Movie(13, "Green Mile", "Castle Rock Entertainment"),
            new Movie(14, "No Country For Old Men", "Scott Rudin Productions"),
            new Movie(15, "Train to Busan", "Next Entertainment World"),
            new Movie(16, "Parasite", "Barunson E&A"),
            new Movie(17, "Whiplash", "Sony Pictures"),
            new Movie(18, "The Prestige", "Warner Bros"),
            new Movie(19, "Joker", "Warner Bros"),
            new Movie(20, "Old Boy", "Show East"),
            new Movie(21, "I Saw Devil", "Peppermint and company"),
            new Movie(22, "The Perfect Murder", "Warner Bros"),
            new Movie(23, "The Chaser", "Snow Box"),
            new Movie(24, "Goodwill Hunting", "Be Gentlemen"),
            new Movie(25, "Snatch", "Columbia Pictures")
    ).collect(Collectors.toList());

    public MovieDetails getRandomMovie() {
        // Pick a random movie from the list.
        Integer index = ThreadLocalRandom.current().nextInt(0, MOVIE_LIST.size());
        Movie movie = MOVIE_LIST.get(index);
        Random random = new Random();
        // An even value counts as a like, an odd value as a dislike.
        Integer value = random.ints(0, 1000).findFirst().getAsInt();
        // Random rating in the range [1.0, 10.0).
        Double rating = random.doubles(1.0, 10.0).findFirst().getAsDouble();
        return new MovieDetails(movie, value % 2 == 0, value % 2 == 1, rating);
    }
}
The movie event publisher class, which runs every 2 seconds and publishes to the Redis stream
package com.spring.redis.streams.publisher;
import com.spring.redis.streams.dto.MovieDetails;
import com.spring.redis.streams.repository.MovieRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created on 21/December/2020 By Author Eresh, Gorantla
**/
@Service
@Slf4j
public class MovieEventPublisher {

    // Counts the events published so far.
    private AtomicInteger atomicInteger = new AtomicInteger(0);

    @Value("${stream.key}")
    private String streamKey;

    private final MovieRepository movieRepository;
    private final ReactiveRedisTemplate<String, String> redisTemplate;

    public MovieEventPublisher(MovieRepository repository,
                               ReactiveRedisTemplate<String, String> redisTemplate) {
        this.movieRepository = repository;
        this.redisTemplate = redisTemplate;
    }

    // Publishes one random movie event to the stream at the configured rate.
    @Scheduled(fixedRateString = "${publish.rate}")
    public void publishEvent() {
        MovieDetails movieDetails = this.movieRepository.getRandomMovie();
        log.info("Movie Details :: " + movieDetails);
        ObjectRecord<String, MovieDetails> record = StreamRecords.newRecord()
                .ofObject(movieDetails)
                .withStreamKey(streamKey);
        this.redisTemplate
                .opsForStream()
                .add(record)
                .subscribe(System.out::println);
        atomicInteger.incrementAndGet();
    }

    // Logs the running total every 10 seconds.
    @Scheduled(fixedRate = 10000)
    public void showPublishedEventsSoFar() {
        log.info("Total Events :: " + atomicInteger.get());
    }
}
Redis Consumer
RedisConfig Class
package com.spring.redis.streams.config;
import com.spring.redis.streams.dto.MovieDetails;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
/**
* Created on 21/December/2020 By Author Eresh, Gorantla
**/
@Configuration
public class RedisConfig {

    @Value("${stream.key}")
    private String streamKey;

    @Autowired
    private StreamListener<String, ObjectRecord<String, MovieDetails>> streamListener;

    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory)
            throws UnknownHostException {
        // Poll for up to 1 second per read and map each entry to MovieDetails.
        StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions<String, ObjectRecord<String, MovieDetails>> options =
                StreamMessageListenerContainer
                        .StreamMessageListenerContainerOptions
                        .builder()
                        .pollTimeout(Duration.ofSeconds(1))
                        .targetType(MovieDetails.class)
                        .build();
        StreamMessageListenerContainer<String, ObjectRecord<String, MovieDetails>> listenerContainer =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);
        // The consumer group is named after the stream key; the host name
        // identifies this consumer within the group.
        Subscription subscription = listenerContainer.receive(
                Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
                streamListener);
        listenerContainer.start();
        return subscription;
    }
}
Movie Event Consumer
package com.spring.redis.streams.config;
import com.spring.redis.streams.dto.MovieDetails;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.net.InetAddress;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created on 21/December/2020 By Author Eresh, Gorantla
**/
@Service
@Slf4j
public class MovieEventConsumer implements StreamListener<String, ObjectRecord<String, MovieDetails>> {

    // Counts the events consumed so far by this instance.
    private AtomicInteger atomicInteger = new AtomicInteger(0);

    @Autowired
    private ReactiveRedisTemplate<String, String> redisTemplate;

    @Override
    @SneakyThrows
    public void onMessage(ObjectRecord<String, MovieDetails> record) {
        log.info(InetAddress.getLocalHost().getHostName() + " - consumed :" + record.getValue());
        // One sorted set per movie name; its members are the counters.
        if (record.getValue().getLikes()) {
            this.redisTemplate
                    .opsForZSet()
                    .incrementScore(record.getValue().getMovie().getName(), "Likes", 1)
                    .subscribe();
        }
        if (record.getValue().getDisLike()) {
            this.redisTemplate
                    .opsForZSet()
                    .incrementScore(record.getValue().getMovie().getName(), "Dislikes", 1)
                    .subscribe();
        }
        this.redisTemplate
                .opsForZSet()
                .incrementScore(record.getValue().getMovie().getName(), "Views", 1)
                .subscribe();
        // The rating is accumulated as a running sum.
        this.redisTemplate
                .opsForZSet()
                .incrementScore(record.getValue().getMovie().getName(), "Rating",
                        record.getValue().getRating())
                .subscribe();
        atomicInteger.incrementAndGet();
    }

    // Logs the running total every 10 seconds.
    @Scheduled(fixedRate = 10000)
    public void showPublishedEventsSoFar() {
        log.info("Total Consumer :: " + atomicInteger.get());
    }
}
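To make the effect of the incrementScore calls easier to follow, here is a small in-memory model of the same bookkeeping, with a nested map standing in for one sorted set per movie. It is only a sketch: the class MovieStatsSketch and its methods are hypothetical and do not talk to Redis. The averageRating helper is also an assumption on my part; the consumer above stores only the summed rating, and dividing it by the view count is one way such a sum could be used.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the sorted sets written by the consumer.
class MovieStatsSketch {
    // movie name -> (member -> score), mirroring one sorted set per movie
    private final Map<String, Map<String, Double>> sortedSets = new HashMap<>();

    // Adds delta to the member's score, creating set and member when absent.
    double incrementScore(String key, String member, double delta) {
        return sortedSets
                .computeIfAbsent(key, k -> new HashMap<>())
                .merge(member, delta, Double::sum);
    }

    // Same four updates the consumer performs for a single event.
    void record(String movieName, boolean liked, boolean disliked, double rating) {
        if (liked) {
            incrementScore(movieName, "Likes", 1);
        }
        if (disliked) {
            incrementScore(movieName, "Dislikes", 1);
        }
        incrementScore(movieName, "Views", 1);
        incrementScore(movieName, "Rating", rating);
    }

    // Current score of a member, or 0.0 when the movie or member is unknown.
    double score(String movieName, String member) {
        return sortedSets
                .getOrDefault(movieName, new HashMap<>())
                .getOrDefault(member, 0.0);
    }

    // Assumed derivation: summed rating divided by the number of views.
    double averageRating(String movieName) {
        double views = score(movieName, "Views");
        return views == 0 ? 0.0 : score(movieName, "Rating") / views;
    }
}
```

For example, after recording one liked view rated 8.0 and one disliked view rated 6.0 for the same movie, the model holds Views = 2, Likes = 1, Dislikes = 1, and Rating = 14.0.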
Dockerizing the application
I have used Docker to set up Redis and the Spring Boot applications in a single docker-compose file, which avoids any further installations. The consumer application will run with 3 replicas so that the load is distributed.
Docker-compose.yaml
version: '3'
services:
  redis:
    image: redis
    ports:
      - 6379:6379
  redis-commander:
    image: rediscommander/redis-commander:latest
    depends_on:
      - redis
    environment:
      - REDIS_HOSTS=redis:redis
    ports:
      - 8081:8081
  producer:
    build: ./redis-publisher
    image: eresh.gorantla/redis-publisher
    depends_on:
      - redis
    environment:
      - SPRING_REDIS_HOST=redis
      - PUBLISH_RATE=2000
  consumer:
    build: ./redis-consumer
    image: eresh.gorantla/redis-consumer
    depends_on:
      - redis
    environment:
      - SPRING_REDIS_HOST=redis
The redis image provides the Redis instance. redis-commander is a GUI for Redis and is bound to port 8081. The producer (redis-publisher) is configured through environment variables, with the publish rate set to 2000 ms. The consumer (redis-consumer) is configured in the same way with the Redis host.
Docker file for Redis-publisher
FROM openjdk:8-jdk-alpine
ADD target/*.jar redis-stream.jar
ENTRYPOINT java -jar redis-stream.jar
Docker file for Redis-consumer
FROM openjdk:8-jdk-alpine
ADD target/*.jar redis-stream.jar
ENTRYPOINT java -jar redis-stream.jar
I haven’t triggered the Maven build from the Dockerfile, so we have to build the jars ourselves before bringing the containers up.
How to start the application
Let us build both the publisher and the consumer. Navigate to each of the consumer and publisher directories and run the Maven command below to create a jar file.
mvn clean package -DskipTests
Bring up the Redis with Redis commander
Go to the root directory and run the below command
docker-compose up redis redis-commander
ereshgorantla@Ereshs-MacBook-Pro redis-stream % docker-compose up redis redis-commander
WARNING: The Docker Engine you're using is running in swarm mode.
Compose does not use swarm mode to deploy services to multiple nodes in a swarm. All containers will be scheduled on the current node.
To deploy your application across the swarm, use `docker stack deploy`.
Starting redis-stream_redis_1 ... done
Starting redis-stream_redis-commander_1 ... done
Attaching to redis-stream_redis_1, redis-stream_redis-commander_1
redis-commander_1 | Creating custom redis-commander config '/redis-commander/config/local-production.json'.
redis_1 | 1:C 22 Dec 2020 05:23:35.977 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
redis_1 | 1:C 22 Dec 2020 05:23:35.977 # Redis version=6.0.9, bits=64, commit=00000000, modified=0, pid=1, just started
redis_1 | 1:C 22 Dec 2020 05:23:35.977 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
redis-commander_1 | Parsing 1 REDIS_HOSTS into custom redis-commander config '/redis-commander/config/local-production.json'.
redis_1 | 1:M 22 Dec 2020 05:23:35.978 * Running mode=standalone, port=6379.
redis_1 | 1:M 22 Dec 2020 05:23:35.978 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
redis_1 | 1:M 22 Dec 2020 05:23:35.978 # Server initialized
redis_1 | 1:M 22 Dec 2020 05:23:35.978 * Loading RDB produced by version 6.0.9
redis_1 | 1:M 22 Dec 2020 05:23:35.978 * RDB age 2642 seconds
redis_1 | 1:M 22 Dec 2020 05:23:35.978 * RDB memory usage when created 0.79 Mb
redis_1 | 1:M 22 Dec 2020 05:23:35.978 * DB loaded from disk: 0.000 seconds
redis_1 | 1:M 22 Dec 2020 05:23:35.979 * Ready to accept connections
redis-commander_1 | node ./bin/redis-commander
redis-commander_1 | Using scan instead of keys
redis-commander_1 | No Save: false
redis-commander_1 | listening on 0.0.0.0:8081
redis-commander_1 | access with browser at http://127.0.0.1:8081
redis-commander_1 | Redis Connection redis:6379 using Redis DB #0
Go to the browser and open http://127.0.0.1:8081
Create a Redis stream named “movie-events” by adding a dummy entry to it; this is the same name we have used in our application.
XADD movie-events * any-key any-value
Create a consumer group to share the load between consumers
XGROUP CREATE movie-events movie-events $
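The two commands above can be pictured with a toy in-memory model. This is only a sketch of the semantics, not of how Redis implements them: the class ConsumerGroupSketch and its methods are hypothetical, and acknowledgements and pending entries are ignored. It shows that a group created at $ skips entries already in the stream (such as the dummy entry), and that each later entry is handed to exactly one consumer in the group.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of one stream with a single consumer group.
class ConsumerGroupSketch {
    private final List<String> stream = new ArrayList<>(); // append-only log
    private final Map<String, List<String>> delivered = new HashMap<>();
    private int lastDelivered = -1; // index of the last entry handed to the group

    // Models XADD: append an entry to the stream.
    void add(String entry) {
        stream.add(entry);
    }

    // Models XGROUP CREATE ... $: the group starts after the current last entry.
    void createGroupAtEnd() {
        lastDelivered = stream.size() - 1;
    }

    // Models a group read of one new entry: returns the next undelivered
    // entry and records which consumer received it, or null if none is left.
    String readNext(String consumerName) {
        if (lastDelivered + 1 >= stream.size()) {
            return null;
        }
        lastDelivered++;
        String entry = stream.get(lastDelivered);
        delivered.computeIfAbsent(consumerName, k -> new ArrayList<>()).add(entry);
        return entry;
    }

    // Entries this consumer has received so far.
    List<String> deliveredTo(String consumerName) {
        return delivered.getOrDefault(consumerName, Collections.emptyList());
    }
}
```

In this model, three consumers reading in turn from a stream with three new entries each receive one distinct entry, which is the load sharing the consumer replicas rely on. In Redis the split depends on which consumer reads first, so it is not necessarily even.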
Bring up the Redis-producer
docker-compose up producer
Let us bring up the consumer with 3 replicas
docker-compose up --scale consumer=3
Watch the console: you should see that 3 consumers have come up and started consuming the stream events.
Let us see docker instances
Let us check in Redis Commander whether the data has been written to a sorted set keyed by the movie name.
Source: Medium
The Tech Platform