package com.abusix.knsq.connection;

import com.abusix.knsq.config.ClientConfig;
import com.abusix.knsq.protocol.Message;
import com.abusix.knsq.subscribe.Subscription;
import com.google.common.net.HostAndPort;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* compiled from: SubConnection.kt */
@Metadata(mv = {1, 5, 1}, k = 1, xi = 48, d1 = {"��\\\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\t\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\b\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0002\n\u0002\b\t\n\u0002\u0010\u000e\n\u0002\b\u000b\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u000b\n\u0002\b\u0007\b��\u0018�� 62\u00020\u0001:\u00016B\u001d\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007¢\u0006\u0002\u0010\bJ\b\u0010$\u001a\u00020\u0017H\u0016J\b\u0010%\u001a\u00020\u0017H\u0002J\u000e\u0010&\u001a\u00020\u00172\u0006\u0010'\u001a\u00020!J\b\u0010(\u001a\u00020\u0017H\u0002J\u0010\u0010)\u001a\u00020\u00172\u0006\u0010*\u001a\u00020\u0016H\u0016J\u0018\u0010+\u001a\u00020\u00172\u0006\u0010'\u001a\u00020!2\b\b\u0002\u0010,\u001a\u00020-J\u0018\u0010.\u001a\u00020\u00172\u0006\u0010\u0011\u001a\u00020\u000f2\b\b\u0002\u0010/\u001a\u000200J\b\u00101\u001a\u00020\u0017H\u0002J\b\u00102\u001a\u00020!H\u0016J\b\u00103\u001a\u00020\u0017H\u0016J\b\u00104\u001a\u00020!H\u0016J\u000e\u00105\u001a\u00020\u00172\u0006\u0010'\u001a\u00020!R\u000e\u0010\t\u001a\u00020\nX\u0082\u000e¢\u0006\u0002\n��R\u001e\u0010\u000b\u001a\u0012\u0012\u0002\b\u0003 
\r*\b\u0012\u0002\b\u0003\u0018\u00010\f0\fX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000e\u001a\u00020\u000fX\u0082\u000e¢\u0006\u0002\n��R\u001e\u0010\u0011\u001a\u00020\u000f2\u0006\u0010\u0010\u001a\u00020\u000f@BX\u0086\u000e¢\u0006\b\n��\u001a\u0004\b\u0012\u0010\u0013R(\u0010\u0014\u001a\u0010\u0012\u0004\u0012\u00020\u0016\u0012\u0004\u0012\u00020\u0017\u0018\u00010\u0015X\u0086\u000e¢\u0006\u000e\n��\u001a\u0004\b\u0018\u0010\u0019\"\u0004\b\u001a\u0010\u001bR(\u0010\u001c\u001a\u0010\u0012\u0004\u0012\u00020\u0016\u0012\u0004\u0012\u00020\u0017\u0018\u00010\u0015X\u0086\u000e¢\u0006\u000e\n��\u001a\u0004\b\u001d\u0010\u0019\"\u0004\b\u001e\u0010\u001bR\u000e\u0010\u001f\u001a\u00020\nX\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u0011\u0010 \u001a\u00020!8F¢\u0006\u0006\u001a\u0004\b\"\u0010#¨\u00067"}, d2 = {"Lcom/abusix/knsq/connection/SubConnection;", "Lcom/abusix/knsq/connection/Connection;", "clientConfig", "Lcom/abusix/knsq/config/ClientConfig;", "host", "Lcom/google/common/net/HostAndPort;", "subscription", "Lcom/abusix/knsq/subscribe/Subscription;", "(Lcom/abusix/knsq/config/ClientConfig;Lcom/google/common/net/HostAndPort;Lcom/abusix/knsq/subscribe/Subscription;)V", "finishedCount", "", "flushTask", "Ljava/util/concurrent/ScheduledFuture;", "kotlin.jvm.PlatformType", "inFlight", "", "<set-?>", "maxInFlight", "getMaxInFlight", "()I", "onFailedMessage", "Lkotlin/Function1;", "Lcom/abusix/knsq/protocol/Message;", "", "getOnFailedMessage", "()Lkotlin/jvm/functions/Function1;", "setOnFailedMessage", "(Lkotlin/jvm/functions/Function1;)V", "onMessage", "getOnMessage", "setOnMessage", "requeuedCount", "topic", "", "getTopic", "()Ljava/lang/String;", "connect", "delayedFlush", "finish", "id", "messageDone", "onIncomingMessage", "message", "requeue", "delay", "Ljava/time/Duration;", "setMaxInFlight", "isActive", "", "softFlush", "stateDesc", "stop", "toString", "touch", "Companion", "knsq"})
/* loaded from: input_file:com/abusix/knsq/connection/SubConnection.class */
public final class SubConnection extends Connection {

    /** Subscription this connection belongs to; supplies the executors, topic/channel and limits used by this class. */
    @NotNull
    private final Subscription subscription;
    /** Handle of the periodic {@code delayedFlush} task scheduled in the constructor; cancelled in {@code stop()}. */
    private final ScheduledFuture<?> flushTask;
    /** Incremented for every incoming message and decremented (never below zero) in {@code messageDone()}. */
    private int inFlight;
    /** Last value sent with an {@code RDY} command via {@code setMaxInFlight}. */
    private int maxInFlight;
    /** Number of {@code FIN} commands written by {@code finish}; reported by {@code stateDesc()}. */
    private long finishedCount;
    /** Number of {@code REQ} commands written by {@code requeue}; reported by {@code stateDesc()}. */
    private long requeuedCount;

    /** Callback run on the handler executor for messages whose attempt count is within the subscription's limit. */
    @Nullable
    private Function1<? super Message, Unit> onMessage;

    /** Callback run on the handler executor for messages whose attempt count exceeds the subscription's limit. */
    @Nullable
    private Function1<? super Message, Unit> onFailedMessage;

    /** Kotlin companion-object instance; it carries no state of its own in this decompiled form. */
    @NotNull
    public static final Companion Companion = new Companion(null);
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(SubConnection.class);

    /* compiled from: SubConnection.kt */
    @Metadata(mv = {1, 5, 1}, k = 1, xi = 48, d1 = {"��\u0014\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u0016\u0010\u0003\u001a\n \u0005*\u0004\u0018\u00010\u00040\u0004X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0006"}, d2 = {"Lcom/abusix/knsq/connection/SubConnection$Companion;", "", "()V", "logger", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "knsq"})
    /* loaded from: input_file:com/abusix/knsq/connection/SubConnection$Companion.class */
    /**
     * Kotlin companion object of {@link SubConnection}. It declares no members
     * here; the only instance is the one stored in {@code SubConnection.Companion}.
     */
    public static final class Companion {

        /** Not instantiable from outside; the synthetic constructor below delegates here. */
        private Companion() {
            // intentionally empty
        }

        /**
         * Compiler-generated accessor constructor.
         *
         * @param defaultConstructorMarker ignored marker argument (callers in this file pass {@code null})
         */
        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public SubConnection(@NotNull ClientConfig clientConfig, @NotNull HostAndPort hostAndPort, @NotNull Subscription subscription) {
        super(clientConfig, hostAndPort, subscription.getScheduledExecutor$knsq());
        Intrinsics.checkNotNullParameter(clientConfig, "clientConfig");
        Intrinsics.checkNotNullParameter(hostAndPort, "host");
        Intrinsics.checkNotNullParameter(subscription, "subscription");
        this.subscription = subscription;
        this.flushTask = getExecutor().scheduleAtFixedRate(this::delayedFlush, this.subscription.getMaxFlushDelay$knsq().toMillis() / 2, this.subscription.getMaxFlushDelay$knsq().toMillis() / 2, TimeUnit.MILLISECONDS);
    }

    /**
     * Returns the value most recently stored by {@code setMaxInFlight}.
     *
     * @return the current max-in-flight value
     */
    public final int getMaxInFlight() {
        return maxInFlight;
    }

    /**
     * Returns the topic of the subscription this connection belongs to.
     *
     * @return the subscription's topic
     */
    @NotNull
    public final String getTopic() {
        return subscription.getTopic();
    }

    /**
     * Returns the callback used for regular incoming messages.
     *
     * <p>The backing field is declared as {@code Function1<? super Message, Unit>},
     * which Java does not implicitly convert to the invariant
     * {@code Function1<Message, Unit>} in this signature, so an explicit
     * unchecked cast is required. The cast is safe for invoking the callback
     * with a {@link Message}: whatever was stored accepts {@code Message} or one
     * of its supertypes.
     *
     * @return the message callback, or {@code null} if none is set
     */
    @Nullable
    @SuppressWarnings("unchecked") // narrowing "? super Message" to "Message"; see Javadoc
    public final Function1<Message, Unit> getOnMessage() {
        return (Function1<Message, Unit>) this.onMessage;
    }

    /**
     * Replaces the callback used for regular incoming messages.
     *
     * @param function1 new callback, or {@code null} to clear it
     */
    public final void setOnMessage(@Nullable Function1<? super Message, Unit> function1) {
        onMessage = function1;
    }

    /**
     * Returns the callback used for messages that exceeded the subscription's
     * maximum attempt count.
     *
     * <p>The backing field is declared as {@code Function1<? super Message, Unit>},
     * which Java does not implicitly convert to the invariant
     * {@code Function1<Message, Unit>} in this signature, so an explicit
     * unchecked cast is required. The cast is safe for invoking the callback
     * with a {@link Message}: whatever was stored accepts {@code Message} or one
     * of its supertypes.
     *
     * @return the failed-message callback, or {@code null} if none is set
     */
    @Nullable
    @SuppressWarnings("unchecked") // narrowing "? super Message" to "Message"; see Javadoc
    public final Function1<Message, Unit> getOnFailedMessage() {
        return (Function1<Message, Unit>) this.onFailedMessage;
    }

    /**
     * Replaces the callback used for messages that exceeded the subscription's
     * maximum attempt count.
     *
     * @param function1 new callback, or {@code null} to clear it
     */
    public final void setOnFailedMessage(@Nullable Function1<? super Message, Unit> function1) {
        onFailedMessage = function1;
    }

    /**
     * Handles a message received on this connection.
     *
     * <p>The message is counted as in flight and then dispatched on the
     * subscription's handler executor:
     * <ul>
     *   <li>if its attempt count is within the subscription's maximum, the
     *       {@code onMessage} callback is run;</li>
     *   <li>otherwise the {@code onFailedMessage} callback is run and the
     *       message is finished immediately via {@link #finish(String)}.</li>
     * </ul>
     *
     * <p>The decompiler had left the lambda arguments as undefined register
     * placeholders; they are the enclosing instance and the received message.
     *
     * @param message the received message; must not be {@code null}
     */
    @Override // com.abusix.knsq.connection.Connection
    public synchronized void onIncomingMessage(@NotNull Message message) {
        Intrinsics.checkNotNullParameter(message, "message");
        this.inFlight++;
        if (message.getAttempts() <= this.subscription.getMaxMessageAttempts$knsq()) {
            this.subscription.getHandlerExecutor$knsq().execute(() -> {
                m10onIncomingMessage$lambda1(this, message);
            });
        } else {
            this.subscription.getHandlerExecutor$knsq().execute(() -> {
                m9onIncomingMessage$lambda0(this, message);
            });
            // Too many attempts: acknowledge right away instead of waiting for the handler.
            finish(message.getId());
        }
    }

    /**
     * Writes a {@code FIN} command for the given message id and records the
     * message as done. Does nothing when the connection is not running.
     *
     * <p>If writing (or the follow-up flush logic) raises an
     * {@link IOException}, the connection is stopped; the exception itself is
     * neither logged nor rethrown.
     *
     * @param str id of the message to finish; must not be {@code null}
     */
    public final synchronized void finish(@NotNull String str) {
        Intrinsics.checkNotNullParameter(str, "id");
        if (!isRunning()) {
            return;
        }
        try {
            writeCommand("FIN", str);
            finishedCount++;
            messageDone();
        } catch (IOException ioFailure) {
            stop();
        }
    }

    /**
     * Writes a {@code REQ} command for the given message id with the delay in
     * milliseconds and records the message as done. Does nothing when the
     * connection is not running.
     *
     * <p>If an {@link IOException} is raised, the connection is stopped; the
     * exception itself is neither logged nor rethrown.
     *
     * @param str      id of the message to requeue; must not be {@code null}
     * @param duration requeue delay, sent as whole milliseconds; must not be {@code null}
     */
    public final synchronized void requeue(@NotNull String str, @NotNull Duration duration) {
        Intrinsics.checkNotNullParameter(str, "id");
        Intrinsics.checkNotNullParameter(duration, "delay");
        if (!isRunning()) {
            return;
        }
        try {
            writeCommand("REQ", str, Long.valueOf(duration.toMillis()));
            requeuedCount++;
            messageDone();
        } catch (IOException ioFailure) {
            stop();
        }
    }

    /**
     * Compiler-generated bridge for the Kotlin default argument of
     * {@code requeue}: when bit {@code 2} of the mask is set, the delay
     * argument is replaced by {@link Duration#ZERO}.
     *
     * @param subConnection receiver on which {@code requeue} is invoked
     * @param str           message id
     * @param duration      explicit delay; ignored when the mask marks it as omitted
     * @param i             bit mask of omitted arguments
     * @param obj           unused marker argument
     */
    public static /* synthetic */ void requeue$default(SubConnection subConnection, String str, Duration duration, int i, Object obj) {
        final boolean delayOmitted = (i & 2) != 0;
        Duration effectiveDelay = duration;
        if (delayOmitted) {
            effectiveDelay = Duration.ZERO;
            Intrinsics.checkNotNullExpressionValue(effectiveDelay, "ZERO");
        }
        subConnection.requeue(str, effectiveDelay);
    }

    /**
     * Bookkeeping after a message has been finished or requeued: decrements the
     * in-flight counter (never below zero) and then flushes.
     *
     * <p>A full {@code flush()} is done only when nothing is in flight and the
     * connection is no longer running; in every other case {@code softFlush()}
     * decides whether enough bytes are pending.
     */
    private final void messageDone() {
        inFlight = Math.max(inFlight - 1, 0);
        final boolean idleAndStopped = inFlight == 0 && !isRunning();
        if (idleAndStopped) {
            flush();
        } else {
            softFlush();
        }
    }

    /**
     * Writes a {@code TOUCH} command for the given message id and then performs
     * a soft flush. Does nothing when the connection is not running.
     *
     * <p>If an {@link IOException} is raised, the connection is stopped; the
     * exception itself is neither logged nor rethrown.
     *
     * @param str id of the message to touch; must not be {@code null}
     */
    public final synchronized void touch(@NotNull String str) {
        Intrinsics.checkNotNullParameter(str, "id");
        if (!isRunning()) {
            return;
        }
        try {
            writeCommand("TOUCH", str);
            softFlush();
        } catch (IOException ioFailure) {
            stop();
        }
    }

    /**
     * Periodic task body (scheduled in the constructor): flushes pending output
     * once it has been waiting long enough.
     *
     * <p>A flush happens only when there are unflushed bytes and the current
     * time is after the last flush plus half the subscription's maximum flush
     * delay plus 10 ms. Any exception is logged, forwarded to the
     * {@code onException} callback when one is set, and the connection is
     * stopped.
     */
    private final synchronized void delayedFlush() {
        try {
            if (getUnflushedBytes() > 0) {
                final Instant now = Instant.now();
                final Instant lastFlush = getLastActionFlush();
                final long waitMillis = (this.subscription.getMaxFlushDelay$knsq().toMillis() / 2) + 10;
                if (now.isAfter(lastFlush.plusMillis(waitMillis))) {
                    flush();
                }
            }
        } catch (Exception failure) {
            logger.error("delayedFlush error. {}", stateDesc(), failure);
            final Function1<Exception, Unit> exceptionHandler = getOnException();
            if (exceptionHandler != null) {
                exceptionHandler.invoke(failure);
            }
            stop();
        }
    }

    /**
     * Flushes only when the number of unflushed bytes has reached the
     * subscription's configured maximum; otherwise leaves the output buffered.
     */
    private final void softFlush() {
        final boolean thresholdReached =
                getUnflushedBytes() >= this.subscription.getMaxUnflushedBytes$knsq();
        if (!thresholdReached) {
            return;
        }
        flush();
    }

    /**
     * Updates the max-in-flight value and announces it with an {@code RDY}
     * command. A call with the value already stored is a no-op.
     *
     * <p>The new value is stored before the command is written. Afterwards the
     * output is flushed either through {@code flush()} (when {@code z} is
     * {@code true}) or directly on the output stream, followed by resetting
     * the unflushed-byte counter to zero.
     *
     * @param i new max-in-flight value
     * @param z the "isActive" flag: selects {@code flush()} over the direct stream flush
     */
    public final synchronized void setMaxInFlight(int i, boolean z) {
        if (maxInFlight != i) {
            maxInFlight = i;
            writeCommand("RDY", Integer.valueOf(i));
            if (!z) {
                getOutput().flush();
                setUnflushedBytes(0);
            } else {
                flush();
            }
        }
    }

    /**
     * Compiler-generated bridge for the Kotlin default argument of
     * {@code setMaxInFlight}: when bit {@code 2} of the mask is set, the
     * boolean argument is replaced by {@code true}.
     *
     * @param subConnection receiver on which {@code setMaxInFlight} is invoked
     * @param i             new max-in-flight value
     * @param z             explicit flag; ignored when the mask marks it as omitted
     * @param i2            bit mask of omitted arguments
     * @param obj           unused marker argument
     */
    public static /* synthetic */ void setMaxInFlight$default(SubConnection subConnection, int i, boolean z, int i2, Object obj) {
        final boolean flagOmitted = (i2 & 2) != 0;
        // An omitted flag defaults to true; otherwise the caller's value is used.
        subConnection.setMaxInFlight(i, flagOmitted || z);
    }

    /**
     * Connects via the superclass and then subscribes: writes a {@code SUB}
     * command with the subscription's topic and channel, flushes, and waits
     * for the OK response via {@code flushAndReadOK()}.
     */
    @Override // com.abusix.knsq.connection.Connection
    public synchronized void connect() {
        super.connect();
        // getTopic() is a final accessor for subscription.getTopic().
        writeCommand("SUB", getTopic(), subscription.getChannel());
        flushAndReadOK();
    }

    /**
     * Stops this connection.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>cancel the periodic flush task;</li>
     *   <li>if messages are still in flight, set max-in-flight to {@code 0}
     *       (which writes an {@code RDY} command and flushes), then flush any
     *       remaining output; an {@link IOException} here is only logged at
     *       debug level;</li>
     *   <li>notify the subscription that this connection closed;</li>
     *   <li>run the superclass {@code stop()}.</li>
     * </ol>
     *
     * <p>Steps 3 and 4 run whether step 2 succeeded or threw, and step 4 still
     * runs when step 3 throws. Each runs exactly once: the decompiled original
     * could call {@code super.stop()} twice, and repeat the close notification
     * when it threw.
     */
    @Override // com.abusix.knsq.connection.Connection
    public synchronized void stop() {
        this.flushTask.cancel(true);
        try {
            if (this.inFlight > 0) {
                // Same as the Kotlin default-argument call setMaxInFlight(0).
                setMaxInFlight(0, true);
            }
            flush();
        } catch (IOException e) {
            logger.debug("IOException while stopping SubConnection", e);
        } finally {
            try {
                this.subscription.connectionClosed$knsq(this);
            } finally {
                super.stop();
            }
        }
    }

    /**
     * Returns a short description of the form
     * {@code "SubConnection: <host> <topic>.<channel>"}.
     *
     * @return human-readable identification of this connection
     */
    @NotNull
    public String toString() {
        final StringBuilder text = new StringBuilder();
        text.append("SubConnection: ");
        text.append(getHost());
        text.append(' ');
        text.append(this.subscription.getTopic());
        text.append('.');
        text.append(this.subscription.getChannel());
        return text.toString();
    }

    /**
     * Extends the superclass state description with this connection's
     * counters: in-flight, max-in-flight, finished and requeued.
     *
     * @return the superclass description followed by
     *         {@code " inFlight:… maxInFlight:… fin:… req:…"}
     */
    @Override // com.abusix.knsq.connection.Connection
    @NotNull
    public synchronized String stateDesc() {
        final StringBuilder description = new StringBuilder();
        description.append(super.stateDesc());
        description.append(" inFlight:").append(this.inFlight);
        description.append(" maxInFlight:").append(this.maxInFlight);
        description.append(" fin:").append(this.finishedCount);
        description.append(" req:").append(this.requeuedCount);
        return description.toString();
    }

    /**
     * Body of the handler-executor task for messages that exceeded the maximum
     * attempt count (renamed by the decompiler from
     * {@code onIncomingMessage$lambda-0}).
     *
     * <p>Invokes the connection's {@code onFailedMessage} callback when one is
     * set. An exception thrown by the callback is logged and forwarded to the
     * connection's {@code onException} callback when one is set.
     *
     * @param subConnection connection that received the message
     * @param message       the received message
     */
    private static final void m9onIncomingMessage$lambda0(SubConnection subConnection, Message message) {
        Intrinsics.checkNotNullParameter(subConnection, "this$0");
        Intrinsics.checkNotNullParameter(message, "$message");
        try {
            final Function1<Message, Unit> failedHandler = subConnection.getOnFailedMessage();
            if (failedHandler != null) {
                failedHandler.invoke(message);
            }
        } catch (Exception failure) {
            logger.error("Exception while processing incoming message", failure);
            final Function1<Exception, Unit> exceptionHandler = subConnection.getOnException();
            if (exceptionHandler != null) {
                exceptionHandler.invoke(failure);
            }
        }
    }

    /**
     * Body of the handler-executor task for regular messages (renamed by the
     * decompiler from {@code onIncomingMessage$lambda-1}).
     *
     * <p>Invokes the connection's {@code onMessage} callback when one is set.
     * An exception thrown by the callback is logged and forwarded to the
     * connection's {@code onException} callback when one is set.
     *
     * @param subConnection connection that received the message
     * @param message       the received message
     */
    private static final void m10onIncomingMessage$lambda1(SubConnection subConnection, Message message) {
        Intrinsics.checkNotNullParameter(subConnection, "this$0");
        Intrinsics.checkNotNullParameter(message, "$message");
        try {
            final Function1<Message, Unit> messageHandler = subConnection.getOnMessage();
            if (messageHandler != null) {
                messageHandler.invoke(message);
            }
        } catch (Exception failure) {
            logger.error("Exception while processing incoming message", failure);
            final Function1<Exception, Unit> exceptionHandler = subConnection.getOnException();
            if (exceptionHandler != null) {
                exceptionHandler.invoke(failure);
            }
        }
    }
}
