package zjtech.websocket.termination.core;

import java.nio.channels.ClosedChannelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Mono;
import reactor.netty.channel.AbortedException;
import zjtech.websocket.termination.common.WsConnectionException;
import zjtech.websocket.termination.common.WsErrorCode;
import zjtech.websocket.termination.common.WsUtils;
import zjtech.websocket.termination.config.WsConnectionConfigProps;

/* loaded from: input_file:zjtech/websocket/termination/core/SessionHandler.class */
public class SessionHandler {
    private static final Logger log = LoggerFactory.getLogger(SessionHandler.class);
    private final EmitterProcessor<SessionHandler> clientConnectedEventBus;
    private final EmitterProcessor<SessionHandler> clientDisconnectedEventBus;
    private DirectProcessor<String> messageProcessor = DirectProcessor.create();
    private DirectProcessor<WebSocketMessage> pongMessageProcessor = DirectProcessor.create();
    private WsUtils wsUtils;
    private WebSocketSession session;
    private String sessionId;
    private String clientInfo;
    private volatile boolean isConnected;
    private WsConnectionConfigProps configProps;

    public SessionHandler(WsUtils wsUtils, WsConnectionConfigProps wsConnectionConfigProps, EmitterProcessor<SessionHandler> emitterProcessor, EmitterProcessor<SessionHandler> emitterProcessor2) {
        this.clientConnectedEventBus = emitterProcessor;
        this.clientDisconnectedEventBus = emitterProcessor2;
        this.wsUtils = wsUtils;
        this.configProps = wsConnectionConfigProps;
    }

    public Mono<Void> handle(WebSocketSession webSocketSession) {
        init(webSocketSession);
        notifyConnect();
        return handleReceive().then();
    }

    private void init(WebSocketSession webSocketSession) {
        this.session = webSocketSession;
        this.sessionId = this.session.getId();
        this.clientInfo = this.sessionId + "," + this.session.getHandshakeInfo().getRemoteAddress().getHostString();
        this.isConnected = true;
    }

    private Mono<Void> handleReceive() {
        return this.session.receive().takeUntil(webSocketMessage -> {
            return !this.isConnected;
        }).doOnNext(this::publishTextMessage).doOnComplete(() -> {
            completeProcessor();
            notifyDisconnect();
        }).then();
    }

    private void publishTextMessage(WebSocketMessage webSocketMessage) {
        if (webSocketMessage.getType().equals(WebSocketMessage.Type.PONG)) {
            this.pongMessageProcessor.onNext(webSocketMessage);
            return;
        }
        String payloadAsText = webSocketMessage.getPayloadAsText();
        if (!this.configProps.getPing().isSupressLog()) {
            log.info("Receive a message from client '{}' and the message is: {}", this.clientInfo, payloadAsText);
        }
        this.messageProcessor.onNext(payloadAsText);
    }

    public void sendJsonString(Object obj) {
        sendText(this.wsUtils.convertString(obj), this.session);
    }

    public void sendJsonString(Object obj, WebSocketSession webSocketSession) {
        sendText(this.wsUtils.convertString(obj), webSocketSession);
    }

    public void sendText(String str) {
        send(this.session.textMessage(str), this.session);
    }

    public void sendText(String str, WebSocketSession webSocketSession) {
        send(webSocketSession.textMessage(str), webSocketSession);
    }

    public void send(WebSocketMessage webSocketMessage) {
        send(webSocketMessage, this.session);
    }

    public void send(WebSocketMessage webSocketMessage, WebSocketSession webSocketSession) {
        if (this.isConnected) {
            webSocketSession.send(Mono.just(webSocketMessage)).doOnError(th -> {
                if ((th instanceof ClosedChannelException) || (th instanceof AbortedException)) {
                    if (!isPingOrPong(webSocketMessage)) {
                        log.warn("Won't send a message to a disconnected client '{}'", this.clientInfo);
                    }
                    this.isConnected = false;
                }
            }).subscribe();
        } else {
            if (!isPingOrPong(webSocketMessage)) {
                log.warn("Won't send a message to a disconnected client '{}'", this.clientInfo);
            }
            throw new WsConnectionException(WsErrorCode.CLIENT_CLOSED);
        }
    }

    private boolean isPingOrPong(WebSocketMessage webSocketMessage) {
        return webSocketMessage.getType().equals(WebSocketMessage.Type.PONG) || webSocketMessage.getType().equals(WebSocketMessage.Type.PING);
    }

    public DirectProcessor<WebSocketMessage> pong() {
        return this.pongMessageProcessor;
    }

    public void close() {
        if (this.isConnected) {
            log.info("Trying to close termination for client ''", this.clientInfo);
            this.session.close().subscribe();
            this.isConnected = false;
            completeProcessor();
            notifyDisconnect();
        }
    }

    private void completeProcessor() {
        this.messageProcessor.onComplete();
        this.pongMessageProcessor.onComplete();
    }

    public String getClientInfo() {
        return this.clientInfo;
    }

    public DirectProcessor<String> receive() {
        return this.messageProcessor;
    }

    public void notifyConnect() {
        this.clientConnectedEventBus.onNext(this);
    }

    public void notifyDisconnect() {
        this.clientDisconnectedEventBus.onNext(this);
    }

    public String getSessionId() {
        return this.sessionId;
    }

    public boolean isConnected() {
        return this.isConnected;
    }

    public WebSocketSession getSession() {
        return this.session;
    }
}
