package com.addthis.meshy.service.stream;

import com.addthis.basis.util.LessBytes;
import com.addthis.basis.util.Parameter;
import com.addthis.meshy.ChannelState;
import com.addthis.meshy.Meshy;
import com.addthis.meshy.MeshyConstants;
import com.addthis.meshy.SendWatcher;
import com.addthis.meshy.TargetHandler;
import com.addthis.meshy.VirtualFileInput;
import com.addthis.meshy.VirtualFileReference;
import com.addthis.meshy.VirtualFileSystem;
import com.google.common.base.MoreObjects;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/addthis/meshy/service/stream/StreamTarget.class */
public class StreamTarget extends TargetHandler implements Runnable, SendWatcher {
    /** Logger shared by this class. */
    protected static final Logger log = LoggerFactory.getLogger(StreamTarget.class);

    /**
     * Byte threshold compared against {@code StreamService.sendWaiting} in {@link #run()};
     * while the counter is above it the sender sleeps briefly between reads.
     * Obtained from {@code Parameter.intValue("meshy.send.buffer", 5)} scaled by 1024 * 1024.
     */
    private static final int MAX_SEND_BUFFER = (Parameter.intValue("meshy.send.buffer", 5) * 1024) * 1024;

    /**
     * Start frames are rejected once {@code StreamService.openStreams.count()} exceeds this value;
     * it is also used to size {@link #senderQueue}.
     */
    static final int MAX_OPEN_STREAMS = Parameter.intValue("meshy.stream.maxopen", 1000);

    /** Core and maximum thread count of both sender pools. */
    static final int SENDER_THREADS = Parameter.intValue("meshy.senders", 1);

    /** Bounded work queue backing {@link #senderPool}. */
    static final LinkedBlockingQueue<Runnable> senderQueue = new LinkedBlockingQueue<>(MAX_OPEN_STREAMS - SENDER_THREADS);

    /** Executor used by scheduleForSending() for files whose name does not start with "/meshy/". */
    private static final ExecutorService senderPool = new ThreadPoolExecutor(SENDER_THREADS, SENDER_THREADS, 0, TimeUnit.MILLISECONDS, senderQueue, new ThreadFactoryBuilder().setNameFormat("sender-%d").setDaemon(true).build());

    /** Executor used by scheduleForSending() for files whose name starts with "/meshy/"; its queue is unbounded. */
    private static final ExecutorService inMemorySenderPool = new ThreadPoolExecutor(SENDER_THREADS, SENDER_THREADS, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setNameFormat("inMemorySender-%d").setDaemon(true).build());

    /** Flipped to true exactly once by modeClose(). */
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** Flipped to true exactly once by complete(). */
    private final AtomicBoolean streamComplete = new AtomicBoolean(false);

    /** Send budget: grows by {@link #maxSend} per "more" frame and shrinks by the result of each send. */
    private final AtomicInteger sendRemain = new AtomicInteger(0);

    /**
     * Claimed via compareAndSet(false, true) before this target is handed to a sender pool and
     * cleared in maybeDropSend() when the budget is exhausted.
     */
    private final AtomicBoolean queueState = new AtomicBoolean(false);

    /** Buffer size read from the start frame; must be positive. */
    private int maxSend = 0;

    /** Number of "more" frames processed after the start frame. */
    private int recvMore = 0;

    /** Running total of the values returned by send() in doSendMore(). */
    private int sentBytes = 0;

    /** Input of the locally served file; stays null when proxying or when no file was found. */
    private VirtualFileInput fileIn = null;

    /** Proxy to another node; set when the uuid in the start frame differs from the local uuid. */
    private StreamSource remoteSource = null;

    /** File name read from the start frame. */
    private String fileName = null;

    /** Set as soon as a start frame begins to be processed; "more" and close frames are refused before that. */
    private boolean startFrameReceived = false;

    /**
     * Stream source created when the requested node is not the local one. Everything it
     * receives is relayed through the enclosing {@link StreamTarget}.
     */
    private class ProxyStreamSource extends StreamSource {
        /**
         * @param nodeUuid uuid taken from the start frame; passed twice to the superclass constructor
         * @param params   optional stream parameters, may be null
         */
        public ProxyStreamSource(String nodeUuid, Map<String, String> params) {
            super(StreamTarget.this.getChannelMaster(), nodeUuid, nodeUuid, StreamTarget.this.fileName, params, StreamTarget.this.maxSend);
        }

        /** Relays a received frame, either as the buffer itself or as a copied byte array. */
        @Override // com.addthis.meshy.service.stream.StreamSource, com.addthis.meshy.SourceHandler
        public void receive(ChannelState state, int length, ByteBuf buffer) throws Exception {
            if (!StreamService.DIRECT_COPY) {
                byte[] copied = Meshy.getBytes(length, buffer);
                StreamTarget.this.send(copied);
                return;
            }
            StreamTarget.this.send(buffer, length);
        }

        /** Reports the lost remote channel to the requester as a failure. */
        @Override // com.addthis.meshy.service.stream.StreamSource, com.addthis.meshy.SourceHandler
        public void channelClosed(ChannelState state) {
            String reason = "Remote Channel Connection Lost for channel " + state.getChannel();
            StreamTarget.this.sendFail(reason);
        }

        /** Propagates completion of the remote stream to the enclosing target. */
        @Override // com.addthis.meshy.service.stream.StreamSource, com.addthis.meshy.SourceHandler
        public void receiveComplete() {
            StreamTarget.this.sendComplete();
        }
    }

    /**
     * Extends the superclass helper with this target's state so that log lines using
     * {@code this} show the stream's current condition.
     *
     * @return the helper obtained from the superclass with this class's fields appended
     */
    @Override // com.addthis.meshy.TargetHandler
    public MoreObjects.ToStringHelper toStringHelper() {
        MoreObjects.ToStringHelper base = super.toStringHelper();
        return base
                .add("closed", closed)
                .add("streamComplete", streamComplete)
                .add("sendRemain", sendRemain)
                .add("queueState", queueState)
                .add("fileIn", fileIn)
                .add("remoteSource", remoteSource)
                .add("fileName", fileName)
                .add("recvMore", recvMore)
                .add("sentBytes", sentBytes)
                .add("maxSend", maxSend);
    }

    /**
     * Sends a failure frame (a single byte 2 followed by the bytes of the message) and then
     * completes the sending side of this session.
     *
     * @param str human-readable failure reason
     */
    public void sendFail(String str) {
        log.debug("{} sendFail {}", this, str);
        byte[] failMarker = {2};
        byte[] frame = LessBytes.cat(failMarker, LessBytes.toBytes(str));
        send(frame);
        sendComplete();
    }

    /**
     * Completes the send side through the superclass and then asks the channel state to
     * remove this handler on completion.
     *
     * @return whatever the superclass implementation returned
     */
    @Override // com.addthis.meshy.TargetHandler, com.addthis.meshy.SessionHandler
    public boolean sendComplete() {
        final boolean superResult = super.sendComplete();
        getChannelState().removeHandlerOnComplete(this);
        return superResult;
    }

    /** When proxying, asks the remote source to close because the requesting channel went away. */
    @Override // com.addthis.meshy.TargetHandler
    public void channelClosed() {
        if (remoteSource == null) {
            return;
        }
        log.debug("{} closing remote on channel down", this);
        remoteSource.requestClose();
    }

    /**
     * Walks the given file system from its root, one path token at a time.
     *
     * @param vfs  file system to search
     * @param path path to resolve; split with {@code vfs.tokenizePath}
     * @return the reference reached after consuming every token, or null as soon as a token
     *         cannot be resolved
     */
    private static VirtualFileReference locateFile(VirtualFileSystem vfs, String path) {
        log.trace("locate {} --> {}", vfs, path);
        final String[] tokens = vfs.tokenizePath(path);
        VirtualFileReference current = vfs.getFileRoot();
        for (int idx = 0; idx < tokens.length; idx++) {
            final String token = tokens[idx];
            current = current.getFile(token);
            if (current == null) {
                log.trace("no file {} --> {} --> {}", vfs, path, token);
                return null;
            }
        }
        log.trace("located {} --> {} --> {}", vfs, path, current);
        return current;
    }

    /**
     * Handles one incoming frame from the requester.
     *
     * <p>When a remote source is already attached the raw bytes are forwarded to it unchanged.
     * Otherwise the first byte selects the mode:
     * <ul>
     *   <li>0 or 4 - start frame (4 additionally carries a parameter map)</li>
     *   <li>1 - request for more data</li>
     *   <li>3 - close request</li>
     *   <li>anything else - answered with a failure frame</li>
     * </ul>
     *
     * @param i       number of bytes to take from the buffer
     * @param byteBuf buffer holding the frame
     * @throws Exception if decoding the frame or closing the stream fails
     */
    @Override // com.addthis.meshy.TargetHandler
    public void receive(int i, ByteBuf byteBuf) throws Exception {
        byte[] bytes = Meshy.getBytes(i, byteBuf);
        if (this.remoteSource != null) {
            log.trace("{} recv proxy to {}", this, this.remoteSource);
            this.remoteSource.send(bytes);
            return;
        }
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        int mode = in.read();
        log.trace("{} recv mode={} len={}", this, mode, i);
        switch (mode) {
            // NOTE(review): the two named constants below were chosen by the decompiler purely
            // by value (0 and 1); presumably StreamService declares dedicated mode constants - confirm.
            case MeshyConstants.KEY_RESPONSE /* 0 */:
            case 4:
                handleStartFrame(mode, in);
                return;
            case StreamService.STREAM_BYTE_OVERHEAD /* 1 */:
                if (!this.startFrameReceived) {
                    sendFail("'send more' frame received before start frame; see: creation frames");
                    return;
                }
                log.trace("{} more request", this);
                // Schedule only when the budget was exhausted before this grant and nobody
                // else has already queued this target.
                if (this.sendRemain.getAndAdd(this.maxSend) <= 0 && this.queueState.compareAndSet(false, true)) {
                    scheduleForSending();
                }
                this.recvMore++;
                return;
            case 3:
                if (!this.startFrameReceived) {
                    sendFail("close frame received before start frame; see: creation frames");
                    return;
                }
                log.trace("{} close request", this);
                modeClose();
                return;
            case 2:
            default:
                log.warn("{} target unknown mode: {}", getChannelMaster().getUUID(), mode);
                sendFail("frame mode: " + mode + " not recognized");
                return;
        }
    }

    /**
     * Decodes a start frame (uuid, file name, buffer size and, for mode 4, a parameter map)
     * and opens either a proxy to the named node or a local file.
     */
    private void handleStartFrame(int mode, ByteArrayInputStream in) throws Exception {
        this.startFrameReceived = true;
        String nodeUuid = LessBytes.readString(in);
        this.fileName = LessBytes.readString(in);
        this.maxSend = LessBytes.readInt(in);
        log.trace("{} start uuid={} file={} max={}", this, nodeUuid, this.fileName, this.maxSend);
        if (this.maxSend <= 0) {
            sendFail("invalid buffer size: " + this.maxSend);
            return;
        }
        if (StreamService.openStreams.count() > MAX_OPEN_STREAMS) {
            log.warn("max open files: rejecting {}", this.fileName);
            sendFail(StreamService.ERROR_EXCEED_OPEN);
            return;
        }
        Map<String, String> params = null;
        if (mode == 4) {
            int paramCount = LessBytes.readInt(in);
            params = new HashMap<>(paramCount);
            // Bounded loop: read exactly paramCount key/value pairs, key first.
            for (int n = 0; n < paramCount; n++) {
                params.put(LessBytes.readString(in), LessBytes.readString(in));
            }
        }
        if (!nodeUuid.equals(getChannelMaster().getUUID())) {
            startProxy(nodeUuid, params);
        } else {
            startLocal(params);
        }
    }

    /** Attaches a proxy source for a file that lives on another node; failures are reported to the requester. */
    private void startProxy(String nodeUuid, Map<String, String> params) {
        try {
            this.remoteSource = new ProxyStreamSource(nodeUuid, params);
            if (log.isTraceEnabled()) {
                log.trace("{} start remote={} #{}", this, this.remoteSource, this.remoteSource.getPeerCount());
            }
        } catch (Exception e) {
            log.debug("{} unable to start remote source", this, e);
            // Some exceptions carry no message; avoid handing a null reason to sendFail.
            String reason = e.getMessage();
            sendFail(reason != null ? reason : e.toString());
        }
    }

    /** Opens the requested file from the first file system that can locate it and updates the stream metrics. */
    private void startLocal(Map<String, String> params) {
        VirtualFileReference ref = null;
        for (VirtualFileSystem vfs : getChannelMaster().getFileSystems()) {
            ref = locateFile(vfs, this.fileName);
            if (ref != null) {
                this.fileIn = ref.getInput(params);
                if (this.fileIn == null && log.isDebugEnabled()) {
                    log.debug("null file :: vfs={} ref={} param={} fileIn={}", vfs.getClass().getName(), ref, params, this.fileIn);
                }
                // The first file system that knows the path wins, even if it yields no input.
                break;
            }
        }
        if (ref == null || this.fileIn == null) {
            sendFail("No Such File: " + this.fileName);
            return;
        }
        StreamService.newStreamMeter.mark();
        StreamService.openStreams.inc();
        StreamService.newOpenStreams.incrementAndGet();
        log.trace("{} start local={}", this, this.fileIn);
    }

    /**
     * Called when the requester has finished sending. Without a proxy the local stream is
     * completed; with one, completion is passed on to the remote source.
     *
     * @throws Exception if completing the local stream fails
     */
    @Override // com.addthis.meshy.TargetHandler
    public void receiveComplete() throws Exception {
        if (remoteSource == null) {
            complete();
            return;
        }
        remoteSource.sendComplete();
    }

    /**
     * Closes the stream at most once: the first caller sends the single-byte close frame (3),
     * completes the send side and releases the local file; later callers do nothing.
     *
     * @throws Exception if releasing the local file fails
     */
    private void modeClose() throws Exception {
        log.debug("{} close {} remote={} closed={}", this, fileIn, remoteSource, closed);
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        log.debug("{} sending close", this);
        byte[] closeFrame = {3};
        send(closeFrame);
        sendComplete();
        complete();
    }

    /**
     * Marks the stream complete at most once and, when a local file is open, closes it and
     * updates the open/closed stream counters.
     *
     * <p>The counters are updated in a {@code finally} block so that a failing
     * {@code close()} cannot leave {@code StreamService.openStreams} permanently
     * incremented; the exception itself still propagates to the caller.
     *
     * @throws Exception if closing the file input fails
     */
    private void complete() throws Exception {
        log.debug("{} complete {} remote={} streamComplete={}", this, this.fileIn, this.remoteSource, this.streamComplete);
        if (!this.streamComplete.compareAndSet(false, true) || this.fileIn == null) {
            return;
        }
        log.debug("{} close file on complete", this);
        try {
            this.fileIn.close();
        } finally {
            StreamService.closedStreams.incrementAndGet();
            StreamService.openStreams.dec();
        }
    }

    /**
     * Decides whether the current send attempt should be abandoned.
     *
     * <p>When {@code sendRemain} is not positive, {@code queueState} is cleared and
     * {@code sendRemain} is read a second time. The attempt is dropped unless the value has
     * become positive in the meantime and this call wins the
     * {@code compareAndSet(false, true)} on {@code queueState}.
     *
     * <p>NOTE(review): the second read presumably closes a race with the "more" branch of
     * receive(), which adds to {@code sendRemain} and schedules only when it flips
     * {@code queueState} itself - confirm before reordering any of these statements.
     *
     * @return true when nothing should be sent now: no budget is left, another party has
     *         claimed the queue slot, the stream is closed or complete, or no local file is open
     */
    private boolean maybeDropSend() {
        if (this.sendRemain.get() <= 0) {
            // Give up the queue slot first, then look at the budget again.
            this.queueState.set(false);
            // Continue only if budget arrived after the slot was released AND we re-claim the slot.
            if (this.sendRemain.get() <= 0 || !this.queueState.compareAndSet(false, true)) {
                return true;
            }
        }
        // Budget is available (or was re-acquired); still refuse once the stream is finished.
        return this.closed.get() || this.streamComplete.get() || this.fileIn == null;
    }

    /**
     * Sender task: repeatedly reads and sends chunks until roughly {@code maxSend} bytes have
     * gone out in this run, a read yields nothing, or the stream must be dropped. While
     * {@code StreamService.sendWaiting} exceeds {@code MAX_SEND_BUFFER} the task sleeps 10 ms
     * after each chunk. If budget remains afterwards the task re-queues itself.
     *
     * <p>Failures are logged through the class logger and reported to the requester with
     * {@code sendFail}; an interrupt additionally restores the thread's interrupt flag.
     */
    @Override // java.lang.Runnable
    public void run() {
        int sentThisRun = 0;
        int lastSent = 0;
        while (sentThisRun < this.maxSend) {
            try {
                long readStart = System.currentTimeMillis();
                StreamService.totalReads.incrementAndGet();
                // A positive previous result means this read directly follows another one.
                if (lastSent > 0) {
                    StreamService.seqReads.incrementAndGet();
                }
                lastSent = doSendMore(this);
                if (lastSent <= 0) {
                    break;
                }
                sentThisRun += lastSent;
                StreamService.sendWaiting.addAndGet(lastSent);
                StreamService.readWaitTime.addAndGet((int) (System.currentTimeMillis() - readStart));
                if (StreamService.sendWaiting.get() > MAX_SEND_BUFFER) {
                    log.trace("{} sleeping {} > {}", this, StreamService.sendWaiting, MAX_SEND_BUFFER);
                    Thread.sleep(10L);
                    StreamService.sleeps.incrementAndGet();
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the executor instead of swallowing it.
                Thread.currentThread().interrupt();
                log.warn("{} interrupted while sending", this, e);
                sendFail(e.toString());
                return;
            } catch (Exception e) {
                log.warn("{} send failed", this, e);
                sendFail(e.toString());
                return;
            }
        }
        // A negative result means doSendMore already dropped or closed the stream.
        if (lastSent < 0 || maybeDropSend()) {
            return;
        }
        scheduleForSending();
    }

    /**
     * Submits this target to a sender pool: files whose name starts with "/meshy/" go to the
     * in-memory pool, everything else to the regular pool.
     */
    private void scheduleForSending() {
        log.trace("{} scheduleTask", this);
        final ExecutorService pool = fileName.startsWith("/meshy/") ? inMemorySenderPool : senderPool;
        pool.execute(this);
        log.trace("{} scheduled ", this);
    }

    /**
     * Reads the next chunk from the local file and sends it as a data frame (byte 1 followed
     * by the chunk).
     *
     * @param sendWatcher watcher handed to {@code send} together with the buffer
     * @return the value returned by {@code send} when a chunk was sent; 0 when the read
     *         returned nothing but the input is not at EOF; -1 when the send was dropped,
     *         the input reached EOF (the stream is then closed) or an exception occurred
     */
    private int doSendMore(SendWatcher sendWatcher) {
        if (log.isTraceEnabled()) {
            log.trace("{} sendMore for {} rem={} closed={} streamComplete={} fileIn={}", this, sendWatcher, this.sendRemain, this.closed, this.streamComplete, this.fileIn);
        }
        try {
            if (maybeDropSend()) {
                if (StreamService.LOG_DROP_MORE || log.isDebugEnabled()) {
                    log.debug("{} sendMore drop request sr={}, cl={}, co={}, fi={}", this, this.sendRemain, this.closed, this.streamComplete, this.fileIn);
                }
                return -1;
            }
            byte[] nextBytes = this.fileIn.nextBytes(StreamService.READ_WAIT);
            if (nextBytes == null) {
                if (this.fileIn.isEOF()) {
                    log.trace("{} read close on null. sendRemain={}", this, this.sendRemain);
                    modeClose();
                    return -1;
                }
                // No data yet but not EOF either: report an empty read.
                log.debug("{} read timeout. sendRemain={}", this, this.sendRemain);
                return 0;
            }
            if (log.isTraceEnabled()) {
                log.trace("{} send add read={}", this, nextBytes.length);
            }
            StreamService.readBytes.addAndGet(nextBytes.length);
            // One extra byte for the leading mode marker.
            ByteBuf sendBuffer = getSendBuffer(nextBytes.length + 1);
            sendBuffer.writeByte(1);
            sendBuffer.writeBytes(nextBytes);
            int sent = send(sendBuffer, sendWatcher);
            this.sendRemain.addAndGet(-sent);
            this.sentBytes += sent;
            if (log.isTraceEnabled()) {
                log.trace("{} read read={} sendRemain={}", this, nextBytes.length, this.sendRemain);
            }
            return sent;
        } catch (Exception e) {
            // Route the failure through the logger rather than stderr; the caller treats -1 as "stop".
            log.warn("{} sendMore failed", this, e);
            return -1;
        }
    }

    /**
     * {@link SendWatcher} callback: subtracts the reported amount from
     * {@code StreamService.sendWaiting}, the counter that {@link #run()} increases after each send.
     *
     * @param i amount to subtract from the waiting counter
     */
    @Override // com.addthis.meshy.SendWatcher
    public void sendFinished(int i) {
        final boolean tracing = log.isTraceEnabled();
        if (tracing) {
            log.trace("{} send finished {}", this, i);
        }
        StreamService.sendWaiting.addAndGet(-i);
    }
}
