package com.acgist.snail.net.torrent.utp;

import com.acgist.snail.config.UtpConfig;
import com.acgist.snail.context.exception.NetException;
import com.acgist.snail.net.IMessageEncryptSender;
import com.acgist.snail.net.UdpMessageHandler;
import com.acgist.snail.net.torrent.peer.PeerCryptMessageCodec;
import com.acgist.snail.net.torrent.peer.PeerSubMessageHandler;
import com.acgist.snail.net.torrent.peer.PeerUnpackMessageCodec;
import com.acgist.snail.utils.CollectionUtils;
import com.acgist.snail.utils.DateUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/acgist/snail/net/torrent/utp/UtpMessageHandler.class */
public final class UtpMessageHandler extends UdpMessageHandler implements IMessageEncryptSender {
    private static final Logger LOGGER = LoggerFactory.getLogger(UtpMessageHandler.class);
    private static final int UTP_HEADER_LENGTH = 20;
    private static final int UTP_HEADER_MIN_LENGTH = 20;
    private static final int UTP_EXT_MIN_LENGTH = 2;
    private boolean connect;
    private final short recvId;
    private final short sendId;
    private final UtpService utpService;
    private final UtpWindow sendWindow;
    private final UtpWindow recvWindow;
    private final AtomicInteger ackLossTimes;
    private final AtomicBoolean connectLock;
    private final PeerSubMessageHandler peerSubMessageHandler;

    public UtpMessageHandler(short s, InetSocketAddress inetSocketAddress) {
        this(PeerSubMessageHandler.newInstance(), inetSocketAddress, s, true);
    }

    public UtpMessageHandler(PeerSubMessageHandler peerSubMessageHandler, InetSocketAddress inetSocketAddress) {
        this(peerSubMessageHandler, inetSocketAddress, (short) 0, false);
    }

    private UtpMessageHandler(PeerSubMessageHandler peerSubMessageHandler, InetSocketAddress inetSocketAddress, short s, boolean z) {
        this.peerSubMessageHandler = peerSubMessageHandler;
        this.peerSubMessageHandler.messageEncryptSender(this);
        this.messageCodec = new PeerCryptMessageCodec(new PeerUnpackMessageCodec(this.peerSubMessageHandler), this.peerSubMessageHandler);
        this.utpService = UtpService.getInstance();
        this.sendWindow = UtpWindow.newSendInstance();
        this.recvWindow = UtpWindow.newRecvInstance(this.messageCodec);
        this.ackLossTimes = new AtomicInteger(0);
        this.connectLock = new AtomicBoolean(false);
        this.socketAddress = inetSocketAddress;
        if (z) {
            this.sendId = s;
            this.recvId = (short) (this.sendId + 1);
        } else {
            this.recvId = this.utpService.connectionId();
            this.sendId = (short) (this.recvId + 1);
        }
        this.utpService.put(this);
    }

    public String key() {
        return this.utpService.buildKey(this.recvId, this.socketAddress);
    }

    @Override // com.acgist.snail.net.UdpMessageHandler, com.acgist.snail.net.IMessageReceiver
    public void onReceive(ByteBuffer byteBuffer, InetSocketAddress inetSocketAddress) throws NetException {
        if (byteBuffer.remaining() < 20) {
            throw new NetException("处理UTP消息错误（长度）：" + byteBuffer.remaining());
        }
        byte b = byteBuffer.get();
        UtpConfig.Type of = UtpConfig.Type.of(b);
        if (of == null) {
            throw new NetException("未知UTP消息类型：" + b);
        }
        byte b2 = byteBuffer.get();
        short s = byteBuffer.getShort();
        int i = byteBuffer.getInt();
        int i2 = byteBuffer.getInt();
        int i3 = byteBuffer.getInt();
        short s2 = byteBuffer.getShort();
        short s3 = byteBuffer.getShort();
        if (b2 != 0 && byteBuffer.remaining() >= 2) {
            int i4 = byteBuffer.getShort();
            if (i4 <= 0 || byteBuffer.remaining() < i4) {
                throw new NetException("处理UTP消息错误（扩展长度）：" + i4);
            }
            byteBuffer.get(new byte[i4]);
        }
        switch (of) {
            case DATA:
                data(i, s2, s3, byteBuffer);
                return;
            case STATE:
                state(i, s2, s3, i3);
                return;
            case FIN:
                fin(i, s2, s3);
                return;
            case RESET:
                reset(i, s2, s3);
                return;
            case SYN:
                syn(i, s2, s3);
                return;
            default:
                LOGGER.warn("处理UTP消息错误（未知类型），类型：{}，扩展：{}，连接ID：{}，时间戳：{}，时间差：{}，窗口大小：{}，请求编号：{}，应答编号：{}", new Object[]{of, Byte.valueOf(b2), Short.valueOf(s), Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3), Short.valueOf(s2), Short.valueOf(s3)});
                return;
        }
    }

    @Override // com.acgist.snail.net.UdpMessageHandler, com.acgist.snail.net.IMessageSender
    public void send(ByteBuffer byteBuffer, int i) throws NetException {
        sendPacket(byteBuffer);
    }

    @Override // com.acgist.snail.net.IMessageEncryptSender
    public void sendEncrypt(ByteBuffer byteBuffer, int i) throws NetException {
        this.messageCodec.encode(byteBuffer);
        sendPacket(byteBuffer);
    }

    private void sendPacket(ByteBuffer byteBuffer) throws NetException {
        check(byteBuffer);
        while (true) {
            int remaining = byteBuffer.remaining();
            if (remaining <= 0) {
                return;
            }
            byte[] bArr = remaining > 1452 ? new byte[UtpConfig.UTP_PACKET_MAX_LENGTH] : new byte[remaining];
            byteBuffer.get(bArr);
            data(this.sendWindow.build(bArr));
        }
    }

    public boolean connect() {
        this.connect = false;
        syn();
        lockConnect();
        if (!this.connect) {
            closeAll();
        }
        return this.connect;
    }

    public boolean timeoutRetry() {
        List<UtpWindowData> timeoutWindowData = this.sendWindow.timeoutWindowData();
        if (!CollectionUtils.isNotEmpty(timeoutWindowData)) {
            return false;
        }
        data(timeoutWindowData);
        LOGGER.debug("超时数据包重新发送：{}-{}", Short.valueOf(this.sendId), Integer.valueOf(timeoutWindowData.size()));
        return true;
    }

    private void data(List<UtpWindowData> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        list.forEach(utpWindowData -> {
            if (utpWindowData.getPushTimes() <= 3) {
                data(utpWindowData);
            } else {
                LOGGER.warn("发送数据包失败（次数超限）：{}-{}", Short.valueOf(utpWindowData.getSeqnr()), Byte.valueOf(utpWindowData.getPushTimes()));
                this.sendWindow.discard(utpWindowData.getSeqnr());
            }
        });
    }

    private void data(int i, short s, short s2, ByteBuffer byteBuffer) throws NetException {
        LOGGER.debug("处理数据消息：{}", Short.valueOf(s));
        try {
            try {
                this.recvWindow.receive(i, s, byteBuffer);
                state(i, this.recvWindow.seqnr());
            } catch (IOException e) {
                throw new NetException(e);
            }
        } catch (Throwable th) {
            state(i, this.recvWindow.seqnr());
            throw th;
        }
    }

    private void data(UtpWindowData utpWindowData) {
        LOGGER.debug("发送数据消息：{}", Short.valueOf(utpWindowData.getSeqnr()));
        ByteBuffer buildHeader = buildHeader(UtpConfig.Type.DATA, utpWindowData.getLength() + 20);
        int pushUpdateGetTimestamp = utpWindowData.pushUpdateGetTimestamp();
        buildHeader.putShort(this.sendId);
        buildHeader.putInt(pushUpdateGetTimestamp);
        buildHeader.putInt(pushUpdateGetTimestamp - this.recvWindow.timestamp());
        buildHeader.putInt(this.recvWindow.wndSize());
        buildHeader.putShort(utpWindowData.getSeqnr());
        buildHeader.putShort(this.recvWindow.seqnr());
        buildHeader.put(utpWindowData.getData());
        pushMessage(buildHeader);
    }

    private void state(int i, short s, short s2, int i2) {
        UtpWindowData lastUnack;
        LOGGER.debug("处理响应消息：{}", Short.valueOf(s2));
        if (!this.connect) {
            this.connect = available();
            if (this.connect) {
                this.recvWindow.connect(i, (short) (s - 1));
            }
            unlockConnect();
        }
        if (!this.sendWindow.ack(s2, i2)) {
            this.ackLossTimes.set(0);
        } else {
            if (this.ackLossTimes.incrementAndGet() <= 3 || (lastUnack = this.sendWindow.lastUnack()) == null) {
                return;
            }
            LOGGER.debug("UTP消息快速重传：{}-{}", Short.valueOf(s2), Short.valueOf(lastUnack.getSeqnr()));
            data(lastUnack);
        }
    }

    private void state(int i, short s) {
        LOGGER.debug("发送响应消息：{}", Short.valueOf(s));
        int timestampUs = DateUtils.timestampUs();
        ByteBuffer buildHeader = buildHeader(UtpConfig.Type.STATE, 20);
        buildHeader.putShort(this.sendId);
        buildHeader.putInt(timestampUs);
        buildHeader.putInt(timestampUs - i);
        buildHeader.putInt(this.recvWindow.wndSize());
        buildHeader.putShort(this.sendWindow.seqnr());
        buildHeader.putShort(s);
        pushMessage(buildHeader);
    }

    private void fin(int i, short s, short s2) {
        LOGGER.debug("处理结束消息：{}", this.socketAddress);
        if (this.connect) {
            state(i, s);
        }
        closeAll();
    }

    private void fin() {
        LOGGER.debug("发送结束消息：{}", this.socketAddress);
        ByteBuffer buildHeader = buildHeader(UtpConfig.Type.FIN, 20);
        buildHeader.putShort(this.sendId);
        buildHeader.putInt(DateUtils.timestampUs());
        buildHeader.putInt(0);
        buildHeader.putInt(0);
        buildHeader.putShort((short) (this.sendWindow.seqnr() + 1));
        buildHeader.putShort((short) 0);
        pushMessage(buildHeader);
    }

    private void reset(int i, short s, short s2) {
        LOGGER.debug("处理重置消息：{}", this.socketAddress);
        if (this.connect) {
            state(i, s);
        }
        closeAll();
    }

    private void reset() {
        LOGGER.debug("发送重置消息：{}", this.socketAddress);
        ByteBuffer buildHeader = buildHeader(UtpConfig.Type.RESET, 20);
        buildHeader.putShort(this.sendId);
        buildHeader.putInt(DateUtils.timestampUs());
        buildHeader.putInt(0);
        buildHeader.putInt(0);
        buildHeader.putShort((short) (this.sendWindow.seqnr() + 1));
        buildHeader.putShort((short) 0);
        pushMessage(buildHeader);
    }

    private void syn(int i, short s, short s2) {
        LOGGER.debug("处理握手消息");
        if (!this.connect) {
            this.connect = true;
            this.recvWindow.connect(i, s);
        }
        state(i, s);
    }

    private void syn() {
        LOGGER.debug("发送握手消息");
        UtpWindowData build = this.sendWindow.build();
        ByteBuffer buildHeader = buildHeader(UtpConfig.Type.SYN, 20);
        buildHeader.putShort(this.recvId);
        buildHeader.putInt(build.pushUpdateGetTimestamp());
        buildHeader.putInt(0);
        buildHeader.putInt(0);
        buildHeader.putShort(build.getSeqnr());
        buildHeader.putShort((short) 0);
        pushMessage(buildHeader);
    }

    private ByteBuffer buildHeader(UtpConfig.Type type, int i) {
        ByteBuffer allocate = ByteBuffer.allocate(i);
        allocate.put(type.typeVersion());
        allocate.put((byte) 0);
        return allocate;
    }

    private void pushMessage(ByteBuffer byteBuffer) {
        try {
            send(byteBuffer, remoteSocketAddress());
        } catch (NetException e) {
            LOGGER.error("发送UTP消息异常", e);
        }
    }

    private void lockConnect() {
        if (this.connectLock.get()) {
            return;
        }
        synchronized (this.connectLock) {
            if (!this.connectLock.get()) {
                try {
                    this.connectLock.wait(5000L);
                } catch (InterruptedException e) {
                    LOGGER.debug("线程等待异常", e);
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private void unlockConnect() {
        synchronized (this.connectLock) {
            this.connectLock.set(true);
            this.connectLock.notifyAll();
        }
    }

    private void closeWindow() {
        this.sendWindow.close();
        this.recvWindow.close();
    }

    private void closeAll() {
        super.close();
        this.connect = false;
        this.utpService.remove(this);
    }

    @Override // com.acgist.snail.net.UdpMessageHandler, com.acgist.snail.net.IMessageSender
    public void close() {
        LOGGER.debug("关闭UTP");
        closeWindow();
        fin();
        closeAll();
    }

    public void resetAndClose() {
        LOGGER.debug("重置UTP");
        closeWindow();
        reset();
        closeAll();
    }
}
