package com.addthis.meshy.service.stream;

import com.addthis.basis.util.Bytes;
import com.addthis.basis.util.Parameter;
import com.addthis.meshy.Meshy;
import com.addthis.meshy.MeshyConstants;
import com.addthis.meshy.SendWatcher;
import com.addthis.meshy.TargetHandler;
import com.addthis.meshy.VirtualFileInput;
import com.addthis.meshy.VirtualFileReference;
import com.addthis.meshy.VirtualFileSystem;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/addthis/meshy/service/stream/StreamTarget.class */
public class StreamTarget extends TargetHandler implements Runnable, SendWatcher {
    protected static final Logger log = LoggerFactory.getLogger(StreamTarget.class);
    private static final int MAX_SEND_BUFFER = (Parameter.intValue("meshy.send.buffer", 5) * 1024) * 1024;
    static final int MAX_OPEN_STREAMS = Parameter.intValue("meshy.stream.maxopen", 1000);
    static final int SENDER_THREADS = Parameter.intValue("meshy.senders", 1);
    protected static final ConcurrentHashMap<String, VirtualFileReference> openStreamMap = new ConcurrentHashMap<>();
    static final LinkedBlockingQueue<Runnable> senderQueue = new LinkedBlockingQueue<>(MAX_OPEN_STREAMS - SENDER_THREADS);
    private static final ExecutorService senderPool = MoreExecutors.getExitingExecutorService(new ThreadPoolExecutor(SENDER_THREADS, SENDER_THREADS, 0, TimeUnit.MILLISECONDS, senderQueue, new ThreadFactoryBuilder().setNameFormat("sender-%d").build()), 1, TimeUnit.SECONDS);
    private static final ExecutorService inMemorySenderPool = MoreExecutors.getExitingExecutorService(new ThreadPoolExecutor(SENDER_THREADS, SENDER_THREADS, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactoryBuilder().setNameFormat("inMemorySender-%d").build()), 1, TimeUnit.SECONDS);
    private String fileName;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicBoolean complete = new AtomicBoolean(false);
    private final AtomicInteger sendRemain = new AtomicInteger(0);
    private final AtomicBoolean queueState = new AtomicBoolean(false);
    private final AtomicBoolean init = new AtomicBoolean(false);
    private final AtomicBoolean more = new AtomicBoolean(false);
    private int maxSend = 0;
    private VirtualFileInput fileIn = null;
    private StreamSource remoteSource = null;
    private int recvMore = 0;
    private int sentBytes = 0;

    public static void debugOpenTargets() {
        for (Map.Entry<String, VirtualFileReference> entry : openStreamMap.entrySet()) {
            log.info("OPEN: {} = {}", entry.getKey(), entry.getValue());
        }
    }

    @Override // com.addthis.meshy.TargetHandler
    public String toString() {
        return super.toString() + ";closed=" + this.closed + ";complete=" + this.complete + ";more=" + this.recvMore + ";sent=" + this.sentBytes + ";sendRemain=" + this.sendRemain + ";fileIn=" + this.fileName + ";remoteSource=" + this.remoteSource;
    }

    private void sendFail(String str) {
        if (log.isDebugEnabled()) {
            log.debug(this + " sendFail " + str);
        }
        send(Bytes.cat(new byte[]{2}, Bytes.toBytes(str)));
        sendComplete();
    }

    @Override // com.addthis.meshy.TargetHandler, com.addthis.meshy.SessionHandler
    public boolean sendComplete() {
        boolean sendComplete = super.sendComplete();
        getChannelState().removeHandlerOnComplete(this);
        return sendComplete;
    }

    @Override // com.addthis.meshy.TargetHandler
    public void channelClosed() {
        if (this.remoteSource != null) {
            if (log.isDebugEnabled()) {
                log.debug(this + " closing remote on channel down");
            }
            this.remoteSource.requestClose();
        }
    }

    private VirtualFileReference locateFile(VirtualFileSystem virtualFileSystem, String str) {
        if (log.isTraceEnabled()) {
            log.trace("locate " + virtualFileSystem + " --> " + str);
        }
        String[] strArr = virtualFileSystem.tokenizePath(str);
        VirtualFileReference fileRoot = virtualFileSystem.getFileRoot();
        for (String str2 : strArr) {
            fileRoot = fileRoot.getFile(str2);
            if (fileRoot == null) {
                if (!log.isTraceEnabled()) {
                    return null;
                }
                log.trace("no file " + virtualFileSystem + " --> " + str + " --> " + fileRoot);
                return null;
            }
        }
        if (log.isTraceEnabled()) {
            log.trace("located " + virtualFileSystem + " --> " + str + " --> " + fileRoot);
        }
        return fileRoot;
    }

    @Override // com.addthis.meshy.TargetHandler
    public void receive(int i, ChannelBuffer channelBuffer) throws Exception {
        byte[] bytes = Meshy.getBytes(i, channelBuffer);
        if (this.remoteSource != null) {
            if (log.isTraceEnabled()) {
                log.trace(this + " recv proxy to " + this.remoteSource);
            }
            this.remoteSource.send(bytes);
            return;
        }
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        int read = byteArrayInputStream.read();
        if (log.isTraceEnabled()) {
            log.trace(this + " recv mode=" + read + " len=" + i);
        }
        switch (read) {
            case MeshyConstants.KEY_RESPONSE /* 0 */:
            case 4:
                this.init.set(true);
                String readString = Bytes.readString(byteArrayInputStream);
                this.fileName = Bytes.readString(byteArrayInputStream);
                this.maxSend = Bytes.readInt(byteArrayInputStream);
                if (this.more.get()) {
                    log.info(this + " was more true : uuid=" + readString + " file=" + this.fileName + " max=" + this.maxSend);
                }
                if (log.isTraceEnabled()) {
                    log.trace(this + " start uuid=" + readString + " file=" + this.fileName + " max=" + this.maxSend);
                }
                if (this.maxSend <= 0) {
                    sendFail("invalid buffer size: " + this.maxSend);
                    return;
                }
                if (StreamService.openStreams.count() > MAX_OPEN_STREAMS) {
                    log.warn("max open files: rejecting " + this.fileName);
                    sendFail(StreamService.ERROR_EXCEED_OPEN);
                    return;
                }
                HashMap hashMap = null;
                if (read == 4) {
                    int readInt = Bytes.readInt(byteArrayInputStream);
                    hashMap = new HashMap();
                    while (true) {
                        int i2 = readInt;
                        readInt--;
                        if (i2 > 0) {
                            hashMap.put(Bytes.readString(byteArrayInputStream), Bytes.readString(byteArrayInputStream));
                        }
                    }
                }
                if (!readString.equals(getChannelMaster().getUUID())) {
                    this.remoteSource = new StreamSource(getChannelMaster(), readString, readString, this.fileName, hashMap, this.maxSend) { // from class: com.addthis.meshy.service.stream.StreamTarget.1
                        @Override // com.addthis.meshy.service.stream.StreamSource, com.addthis.meshy.SourceHandler
                        public void receive(int i3, ChannelBuffer channelBuffer2) throws Exception {
                            if (StreamService.DIRECT_COPY) {
                                StreamTarget.this.send(channelBuffer2, i3);
                            } else {
                                StreamTarget.this.send(Meshy.getBytes(i3, channelBuffer2));
                            }
                        }

                        @Override // com.addthis.meshy.service.stream.StreamSource, com.addthis.meshy.SourceHandler
                        public void receiveComplete() {
                            StreamTarget.this.sendComplete();
                        }
                    };
                    if (log.isTraceEnabled()) {
                        log.trace(this + " start remote=" + this.remoteSource + " #" + this.remoteSource.getPeerCount());
                    }
                    if (this.remoteSource.getPeerCount() == 0) {
                        sendFail("no matching peers");
                        this.remoteSource = null;
                        return;
                    }
                    return;
                }
                VirtualFileReference virtualFileReference = null;
                VirtualFileSystem[] fileSystems = getChannelMaster().getFileSystems();
                int length = fileSystems.length;
                int i3 = 0;
                while (true) {
                    if (i3 < length) {
                        VirtualFileSystem virtualFileSystem = fileSystems[i3];
                        virtualFileReference = locateFile(virtualFileSystem, this.fileName);
                        if (virtualFileReference != null) {
                            this.fileIn = virtualFileReference.getInput(hashMap);
                            if (this.fileIn == null && log.isDebugEnabled()) {
                                log.debug("null file :: vfs=" + virtualFileSystem.getClass().getName() + " ref=" + virtualFileReference + " param=" + hashMap + " fileIn=" + this.fileIn);
                            }
                        } else {
                            i3++;
                        }
                    }
                }
                if (virtualFileReference == null || this.fileIn == null) {
                    sendFail("No Such File: " + this.fileName);
                    return;
                }
                StreamService.newStreamMeter.mark();
                StreamService.openStreams.inc();
                StreamService.newOpenStreams.incrementAndGet();
                openStreamMap.put(this.fileName, virtualFileReference);
                if (log.isTraceEnabled()) {
                    log.trace(this + " start local=" + this.fileIn);
                    return;
                }
                return;
            case 1:
                this.more.set(true);
                if (!this.init.get()) {
                }
                if (log.isTraceEnabled()) {
                    log.trace(this + " more request");
                }
                if (this.sendRemain.addAndGet(this.maxSend) - this.maxSend <= 0 && this.queueState.compareAndSet(false, true)) {
                    scheduleForSending();
                }
                this.recvMore++;
                return;
            case 2:
            default:
                log.warn(getChannelMaster().getUUID() + " target unknown mode: " + read);
                return;
            case 3:
                if (log.isTraceEnabled()) {
                    log.trace(this + " close request");
                }
                modeClose();
                return;
        }
    }

    @Override // com.addthis.meshy.TargetHandler
    public void receiveComplete() throws Exception {
        if (this.remoteSource != null) {
            this.remoteSource.sendComplete();
        } else {
            complete();
        }
    }

    private void modeClose() throws Exception {
        if (log.isDebugEnabled()) {
            log.debug(this + " close " + this.fileIn + " remote=" + this.remoteSource + " closed=" + this.closed);
        }
        if (this.closed.compareAndSet(false, true)) {
            if (log.isDebugEnabled()) {
                log.debug(this + " sending close");
            }
            send(new byte[]{3});
            sendComplete();
            complete();
        }
    }

    private void complete() throws Exception {
        if (log.isDebugEnabled()) {
            log.debug(this + " complete " + this.fileIn + " remote=" + this.remoteSource + " complete=" + this.complete);
        }
        if (!this.complete.compareAndSet(false, true) || this.fileIn == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug(this + " close file on complete");
        }
        this.fileIn.close();
        StreamService.closedStreams.incrementAndGet();
        StreamService.openStreams.dec();
        openStreamMap.remove(this.fileName);
    }

    private boolean maybeDropSend() {
        if (this.sendRemain.get() <= 0) {
            this.queueState.set(false);
            if (this.sendRemain.get() <= 0 || !this.queueState.compareAndSet(false, true)) {
                return true;
            }
        }
        return this.closed.get() || this.complete.get() || this.fileIn == null;
    }

    @Override // java.lang.Runnable
    public void run() {
        int i = 0;
        int i2 = 0;
        while (i < this.maxSend) {
            try {
                long currentTimeMillis = System.currentTimeMillis();
                StreamService.totalReads.incrementAndGet();
                if (i2 > 0) {
                    StreamService.seqReads.incrementAndGet();
                }
                i2 = doSendMore(this);
                if (i2 <= 0) {
                    break;
                }
                i += i2;
                StreamService.sendWaiting.addAndGet(i2);
                StreamService.readWaitTime.addAndGet((int) (System.currentTimeMillis() - currentTimeMillis));
                if (StreamService.sendWaiting.get() > MAX_SEND_BUFFER) {
                    if (log.isTraceEnabled()) {
                        log.trace(this + " sleeping " + StreamService.sendWaiting + " > " + MAX_SEND_BUFFER);
                    }
                    Thread.sleep(10L);
                    StreamService.sleeps.incrementAndGet();
                }
            } catch (Exception e) {
                e.printStackTrace();
                sendFail(e.toString());
                return;
            }
        }
        if (i2 < 0 || maybeDropSend()) {
            return;
        }
        scheduleForSending();
    }

    private void scheduleForSending() {
        if (log.isTraceEnabled()) {
            log.trace(this + " scheduleTask");
        }
        if (this.fileName.startsWith("/meshy/")) {
            inMemorySenderPool.execute(this);
        } else {
            senderPool.execute(this);
        }
        if (log.isTraceEnabled()) {
            log.trace(this + " scheduled ");
        }
    }

    private int doSendMore(SendWatcher sendWatcher) {
        if (log.isTraceEnabled()) {
            log.trace(this + " sendMore for " + sendWatcher + " rem=" + this.sendRemain + " closed=" + this.closed + " complete=" + this.complete + " fileIn=" + this.fileIn);
        }
        try {
            if (maybeDropSend()) {
                if (!StreamService.LOG_DROP_MORE && !log.isDebugEnabled()) {
                    return -1;
                }
                log.debug(this + " sendMore drop request sr=" + this.sendRemain + ", cl=" + this.closed + ", co=" + this.complete + ", fi=" + this.fileIn);
                return -1;
            }
            byte[] nextBytes = this.fileIn.nextBytes(StreamService.READ_WAIT);
            if (nextBytes == null) {
                if (this.fileIn.isEOF()) {
                    if (log.isTraceEnabled()) {
                        log.trace(this + " read close on null. sendRemain=" + this.sendRemain);
                    }
                    modeClose();
                    return -1;
                }
                if (!log.isDebugEnabled()) {
                    return 0;
                }
                log.debug(this + " read timeout. sendRemain=" + this.sendRemain);
                return 0;
            }
            if (log.isTraceEnabled()) {
                log.trace(this + " send add read=" + nextBytes.length);
            }
            StreamService.readBytes.addAndGet(nextBytes.length);
            ChannelBuffer sendBuffer = getSendBuffer(nextBytes.length + 1);
            sendBuffer.writeByte(1);
            sendBuffer.writeBytes(nextBytes);
            int send = send(sendBuffer, sendWatcher);
            this.sendRemain.addAndGet(-send);
            this.sentBytes += send;
            if (log.isTraceEnabled()) {
                log.trace(this + " read read=" + nextBytes.length + " sendRemain=" + this.sendRemain);
            }
            return send;
        } catch (Exception e) {
            e.printStackTrace();
            return -1;
        }
    }

    @Override // com.addthis.meshy.SendWatcher
    public void sendFinished(int i) {
        if (log.isTraceEnabled()) {
            log.trace(this + " send finished " + i);
        }
        StreamService.sendWaiting.addAndGet(-i);
    }
}
