/*
 * Decompiled with CFR 0.152.
 */
package net.consensys.cava.rlpx.vertx;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.NetSocket;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import net.consensys.cava.bytes.Bytes;
import net.consensys.cava.bytes.Bytes32;
import net.consensys.cava.concurrent.AsyncCompletion;
import net.consensys.cava.concurrent.CompletableAsyncCompletion;
import net.consensys.cava.crypto.SECP256K1;
import net.consensys.cava.rlpx.HandshakeMessage;
import net.consensys.cava.rlpx.MemoryWireConnectionsRepository;
import net.consensys.cava.rlpx.RLPxConnection;
import net.consensys.cava.rlpx.RLPxConnectionFactory;
import net.consensys.cava.rlpx.RLPxMessage;
import net.consensys.cava.rlpx.RLPxService;
import net.consensys.cava.rlpx.WireConnectionRepository;
import net.consensys.cava.rlpx.wire.DefaultWireConnection;
import net.consensys.cava.rlpx.wire.DisconnectReason;
import net.consensys.cava.rlpx.wire.SubProtocol;
import net.consensys.cava.rlpx.wire.SubProtocolHandler;
import net.consensys.cava.rlpx.wire.SubProtocolIdentifier;
import net.consensys.cava.rlpx.wire.WireConnection;
import org.logl.Logger;
import org.logl.LoggerProvider;

public final class VertxRLPxService
implements RLPxService {
    private static final int DEVP2P_VERSION = 5;
    private final LoggerProvider loggerProvider;
    private final Logger logger;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Vertx vertx;
    private final int listenPort;
    private final String networkInterface;
    private final int advertisedPort;
    private final SECP256K1.KeyPair keyPair;
    private final List<SubProtocol> subProtocols;
    private final String clientId;
    private LinkedHashMap<SubProtocol, SubProtocolHandler> handlers;
    private NetClient client;
    private NetServer server;
    private final WireConnectionRepository repository;

    private static void checkPort(int port) {
        if (port < 0 || port > 65536) {
            throw new IllegalArgumentException("Invalid port: " + port);
        }
    }

    public VertxRLPxService(Vertx vertx, LoggerProvider loggerProvider, int listenPort, String networkInterface, int advertisedPort, SECP256K1.KeyPair identityKeyPair, List<SubProtocol> subProtocols, String clientId) {
        this(vertx, loggerProvider, listenPort, networkInterface, advertisedPort, identityKeyPair, subProtocols, clientId, new MemoryWireConnectionsRepository());
    }

    public VertxRLPxService(Vertx vertx, LoggerProvider loggerProvider, int listenPort, String networkInterface, int advertisedPort, SECP256K1.KeyPair identityKeyPair, List<SubProtocol> subProtocols, String clientId, WireConnectionRepository repository) {
        VertxRLPxService.checkPort(listenPort);
        VertxRLPxService.checkPort(advertisedPort);
        if (clientId == null || clientId.trim().isEmpty()) {
            throw new IllegalArgumentException("Client ID must contain a valid identifier");
        }
        this.vertx = vertx;
        this.loggerProvider = loggerProvider;
        this.listenPort = listenPort;
        this.networkInterface = networkInterface;
        this.advertisedPort = advertisedPort;
        this.keyPair = identityKeyPair;
        this.subProtocols = subProtocols;
        this.clientId = clientId;
        this.repository = repository;
        this.logger = loggerProvider.getLogger("VertxRLPxService");
    }

    @Override
    public AsyncCompletion start() {
        if (this.started.compareAndSet(false, true)) {
            this.handlers = new LinkedHashMap();
            for (SubProtocol subProtocol : this.subProtocols) {
                this.handlers.put(subProtocol, subProtocol.createHandler(this));
            }
            this.client = this.vertx.createNetClient(new NetClientOptions());
            this.server = this.vertx.createNetServer(new NetServerOptions().setPort(this.listenPort).setHost(this.networkInterface).setTcpKeepAlive(true)).connectHandler(this::receiveMessage);
            CompletableAsyncCompletion complete = AsyncCompletion.incomplete();
            this.server.listen(res -> {
                if (res.succeeded()) {
                    complete.complete();
                } else {
                    complete.completeExceptionally(res.cause());
                }
            });
            return complete;
        }
        return AsyncCompletion.completed();
    }

    @Override
    public void send(SubProtocolIdentifier subProtocolIdentifier, int messageType, String connectionId, Bytes message) {
        if (!this.started.get()) {
            throw new IllegalStateException("The RLPx service is not active");
        }
        DefaultWireConnection conn = this.wireConnection(connectionId);
        if (conn != null) {
            conn.sendMessage(subProtocolIdentifier, messageType, message);
        }
    }

    @Override
    public void disconnect(String connectionId, DisconnectReason disconnectReason) {
        if (!this.started.get()) {
            throw new IllegalStateException("The RLPx service is not active");
        }
        DefaultWireConnection conn = this.wireConnection(connectionId);
        if (conn != null) {
            conn.disconnect(disconnectReason);
        }
    }

    @Override
    public void broadcast(SubProtocolIdentifier subProtocolIdentifier, int messageType, Bytes message) {
        if (!this.started.get()) {
            throw new IllegalStateException("The RLPx service is not active");
        }
        for (WireConnection conn : this.repository.asIterable()) {
            ((DefaultWireConnection)conn).sendMessage(subProtocolIdentifier, messageType, message);
        }
    }

    private void receiveMessage(final NetSocket netSocket) {
        netSocket.handler((Handler)new Handler<Buffer>(){
            private RLPxConnection conn;
            private DefaultWireConnection wireConnection;

            public void handle(Buffer buffer) {
                if (this.conn == null) {
                    this.conn = RLPxConnectionFactory.respondToHandshake(Bytes.wrapBuffer((Buffer)buffer), VertxRLPxService.this.keyPair, bytes -> netSocket.write(Buffer.buffer((byte[])bytes.toArrayUnsafe())));
                    if (this.wireConnection == null) {
                        this.wireConnection = VertxRLPxService.this.createConnection(this.conn, netSocket);
                        this.wireConnection.handleConnectionStart();
                    }
                } else {
                    this.conn.stream(Bytes.wrapBuffer((Buffer)buffer), this.wireConnection::messageReceived);
                }
            }
        });
    }

    @Override
    public AsyncCompletion stop() {
        if (this.started.compareAndSet(true, false)) {
            for (WireConnection conn : this.repository.asIterable()) {
                ((DefaultWireConnection)conn).disconnect(DisconnectReason.CLIENT_QUITTING);
            }
            this.repository.close();
            this.client.close();
            AsyncCompletion handlersCompletion = AsyncCompletion.allOf((Collection)this.handlers.values().stream().map(SubProtocolHandler::stop).collect(Collectors.toList()));
            CompletableAsyncCompletion completableAsyncCompletion = AsyncCompletion.incomplete();
            this.server.close(res -> {
                if (res.succeeded()) {
                    completableAsyncCompletion.complete();
                } else {
                    completableAsyncCompletion.completeExceptionally(res.cause());
                }
            });
            return handlersCompletion.thenCombine((AsyncCompletion)completableAsyncCompletion);
        }
        return AsyncCompletion.completed();
    }

    public int actualPort() {
        if (!this.started.get()) {
            throw new IllegalStateException("The RLPx service is not active");
        }
        return this.server.actualPort();
    }

    public int advertisedPort() {
        if (!this.started.get()) {
            throw new IllegalStateException("The RLPx service is not active");
        }
        return this.listenPort == 0 ? this.actualPort() : this.advertisedPort;
    }

    @Override
    public WireConnectionRepository repository() {
        return this.repository;
    }

    @Override
    public AsyncCompletion connectTo(final SECP256K1.PublicKey peerPublicKey, InetSocketAddress peerAddress) {
        if (!this.started.get()) {
            throw new IllegalStateException("The RLPx service is not active");
        }
        final CompletableAsyncCompletion connected = AsyncCompletion.incomplete();
        this.logger.debug("Connecting to {} with public key {}", (Object)peerAddress, (Object)peerPublicKey);
        this.client.connect(peerAddress.getPort(), peerAddress.getHostString(), netSocketFuture -> netSocketFuture.map(netSocket -> {
            final Bytes32 nonce = RLPxConnectionFactory.generateRandomBytes32();
            final SECP256K1.KeyPair ephemeralKeyPair = SECP256K1.KeyPair.random();
            final Bytes initHandshakeMessage = RLPxConnectionFactory.init(this.keyPair, peerPublicKey, ephemeralKeyPair, nonce);
            this.logger.debug("Initiating handshake to {}", (Object)peerAddress);
            netSocket.write(Buffer.buffer((byte[])initHandshakeMessage.toArrayUnsafe()));
            netSocket.handler((Handler)new Handler<Buffer>(){
                private RLPxConnection conn;
                private DefaultWireConnection wireConnection;

                public void handle(Buffer buffer) {
                    try {
                        Bytes messageBytes = Bytes.wrapBuffer((Buffer)buffer);
                        if (this.conn == null) {
                            int messageSize = RLPxConnectionFactory.messageSize(messageBytes);
                            Bytes responseBytes = messageBytes;
                            if (messageBytes.size() > messageSize) {
                                responseBytes = responseBytes.slice(0, messageSize);
                            }
                            messageBytes = messageBytes.slice(messageSize);
                            HandshakeMessage responseMessage = RLPxConnectionFactory.readResponse(responseBytes, VertxRLPxService.this.keyPair.secretKey());
                            this.conn = RLPxConnectionFactory.createConnection(true, initHandshakeMessage, responseBytes, ephemeralKeyPair.secretKey(), responseMessage.ephemeralPublicKey(), nonce, responseMessage.nonce(), VertxRLPxService.this.keyPair.publicKey(), peerPublicKey);
                            this.wireConnection = VertxRLPxService.this.createConnection(this.conn, netSocket);
                            connected.complete();
                            if (messageBytes.isEmpty()) {
                                return;
                            }
                        }
                        if (this.conn != null) {
                            this.conn.stream(messageBytes, this.wireConnection::messageReceived);
                        }
                    }
                    catch (Exception e) {
                        VertxRLPxService.this.logger.error(e.getMessage(), (Throwable)e);
                        connected.completeExceptionally((Throwable)e);
                        netSocket.close();
                    }
                }
            });
            return null;
        }));
        return connected;
    }

    private DefaultWireConnection createConnection(RLPxConnection conn, NetSocket netSocket) {
        String id = UUID.randomUUID().toString();
        DefaultWireConnection wireConnection = new DefaultWireConnection(id, conn.publicKey().bytes(), conn.peerPublicKey().bytes(), this.loggerProvider.getLogger("wireConnection-" + id), message -> {
            RLPxConnection rLPxConnection = conn;
            synchronized (rLPxConnection) {
                Bytes bytes = conn.write((RLPxMessage)message);
                this.vertx.eventBus().send(netSocket.writeHandlerID(), (Object)Buffer.buffer((byte[])bytes.toArrayUnsafe()));
            }
        }, conn::configureAfterHandshake, () -> ((NetSocket)netSocket).end(), this.handlers, 5, this.clientId, this.advertisedPort());
        this.repository.add(wireConnection);
        return wireConnection;
    }

    private DefaultWireConnection wireConnection(String id) {
        if (!this.started.get()) {
            throw new IllegalStateException("The RLPx service is not active");
        }
        return (DefaultWireConnection)this.repository.get(id);
    }
}

