package org.apache.flink.streaming.runtime.operators.windowing;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.shaded.com.google.common.collect.HashMultiset;
import org.apache.flink.shaded.com.google.common.collect.Multiset;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.class */
public class WindowOperator<K, IN, ACC, OUT, W extends Window> extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>> implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
    private static final long serialVersionUID = 1;
    protected final WindowAssigner<? super IN, W> windowAssigner;
    protected final KeySelector<IN, K> keySelector;
    protected final Trigger<? super IN, ? super W> trigger;
    protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
    protected TypeSerializer<IN> inputSerializer;
    protected final TypeSerializer<K> keySerializer;
    protected final TypeSerializer<W> windowSerializer;
    protected final long allowedLateness;
    protected transient TimestampedCollector<OUT> timestampedCollector;
    protected transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures;
    protected transient long currentWatermark;
    protected transient WindowOperator<K, IN, ACC, OUT, W>.Context context;
    protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
    protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue;
    protected transient Set<Timer<K, W>> processingTimeTimers;
    protected transient Multiset<Long> processingTimeTimerTimestamps;
    protected transient Set<Timer<K, W>> watermarkTimers;
    protected transient PriorityQueue<Timer<K, W>> watermarkTimersQueue;
    protected transient Map<K, MergingWindowSet<W>> mergingWindowsByKey;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator$Context.class */
    public class Context implements Trigger.OnMergeContext {
        protected K key;
        protected W window;
        protected Collection<W> mergedWindows;

        public Context(K k, W w) {
            this.key = k;
            this.window = w;
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public MetricGroup getMetricGroup() {
            return WindowOperator.this.getMetricGroup();
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public long getCurrentWatermark() {
            return WindowOperator.this.currentWatermark;
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public <S extends Serializable> ValueState<S> getKeyValueState(String str, Class<S> cls, S s) {
            Objects.requireNonNull(cls, "The state type class must not be null");
            try {
                return getKeyValueState(str, (TypeInformation<TypeInformation<S>>) TypeExtractor.getForClass(cls), (TypeInformation<S>) s);
            } catch (Exception e) {
                throw new RuntimeException("Cannot analyze type '" + cls.getName() + "' from the class alone, due to generic type parameters. Please specify the TypeInformation directly.", e);
            }
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public <S extends Serializable> ValueState<S> getKeyValueState(String str, TypeInformation<S> typeInformation, S s) {
            Objects.requireNonNull(str, "The name of the state must not be null");
            Objects.requireNonNull(typeInformation, "The state type information must not be null");
            return getPartitionedState(new ValueStateDescriptor(str, typeInformation.createSerializer(WindowOperator.this.getExecutionConfig()), s));
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            try {
                return (S) WindowOperator.this.getPartitionedState(this.window, WindowOperator.this.windowSerializer, stateDescriptor);
            } catch (Exception e) {
                throw new RuntimeException("Could not retrieve state", e);
            }
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.OnMergeContext
        public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            if (this.mergedWindows == null || this.mergedWindows.size() <= 0) {
                return;
            }
            try {
                WindowOperator.this.getStateBackend().mergePartitionedStates(this.window, this.mergedWindows, WindowOperator.this.windowSerializer, stateDescriptor);
            } catch (Exception e) {
                throw new RuntimeException("Error while merging state.", e);
            }
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public long getCurrentProcessingTime() {
            return WindowOperator.this.getCurrentProcessingTime();
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void registerProcessingTimeTimer(long j) {
            Timer<K, W> timer = new Timer<>(j, this.key, this.window);
            if (WindowOperator.this.processingTimeTimers.add(timer)) {
                WindowOperator.this.processingTimeTimersQueue.add(timer);
                if (WindowOperator.this.processingTimeTimerTimestamps.add(Long.valueOf(j), 1) == 0) {
                    WindowOperator.this.processingTimeTimerFutures.put(Long.valueOf(j), WindowOperator.this.registerTimer(j, WindowOperator.this));
                }
            }
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void registerEventTimeTimer(long j) {
            Timer<K, W> timer = new Timer<>(j, this.key, this.window);
            if (WindowOperator.this.watermarkTimers.add(timer)) {
                WindowOperator.this.watermarkTimersQueue.add(timer);
            }
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void deleteProcessingTimeTimer(long j) {
            ScheduledFuture<?> remove;
            Timer timer = new Timer(j, this.key, this.window);
            if (WindowOperator.this.processingTimeTimers.remove(timer)) {
                WindowOperator.this.processingTimeTimersQueue.remove(timer);
            }
            if (WindowOperator.this.processingTimeTimerTimestamps.remove(Long.valueOf(j), 1) != 1 || (remove = WindowOperator.this.processingTimeTimerFutures.remove(Long.valueOf(timer.timestamp))) == null || remove.isDone()) {
                return;
            }
            remove.cancel(false);
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void deleteEventTimeTimer(long j) {
            Timer timer = new Timer(j, this.key, this.window);
            if (WindowOperator.this.watermarkTimers.remove(timer)) {
                WindowOperator.this.watermarkTimersQueue.remove(timer);
            }
        }

        public TriggerResult onElement(StreamRecord<IN> streamRecord) throws Exception {
            return WindowOperator.this.trigger.onElement(streamRecord.getValue(), streamRecord.getTimestamp(), this.window, this);
        }

        public TriggerResult onProcessingTime(long j) throws Exception {
            return WindowOperator.this.trigger.onProcessingTime(j, this.window, this);
        }

        public TriggerResult onEventTime(long j) throws Exception {
            return WindowOperator.this.trigger.onEventTime(j, this.window, this);
        }

        public TriggerResult onMerge(Collection<W> collection) throws Exception {
            this.mergedWindows = collection;
            return WindowOperator.this.trigger.onMerge(this.window, this);
        }

        public void clear() throws Exception {
            WindowOperator.this.trigger.clear(this.window, this);
        }

        public String toString() {
            return "Context{key=" + this.key + ", window=" + this.window + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator$Timer.class */
    public static class Timer<K, W extends Window> implements Comparable<Timer<K, W>> {
        protected long timestamp;
        protected K key;
        protected W window;

        public Timer(long j, K k, W w) {
            this.timestamp = j;
            this.key = k;
            this.window = w;
        }

        @Override // java.lang.Comparable
        public int compareTo(Timer<K, W> timer) {
            return Long.compare(this.timestamp, timer.timestamp);
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Timer timer = (Timer) obj;
            return this.timestamp == timer.timestamp && this.key.equals(timer.key) && this.window.equals(timer.window);
        }

        public int hashCode() {
            return (31 * ((31 * ((int) (this.timestamp ^ (this.timestamp >>> 32)))) + this.key.hashCode())) + this.window.hashCode();
        }

        public String toString() {
            return "Timer{timestamp=" + this.timestamp + ", key=" + this.key + ", window=" + this.window + '}';
        }
    }

    public WindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> typeSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> typeSerializer2, StateDescriptor<? extends AppendingState<IN, ACC>, ?> stateDescriptor, InternalWindowFunction<ACC, OUT, K, W> internalWindowFunction, Trigger<? super IN, ? super W> trigger, long j) {
        super(internalWindowFunction);
        this.currentWatermark = Long.MIN_VALUE;
        this.context = new Context(null, null);
        this.windowAssigner = (WindowAssigner) Objects.requireNonNull(windowAssigner);
        this.windowSerializer = typeSerializer;
        this.keySelector = (KeySelector) Objects.requireNonNull(keySelector);
        this.keySerializer = (TypeSerializer) Objects.requireNonNull(typeSerializer2);
        this.windowStateDescriptor = stateDescriptor;
        this.trigger = (Trigger) Objects.requireNonNull(trigger);
        Preconditions.checkArgument(j >= 0);
        this.allowedLateness = j;
        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    private void readObject(ObjectInputStream objectInputStream) throws IOException, ClassNotFoundException {
        objectInputStream.defaultReadObject();
        this.currentWatermark = -1L;
    }

    public final void setInputType(TypeInformation<?> typeInformation, ExecutionConfig executionConfig) {
        this.inputSerializer = typeInformation.createSerializer(executionConfig);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public final void open() throws Exception {
        super.open();
        this.timestampedCollector = new TimestampedCollector<>(this.output);
        if (this.inputSerializer == null) {
            throw new IllegalStateException("Input serializer was not set.");
        }
        if (this.watermarkTimers == null) {
            this.watermarkTimers = new HashSet();
            this.watermarkTimersQueue = new PriorityQueue<>(100);
        }
        if (this.processingTimeTimers == null) {
            this.processingTimeTimers = new HashSet();
            this.processingTimeTimerTimestamps = HashMultiset.create();
            this.processingTimeTimersQueue = new PriorityQueue<>(100);
        }
        this.processingTimeTimerFutures = new HashMap();
        this.context = new Context(null, null);
        this.windowAssignerContext = new WindowAssigner.WindowAssignerContext() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.1
            @Override // org.apache.flink.streaming.api.windowing.assigners.WindowAssigner.WindowAssignerContext
            public long getCurrentProcessingTime() {
                return WindowOperator.this.getCurrentProcessingTime();
            }
        };
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            this.mergingWindowsByKey = new HashMap();
        }
        this.currentWatermark = Long.MIN_VALUE;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public final void close() throws Exception {
        super.close();
        this.timestampedCollector = null;
        this.watermarkTimers = null;
        this.watermarkTimersQueue = null;
        this.processingTimeTimers = null;
        this.processingTimeTimersQueue = null;
        this.context = null;
        this.windowAssignerContext = null;
        this.mergingWindowsByKey = null;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void dispose() {
        super.dispose();
        this.timestampedCollector = null;
        this.watermarkTimers = null;
        this.watermarkTimersQueue = null;
        this.processingTimeTimers = null;
        this.processingTimeTimersQueue = null;
        this.context = null;
        this.windowAssignerContext = null;
        this.mergingWindowsByKey = null;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        Collection<W> assignWindows = this.windowAssigner.assignWindows(streamRecord.getValue(), streamRecord.getTimestamp(), this.windowAssignerContext);
        final K k = (K) getStateBackend().getCurrentKey();
        if (!(this.windowAssigner instanceof MergingWindowAssigner)) {
            for (W w : assignWindows) {
                if (!isLate(w)) {
                    AppendingState partitionedState = getPartitionedState(w, this.windowSerializer, this.windowStateDescriptor);
                    partitionedState.add(streamRecord.getValue());
                    this.context.key = k;
                    this.context.window = w;
                    TriggerResult onElement = this.context.onElement(streamRecord);
                    if (onElement.isFire()) {
                        Object obj = partitionedState.get();
                        if (obj != null) {
                            fire(w, obj);
                        }
                    }
                    if (onElement.isPurge()) {
                        cleanup(w, partitionedState, null);
                    } else {
                        registerCleanupTimer(w);
                    }
                }
            }
            return;
        }
        MergingWindowSet mergingWindowSet = getMergingWindowSet();
        for (W w2 : assignWindows) {
            final Tuple1 tuple1 = new Tuple1(TriggerResult.CONTINUE);
            W w3 = (W) mergingWindowSet.addWindow(w2, new MergingWindowSet.MergeFunction<W>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.2
                public void merge(W w4, Collection<W> collection, W w5, Collection<W> collection2) throws Exception {
                    WindowOperator.this.context.key = (K) k;
                    WindowOperator.this.context.window = w4;
                    tuple1.f0 = WindowOperator.this.context.onMerge(collection);
                    for (W w6 : collection) {
                        WindowOperator.this.context.window = w6;
                        WindowOperator.this.context.clear();
                        WindowOperator.this.deleteCleanupTimer(w6);
                    }
                    WindowOperator.this.getStateBackend().mergePartitionedStates(w5, collection2, WindowOperator.this.windowSerializer, WindowOperator.this.windowStateDescriptor);
                }

                /* JADX WARN: Multi-variable type inference failed */
                @Override // org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.MergeFunction
                public /* bridge */ /* synthetic */ void merge(Object obj2, Collection collection, Object obj3, Collection collection2) throws Exception {
                    merge((Collection) obj2, (Collection<Collection>) collection, (Collection) obj3, (Collection<Collection>) collection2);
                }
            });
            if (isLate(w3)) {
                mergingWindowSet.retireWindow(w3);
            } else {
                Window stateWindow = mergingWindowSet.getStateWindow(w3);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + w2 + " is not in in-flight window set.");
                }
                AppendingState partitionedState2 = getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
                partitionedState2.add(streamRecord.getValue());
                this.context.key = k;
                this.context.window = w3;
                TriggerResult merge = TriggerResult.merge(this.context.onElement(streamRecord), (TriggerResult) tuple1.f0);
                if (merge.isFire()) {
                    Object obj2 = partitionedState2.get();
                    if (obj2 != null) {
                        fire(w3, obj2);
                    }
                }
                if (merge.isPurge()) {
                    cleanup(w3, partitionedState2, mergingWindowSet);
                } else {
                    registerCleanupTimer(w3);
                }
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void processWatermark(Watermark watermark) throws Exception {
        boolean z;
        AppendingState partitionedState;
        do {
            Timer<K, W> peek = this.watermarkTimersQueue.peek();
            if (peek == null || peek.timestamp > watermark.getTimestamp()) {
                z = false;
            } else {
                z = true;
                this.watermarkTimers.remove(peek);
                this.watermarkTimersQueue.remove();
                this.context.key = peek.key;
                this.context.window = peek.window;
                setKeyContext(peek.key);
                MergingWindowSet mergingWindowSet = null;
                if (this.windowAssigner instanceof MergingWindowAssigner) {
                    mergingWindowSet = getMergingWindowSet();
                    Window stateWindow = mergingWindowSet.getStateWindow(this.context.window);
                    if (stateWindow != null) {
                        partitionedState = (AppendingState) getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
                    }
                } else {
                    partitionedState = getPartitionedState(this.context.window, this.windowSerializer, this.windowStateDescriptor);
                }
                Object obj = partitionedState.get();
                if (obj != null) {
                    TriggerResult onEventTime = this.context.onEventTime(peek.timestamp);
                    if (onEventTime.isFire()) {
                        fire(this.context.window, obj);
                    }
                    if (onEventTime.isPurge() || (this.windowAssigner.isEventTime() && isCleanupTime(this.context.window, peek.timestamp))) {
                        cleanup(this.context.window, partitionedState, mergingWindowSet);
                    }
                }
            }
        } while (z);
        this.output.emitWatermark(watermark);
        this.currentWatermark = watermark.getTimestamp();
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void trigger(long j) throws Exception {
        boolean z;
        AppendingState partitionedState;
        this.processingTimeTimerFutures.remove(Long.valueOf(j));
        this.processingTimeTimerTimestamps.remove(Long.valueOf(j), this.processingTimeTimerTimestamps.count(Long.valueOf(j)));
        do {
            Timer<K, W> peek = this.processingTimeTimersQueue.peek();
            if (peek == null || peek.timestamp > j) {
                z = false;
            } else {
                z = true;
                this.processingTimeTimers.remove(peek);
                this.processingTimeTimersQueue.remove();
                this.context.key = peek.key;
                this.context.window = peek.window;
                setKeyContext(peek.key);
                MergingWindowSet mergingWindowSet = null;
                if (this.windowAssigner instanceof MergingWindowAssigner) {
                    mergingWindowSet = getMergingWindowSet();
                    Window stateWindow = mergingWindowSet.getStateWindow(this.context.window);
                    if (stateWindow != null) {
                        partitionedState = (AppendingState) getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
                    }
                } else {
                    partitionedState = getPartitionedState(this.context.window, this.windowSerializer, this.windowStateDescriptor);
                }
                Object obj = partitionedState.get();
                if (obj != null) {
                    TriggerResult onProcessingTime = this.context.onProcessingTime(peek.timestamp);
                    if (onProcessingTime.isFire()) {
                        fire(this.context.window, obj);
                    }
                    if (onProcessingTime.isPurge() || (!this.windowAssigner.isEventTime() && isCleanupTime(this.context.window, peek.timestamp))) {
                        cleanup(this.context.window, partitionedState, mergingWindowSet);
                    }
                }
            }
        } while (z);
    }

    private void cleanup(W w, AppendingState<IN, ACC> appendingState, MergingWindowSet<W> mergingWindowSet) throws Exception {
        appendingState.clear();
        if (mergingWindowSet != null) {
            mergingWindowSet.retireWindow(w);
        }
        this.context.clear();
    }

    private void fire(W w, ACC acc) throws Exception {
        this.timestampedCollector.setAbsoluteTimestamp(w.maxTimestamp());
        ((InternalWindowFunction) this.userFunction).apply(this.context.key, this.context.window, acc, this.timestampedCollector);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    public MergingWindowSet<W> getMergingWindowSet() throws Exception {
        MergingWindowSet<W> mergingWindowSet = this.mergingWindowsByKey.get(getStateBackend().getCurrentKey());
        if (mergingWindowSet == null) {
            ListState partitionedState = getStateBackend().getPartitionedState((Object) null, VoidSerializer.INSTANCE, new ListStateDescriptor("merging-window-set", new TupleSerializer(Tuple2.class, new TypeSerializer[]{this.windowSerializer, this.windowSerializer})));
            mergingWindowSet = new MergingWindowSet<>((MergingWindowAssigner) this.windowAssigner, partitionedState);
            partitionedState.clear();
            this.mergingWindowsByKey.put(getStateBackend().getCurrentKey(), mergingWindowSet);
        }
        return mergingWindowSet;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isLate(W w) {
        return this.windowAssigner.isEventTime() && cleanupTime(w) <= this.currentWatermark;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void registerCleanupTimer(W w) {
        long cleanupTime = cleanupTime(w);
        if (this.windowAssigner.isEventTime()) {
            this.context.registerEventTimeTimer(cleanupTime);
        } else {
            this.context.registerProcessingTimeTimer(cleanupTime);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void deleteCleanupTimer(W w) {
        long cleanupTime = cleanupTime(w);
        if (this.windowAssigner.isEventTime()) {
            this.context.deleteEventTimeTimer(cleanupTime);
        } else {
            this.context.deleteProcessingTimeTimer(cleanupTime);
        }
    }

    private long cleanupTime(W w) {
        long maxTimestamp = w.maxTimestamp() + this.allowedLateness;
        if (maxTimestamp >= w.maxTimestamp()) {
            return maxTimestamp;
        }
        return Long.MAX_VALUE;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final boolean isCleanupTime(W w, long j) {
        return cleanupTime(w) == j;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public StreamTaskState snapshotOperatorState(long j, long j2) throws Exception {
        if (this.mergingWindowsByKey != null) {
            ListStateDescriptor listStateDescriptor = new ListStateDescriptor("merging-window-set", new TupleSerializer(Tuple2.class, new TypeSerializer[]{this.windowSerializer, this.windowSerializer}));
            for (Map.Entry<K, MergingWindowSet<W>> entry : this.mergingWindowsByKey.entrySet()) {
                setKeyContext(entry.getKey());
                ListState<Tuple2<W, W>> listState = (ListState) getStateBackend().getPartitionedState((Object) null, VoidSerializer.INSTANCE, listStateDescriptor);
                listState.clear();
                entry.getValue().persist(listState);
            }
        }
        StreamTaskState snapshotOperatorState = super.snapshotOperatorState(j, j2);
        try {
            AbstractStateBackend.CheckpointStateOutputView createCheckpointStateOutputView = getStateBackend().createCheckpointStateOutputView(j, j2);
            try {
                snapshotTimers(createCheckpointStateOutputView);
                try {
                    snapshotOperatorState.setOperatorState(createCheckpointStateOutputView.closeAndGetHandle());
                    return snapshotOperatorState;
                } catch (Exception e) {
                    try {
                        snapshotOperatorState.discardState();
                    } catch (Exception e2) {
                        LOG.warn("Could not discard stream task state of {}.", getOperatorName(), e2);
                    }
                    throw new Exception("Could not close and get state handle from checkpoint output view of " + getOperatorName() + '.', e);
                }
            } catch (Exception e3) {
                try {
                    snapshotOperatorState.discardState();
                } catch (Exception e4) {
                    LOG.warn("Could not discard stream task state of {}.", getOperatorName(), e4);
                }
                try {
                    createCheckpointStateOutputView.close();
                } catch (Exception e5) {
                    LOG.warn("Could not close the checkpoint state output view of {}. The written data might not be deleted.", getOperatorName(), e5);
                }
                throw new Exception("Could not snapshot the window operators timers of " + getOperatorName() + '.', e3);
            }
        } catch (Exception e6) {
            try {
                snapshotOperatorState.discardState();
            } catch (Exception e7) {
                LOG.warn("Could not discard stream task state of {}.", getOperatorName(), e7);
            }
            throw new Exception("Could not create checkpoint state output view for " + getOperatorName() + '.', e6);
        }
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void restoreState(StreamTaskState streamTaskState) throws Exception {
        super.restoreState(streamTaskState);
        restoreTimers((DataInputView) streamTaskState.getOperatorState().getState(getUserCodeClassloader()));
    }

    private void restoreTimers(DataInputView dataInputView) throws IOException {
        int readInt = dataInputView.readInt();
        this.watermarkTimers = new HashSet(readInt);
        this.watermarkTimersQueue = new PriorityQueue<>(Math.max(readInt, 1));
        for (int i = 0; i < readInt; i++) {
            Timer<K, W> timer = new Timer<>(dataInputView.readLong(), this.keySerializer.deserialize(dataInputView), (Window) this.windowSerializer.deserialize(dataInputView));
            this.watermarkTimers.add(timer);
            this.watermarkTimersQueue.add(timer);
        }
        int readInt2 = dataInputView.readInt();
        this.processingTimeTimersQueue = new PriorityQueue<>(Math.max(readInt2, 1));
        this.processingTimeTimers = new HashSet();
        for (int i2 = 0; i2 < readInt2; i2++) {
            Timer<K, W> timer2 = new Timer<>(dataInputView.readLong(), this.keySerializer.deserialize(dataInputView), (Window) this.windowSerializer.deserialize(dataInputView));
            this.processingTimeTimersQueue.add(timer2);
            this.processingTimeTimers.add(timer2);
        }
        int readInt3 = dataInputView.readInt();
        this.processingTimeTimerTimestamps = HashMultiset.create();
        for (int i3 = 0; i3 < readInt3; i3++) {
            this.processingTimeTimerTimestamps.add(Long.valueOf(dataInputView.readLong()), dataInputView.readInt());
        }
    }

    private void snapshotTimers(DataOutputView dataOutputView) throws IOException {
        dataOutputView.writeInt(this.watermarkTimersQueue.size());
        Iterator<Timer<K, W>> it = this.watermarkTimersQueue.iterator();
        while (it.hasNext()) {
            Timer<K, W> next = it.next();
            this.keySerializer.serialize(next.key, dataOutputView);
            this.windowSerializer.serialize(next.window, dataOutputView);
            dataOutputView.writeLong(next.timestamp);
        }
        dataOutputView.writeInt(this.processingTimeTimers.size());
        for (Timer<K, W> timer : this.processingTimeTimers) {
            this.keySerializer.serialize(timer.key, dataOutputView);
            this.windowSerializer.serialize(timer.window, dataOutputView);
            dataOutputView.writeLong(timer.timestamp);
        }
        dataOutputView.writeInt(this.processingTimeTimerTimestamps.entrySet().size());
        for (Multiset.Entry<Long> entry : this.processingTimeTimerTimestamps.entrySet()) {
            dataOutputView.writeLong(entry.getElement().longValue());
            dataOutputView.writeInt(entry.getCount());
        }
    }

    @VisibleForTesting
    public Trigger<? super IN, ? super W> getTrigger() {
        return this.trigger;
    }

    @VisibleForTesting
    public KeySelector<IN, K> getKeySelector() {
        return this.keySelector;
    }

    @VisibleForTesting
    public WindowAssigner<? super IN, W> getWindowAssigner() {
        return this.windowAssigner;
    }

    @VisibleForTesting
    public StateDescriptor<? extends AppendingState<IN, ACC>, ?> getStateDescriptor() {
        return this.windowStateDescriptor;
    }
}
