package org.apache.flink.streaming.runtime.operators.windowing;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.shaded.com.google.common.base.Joiner;
import org.apache.flink.shaded.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@PrepareForTest({AbstractStreamOperator.class})
@RunWith(PowerMockRunner.class)
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.class */
public class WindowOperatorTest {
    private static AtomicInteger closeCalled = new AtomicInteger(0);

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$EventTimeTriggerAccumGC.class */
    public class EventTimeTriggerAccumGC extends Trigger<Object, TimeWindow> {
        private static final long serialVersionUID = 1;
        private long cleanupTime;

        private EventTimeTriggerAccumGC() {
            this.cleanupTime = 0L;
        }

        public EventTimeTriggerAccumGC(long j) {
            this.cleanupTime = j;
        }

        public TriggerResult onElement(Object obj, long j, TimeWindow timeWindow, Trigger.TriggerContext triggerContext) throws Exception {
            if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
                return TriggerResult.FIRE;
            }
            triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        public TriggerResult onEventTime(long j, TimeWindow timeWindow, Trigger.TriggerContext triggerContext) {
            return (j == timeWindow.maxTimestamp() || j == timeWindow.maxTimestamp() + this.cleanupTime) ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
        }

        public TriggerResult onProcessingTime(long j, TimeWindow timeWindow, Trigger.TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }

        public void clear(TimeWindow timeWindow, Trigger.TriggerContext triggerContext) throws Exception {
            triggerContext.deleteEventTimeTimer(timeWindow.maxTimestamp());
        }

        public boolean canMerge() {
            return true;
        }

        public TriggerResult onMerge(TimeWindow timeWindow, Trigger.OnMergeContext onMergeContext) {
            onMergeContext.registerEventTimeTimer(timeWindow.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        public String toString() {
            return "EventTimeTrigger()";
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$PassThroughFunction.class */
    private class PassThroughFunction implements WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
        private static final long serialVersionUID = 1;

        private PassThroughFunction() {
        }

        public void apply(String str, TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
            Iterator<Tuple2<String, Integer>> it = iterable.iterator();
            while (it.hasNext()) {
                collector.collect(it.next());
            }
        }

        public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
            apply((String) obj, (TimeWindow) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple2<String, Integer>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$PassThroughFunction2.class */
    private class PassThroughFunction2 implements WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {
        private static final long serialVersionUID = 1;

        private PassThroughFunction2() {
        }

        public void apply(String str, TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<String> collector) throws Exception {
            collector.collect("GOT: " + Joiner.on(",").join(iterable));
        }

        public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
            apply((String) obj, (TimeWindow) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$PointSessionWindows.class */
    public static class PointSessionWindows extends EventTimeSessionWindows {
        private static final long serialVersionUID = 1;

        private PointSessionWindows(long j) {
            super(j);
        }

        public Collection<TimeWindow> assignWindows(Object obj, long j, WindowAssigner.WindowAssignerContext windowAssignerContext) {
            return ((obj instanceof Tuple2) && ((Integer) ((Tuple2) obj).f1).intValue() == 33) ? Collections.singletonList(new TimeWindow(j, j)) : Collections.singletonList(new TimeWindow(j, j + this.sessionTimeout));
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$ReducedSessionWindowFunction.class */
    public static class ReducedSessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
        private static final long serialVersionUID = 1;

        public void apply(String str, TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple3<String, Long, Long>> collector) throws Exception {
            Iterator<Tuple2<String, Integer>> it = iterable.iterator();
            while (it.hasNext()) {
                collector.collect(new Tuple3(str + "-" + it.next().f1, Long.valueOf(timeWindow.getStart()), Long.valueOf(timeWindow.getEnd())));
            }
        }

        public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
            apply((String) obj, (TimeWindow) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple3<String, Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$RichSumReducer.class */
    public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
        private static final long serialVersionUID = 1;
        private boolean openCalled = false;

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            WindowOperatorTest.closeCalled.incrementAndGet();
        }

        public void apply(String str, W w, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
            if (!this.openCalled) {
                Assert.fail("Open was not called");
            }
            int i = 0;
            Iterator<Tuple2<String, Integer>> it = iterable.iterator();
            while (it.hasNext()) {
                i += ((Integer) it.next().f1).intValue();
            }
            collector.collect(new Tuple2(str, Integer.valueOf(i)));
        }

        /* JADX WARN: Multi-variable type inference failed */
        public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
            apply((String) obj, (String) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple2<String, Integer>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$SessionWindowFunction.class */
    public static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
        private static final long serialVersionUID = 1;

        public void apply(String str, TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple3<String, Long, Long>> collector) throws Exception {
            int i = 0;
            Iterator<Tuple2<String, Integer>> it = iterable.iterator();
            while (it.hasNext()) {
                i += ((Integer) it.next().f1).intValue();
            }
            collector.collect(new Tuple3(str + "-" + i, Long.valueOf(timeWindow.getStart()), Long.valueOf(timeWindow.getEnd())));
        }

        public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
            apply((String) obj, (TimeWindow) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple3<String, Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$SumReducer.class */
    public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1;

        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) throws Exception {
            return new Tuple2<>(tuple22.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$Tuple2ResultSortComparator.class */
    public static class Tuple2ResultSortComparator implements Comparator<Object> {
        private Tuple2ResultSortComparator() {
        }

        @Override // java.util.Comparator
        public int compare(Object obj, Object obj2) {
            if ((obj instanceof Watermark) || (obj2 instanceof Watermark)) {
                return 0;
            }
            StreamRecord streamRecord = (StreamRecord) obj;
            StreamRecord streamRecord2 = (StreamRecord) obj2;
            if (streamRecord.getTimestamp() != streamRecord2.getTimestamp()) {
                return (int) (streamRecord.getTimestamp() - streamRecord2.getTimestamp());
            }
            int compareTo = ((String) ((Tuple2) streamRecord.getValue()).f0).compareTo((String) ((Tuple2) streamRecord2.getValue()).f0);
            return compareTo != 0 ? compareTo : ((Integer) ((Tuple2) streamRecord.getValue()).f1).intValue() - ((Integer) ((Tuple2) streamRecord2.getValue()).f1).intValue();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$Tuple3ResultSortComparator.class */
    private static class Tuple3ResultSortComparator implements Comparator<Object> {
        private Tuple3ResultSortComparator() {
        }

        @Override // java.util.Comparator
        public int compare(Object obj, Object obj2) {
            if ((obj instanceof Watermark) || (obj2 instanceof Watermark)) {
                return 0;
            }
            StreamRecord streamRecord = (StreamRecord) obj;
            StreamRecord streamRecord2 = (StreamRecord) obj2;
            if (streamRecord.getTimestamp() != streamRecord2.getTimestamp()) {
                return (int) (streamRecord.getTimestamp() - streamRecord2.getTimestamp());
            }
            int compareTo = ((String) ((Tuple3) streamRecord.getValue()).f0).compareTo((String) ((Tuple3) streamRecord2.getValue()).f0);
            if (compareTo != 0) {
                return compareTo;
            }
            int longValue = (int) (((Long) ((Tuple3) streamRecord.getValue()).f1).longValue() - ((Long) ((Tuple3) streamRecord2.getValue()).f1).longValue());
            return longValue != 0 ? longValue : (int) (((Long) ((Tuple3) streamRecord.getValue()).f1).longValue() - ((Long) ((Tuple3) streamRecord2.getValue()).f1).longValue());
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$TupleKeySelector.class */
    private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
        private static final long serialVersionUID = 1;

        private TupleKeySelector() {
        }

        public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
            return (String) tuple2.f0;
        }
    }

    private void testSlidingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> oneInputStreamOperatorTestHarness) throws Exception {
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(new Tuple2("key2", 1), 3999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(new Tuple2("key2", 1), 3000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(new Tuple2("key1", 1), 20L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(new Tuple2("key1", 1), 0L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(new Tuple2("key1", 1), 999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(new Tuple2("key2", 1), 1998L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(new Tuple2("key2", 1), 1999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), 999L));
        concurrentLinkedQueue.add(new Watermark(999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), 1999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 3), 1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(2999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), 2999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 3), 2999L));
        concurrentLinkedQueue.add(new Watermark(2999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        StreamTaskState snapshot = oneInputStreamOperatorTestHarness.snapshot(0L, 0L);
        oneInputStreamOperatorTestHarness.close();
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.restore(snapshot, 10L);
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(3999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 5), 3999L));
        concurrentLinkedQueue.add(new Watermark(3999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(4999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 4999L));
        concurrentLinkedQueue.add(new Watermark(4999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(5999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 5999L));
        concurrentLinkedQueue.add(new Watermark(5999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(6999L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(7999L));
        concurrentLinkedQueue.add(new Watermark(6999L));
        concurrentLinkedQueue.add(new Watermark(7999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
    }

    @Test
    public void testSlidingEventTimeWindowsReduce() throws Exception {
        closeCalled.set(0);
        TypeInformation parse = TypeInfoParser.parse("Tuple2<String, Integer>");
        WindowOperator windowOperator = new WindowOperator(SlidingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS), Time.of(1L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), parse.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), EventTimeTrigger.create(), 0L);
        windowOperator.setInputType(parse, new ExecutionConfig());
        OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.open();
        testSlidingEventTimeWindows(oneInputStreamOperatorTestHarness);
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testSlidingEventTimeWindowsApply() throws Exception {
        closeCalled.set(0);
        TypeInformation parse = TypeInfoParser.parse("Tuple2<String, Integer>");
        WindowOperator windowOperator = new WindowOperator(SlidingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS), Time.of(1L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", parse.createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new RichSumReducer()), EventTimeTrigger.create(), 0L);
        windowOperator.setInputType(parse, new ExecutionConfig());
        OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamOperatorTestHarness.open();
        testSlidingEventTimeWindows(oneInputStreamOperatorTestHarness);
        oneInputStreamOperatorTestHarness.close();
        Assert.assertEquals("Close was not called.", 2L, closeCalled.get());
    }

    private void testTumblingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> oneInputStreamOperatorTestHarness) throws Exception {
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(new Tuple2("key2", 1), 3999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(new Tuple2("key2", 1), 3000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(new Tuple2("key1", 1), 20L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(new Tuple2("key1", 1), 0L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(new Tuple2("key1", 1), 999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(new Tuple2("key2", 1), 1998L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(new Tuple2("key2", 1), 1999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(999L));
        concurrentLinkedQueue.add(new Watermark(999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        StreamTaskState snapshot = oneInputStreamOperatorTestHarness.snapshot(0L, 0L);
        oneInputStreamOperatorTestHarness.close();
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.restore(snapshot, 10L);
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(2999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), 2999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 3), 2999L));
        concurrentLinkedQueue.add(new Watermark(2999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(3999L));
        concurrentLinkedQueue.add(new Watermark(3999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(4999L));
        concurrentLinkedQueue.add(new Watermark(4999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(5999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 5999L));
        concurrentLinkedQueue.add(new Watermark(5999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(6999L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(7999L));
        concurrentLinkedQueue.add(new Watermark(6999L));
        concurrentLinkedQueue.add(new Watermark(7999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
    }

    @Test
    public void testTumblingEventTimeWindowsReduce() throws Exception {
        closeCalled.set(0);
        WindowOperator windowOperator = new WindowOperator(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), EventTimeTrigger.create(), 0L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamOperatorTestHarness.open();
        testTumblingEventTimeWindows(oneInputStreamOperatorTestHarness);
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testTumblingEventTimeWindowsApply() throws Exception {
        closeCalled.set(0);
        WindowOperator windowOperator = new WindowOperator(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new RichSumReducer()), EventTimeTrigger.create(), 0L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamOperatorTestHarness.open();
        testTumblingEventTimeWindows(oneInputStreamOperatorTestHarness);
        oneInputStreamOperatorTestHarness.close();
        Assert.assertEquals("Close was not called.", 2L, closeCalled.get());
    }

    @Test
    public void testSessionWindows() throws Exception {
        closeCalled.set(0);
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new SessionWindowFunction()), EventTimeTrigger.create(), 0L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 0L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 2), 1000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 3), 2500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 10L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 2), 1000L));
        StreamTaskState snapshot = oneInputStreamOperatorTestHarness.snapshot(0L, 0L);
        oneInputStreamOperatorTestHarness.close();
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.restore(snapshot, 10L);
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 3), 2500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 4), 5501L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 5), 6000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 5), 6000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 6), 6050L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(12000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key1-6", 10L, 5500L), 5499L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-6", 0L, 5500L), 5499L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-20", 5501L, 9050L), 9049L));
        concurrentLinkedQueue.add(new Watermark(12000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 10), 15000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 20), 15000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(17999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-30", 15000L, 18000L), 17999L));
        concurrentLinkedQueue.add(new Watermark(17999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple3ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testReduceSessionWindows() throws Exception {
        closeCalled.set(0);
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new ReducedSessionWindowFunction()), EventTimeTrigger.create(), 0L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 0L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 2), 1000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 3), 2500L));
        StreamTaskState snapshot = oneInputStreamOperatorTestHarness.snapshot(0L, 0L);
        oneInputStreamOperatorTestHarness.close();
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.restore(snapshot, 10L);
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 10L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 2), 1000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 3), 2500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 4), 5501L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 5), 6000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 5), 6000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 6), 6050L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(12000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key1-6", 10L, 5500L), 5499L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-6", 0L, 5500L), 5499L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-20", 5501L, 9050L), 9049L));
        concurrentLinkedQueue.add(new Watermark(12000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 10), 15000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 20), 15000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(17999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-30", 15000L, 18000L), 17999L));
        concurrentLinkedQueue.add(new Watermark(17999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple3ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testSessionWindowsWithCountTrigger() throws Exception {
        closeCalled.set(0);
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new SessionWindowFunction()), PurgingTrigger.of(CountTrigger.of(4L)), 0L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 0L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 2), 1000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 3), 2500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 4), 3500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 10L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 2), 1000L));
        StreamTaskState snapshot = oneInputStreamOperatorTestHarness.snapshot(0L, 0L);
        oneInputStreamOperatorTestHarness.close();
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.restore(snapshot, 10L);
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 3), 2500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 6000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 2), 6500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 3), 7000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-10", 0L, 6500L), 6499L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple3ResultSortComparator());
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 10), 4500L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key1-22", 10L, 10000L), 9999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple3ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testSessionWindowsWithContinuousEventTimeTrigger() throws Exception {
        closeCalled.set(0);
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new SessionWindowFunction()), ContinuousEventTimeTrigger.of(Time.seconds(2L)), 0L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 1500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 0L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 3), 2500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 2), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(2500L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key1-1", 1500L, 4500L), 4499L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-6", 0L, 5500L), 5499L));
        concurrentLinkedQueue.add(new Watermark(2500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 5), 4000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(3000L));
        concurrentLinkedQueue.add(new Watermark(3000L));
        StreamTaskState snapshot = oneInputStreamOperatorTestHarness.snapshot(0L, 0L);
        oneInputStreamOperatorTestHarness.close();
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.restore(snapshot, 0L);
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 2), 4000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 4), 3500L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(4000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key1-3", 1500L, 7000L), 6999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-15", 0L, 7000L), 6999L));
        concurrentLinkedQueue.add(new Watermark(4000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple3ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testMergeAndEvictor() throws Exception {
        LocalStreamEnvironment createLocalEnvironment = LocalStreamEnvironment.createLocalEnvironment();
        try {
            createLocalEnvironment.fromElements(new String[]{"Hello", "Ciao"}).keyBy(new KeySelector<String, String>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.1
                private static final long serialVersionUID = 1;

                public String getKey(String str) throws Exception {
                    return str;
                }
            }).window(EventTimeSessionWindows.withGap(Time.seconds(5L))).evictor(CountEvictor.of(13L));
            Assert.fail("The evictor call should fail.");
            createLocalEnvironment.execute();
        } catch (UnsupportedOperationException e) {
        }
    }

    @Test
    public void testPointSessions() throws Exception {
        closeCalled.set(0);
        WindowOperator windowOperator = new WindowOperator(new PointSessionWindows(3000L), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new SessionWindowFunction()), EventTimeTrigger.create(), 0L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 0L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 33), 1000L));
        StreamTaskState snapshot = oneInputStreamOperatorTestHarness.snapshot(0L, 0L);
        oneInputStreamOperatorTestHarness.close();
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.restore(snapshot, 10L);
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 33), 2500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 10L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 2), 1000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 33), 2500L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(12000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key1-36", 10L, 4000L), 3999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-67", 0L, 3000L), 2999L));
        concurrentLinkedQueue.add(new Watermark(12000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple3ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testContinuousWatermarkTrigger() throws Exception {
        closeCalled.set(0);
        WindowOperator windowOperator = new WindowOperator(GlobalWindows.create(), new GlobalWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), ContinuousEventTimeTrigger.of(Time.of(3L, TimeUnit.SECONDS)), 0L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 0L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 20L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1998L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1000L));
        concurrentLinkedQueue.add(new Watermark(1000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(2000L));
        concurrentLinkedQueue.add(new Watermark(2000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(3000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), Long.MAX_VALUE));
        concurrentLinkedQueue.add(new Watermark(3000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(4000L));
        concurrentLinkedQueue.add(new Watermark(4000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(5000L));
        concurrentLinkedQueue.add(new Watermark(5000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(6000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), Long.MAX_VALUE));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 5), Long.MAX_VALUE));
        concurrentLinkedQueue.add(new Watermark(6000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(7000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(8000L));
        concurrentLinkedQueue.add(new Watermark(7000L));
        concurrentLinkedQueue.add(new Watermark(8000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testCountTrigger() throws Exception {
        closeCalled.set(0);
        WindowOperator windowOperator = new WindowOperator(GlobalWindows.create(), new GlobalWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), PurgingTrigger.of(CountTrigger.of(4L)), 0L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 20L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 0L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1998L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1999L));
        StreamTaskState snapshot = oneInputStreamOperatorTestHarness.snapshot(0L, 0L);
        oneInputStreamOperatorTestHarness.close();
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.restore(snapshot, 10L);
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 4), Long.MAX_VALUE));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 10999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 4), Long.MAX_VALUE));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 4), Long.MAX_VALUE));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testRestoreAndSnapshotAreInSync() throws Exception {
        TypeInformation parse = TypeInfoParser.parse("Tuple2<String, Integer>");
        ReducingStateDescriptor reducingStateDescriptor = new ReducingStateDescriptor("window-contents", new SumReducer(), parse.createSerializer(new ExecutionConfig()));
        WindowOperator windowOperator = new WindowOperator(SlidingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS), Time.of(1L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), reducingStateDescriptor, new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), EventTimeTrigger.create(), 0L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        windowOperator.setInputType(parse, new ExecutionConfig());
        oneInputStreamOperatorTestHarness.open();
        WindowOperator.Timer timer = new WindowOperator.Timer(1L, "key1", new TimeWindow(1L, 2L));
        WindowOperator.Timer timer2 = new WindowOperator.Timer(3L, "key1", new TimeWindow(1L, 2L));
        WindowOperator.Timer timer3 = new WindowOperator.Timer(2L, "key1", new TimeWindow(1L, 2L));
        windowOperator.processingTimeTimers.add(timer);
        windowOperator.processingTimeTimers.add(timer2);
        windowOperator.processingTimeTimers.add(timer3);
        windowOperator.processingTimeTimersQueue.add(timer);
        windowOperator.processingTimeTimersQueue.add(timer2);
        windowOperator.processingTimeTimersQueue.add(timer3);
        windowOperator.processingTimeTimerTimestamps.add(1L, 10);
        windowOperator.processingTimeTimerTimestamps.add(2L, 5);
        windowOperator.processingTimeTimerTimestamps.add(3L, 1);
        StreamTaskState snapshot = oneInputStreamOperatorTestHarness.snapshot(0L, 0L);
        WindowOperator windowOperator2 = new WindowOperator(SlidingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS), Time.of(1L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), reducingStateDescriptor, new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), EventTimeTrigger.create(), 0L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness2 = new OneInputStreamOperatorTestHarness(windowOperator2);
        oneInputStreamOperatorTestHarness2.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        windowOperator2.setInputType(parse, new ExecutionConfig());
        oneInputStreamOperatorTestHarness2.setup();
        oneInputStreamOperatorTestHarness2.restore(snapshot, 0L);
        oneInputStreamOperatorTestHarness2.open();
        Assert.assertEquals(windowOperator.processingTimeTimers, windowOperator2.processingTimeTimers);
        Assert.assertArrayEquals(windowOperator.processingTimeTimersQueue.toArray(), windowOperator2.processingTimeTimersQueue.toArray());
        Assert.assertEquals(windowOperator.processingTimeTimerTimestamps, windowOperator2.processingTimeTimerTimestamps);
    }

    @Test
    public void testProcessingTimeTumblingWindows() throws Throwable {
        WindowOperator windowOperator = new WindowOperator(TumblingProcessingTimeWindows.of(Time.of(3L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), ProcessingTimeTrigger.create(), 0L);
        TestTimeServiceProvider testTimeServiceProvider = new TestTimeServiceProvider();
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator, new ExecutionConfig(), testTimeServiceProvider);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.open();
        testTimeServiceProvider.setCurrentTime(3L);
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), Long.MAX_VALUE));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 7000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 7000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 7000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 7000L));
        testTimeServiceProvider.setCurrentTime(5000L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 3), 2999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 2), 2999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 7000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 7000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 7000L));
        testTimeServiceProvider.setCurrentTime(7000L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), 5999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testProcessingTimeSlidingWindows() throws Throwable {
        WindowOperator windowOperator = new WindowOperator(SlidingProcessingTimeWindows.of(Time.of(3L, TimeUnit.SECONDS), Time.of(1L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), ProcessingTimeTrigger.create(), 0L);
        TestTimeServiceProvider testTimeServiceProvider = new TestTimeServiceProvider();
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator, new ExecutionConfig(), testTimeServiceProvider);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.open();
        testTimeServiceProvider.setCurrentTime(3L);
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), Long.MAX_VALUE));
        testTimeServiceProvider.setCurrentTime(1000L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), Long.MAX_VALUE));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), Long.MAX_VALUE));
        testTimeServiceProvider.setCurrentTime(2000L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 3), 1999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), Long.MAX_VALUE));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), Long.MAX_VALUE));
        testTimeServiceProvider.setCurrentTime(3000L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 3), 2999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 2), 2999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), Long.MAX_VALUE));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), Long.MAX_VALUE));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), Long.MAX_VALUE));
        testTimeServiceProvider.setCurrentTime(7000L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 3999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 5), 3999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 5), 4999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), 5999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testProcessingTimeSessionWindows() throws Throwable {
        WindowOperator windowOperator = new WindowOperator(ProcessingTimeSessionWindows.withGap(Time.of(3L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), ProcessingTimeTrigger.create(), 0L);
        TestTimeServiceProvider testTimeServiceProvider = new TestTimeServiceProvider();
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator, new ExecutionConfig(), testTimeServiceProvider);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.open();
        testTimeServiceProvider.setCurrentTime(3L);
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1L));
        testTimeServiceProvider.setCurrentTime(1000L);
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1002L));
        testTimeServiceProvider.setCurrentTime(5000L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 3999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 5000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 5000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 5000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 5000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 5000L));
        testTimeServiceProvider.setCurrentTime(10000L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 7999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), 7999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        Assert.assertEquals(concurrentLinkedQueue.size(), oneInputStreamOperatorTestHarness.getOutput().size());
        Iterator<Object> it = oneInputStreamOperatorTestHarness.getOutput().iterator();
        while (it.hasNext()) {
            Object next = it.next();
            if (next instanceof StreamRecord) {
                Assert.assertTrue(concurrentLinkedQueue.contains((StreamRecord) next));
            }
        }
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testLateness() throws Exception {
        TypeInformation parse = TypeInfoParser.parse("Tuple2<String, Integer>");
        WindowOperator windowOperator = new WindowOperator(TumblingEventTimeWindows.of(Time.of(2L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), parse.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), PurgingTrigger.of(EventTimeTrigger.create()), 500L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        windowOperator.setInputType(parse, new ExecutionConfig());
        oneInputStreamOperatorTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 500L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1500L));
        concurrentLinkedQueue.add(new Watermark(1500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1300L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(2300L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 1999L));
        concurrentLinkedQueue.add(new Watermark(2300L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1997L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(6000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 1999L));
        concurrentLinkedQueue.add(new Watermark(6000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1998L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(7000L));
        concurrentLinkedQueue.add(new Watermark(7000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testCleanupTimeOverflow() throws Exception {
        TypeInformation parse = TypeInfoParser.parse("Tuple2<String, Integer>");
        ReducingStateDescriptor reducingStateDescriptor = new ReducingStateDescriptor("window-contents", new SumReducer(), parse.createSerializer(new ExecutionConfig()));
        TumblingEventTimeWindows of = TumblingEventTimeWindows.of(Time.milliseconds(1000L));
        final WindowOperator windowOperator = new WindowOperator(of, new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), reducingStateDescriptor, new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), EventTimeTrigger.create(), 2000L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        windowOperator.setInputType(parse, new ExecutionConfig());
        oneInputStreamOperatorTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        TimeWindow timeWindow = (TimeWindow) Iterables.getOnlyElement(of.assignWindows(new Tuple2("key2", 1), 9223372036854774057L, new WindowAssigner.WindowAssignerContext() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.2
            public long getCurrentProcessingTime() {
                return windowOperator.windowAssignerContext.getCurrentProcessingTime();
            }
        }));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 9223372036854774057L));
        Assert.assertTrue(timeWindow.maxTimestamp() + 2000 < timeWindow.maxTimestamp());
        Assert.assertTrue(timeWindow.maxTimestamp() + 2000 < 9223372036854774307L);
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(9223372036854774307L));
        Assert.assertTrue(9223372036854774307L < timeWindow.maxTimestamp());
        Assert.assertTrue(timeWindow.maxTimestamp() < Long.MAX_VALUE);
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(timeWindow.maxTimestamp()));
        concurrentLinkedQueue.add(new Watermark(9223372036854774307L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), timeWindow.maxTimestamp()));
        concurrentLinkedQueue.add(new Watermark(timeWindow.maxTimestamp()));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testDropDueToLatenessTumbling() throws Exception {
        TypeInformation parse = TypeInfoParser.parse("Tuple2<String, Integer>");
        WindowOperator windowOperator = new WindowOperator(TumblingEventTimeWindows.of(Time.of(2L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), parse.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), EventTimeTrigger.create(), 0L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        windowOperator.setInputType(parse, new ExecutionConfig());
        oneInputStreamOperatorTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1985L));
        concurrentLinkedQueue.add(new Watermark(1985L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1980L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1998L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2001L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(2999L));
        concurrentLinkedQueue.add(new Watermark(2999L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(3999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 3999L));
        concurrentLinkedQueue.add(new Watermark(3999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testDropDueToLatenessSliding() throws Exception {
        TypeInformation parse = TypeInfoParser.parse("Tuple2<String, Integer>");
        WindowOperator windowOperator = new WindowOperator(SlidingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS), Time.of(1L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), parse.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), EventTimeTrigger.create(), 0L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        windowOperator.setInputType(parse, new ExecutionConfig());
        oneInputStreamOperatorTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(3000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 2999L));
        concurrentLinkedQueue.add(new Watermark(3000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 3001L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2400L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2400L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 3001L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3900L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(6000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 5), 3999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 2), 3999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 4), 4999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 2), 4999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 5999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 2), 5999L));
        concurrentLinkedQueue.add(new Watermark(6000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 3001L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(25000L));
        concurrentLinkedQueue.add(new Watermark(25000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testDropDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exception {
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new ReducedSessionWindowFunction()), PurgingTrigger.of(EventTimeTrigger.create()), 0L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamOperatorTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 4500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 8500L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(7400L));
        concurrentLinkedQueue.add(new Watermark(7400L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 7000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(11501L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-5", 1000L, 11500L), 11499L));
        concurrentLinkedQueue.add(new Watermark(11501L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 11600L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 11600L, 14600L), 14599L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 10000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 10100L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 14500L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(20000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 14500L, 17500L), 17499L));
        concurrentLinkedQueue.add(new Watermark(20000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(100000L));
        concurrentLinkedQueue.add(new Watermark(100000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testDropDueToLatenessSessionZeroLateness() throws Exception {
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new ReducedSessionWindowFunction()), EventTimeTrigger.create(), 0L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamOperatorTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 4500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 8500L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(7400L));
        concurrentLinkedQueue.add(new Watermark(7400L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 7000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(11501L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-5", 1000L, 11500L), 11499L));
        concurrentLinkedQueue.add(new Watermark(11501L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 11600L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 11600L, 14600L), 14599L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 10000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 14500L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(20000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 14500L, 17500L), 17499L));
        concurrentLinkedQueue.add(new Watermark(20000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(100000L));
        concurrentLinkedQueue.add(new Watermark(100000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testDropDueToLatenessSessionWithLatenessPurgingTrigger() throws Exception {
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new ReducedSessionWindowFunction()), PurgingTrigger.of(EventTimeTrigger.create()), 10L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamOperatorTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 4500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 8500L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(7400L));
        concurrentLinkedQueue.add(new Watermark(7400L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 7000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(11501L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-5", 1000L, 11500L), 11499L));
        concurrentLinkedQueue.add(new Watermark(11501L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 11600L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 11600L, 14600L), 14599L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 10000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 14500L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(20000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 14500L, 17500L), 17499L));
        concurrentLinkedQueue.add(new Watermark(20000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(100000L));
        concurrentLinkedQueue.add(new Watermark(100000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testDropDueToLatenessSessionWithLateness() throws Exception {
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new ReducedSessionWindowFunction()), EventTimeTrigger.create(), 10L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamOperatorTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 4500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 8500L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(7400L));
        concurrentLinkedQueue.add(new Watermark(7400L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 7000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(11501L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-5", 1000L, 11500L), 11499L));
        concurrentLinkedQueue.add(new Watermark(11501L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 11600L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 11600L, 14600L), 14599L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 10000L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 14500L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-2", 10000L, 14600L), 14599L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple3ResultSortComparator());
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(20000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-3", 10000L, 17500L), 17499L));
        concurrentLinkedQueue.add(new Watermark(20000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(100000L));
        concurrentLinkedQueue.add(new Watermark(100000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple3ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testDropDueToLatenessSessionWithHugeLatenessPurgingTrigger() throws Exception {
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new ReducedSessionWindowFunction()), PurgingTrigger.of(EventTimeTrigger.create()), 10000L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamOperatorTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 4500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 8500L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(7400L));
        concurrentLinkedQueue.add(new Watermark(7400L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 7000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(11501L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-5", 1000L, 11500L), 11499L));
        concurrentLinkedQueue.add(new Watermark(11501L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 11600L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 11600L, 14600L), 14599L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 10000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 10000L, 13000L), 12999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple3ResultSortComparator());
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 14500L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(20000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 14500L, 17500L), 17499L));
        concurrentLinkedQueue.add(new Watermark(20000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(100000L));
        concurrentLinkedQueue.add(new Watermark(100000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple3ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testDropDueToLatenessSessionWithHugeLateness() throws Exception {
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new ReducedSessionWindowFunction()), EventTimeTrigger.create(), 10000L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamOperatorTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 4500L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 8500L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(7400L));
        concurrentLinkedQueue.add(new Watermark(7400L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 7000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(11501L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-5", 1000L, 11500L), 11499L));
        concurrentLinkedQueue.add(new Watermark(11501L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 11600L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 11600L, 14600L), 14599L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 10000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-7", 1000L, 14600L), 14599L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple3ResultSortComparator());
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 14500L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(20000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-8", 1000L, 17500L), 17499L));
        concurrentLinkedQueue.add(new Watermark(20000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(100000L));
        concurrentLinkedQueue.add(new Watermark(100000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple3ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testCleanupTimerWithEmptyListStateForTumblingWindows2() throws Exception {
        TypeInformation parse = TypeInfoParser.parse("Tuple2<String, Integer>");
        WindowOperator windowOperator = new WindowOperator(TumblingEventTimeWindows.of(Time.of(2L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", parse.createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new PassThroughFunction2()), new EventTimeTriggerAccumGC(100L), 100L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        windowOperator.setInputType(parse, new ExecutionConfig());
        oneInputStreamOperatorTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1599L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1999L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(2100L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(5000L));
        concurrentLinkedQueue.add(new Watermark(1599L));
        concurrentLinkedQueue.add(new StreamRecord("GOT: (key2,1)", 1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(2100L));
        concurrentLinkedQueue.add(new Watermark(5000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testCleanupTimerWithEmptyListStateForTumblingWindows() throws Exception {
        TypeInformation parse = TypeInfoParser.parse("Tuple2<String, Integer>");
        WindowOperator windowOperator = new WindowOperator(TumblingEventTimeWindows.of(Time.of(2L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", parse.createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new PassThroughFunction()), EventTimeTrigger.create(), 1L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        windowOperator.setInputType(parse, new ExecutionConfig());
        oneInputStreamOperatorTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1599L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1999L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(2000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(5000L));
        concurrentLinkedQueue.add(new Watermark(1599L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(2000L));
        concurrentLinkedQueue.add(new Watermark(5000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testCleanupTimerWithEmptyReduceStateForTumblingWindows() throws Exception {
        TypeInformation parse = TypeInfoParser.parse("Tuple2<String, Integer>");
        WindowOperator windowOperator = new WindowOperator(TumblingEventTimeWindows.of(Time.of(2L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), parse.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), EventTimeTrigger.create(), 1L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        windowOperator.setInputType(parse, new ExecutionConfig());
        oneInputStreamOperatorTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1599L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1999L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(2000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(5000L));
        concurrentLinkedQueue.add(new Watermark(1599L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(2000L));
        concurrentLinkedQueue.add(new Watermark(5000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testCleanupTimerWithEmptyFoldingStateForTumblingWindows() throws Exception {
        TypeInformation parse = TypeInfoParser.parse("Tuple2<String, Integer>");
        WindowOperator windowOperator = new WindowOperator(TumblingEventTimeWindows.of(Time.of(2L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new FoldingStateDescriptor("window-contents", new Tuple2((String) null, 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.3
            public Tuple2<String, Integer> fold(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) throws Exception {
                return new Tuple2<>(tuple22.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
            }
        }, parse), new InternalSingleValueWindowFunction(new PassThroughFunction()), EventTimeTrigger.create(), 1L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        windowOperator.setInputType(parse, new ExecutionConfig());
        oneInputStreamOperatorTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1599L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(1999L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(2000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(5000L));
        concurrentLinkedQueue.add(new Watermark(1599L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(2000L));
        concurrentLinkedQueue.add(new Watermark(5000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testCleanupTimerWithEmptyListStateForSessionWindows() throws Exception {
        TypeInformation parse = TypeInfoParser.parse("Tuple2<String, Integer>");
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", parse.createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new PassThroughFunction()), EventTimeTrigger.create(), 10L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        windowOperator.setInputType(parse, new ExecutionConfig());
        oneInputStreamOperatorTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 3999L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testCleanupTimerWithEmptyReduceStateForSessionWindows() throws Exception {
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new ReducedSessionWindowFunction()), EventTimeTrigger.create(), 10L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamOperatorTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 1000L, 4000L), 3999L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testCleanupTimerWithEmptyFoldingStateForSessionWindows() throws Exception {
        TypeInformation parse = TypeInfoParser.parse("Tuple2<String, Integer>");
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new FoldingStateDescriptor("window-contents", new Tuple2((String) null, 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.4
            public Tuple2<String, Integer> fold(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) throws Exception {
                return new Tuple2<>(tuple22.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
            }
        }, parse), new InternalSingleValueWindowFunction(new PassThroughFunction()), EventTimeTrigger.create(), 10L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        windowOperator.setInputType(parse, new ExecutionConfig());
        oneInputStreamOperatorTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 3999L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput(), new Tuple2ResultSortComparator());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testCleanupInCaseOfFailingSnapshotCall() throws Exception {
        StreamTaskState streamTaskState = (StreamTaskState) Mockito.mock(StreamTaskState.class);
        AbstractStateBackend abstractStateBackend = (AbstractStateBackend) Mockito.mock(AbstractStateBackend.class);
        AbstractStateBackend.CheckpointStateOutputStream checkpointStateOutputStream = (AbstractStateBackend.CheckpointStateOutputStream) Mockito.mock(AbstractStateBackend.CheckpointStateOutputStream.class);
        ((AbstractStateBackend.CheckpointStateOutputStream) Mockito.doThrow(new IOException("Test Exception")).when(checkpointStateOutputStream)).write(Matchers.anyInt());
        Mockito.when(abstractStateBackend.createCheckpointStateOutputView(Matchers.anyLong(), Matchers.anyLong())).thenCallRealMethod();
        Mockito.when(abstractStateBackend.createCheckpointStateOutputStream(Matchers.anyLong(), Matchers.anyLong())).thenReturn(checkpointStateOutputStream);
        PowerMockito.whenNew(StreamTaskState.class).withAnyArguments().thenReturn(streamTaskState);
        WindowOperator windowOperator = new WindowOperator(GlobalWindows.create(), new GlobalWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), PurgingTrigger.of(CountTrigger.of(10L)), 0L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.setStateBackend(abstractStateBackend);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamOperatorTestHarness.open();
        try {
            oneInputStreamOperatorTestHarness.snapshot(1L, 42L);
            Assert.fail("Expected Exception here.");
        } catch (Exception e) {
        }
        ((AbstractStateBackend.CheckpointStateOutputStream) Mockito.verify(checkpointStateOutputStream)).close();
        ((StreamTaskState) Mockito.verify(streamTaskState)).discardState();
    }

    @Test
    public void testCleanupInCaseOfFailingCheckpointStateOutputStreamCreation() throws Exception {
        StreamTaskState streamTaskState = (StreamTaskState) Mockito.mock(StreamTaskState.class);
        AbstractStateBackend abstractStateBackend = (AbstractStateBackend) Mockito.mock(AbstractStateBackend.class);
        Mockito.when(abstractStateBackend.createCheckpointStateOutputView(Matchers.anyLong(), Matchers.anyLong())).thenThrow(new Throwable[]{new IOException("Test Exception")});
        PowerMockito.whenNew(StreamTaskState.class).withAnyArguments().thenReturn(streamTaskState);
        WindowOperator windowOperator = new WindowOperator(GlobalWindows.create(), new GlobalWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), PurgingTrigger.of(CountTrigger.of(10L)), 0L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.setStateBackend(abstractStateBackend);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamOperatorTestHarness.open();
        try {
            oneInputStreamOperatorTestHarness.snapshot(1L, 42L);
            Assert.fail("Expected Exception here.");
        } catch (Exception e) {
        }
        ((StreamTaskState) Mockito.verify(streamTaskState)).discardState();
    }

    @Test
    public void testCleanupInCaseOfFailingCloseAndGetHandleInSnapshotMethod() throws Exception {
        StreamTaskState streamTaskState = (StreamTaskState) Mockito.mock(StreamTaskState.class);
        AbstractStateBackend abstractStateBackend = (AbstractStateBackend) Mockito.mock(AbstractStateBackend.class);
        AbstractStateBackend.CheckpointStateOutputStream checkpointStateOutputStream = (AbstractStateBackend.CheckpointStateOutputStream) Mockito.mock(AbstractStateBackend.CheckpointStateOutputStream.class);
        ((AbstractStateBackend.CheckpointStateOutputStream) Mockito.doThrow(new IOException("Test Exception")).when(checkpointStateOutputStream)).closeAndGetHandle();
        Mockito.when(abstractStateBackend.createCheckpointStateOutputView(Matchers.anyLong(), Matchers.anyLong())).thenCallRealMethod();
        Mockito.when(abstractStateBackend.createCheckpointStateOutputStream(Matchers.anyLong(), Matchers.anyLong())).thenReturn(checkpointStateOutputStream);
        PowerMockito.whenNew(StreamTaskState.class).withAnyArguments().thenReturn(streamTaskState);
        WindowOperator windowOperator = new WindowOperator(GlobalWindows.create(), new GlobalWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), TypeInfoParser.parse("Tuple2<String, Integer>").createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), PurgingTrigger.of(CountTrigger.of(10L)), 0L);
        windowOperator.setInputType(TypeInfoParser.parse("Tuple2<String, Integer>"), new ExecutionConfig());
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(windowOperator);
        oneInputStreamOperatorTestHarness.setStateBackend(abstractStateBackend);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamOperatorTestHarness.open();
        try {
            oneInputStreamOperatorTestHarness.snapshot(1L, 42L);
            Assert.fail("Expected Exception here.");
        } catch (Exception e) {
        }
        ((AbstractStateBackend.CheckpointStateOutputStream) Mockito.verify(checkpointStateOutputStream)).closeAndGetHandle();
        ((StreamTaskState) Mockito.verify(streamTaskState)).discardState();
    }
}
