package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.shaded.guava18.com.google.common.base.Function;
import org.apache.flink.shaded.guava18.com.google.common.collect.FluentIterable;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.class */
public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends WindowOperator<K, IN, Iterable<IN>, OUT, W> {
    private static final long serialVersionUID = 1;
    private final Evictor<? super IN, ? super W> evictor;
    private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> evictingWindowStateDescriptor;
    private transient EvictingWindowOperator<K, IN, OUT, W>.EvictorContext evictorContext;
    private transient InternalListState<K, W, StreamRecord<IN>> evictingWindowState;

    /**
     * {@link Evictor.EvictorContext} implementation that is handed to the operator's
     * {@link Evictor} on every eviction call.
     *
     * <p>The context carries a mutable {@code key}/{@code window} pair that the enclosing
     * operator overwrites before it processes an element or a timer. Time and metric queries
     * are answered by the enclosing operator's timer service and metric group.
     *
     * <p>NOTE(review): the decompiler reported that the access modifier of this class was
     * changed from package-private; it is kept {@code public} here so existing callers compile.
     */
    public class EvictorContext implements Evictor.EvictorContext {
        /** Key the context is currently bound to; reassigned by the enclosing operator. */
        protected K key;

        /** Window the context is currently bound to; passed to the evictor on each call. */
        protected W window;

        /**
         * Creates a context bound to the given key and window.
         *
         * @param k initial key (may be {@code null}; the operator assigns it later)
         * @param w initial window (may be {@code null}; the operator assigns it later)
         */
        public EvictorContext(K k, W w) {
            this.key = k;
            this.window = w;
        }

        /** Returns the current processing time as reported by the operator's timer service. */
        @Override
        public long getCurrentProcessingTime() {
            return EvictingWindowOperator.this.internalTimerService.currentProcessingTime();
        }

        /** Returns the current watermark as reported by the operator's timer service. */
        @Override
        public long getCurrentWatermark() {
            return EvictingWindowOperator.this.internalTimerService.currentWatermark();
        }

        /** Returns the metric group of the enclosing operator. */
        @Override
        public MetricGroup getMetricGroup() {
            return EvictingWindowOperator.this.getMetricGroup();
        }

        /** Returns the key this context is currently bound to. */
        public K getKey() {
            return this.key;
        }

        /**
         * Invokes {@link Evictor#evictBefore} on the operator's evictor for the current window.
         *
         * @param iterable the window elements, wrapped as timestamped values
         * @param i the number of elements in {@code iterable}
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        void evictBefore(Iterable<TimestampedValue<IN>> iterable, int i) {
            // The evictor is declared as Evictor<? super IN, ? super W>, so its element type is a
            // wildcard capture. Iterable<TimestampedValue<IN>> is not convertible to that capture
            // (generics are invariant); the raw cast is what makes this call type-check.
            EvictingWindowOperator.this.evictor.evictBefore((Iterable) iterable, i, this.window, this);
        }

        /**
         * Invokes {@link Evictor#evictAfter} on the operator's evictor for the current window.
         *
         * @param iterable the window elements, wrapped as timestamped values
         * @param i the number of elements in {@code iterable}
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        void evictAfter(Iterable<TimestampedValue<IN>> iterable, int i) {
            // Raw cast for the same wildcard-capture reason as in evictBefore.
            EvictingWindowOperator.this.evictor.evictAfter((Iterable) iterable, i, this.window, this);
        }
    }

    /**
     * Creates an evicting window operator.
     *
     * <p>All arguments except {@code stateDescriptor} and {@code evictor} are forwarded to the
     * {@link WindowOperator} constructor; {@code null} is passed there in the fifth position
     * (presumably the superclass' own window state descriptor, which this operator replaces
     * with {@code stateDescriptor} — confirm against {@code WindowOperator}).
     *
     * @param windowAssigner assigner that maps each element to its windows
     * @param typeSerializer serializer for windows
     * @param keySelector selector that extracts the key from an element
     * @param typeSerializer2 serializer for keys
     * @param stateDescriptor descriptor of the list state holding the buffered records; must not
     *     be {@code null}
     * @param internalWindowFunction function invoked with the window contents when a window fires
     * @param trigger trigger deciding when a window fires and/or is purged
     * @param evictor evictor applied before and after the window function; must not be
     *     {@code null}
     * @param j forwarded to the superclass (presumably the allowed lateness — TODO confirm)
     * @param outputTag forwarded to the superclass (presumably the late-data side output tag —
     *     TODO confirm)
     * @throws NullPointerException if {@code evictor} or {@code stateDescriptor} is {@code null}
     */
    public EvictingWindowOperator(
            WindowAssigner<? super IN, W> windowAssigner,
            TypeSerializer<W> typeSerializer,
            KeySelector<IN, K> keySelector,
            TypeSerializer<K> typeSerializer2,
            StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> stateDescriptor,
            InternalWindowFunction<Iterable<IN>, OUT, K, W> internalWindowFunction,
            Trigger<? super IN, ? super W> trigger,
            Evictor<? super IN, ? super W> evictor,
            long j,
            OutputTag<IN> outputTag) {
        super(
                windowAssigner,
                typeSerializer,
                keySelector,
                typeSerializer2,
                null,
                internalWindowFunction,
                trigger,
                j,
                outputTag);

        this.evictor = Preconditions.checkNotNull(evictor);
        this.evictingWindowStateDescriptor = Preconditions.checkNotNull(stateDescriptor);
    }

    /**
     * Assigns the incoming record to its windows, buffers it in the per-window list state and
     * evaluates the trigger for every window that is not late.
     *
     * <p>For a {@link MergingWindowAssigner} each assigned window is first added to the merging
     * window set, which may merge it with existing windows; the record is then stored under the
     * state window of the resulting window and the merging set is persisted after all windows
     * have been handled. For any other assigner the assigned window is used directly as the
     * state namespace.
     *
     * <p>If no window accepted the record and {@code isElementLate} reports it as late, the
     * record is emitted to the side output when {@code lateDataOutputTag} is set, otherwise the
     * {@code numLateRecordsDropped} counter is incremented.
     *
     * @param streamRecord the record to process
     * @throws Exception if window assignment, state access, the trigger, the evictor or the
     *     window function fails
     * @throws UnsupportedOperationException if a merge yields a window whose end is not after the
     *     current watermark (event time, including allowed lateness) or the current processing
     *     time (processing time)
     * @throws IllegalStateException if a merged window has no state window in the merging set
     */
    @Override
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        final Collection<W> elementWindows = this.windowAssigner.assignWindows(
                streamRecord.getValue(), streamRecord.getTimestamp(), this.windowAssignerContext);

        // Stays true only while no assigned window has accepted the element.
        boolean isSkippedElement = true;

        final K currentKey = this.<K>getKeyedStateBackend().getCurrentKey();

        if (this.windowAssigner instanceof MergingWindowAssigner) {
            final MergingWindowSet<W> mergingWindows = getMergingWindowSet();

            for (W window : elementWindows) {
                // addWindow may merge the new window with existing ones; the returned window is
                // the one to work with from here on.
                W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                    @Override
                    public void merge(
                            W mergeResult,
                            Collection<W> mergedWindows,
                            W stateWindowResult,
                            Collection<W> mergedStateWindows) throws Exception {
                        if (windowAssigner.isEventTime()
                                && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark()) {
                            throw new UnsupportedOperationException("The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: " + internalTimerService.currentWatermark() + " window: " + mergeResult);
                        } else if (!windowAssigner.isEventTime()
                                && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
                            throw new UnsupportedOperationException("The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: " + internalTimerService.currentProcessingTime() + " window: " + mergeResult);
                        }

                        triggerContext.key = currentKey;
                        triggerContext.window = mergeResult;
                        triggerContext.onMerge(mergedWindows);

                        // Clear trigger state and cleanup timers of every window that was merged away.
                        for (W mergedWindow : mergedWindows) {
                            triggerContext.window = mergedWindow;
                            triggerContext.clear();
                            deleteCleanupTimer(mergedWindow);
                        }

                        // Fold the buffered records of the merged state windows into the
                        // resulting state window.
                        evictingWindowState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                    }
                });

                if (isWindowLate(actualWindow)) {
                    // The (possibly merged) window is already late: drop it from the merging set.
                    mergingWindows.retireWindow(actualWindow);
                    continue;
                }
                isSkippedElement = false;

                W stateWindow = mergingWindows.getStateWindow(actualWindow);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                }

                addElementToWindowAndFire(streamRecord, currentKey, actualWindow, stateWindow);
            }

            // Write the (possibly changed) window-to-state-window mapping back.
            mergingWindows.persist();
        } else {
            for (W window : elementWindows) {
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                // Without merging, the window itself is the state namespace.
                addElementToWindowAndFire(streamRecord, currentKey, window, window);
            }
        }

        if (isSkippedElement && isElementLate(streamRecord)) {
            if (this.lateDataOutputTag != null) {
                sideOutput(streamRecord);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

    /**
     * Buffers the record under {@code stateWindow}, binds the trigger and evictor contexts to
     * {@code key}/{@code actualWindow}, evaluates the trigger and acts on its result: emits the
     * window contents on fire, clears the buffered records on purge, and finally registers the
     * cleanup timer for {@code actualWindow}.
     */
    private void addElementToWindowAndFire(
            StreamRecord<IN> streamRecord, K key, W actualWindow, W stateWindow) throws Exception {
        this.evictingWindowState.setCurrentNamespace(stateWindow);
        this.evictingWindowState.add(streamRecord);

        this.triggerContext.key = key;
        this.triggerContext.window = actualWindow;
        this.evictorContext.key = key;
        this.evictorContext.window = actualWindow;

        TriggerResult triggerResult = this.triggerContext.onElement(streamRecord);

        if (triggerResult.isFire()) {
            Iterable<StreamRecord<IN>> contents = this.evictingWindowState.get();
            if (contents != null) {
                emitWindowContents(actualWindow, contents, this.evictingWindowState);
            }
        }

        if (triggerResult.isPurge()) {
            this.evictingWindowState.clear();
        }

        registerCleanupTimer(actualWindow);
    }

    /**
     * Handles an event-time timer: evaluates the trigger for the timer's key and window, emits
     * the window contents if the trigger fires, clears the buffered records if it purges, and
     * clears all window state once the timer is the window's cleanup time.
     *
     * <p>With a {@link MergingWindowAssigner}, a timer for a window that has no state window in
     * the merging set is ignored. The merging set is persisted before returning.
     *
     * @param internalTimer the timer that fired; supplies key, window (namespace) and timestamp
     * @throws Exception if state access, the trigger, the evictor or the window function fails
     */
    @Override
    public void onEventTime(InternalTimer<K, W> internalTimer) throws Exception {
        this.triggerContext.key = internalTimer.getKey();
        this.triggerContext.window = internalTimer.getNamespace();
        this.evictorContext.key = internalTimer.getKey();
        this.evictorContext.window = internalTimer.getNamespace();

        MergingWindowSet<W> mergingWindows = null;

        if (this.windowAssigner instanceof MergingWindowAssigner) {
            mergingWindows = getMergingWindowSet();
            W stateWindow = mergingWindows.getStateWindow(this.triggerContext.window);
            if (stateWindow == null) {
                // The window is no longer in the merging set, so there is nothing to fire for.
                return;
            }
            this.evictingWindowState.setCurrentNamespace(stateWindow);
        } else {
            this.evictingWindowState.setCurrentNamespace(this.triggerContext.window);
        }

        TriggerResult triggerResult = this.triggerContext.onEventTime(internalTimer.getTimestamp());

        if (triggerResult.isFire()) {
            Iterable<StreamRecord<IN>> contents = this.evictingWindowState.get();
            if (contents != null) {
                emitWindowContents(this.triggerContext.window, contents, this.evictingWindowState);
            }
        }

        if (triggerResult.isPurge()) {
            this.evictingWindowState.clear();
        }

        // Only an event-time assigner uses event-time timers for cleanup.
        if (this.windowAssigner.isEventTime()
                && isCleanupTime(this.triggerContext.window, internalTimer.getTimestamp())) {
            clearAllState(this.triggerContext.window, this.evictingWindowState, mergingWindows);
        }

        if (mergingWindows != null) {
            mergingWindows.persist();
        }
    }

    /**
     * Handles a processing-time timer: evaluates the trigger for the timer's key and window,
     * emits the window contents if the trigger fires, clears the buffered records if it purges,
     * and clears all window state once the timer is the window's cleanup time.
     *
     * <p>With a {@link MergingWindowAssigner}, a timer for a window that has no state window in
     * the merging set is ignored. The merging set is persisted before returning.
     *
     * @param internalTimer the timer that fired; supplies key, window (namespace) and timestamp
     * @throws Exception if state access, the trigger, the evictor or the window function fails
     */
    @Override
    public void onProcessingTime(InternalTimer<K, W> internalTimer) throws Exception {
        this.triggerContext.key = internalTimer.getKey();
        this.triggerContext.window = internalTimer.getNamespace();
        this.evictorContext.key = internalTimer.getKey();
        this.evictorContext.window = internalTimer.getNamespace();

        MergingWindowSet<W> mergingWindows = null;

        if (this.windowAssigner instanceof MergingWindowAssigner) {
            mergingWindows = getMergingWindowSet();
            W stateWindow = mergingWindows.getStateWindow(this.triggerContext.window);
            if (stateWindow == null) {
                // The window is no longer in the merging set, so there is nothing to fire for.
                return;
            }
            this.evictingWindowState.setCurrentNamespace(stateWindow);
        } else {
            this.evictingWindowState.setCurrentNamespace(this.triggerContext.window);
        }

        TriggerResult triggerResult = this.triggerContext.onProcessingTime(internalTimer.getTimestamp());

        if (triggerResult.isFire()) {
            Iterable<StreamRecord<IN>> contents = this.evictingWindowState.get();
            if (contents != null) {
                emitWindowContents(this.triggerContext.window, contents, this.evictingWindowState);
            }
        }

        if (triggerResult.isPurge()) {
            this.evictingWindowState.clear();
        }

        // Only a processing-time assigner uses processing-time timers for cleanup.
        if (!this.windowAssigner.isEventTime()
                && isCleanupTime(this.triggerContext.window, internalTimer.getTimestamp())) {
            clearAllState(this.triggerContext.window, this.evictingWindowState, mergingWindows);
        }

        if (mergingWindows != null) {
            mergingWindows.persist();
        }
    }

    /**
     * Runs the evictor and the window function over the buffered records of a window and then
     * rewrites the window's list state from the records that remain.
     *
     * <p>Steps, in order: set the collector's timestamp to {@code w.maxTimestamp()}; wrap the
     * records as {@link TimestampedValue}s; call the evictor's {@code evictBefore}; invoke the
     * window function with the key and window currently held by {@code triggerContext}; call
     * the evictor's {@code evictAfter}; clear {@code listState} and re-add every record still
     * present in the wrapped view.
     *
     * @param w window whose maximum timestamp is stamped on emitted results
     * @param iterable the records currently buffered for the window
     * @param listState the state that is cleared and refilled after eviction
     * @throws Exception if the evictor, the window function or state access fails
     */
    private void emitWindowContents(W w, Iterable<StreamRecord<IN>> iterable, ListState<StreamRecord<IN>> listState) throws Exception {
        this.timestampedCollector.setAbsoluteTimestamp(w.maxTimestamp());

        // View of the buffered records as TimestampedValues, which is the element type the
        // evictor works on.
        FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = FluentIterable
                .from(iterable)
                .transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
                    @Override
                    public TimestampedValue<IN> apply(StreamRecord<IN> streamRecord) {
                        return TimestampedValue.from(streamRecord);
                    }
                });
        this.evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

        // View of the plain values, which is what the window function consumes.
        FluentIterable<IN> projectedContents = recordsWithTimestamp
                .transform(new Function<TimestampedValue<IN>, IN>() {
                    @Override
                    public IN apply(TimestampedValue<IN> timestampedValue) {
                        return timestampedValue.getValue();
                    }
                });

        this.processContext.window = this.triggerContext.window;
        this.userFunction.process(
                this.triggerContext.key,
                this.triggerContext.window,
                this.processContext,
                projectedContents,
                this.timestampedCollector);
        this.evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

        // Rewrite the state from what is left in the view — presumably so that records removed
        // by the evictor do not remain in the stored state (TODO confirm against the state
        // backend's iterator semantics).
        listState.clear();
        for (TimestampedValue<IN> record : recordsWithTimestamp) {
            listState.add(record.getStreamRecord());
        }
    }

    /**
     * Drops everything kept for a window: the buffered records, the trigger state (via
     * {@code triggerContext.clear()}) and the per-window state of the process context. When a
     * merging window set is supplied, the window is additionally retired from it and the set is
     * persisted.
     *
     * @param w the window being cleaned up
     * @param listState the list state holding the window's buffered records
     * @param mergingWindowSet the merging window set, or {@code null} when windows do not merge
     * @throws Exception if clearing state or persisting the merging set fails
     */
    private void clearAllState(W w, ListState<StreamRecord<IN>> listState, MergingWindowSet<W> mergingWindowSet) throws Exception {
        listState.clear();
        triggerContext.clear();

        processContext.window = w;
        processContext.clear();

        if (mergingWindowSet == null) {
            // Non-merging assigner: no window bookkeeping to update.
            return;
        }
        mergingWindowSet.retireWindow(w);
        mergingWindowSet.persist();
    }

    /**
     * Opens the operator: after the superclass has been opened, creates a fresh, unbound
     * {@link EvictorContext} and obtains the keyed list state described by
     * {@code evictingWindowStateDescriptor}, namespaced by window.
     *
     * @throws Exception if the superclass fails to open or the state cannot be created
     */
    @Override
    @SuppressWarnings("unchecked")
    public void open() throws Exception {
        super.open();

        // Key and window are filled in by the operator before each use.
        evictorContext = new EvictorContext(null, null);
        evictingWindowState =
                (InternalListState<K, W, StreamRecord<IN>>)
                        getOrCreateKeyedState(windowSerializer, evictingWindowStateDescriptor);
    }

    /**
     * Closes the operator via the superclass and then releases the evictor context.
     *
     * @throws Exception if the superclass fails to close
     */
    @Override
    public void close() throws Exception {
        super.close();
        // Drop the reference created in open().
        evictorContext = null;
    }

    /**
     * Disposes the operator via the superclass and then releases the evictor context.
     *
     * @throws Exception if the superclass fails to dispose
     */
    @Override
    public void dispose() throws Exception {
        super.dispose();
        // Drop the reference created in open().
        evictorContext = null;
    }

    /**
     * Returns the evictor this operator was constructed with. Exposed for tests.
     *
     * @return the configured evictor, never {@code null}
     */
    @VisibleForTesting
    public Evictor<? super IN, ? super W> getEvictor() {
        return evictor;
    }

    /**
     * Returns the descriptor of the list state that buffers this operator's window contents.
     * Exposed for tests.
     *
     * <p>The stored descriptor describes a {@code ListState<StreamRecord<IN>>}, which is not a
     * subtype of the {@code AppendingState<IN, Iterable<IN>>} required by the overridden
     * signature, so it is returned through an unchecked cast. Callers must not rely on the
     * declared element type.
     *
     * @return the descriptor passed to the constructor
     */
    @Override
    @VisibleForTesting
    @SuppressWarnings("unchecked")
    public StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?> getStateDescriptor() {
        return (StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?>) this.evictingWindowStateDescriptor;
    }
}
