package zjtech.websocket.termination.core;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ReflectionUtils;
import reactor.core.publisher.Mono;
import zjtech.websocket.termination.api.ErrorResponse;
import zjtech.websocket.termination.api.Request;
import zjtech.websocket.termination.common.RequestWrapper;
import zjtech.websocket.termination.common.WsConnectionException;
import zjtech.websocket.termination.common.WsErrorCode;
import zjtech.websocket.termination.common.WsUtils;
import zjtech.websocket.termination.core.CommandMappingHandler;

/* loaded from: input_file:zjtech/websocket/termination/core/MessageHandler.class */
public class MessageHandler {
    private static final Logger log = LoggerFactory.getLogger(MessageHandler.class);
    private final IRequestParser requestParser;
    private final CommandMappingHandler mappingHandler;
    private final WsUtils wsUtils;

    public MessageHandler(IRequestParser iRequestParser, CommandMappingHandler commandMappingHandler, WsUtils wsUtils) {
        this.requestParser = iRequestParser;
        this.mappingHandler = commandMappingHandler;
        this.wsUtils = wsUtils;
    }

    public Mono<Void> handle(SessionHandler sessionHandler) {
        return sessionHandler.receive().doOnNext(str -> {
            Mono.just(str).map(str -> {
                return this.requestParser.parse(str);
            }).map(requestWrapper -> {
                return constructContext(sessionHandler, requestWrapper);
            }).doOnNext(this::invokeConsumer).doOnError(th -> {
                handleError(sessionHandler, str, th);
            }).subscribe();
        }).then();
    }

    private void invokeConsumer(IConsumerContext iConsumerContext) {
        CommandMappingHandler.ConsumerInfo consumeInfo = this.mappingHandler.getConsumeInfo(iConsumerContext.getCommand());
        Object bean = this.wsUtils.getBean(consumeInfo.getConsumeClass());
        if (bean == null) {
            log.warn("No consumer found for consuming the message associated with command type '{}'", iConsumerContext.getCommand());
        } else {
            ReflectionUtils.invokeMethod(consumeInfo.getMethod(), bean, new Object[]{iConsumerContext});
        }
    }

    private IConsumerContext constructContext(SessionHandler sessionHandler, Request request) {
        RequestWrapper requestWrapper = (RequestWrapper) request;
        ConsumerContext consumerContext = new ConsumerContext();
        consumerContext.setSessionHandler(sessionHandler);
        consumerContext.setCommand(requestWrapper.getCommand());
        consumerContext.setHeaders(requestWrapper.getHeader());
        consumerContext.setPayload(requestWrapper.getPayload());
        return consumerContext;
    }

    private void handleError(SessionHandler sessionHandler, String str, Throwable th) {
        ErrorResponse errorResponse = new ErrorResponse();
        if (th instanceof WsConnectionException) {
            log.warn("Received a unknown message from client '{}' and the message is '{}'", new Object[]{sessionHandler.getClientInfo(), str, ((WsConnectionException) th).getRawException()});
            WsErrorCode errorCode = ((WsConnectionException) th).getErrorCode();
            errorResponse.setErrorCode(400);
            errorResponse.setErrorMessage(errorCode.name());
        } else {
            log.warn(String.format("An internal exception occurs for session {%s}, the message is: %s ", sessionHandler.getSessionId(), str), th);
            errorResponse.setErrorCode(500);
            errorResponse.setErrorMessage(th.getMessage());
        }
        sessionHandler.sendJsonString(errorResponse);
    }
}
