Our Blog

Ongoing observations by End Point people

Creating a Messaging App Using Spring for Apache Kafka, Part 4

Kürşat Kutlu Aydemir

By Kürşat Kutlu Aydemir
January 14, 2021

Spring-Kafka Photo by Daniel Tran on Unsplash

This article is part of a series. The GitHub repository with code examples can be found here.

After a long break from this series let’s keep moving further.

Let’s configure and prepare the WebSocket session pool. As we go through some custom operations like authentication and storing messages on the time of socket messages and sessions received, we need to create a WebSocketHandler for WebSocket configuration. When a WebSocket session message is received we’re going to send the message to a Kafka topic. In order to achieve this we need to define our WebSocket message handler MessageHandler and Kafka message producer MessageSender. We also need a session pool WebSocketPool so we can manage the client sessions.

WebSocketPool

package com.endpoint.SpringKafkaMessaging.websocket;

import org.springframework.web.socket.WebSocketSession;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class WebSocketPool {
  public static Map<Long, Set<WebSocketSession>> websockets = new HashMap<>();
}

WebSocketPool holds client sessions in a map of <user_id, <set of WebSocketSession>>. This map allows multiple sessions for one user, ensuring it will work from multiple client applications.

MessageHandler

package com.endpoint.SpringKafkaMessaging.websocket;

import org.springframework.web.socket.WebSocketSession;

import java.io.IOException;

public interface MessageHandler {
  public void addSessionToPool(Long userId, WebSocketSession session);
  public void sendMessageToUser(Long userId, String message) throws IOException;
  void removeFromSessionToPool(Long userId, WebSocketSession session);
}

MessageHandlerImpl

package com.endpoint.SpringKafkaMessaging.websocket;

import org.springframework.stereotype.Service;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

@Service
public class MessageHandlerImpl implements MessageHandler {

  @Override
  public void addSessionToPool(Long userId, WebSocketSession session) {

    Set<WebSocketSession> userSessions = WebSocketPool.websockets.get(userId);

    if (userSessions != null) {
      userSessions.add(session);
      WebSocketPool.websockets.put(userId, userSessions);
    } else {
      Set<WebSocketSession> newUserSessions = new HashSet<>();
      newUserSessions.add(session);
      WebSocketPool.websockets.put(userId, newUserSessions);
    }

  }

  @Override
  public void sendMessageToUser(Long userId, String message) throws IOException {

    Set<WebSocketSession> userSessions = WebSocketPool.websockets.get(userId);

    if (userSessions == null) {
      return;
    }

    TextMessage textMessage = new TextMessage(message);
    for (WebSocketSession session : userSessions) {
      session.sendMessage(textMessage);
    }

  }

  @Override
  public void removeFromSessionToPool(Long userId, WebSocketSession session) {
    Set<WebSocketSession> userSessions = WebSocketPool.websockets.get(userId);

    if (userSessions != null) {
      for (WebSocketSession sessionItem : userSessions) {
        if (sessionItem.equals(session)) {
          userSessions.remove(session);
        }
      }
    }
    WebSocketPool.websockets.put(userId, userSessions);
  }
}

With MessageHandler we are able to handle adding or removing WebSocketSession sessions from the pool as well as sending messages to a target user’s WebSocket sessions. On the other hand, MessageSender is going to define a KafkaTemplate broker to send the messages to a specified Kafka topic.

MessageSender

package com.endpoint.SpringKafkaMessaging.message.broker;

import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageSender {

  private static final Logger LOG = LoggerFactory.getLogger(MessageSender.class);

  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  public void send(String topic, String message) {
    kafkaTemplate.send(topic, message);
  }
}

Now we can create our WebSocketHandler. It will utilize MessageHandler to add and remove user sessions and MessageSender to send WebSocketSession messages to Kafka topics. I only used one topic in this messaging app, SEND_MESSAGE, but in a real application there would be a bunch of Kafka topics for sending different types of messages like new contact requests, notifications, etc.

WebSocketHandler

package com.endpoint.SpringKafkaMessaging.websocket;

import com.endpoint.SpringKafkaMessaging.cache.respository.CacheRepository;
import com.endpoint.SpringKafkaMessaging.message.broker.MessageSender;
import com.endpoint.SpringKafkaMessaging.persistent.model.User;
import com.endpoint.SpringKafkaMessaging.persistent.repository.UserRepository;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

@Component
public class WebSocketHandler extends TextWebSocketHandler {

  @Autowired
  CacheRepository cacheRepository;

  @Autowired
  UserRepository userRepository;

  @Autowired
  MessageHandler messageHandler;

  @Override
  public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {

    String parameters[] = session.getUri().getQuery().split("=");

    if (parameters.length == 2 && parameters[0].equals("accessToken")) {
      String accessToken = parameters[1];

      Long senderUserId = 0L;
      String senderId = cacheRepository.getUserIdByAccessToken(accessToken);

      if (senderId == null) {
        User sender = userRepository.findByToken(accessToken);
        if (sender != null) {
          senderUserId = sender.getUserId();
        }
      } else {
        senderUserId = Long.valueOf(senderId);
      }
      if (senderUserId == 0L) {
        return;
      }

      messageHandler.removeFromSessionToPool(senderUserId, session);
    }

  }

  @Override
  public void afterConnectionEstablished(WebSocketSession session) throws Exception {

    String parameters[] = session.getUri().getQuery().split("=");

    if (parameters.length == 2 && parameters[0].equals("accessToken")) {
      String accessToken = parameters[1];

      Long senderUserId = 0L;
      String senderId = cacheRepository.getUserIdByAccessToken(accessToken);

      if (senderId == null) {
        User sender = userRepository.findByToken(accessToken);
        if (sender != null) {
          senderUserId = sender.getUserId();
        }
      } else {
        senderUserId = Long.valueOf(senderId);
      }
      if (senderUserId == 0L) {
        return;
      }

      messageHandler.addSessionToPool(senderUserId, session);
    }
    else {
      session.close();
    }

  }

  @Autowired
  private MessageSender sender;

  @Override
  protected void handleTextMessage(WebSocketSession session, TextMessage textMessage) throws Exception {

    JSONObject jsonObject = new JSONObject(textMessage.getPayload());
    String topic = jsonObject.getString("topic");

    // only SEND_MESSAGE topic is available
    if (topic == null && !topic.equals("SEND_MESSAGE")) {
      return;
    }

    sender.send(topic, textMessage.getPayload());
  }
}

We can configure WebSocket in our application by adding the WebSocketHandler Bean and our custom MessageHandler.

WebSocketConfig

package com.endpoint.SpringKafkaMessaging.websocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

  @Bean
  public WebSocketHandler myMessageHandler() {
    return new WebSocketHandler();
  }

  @Override
  public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
    registry.addHandler(myMessageHandler(), "/messaging").setAllowedOrigins("*");
  }

}

Now our WebSocket configuration is ready. Let’s configure our MessageService:

MessageService

package com.endpoint.SpringKafkaMessaging.message;

import com.endpoint.SpringKafkaMessaging.persistent.model.Message;

import java.util.List;

public interface MessageService {
  public void sendMessage(String accessToken, Long sendTo, String msg);
  List<Message> getMessageHistory(Long fromUserId, Long toUserId);
}

With our MessageService interface we can define common messaging methods to be utilized by the Kafka listeners. Notice the sendMessage method where we take accessToken, sendTo and msg parameters. accessToken will be used to authenticate user sending the message. sendTo is the user_id of the recipient.

MessageServiceImpl

package com.endpoint.SpringKafkaMessaging.message;

import com.endpoint.SpringKafkaMessaging.cache.respository.CacheRepository;
import com.endpoint.SpringKafkaMessaging.persistent.model.Message;
import com.endpoint.SpringKafkaMessaging.persistent.model.User;
import com.endpoint.SpringKafkaMessaging.persistent.repository.MessageRepository;
import com.endpoint.SpringKafkaMessaging.persistent.repository.UserRepository;
import com.endpoint.SpringKafkaMessaging.websocket.MessageHandler;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.List;

@Service
public class MessageServiceImpl implements MessageService {

  private static final Logger LOGGER = LoggerFactory.getLogger(MessageServiceImpl.class);

  @Autowired
  MessageRepository messageRepository;

  @Autowired
  CacheRepository cacheRepository;

  @Autowired
  UserRepository userRepository;

  @Autowired
  MessageHandler messageHandler;

  @Override
  public void sendMessage(String accessToken, Long sendTo, String msg) {

    Long senderUserId = 0L;
    String senderId = cacheRepository.getUserIdByAccessToken(accessToken);

    if (senderId == null) {
      User sender = userRepository.findByToken(accessToken);
      if (sender != null) {
        senderUserId = sender.getUserId();
      }
    } else {
      senderUserId = Long.valueOf(senderId);
    }
    if (senderUserId == 0L) {
      return;
    }

    try {
      // enrich message with senderId
      JSONObject msgJson = new JSONObject();
      msgJson.put("msg", msg);
      msgJson.put("senderId", senderUserId);
      messageHandler.sendMessageToUser(sendTo, msgJson.toString());
    } catch (IOException e) {
      return;
    }
  }

  @Override
  public List<Message> getMessageHistory(Long fromUserId, Long toUserId) {
    return messageRepository.findByFromUserIdAndToUserId(fromUserId, toUserId);
  }

  private void storeMessageToUser(Message message) {
    messageRepository.save(message);
  }
}

We are going to use message brokers as MessageSender and MessageReceiver. We’re going to define a KafkaListener listening to SEND_MESSAGE topic inside MessageReceiver, which is actually sending message to the target user. Here notice that MessageReceiver is not a client method; because this is a server side message orchestration application, it does not directly serve as a client interface. Above we implemented the WebSocket sessions where the actual communication with the clients happens, but the client application is out of our scope.

MessageReceiver

package com.endpoint.SpringKafkaMessaging.message.broker;

import com.endpoint.SpringKafkaMessaging.message.MessageService;
import com.endpoint.SpringKafkaMessaging.websocket.MessageHandler;
import com.endpoint.SpringKafkaMessaging.websocket.WebSocketPool;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class MessageReceiver {

  private static final Logger LOG = LoggerFactory.getLogger(MessageReceiver.class);

  @Autowired
  MessageService messageService;

  @Autowired
  MessageHandler messageHandler;

  @KafkaListener(topics = "SEND_MESSAGE")
  public void messagesSendToUser(@Payload String message, @Headers MessageHeaders headers) {

    JSONObject jsonObject = new JSONObject(message);

    if (WebSocketPool.websockets.get(jsonObject.getString("sendTo")) != null) {

      String accessToken = jsonObject.getString("accessToken");
      Long sendTo = Long.parseLong(jsonObject.getString("sendTo"));
      String msg = jsonObject.getString("msg");

      messageService.sendMessage(accessToken, sendTo, msg);

    }
  }

}

Here’s what our simple authentication service looks like:

AuthService

package com.endpoint.SpringKafkaMessaging.auth;

public interface AuthService {
  void putAccessToken(String accessToken, Long userId);
  Long loginWithAccessToken(String code);
}

AuthServiceImpl

package com.endpoint.SpringKafkaMessaging.auth;

import com.endpoint.SpringKafkaMessaging.persistent.model.User;
import com.endpoint.SpringKafkaMessaging.persistent.repository.UserRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.endpoint.SpringKafkaMessaging.cache.respository.CacheRepository;
import com.endpoint.SpringKafkaMessaging.persistent.model.AccessToken;
import com.endpoint.SpringKafkaMessaging.persistent.repository.AccessTokenRepository;

import java.util.Calendar;
import java.util.UUID;

@Service
public class AuthServiceImpl implements AuthService {

  @Autowired
  CacheRepository cacheRepository;

  @Autowired
  AccessTokenRepository accessTokenRepository;

  @Autowired
  UserRepository userRepository;

  @Override
  public void putAccessToken(String token, Long userId) {

    // store token in the cache
    cacheRepository.putAccessToken(token, String.valueOf(userId));

    // store token in the persistence
    AccessToken accessToken = AccessToken.builder()
                  .token(token)
                  .userId(userId)
                  .createdAt(Calendar.getInstance().getTime())
                  .build();
    accessTokenRepository.save(accessToken);
  }

  @Override
  public Long loginWithAccessToken(String token) {
    String userIdStr = cacheRepository.getUserIdByAccessToken(token);
    if (userIdStr == null) {
      User user = userRepository.findByToken(token);
      if (user != null)
        return user.getUserId();
      else
        return 0L;
    }
    return Long.valueOf(userIdStr);
  }
}

AuthController

package com.endpoint.SpringKafkaMessaging.auth.controller;

import javax.validation.Valid;

import com.endpoint.SpringKafkaMessaging.persistent.model.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.endpoint.SpringKafkaMessaging.auth.AuthService;
import com.endpoint.SpringKafkaMessaging.cache.respository.CacheRepository;
import com.endpoint.SpringKafkaMessaging.persistent.repository.UserRepository;
import com.endpoint.SpringKafkaMessaging.util.StringHelper;

import java.util.UUID;


@RestController
@RequestMapping("/api/auth")
public class AuthController {

  @Autowired
  UserRepository userRepository;

  @Autowired
  AuthService authService;

  @Autowired
  CacheRepository cacheRepository;
  
  @RequestMapping(value = "/getcode", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
  public ResponseEntity<Object> getCode(@Valid @RequestBody ActivationRequest activationRequest) {
    
    int code = StringHelper.generateRandomNumber(6);
    
    // save the activation code to the cache repository (cached auth token)
    cacheRepository.putActivationCode(activationRequest.getMobile(), String.valueOf(code));

    ActivationResponse activationResponse = ActivationResponse.builder()
        .mobile(activationRequest.getMobile())
        .activationCode(String.valueOf(code))
        .build();
    
    return new ResponseEntity<>(
        activationResponse,
        HttpStatus.OK);
  }
  
  @RequestMapping(value = "/login", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
  public ResponseEntity<Object> login(@RequestBody LoginRequest loginRequest) {
    String mobile = cacheRepository.queryMobileActivationCode(loginRequest.getMobile(), loginRequest.getActivationCode());

    if (mobile == null) {
      return new ResponseEntity<>(
          "Mobile number not found!",
          HttpStatus.NOT_FOUND);
    } else {
      Long userId = 0L;
      User user = userRepository.findByMobile(loginRequest.getMobile());
      if (user == null) {
        // save user in persistence
        userRepository.save(
            User.builder()
            .mobile(loginRequest.getMobile())
            .build()
        );
        user = userRepository.findByMobile(loginRequest.getMobile());
      }
      userId = user.getUserId();
      String accessToken = UUID.randomUUID().toString();
      authService.putAccessToken(accessToken, userId);

      return new ResponseEntity<>(
          LoginResponse.builder()
              .accessToken(accessToken)
              .build(),
          HttpStatus.OK);
    }
  }

}

Here the authentication controller is consumed by a client application, which will use the /getcode endpoint to get an activation code for a mobile number. Then the client application will use the /login endpoint, passing their activation code. Once the /login endpoint confirms the activation code, it will return an access token to the client for future requests. The client application will be responsible for adding the accessToken to all WebSocket session messages. Here we used a StringHelper method to generate the activation code and a random UUID string to generate an accessToken. Since UUID is not an ideal random token generator for authentication, you should use your own token generator algorithm.

java spring kafka spring-kafka-series


Comments

Popular Tags


Archive


Search our blog