package space.npstr.magma.connections;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.Objects;
import java.util.logging.Level;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Schedulers;
import space.npstr.magma.events.audio.ws.in.InboundWsEvent;
import space.npstr.magma.events.audio.ws.out.OutboundWsEvent;

/* loaded from: input_file:space/npstr/magma/connections/AudioWebSocketSessionHandler.class */
public class AudioWebSocketSessionHandler extends BaseSubscriber<OutboundWsEvent> implements WebSocketHandler {
    private static final Logger log = LoggerFactory.getLogger(AudioWebSocketSessionHandler.class);
    private final Subscriber<InboundWsEvent> inbound;
    private volatile Flux<OutboundWsEvent> intermediaryOutbound;
    private volatile FluxSink<OutboundWsEvent> intermediaryOutboundSink;

    @Nullable
    private WebSocketSession session;

    public AudioWebSocketSessionHandler(Subscriber<InboundWsEvent> subscriber) {
        prepareConnect();
        this.inbound = subscriber;
    }

    public void close() {
        if (this.session != null) {
            this.session.close().publishOn(Schedulers.parallel()).subscribe();
        }
    }

    public void prepareConnect() {
        UnicastProcessor create = UnicastProcessor.create();
        this.intermediaryOutboundSink = create.sink();
        this.intermediaryOutbound = create;
    }

    public Mono<Void> handle(WebSocketSession webSocketSession) {
        this.session = webSocketSession;
        log.trace("Handshake: {}", webSocketSession.getHandshakeInfo());
        webSocketSession.receive().map((v0) -> {
            return v0.getPayloadAsText();
        }).log(log.getName() + ".>>>", Level.FINEST, new SignalType[0]).map(InboundWsEvent::from).doOnTerminate(() -> {
            log.trace("Receiving terminated");
        }).publishOn(Schedulers.parallel()).subscribe(this.inbound);
        Flux log2 = this.intermediaryOutbound.map((v0) -> {
            return v0.asMessage();
        }).log(log.getName() + ".<<<", Level.FINEST, new SignalType[0]);
        Objects.requireNonNull(webSocketSession);
        return webSocketSession.send(log2.map(webSocketSession::textMessage)).doOnTerminate(() -> {
            log.trace("Sending terminated");
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void hookOnNext(OutboundWsEvent outboundWsEvent) {
        this.intermediaryOutboundSink.next(outboundWsEvent);
    }
}
