package de.esoco.coroutine.step.nio;

import de.esoco.coroutine.Continuation;
import de.esoco.coroutine.Coroutine;
import de.esoco.coroutine.CoroutineException;
import de.esoco.coroutine.CoroutineStep;
import de.esoco.coroutine.Suspension;
import de.esoco.coroutine.step.nio.AsynchronousChannelStep;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.obrel.core.RelationType;
import org.obrel.core.RelationTypeModifier;
import org.obrel.core.RelationTypes;
import org.obrel.type.MetaTypes;

/* loaded from: input_file:de/esoco/coroutine/step/nio/AsynchronousSocketStep.class */
/**
 * Base class for coroutine steps that perform an operation on an
 * {@link AsynchronousSocketChannel}, taking a {@link ByteBuffer} as input and
 * handing the same buffer on as output.
 *
 * <p>The channel is looked up on (and, if necessary, created and stored in) the
 * current coroutine under {@link #SOCKET_CHANNEL}. If the channel has no remote
 * address yet it is connected to the address produced by the function that was
 * handed to the constructor before the actual operation is performed.</p>
 */
public abstract class AsynchronousSocketStep extends AsynchronousChannelStep<ByteBuffer, ByteBuffer> {
    /**
     * Relation type under which the socket channel used by this step is stored
     * on the current coroutine.
     */
    public static final RelationType<AsynchronousSocketChannel> SOCKET_CHANNEL = RelationTypes.newType(new RelationTypeModifier[0]);

    /** Produces the remote address to connect to from the current continuation. */
    private final Function<Continuation<?>, SocketAddress> fGetSocketAddress;

    /**
     * Creates a new step.
     *
     * @param function produces the socket address to connect to from the
     *                 continuation of an execution; must not be {@code null}
     * @throws NullPointerException if {@code function} is {@code null}
     */
    public AsynchronousSocketStep(Function<Continuation<?>, SocketAddress> function) {
        this.fGetSocketAddress = Objects.requireNonNull(function, "socket address function must not be null");
    }

    /**
     * Runs this step asynchronously: when the given future completes, the
     * continuation is suspended on this step and the connect/operation sequence
     * is started with the buffer the future delivered. The continuation itself
     * is used as the executor of that stage.
     *
     * @param completableFuture future that delivers the input buffer
     * @param coroutineStep     step to resume with after the suspension
     * @param continuation      continuation of the current execution
     */
    @Override // de.esoco.coroutine.CoroutineStep
    public void runAsync(CompletableFuture<ByteBuffer> completableFuture, CoroutineStep<ByteBuffer, ?> coroutineStep, Continuation<?> continuation) {
        completableFuture.thenAcceptAsync(
                byteBuffer -> connectAsync(byteBuffer, continuation.suspend(this, coroutineStep)),
                (Executor) continuation);
    }

    /**
     * Performs the asynchronous channel operation of the concrete step.
     *
     * @param i                         an integer handed in by the caller; this
     *                                  class passes {@code -2} for the first
     *                                  invocation on an already connected channel
     *                                  (presumably the number of bytes processed
     *                                  by a previous operation otherwise - TODO
     *                                  confirm against AsynchronousChannelStep)
     * @param asynchronousSocketChannel channel to operate on
     * @param byteBuffer                buffer to read from or write to
     * @param channelCallback           callback to hand to the channel operation
     * @return NOTE(review): meaning is defined by the callback contract in
     *         AsynchronousChannelStep, which is not visible here - confirm
     * @throws Exception if the operation fails
     */
    protected abstract boolean performAsyncOperation(int i, AsynchronousSocketChannel asynchronousSocketChannel, ByteBuffer byteBuffer, AsynchronousChannelStep.ChannelCallback<Integer, AsynchronousSocketChannel> channelCallback) throws Exception;

    /**
     * Performs the blocking variant of the channel operation of the concrete
     * step.
     *
     * @param asynchronousSocketChannel channel to operate on
     * @param byteBuffer                buffer to read from or write to
     * @throws Exception if the operation fails
     */
    protected abstract void performBlockingOperation(AsynchronousSocketChannel asynchronousSocketChannel, ByteBuffer byteBuffer) throws Exception;

    /**
     * Executes this step in a blocking fashion: obtains the channel, connects
     * it (waiting for the connect to finish) if it has no remote address yet,
     * and then performs the blocking operation.
     *
     * @param byteBuffer   buffer to operate on
     * @param continuation continuation of the current execution
     * @return the given buffer
     * @throws CoroutineException wrapping any exception raised while
     *                            connecting or performing the operation
     */
    @Override
    protected ByteBuffer execute(ByteBuffer byteBuffer, Continuation<?> continuation) {
        try {
            AsynchronousSocketChannel socketChannel = getSocketChannel(continuation);
            if (socketChannel.getRemoteAddress() == null) {
                // Blocks until the connection is established or has failed.
                socketChannel.connect(getSocketAddress(continuation)).get();
            }
            performBlockingOperation(socketChannel, byteBuffer);
            return byteBuffer;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that code further up the stack can
            // still observe the interruption after it has been wrapped.
            Thread.currentThread().interrupt();
            throw new CoroutineException(e);
        } catch (Exception e) {
            throw new CoroutineException(e);
        }
    }

    /**
     * Returns the address to connect to for the given continuation by applying
     * the function handed to the constructor.
     *
     * @param continuation continuation of the current execution
     * @return the socket address produced by the function
     */
    protected SocketAddress getSocketAddress(Continuation<?> continuation) {
        return this.fGetSocketAddress.apply(continuation);
    }

    /**
     * Returns the function that produces the socket address.
     *
     * @return the function handed to the constructor
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public Function<Continuation<?>, SocketAddress> getSocketAddressFactory() {
        return this.fGetSocketAddress;
    }

    /**
     * Returns the socket channel of the current coroutine. If the coroutine has
     * no channel stored under {@link #SOCKET_CHANNEL}, or the stored channel is
     * no longer open, a new channel is opened in the channel group of the
     * continuation and stored on the coroutine.
     *
     * @param continuation continuation of the current execution
     * @return an open socket channel (not necessarily connected)
     * @throws IOException if opening a new channel fails
     */
    protected AsynchronousSocketChannel getSocketChannel(Continuation<?> continuation) throws IOException {
        Coroutine<?, ?> currentCoroutine = continuation.getCurrentCoroutine();
        AsynchronousSocketChannel asynchronousSocketChannel = (AsynchronousSocketChannel) currentCoroutine.get(SOCKET_CHANNEL);
        if (asynchronousSocketChannel == null || !asynchronousSocketChannel.isOpen()) {
            asynchronousSocketChannel = AsynchronousSocketChannel.open(getChannelGroup(continuation));
            // NOTE(review): MANAGED presumably makes the owner responsible for
            // closing the channel - confirm against the obrel documentation.
            currentCoroutine.set(SOCKET_CHANNEL, asynchronousSocketChannel).annotate(MetaTypes.MANAGED);
        }
        return asynchronousSocketChannel;
    }

    /**
     * Starts the asynchronous processing for a suspension: connects the channel
     * first if it has no remote address yet, otherwise invokes the asynchronous
     * operation directly. Any exception raised while doing so fails the
     * suspension instead of propagating.
     *
     * @param byteBuffer buffer to operate on
     * @param suspension suspension to resume or fail
     */
    private void connectAsync(ByteBuffer byteBuffer, Suspension<ByteBuffer> suspension) {
        try {
            Continuation<?> continuation = suspension.continuation();
            AsynchronousSocketChannel socketChannel = getSocketChannel(continuation);
            if (socketChannel.getRemoteAddress() == null) {
                // Resolve the address through getSocketAddress() so that the
                // asynchronous path honours the same hook as execute().
                socketChannel.connect(getSocketAddress(continuation), byteBuffer, new AsynchronousChannelStep.ChannelCallback<>(socketChannel, suspension, this::performAsyncOperation));
            } else {
                // -2 marks the first invocation on an already connected channel
                // (presumably mirrors a "first operation" sentinel of
                // AsynchronousChannelStep - TODO confirm).
                performAsyncOperation(-2, socketChannel, byteBuffer, new AsynchronousChannelStep.ChannelCallback<>(socketChannel, suspension, this::performAsyncOperation));
            }
        } catch (Exception e) {
            suspension.fail(e);
        }
    }

    /**
     * Bridge method emitted by the decompiler; delegates to
     * {@link #execute(ByteBuffer, Continuation)}.
     *
     * <p>NOTE(review): javac generates this bridge automatically from the typed
     * overload, so it can probably be dropped if this file is recompiled from
     * source - confirm that no caller depends on its widened access first.</p>
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // de.esoco.coroutine.CoroutineStep
    public /* bridge */ /* synthetic */ Object execute(Object obj, Continuation continuation) {
        return execute((ByteBuffer) obj, (Continuation<?>) continuation);
    }

    // Must stay after the SOCKET_CHANNEL declaration: static initializers run in
    // textual order and the relation type has to exist before init() is called.
    static {
        RelationTypes.init(AsynchronousSocketStep.class);
    }
}
