package com.addthis.meshy.service.stream;

import com.addthis.basis.util.LessBytes;
import com.addthis.basis.util.Parameter;
import com.addthis.meshy.ChannelMaster;
import com.addthis.meshy.ChannelState;
import com.addthis.meshy.Meshy;
import com.addthis.meshy.MeshyConstants;
import com.addthis.meshy.SourceHandler;
import com.addthis.meshy.TargetHandler;
import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/addthis/meshy/service/stream/StreamSource.class */
public class StreamSource extends SourceHandler {
    protected static final Logger log;
    private static final int REFILL_FACTOR;
    private static final int DEFAULT_MAX_SEND;
    private static final boolean FETCH_ON_OPEN;

    @Nullable
    private final BlockingQueue<byte[]> messageQueue;
    private final String fileName;
    private final String nodeUuid;
    private final int maxBufferSize;
    private final int refillThreshold;
    private final boolean isProxy;
    private final AtomicLong expectingBytes;
    private final AtomicLong moreRequests;
    private final AtomicLong recvBytes;
    private final AtomicBoolean closeSent;

    @Nullable
    private volatile String err;
    static final /* synthetic */ boolean $assertionsDisabled;

    public StreamSource(ChannelMaster channelMaster, String str, String str2, int i) throws IOException {
        this(channelMaster, MeshyConstants.LINK_ALL, str, str2, null, i);
    }

    public StreamSource(ChannelMaster channelMaster, String str, String str2, Map<String, String> map, int i) throws IOException {
        this(channelMaster, MeshyConstants.LINK_ALL, str, str2, map, i);
    }

    public StreamSource(ChannelMaster channelMaster, String str, String str2, String str3, @Nullable Map<String, String> map, int i) {
        super(channelMaster, (Class<? extends TargetHandler>) StreamTarget.class, str);
        this.err = StreamService.ERROR_UNKNOWN;
        this.fileName = str3;
        this.nodeUuid = str2;
        if (i <= 0) {
            this.maxBufferSize = DEFAULT_MAX_SEND;
        } else {
            this.maxBufferSize = i;
        }
        this.refillThreshold = this.maxBufferSize / REFILL_FACTOR;
        this.expectingBytes = new AtomicLong();
        this.recvBytes = new AtomicLong();
        this.moreRequests = new AtomicLong();
        this.closeSent = new AtomicBoolean();
        if (str == MeshyConstants.LINK_ALL || str == str2) {
            this.isProxy = false;
            this.messageQueue = new LinkedBlockingQueue();
        } else {
            this.isProxy = true;
            this.messageQueue = null;
        }
        log.debug("{} new file={} sendBuffer={} prefetch={}", new Object[]{this, str3, Integer.valueOf(this.maxBufferSize), Boolean.valueOf(FETCH_ON_OPEN)});
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            byteArrayOutputStream.write(map == null ? 0 : 4);
            LessBytes.writeString(str2, byteArrayOutputStream);
            LessBytes.writeString(str3, byteArrayOutputStream);
            LessBytes.writeInt(this.maxBufferSize, byteArrayOutputStream);
            if (map != null) {
                LessBytes.writeInt(map.size(), byteArrayOutputStream);
                for (Map.Entry<String, String> entry : map.entrySet()) {
                    LessBytes.writeString(entry.getKey(), byteArrayOutputStream);
                    LessBytes.writeString(entry.getValue(), byteArrayOutputStream);
                }
            }
            if (!send(byteArrayOutputStream.toByteArray())) {
                log.warn("Failed to send stream init data for {}", this);
            }
            if (this.isProxy || !FETCH_ON_OPEN) {
                return;
            }
            requestMoreData();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public BlockingQueue<byte[]> getMessageQueue() {
        maybePrime();
        return this.messageQueue;
    }

    public SourceInputStream getInputStream() {
        return new SourceInputStream(this);
    }

    @Override // com.addthis.meshy.SourceHandler
    public void receive(ChannelState channelState, int i, ByteBuf byteBuf) throws Exception {
        if (!$assertionsDisabled && this.messageQueue == null) {
            throw new AssertionError("must override receive for proxy mode");
        }
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(Meshy.getBytes(i, byteBuf));
        int read = byteArrayInputStream.read();
        log.trace("{} recv mode={} len={}", new Object[]{this, Integer.valueOf(read), Integer.valueOf(i)});
        switch (read) {
            case StreamService.STREAM_BYTE_OVERHEAD /* 1 */:
                byte[] readBytes = LessBytes.readBytes(byteArrayInputStream, byteArrayInputStream.available());
                this.recvBytes.addAndGet(readBytes.length);
                this.recvBytes.addAndGet(13L);
                this.messageQueue.put(readBytes);
                return;
            case 2:
                this.err = LessBytes.toString(LessBytes.readFully(byteArrayInputStream));
                this.messageQueue.put(StreamService.FAIL_BYTES);
                return;
            case 3:
                this.messageQueue.put(StreamService.CLOSE_BYTES);
                return;
            default:
                log.warn("source unknown mode: {}", Integer.valueOf(read));
                return;
        }
    }

    @Override // com.addthis.meshy.SourceHandler
    public void receiveComplete() throws Exception {
        log.trace("{} recv send complete", this);
        sendComplete();
        if (!$assertionsDisabled && this.messageQueue == null) {
            throw new AssertionError("must override receiveComplete for proxy mode");
        }
        this.messageQueue.put(StreamService.FAIL_BYTES);
    }

    @Override // com.addthis.meshy.SourceHandler
    public void channelClosed(ChannelState channelState) {
        if (this.messageQueue != null) {
            try {
                this.err = StreamService.ERROR_CHANNEL_LOST;
                this.messageQueue.put(StreamService.FAIL_BYTES);
            } catch (InterruptedException e) {
                Throwables.propagate(e);
            }
        }
    }

    public void requestClose() {
        if (!this.closeSent.compareAndSet(false, true)) {
            log.debug("superfluous requestClose call performed");
            return;
        }
        log.trace("{} send close", this);
        send(new byte[]{3});
        sendComplete();
    }

    public final void performBufferAccounting(byte[] bArr) {
        int length = bArr.length + 12 + 1;
        long andAdd = this.expectingBytes.getAndAdd(-length);
        if (andAdd >= this.refillThreshold && andAdd - length < this.refillThreshold) {
            int i = (length / this.maxBufferSize) + 1;
            if (i != 1) {
                log.warn("Sending {} sendMore requests due to pathologically large chunk of size {}", Integer.valueOf(i), Integer.valueOf(length));
            }
            requestMoreData(i);
        }
        log.trace("{} fill take={}", this, Integer.valueOf(bArr.length));
    }

    public final boolean isCloseSignal(byte[] bArr) {
        if (bArr.length != 0 || bArr == StreamService.FAIL_BYTES) {
            return false;
        }
        if (bArr == StreamService.CLOSE_BYTES) {
            return true;
        }
        log.debug("size zero application layer data found");
        return true;
    }

    public final void throwIfErrorSignal(byte[] bArr) throws IOException {
        if (bArr == StreamService.FAIL_BYTES) {
            throw new IOException(this.err);
        }
    }

    public final void requestMoreData() {
        requestMoreData(1L);
    }

    public final void requestMoreData(long j) {
        this.expectingBytes.addAndGet(this.maxBufferSize * j);
        for (int i = 0; i < j; i++) {
            this.moreRequests.incrementAndGet();
            requestMoreDataInternal();
        }
    }

    public boolean maybePrime() {
        if (!this.moreRequests.compareAndSet(0L, 1L)) {
            return false;
        }
        this.expectingBytes.addAndGet(this.maxBufferSize);
        requestMoreDataInternal();
        return true;
    }

    private void requestMoreDataInternal() {
        log.trace("{} send request more", this);
        send(new byte[]{1});
    }

    @Override // com.addthis.meshy.SourceHandler
    public String toString() {
        return MoreObjects.toStringHelper(this).add("messageQueue.size", this.messageQueue == null ? "(queue is null)" : Integer.valueOf(this.messageQueue.size())).add("fileName", this.fileName).add("nodeUuid", this.nodeUuid).add("maxBufferSize", this.maxBufferSize).add("refillThreshold", this.refillThreshold).add("isProxy", this.isProxy).add("expectingBytes", this.expectingBytes).add("moreRequests", this.moreRequests).add("recvBytes", this.recvBytes).add("closeSent", this.closeSent).add("err", this.err).toString();
    }

    static {
        $assertionsDisabled = !StreamSource.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(StreamSource.class);
        REFILL_FACTOR = Parameter.intValue("meshy.refill.factor", 2);
        DEFAULT_MAX_SEND = Parameter.intValue("meshy.stream.buffer", 1) * 1024 * 1024;
        FETCH_ON_OPEN = Parameter.boolValue("meshy.stream.prefetch", false);
    }
}
