package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import java.util.HashMap;
import java.util.concurrent.ScheduledFuture;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for stream operators.
 *
 * <p>It stores the objects handed over in {@link #setup}, wraps the supplied output so that
 * every collected record increments the {@code numRecordsOut} counter, creates a state backend
 * during setup, and forwards snapshot / restore / checkpoint-notification calls to that backend.
 * {@link #open()} and {@link #close()} are empty so that subclasses only override what they need.
 *
 * @param <OUT> the type of the values carried by the records this operator emits
 */
@PublicEvolving
public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>, Serializable {
    private static final long serialVersionUID = 1;

    /** Logger shared with subclasses. */
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperator.class);

    // ---- runtime fields, assigned in setup() ----

    /** The task this operator was set up with. */
    private transient StreamTask<?, ?> container;

    /** The configuration this operator was set up with. */
    private transient StreamConfig config;

    /** Counting wrapper around the output passed to setup(). */
    protected transient Output<StreamRecord<OUT>> output;

    /** Runtime context created in setup(). */
    private transient StreamingRuntimeContext runtimeContext;

    /** State partitioner read from the config at index 0; may be null. */
    private transient KeySelector<?, ?> stateKeySelector1;

    /** State partitioner read from the config at index 1; may be null. */
    private transient KeySelector<?, ?> stateKeySelector2;

    /**
     * Operator metric group, assigned in setup().
     * NOTE(review): unlike the other fields assigned in setup() this one is not transient -
     * confirm whether that is intended before changing the serialized form.
     */
    protected MetricGroup metrics;

    /** Chaining strategy of this operator; {@code HEAD} unless changed via the setter. */
    protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;

    /** State backend created in setup(); null until then. */
    private AbstractStateBackend stateBackend = null;

    /**
     * Output decorator that increments a counter for every collected record and otherwise
     * delegates to the wrapped output.
     */
    public class CountingOutput implements Output<StreamRecord<OUT>> {
        private final Output<StreamRecord<OUT>> output;
        private final Counter numRecordsOut;

        /**
         * @param output the output all calls are delegated to
         * @param counter the counter incremented once per collected record
         */
        public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
            this.output = output;
            this.numRecordsOut = counter;
        }

        /** Forwards the watermark unchanged; watermarks are not counted. */
        @Override // org.apache.flink.streaming.api.operators.Output
        public void emitWatermark(Watermark watermark) {
            this.output.emitWatermark(watermark);
        }

        /** Increments the record counter, then forwards the record. */
        @Override
        public void collect(StreamRecord<OUT> streamRecord) {
            this.numRecordsOut.inc();
            this.output.collect(streamRecord);
        }

        /** Closes the wrapped output. */
        @Override
        public void close() {
            this.output.close();
        }
    }

    /**
     * Initializes the operator: stores task and config, registers the operator metric group,
     * wraps {@code output} in a {@link CountingOutput}, creates the runtime context, reads the
     * two state partitioners from the config and creates the state backend.
     *
     * @param streamTask the task that contains this operator
     * @param streamConfig the configuration of this operator
     * @param output the output that emitted records are forwarded to
     * @throws RuntimeException if the state backend cannot be created
     */
    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<OUT>> output) {
        this.container = streamTask;
        this.config = streamConfig;
        this.metrics = this.container.getEnvironment().getMetricGroup().addOperator(streamConfig.getOperatorName());
        this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
        this.runtimeContext = new StreamingRuntimeContext(this, this.container.getEnvironment(), this.container.getAccumulatorMap());
        this.stateKeySelector1 = streamConfig.getStatePartitioner(0, getUserCodeClassloader());
        this.stateKeySelector2 = streamConfig.getStatePartitioner(1, getUserCodeClassloader());
        try {
            // Identifier handed to the backend: <operator class>_<vertex id>_<subtask index>.
            String operatorIdentifier = getClass().getSimpleName() + "_" + streamConfig.getVertexID() + "_" + this.runtimeContext.getIndexOfThisSubtask();
            TypeSerializer<?> keySerializer = streamConfig.getStateKeySerializer(getUserCodeClassloader());
            this.stateBackend = this.container.createStateBackend(operatorIdentifier, keySerializer);
        } catch (Exception e) {
            throw new RuntimeException("Could not initialize state backend. ", e);
        }
    }

    /** Returns the metric group assigned in {@link #setup}; null before setup. */
    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public MetricGroup getMetricGroup() {
        return this.metrics;
    }

    /** Does nothing by default. */
    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
    }

    /** Does nothing by default. */
    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
    }

    /**
     * Closes and then disposes the state backend, if one was created.
     *
     * <p>Disposal is attempted even when closing fails, so a failing {@code close()} does not
     * prevent the second release step from running. The first failure is rethrown; a second
     * failure is attached to it as a suppressed exception.
     *
     * @throws RuntimeException if closing or disposing the state backend fails
     */
    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public void dispose() {
        if (this.stateBackend == null) {
            return;
        }
        Exception failure = null;
        try {
            this.stateBackend.close();
        } catch (Exception e) {
            failure = e;
        }
        try {
            this.stateBackend.dispose();
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw new RuntimeException("Error while closing/disposing state backend.", failure);
        }
    }

    /**
     * Creates a task state object and, if a state backend exists, stores the backend's
     * partitioned-state snapshot in it.
     *
     * @param j first argument forwarded to the backend's {@code snapshotPartitionedState}
     *          (presumably the checkpoint id - confirm against the interface)
     * @param j2 second argument forwarded to the backend's {@code snapshotPartitionedState}
     *           (presumably the checkpoint timestamp - confirm against the interface)
     * @return the task state; carries no key/value states when there is no backend or the
     *         backend returned null
     * @throws Exception if the backend fails to take the snapshot
     */
    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public StreamTaskState snapshotOperatorState(long j, long j2) throws Exception {
        StreamTaskState streamTaskState = new StreamTaskState();
        if (this.stateBackend != null) {
            try {
                HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState = this.stateBackend.snapshotPartitionedState(j, j2);
                if (snapshotPartitionedState != null) {
                    streamTaskState.setKvStates(snapshotPartitionedState);
                }
            } catch (Exception e) {
                throw new Exception("Failed to snapshot partitioned state for operator " + getOperatorName() + '.', e);
            }
        }
        return streamTaskState;
    }

    /**
     * Hands the key/value state snapshots of the given task state to the state backend.
     * Does nothing when no state backend exists.
     *
     * @param streamTaskState the state to restore from
     */
    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public void restoreState(StreamTaskState streamTaskState) throws Exception {
        if (this.stateBackend != null) {
            this.stateBackend.injectKeyValueStateSnapshots(streamTaskState.getKvStates());
        }
    }

    /**
     * Forwards the checkpoint-completed notification to the state backend, if one exists.
     *
     * @param j the value forwarded to the backend (the id of the completed checkpoint)
     */
    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public void notifyOfCompletedCheckpoint(long j) throws Exception {
        if (this.stateBackend != null) {
            this.stateBackend.notifyOfCompletedCheckpoint(j);
        }
    }

    /** Returns the execution config of the containing task. Requires {@link #setup} to have run. */
    public ExecutionConfig getExecutionConfig() {
        return this.container.getExecutionConfig();
    }

    /** Returns the config passed to {@link #setup}. */
    public StreamConfig getOperatorConfig() {
        return this.config;
    }

    /** Returns the task passed to {@link #setup}. */
    public StreamTask<?, ?> getContainingTask() {
        return this.container;
    }

    /** Returns the user-code class loader of the containing task. Requires {@link #setup} to have run. */
    public ClassLoader getUserCodeClassloader() {
        return this.container.getUserCodeClassLoader();
    }

    /**
     * Returns a name for this operator for use in messages: the runtime context's task name
     * with subtasks when a runtime context exists, otherwise the simple class name.
     */
    public String getOperatorName() {
        if (this.runtimeContext == null) {
            return getClass().getSimpleName();
        }
        return this.runtimeContext.getTaskNameWithSubtasks();
    }

    /** Returns the runtime context created in {@link #setup}; null before setup. */
    public StreamingRuntimeContext getRuntimeContext() {
        return this.runtimeContext;
    }

    /** Returns the state backend created in {@link #setup}; null before setup. */
    public AbstractStateBackend getStateBackend() {
        return this.stateBackend;
    }

    /**
     * Registers a timer with the containing task.
     *
     * @param j the time value passed to the task's {@code registerTimer}
     * @param triggerable the callback passed to the task's {@code registerTimer}
     * @return the future returned by the containing task
     */
    public ScheduledFuture<?> registerTimer(long j, Triggerable triggerable) {
        return this.container.registerTimer(j, triggerable);
    }

    /** Returns the current processing time as reported by the containing task. */
    public long getCurrentProcessingTime() {
        return this.container.getCurrentProcessingTime();
    }

    /**
     * Obtains partitioned state without a namespace: delegates to the namespaced variant with a
     * {@code null} namespace and the {@link VoidSerializer}.
     *
     * @param stateDescriptor describes the state to obtain
     * @return the state handle returned by the state backend
     * @throws Exception if the state backend fails to provide the state
     */
    public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
        // The untyped null lets the namespace type be inferred as Void from the serializer;
        // an Object-typed null would not match TypeSerializer<Void>.
        return getPartitionedState(null, VoidSerializer.INSTANCE, stateDescriptor);
    }

    /**
     * Obtains partitioned state for the given namespace from the state backend.
     *
     * @param n the namespace
     * @param typeSerializer the serializer for the namespace type
     * @param stateDescriptor describes the state to obtain
     * @return the state handle returned by the state backend
     * @throws Exception if the state backend fails to provide the state
     */
    public <S extends State, N> S getPartitionedState(N n, TypeSerializer<N> typeSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        return getStateBackend().getPartitionedState(n, typeSerializer, stateDescriptor);
    }

    /**
     * Extracts the key from the record's value with the first state partitioner and makes it
     * the backend's current key. Does nothing when no first partitioner is configured.
     */
    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public void setKeyContextElement1(StreamRecord streamRecord) throws Exception {
        applyKeyContext(this.stateKeySelector1, streamRecord);
    }

    /**
     * Extracts the key from the record's value with the second state partitioner and makes it
     * the backend's current key. Does nothing when no second partitioner is configured.
     */
    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public void setKeyContextElement2(StreamRecord streamRecord) throws Exception {
        applyKeyContext(this.stateKeySelector2, streamRecord);
    }

    /**
     * Shared implementation of the two setKeyContextElement methods.
     * The selector is stored with wildcard types, so it has to be invoked through its raw type.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void applyKeyContext(KeySelector<?, ?> selector, StreamRecord streamRecord) throws Exception {
        if (selector != null) {
            Object key = ((KeySelector) selector).getKey(streamRecord.getValue());
            getStateBackend().setCurrentKey(key);
        }
    }

    /**
     * Sets the given object as the backend's current key. Only takes effect when a first state
     * partitioner is configured.
     *
     * @param obj the key to set
     */
    public void setKeyContext(Object obj) {
        if (this.stateKeySelector1 != null) {
            this.stateBackend.setCurrentKey(obj);
        }
    }

    /** Sets the chaining strategy of this operator. */
    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public final void setChainingStrategy(ChainingStrategy chainingStrategy) {
        this.chainingStrategy = chainingStrategy;
    }

    /** Returns the chaining strategy of this operator. */
    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public final ChainingStrategy getChainingStrategy() {
        return this.chainingStrategy;
    }
}
