package software.chronicle.enterprise.queue;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.annotation.UsedViaReflection;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.server.internal.EngineWireNetworkContext;
import net.openhft.chronicle.engine.tree.ChronicleQueueView;
import net.openhft.chronicle.engine.tree.MessageAdaptor;
import net.openhft.chronicle.engine.tree.QueueView;
import net.openhft.chronicle.network.cluster.AbstractSubHandler;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts;
import net.openhft.chronicle.wire.Demarshallable;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.WireType;
import net.openhft.chronicle.wire.Wires;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@UsedViaReflection
/* loaded from: input_file:WEB-INF/lib/chronicle-queue-enterprise-1.3.19.jar:software/chronicle/enterprise/queue/QueueSyncReplicationHandler.class */
public class QueueSyncReplicationHandler extends AbstractSubHandler<EngineWireNetworkContext> implements Demarshallable, WriteMarshallable {
    private static final Logger LOG;
    private final Class topicType;
    private final Class elementType;
    private final boolean acknowledgement;
    private Asset rootAsset;
    private ExcerptAppender appender;
    private ChronicleQueueView chronicleQueueView;
    private ChronicleQueue chronicleQueue;
    private WireType wireType;
    private Wire wire;

    @Nullable
    private MessageAdaptor messageAdaptor;
    private long lastNanoTimeStamp;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final QueueReplicationEvent queueReplicationEvent = new QueueReplicationEvent();
    long second = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
    long messagesReadPerSecond = 0;
    volatile long nextIndexToWrite = 0;
    long lastIndex = -1;

    @UsedViaReflection
    private QueueSyncReplicationHandler(@NotNull WireIn wireIn) {
        this.messageAdaptor = null;
        this.topicType = wireIn.read(() -> {
            return "topicType";
        }).typeLiteral();
        this.elementType = wireIn.read(() -> {
            return "elementType";
        }).typeLiteral();
        this.acknowledgement = wireIn.read(() -> {
            return "acknowledgement";
        }).bool();
        this.messageAdaptor = (MessageAdaptor) wireIn.read(() -> {
            return "messageAdaptor";
        }).typedMarshallable();
        this.wireType = (WireType) wireIn.read(() -> {
            return "wireType";
        }).asEnum(WireType.class);
        this.wire = this.wireType.apply(this.queueReplicationEvent.payload());
    }

    @UsedViaReflection
    public QueueSyncReplicationHandler(@NotNull Class cls, @NotNull Class cls2, boolean z, @Nullable MessageAdaptor messageAdaptor, @NotNull WireType wireType) {
        this.messageAdaptor = null;
        Objects.requireNonNull(cls);
        Objects.requireNonNull(cls2);
        Objects.requireNonNull(wireType);
        this.topicType = cls;
        this.elementType = cls2;
        this.acknowledgement = z;
        this.messageAdaptor = messageAdaptor;
        this.wireType = wireType;
    }

    @Override // net.openhft.chronicle.wire.WriteMarshallable
    public void writeMarshallable(@NotNull WireOut wireOut) {
        wireOut.write(() -> {
            return "topicType";
        }).typeLiteral(this.topicType);
        wireOut.write(() -> {
            return "elementType";
        }).typeLiteral(this.elementType);
        wireOut.write(() -> {
            return "acknowledgement";
        }).bool(Boolean.valueOf(this.acknowledgement));
        wireOut.write(() -> {
            return "messageAdaptor";
        }).typedMarshallable(this.messageAdaptor);
        wireOut.write(() -> {
            return "wireType";
        }).asEnum(this.wireType);
    }

    @Override // net.openhft.chronicle.network.api.session.SubHandler
    public void onInitialize(@NotNull WireOut wireOut) {
        this.rootAsset = nc().rootAsset();
        Asset acquireAsset = this.rootAsset.acquireAsset(csp());
        this.chronicleQueueView = (ChronicleQueueView) acquireAsset.acquireView(QueueView.class, RequestContext.requestContext(acquireAsset.fullName()).topicType(this.topicType).elementType(this.elementType));
        this.chronicleQueue = this.chronicleQueueView.chronicleQueue();
        if (this.chronicleQueue.wireType() != this.wireType) {
            throw new IllegalStateException("Incompatible Wire Types: For queue replication the source and sync wire types have to be the same, sync wireTpe=" + this.chronicleQueue.wireType() + ", source wireType=" + this.wireType);
        }
        ExcerptTailer end = this.chronicleQueue.createTailer().toEnd();
        this.appender = this.chronicleQueue.acquireAppender();
        this.nextIndexToWrite = end.index();
        if (this.messageAdaptor != null) {
            this.messageAdaptor.onInitialize(acquireAsset);
        }
        wireOut.writeDocument(true, wireOut2 -> {
            wireOut2.writeEventName(CoreFields.csp).text(csp()).writeEventName(CoreFields.cid).int64(cid()).writeEventName(CoreFields.handler).typedMarshallable(ChronicleQueueView.newSource(this.nextIndexToWrite, this.topicType, this.elementType, this.acknowledgement, this.messageAdaptor));
        });
    }

    @Override // net.openhft.chronicle.network.cluster.AbstractSubHandler, net.openhft.chronicle.network.api.session.SubHandler
    public void onRead(@NotNull WireIn wireIn, @NotNull WireOut wireOut) {
        if (!$assertionsDisabled && wireIn.bytes().isEmpty()) {
            throw new AssertionError();
        }
        StringBuilder acquireStringBuilder = Wires.acquireStringBuilder();
        wireIn.readEventName(acquireStringBuilder);
        long seconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
        if (seconds >= this.second + 10) {
            this.second = seconds;
            LOG.info("replicationEvents read per second=" + (this.messagesReadPerSecond / 10));
            this.messagesReadPerSecond = 0L;
        }
        this.messagesReadPerSecond++;
        if (!"re".contentEquals(acquireStringBuilder)) {
            Jvm.warn().on(getClass(), "", new IllegalStateException("unsupported eventName=" + ((Object) acquireStringBuilder)));
            return;
        }
        QueueReplicationEvent queueReplicationEvent = this.queueReplicationEvent;
        this.queueReplicationEvent.readMarshallable(wireIn);
        try {
            long index = queueReplicationEvent.index();
            Bytes payload = queueReplicationEvent.payload();
            if (this.messageAdaptor == null) {
                ((SingleChronicleQueueExcerpts.InternalAppender) this.appender).writeBytes(index, payload);
            } else {
                Bytes<?> bytes = this.wire.bytes();
                long readPosition = bytes.readPosition();
                DocumentContext writingDocument = this.appender.writingDocument(index);
                Throwable th = null;
                try {
                    try {
                        Bytes<?> bytes2 = writingDocument.wire().bytes();
                        long writePosition = bytes2.writePosition();
                        boolean z = false;
                        try {
                            z = this.messageAdaptor.apply(this.wire, writingDocument.wire()).booleanValue();
                        } catch (Throwable th2) {
                            Jvm.warn().on(getClass(), "inWire=" + wireIn.readingPeekYaml(), th2);
                        }
                        if (!z) {
                            bytes2.writePosition(writePosition);
                            bytes.readPosition(readPosition);
                            bytes2.write((BytesStore) payload);
                        }
                        if (writingDocument != null) {
                            if (0 != 0) {
                                try {
                                    writingDocument.close();
                                } catch (Throwable th3) {
                                    th.addSuppressed(th3);
                                }
                            } else {
                                writingDocument.close();
                            }
                        }
                    } catch (Throwable th4) {
                        th = th4;
                        throw th4;
                    }
                } catch (Throwable th5) {
                    if (writingDocument != null) {
                        if (th != null) {
                            try {
                                writingDocument.close();
                            } catch (Throwable th6) {
                                th.addSuppressed(th6);
                            }
                        } else {
                            writingDocument.close();
                        }
                    }
                    throw th5;
                }
            }
            this.nextIndexToWrite = index;
            if (this.acknowledgement) {
                this.lastIndex = index;
                this.lastNanoTimeStamp = queueReplicationEvent.nanoTimeStamp;
            }
        } catch (IllegalStateException e) {
            LOG.info("Skipping message " + wireIn.readingPeekYaml() + e);
        } catch (Exception e2) {
            Jvm.warn().on(getClass(), "inWire=" + wireIn.readingPeekYaml(), e2);
        }
    }

    @Override // net.openhft.chronicle.network.api.session.SubHandler
    public void onWrite(@NotNull WireOut wireOut) {
        if (this.lastIndex >= 0) {
            acknowledgeIndex(wireOut, this.lastIndex, this.lastNanoTimeStamp);
        }
        this.lastIndex = -1L;
    }

    private void acknowledgeIndex(@NotNull WireOut wireOut, long j, long j2) {
        Throwable th;
        DocumentContext writingDocument = wireOut.writingDocument(true);
        Throwable th2 = null;
        try {
            try {
                writingDocument.wire().write(CoreFields.cid).int64(cid());
                if (writingDocument != null) {
                    if (0 != 0) {
                        try {
                            writingDocument.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        writingDocument.close();
                    }
                }
                writingDocument = wireOut.writingDocument(false);
                th = null;
            } catch (Throwable th4) {
                th2 = th4;
                throw th4;
            }
            try {
                try {
                    writingDocument.wire().write("idx").int64_0x(j);
                    writingDocument.wire().write("ns").int64(j2);
                    if (writingDocument != null) {
                        if (0 == 0) {
                            writingDocument.close();
                            return;
                        }
                        try {
                            writingDocument.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    }
                } catch (Throwable th6) {
                    th = th6;
                    throw th6;
                }
            } finally {
            }
        } finally {
        }
    }

    static {
        $assertionsDisabled = !QueueSyncReplicationHandler.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger((Class<?>) QueueSyncReplicationHandler.class);
        ClassAliasPool.CLASS_ALIASES.addAlias(QueueSyncReplicationHandler.class);
    }
}
