package org.lockss.protocol;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLServerSocket;
import javax.net.ssl.SSLSocket;
import junit.framework.Test;
import junit.framework.TestCase;
import org.lockss.config.ConfigManager;
import org.lockss.config.Configuration;
import org.lockss.crawler.TestBaseCrawler;
import org.lockss.protocol.BlockingPeerChannel;
import org.lockss.protocol.BlockingStreamComm;
import org.lockss.protocol.IdentityManager;
import org.lockss.protocol.LcapStreamComm;
import org.lockss.protocol.PeerAddress;
import org.lockss.test.ConfigurationUtil;
import org.lockss.test.InternalServerSocket;
import org.lockss.test.InternalSocket;
import org.lockss.test.LockssTestCase;
import org.lockss.test.MockLockssDaemon;
import org.lockss.test.SimpleBinarySemaphore;
import org.lockss.test.SimpleQueue;
import org.lockss.test.StringInputStream;
import org.lockss.test.TcpTestUtil;
import org.lockss.util.ByteArray;
import org.lockss.util.IDUtil;
import org.lockss.util.IOUtil;
import org.lockss.util.ListUtil;
import org.lockss.util.Logger;
import org.lockss.util.Queue;
import org.lockss.util.SetUtil;
import org.lockss.util.StreamUtil;
import org.lockss.util.net.IPAddr;
import org.lockss.util.time.Deadline;
import org.lockss.util.time.TimeBase;

/* loaded from: input_file:org/lockss/protocol/TestBlockingStreamComm.class */
public class TestBlockingStreamComm extends LockssTestCase {
    // ---- Fixture sizing and wire-format constants ----

    /** Presumably the number of comm slots; the per-index fixture arrays are allocated with 5 entries. */
    static final int MAX_COMMS = 5;
    /** IPv6 loopback literal; not referenced in the visible part of this file. */
    static final String ALT_IP_ADDR = "::1";
    /** Size in bytes of the header buffer built by writeHeader() and read by assertRcvHeader(). */
    static final int HEADER_LEN = 14;
    /** Value stored in the first header byte (0xFF when read as an unsigned byte). */
    static final int HEADER_CHECK = -1;
    /** Header offset of the check byte. */
    static final int HEADER_OFF_CHECK = 0;
    /** Header offset of the opcode byte. */
    static final int HEADER_OFF_OP = 1;
    /** Header offset at which writeHeader() encodes the length as a long. */
    static final int HEADER_OFF_LEN = 2;
    /** Header offset at which writeHeader() encodes the protocol as an int. */
    static final int HEADER_OFF_PROTO = 10;
    /** Opcode value 1; writePeerId() sends this opcode ahead of a peer id string. */
    static final byte OP_PEERID = 1;
    /** Opcode value 2; presumably marks a data message (not used in the visible part of this file). */
    static final byte OP_DATA = 2;
    /** Protocol name that MySocketFactory asserts is NOT among a socket's enabled protocols. */
    static final String BAD_PROTO = "SSLv3";

    // ---- Per-index fixtures, filled in lazily by setupPid()/setupComm() ----

    int[] testports;
    PeerIdentity[] pids;
    PeerAddress.Tcp[] pads;
    MyBlockingStreamComm[] comms;
    SimpleQueue[] rcvdMsgss;

    // ---- Convenience aliases for indexes 1 and 2, kept in sync by peerhack()/commhack() ----

    int testport1;
    int testport2;
    PeerIdentity pid1;
    PeerIdentity pid2;
    PeerAddress.Tcp pad1;
    PeerAddress.Tcp pad2;
    MyBlockingStreamComm comm1;
    MyBlockingStreamComm comm2;
    SimpleQueue rcvdMsgs1;
    SimpleQueue rcvdMsgs2;

    // ---- Canned payloads and the messages setupMessages() builds from them ----

    String testStr1;
    String testStr2;
    String testStr3;
    PeerMessage msg1;
    PeerMessage msg2;
    PeerMessage msg3;

    /** Scratch buffer holding the most recently received header (see assertRcvHeader()). */
    private byte[] rcvHeader;
    /** Scratch buffer that rcvMsgData() reads a message body into. */
    private byte[] peerbuf;
    /** Mock daemon obtained in setUp() and passed to each comm's initService(). */
    private MockLockssDaemon daemon;
    /** Configuration properties assembled in setUp(), including any addSuiteProps() additions. */
    Properties cprops;
    SimpleBinarySemaphore sem1;
    SimpleBinarySemaphore sem2;
    SimpleQueue assocQ;
    SimpleQueue assocQ2;
    /** When true, MySocketFactory hands out in-process Internal(Server)Socket instances. */
    boolean useInternalSockets;
    protected boolean shutdownOutputSupported;
    /** When true, MySocketFactory asserts that created sockets match isSsl(). */
    protected boolean isCheckSocketType;
    public static Class[] testedClasses = {BlockingStreamComm.class, BlockingPeerChannel.class};
    static Logger log = Logger.getLogger();
    /** Source of the uniqueId assigned to each MyBlockingStreamComm; incremented on every construction. */
    static int createCounter = 1;

    /* loaded from: input_file:org/lockss/protocol/TestBlockingStreamComm$Buff.class */
    /**
     * Suite variant that adds {@code bufferedSend=true} and
     * {@code tcpNoDelay=false} to the configuration built in setUp().
     */
    public static class Buff extends TestBlockingStreamComm {

        /** Passes {@code str} straight to the base test-case constructor. */
        public Buff(String str) {
            super(str);
        }

        /** Contributes this variant's two stream-comm settings. */
        @Override
        public void addSuiteProps(Properties properties) {
            properties.setProperty("org.lockss.scomm.tcpNoDelay", "false");
            properties.setProperty("org.lockss.scomm.bufferedSend", "true");
        }
    }

    /* loaded from: input_file:org/lockss/protocol/TestBlockingStreamComm$BuffNoDelay.class */
    /**
     * Suite variant that adds {@code bufferedSend=true} and
     * {@code tcpNoDelay=true} to the configuration built in setUp().
     */
    public static class BuffNoDelay extends TestBlockingStreamComm {

        /** Passes {@code str} straight to the base test-case constructor. */
        public BuffNoDelay(String str) {
            super(str);
        }

        /** Contributes this variant's two stream-comm settings. */
        @Override
        public void addSuiteProps(Properties properties) {
            properties.setProperty("org.lockss.scomm.tcpNoDelay", "true");
            properties.setProperty("org.lockss.scomm.bufferedSend", "true");
        }

        /** Runs the shared socket-option check with both flags set to true. */
        public void testIsNoDelay() throws IOException {
            testSockOpts(true, true);
        }
    }

    /* loaded from: input_file:org/lockss/protocol/TestBlockingStreamComm$HighPri.class */
    /**
     * Suite variant that sets {@code org.lockss.thread.Channel.priority}
     * to 5 in the configuration built in setUp().
     */
    public static class HighPri extends TestBlockingStreamComm {

        /** Passes {@code str} straight to the base test-case constructor. */
        public HighPri(String str) {
            super(str);
        }

        /** Contributes the channel thread priority setting. */
        @Override
        public void addSuiteProps(Properties properties) {
            final String priorityKey = "org.lockss.thread.Channel.priority";
            properties.setProperty(priorityKey, "5");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/lockss/protocol/TestBlockingStreamComm$MessageHandler.class */
    /**
     * Message handler that records every delivered message on a queue so a
     * test can later inspect what arrived.
     */
    public class MessageHandler implements LcapStreamComm.MessageHandler {
        /** Destination for every message delivered to this handler. */
        SimpleQueue queue;

        /**
         * @param simpleQueue queue that received messages are appended to
         */
        public MessageHandler(SimpleQueue simpleQueue) {
            queue = simpleQueue;
        }

        /** Logs the incoming message at debug level, then enqueues it. */
        public void handleMessage(PeerMessage peerMessage) {
            String trace = "handleMessage(" + peerMessage + ")";
            TestBlockingStreamComm.log.debug(trace);
            queue.put(peerMessage);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/lockss/protocol/TestBlockingStreamComm$MyBlockingPeerChannel.class */
    /**
     * Instrumented channel: lets a test observe stopChannel() and
     * calcSendWaitDeadline() calls via semaphores, count the latter, and
     * force isSendIdle() to report busy.
     */
    public static class MyBlockingPeerChannel extends BlockingPeerChannel {
        /** Given each time stopChannel() completes, if set. */
        volatile SimpleBinarySemaphore stopSem;
        /** Given each time calcSendWaitDeadline() completes, if set. */
        volatile SimpleBinarySemaphore calcSendWaitSem;
        /** Number of calcSendWaitDeadline() calls seen so far. */
        volatile int calcSendWaitCtr = 0;
        /** While true, isSendIdle() reports false without consulting the superclass. */
        boolean simulateSendBusy = false;
        /** Socket stored by setSocket(); only read back through getSocket(). */
        Socket sock;

        MyBlockingPeerChannel(BlockingStreamComm blockingStreamComm, PeerIdentity peerIdentity) {
            super(blockingStreamComm, peerIdentity);
        }

        MyBlockingPeerChannel(BlockingStreamComm blockingStreamComm, Socket socket) {
            super(blockingStreamComm, socket);
        }

        Socket getSocket() {
            return sock;
        }

        void setSocket(Socket socket) {
            sock = socket;
        }

        void setStopSem(SimpleBinarySemaphore simpleBinarySemaphore) {
            stopSem = simpleBinarySemaphore;
        }

        void setCalcSendWaitSem(SimpleBinarySemaphore simpleBinarySemaphore) {
            calcSendWaitSem = simpleBinarySemaphore;
        }

        void setSimulateSendBusy(boolean z) {
            simulateSendBusy = z;
        }

        /** Delegates to the superclass, then signals the stop semaphore if one is set. */
        void stopChannel(boolean z, String str, Throwable th) {
            super.stopChannel(z, str, th);
            if (stopSem == null) {
                return;
            }
            stopSem.give();
        }

        /**
         * Delegates to the superclass, bumps the call counter, and signals the
         * semaphore if one is set.
         *
         * @return the deadline computed by the superclass
         */
        Deadline calcSendWaitDeadline() {
            Deadline deadline = super.calcSendWaitDeadline();
            calcSendWaitCtr++;
            if (calcSendWaitSem != null) {
                calcSendWaitSem.give();
            }
            return deadline;
        }

        /** Reports busy while simulateSendBusy is set; otherwise defers to the superclass. */
        boolean isSendIdle() {
            return !simulateSendBusy && super.isSendIdle();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/lockss/protocol/TestBlockingStreamComm$MyBlockingStreamComm.class */
    /**
     * Instrumented comm used by the tests: supplies a fixed local identity,
     * wraps the real socket factory in a MySocketFactory, can layer
     * per-instance configuration over the daemon's, and reports
     * association/handshake events to optional queues.
     */
    public class MyBlockingStreamComm extends BlockingStreamComm {
        /** Per-instance overrides merged over the incoming configuration; null means none. */
        Properties instanceConfig;
        /** Lazily created wrapper factory returned by getSocketFactory(). */
        BlockingStreamComm.SocketFactory sockFact = null;
        PeerIdentity localId;
        /** Receives an ["assoc"|"dissoc", this] list on each (dis)association, if set. */
        SimpleQueue assocEvents;
        /** Receives the socket on a successful handshake or the exception on failure, if set. */
        SimpleQueue handshakeEvents;
        /** If set, processIncomingConnection() takes it before delegating. */
        SimpleBinarySemaphore acceptSem;
        /** The superclass's factory that sockFact wraps. */
        BlockingStreamComm.SocketFactory superSockFact;
        /** Taken from createCounter at construction; appended to status accessor names. */
        int uniqueId;
        boolean isSsl;
        boolean isClientAuth;

        /**
         * @param peerIdentity identity returned by getLocalPeerIdentity()
         */
        public MyBlockingStreamComm(PeerIdentity peerIdentity) {
            localId = peerIdentity;
            uniqueId = TestBlockingStreamComm.createCounter++;
        }

        /**
         * Applies the configuration, first overlaying instanceConfig (when
         * present) on a copy of {@code configuration}, then caches the SSL
         * and client-auth flags from the effective configuration.
         */
        public void setConfig(Configuration configuration, Configuration configuration2, Configuration.Differences differences) {
            Configuration effective = configuration;
            if (instanceConfig != null) {
                Configuration merged = ConfigManager.newConfiguration();
                merged.copyFrom(configuration);
                for (Map.Entry<Object, Object> override : instanceConfig.entrySet()) {
                    String key = (String) override.getKey();
                    String value = (String) override.getValue();
                    merged.put(key, value);
                }
                effective = merged;
            }
            super.setConfig(effective, configuration2, differences);
            isSsl = effective.getBoolean("org.lockss.scomm.v3OverSsl", false);
            isClientAuth = effective.getBoolean("org.lockss.scomm.sslClientAuth", true);
        }

        public void setInstanceConfig(Properties properties) {
            instanceConfig = properties;
        }

        public boolean isSsl() {
            return isSsl;
        }

        public boolean isClientAuth() {
            return isClientAuth;
        }

        /** Appends this instance's uniqueId to the supplied name. */
        protected String getStatusAccessorName(String str) {
            return str + uniqueId;
        }

        /** Returns the wrapping MySocketFactory, creating it around the superclass's factory on first use. */
        BlockingStreamComm.SocketFactory getSocketFactory() {
            if (sockFact != null) {
                return sockFact;
            }
            superSockFact = super.getSocketFactory();
            sockFact = new MySocketFactory(superSockFact);
            return sockFact;
        }

        protected PeerIdentity getLocalPeerIdentity() {
            return localId;
        }

        /** Delegates, then records an "assoc" event if an event queue is set. */
        void associateChannelWithPeer(BlockingPeerChannel blockingPeerChannel, PeerIdentity peerIdentity) {
            super.associateChannelWithPeer(blockingPeerChannel, peerIdentity);
            if (assocEvents == null) {
                return;
            }
            assocEvents.put(ListUtil.list(new Object[]{"assoc", this}));
        }

        /** Delegates, then records a "dissoc" event if an event queue is set. */
        void dissociateChannelFromPeer(BlockingPeerChannel blockingPeerChannel, PeerIdentity peerIdentity, Queue queue) {
            super.dissociateChannelFromPeer(blockingPeerChannel, peerIdentity, queue);
            if (assocEvents == null) {
                return;
            }
            assocEvents.put(ListUtil.list(new Object[]{"dissoc", this}));
        }

        /** Takes acceptSem first when one is set, then delegates to the superclass. */
        void processIncomingConnection(Socket socket) throws IOException {
            SimpleBinarySemaphore gate = acceptSem;
            if (gate != null) {
                gate.take();
            }
            super.processIncomingConnection(socket);
        }

        /**
         * Delegates to the superclass and reports the outcome to
         * handshakeEvents (the socket on success, the exception on failure);
         * a failure is rethrown unchanged.
         */
        protected void handshake(SSLSocket sSLSocket) throws SSLPeerUnverifiedException {
            try {
                super.handshake(sSLSocket);
                if (handshakeEvents != null) {
                    handshakeEvents.put(sSLSocket);
                }
            } catch (SSLPeerUnverifiedException unverified) {
                if (handshakeEvents != null) {
                    handshakeEvents.put(unverified);
                }
                throw unverified;
            }
        }

        public void setHandShakeQueue(SimpleQueue simpleQueue) {
            handshakeEvents = simpleQueue;
        }

        public void setAssocQueue(SimpleQueue simpleQueue) {
            assocEvents = simpleQueue;
        }

        public void setAcceptSem(SimpleBinarySemaphore simpleBinarySemaphore) {
            acceptSem = simpleBinarySemaphore;
        }

        /** Overwrites the inherited paramChannelIdleTime field. */
        void setChannelIdleTime(long j) {
            this.paramChannelIdleTime = j;
        }

        BlockingStreamComm.PeerData makePeerData(PeerIdentity peerIdentity) {
            return new BlockingStreamComm.PeerData(this, peerIdentity);
        }

        /** Adds the entry to the inherited peersToRetry collection. */
        void addPeerToRetry(BlockingStreamComm.PeerData peerData) {
            this.peersToRetry.add(peerData);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/lockss/protocol/TestBlockingStreamComm$MyMemoryPeerMessage.class */
    /**
     * In-memory message whose delete() only records that it was called, so
     * a test can check the flag afterwards.
     */
    public static class MyMemoryPeerMessage extends MemoryPeerMessage {
        /** Set to true by delete(). */
        boolean isDeleted = false;

        MyMemoryPeerMessage() {
            super();
        }

        /** Marks this message as deleted; does not call the superclass. */
        public void delete() {
            isDeleted = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/lockss/protocol/TestBlockingStreamComm$MySocketFactory.class */
    /**
     * Socket factory wrapper used by MyBlockingStreamComm. Depending on the
     * enclosing test's flags it hands out in-process sockets, or delegates
     * to the wrapped factory and asserts the resulting socket type; it
     * always creates MyBlockingPeerChannel instances.
     */
    public class MySocketFactory implements BlockingStreamComm.SocketFactory {
        /** The real factory being wrapped. */
        protected BlockingStreamComm.SocketFactory sf;
        /** When non-null, newSocket() throws this instead of creating a socket. */
        IOException newSocketThrow = null;

        MySocketFactory(BlockingStreamComm.SocketFactory socketFactory) {
            sf = socketFactory;
        }

        /**
         * Creates a listening socket: an InternalServerSocket when internal
         * sockets are enabled, otherwise one from the wrapped factory,
         * type-checked against isSsl() when isCheckSocketType() is true.
         */
        public ServerSocket newServerSocket(String str, int i, int i2) throws IOException {
            if (TestBlockingStreamComm.this.useInternalSockets) {
                return new InternalServerSocket(i, i2);
            }
            ServerSocket server = sf.newServerSocket(str, i, i2);
            if (!TestBlockingStreamComm.this.isCheckSocketType()) {
                return server;
            }
            if (TestBlockingStreamComm.this.isSsl()) {
                LockssTestCase.assertClass(SSLServerSocket.class, server);
                String[] enabled = ((SSLServerSocket) server).getEnabledProtocols();
                TestCase.assertFalse(ListUtil.fromArray(enabled).contains(TestBlockingStreamComm.BAD_PROTO));
            } else {
                LockssTestCase.assertNotClass(SSLServerSocket.class, server);
            }
            return server;
        }

        /**
         * Creates an outgoing socket, or throws the injected exception if one
         * is set. Uses an InternalSocket when internal sockets are enabled,
         * otherwise the wrapped factory with the same type checks as
         * newServerSocket().
         */
        public Socket newSocket(IPAddr iPAddr, int i) throws IOException {
            if (newSocketThrow != null) {
                throw newSocketThrow;
            }
            if (TestBlockingStreamComm.this.useInternalSockets) {
                return new InternalSocket(iPAddr.getInetAddr(), i);
            }
            Socket socket = sf.newSocket(iPAddr, i);
            if (!TestBlockingStreamComm.this.isCheckSocketType()) {
                return socket;
            }
            if (TestBlockingStreamComm.this.isSsl()) {
                TestCase.assertTrue(socket instanceof SSLSocket);
                String[] enabled = ((SSLSocket) socket).getEnabledProtocols();
                TestCase.assertFalse(ListUtil.fromArray(enabled).contains(TestBlockingStreamComm.BAD_PROTO));
            } else {
                TestCase.assertFalse(socket instanceof SSLSocket);
            }
            return socket;
        }

        /** Creates an instrumented channel around an existing socket. */
        public BlockingPeerChannel newPeerChannel(BlockingStreamComm blockingStreamComm, Socket socket) throws IOException {
            return new MyBlockingPeerChannel(blockingStreamComm, socket);
        }

        /** Creates an instrumented channel for the given peer. */
        public BlockingPeerChannel newPeerChannel(BlockingStreamComm blockingStreamComm, PeerIdentity peerIdentity) throws IOException {
            return new MyBlockingPeerChannel(blockingStreamComm, peerIdentity);
        }

        /** Injects (or, with null, clears) the exception newSocket() should throw. */
        void setNewSocketThrow(IOException iOException) {
            newSocketThrow = iOException;
        }
    }

    /* loaded from: input_file:org/lockss/protocol/TestBlockingStreamComm$UnBuff.class */
    /**
     * Suite variant that adds {@code bufferedSend=false} and
     * {@code tcpNoDelay=false} to the configuration built in setUp().
     */
    public static class UnBuff extends TestBlockingStreamComm {

        /** Passes {@code str} straight to the base test-case constructor. */
        public UnBuff(String str) {
            super(str);
        }

        /** Contributes this variant's two stream-comm settings. */
        @Override
        public void addSuiteProps(Properties properties) {
            properties.setProperty("org.lockss.scomm.tcpNoDelay", "false");
            properties.setProperty("org.lockss.scomm.bufferedSend", "false");
        }
    }

    /* loaded from: input_file:org/lockss/protocol/TestBlockingStreamComm$UnBuffNoDelay.class */
    /**
     * Suite variant that adds {@code bufferedSend=false} and
     * {@code tcpNoDelay=true} to the configuration built in setUp().
     */
    public static class UnBuffNoDelay extends TestBlockingStreamComm {

        /** Passes {@code str} straight to the base test-case constructor. */
        public UnBuffNoDelay(String str) {
            super(str);
        }

        /** Contributes this variant's two stream-comm settings. */
        @Override
        public void addSuiteProps(Properties properties) {
            properties.setProperty("org.lockss.scomm.tcpNoDelay", "true");
            properties.setProperty("org.lockss.scomm.bufferedSend", "false");
        }

        /** Runs the shared socket-option check with both flags set to true. */
        public void testIsNoDelay() throws IOException {
            testSockOpts(true, true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the test case and allocates the per-index fixture arrays,
     * canned payload strings, and scratch buffers.
     *
     * @param str test name handed to the base test-case constructor
     */
    public TestBlockingStreamComm(String str) {
        super(str);
        // One slot per comm index.
        this.testports = new int[MAX_COMMS];
        this.pids = new PeerIdentity[MAX_COMMS];
        this.pads = new PeerAddress.Tcp[MAX_COMMS];
        this.comms = new MyBlockingStreamComm[MAX_COMMS];
        this.rcvdMsgss = new SimpleQueue[MAX_COMMS];
        this.testStr1 = "This is test data 1";
        // The payload must really contain an embedded NUL (U+0000); the
        // previous literal held two U+FFFD replacement characters instead.
        this.testStr2 = "This message contains a null \u0000 character";
        this.testStr3 = "They that can give up essential liberty to obtain a little temporary safety deserve neither liberty nor safety.";
        this.rcvHeader = new byte[HEADER_LEN];
        this.peerbuf = new byte[50];
        this.useInternalSockets = false;
        this.shutdownOutputSupported = true;
        this.isCheckSocketType = true;
    }

    /**
     * Whether this suite runs over SSL. This base implementation always
     * answers {@code false}.
     */
    protected boolean isSsl() {
        final boolean sslEnabled = false;
        return sslEnabled;
    }

    /**
     * @return the current value of the {@code isCheckSocketType} flag, which
     *         MySocketFactory consults before asserting socket types
     */
    protected boolean isCheckSocketType() {
        return isCheckSocketType;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Hook called from setUp() so a suite variant can add configuration
     * properties. The base implementation adds nothing.
     *
     * @param properties the property set being assembled for the test
     */
    public void addSuiteProps(Properties properties) {
        // Intentionally empty: the base suite needs no extra properties.
    }

    /**
     * Prepares the fixture: fresh semaphores and association queue, the mock
     * daemon, a configuration (suite properties plus id database dir, local
     * IP address and scomm enabled), the canned messages, and real (not
     * internal) sockets.
     *
     * @throws Exception if the superclass setup or message creation fails
     */
    @Override // org.lockss.test.LockssTestCase
    public void setUp() throws Exception {
        super.setUp();
        this.sem1 = new SimpleBinarySemaphore();
        this.sem2 = new SimpleBinarySemaphore();
        this.assocQ = new SimpleQueue.Fifo();
        this.daemon = getMockLockssDaemon();
        // Was initialised from an int constant, which does not compile.
        String tempDirPath = null;
        try {
            tempDirPath = getTempDir().getAbsolutePath() + File.separator;
        } catch (IOException e) {
            fail("unable to create a temporary directory");
        }
        this.cprops = new Properties();
        // Suite-specific properties go in first; the fixed ones below are set afterwards.
        addSuiteProps(this.cprops);
        this.cprops.setProperty("org.lockss.id.database.dir", tempDirPath + "iddb");
        this.cprops.setProperty("org.lockss.localIPAddress", "127.0.0.1");
        this.cprops.setProperty("org.lockss.scomm.enabled", "true");
        ConfigurationUtil.setCurrentConfigFromProps(this.cprops);
        this.daemon.setDaemonInited(true);
        setupMessages();
        useInternalSockets(false);
    }

    /**
     * Stops every comm that was started, guarding each stop with an
     * interrupter so a hung shutdown cannot block the test run, then
     * restores real time and runs the superclass teardown.
     *
     * @throws Exception if stopping a comm or the superclass teardown fails
     */
    @Override // org.lockss.test.LockssTestCase
    public void tearDown() throws Exception {
        for (int i = 0; i < MAX_COMMS; i++) {
            // Was initialised from an int constant, which does not compile.
            LockssTestCase.Interrupter interrupter = null;
            try {
                interrupter = interruptMeIn(Math.max(2002L, TIMEOUT_SHOULDNT));
                stopComm(i);
                // Clears this thread's interrupted status.
                Thread.interrupted();
                if (interrupter.did()) {
                    log.warning("Timeout waiting for comm " + i + " to exit");
                }
            } finally {
                // Cancel on both the normal and the exceptional path.
                if (interrupter != null) {
                    interrupter.cancel();
                }
            }
        }
        TimeBase.setReal();
        super.tearDown();
    }

    /**
     * Stops the comm in slot {@code i}, if any, and empties the slot.
     *
     * @param i index into {@code comms}
     */
    void stopComm(int i) {
        MyBlockingStreamComm running = comms[i];
        if (running == null) {
            return;
        }
        running.stopService();
        comms[i] = null;
    }

    /**
     * Looks up the primary channel the comm holds for a peer.
     *
     * @return the peer's primary channel, or null when the comm has no
     *         peer data for {@code peerIdentity}
     */
    BlockingPeerChannel getChannel(BlockingStreamComm blockingStreamComm, PeerIdentity peerIdentity) {
        BlockingStreamComm.PeerData data = blockingStreamComm.getPeerData(peerIdentity);
        return (data != null) ? data.getPrimaryChannel() : null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Collects the primary channel of every peer known to the comm.
     *
     * @param blockingStreamComm the comm whose peer data is scanned
     * @return a new list of the non-null primary channels, in the iteration
     *         order of {@code getAllPeerData()}
     */
    public List<BlockingPeerChannel> getChannels(BlockingStreamComm blockingStreamComm) {
        // Typed list instead of a raw ArrayList, so no unchecked conversion on return.
        List<BlockingPeerChannel> channels = new ArrayList<>();
        for (Object entry : blockingStreamComm.getAllPeerData()) {
            BlockingPeerChannel primary = ((BlockingStreamComm.PeerData) entry).getPrimaryChannel();
            if (primary != null) {
                channels.add(primary);
            }
        }
        return channels;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Collects the secondary channel of every peer known to the comm.
     *
     * @param blockingStreamComm the comm whose peer data is scanned
     * @return a new list of the non-null secondary channels, in the
     *         iteration order of {@code getAllPeerData()}
     */
    public List<BlockingPeerChannel> getRcvChannels(BlockingStreamComm blockingStreamComm) {
        // Typed list instead of a raw ArrayList, so no unchecked conversion on return.
        List<BlockingPeerChannel> channels = new ArrayList<>();
        for (Object entry : blockingStreamComm.getAllPeerData()) {
            BlockingPeerChannel secondary = ((BlockingStreamComm.PeerData) entry).getSecondaryChannel();
            if (secondary != null) {
                channels.add(secondary);
            }
        }
        return channels;
    }

    /**
     * Selects whether peers and sockets created from now on use the
     * in-process internal socket implementation.
     *
     * @param z true for internal sockets, false for real TCP sockets
     */
    void useInternalSockets(boolean z) {
        useInternalSockets = z;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Ensures slot {@code i} has a peer identity, creating an internal or
     * real one according to the current socket mode.
     *
     * @param i fixture slot index
     * @return the identity now stored in {@code pids[i]}
     */
    public PeerIdentity setupPid(int i) throws IOException {
        if (!useInternalSockets) {
            setupRealPid(i);
        } else {
            setupInternalPid(i);
        }
        return pids[i];
    }

    /**
     * Populates slot {@code i} with a peer on 127.0.0.1 at a currently
     * unbound TCP port. Does nothing if the slot already has an identity.
     *
     * @param i fixture slot index
     */
    void setupRealPid(int i) throws IOException {
        if (pids[i] != null) {
            return;
        }
        int port = TcpTestUtil.findUnboundTcpPort();
        testports[i] = port;
        pids[i] = findPeerId("127.0.0.1", port);
        pads[i] = (PeerAddress.Tcp) pids[i].getPeerAddress();
        peerhack(i);
    }

    /**
     * Resolves the peer identity for an address/port pair through the mock
     * daemon.
     *
     * @param str IP address string
     * @param i   port number
     * @return the identity found for the key built from the address and port
     * @throws IdentityManager.MalformedIdentityKeyException if the key is rejected
     */
    PeerIdentity findPeerId(String str, int i) throws IdentityManager.MalformedIdentityKeyException {
        return V3TestUtils.findPeerIdentity(
                daemon,
                IDUtil.ipAddrToKey(str, i));
    }

    /**
     * Populates slot {@code i} with a peer at the internal-socket address
     * and an unbound internal port (search seeded with 2000). Does nothing
     * if the slot already has an identity.
     *
     * @param i fixture slot index
     */
    void setupInternalPid(int i) throws IOException {
        if (pids[i] != null) {
            return;
        }
        String internalAddr = InternalSocket.internalInetAddr.getHostAddress();
        int port = InternalServerSocket.findUnboundPort(2000);
        testports[i] = port;
        pids[i] = findPeerId(internalAddr, port);
        pads[i] = (PeerAddress.Tcp) pids[i].getPeerAddress();
        peerhack(i);
    }

    /**
     * Mirrors slot 1 or slot 2 of the port/identity/address arrays into the
     * matching convenience fields. Any other index is ignored.
     *
     * @param i fixture slot index
     */
    void peerhack(int i) {
        if (i == 1) {
            testport1 = testports[i];
            pid1 = pids[i];
            pad1 = pads[i];
        } else if (i == 2) {
            testport2 = testports[i];
            pid2 = pids[i];
            pad2 = pads[i];
        }
    }

    /**
     * Creates and starts a new instrumented comm in slot {@code i},
     * creating the slot's peer identity first if it is missing.
     *
     * @param i fixture slot index
     */
    void setupComm(int i) throws IOException {
        boolean needsPid = (pids[i] == null);
        if (needsPid) {
            setupPid(i);
        }
        MyBlockingStreamComm comm = new MyBlockingStreamComm(pids[i]);
        setupComm(i, comm);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Installs the given comm in slot {@code i}, initialises and starts it,
     * and registers a queue-backed handler for protocols 1 through 3.
     *
     * @param i                    fixture slot index
     * @param myBlockingStreamComm the comm to install and start
     */
    public void setupComm(int i, MyBlockingStreamComm myBlockingStreamComm) throws IOException {
        comms[i] = myBlockingStreamComm;
        comms[i].initService(daemon);
        comms[i].startService();
        // All three protocol handlers feed the same per-slot queue.
        rcvdMsgss[i] = new SimpleQueue.Fifo();
        int proto = 1;
        while (proto <= 3) {
            comms[i].registerMessageHandler(proto, new MessageHandler(rcvdMsgss[i]));
            proto++;
        }
        commhack(i);
    }

    /**
     * Mirrors slot 1 or slot 2 of the comm and received-message arrays into
     * the matching convenience fields. Any other index is ignored.
     *
     * @param i fixture slot index
     */
    void commhack(int i) {
        if (i == 1) {
            comm1 = comms[i];
            rcvdMsgs1 = rcvdMsgss[i];
        } else if (i == 2) {
            comm2 = comms[i];
            rcvdMsgs2 = rcvdMsgss[i];
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Shorthand for {@code setupComm(1)}. */
    public void setupComm1() throws IOException {
        final int slot = 1;
        setupComm(slot);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Shorthand for {@code setupComm(2)}. */
    public void setupComm2() throws IOException {
        final int slot = 2;
        setupComm(slot);
    }

    /**
     * Builds the three canned messages: protocol 1, 2 and 3 carrying
     * testStr1, testStr2 and testStr3 respectively.
     */
    void setupMessages() throws IOException {
        msg1 = makePeerMessage(1, testStr1);
        msg2 = makePeerMessage(2, testStr2);
        msg3 = makePeerMessage(3, testStr3);
    }

    /**
     * Creates an empty in-memory message for the given protocol.
     *
     * @param i protocol number to set on the message
     * @return a new MyMemoryPeerMessage with no payload written
     */
    PeerMessage makePeerMessage(int i) throws IOException {
        MyMemoryPeerMessage message = new MyMemoryPeerMessage();
        message.setProtocol(i);
        return message;
    }

    /**
     * Creates a message whose payload is the bytes of {@code str} in the
     * platform default charset.
     *
     * @param i   protocol number
     * @param str payload text
     */
    PeerMessage makePeerMessage(int i, String str) throws IOException {
        byte[] payload = str.getBytes();
        return makePeerMessage(i, payload);
    }

    /**
     * Creates a message for the given protocol and writes {@code bArr} as
     * its payload.
     *
     * @param i    protocol number
     * @param bArr payload bytes
     * @return the populated message
     * @throws IOException if writing or closing the message stream fails
     */
    PeerMessage makePeerMessage(int i, byte[] bArr) throws IOException {
        PeerMessage message = makePeerMessage(i);
        // try-with-resources so the stream is closed even when write() throws.
        try (OutputStream out = message.getOutputStream()) {
            out.write(bArr);
        }
        return message;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a message whose payload is {@code str} (platform default
     * charset) repeated {@code i2} times. A non-positive count yields an
     * empty payload.
     *
     * @param i   protocol number
     * @param str text to repeat
     * @param i2  number of repetitions
     * @return the populated message
     * @throws IOException if writing or closing the message stream fails
     */
    public PeerMessage makePeerMessage(int i, String str, int i2) throws IOException {
        PeerMessage message = makePeerMessage(i);
        byte[] chunk = str.getBytes();
        // try-with-resources so the stream is closed even when write() throws.
        try (OutputStream out = message.getOutputStream()) {
            for (int remaining = i2; remaining > 0; remaining--) {
                out.write(chunk);
            }
        }
        return message;
    }

    /**
     * Writes a peer-id message for the given identity, using its id string.
     *
     * @param outputStream destination stream
     * @param peerIdentity identity whose id string is sent
     */
    void writePeerId(OutputStream outputStream, PeerIdentity peerIdentity) throws IOException {
        String idString = peerIdentity.getIdString();
        writePeerId(outputStream, idString);
    }

    /**
     * Writes a peer-id message: a header with the peer-id opcode, the
     * payload length and protocol 0, followed by the id bytes, then flushes.
     *
     * @param outputStream destination stream
     * @param str          peer id string to send
     * @throws IOException if the stream cannot be written
     */
    void writePeerId(OutputStream outputStream, String str) throws IOException {
        // Encode once and take the length from the bytes actually sent, so
        // the header stays correct even if the id is not pure ASCII.
        byte[] idBytes = str.getBytes();
        writeHeader(outputStream, OP_PEERID, idBytes.length, 0);
        outputStream.write(idBytes);
        outputStream.flush();
    }

    /**
     * Writes a HEADER_LEN-byte message header: check byte, opcode, length
     * (encoded as a long) and protocol (encoded as an int).
     *
     * @param outputStream destination stream
     * @param i            opcode; only its low byte is stored
     * @param i2           payload length
     * @param i3           protocol number
     */
    void writeHeader(OutputStream outputStream, int i, int i2, int i3) throws IOException {
        byte[] header = new byte[HEADER_LEN];
        header[HEADER_OFF_CHECK] = (byte) HEADER_CHECK;
        header[HEADER_OFF_OP] = (byte) i;
        ByteArray.encodeLong(i2, header, HEADER_OFF_LEN);
        ByteArray.encodeInt(i3, header, HEADER_OFF_PROTO);
        outputStream.write(header);
    }

    /**
     * Reads HEADER_LEN bytes from the stream into the shared header buffer
     * and asserts the header carries the expected opcode.
     *
     * @param inputStream stream to read the header from
     * @param i           expected opcode
     */
    public void assertRcvHeader(InputStream inputStream, int i) throws IOException {
        byte[] header = rcvHeader;
        StreamUtil.readBytes(inputStream, header, HEADER_LEN);
        assertHeaderOp(header, i);
    }

    /**
     * Reads a message body whose length is taken from the most recently
     * received header (see assertRcvHeader()) and returns it as a string in
     * the platform default charset. The body is read into the shared
     * {@code peerbuf} scratch buffer.
     *
     * @param inputStream stream positioned just after the header
     * @return the body decoded as text
     */
    String rcvMsgData(InputStream inputStream) throws IOException {
        long bodyLen = ByteArray.decodeLong(rcvHeader, HEADER_OFF_LEN);
        assertTrue("len: " + bodyLen, bodyLen <= Integer.MAX_VALUE);
        int count = (int) bodyLen;
        StreamUtil.readBytes(inputStream, peerbuf, count);
        return new String(peerbuf, 0, count);
    }

    /**
     * Asserts that the header's first byte equals HEADER_CHECK.
     *
     * @param bArr header bytes
     */
    public void assertHeader(byte[] bArr) {
        byte checkByte = bArr[HEADER_OFF_CHECK];
        assertEquals(HEADER_CHECK, checkByte);
    }

    /**
     * Asserts that the header has a valid check byte and the expected
     * opcode.
     *
     * @param bArr header bytes
     * @param i    expected opcode
     */
    public void assertHeaderOp(byte[] bArr, int i) {
        assertHeader(bArr);
        byte opByte = bArr[HEADER_OFF_OP];
        assertEquals(i, opByte);
    }

    /**
     * Asserts that two messages are the same object, or are equal according
     * to {@code equalsButSender}. Fails if the first is null and the second
     * is not.
     *
     * @param peerMessage  expected message
     * @param peerMessage2 actual message
     */
    public static void assertEqualsButSender(PeerMessage peerMessage, PeerMessage peerMessage2) {
        if (peerMessage == peerMessage2) {
            return;
        }
        if (peerMessage != null && peerMessage.equalsButSender(peerMessage2)) {
            return;
        }
        failNotEquals((String) null, peerMessage, peerMessage2);
    }

    /**
     * Asserts that a received message is non-null, came from the expected
     * sender, and otherwise matches the expected message.
     *
     * @param peerMessage  expected message
     * @param peerIdentity expected sender
     * @param peerMessage2 message actually received
     */
    public void assertEqualsMessageFrom(PeerMessage peerMessage, PeerIdentity peerIdentity, PeerMessage peerMessage2) {
        assertNotNull("Null message", peerMessage2);
        PeerIdentity sender = peerMessage2.getSender();
        assertEquals(peerIdentity, sender);
        assertEqualsButSender(peerMessage, peerMessage2);
    }

    /**
     * Checks that comm 1 reports a trusted network exactly when this suite
     * runs over SSL.
     */
    public void testIsTrusted() throws IOException {
        setupComm1();
        boolean ssl = isSsl();
        boolean trusted = comm1.isTrustedNetwork();
        if (ssl) {
            assertTrue(trusted);
        } else {
            assertFalse(trusted);
        }
    }

    /**
     * Exercises stateTrans(): a transition from a non-current state is
     * refused, valid transitions succeed, and the three-argument form throws
     * IllegalStateException when the channel is not in the expected state.
     */
    public void testStateTrans() throws IOException {
        setupComm1();
        BlockingPeerChannel chan =
            new BlockingPeerChannel(this.comm1, this.pid1, (InputStream) null, (OutputStream) null);
        assertEquals(BlockingPeerChannel.ChannelState.INIT, chan.getState());

        // Channel is INIT, not CONNECTING: no transition, state unchanged.
        assertFalse(chan.stateTrans(BlockingPeerChannel.ChannelState.CONNECTING,
                                    BlockingPeerChannel.ChannelState.CONNECT_FAIL));
        assertEquals(BlockingPeerChannel.ChannelState.INIT, chan.getState());

        assertTrue(chan.stateTrans(BlockingPeerChannel.ChannelState.INIT,
                                   BlockingPeerChannel.ChannelState.ACCEPTED));
        assertEquals(BlockingPeerChannel.ChannelState.ACCEPTED, chan.getState());

        assertTrue(chan.stateTrans(BlockingPeerChannel.ChannelState.ACCEPTED,
                                   BlockingPeerChannel.ChannelState.STARTING,
                                   "shouldn't"));
        assertEquals(BlockingPeerChannel.ChannelState.STARTING, chan.getState());

        // Channel is now STARTING, so expecting ACCEPTED must throw.
        try {
            chan.stateTrans(BlockingPeerChannel.ChannelState.ACCEPTED,
                            BlockingPeerChannel.ChannelState.STARTING,
                            "should error");
            fail("stateTrans should have thrown");
        } catch (IllegalStateException expected) {
        }
    }

    /**
     * Exercises notStateTrans(): the transition happens only when the
     * channel is NOT in the given state (or any of the given states), and
     * the message-taking single-state form throws IllegalStateException
     * when it is.
     */
    public void testNotStateTrans() throws IOException {
        setupComm1();
        BlockingPeerChannel chan =
            new BlockingPeerChannel(this.comm1, this.pid1, (InputStream) null, (OutputStream) null);
        assertEquals(BlockingPeerChannel.ChannelState.INIT, chan.getState());

        // Channel IS in INIT: refused, state unchanged.
        assertFalse(chan.notStateTrans(BlockingPeerChannel.ChannelState.INIT,
                                       BlockingPeerChannel.ChannelState.CONNECT_FAIL));
        assertEquals(BlockingPeerChannel.ChannelState.INIT, chan.getState());

        assertTrue(chan.notStateTrans(BlockingPeerChannel.ChannelState.CONNECTING,
                                      BlockingPeerChannel.ChannelState.ACCEPTED));
        assertEquals(BlockingPeerChannel.ChannelState.ACCEPTED, chan.getState());

        assertTrue(chan.notStateTrans(BlockingPeerChannel.ChannelState.CONNECT_FAIL,
                                      BlockingPeerChannel.ChannelState.OPEN,
                                      "shouldn't"));
        assertEquals(BlockingPeerChannel.ChannelState.OPEN, chan.getState());

        // Channel IS in OPEN, so the message-taking form must throw.
        try {
            chan.notStateTrans(BlockingPeerChannel.ChannelState.OPEN,
                               BlockingPeerChannel.ChannelState.STARTING,
                               "should error");
            fail("notStateTrans should have thrown");
        } catch (IllegalStateException expected) {
        }
        assertEquals(BlockingPeerChannel.ChannelState.OPEN, chan.getState());

        // Array form: transition unless the current state is one of these.
        BlockingPeerChannel.ChannelState[] excluded = {
            BlockingPeerChannel.ChannelState.CONNECT_FAIL,
            BlockingPeerChannel.ChannelState.STARTING,
            BlockingPeerChannel.ChannelState.DRAIN_INPUT
        };
        assertTrue(chan.notStateTrans(excluded, BlockingPeerChannel.ChannelState.ACCEPTED, "shouldn't"));
        assertTrue(chan.notStateTrans(excluded, BlockingPeerChannel.ChannelState.STARTING, "shouldn't"));
        assertFalse(chan.notStateTrans(excluded, BlockingPeerChannel.ChannelState.CLOSING));
        assertEquals(BlockingPeerChannel.ChannelState.STARTING, chan.getState());
    }

    /**
     * Checks {@code BlockingPeerChannel.readHeader()} against four inputs: a
     * well-formed header, a header with a corrupted check byte, an empty stream
     * and a stream that ends part-way through the header.
     */
    public void testReadHeader() throws IOException {
        setupComm1();
        byte[] header = {HEADER_CHECK, 1, 1, 2, 3, 4, 5, 6, 7, 8, 2, 3, 4, 5};
        assertEquals(HEADER_LEN, header.length);

        // Well-formed header: read succeeds and the last-active time changes.
        BlockingPeerChannel chan = new BlockingPeerChannel(this.comm1, this.pid1, new ByteArrayInputStream(header), (OutputStream) null);
        long activeBefore = chan.getLastActiveTime();
        assertTrue(chan.readHeader());
        assertNotEquals(activeBefore, chan.getLastActiveTime());

        // Corrupt the check byte: the read must be rejected.
        header[HEADER_OFF_CHECK] = 42;
        try {
            new BlockingPeerChannel(this.comm1, this.pid1, new ByteArrayInputStream(header), (OutputStream) null).readHeader();
            fail("readHeader() of illegal header should throw");
        } catch (ProtocolException expected) {
        }

        // Empty input is reported by a false return rather than an exception.
        BlockingPeerChannel emptyChan = new BlockingPeerChannel(this.comm1, this.pid1, new StringInputStream(TestBaseCrawler.EMPTY_PAGE), (OutputStream) null);
        assertFalse(emptyChan.readHeader());

        // A single byte is not a whole header.
        try {
            new BlockingPeerChannel(this.comm1, this.pid1, new StringInputStream("\u007f"), (OutputStream) null).readHeader();
            fail("readHeader() of partial header should throw");
        } catch (ProtocolException expected) {
        }
    }

    /**
     * Writes one header through {@code BlockingPeerChannel.writeHeader()} into an
     * in-memory stream and verifies the length and each encoded field.
     */
    public void testWriteHeader() throws IOException {
        setupComm1();
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        BlockingPeerChannel chan = new BlockingPeerChannel(this.comm1, this.pid1, (InputStream) null, sink);
        chan.writeHeader(5, 16L, 34);

        byte[] written = sink.toByteArray();
        assertEquals(HEADER_LEN, written.length);
        // Check byte decoded as an unsigned value.
        assertEquals(255, ByteArray.decodeByte(written, HEADER_OFF_CHECK));
        // First argument at offset 1, the long at offset 2, the int at HEADER_OFF_PROTO.
        assertEquals(5, written[1]);
        assertEquals(16L, ByteArray.decodeLong(written, 2));
        assertEquals(34, ByteArray.decodeInt(written, HEADER_OFF_PROTO));
    }

    /**
     * Verifies that {@code sendTo()} throws {@link NullPointerException} when
     * either the message or the destination peer is null.
     */
    public void testIllSend() throws IOException {
        setupComm1();

        // Null message.
        try {
            this.comm1.sendTo(null, this.pid2);
            fail("Null message should throw");
        } catch (NullPointerException expected) {
        }

        // Null destination peer.
        try {
            this.comm1.sendTo(this.msg1, null);
            fail("Null peer should throw");
        } catch (NullPointerException expected) {
        }
    }

    /**
     * Sends a message to pid2 without starting a second comm in this test, and
     * expects the connecting channel to dissociate, no channels to remain, and
     * no retry to be pending for the peer.
     */
    public void testRefused() throws IOException {
        setupComm1();
        setupPid(2);
        this.comm1.setAssocQueue(this.assocQ);

        this.comm1.sendTo(this.msg1, this.pid2);

        List event = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
        assertNotNull("Connecting channel didn't dissociate", event);
        assertEquals("Connecting channel didn't dissociate", "dissoc", event.get(HEADER_OFF_CHECK));
        assertEmpty(getChannels(this.comm1));
        assertEmpty(getRcvChannels(this.comm1));

        BlockingStreamComm.PeerData pdata = this.comm1.getPeerData(this.pid2);
        assertNotNull(pdata);
        assertFalse(pdata.isRetryNeeded());
    }

    /**
     * Like the refused-connection case, but with simulated time, a
     * {@code retryBeforeExpiration} of 0 and a message that expires in the
     * future: the peer is expected to need a retry and to hold one queued message.
     */
    public void testRefusedRetry() throws IOException {
        TimeBase.setSimulated(10000L);
        this.cprops.put("org.lockss.scomm.retryBeforeExpiration", "0");
        ConfigurationUtil.setCurrentConfigFromProps(this.cprops);
        setupComm1();
        setupPid(2);
        this.comm1.setAssocQueue(this.assocQ);

        // Expiration is later than the simulated "now" set above.
        this.msg1.setExpiration(11000L);
        this.comm1.sendTo(this.msg1, this.pid2);

        List event = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
        assertNotNull("Connecting channel didn't dissociate", event);
        assertEquals("Connecting channel didn't dissociate", "dissoc", event.get(HEADER_OFF_CHECK));
        assertEmpty(getChannels(this.comm1));
        assertEmpty(getRcvChannels(this.comm1));

        BlockingStreamComm.PeerData pdata = this.comm1.getPeerData(this.pid2);
        assertNotNull(pdata);
        assertTrue(pdata.isRetryNeeded());
        assertEquals(1, pdata.sendQueue.size());
    }

    /**
     * Makes the socket factory throw {@link SSLPeerUnverifiedException} from
     * {@code newSocket} and expects that, unlike a plain refused connection, no
     * retry is scheduled and the peer has no send queue.
     */
    public void testSSLFailNoRetry() throws IOException {
        TimeBase.setSimulated(10000L);
        this.cprops.put("org.lockss.scomm.retryBeforeExpiration", "0");
        ConfigurationUtil.setCurrentConfigFromProps(this.cprops);
        setupComm1();

        // Arrange for every outgoing connection attempt to fail with an SSL error.
        MySocketFactory factory = (MySocketFactory) this.comm1.getSocketFactory();
        factory.setNewSocketThrow(new SSLPeerUnverifiedException("Fake SSL exception"));

        setupPid(2);
        this.comm1.setAssocQueue(this.assocQ);
        this.msg1.setExpiration(11000L);
        this.comm1.sendTo(this.msg1, this.pid2);

        List event = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
        assertNotNull("Connecting channel didn't dissociate", event);
        assertEquals("Connecting channel didn't dissociate", "dissoc", event.get(HEADER_OFF_CHECK));
        assertEmpty(getChannels(this.comm1));
        assertEmpty(getRcvChannels(this.comm1));

        BlockingStreamComm.PeerData pdata = this.comm1.getPeerData(this.pid2);
        assertNotNull(pdata);
        assertFalse(pdata.isRetryNeeded());
        assertNull(pdata.sendQueue);
    }

    /**
     * Connects to comm1's listen address and checks that comm1 first sends a
     * header with op code 1 followed by its own peer id string.
     */
    public void testIncoming() throws IOException {
        setupComm1();
        // Watchdogs start out null so the cleanup below can tell whether they were armed.
        LockssTestCase.Interrupter interrupter = null;
        LockssTestCase.SockAbort sockAbort = null;
        try {
            interrupter = interruptMeIn(TIMEOUT_SHOULDNT);
            Socket sock = this.comm1.getSocketFactory().newSocket(this.pad1.getIPAddr(), this.pad1.getPort());
            sockAbort = abortIn(TIMEOUT_SHOULDNT, sock);
            InputStream ins = sock.getInputStream();
            StreamUtil.readBytes(ins, this.rcvHeader, HEADER_LEN);
            assertHeaderOp(this.rcvHeader, 1);
            assertEquals(this.pid1.getIdString(), rcvMsgData(ins));
        } finally {
            // Always disarm the watchdogs, whether the test passed or failed.
            if (interrupter != null) {
                interrupter.cancel();
            }
            if (sockAbort != null) {
                sockAbort.cancel();
            }
        }
    }

    /**
     * Probes whether the alternate local address ({@code ALT_IP_ADDR}) can be
     * used: listens on an unbound port and tries to connect to that port via the
     * alternate address.
     *
     * @return true if the connection succeeded, false if any I/O error occurred
     */
    private boolean isAltAddrUsable() {
        int port = TcpTestUtil.findUnboundTcpPort();
        ServerSocket server = null;
        try {
            server = new ServerSocket(port);
            Socket sock = new Socket(InetAddress.getByName(ALT_IP_ADDR), port);
            try {
                log.debug2("Socket opened to " + sock.getInetAddress());
            } finally {
                sock.close();
            }
            return true;
        } catch (IOException e) {
            log.debug2("No IPv6 support", e);
            return false;
        } finally {
            // The listener was previously never closed, leaking a bound port per probe.
            if (server != null) {
                IOUtil.safeClose(server);
            }
        }
    }

    /**
     * Logs a warning that the named test is being skipped because the second
     * local address is unavailable.
     *
     * @param str name of the skipped test, appended to the warning
     */
    private void logSkipped(String str) {
        String warning = "2nd local address (::1) not supported, skipping " + str;
        log.warning(warning);
    }

    /**
     * Same handshake check as the plain incoming-connection test, but connects
     * via the alternate local address. Skipped (with a warning) when that
     * address is not usable on this host.
     */
    public void testIncomingToAlternateAddress() throws IOException {
        if (!isAltAddrUsable()) {
            logSkipped("testIncomingToAlternateAddress");
            return;
        }
        setupComm1();
        // Watchdogs start out null so the cleanup below can tell whether they were armed.
        LockssTestCase.Interrupter interrupter = null;
        LockssTestCase.SockAbort sockAbort = null;
        try {
            interrupter = interruptMeIn(TIMEOUT_SHOULDNT);
            Socket sock = this.comm1.getSocketFactory().newSocket(IPAddr.getByName(ALT_IP_ADDR), this.pad1.getPort());
            sockAbort = abortIn(TIMEOUT_SHOULDNT, sock);
            InputStream ins = sock.getInputStream();
            StreamUtil.readBytes(ins, this.rcvHeader, HEADER_LEN);
            assertHeaderOp(this.rcvHeader, 1);
            assertEquals(this.pid1.getIdString(), rcvMsgData(ins));
        } finally {
            // Always disarm the watchdogs, whether the test passed or failed.
            if (interrupter != null) {
                interrupter.cancel();
            }
            if (sockAbort != null) {
                sockAbort.cancel();
            }
        }
    }

    /**
     * With {@code org.lockss.scomm.bindToLocalIpOnly} enabled, connecting to the
     * configured listen address must still produce the normal peer-id handshake.
     */
    public void testIncomingBindLocalOnlySameIP() throws IOException {
        ConfigurationUtil.addFromArgs("org.lockss.scomm.bindToLocalIpOnly", "true");
        setupComm1();
        // Watchdogs start out null so the cleanup below can tell whether they were armed.
        LockssTestCase.Interrupter interrupter = null;
        LockssTestCase.SockAbort sockAbort = null;
        try {
            interrupter = interruptMeIn(TIMEOUT_SHOULDNT);
            Socket sock = this.comm1.getSocketFactory().newSocket(this.pad1.getIPAddr(), this.pad1.getPort());
            sockAbort = abortIn(TIMEOUT_SHOULDNT, sock);
            InputStream ins = sock.getInputStream();
            StreamUtil.readBytes(ins, this.rcvHeader, HEADER_LEN);
            assertHeaderOp(this.rcvHeader, 1);
            assertEquals(this.pid1.getIdString(), rcvMsgData(ins));
        } finally {
            // Always disarm the watchdogs, whether the test passed or failed.
            if (interrupter != null) {
                interrupter.cancel();
            }
            if (sockAbort != null) {
                sockAbort.cancel();
            }
        }
    }

    /**
     * With {@code bindToLocalIpOnly} enabled (and {@code sendFromBindAddr}
     * disabled), a connection attempt to the alternate local address is expected
     * to fail with {@link ConnectException}. Skipped when the alternate address
     * is not usable on this host.
     */
    public void testIncomingBindLocalOnlyWrongIP() throws IOException {
        if (!isAltAddrUsable()) {
            logSkipped("testIncomingBindLocalOnlyWrongIP");
            return;
        }
        ConfigurationUtil.addFromArgs("org.lockss.scomm.bindToLocalIpOnly", "true", "org.lockss.scomm.sendFromBindAddr", "false");
        setupComm1();
        // Starts out null so the cleanup below can tell whether it was armed.
        LockssTestCase.Interrupter interrupter = null;
        try {
            interrupter = interruptMeIn(TIMEOUT_SHOULD);
            Socket unexpected = this.comm1.getSocketFactory().newSocket(IPAddr.getByName(ALT_IP_ADDR), this.pad1.getPort());
            // Reaching here means the connect wrongly succeeded; don't leak the socket.
            IOUtil.safeClose(unexpected);
            fail("connect to different IP then listen IP should fail");
        } catch (ConnectException expected) {
            // Expected: nothing is listening on the alternate address.
        } finally {
            if (interrupter != null) {
                interrupter.cancel();
            }
        }
    }

    /**
     * Shared driver for the incoming-connection peer-id tests: connects to
     * comm1, consumes comm1's own peer-id message, then sends {@code str} as the
     * remote peer id and checks the resulting association event.
     *
     * @param str peer id string to send to comm1
     * @param z   true if the id is expected to be accepted (an "assoc" event and
     *            one channel), false if it is expected to be rejected (a "dissoc"
     *            event and no channels)
     */
    public void testIncomingRcvPeerId(String str, boolean z) throws IOException {
        log.debug("Incoming rcv pid " + str);
        setupComm1();
        // Watchdogs start out null so the cleanup below can tell whether they were armed.
        LockssTestCase.Interrupter interrupter = null;
        LockssTestCase.SockAbort sockAbort = null;
        try {
            interrupter = interruptMeIn(TIMEOUT_SHOULDNT);
            Socket sock = this.comm1.getSocketFactory().newSocket(this.pad1.getIPAddr(), this.pad1.getPort());
            sockAbort = abortIn(TIMEOUT_SHOULDNT, sock);
            InputStream ins = sock.getInputStream();
            OutputStream outs = sock.getOutputStream();
            StreamUtil.readBytes(ins, this.rcvHeader, HEADER_LEN);
            assertHeaderOp(this.rcvHeader, 1);
            assertEquals(this.pid1.getIdString(), rcvMsgData(ins));
            this.comm1.setAssocQueue(this.assocQ);
            writePeerId(outs, str);
            if (z) {
                List event = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
                assertNotNull("Connecting channel didn't associate", event);
                assertEquals("Connecting channel didn't associate", "assoc", event.get(HEADER_OFF_CHECK));
                assertEquals(1, getChannels(this.comm1).size());
                assertEquals(HEADER_OFF_CHECK, getRcvChannels(this.comm1).size());
            } else {
                List event = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
                assertNotNull("Connecting channel didn't dissociate", event);
                assertEquals("Connecting channel didn't dissociate", "dissoc", event.get(HEADER_OFF_CHECK));
                assertEquals(HEADER_OFF_CHECK, getChannels(this.comm1).size());
                assertEquals(HEADER_OFF_CHECK, getRcvChannels(this.comm1).size());
            }
        } finally {
            // Disarm the watchdogs on failure as well as on success.
            if (interrupter != null) {
                interrupter.cancel();
            }
            if (sockAbort != null) {
                sockAbort.cancel();
            }
        }
    }

    /** Incoming connection that sends pid2's own id string; expected to be accepted. */
    public void testIncomingRcvGoodPeerId1() throws IOException {
        setupPid(2);
        String goodId = this.pid2.getIdString();
        testIncomingRcvPeerId(goodId, true);
    }

    /** Incoming connection that sends a peer id with port 65541; expected to be rejected. */
    public void testIncomingRcvBadPeerId1() throws IOException {
        final String badId = "tcp:[127.0.0.1]:65541";
        testIncomingRcvPeerId(badId, false);
    }

    /** Incoming connection that sends a bare IP address as its peer id; expected to be rejected. */
    public void testIncomingRcvBadPeerId2() throws IOException {
        final String badId = "127.0.0.1";
        testIncomingRcvPeerId(badId, false);
    }

    /**
     * Listens on pid2's port, asks comm1 to open a channel to pid2, and checks
     * that the accepted connection receives a header with op code 1 followed by
     * comm1's peer id string.
     */
    public void testOriginate() throws IOException {
        // All start out null so the cleanup below only touches what was created.
        LockssTestCase.SockAbort serverAbort = null;
        LockssTestCase.SockAbort connAbort = null;
        ServerSocket server = null;
        Socket conn = null;
        try {
            setupComm1();
            setupPid(2);
            server = this.comm1.getSocketFactory().newServerSocket((String) null, this.pad2.getPort(), 3);
            serverAbort = abortIn(TIMEOUT_SHOULDNT, server);
            this.comm1.findOrMakeChannel(this.pid2);
            conn = server.accept();
            InputStream ins = conn.getInputStream();
            connAbort = abortIn(TIMEOUT_SHOULDNT, conn);
            assertRcvHeader(ins, 1);
            assertEquals(this.pid1.getIdString(), rcvMsgData(ins));
        } finally {
            // Close the sockets on failure too, then disarm the watchdogs.
            if (server != null) {
                IOUtil.safeClose(server);
            }
            if (conn != null) {
                IOUtil.safeClose(conn);
            }
            if (serverAbort != null) {
                serverAbort.cancel();
            }
            if (connAbort != null) {
                connAbort.cancel();
            }
        }
    }

    /**
     * Shared driver for the originated-connection peer-id tests: listens on
     * pid2's port, lets comm1 connect, consumes comm1's peer-id message, then
     * replies with {@code str} as the remote peer id.
     *
     * @param str peer id string to send back to comm1
     * @param z   true if the id is expected to be accepted (no association event,
     *            one channel), false if it is expected to be rejected (a "dissoc"
     *            event and no channels)
     */
    public void testOriginateRcvPeerId(String str, boolean z) throws IOException {
        log.debug("Orig, send pid " + str);
        // All start out null so the cleanup below only touches what was created.
        LockssTestCase.SockAbort serverAbort = null;
        ServerSocket server = null;
        Socket conn = null;
        try {
            setupComm1();
            this.comm1.setAssocQueue(this.assocQ);
            setupPid(2);
            log.debug2("Listening on " + this.pad2.getPort());
            server = this.comm1.getSocketFactory().newServerSocket((String) null, this.pad2.getPort(), 3);
            serverAbort = abortIn(TIMEOUT_SHOULDNT, server);
            this.comm1.findOrMakeChannel(this.pid2);
            conn = server.accept();
            InputStream ins = conn.getInputStream();
            OutputStream outs = conn.getOutputStream();
            assertRcvHeader(ins, 1);
            assertEquals(this.pid1.getIdString(), rcvMsgData(ins));
            writePeerId(outs, str);
            if (z) {
                // Waits the "should time out" interval and expects no event at all.
                assertNull("Connecting channel shouldn't call associate", this.assocQ.get(TIMEOUT_SHOULD));
                assertEquals(1, getChannels(this.comm1).size());
                assertEquals(HEADER_OFF_CHECK, getRcvChannels(this.comm1).size());
            } else {
                List event = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
                assertNotNull("Connecting channel didn't dissociate", event);
                assertEquals("Connecting channel didn't dissociate", "dissoc", event.get(HEADER_OFF_CHECK));
                assertEquals(HEADER_OFF_CHECK, getChannels(this.comm1).size());
                assertEquals(HEADER_OFF_CHECK, getRcvChannels(this.comm1).size());
            }
        } finally {
            // Close the sockets on failure too, then disarm the watchdog.
            if (server != null) {
                IOUtil.safeClose(server);
            }
            if (conn != null) {
                IOUtil.safeClose(conn);
            }
            if (serverAbort != null) {
                serverAbort.cancel();
            }
        }
    }

    /** Originated connection whose peer replies with pid2's own id string; expected to be accepted. */
    public void testOriginateRcvGoodPeerId1() throws IOException {
        setupPid(2);
        String goodId = this.pid2.getIdString();
        testOriginateRcvPeerId(goodId, true);
    }

    /** Originated connection whose peer replies with a peer id using port 65542; expected to be rejected. */
    public void testOriginateRcvBadPeerId1() throws IOException {
        final String badId = "tcp:[127.0.0.1]:65542";
        testOriginateRcvPeerId(badId, false);
    }

    /** Originated connection whose peer replies with a bare IP address; expected to be rejected. */
    public void testOriginateRcvBadPeerId2() throws IOException {
        final String badId = "127.0.0.1";
        testOriginateRcvPeerId(badId, false);
    }

    /**
     * Disabled test (the {@code XXX} prefix keeps JUnit 3 from picking it up).
     * Establishes an incoming channel from pid2, advances simulated time past
     * the configured 5000 ms channel idle time, and expects the channel to
     * dissociate without either watchdog having fired.
     */
    public void XXXtestHangingClose() throws IOException {
        TimeBase.setSimulated(1000L);
        this.cprops.setProperty("org.lockss.scomm.channelIdleTime", "5000");
        ConfigurationUtil.setCurrentConfigFromProps(this.cprops);
        setupComm1();
        setupPid(2);
        // Watchdogs start out null so the cleanup below can tell whether they were armed.
        LockssTestCase.Interrupter interrupter = null;
        LockssTestCase.SockAbort sockAbort = null;
        try {
            interrupter = interruptMeIn(TIMEOUT_SHOULDNT);
            Socket sock = this.comm1.getSocketFactory().newSocket(this.pad1.getIPAddr(), this.pad1.getPort());
            sockAbort = abortIn(TIMEOUT_SHOULDNT, sock);
            InputStream ins = sock.getInputStream();
            OutputStream outs = sock.getOutputStream();
            StreamUtil.readBytes(ins, this.rcvHeader, HEADER_LEN);
            assertHeaderOp(this.rcvHeader, 1);
            assertEquals(this.pid1.getIdString(), rcvMsgData(ins));
            this.comm1.setAssocQueue(this.assocQ);
            writePeerId(outs, this.pid2);
            List assocEvent = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
            assertNotNull("Channel didn't assoc", assocEvent);
            assertEquals("Channel didn't assoc", "assoc", assocEvent.get(HEADER_OFF_CHECK));
            assertEquals(1, getChannels(this.comm1).size());
            this.msg1 = makePeerMessage(1, "1234567890123456789012345678901234567890", 100);
            long dataSize = this.msg1.getDataSize();
            // NOTE(review): this loop has no body; it looks like message sends were
            // intended here (compare testHangingSend) - confirm before enabling the test.
            for (long j = 0; j < 20 * (sock.getReceiveBufferSize() + sock.getSendBufferSize()); j += dataSize) {
            }
            assertEquals(1, getChannels(this.comm1).size());
            assertNotNull("Didn't find expected channel", getChannel(this.comm1, this.pid2));
            assertEquals(HEADER_OFF_CHECK, getRcvChannels(this.comm1).size());
            // Step past the 5000 ms idle time configured above.
            TimeBase.step(6000L);
            assertEquals(HEADER_OFF_CHECK, ins.available());
            List closeEvent = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
            assertNotNull("Channel didn't close automatically after timeout", closeEvent);
            assertEquals("Channel didn't close automatically after timeout", "dissoc", closeEvent.get(HEADER_OFF_CHECK));
            assertFalse(interrupter.did());
            assertFalse(sockAbort.did());
        } finally {
            // Always disarm the watchdogs, whether the test passed or failed.
            if (interrupter != null) {
                interrupter.cancel();
            }
            if (sockAbort != null) {
                sockAbort.cancel();
            }
        }
    }

    /**
     * Establishes an incoming channel from pid2, queues far more outgoing data
     * than the socket buffers hold while the test never reads it, then steps
     * simulated time until the channel dissociates. Neither watchdog may have
     * fired by then.
     */
    public void testHangingSend() throws IOException {
        // Watchdogs start out null so the cleanup below can tell whether they were armed.
        LockssTestCase.Interrupter interrupter = null;
        LockssTestCase.SockAbort sockAbort = null;
        try {
            TimeBase.setSimulated(1000L);
            this.cprops.setProperty("org.lockss.scomm.channelIdleTime", "5000");
            ConfigurationUtil.setCurrentConfigFromProps(this.cprops);
            setupComm1();
            setupPid(2);
            interrupter = interruptMeIn(TIMEOUT_SHOULDNT);
            Socket sock = this.comm1.getSocketFactory().newSocket(this.pad1.getIPAddr(), this.pad1.getPort());
            sockAbort = abortIn(TIMEOUT_SHOULDNT * 2, sock);
            InputStream ins = sock.getInputStream();
            OutputStream outs = sock.getOutputStream();
            StreamUtil.readBytes(ins, this.rcvHeader, HEADER_LEN);
            assertHeaderOp(this.rcvHeader, 1);
            assertEquals(this.pid1.getIdString(), rcvMsgData(ins));
            this.comm1.setAssocQueue(this.assocQ);
            writePeerId(outs, this.pid2);
            List assocEvent = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
            assertNotNull("Channel didn't assoc", assocEvent);
            assertEquals("Channel didn't assoc", "assoc", assocEvent.get(HEADER_OFF_CHECK));
            assertEquals(1, getChannels(this.comm1).size());

            // Queue roughly 200x the combined socket buffer sizes.
            this.msg1 = makePeerMessage(1, "123456789012345678901234567890", 1000);
            long dataSize = this.msg1.getDataSize();
            int totalToSend = 200 * (sock.getReceiveBufferSize() + sock.getSendBufferSize());
            for (long sent = 0; sent < totalToSend; sent += dataSize) {
                this.comm1.sendTo(this.msg1, this.pid2);
            }
            assertEquals(1, getChannels(this.comm1).size());
            BlockingPeerChannel chan = getChannel(this.comm1, this.pid2);
            assertNotNull("Didn't find expected channel", chan);
            assertFalse("Send queue shouldn't be empty", chan.isSendIdle());
            assertEquals(HEADER_OFF_CHECK, getRcvChannels(this.comm1).size());

            // Keep advancing simulated time until an association event shows up.
            List closeEvent;
            while ((closeEvent = (List) this.assocQ.get(10L)) == null) {
                TimeBase.step(6000L);
            }
            assertNotNull("Channel didn't close automatically after timeout", closeEvent);
            assertEquals("Channel didn't close automatically after timeout", "dissoc", closeEvent.get(HEADER_OFF_CHECK));
            assertFalse(interrupter.did());
            assertFalse(sockAbort.did());
        } finally {
            // Always disarm the watchdogs, whether the test passed or failed.
            if (interrupter != null) {
                interrupter.cancel();
            }
            if (sockAbort != null) {
                sockAbort.cancel();
            }
        }
    }

    /**
     * With {@code minFileMessageSize} set to 1000, sends msg1 and a larger msg2
     * from comm1 to comm2 and expects the first to arrive as a
     * {@code MemoryPeerMessage} and the second as a {@code FilePeerMessage}.
     */
    public void testFileMessage() throws IOException {
        this.cprops.setProperty("org.lockss.scomm.minFileMessageSize", "1000");
        ConfigurationUtil.setCurrentConfigFromProps(this.cprops);
        setupComm1();
        setupComm2();
        this.msg2 = makePeerMessage(1, "1234567890123456789012345678901234567890", 100);
        this.comm1.sendTo(this.msg1, this.pid2);
        this.comm1.sendTo(this.msg2, this.pid2);

        PeerMessage first = null;
        PeerMessage second = null;
        try {
            first = (PeerMessage) this.rcvdMsgs2.get(TIMEOUT_SHOULDNT);
            assertNotNull("First message wasn't received", first);
            assertEqualsMessageFrom(this.msg1, this.pid1, first);
            assertTrue(first.toString(), first instanceof MemoryPeerMessage);

            second = (PeerMessage) this.rcvdMsgs2.get(TIMEOUT_SHOULDNT);
            assertNotNull("Second message wasn't received", second);
            assertEqualsMessageFrom(this.msg2, this.pid1, second);
            assertTrue(second.toString(), second instanceof FilePeerMessage);
        } finally {
            // Null-guarded so a receive timeout reports the assertion failure, not an NPE.
            if (first != null) {
                first.delete();
            }
            if (second != null) {
                second.delete();
            }
        }
    }

    /**
     * With {@code minFileMessageSize} set to 5000, sends msg1 and msg2 from
     * comm1 to comm2 and expects both to arrive as {@code MemoryPeerMessage}s.
     */
    public void testMemMessage() throws IOException {
        this.cprops.setProperty("org.lockss.scomm.minFileMessageSize", "5000");
        ConfigurationUtil.setCurrentConfigFromProps(this.cprops);
        setupComm1();
        setupComm2();
        this.msg2 = makePeerMessage(1, "1234567890123456789012345678901234567890", HEADER_OFF_PROTO);
        this.comm1.sendTo(this.msg1, this.pid2);
        this.comm1.sendTo(this.msg2, this.pid2);

        PeerMessage first = null;
        PeerMessage second = null;
        try {
            first = (PeerMessage) this.rcvdMsgs2.get(TIMEOUT_SHOULDNT);
            assertNotNull("First message wasn't received", first);
            assertEqualsMessageFrom(this.msg1, this.pid1, first);
            assertTrue(first.toString(), first instanceof MemoryPeerMessage);

            second = (PeerMessage) this.rcvdMsgs2.get(TIMEOUT_SHOULDNT);
            assertNotNull("Second message wasn't received", second);
            assertEqualsMessageFrom(this.msg2, this.pid1, second);
            assertTrue(second.toString(), second instanceof MemoryPeerMessage);
        } finally {
            // Null-guarded so a receive timeout reports the assertion failure, not an NPE.
            if (first != null) {
                first.delete();
            }
            if (second != null) {
                second.delete();
            }
        }
    }

    /**
     * With {@code maxMessageSize} set to 2000, sends a normal message (expected
     * to be delivered and to associate the channel on comm2) followed by a
     * larger one, and expects comm2 to dissociate the channel in response.
     */
    public void testTooLargeMsg() throws IOException {
        this.cprops.setProperty("org.lockss.scomm.maxMessageSize", "2000");
        ConfigurationUtil.setCurrentConfigFromProps(this.cprops);
        setupComm1();
        setupComm2();
        this.comm2.setAssocQueue(this.assocQ);

        // First message: expected to be delivered normally.
        this.comm1.sendTo(this.msg1, this.pid2);
        this.msg2 = makePeerMessage(1, "1234567890123456789012345678901234567890", 100);
        assertEqualsMessageFrom(this.msg1, this.pid1, (PeerMessage) this.rcvdMsgs2.get(TIMEOUT_SHOULDNT));
        List assocEvent = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
        assertNotNull("Channel didn't associate", assocEvent);
        assertEquals("Channel didn't associate", "assoc", assocEvent.get(HEADER_OFF_CHECK));

        // Second message: expected to make the receiving side drop the channel.
        this.comm1.sendTo(this.msg2, this.pid2);
        List closeEvent = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
        assertNotNull("Channel didn't close on too-large message", closeEvent);
        assertEquals("Channel didn't close on too-large message", "dissoc", closeEvent.get(HEADER_OFF_CHECK));
    }

    /**
     * Checks the ordering of {@code comm1.peersToRetry}: entries are expected in
     * ascending {@code nextRetry} order, re-adding an entry has no effect, and
     * two entries with the same {@code nextRetry} are both kept.
     */
    public void testWaitingPeerTreeSet() throws IOException {
        setupPid(1);
        setupPid(2);
        PeerIdentity third = setupPid(3);
        PeerIdentity fourth = setupPid(4);
        setupComm1();

        BlockingStreamComm.PeerData pd1 = this.comm1.makePeerData(this.pid1);
        BlockingStreamComm.PeerData pd2 = this.comm1.makePeerData(this.pid2);
        BlockingStreamComm.PeerData pd3 = this.comm1.makePeerData(third);
        BlockingStreamComm.PeerData pd4 = this.comm1.makePeerData(fourth);
        pd1.nextRetry = 1000L;
        pd2.nextRetry = 100L;
        pd3.nextRetry = 500L;
        pd4.nextRetry = 500L;

        this.comm1.addPeerToRetry(pd1);
        assertIsomorphic(ListUtil.list(new BlockingStreamComm.PeerData[]{pd1}), this.comm1.peersToRetry);

        // Earlier retry time goes first.
        this.comm1.addPeerToRetry(pd2);
        assertIsomorphic(ListUtil.list(new BlockingStreamComm.PeerData[]{pd2, pd1}), this.comm1.peersToRetry);

        // Adding the same entry again changes nothing.
        this.comm1.addPeerToRetry(pd2);
        assertIsomorphic(ListUtil.list(new BlockingStreamComm.PeerData[]{pd2, pd1}), this.comm1.peersToRetry);

        this.comm1.addPeerToRetry(pd3);
        assertIsomorphic(ListUtil.list(new BlockingStreamComm.PeerData[]{pd2, pd3, pd1}), this.comm1.peersToRetry);

        // Same nextRetry as pd3, but a distinct entry.
        this.comm1.addPeerToRetry(pd4);
        assertEquals(4, this.comm1.peersToRetry.size());
        assertSame(pd2, this.comm1.peersToRetry.first());
    }

    /**
     * Sends a message while no second comm is running, so the connect attempt
     * dissociates and a retry is scheduled. Then starts comm2, advances
     * simulated time, and expects the message to be delivered with a retry
     * count of 1.
     */
    public void testRetry1() throws IOException {
        TimeBase.setSimulated(1000L);
        this.cprops.put("org.lockss.scomm.minFileMessageSize", "5000");
        this.cprops.put("org.lockss.scomm.maxPeerRetryInterval", "1000");
        this.cprops.put("org.lockss.scomm.minPeerRetryInterval", "10");
        this.cprops.put("org.lockss.scomm.retryBeforeExpiration", "100");
        ConfigurationUtil.setCurrentConfigFromProps(this.cprops);
        setupComm1();
        setupPid(2);
        this.comm1.setAssocQueue(this.assocQ);

        this.msg1.setExpiration(3000L);
        assertEquals(HEADER_OFF_CHECK, this.msg1.getRetryCount());
        this.comm1.sendTo(this.msg1, this.pid2);

        // First attempt: the connecting channel dissociates and a retry is pending.
        List event = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
        assertNotNull("Connecting channel didn't dissociate", event);
        assertEquals("Connecting channel didn't dissociate", "dissoc", event.get(HEADER_OFF_CHECK));
        BlockingStreamComm.PeerData pdata = this.comm1.getPeerData(this.pid2);
        assertNotNull(pdata);
        assertNull(pdata.getPrimaryChannel());
        assertNull(pdata.getSecondaryChannel());
        assertTrue(pdata.isRetryNeeded());
        assertEquals(1, this.msg1.getRetryCount());

        // Start the receiver; after 800 ms the retry is still expected to be pending.
        setupComm2();
        TimeBase.step(800L);
        assertTrue(pdata.isRetryNeeded());

        // After a further 300 ms the message is expected to arrive.
        TimeBase.step(300L);
        PeerMessage received = (PeerMessage) this.rcvdMsgs2.get(TIMEOUT_SHOULDNT);
        assertEqualsMessageFrom(this.msg1, this.pid1, received);
        assertTrue(received.toString(), received instanceof MemoryPeerMessage);
        assertEquals(1, this.msg1.getRetryCount());
    }

    /**
     * Retry scenario with two messages: msg1 fails to connect and is queued for
     * retry; msg2 is sent while the retry is still pending. After comm2 is
     * running and simulated time has advanced, both are expected to arrive in
     * order, with msg1 retried once and msg2 not at all.
     */
    public void testRetry2() throws IOException {
        TimeBase.setSimulated(1000L);
        this.cprops.put("org.lockss.scomm.minFileMessageSize", "5000");
        this.cprops.put("org.lockss.scomm.maxPeerRetryInterval", "1000");
        this.cprops.put("org.lockss.scomm.minPeerRetryInterval", "10");
        this.cprops.put("org.lockss.scomm.retryBeforeExpiration", "100");
        ConfigurationUtil.setCurrentConfigFromProps(this.cprops);
        this.msg2 = makePeerMessage(1, "1234567890123456789012345678901234567890", 20);
        this.msg1.setExpiration(3000L);
        this.msg2.setExpiration(4000L);
        setupComm1();
        setupPid(2);
        this.comm1.setAssocQueue(this.assocQ);
        this.comm1.sendTo(this.msg1, this.pid2);

        BlockingStreamComm.PeerData pdata = this.comm1.getPeerData(this.pid2);
        // Check for null before the field read below, so a missing PeerData is
        // reported as an assertion failure rather than an NPE.
        assertNotNull(pdata);
        int rpt = pdata.lastSendRpt;

        // First attempt: the connecting channel dissociates and a retry is pending.
        List event = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
        assertNotNull("Connecting channel didn't dissociate", event);
        assertEquals("Connecting channel didn't dissociate", "dissoc", event.get(HEADER_OFF_CHECK));
        assertNull(pdata.getPrimaryChannel());
        assertNull(pdata.getSecondaryChannel());
        assertEquals(rpt, pdata.getOrigCnt());
        assertEquals(rpt, pdata.getFailCnt());
        assertEquals(HEADER_OFF_CHECK, pdata.getAcceptCnt());
        assertTrue(pdata.isRetryNeeded());

        // Receiver comes up; 800 ms later no retry is expected yet.
        setupComm2();
        TimeBase.step(800L);
        assertNull("Retried too early", (List) this.assocQ.get(TIMEOUT_SHOULD));
        assertTrue(pdata.isRetryNeeded());

        // A second message sent while the retry is pending must not clear it.
        this.comm1.sendTo(this.msg2, this.pid2);
        assertTrue(pdata.isRetryNeeded());

        // After a further 300 ms both messages are expected, in send order.
        TimeBase.step(300L);
        PeerMessage first = (PeerMessage) this.rcvdMsgs2.get(TIMEOUT_SHOULDNT);
        assertEqualsMessageFrom(this.msg1, this.pid1, first);
        assertTrue(first.toString(), first instanceof MemoryPeerMessage);
        PeerMessage second = (PeerMessage) this.rcvdMsgs2.get(TIMEOUT_SHOULDNT);
        assertEqualsMessageFrom(this.msg2, this.pid1, second);
        assertTrue(second.toString(), second instanceof MemoryPeerMessage);
        assertEquals(1, this.msg1.getRetryCount());
        assertEquals(HEADER_OFF_CHECK, this.msg2.getRetryCount());
        assertEquals(rpt + 1, pdata.getOrigCnt());
        assertEquals(rpt, pdata.getFailCnt());
        assertEquals(HEADER_OFF_CHECK, pdata.getAcceptCnt());
    }

    /**
     * Checks that a message whose first connection attempt failed is delivered
     * once the peer has been set up and a later message is sent to it.
     *
     * <p>comm1 sends {@code msg1} to pid2 before {@code setupComm2()} has been
     * called; the test expects a "dissoc" event and the peer to be flagged as
     * needing a retry. After comm2 is set up and simulated time is stepped, a
     * second send ({@code msg2}) is expected to leave both messages on
     * {@code rcvdMsgs2}, {@code msg1} first.
     *
     * @throws IOException if thrown by the comm setup or send helpers
     */
    public void testRetry3() throws IOException {
        // Run on simulated time; 1000 is the value handed to setSimulated().
        TimeBase.setSimulated(1000L);
        // Explicit retry-related configuration for this scenario.
        this.cprops.put("org.lockss.scomm.minFileMessageSize", "5000");
        this.cprops.put("org.lockss.scomm.maxPeerRetryInterval", "5000");
        this.cprops.put("org.lockss.scomm.minPeerRetryInterval", "10");
        this.cprops.put("org.lockss.scomm.retryBeforeExpiration", "1000");
        this.cprops.put("org.lockss.scomm.retryDelay", "100");
        ConfigurationUtil.setCurrentConfigFromProps(this.cprops);
        this.msg2 = makePeerMessage(1, "1234567890123456789012345678901234567890", 20);
        // msg1 is given the later expiration (4000) and msg2 the earlier one (3000).
        this.msg1.setExpiration(4000L);
        this.msg2.setExpiration(3000L);
        setupComm1();
        setupPid(2);
        this.comm1.setAssocQueue(this.assocQ);
        // First send happens before setupComm2(); a "dissoc" event is expected.
        this.comm1.sendTo(this.msg1, this.pid2);
        List list = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
        assertNotNull("Connecting channel didn't dissociate", list);
        assertEquals("Connecting channel didn't dissociate", "dissoc", list.get(HEADER_OFF_CHECK));
        // After the failure the peer must have no channels and be marked for retry.
        BlockingStreamComm.PeerData peerData = this.comm1.getPeerData(this.pid2);
        assertNotNull(peerData);
        assertNull(peerData.getPrimaryChannel());
        assertNull(peerData.getSecondaryChannel());
        assertTrue(peerData.isRetryNeeded());
        setupComm2();
        // Stepping time by 1000 must not by itself produce another association event.
        TimeBase.step(1000L);
        assertNull("Retried too early", (List) this.assocQ.get(TIMEOUT_SHOULD));
        assertTrue(peerData.isRetryNeeded());
        // Second send to the same peer; both messages should now arrive, msg1 first.
        this.comm1.sendTo(this.msg2, this.pid2);
        PeerMessage peerMessage = (PeerMessage) this.rcvdMsgs2.get(TIMEOUT_SHOULDNT);
        assertFalse(peerData.isRetryNeeded());
        assertEqualsMessageFrom(this.msg1, this.pid1, peerMessage);
        assertTrue(peerMessage.toString(), peerMessage instanceof MemoryPeerMessage);
        PeerMessage peerMessage2 = (PeerMessage) this.rcvdMsgs2.get(TIMEOUT_SHOULDNT);
        assertEqualsMessageFrom(this.msg2, this.pid1, peerMessage2);
        assertTrue(peerMessage2.toString(), peerMessage2 instanceof MemoryPeerMessage);
    }

    /**
     * Checks the contents of {@code comm1.peersToRetry} as sends to two peers
     * fail and a further message is queued for one of them.
     *
     * <p>{@code setupComm1()} is the only comm setup call in this test. The
     * messages carry expirations of 1000 ({@code msg1}), 2000 ({@code msg2}) and
     * 3000 ({@code msg3}).
     *
     * @throws IOException if thrown by the comm setup or send helpers
     */
    public void testRetryOrder() throws IOException {
        TimeBase.setSimulated(100L);
        this.cprops.put("org.lockss.scomm.minFileMessageSize", "5000");
        this.cprops.put("org.lockss.scomm.maxPeerRetryInterval", "5000");
        this.cprops.put("org.lockss.scomm.minPeerRetryInterval", "10");
        this.cprops.put("org.lockss.scomm.retryBeforeExpiration", "1000");
        this.cprops.put("org.lockss.scomm.retryDelay", "100");
        ConfigurationUtil.setCurrentConfigFromProps(this.cprops);
        setupPid(2);
        PeerIdentity peerIdentity = setupPid(3);
        this.msg2 = makePeerMessage(1, "22222", 20);
        this.msg3 = makePeerMessage(1, "33333", 20);
        this.msg1.setExpiration(1000L);
        this.msg2.setExpiration(2000L);
        this.msg3.setExpiration(3000L);
        setupComm1();
        this.comm1.setAssocQueue(this.assocQ);
        // Send msg3 to peer 3: a "dissoc" event is expected and peer 3 becomes
        // the only entry in peersToRetry.
        this.comm1.sendTo(this.msg3, peerIdentity);
        List list = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
        assertNotNull("1st channel didn't dissociate", list);
        assertEquals("1st channel didn't dissociate", "dissoc", list.get(HEADER_OFF_CHECK));
        BlockingStreamComm.PeerData peerData = this.comm1.getPeerData(peerIdentity);
        assertIsomorphic(SetUtil.set(new BlockingStreamComm.PeerData[]{peerData}), this.comm1.peersToRetry);
        // Send msg2 to peer 2: expected set is now {peer 2, peer 3}.
        // NOTE(review): the element order passed to SetUtil.set presumably
        // matters to assertIsomorphic (peer 2 listed first) — confirm.
        this.comm1.sendTo(this.msg2, this.pid2);
        List list2 = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
        assertNotNull("2nd channel didn't dissociate", list2);
        assertEquals("2nd channel didn't dissociate", "dissoc", list2.get(HEADER_OFF_CHECK));
        BlockingStreamComm.PeerData peerData2 = this.comm1.getPeerData(this.pid2);
        assertIsomorphic(SetUtil.set(new BlockingStreamComm.PeerData[]{peerData2, peerData}), this.comm1.peersToRetry);
        // Queue msg1 (expiration 1000) for peer 3: the expected set now lists
        // peer 3 before peer 2.
        this.comm1.sendTo(this.msg1, peerIdentity);
        assertIsomorphic(SetUtil.set(new BlockingStreamComm.PeerData[]{peerData, peerData2}), this.comm1.peersToRetry);
    }

    /**
     * Checks that a message left pending after a failed outgoing connection is
     * delivered once the peer itself sends a message in the other direction.
     *
     * <p>comm1 sends {@code msg1} to pid2 before {@code setupComm2()} has been
     * called and a "dissoc" event is expected. comm2 is then set up and sends
     * {@code msg2} to pid1. The test expects {@code msg1} on {@code rcvdMsgs2},
     * {@code msg2} on {@code rcvdMsgs1}, and specific values from the
     * origination/failure/accept counters on both sides' peer data.
     *
     * @throws IOException if thrown by the comm setup or send helpers
     */
    public void testRetryIncoming() throws IOException {
        TimeBase.setSimulated(1000L);
        this.cprops.put("org.lockss.scomm.minFileMessageSize", "5000");
        this.cprops.put("org.lockss.scomm.maxPeerRetryInterval", "5000");
        this.cprops.put("org.lockss.scomm.retryBeforeExpiration", "0");
        ConfigurationUtil.setCurrentConfigFromProps(this.cprops);
        this.msg2 = makePeerMessage(1, "1234567890123456789012345678901234567890", 20);
        this.msg1.setExpiration(4000L);
        this.msg2.setExpiration(3000L);
        setupComm1();
        setupPid(2);
        this.comm1.setAssocQueue(this.assocQ);
        // First send happens before setupComm2().
        this.comm1.sendTo(this.msg1, this.pid2);
        BlockingStreamComm.PeerData peerData = this.comm1.getPeerData(this.pid2);
        // NOTE(review): peerData is dereferenced here, before the
        // assertNotNull(peerData) below, and lastSendRpt is then used as the
        // expected origination/failure count — confirm this is the intended
        // baseline value.
        int i = peerData.lastSendRpt;
        List list = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
        assertNotNull("Connecting channel didn't dissociate", list);
        assertEquals("Connecting channel didn't dissociate", "dissoc", list.get(HEADER_OFF_CHECK));
        assertNotNull(peerData);
        assertNull(peerData.getPrimaryChannel());
        assertNull(peerData.getSecondaryChannel());
        assertTrue(peerData.isRetryNeeded());
        assertEquals(i, peerData.getOrigCnt());
        assertEquals(i, peerData.getFailCnt());
        assertEquals(HEADER_OFF_CHECK, peerData.getAcceptCnt());
        setupComm2();
        // Stepping time by 1000 must not by itself produce another association event.
        TimeBase.step(1000L);
        assertNull("Retried too early", (List) this.assocQ.get(TIMEOUT_SHOULD));
        assertTrue(peerData.isRetryNeeded());
        // The peer now sends in the other direction (comm2 -> comm1).
        this.comm2.sendTo(this.msg2, this.pid1);
        BlockingStreamComm.PeerData peerData2 = this.comm2.getPeerData(this.pid1);
        // comm1's pending msg1 is expected to reach comm2 ...
        PeerMessage peerMessage = (PeerMessage) this.rcvdMsgs2.get(TIMEOUT_SHOULDNT);
        assertFalse(peerData.isRetryNeeded());
        assertEqualsMessageFrom(this.msg1, this.pid1, peerMessage);
        assertTrue(peerMessage.toString(), peerMessage instanceof MemoryPeerMessage);
        // ... and comm2's msg2 is expected to reach comm1.
        PeerMessage peerMessage2 = (PeerMessage) this.rcvdMsgs1.get(TIMEOUT_SHOULDNT);
        assertEqualsMessageFrom(this.msg2, this.pid2, peerMessage2);
        assertTrue(peerMessage2.toString(), peerMessage2 instanceof MemoryPeerMessage);
        // comm1's view of pid2: orig/fail counts unchanged, accept count now 1.
        assertEquals(i, peerData.getOrigCnt());
        assertEquals(i, peerData.getFailCnt());
        assertEquals(1, peerData.getAcceptCnt());
        // comm2's view of pid1: origination count 1.
        assertEquals(1, peerData2.getOrigCnt());
        assertEquals(HEADER_OFF_CHECK, peerData2.getFailCnt());
        assertEquals(HEADER_OFF_CHECK, peerData2.getAcceptCnt());
    }

    /**
     * Exchanges messages in both directions between comm1 and comm2 and checks
     * that each side reports exactly one channel and no receive channels, both
     * after the first message and after several further sends.
     *
     * @throws IOException if thrown by the comm setup or send helpers
     */
    public void testSingleConnect() throws IOException {
        setupComm1();
        setupComm2();

        // First message, comm1 -> comm2.
        comm1.sendTo(msg1, pid2);
        PeerMessage firstAtComm2 = (PeerMessage) rcvdMsgs2.get(TIMEOUT_SHOULDNT);
        assertEqualsMessageFrom(msg1, pid1, firstAtComm2);
        assertEquals(1, firstAtComm2.getProtocol());
        assertEquals(testStr1.length(), firstAtComm2.getDataSize());
        assertEquals(testStr1.length(), msg1.getDataSize());

        // Channel bookkeeping after the first message.
        assertEquals(1, getChannels(comm1).size());
        assertEquals(1, getChannels(comm2).size());
        assertEquals(HEADER_OFF_CHECK, getRcvChannels(comm1).size());
        assertEquals(HEADER_OFF_CHECK, getRcvChannels(comm2).size());
        assertTrue(rcvdMsgs1.isEmpty());
        assertTrue(rcvdMsgs2.isEmpty());

        // Second message in the same direction.
        comm1.sendTo(msg2, pid2);
        PeerMessage secondAtComm2 = (PeerMessage) rcvdMsgs2.get(TIMEOUT_SHOULDNT);
        assertEqualsMessageFrom(msg2, pid1, secondAtComm2);

        // Now the reverse direction, comm2 -> comm1.
        comm2.sendTo(msg2, pid1);
        PeerMessage firstAtComm1 = (PeerMessage) rcvdMsgs1.get(TIMEOUT_SHOULDNT);
        assertNotNull(firstAtComm1);
        assertEqualsMessageFrom(msg2, pid2, firstAtComm1);

        // Two more sends from each side, issued before anything is read back.
        comm2.sendTo(msg3, pid1);
        comm2.sendTo(msg1, pid1);
        comm1.sendTo(msg3, pid2);
        comm1.sendTo(msg2, pid2);
        assertEquals(1, getChannels(comm1).size());
        assertEmpty(getRcvChannels(comm1));
        assertEquals(1, getChannels(comm2).size());
        assertEmpty(getRcvChannels(comm2));

        // Each receive queue must yield its two messages in send order.
        PeerMessage received = (PeerMessage) rcvdMsgs1.get(TIMEOUT_SHOULDNT);
        assertEqualsMessageFrom(msg3, pid2, received);
        received = (PeerMessage) rcvdMsgs1.get(TIMEOUT_SHOULDNT);
        assertEqualsMessageFrom(msg1, pid2, received);
        received = (PeerMessage) rcvdMsgs2.get(TIMEOUT_SHOULDNT);
        assertEqualsMessageFrom(msg3, pid1, received);
        received = (PeerMessage) rcvdMsgs2.get(TIMEOUT_SHOULDNT);
        assertEqualsMessageFrom(msg2, pid1, received);
    }

    /**
     * Exercises the per-peer send rate limit configured as "3/1000" via
     * {@code org.lockss.scomm.peerSendMessageRateLimit}.
     *
     * <p>After an initial exchange, comm2 sends to pid1 until the send-rate-limited
     * counter on comm2's peer data for pid1 becomes 1, checks that only the
     * messages sent before that point arrive, then steps simulated time by 1000
     * and checks that two further sends are delivered without the counter changing.
     *
     * @throws IOException if thrown by the comm setup or send helpers
     */
    public void testSendRateLimit() throws IOException {
        TimeBase.setSimulated(1000L);
        ConfigurationUtil.addFromArgs("org.lockss.scomm.peerSendMessageRateLimit", "3/1000");
        setupComm1();
        setupComm2();
        // Initial comm1 -> comm2 message and basic sanity checks on it.
        this.comm1.sendTo(this.msg1, this.pid2);
        PeerMessage peerMessage = (PeerMessage) this.rcvdMsgs2.get(TIMEOUT_SHOULDNT);
        assertEqualsMessageFrom(this.msg1, this.pid1, peerMessage);
        assertEquals(1, peerMessage.getProtocol());
        assertEquals(this.testStr1.length(), peerMessage.getDataSize());
        assertEquals(this.testStr1.length(), this.msg1.getDataSize());
        assertEquals(1, getChannels(this.comm1).size());
        assertEquals(1, getChannels(this.comm2).size());
        assertEquals(HEADER_OFF_CHECK, getRcvChannels(this.comm1).size());
        assertEquals(HEADER_OFF_CHECK, getRcvChannels(this.comm2).size());
        assertTrue(this.rcvdMsgs1.isEmpty());
        assertTrue(this.rcvdMsgs2.isEmpty());
        this.comm1.sendTo(this.msg2, this.pid2);
        assertEqualsMessageFrom(this.msg2, this.pid1, (PeerMessage) this.rcvdMsgs2.get(TIMEOUT_SHOULDNT));
        // First comm2 -> comm1 send.
        this.comm2.sendTo(this.msg2, this.pid1);
        PeerMessage peerMessage2 = (PeerMessage) this.rcvdMsgs1.get(TIMEOUT_SHOULDNT);
        assertNotNull(peerMessage2);
        assertEqualsMessageFrom(this.msg2, this.pid2, peerMessage2);
        // Rate-limit counters are read from comm2's record of pid1.
        BlockingStreamComm.PeerData peerData = this.comm2.getPeerData(this.pid1);
        assertEquals(HEADER_OFF_CHECK, peerData.getSendRateLimited());
        assertEquals(HEADER_OFF_CHECK, peerData.getRcvRateLimited());
        // Second and third comm2 -> comm1 sends: counters unchanged.
        this.comm2.sendTo(this.msg3, this.pid1);
        assertEquals(HEADER_OFF_CHECK, peerData.getSendRateLimited());
        assertEquals(HEADER_OFF_CHECK, peerData.getRcvRateLimited());
        this.comm2.sendTo(this.msg2, this.pid1);
        assertEquals(HEADER_OFF_CHECK, peerData.getSendRateLimited());
        assertEquals(HEADER_OFF_CHECK, peerData.getRcvRateLimited());
        // Fourth send: the send-rate-limited counter is expected to become 1.
        this.comm2.sendTo(this.msg1, this.pid1);
        assertEquals(1, peerData.getSendRateLimited());
        assertEquals(HEADER_OFF_CHECK, peerData.getRcvRateLimited());
        // Only msg3 and msg2 arrive; nothing further is expected on the queue.
        assertEqualsMessageFrom(this.msg3, this.pid2, (PeerMessage) this.rcvdMsgs1.get(TIMEOUT_SHOULDNT));
        assertEqualsMessageFrom(this.msg2, this.pid2, (PeerMessage) this.rcvdMsgs1.get(TIMEOUT_SHOULDNT));
        assertNull(this.rcvdMsgs1.get(TIMEOUT_SHOULD));
        // After stepping simulated time by 1000, sends are delivered again and
        // the counter stays at 1.
        TimeBase.step(1000L);
        this.comm2.sendTo(this.msg3, this.pid1);
        this.comm2.sendTo(this.msg2, this.pid1);
        assertEquals(1, peerData.getSendRateLimited());
        assertEquals(HEADER_OFF_CHECK, peerData.getRcvRateLimited());
        assertEqualsMessageFrom(this.msg3, this.pid2, (PeerMessage) this.rcvdMsgs1.get(TIMEOUT_SHOULDNT));
        assertEqualsMessageFrom(this.msg2, this.pid2, (PeerMessage) this.rcvdMsgs1.get(TIMEOUT_SHOULDNT));
    }

    /**
     * Exercises the per-peer receive rate limit configured as "2/1000" via
     * {@code org.lockss.scomm.peerReceiveMessageRateLimit}.
     *
     * <p>After an initial exchange, comm2 sends three messages to pid1; the test
     * expects only the first of them on {@code rcvdMsgs1} and a receive-rate-limited
     * count of 2 on comm1's peer data for pid2. After simulated time is stepped by
     * 1000, two further messages are expected to arrive with the count unchanged.
     *
     * @throws IOException if thrown by the comm setup or send helpers
     */
    public void testRcvRateLimit() throws IOException {
        TimeBase.setSimulated(1000L);
        ConfigurationUtil.addFromArgs("org.lockss.scomm.peerReceiveMessageRateLimit", "2/1000");
        setupComm1();
        setupComm2();
        // Initial comm1 -> comm2 message and basic sanity checks on it.
        this.comm1.sendTo(this.msg1, this.pid2);
        PeerMessage peerMessage = (PeerMessage) this.rcvdMsgs2.get(TIMEOUT_SHOULDNT);
        assertEqualsMessageFrom(this.msg1, this.pid1, peerMessage);
        assertEquals(1, peerMessage.getProtocol());
        assertEquals(this.testStr1.length(), peerMessage.getDataSize());
        assertEquals(this.testStr1.length(), this.msg1.getDataSize());
        assertEquals(1, getChannels(this.comm1).size());
        assertEquals(1, getChannels(this.comm2).size());
        assertEquals(HEADER_OFF_CHECK, getRcvChannels(this.comm1).size());
        assertEquals(HEADER_OFF_CHECK, getRcvChannels(this.comm2).size());
        assertTrue(this.rcvdMsgs1.isEmpty());
        assertTrue(this.rcvdMsgs2.isEmpty());
        this.comm1.sendTo(this.msg2, this.pid2);
        assertEqualsMessageFrom(this.msg2, this.pid1, (PeerMessage) this.rcvdMsgs2.get(TIMEOUT_SHOULDNT));
        // First message received by comm1 from pid2.
        this.comm2.sendTo(this.msg2, this.pid1);
        PeerMessage peerMessage2 = (PeerMessage) this.rcvdMsgs1.get(TIMEOUT_SHOULDNT);
        assertNotNull(peerMessage2);
        assertEqualsMessageFrom(this.msg2, this.pid2, peerMessage2);
        // Rate-limit counters are read from comm1's record of pid2 (the receiver side).
        BlockingStreamComm.PeerData peerData = this.comm1.getPeerData(this.pid2);
        assertEquals(HEADER_OFF_CHECK, peerData.getSendRateLimited());
        assertEquals(HEADER_OFF_CHECK, peerData.getRcvRateLimited());
        // Three more sends; only msg3 is expected to be delivered.
        this.comm2.sendTo(this.msg3, this.pid1);
        this.comm2.sendTo(this.msg2, this.pid1);
        this.comm2.sendTo(this.msg1, this.pid1);
        assertEqualsMessageFrom(this.msg3, this.pid2, (PeerMessage) this.rcvdMsgs1.get(TIMEOUT_SHOULDNT));
        assertNull(this.rcvdMsgs1.get(TIMEOUT_SHOULD));
        assertEquals(HEADER_OFF_CHECK, peerData.getSendRateLimited());
        assertEquals(2, peerData.getRcvRateLimited());
        // After stepping simulated time by 1000, messages are received again
        // and the receive-rate-limited count stays at 2.
        TimeBase.step(1000L);
        this.comm2.sendTo(this.msg3, this.pid1);
        this.comm2.sendTo(this.msg2, this.pid1);
        assertEqualsMessageFrom(this.msg3, this.pid2, (PeerMessage) this.rcvdMsgs1.get(TIMEOUT_SHOULDNT));
        assertEqualsMessageFrom(this.msg2, this.pid2, (PeerMessage) this.rcvdMsgs1.get(TIMEOUT_SHOULDNT));
        assertEquals(HEADER_OFF_CHECK, peerData.getSendRateLimited());
        assertEquals(2, peerData.getRcvRateLimited());
    }

    /**
     * Has both comms send to each other while comm2 has an accept semaphore
     * installed, and checks that comm1 receives comm2's message first and that
     * comm2 receives comm1's message only after the semaphore is given.
     *
     * @throws IOException if thrown by the comm setup or send helpers
     */
    public void testSimultaneousConnect1() throws IOException {
        setupComm1();
        setupComm2();
        // presumably makes comm2's accept path wait on sem2 — confirm in setAcceptSem
        comm2.setAcceptSem(sem2);

        comm1.sendTo(msg1, pid2);
        comm2.sendTo(msg2, pid1);
        assertEquals(1, getChannels(comm1).size());
        assertEquals(1, getChannels(comm2).size());

        // comm2's message reaches comm1; comm1 then reports one receive channel.
        PeerMessage atComm1 = (PeerMessage) rcvdMsgs1.get(TIMEOUT_SHOULDNT);
        assertEqualsMessageFrom(msg2, pid2, atComm1);
        assertEquals(1, getRcvChannels(comm1).size());
        assertTrue(rcvdMsgs2.isEmpty());

        // Once sem2 is given, comm1's message is expected at comm2.
        sem2.give();
        PeerMessage atComm2 = (PeerMessage) rcvdMsgs2.get(TIMEOUT_SHOULDNT);
        assertEqualsMessageFrom(msg1, pid1, atComm2);
    }

    /**
     * Has both comms send to each other while each has an accept semaphore
     * installed, then gives the semaphores one at a time and checks the channel
     * and receive-channel counts after each step.
     *
     * @throws IOException if thrown by the comm setup or send helpers
     */
    public void testSimultaneousConnect2() throws IOException {
        setupComm1();
        setupComm2();
        // Both sides get an accept semaphore (sem1 for comm1, sem2 for comm2).
        this.comm1.setAcceptSem(this.sem1);
        this.comm2.setAcceptSem(this.sem2);
        this.comm1.sendTo(this.msg1, this.pid2);
        this.comm2.sendTo(this.msg2, this.pid1);
        // Before either semaphore is given: one channel each, no receive
        // channels, nothing received.
        assertEquals(1, getChannels(this.comm1).size());
        assertEquals(1, getChannels(this.comm2).size());
        assertEquals(HEADER_OFF_CHECK, getRcvChannels(this.comm1).size());
        assertEquals(HEADER_OFF_CHECK, getRcvChannels(this.comm2).size());
        assertTrue(this.rcvdMsgs1.isEmpty());
        assertTrue(this.rcvdMsgs2.isEmpty());
        // After sem1 is given, comm1 receives msg2 and gains a receive channel.
        this.sem1.give();
        assertEqualsMessageFrom(this.msg2, this.pid2, (PeerMessage) this.rcvdMsgs1.get(TIMEOUT_SHOULDNT));
        assertEquals(1, getChannels(this.comm1).size());
        assertEquals(1, getChannels(this.comm2).size());
        assertEquals(1, getRcvChannels(this.comm1).size());
        assertEquals(HEADER_OFF_CHECK, getRcvChannels(this.comm2).size());
        // After sem2 is given, comm2 receives msg1 and gains a receive channel too.
        this.sem2.give();
        assertEqualsMessageFrom(this.msg1, this.pid1, (PeerMessage) this.rcvdMsgs2.get(TIMEOUT_SHOULDNT));
        assertEquals(1, getChannels(this.comm1).size());
        assertEquals(1, getChannels(this.comm2).size());
        assertEquals(1, getRcvChannels(this.comm1).size());
        assertEquals(1, getRcvChannels(this.comm2).size());
    }

    /**
     * Checks that comm1's channel to pid2 is dissociated and stopped after
     * simulated time passes the configured {@code channelIdleTime} of 5000.
     *
     * <p>The channel must still be present after 4000 of simulated time and is
     * expected to produce a "dissoc" event after a further 2000.
     *
     * @throws IOException if thrown by the comm setup or send helpers
     */
    public void testChannelCloseAfterTimeout() throws IOException {
        TimeBase.setSimulated(1000L);
        this.cprops.setProperty("org.lockss.scomm.channelIdleTime", "5000");
        ConfigurationUtil.setCurrentConfigFromProps(this.cprops);
        setupComm1();
        setupComm2();
        // comm2 is given a different idle time (10000) from the configured 5000.
        this.comm2.setChannelIdleTime(10000L);
        this.comm1.setAssocQueue(this.assocQ);
        this.comm1.sendTo(this.msg1, this.pid2);
        this.comm2.sendTo(this.msg2, this.pid1);
        MyBlockingPeerChannel myBlockingPeerChannel = (MyBlockingPeerChannel) getChannel(this.comm1, this.pid2);
        assertNotNull(myBlockingPeerChannel);
        // Hook semaphores into the channel: sem1 for send-wait calculation,
        // sem2 for channel stop.
        myBlockingPeerChannel.setCalcSendWaitSem(this.sem1);
        myBlockingPeerChannel.setStopSem(this.sem2);
        this.comm1.sendTo(this.msg1, this.pid2);
        // Wait (at most two takes of sem1) until calcSendWaitCtr has reached 3
        // before stepping time.
        assertTrue(this.sem1.take(TIMEOUT_SHOULDNT));
        if (myBlockingPeerChannel.calcSendWaitCtr < 3) {
            assertTrue(this.sem1.take(TIMEOUT_SHOULDNT));
        }
        assertEquals(3, myBlockingPeerChannel.calcSendWaitCtr);
        // 4000 elapsed: the channel must still be there.
        TimeBase.step(4000L);
        assertEquals(1, getChannels(this.comm1).size());
        // A further 2000 elapsed (6000 total): a "dissoc" event is expected.
        TimeBase.step(2000L);
        List list = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
        assertNotNull("Channel wasn't dissociated after timeout", list);
        assertEquals("Channel wasn't dissociated after timeout", "dissoc", list.get(HEADER_OFF_CHECK));
        assertEquals(HEADER_OFF_CHECK, getChannels(this.comm1).size());
        assertEquals(HEADER_OFF_CHECK, getRcvChannels(this.comm1).size());
        assertTrue("Channel didn't stop", this.sem2.take(TIMEOUT_SHOULDNT));
    }

    /**
     * Like the idle-timeout close test, but with {@code drainInputTime} set to
     * 3000 and the remote channel put into simulated send-busy mode, so that the
     * stop of comm1's channel is expected only after a further time step.
     *
     * <p>After the "dissoc" event the channel is expected in
     * {@code comm1.drainingChannels} (checked only when {@code isSsl()} is false),
     * and {@code drainingChannels} is expected to be empty at the end.
     *
     * @throws IOException if thrown by the comm setup or send helpers
     */
    public void testChannelCloseAfterTimeoutDrainTimeout() throws IOException {
        TimeBase.setSimulated(1000L);
        this.cprops.setProperty("org.lockss.scomm.channelIdleTime", "5000");
        this.cprops.setProperty("org.lockss.scomm.drainInputTime", "3000");
        ConfigurationUtil.setCurrentConfigFromProps(this.cprops);
        setupComm1();
        setupComm2();
        // comm2 is given a different idle time (20000) from the configured 5000.
        this.comm2.setChannelIdleTime(20000L);
        this.comm1.setAssocQueue(this.assocQ);
        this.comm1.sendTo(this.msg1, this.pid2);
        this.comm2.sendTo(this.msg2, this.pid1);
        MyBlockingPeerChannel myBlockingPeerChannel = (MyBlockingPeerChannel) getChannel(this.comm1, this.pid2);
        MyBlockingPeerChannel myBlockingPeerChannel2 = (MyBlockingPeerChannel) getChannel(this.comm2, this.pid1);
        assertNotNull(myBlockingPeerChannel);
        assertNotNull(myBlockingPeerChannel2);
        // Hook semaphores into comm1's channel: sem1 for send-wait calculation,
        // sem2 for channel stop.
        myBlockingPeerChannel.setCalcSendWaitSem(this.sem1);
        myBlockingPeerChannel.setStopSem(this.sem2);
        // Put comm2's channel into simulated send-busy mode.
        myBlockingPeerChannel2.setSimulateSendBusy(true);
        this.comm1.sendTo(this.msg1, this.pid2);
        // Wait (at most two takes of sem1) until calcSendWaitCtr has reached 3
        // before stepping time.
        assertTrue(this.sem1.take(TIMEOUT_SHOULDNT));
        if (myBlockingPeerChannel.calcSendWaitCtr < 3) {
            assertTrue(this.sem1.take(TIMEOUT_SHOULDNT));
        }
        assertEquals(3, myBlockingPeerChannel.calcSendWaitCtr);
        // 4000 elapsed: the channel must still be there.
        TimeBase.step(4000L);
        assertEquals(1, getChannels(this.comm1).size());
        // A further 2000 elapsed (6000 total): a "dissoc" event is expected.
        TimeBase.step(2000L);
        List list = (List) this.assocQ.get(TIMEOUT_SHOULDNT);
        assertNotNull("Channel wasn't dissociated after timeout", list);
        assertEquals("Channel wasn't dissociated after timeout", "dissoc", list.get(HEADER_OFF_CHECK));
        assertEquals(HEADER_OFF_CHECK, getChannels(this.comm1).size());
        assertEquals(HEADER_OFF_CHECK, getRcvChannels(this.comm1).size());
        // The draining-set membership check is skipped for SSL variants.
        if (!isSsl()) {
            assertContains(this.comm1.drainingChannels, myBlockingPeerChannel);
        }
        // Only where shutdownOutputSupported is set: the channel must not have
        // stopped yet.
        if (this.shutdownOutputSupported) {
            assertFalse("Channel stopped before drain input timer", this.sem2.take(TIMEOUT_SHOULD));
        }
        // Step past the 3000 drain-input time; the channel is now expected to stop.
        TimeBase.step(4000L);
        assertTrue("Drain input timer didn't stop channel", this.sem2.take(TIMEOUT_SHOULDNT));
        assertEmpty(this.comm1.drainingChannels);
    }

    /**
     * Configures a long channel idle time ("10h") together with a data timeout
     * of "100" and checks that comm1's channel is dissociated on its own after
     * a single send, without the test stepping simulated time.
     *
     * @throws IOException if thrown by the comm setup or send helpers
     */
    public void testReadTimeout() throws IOException {
        TimeBase.setSimulated(1000L);
        cprops.setProperty("org.lockss.scomm.channelIdleTime", "10h");
        cprops.setProperty("org.lockss.scomm.timeout.data", "100");
        ConfigurationUtil.setCurrentConfigFromProps(cprops);
        setupComm1();
        setupComm2();
        comm1.setAssocQueue(assocQ);
        comm1.sendTo(msg1, pid2);

        // The message is only verified if one actually arrived.
        PeerMessage delivered = (PeerMessage) rcvdMsgs2.get(TIMEOUT_SHOULDNT);
        if (delivered != null) {
            assertEqualsMessageFrom(msg1, pid1, delivered);
        }

        // A "dissoc" event must show up, after which comm1 has no channels left.
        List assocEvent = (List) assocQ.get(TIMEOUT_SHOULDNT);
        assertNotNull("Channel didn't close automatically after timeout", assocEvent);
        Object eventKind = assocEvent.get(HEADER_OFF_CHECK);
        assertEquals("Channel didn't close automatically after timeout", "dissoc", eventKind);
        assertEquals(HEADER_OFF_CHECK, getChannels(comm1).size());
        assertEquals(HEADER_OFF_CHECK, getRcvChannels(comm1).size());
    }

    /**
     * Sets up five comms, has every comm send one message to every peer
     * identity in {@code pids} (its own included), and then checks that each
     * comm's receive queue yields a message from every peer.
     *
     * @throws IOException if thrown by the comm setup or send helpers
     */
    public void testMultipleChannels() throws IOException {
        for (int commIx = HEADER_OFF_CHECK; commIx < 5; commIx++) {
            setupComm(commIx);
        }

        // Full mesh of sends; the payload is "<sender id>><receiver id>".
        for (int from = HEADER_OFF_CHECK; from < 5; from++) {
            for (int to = HEADER_OFF_CHECK; to < 5; to++) {
                comms[from].sendTo(
                        makePeerMessage(1, pids[from].getIdString() + ">" + pids[to].getIdString()),
                        pids[to]);
            }
        }

        // Drain each comm's queue until every sender has been seen.
        for (int rcvIx = HEADER_OFF_CHECK; rcvIx < 5; rcvIx++) {
            for (Set pending = allPeers(); !pending.isEmpty(); ) {
                PeerMessage received = (PeerMessage) rcvdMsgss[rcvIx].get(TIMEOUT_SHOULDNT * 4);
                assertNotNull("Comm" + rcvIx + " didn't receive messages from " + pending, received);
                pending.remove(received.getSender());
            }
        }
    }

    /**
     * Builds a set from the {@code pids} array via {@code SetUtil.fromArray}.
     *
     * @return the set produced by {@code SetUtil.fromArray(pids)}
     */
    Set allPeers() {
        Set everyPeer = SetUtil.fromArray(pids);
        return everyPeer;
    }

    /**
     * Sends one message from comm1 to comm2 and then checks the TCP_NODELAY and
     * keep-alive settings on the sockets at both ends of the connection.
     *
     * @param z  expected value of {@code getTcpNoDelay()} on both sockets
     * @param z2 expected value of {@code getKeepAlive()} on both sockets
     * @throws IOException if thrown by the comm setup, the send or the socket getters
     */
    public void testSockOpts(boolean z, boolean z2) throws IOException {
        setupComm1();
        setupComm2();
        msg2 = makePeerMessage(1, "1234567890", HEADER_OFF_PROTO);

        // One delivered message guarantees both ends have a channel to inspect.
        comm1.sendTo(msg1, pid2);
        PeerMessage delivered = (PeerMessage) rcvdMsgs2.get(TIMEOUT_SHOULDNT);
        assertNotNull("Didn't receive message", delivered);

        MyBlockingPeerChannel localChannel = (MyBlockingPeerChannel) getChannel(comm1, pid2);
        MyBlockingPeerChannel remoteChannel = (MyBlockingPeerChannel) getChannel(comm2, pid1);
        Socket localSock = localChannel.getSocket();
        Socket remoteSock = remoteChannel.getSocket();

        assertEquals("NODELAY 1", z, localSock.getTcpNoDelay());
        assertEquals("KEEPALIVE 1", z2, localSock.getKeepAlive());
        assertEquals("NODELAY 2", z, remoteSock.getTcpNoDelay());
        assertEquals("KEEPALIVE 2", z2, remoteSock.getKeepAlive());
    }

    /**
     * Builds the suite for this class by passing the {@code Buff},
     * {@code UnBuffNoDelay} and {@code HighPri} variant classes to
     * {@code variantSuites}.
     *
     * @return the test returned by {@code variantSuites} for those three classes
     */
    public static Test suite() {
        Class[] variants = {Buff.class, UnBuffNoDelay.class, HighPri.class};
        return variantSuites(variants);
    }
}
