package org.apache.flink.statefun.flink.core.feedback;

import java.util.Iterator;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade;
import org.apache.flink.statefun.flink.core.common.SerializableFunction;
import org.apache.flink.statefun.flink.core.logger.Loggers;
import org.apache.flink.statefun.flink.core.logger.UnboundedFeedbackLogger;
import org.apache.flink.statefun.flink.core.logger.UnboundedFeedbackLoggerFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.IOUtils;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.class */
public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T> implements FeedbackConsumer<T>, OneInputStreamOperator<T, T> {
    private static final long serialVersionUID = 1;
    private final FeedbackKey<T> feedbackKey;
    private final SerializableFunction<T, OptionalLong> isBarrierMessage;
    private final SerializableFunction<T, ?> keySelector;
    private final long totalMemoryUsedForFeedbackCheckpointing;
    private final TypeSerializer<T> elementSerializer;
    private transient Checkpoints<T> checkpoints;
    private transient boolean closedOrDisposed;
    private transient MailboxExecutor mailboxExecutor;
    private transient StreamRecord<T> reusable;

    /* JADX INFO: Access modifiers changed from: package-private */
    public FeedbackUnionOperator(FeedbackKey<T> feedbackKey, SerializableFunction<T, OptionalLong> serializableFunction, SerializableFunction<T, ?> serializableFunction2, long j, TypeSerializer<T> typeSerializer, MailboxExecutor mailboxExecutor, ProcessingTimeService processingTimeService) {
        this.feedbackKey = (FeedbackKey) Objects.requireNonNull(feedbackKey);
        this.isBarrierMessage = (SerializableFunction) Objects.requireNonNull(serializableFunction);
        this.keySelector = (SerializableFunction) Objects.requireNonNull(serializableFunction2);
        this.totalMemoryUsedForFeedbackCheckpointing = j;
        this.elementSerializer = (TypeSerializer) Objects.requireNonNull(typeSerializer);
        this.mailboxExecutor = (MailboxExecutor) Objects.requireNonNull(mailboxExecutor);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        ((AbstractStreamOperator) this).processingTimeService = processingTimeService;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void processElement(StreamRecord<T> streamRecord) {
        sendDownstream(streamRecord.getValue());
    }

    @Override // org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer
    public void processFeedback(T t) {
        if (this.closedOrDisposed) {
            return;
        }
        OptionalLong apply = this.isBarrierMessage.apply(t);
        if (apply.isPresent()) {
            this.checkpoints.commitCheckpointsUntil(apply.getAsLong());
        } else {
            sendDownstream(t);
            this.checkpoints.append(t);
        }
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        IOManager iOManager = getContainingTask().getEnvironment().getIOManager();
        int maxNumberOfParallelSubtasks = getRuntimeContext().getMaxNumberOfParallelSubtasks();
        this.reusable = new StreamRecord<>((Object) null);
        UnboundedFeedbackLoggerFactory<?> unboundedSpillableLoggerFactory = Loggers.unboundedSpillableLoggerFactory(iOManager, maxNumberOfParallelSubtasks, this.totalMemoryUsedForFeedbackCheckpointing, this.elementSerializer, this.keySelector);
        unboundedSpillableLoggerFactory.getClass();
        this.checkpoints = new Checkpoints<>(unboundedSpillableLoggerFactory::create);
        UnboundedFeedbackLogger<?> create = unboundedSpillableLoggerFactory.create();
        Iterator<T> it2 = stateInitializationContext.getRawKeyedStateInputs().iterator();
        while (it2.hasNext()) {
            create.replyLoggedEnvelops(((KeyGroupStatePartitionStreamProvider) it2.next()).getStream(), this);
        }
        registerFeedbackConsumer(new MailboxExecutorFacade(this.mailboxExecutor, "Feedback Consumer"));
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.checkpoints.startLogging(stateSnapshotContext.getCheckpointId(), stateSnapshotContext.getRawKeyedOperatorStateOutput());
    }

    public void close() throws Exception {
        closeInternally();
        super.close();
    }

    public void dispose() throws Exception {
        closeInternally();
        super.dispose();
    }

    private void closeInternally() {
        IOUtils.closeQuietly(this.checkpoints);
        this.checkpoints = null;
        this.closedOrDisposed = true;
    }

    private void registerFeedbackConsumer(Executor executor) {
        FeedbackChannelBroker.get().getChannel(this.feedbackKey.withSubTaskIndex(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getAttemptNumber())).registerConsumer(this, executor);
    }

    private void sendDownstream(T t) {
        this.reusable.replace(t);
        this.output.collect(this.reusable);
    }
}
