package org.apache.beam.runners.flink.translation.wrappers.streaming;

import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.databind.util.LRUMap;
import java.lang.invoke.SerializedLambda;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.PCollectionViewTesting;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.OutputTag;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.class */
public class DoFnOperatorTest {
    /** Length in milliseconds of the shorter fixed window; used for {@code view1} and the side-input test. */
    private static final long WINDOW_MSECS_1 = 100;
    /** Length in milliseconds of the longer fixed window; used for {@code view2}. */
    private static final long WINDOW_MSECS_2 = 500;
    /** Iterable side-input view windowed into {@code WINDOW_MSECS_1} fixed windows; assigned in {@code setUp()}. */
    private PCollectionView<Iterable<String>> view1;
    /** Iterable side-input view windowed into {@code WINDOW_MSECS_2} fixed windows; assigned in {@code setUp()}. */
    private PCollectionView<Iterable<String>> view2;
    // NOTE(review): not referenced in the part of the file reviewed here; presumably counts
    // start-bundle invocations for a test further down -- confirm against the rest of the class.
    private int numStartBundleCalled = 0;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest$IdentityDoFn.class */
    /**
     * Pass-through {@link DoFn}: each input element is emitted unchanged on the main output.
     *
     * @param <T> element type, identical for input and output
     */
    public static class IdentityDoFn<T> extends DoFn<T, T> {
        private IdentityDoFn() {
            // Private: only the enclosing test class creates instances.
        }

        /** Reads the current element and outputs it as-is. */
        @DoFn.ProcessElement
        public void processElement(DoFn<T, T>.ProcessContext processContext) throws Exception {
            final T current = processContext.element();
            processContext.output(current);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest$MultiOutputDoFn.class */
    /**
     * {@link DoFn} that routes elements by value:
     * <ul>
     *   <li>{@code "one"} goes only to the first additional output, as {@code "extra: one"};</li>
     *   <li>{@code "two"} goes only to the second additional output, as {@code "extra: two"};</li>
     *   <li>anything else is emitted as {@code "got: <element>"} on the main output and on both
     *       additional outputs.</li>
     * </ul>
     */
    private static class MultiOutputDoFn extends DoFn<String, String> {
        private TupleTag<String> additionalOutput1;
        private TupleTag<String> additionalOutput2;

        public MultiOutputDoFn(TupleTag<String> tupleTag, TupleTag<String> tupleTag2) {
            additionalOutput1 = tupleTag;
            additionalOutput2 = tupleTag2;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<String, String>.ProcessContext processContext) throws Exception {
            final String element = processContext.element();
            if ("one".equals(element)) {
                processContext.output(additionalOutput1, "extra: one");
                return;
            }
            if ("two".equals(element)) {
                processContext.output(additionalOutput2, "extra: two");
                return;
            }
            // Default case: fan the element out to every output.
            final String tagged = "got: " + element;
            processContext.output(tagged);
            processContext.output(additionalOutput1, tagged);
            processContext.output(additionalOutput2, tagged);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest$TestHarnessFactory.class */
    /**
     * Factory for a test harness (or any other object) whose construction may fail with a
     * checked exception.
     *
     * @param <T> type of the object produced by {@link #create()}
     */
    @FunctionalInterface
    public interface TestHarnessFactory<T> {
        /**
         * Builds a new instance.
         *
         * @return the created object
         * @throws Exception if construction fails
         */
        T create() throws Exception;
    }

    /**
     * Builds the two iterable side-input views used by the side-input tests: the same
     * single-element input, windowed into fixed windows of {@code WINDOW_MSECS_1} and
     * {@code WINDOW_MSECS_2} milliseconds respectively.
     */
    @Before
    public void setUp() {
        // The element type must be kept: through a raw PCollection, apply(...) is erased to
        // POutput, so the chained apply(View.asIterable()) and the assignment to
        // PCollectionView<Iterable<String>> would not type-check.
        PCollection<String> input = Pipeline.create().apply(Create.of("1"));
        this.view1 = input.apply(Window.into(FixedWindows.of(new Duration(WINDOW_MSECS_1)))).apply(View.asIterable());
        this.view2 = input.apply(Window.into(FixedWindows.of(new Duration(WINDOW_MSECS_2)))).apply(View.asIterable());
    }

    /**
     * Pushes one element through a {@link DoFnOperator} wrapping {@code IdentityDoFn} (key coder
     * and key selector are {@code null}) and expects exactly that element on the harness output.
     */
    @Test
    public void testSingleOutput() throws Exception {
        WindowedValue.ValueOnlyWindowedValueCoder stringCoder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
        TupleTag mainOutput = new TupleTag("main-output");

        IdentityDoFn identityFn = new IdentityDoFn();
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory =
                new DoFnOperator.MultiOutputOutputManagerFactory(
                        mainOutput, stringCoder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
        DoFnOperator operator =
                new DoFnOperator(
                        identityFn,
                        "stepName",
                        stringCoder,
                        Collections.emptyMap(),
                        mainOutput,
                        Collections.emptyList(),
                        outputManagerFactory,
                        WindowingStrategy.globalDefault(),
                        new HashMap(),
                        Collections.emptyList(),
                        FlinkPipelineOptions.defaults(),
                        (Coder) null,
                        (KeySelector) null,
                        DoFnSchemaInformation.create(),
                        Collections.emptyMap());

        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness(operator);
        harness.open();
        harness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("Hello")));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(harness.getOutput()), IsIterableContainingInOrder.contains(new WindowedValue[]{WindowedValue.valueInGlobalWindow("Hello")}));
        harness.close();
    }

    /**
     * Feeds "one", "two" and "hello" through a {@link DoFnOperator} wrapping
     * {@code MultiOutputDoFn} and checks the main output and both side outputs of the harness.
     */
    @Test
    public void testMultiOutputOutput() throws Exception {
        WindowedValue.ValueOnlyWindowedValueCoder stringCoder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
        TupleTag mainOutput = new TupleTag("main-output");
        TupleTag additionalOutput1 = new TupleTag("output-1");
        TupleTag additionalOutput2 = new TupleTag("output-2");

        // The OutputTags stay anonymous subclasses (presumably so Flink can capture the generic
        // element type -- TODO confirm).
        ImmutableMap tagsToOutputTags = ImmutableMap.builder()
                .put(additionalOutput1, new OutputTag<WindowedValue<String>>(additionalOutput1.getId()) {
                })
                .put(additionalOutput2, new OutputTag<WindowedValue<String>>(additionalOutput2.getId()) {
                })
                .build();

        MultiOutputDoFn multiOutputFn = new MultiOutputDoFn(additionalOutput1, additionalOutput2);
        ImmutableList additionalOutputs = ImmutableList.of(additionalOutput1, additionalOutput2);
        ImmutableMap tagsToCoders = ImmutableMap.builder()
                .put(mainOutput, stringCoder)
                .put(additionalOutput1, stringCoder)
                .put(additionalOutput2, stringCoder)
                .build();
        ImmutableMap tagsToIds = ImmutableMap.builder()
                .put(mainOutput, 0)
                .put(additionalOutput1, 1)
                .put(additionalOutput2, 2)
                .build();
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory =
                new DoFnOperator.MultiOutputOutputManagerFactory(
                        mainOutput,
                        tagsToOutputTags,
                        tagsToCoders,
                        tagsToIds,
                        new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
        DoFnOperator operator =
                new DoFnOperator(
                        multiOutputFn,
                        "stepName",
                        stringCoder,
                        Collections.emptyMap(),
                        mainOutput,
                        additionalOutputs,
                        outputManagerFactory,
                        WindowingStrategy.globalDefault(),
                        new HashMap(),
                        Collections.emptyList(),
                        FlinkPipelineOptions.defaults(),
                        (Coder) null,
                        (KeySelector) null,
                        DoFnSchemaInformation.create(),
                        Collections.emptyMap());

        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness(operator);
        harness.open();
        harness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("one")));
        harness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("two")));
        harness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("hello")));

        // Main output only sees the default-case element.
        MatcherAssert.assertThat(stripStreamRecord(harness.getOutput()), IsIterableContainingInOrder.contains(new WindowedValue[]{WindowedValue.valueInGlobalWindow("got: hello")}));
        // Each side output sees its dedicated element followed by the default-case element.
        MatcherAssert.assertThat(stripStreamRecord(harness.getSideOutput((OutputTag) tagsToOutputTags.get(additionalOutput1))), IsIterableContainingInOrder.contains(new WindowedValue[]{WindowedValue.valueInGlobalWindow("extra: one"), WindowedValue.valueInGlobalWindow("got: hello")}));
        MatcherAssert.assertThat(stripStreamRecord(harness.getSideOutput((OutputTag) tagsToOutputTags.get(additionalOutput2))), IsIterableContainingInOrder.contains(new WindowedValue[]{WindowedValue.valueInGlobalWindow("extra: two"), WindowedValue.valueInGlobalWindow("got: hello")}));
        harness.close();
    }

    /**
     * Exercises event-time and processing-time timers on a keyed {@link DoFnOperator}.
     *
     * <p>The DoFn sets two event-time timers for {@code timerTimestamp} (the second one with an
     * output timestamp one millisecond earlier) and one relative processing-time timer. The test
     * asserts that nothing is emitted while watermark and processing time are at
     * {@code timerTimestamp}, that the minimum watermark hold equals the second timer's output
     * timestamp, that both event timers emit once the watermark reaches {@code timerTimestamp + 1},
     * and that the processing timer emits once processing time reaches {@code timerTimestamp + 1}.
     */
    @Test
    public void testWatermarkContract() throws Exception {
        final Instant timerTimestamp = new Instant(1000L);
        final Instant timerOutputTimestamp = timerTimestamp.minus(1L);
        WindowingStrategy windowingStrategy = WindowingStrategy.of(FixedWindows.of(new Duration(10000L)));
        DoFn<Integer, String> doFn = new DoFn<Integer, String>() {

            @DoFn.TimerId("eventTimer")
            private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

            @DoFn.TimerId("eventTimer2")
            private final TimerSpec eventTimer2 = TimerSpecs.timer(TimeDomain.EVENT_TIME);

            @DoFn.TimerId("processingTimer")
            private final TimerSpec processingTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

            @DoFn.ProcessElement
            public void processElement(DoFn<Integer, String>.ProcessContext processContext, @DoFn.TimerId("eventTimer") Timer timer, @DoFn.TimerId("eventTimer2") Timer timer2, @DoFn.TimerId("processingTimer") Timer timer3) {
                timer.set(timerTimestamp);
                timer2.withOutputTimestamp(timerOutputTimestamp).set(timerTimestamp);
                timer3.offset(Duration.millis(timerTimestamp.getMillis())).setRelative();
            }

            @DoFn.OnTimer("eventTimer")
            public void onEventTime(DoFn<Integer, String>.OnTimerContext onTimerContext) {
                Assert.assertEquals("Timer timestamp must match set timestamp.", timerTimestamp, onTimerContext.timestamp());
                onTimerContext.outputWithTimestamp("Event timer fired: eventTimer", onTimerContext.timestamp());
            }

            @DoFn.OnTimer("eventTimer2")
            public void onEventTime2(DoFn<Integer, String>.OnTimerContext onTimerContext) {
                Assert.assertEquals("Timer timestamp must match set timestamp.", timerTimestamp, onTimerContext.fireTimestamp());
                onTimerContext.output("Event timer fired: eventTimer2");
            }

            @DoFn.OnTimer("processingTimer")
            public void onProcessingTime(DoFn<Integer, String>.OnTimerContext onTimerContext) {
                Assert.assertEquals("Timer timestamp must match current input watermark", timerTimestamp.plus(1L), onTimerContext.timestamp());
                onTimerContext.outputWithTimestamp("Processing timer fired", onTimerContext.timestamp());
            }
        };

        VarIntCoder keyCoder = VarIntCoder.of();
        WindowedValue.FullWindowedValueCoder inputCoder = WindowedValue.getFullCoder(keyCoder, windowingStrategy.getWindowFn().windowCoder());
        WindowedValue.FullWindowedValueCoder outputCoder = WindowedValue.getFullCoder(StringUtf8Coder.of(), windowingStrategy.getWindowFn().windowCoder());
        // The selector must be parameterized: with a raw KeySelector target the lambda parameter
        // would be typed Object and getValue() would not resolve.
        KeySelector<WindowedValue<Integer>, ByteBuffer> keySelector =
                windowedValue -> FlinkKeyUtils.encodeKey(windowedValue.getValue(), keyCoder);
        TupleTag mainOutput = new TupleTag("main-output");
        DoFnOperator doFnOperator =
                new DoFnOperator(
                        doFn,
                        "stepName",
                        inputCoder,
                        Collections.emptyMap(),
                        mainOutput,
                        Collections.emptyList(),
                        new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, outputCoder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
                        windowingStrategy,
                        new HashMap(),
                        Collections.emptyList(),
                        FlinkPipelineOptions.defaults(),
                        keyCoder,
                        keySelector,
                        DoFnSchemaInformation.create(),
                        Collections.emptyMap());
        KeyedOneInputStreamOperatorTestHarness harness = new KeyedOneInputStreamOperatorTestHarness(doFnOperator, keySelector, new CoderTypeInformation(FlinkKeyUtils.ByteBufferCoder.of(), FlinkPipelineOptions.defaults()));
        harness.setup(new CoderTypeSerializer(outputCoder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults())));
        harness.open();
        harness.processWatermark(0L);
        harness.setProcessingTime(0L);

        IntervalWindow window = new IntervalWindow(new Instant(0L), Duration.millis(10000L));
        harness.processElement(new StreamRecord(WindowedValue.of(13, new Instant(0L), window, PaneInfo.NO_FIRING)));
        // Processing the element only sets timers; nothing is emitted yet.
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(harness.getOutput()), Matchers.emptyIterable());

        // Watermark and processing time exactly at the timer timestamp: still no output.
        harness.processWatermark(timerTimestamp.getMillis());
        harness.setProcessingTime(timerTimestamp.getMillis());
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(harness.getOutput()), Matchers.emptyIterable());
        // The second event timer's output timestamp is what is reported as the minimum hold.
        MatcherAssert.assertThat(doFnOperator.keyedStateInternals.minWatermarkHoldMs(), Matchers.is(Long.valueOf(timerOutputTimestamp.getMillis())));

        // One millisecond past the timer timestamp: both event-time timers have emitted.
        harness.processWatermark(timerTimestamp.getMillis() + 1);
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(harness.getOutput()), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.of("Event timer fired: eventTimer", timerTimestamp, window, PaneInfo.NO_FIRING), WindowedValue.of("Event timer fired: eventTimer2", timerTimestamp.minus(1L), window, PaneInfo.NO_FIRING)}));
        harness.getOutput().clear();

        // Advancing processing time past the relative offset emits the processing-time output.
        harness.setProcessingTime(timerTimestamp.getMillis() + 1);
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(harness.getOutput()), IsIterableContainingInOrder.contains(new WindowedValue[]{WindowedValue.of("Processing timer fired", timerTimestamp.plus(1L), window, PaneInfo.NO_FIRING)}));
        harness.close();
    }

    /**
     * Checks late-data handling of a stateful DoFn over 10 ms fixed windows: elements of window
     * [0, 10) are emitted while the watermark is at 0 and at 9, and no longer emitted once the
     * watermark has reached 10.
     */
    @Test
    public void testLateDroppingForStatefulFn() throws Exception {
        WindowingStrategy windowingStrategy = WindowingStrategy.of(FixedWindows.of(new Duration(10L)));
        DoFn<Integer, String> doFn = new DoFn<Integer, String>() {

            // Declaring state is what makes this DoFn stateful; the state itself is never accessed.
            @DoFn.StateId("state")
            private final StateSpec<ValueState<String>> stateSpec = StateSpecs.value(StringUtf8Coder.of());

            @DoFn.ProcessElement
            public void processElement(DoFn<Integer, String>.ProcessContext processContext) {
                processContext.output(processContext.element().toString());
            }
        };

        VarIntCoder keyCoder = VarIntCoder.of();
        WindowedValue.FullWindowedValueCoder inputCoder = WindowedValue.getFullCoder(keyCoder, windowingStrategy.getWindowFn().windowCoder());
        WindowedValue.FullWindowedValueCoder outputCoder = WindowedValue.getFullCoder(StringUtf8Coder.of(), windowingStrategy.getWindowFn().windowCoder());
        // The selector must be parameterized: with a raw KeySelector target the lambda parameter
        // would be typed Object and getValue() would not resolve.
        KeySelector<WindowedValue<Integer>, ByteBuffer> keySelector =
                windowedValue -> FlinkKeyUtils.encodeKey(windowedValue.getValue(), keyCoder);
        TupleTag mainOutput = new TupleTag("main-output");
        DoFnOperator doFnOperator =
                new DoFnOperator(
                        doFn,
                        "stepName",
                        inputCoder,
                        Collections.emptyMap(),
                        mainOutput,
                        Collections.emptyList(),
                        new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, outputCoder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
                        windowingStrategy,
                        new HashMap(),
                        Collections.emptyList(),
                        FlinkPipelineOptions.defaults(),
                        keyCoder,
                        keySelector,
                        DoFnSchemaInformation.create(),
                        Collections.emptyMap());
        KeyedOneInputStreamOperatorTestHarness harness = new KeyedOneInputStreamOperatorTestHarness(doFnOperator, keySelector, new CoderTypeInformation(FlinkKeyUtils.ByteBufferCoder.of(), FlinkPipelineOptions.defaults()));
        harness.open();

        IntervalWindow window;

        // Watermark 0: the element is emitted.
        harness.processWatermark(0L);
        window = new IntervalWindow(new Instant(0L), Duration.millis(10L));
        harness.processElement(new StreamRecord(WindowedValue.of(13, new Instant(0L), window, PaneInfo.NO_FIRING)));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(harness.getOutput()), IsIterableContainingInOrder.contains(new WindowedValue[]{WindowedValue.of("13", new Instant(0L), window, PaneInfo.NO_FIRING)}));
        harness.getOutput().clear();

        // Watermark 9: the element is still emitted.
        harness.processWatermark(9L);
        harness.processElement(new StreamRecord(WindowedValue.of(17, new Instant(0L), window, PaneInfo.NO_FIRING)));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(harness.getOutput()), IsIterableContainingInOrder.contains(new WindowedValue[]{WindowedValue.of("17", new Instant(0L), window, PaneInfo.NO_FIRING)}));
        harness.getOutput().clear();

        // Watermark 10: the same element now produces no output.
        harness.processWatermark(10L);
        harness.processElement(new StreamRecord(WindowedValue.of(17, new Instant(0L), window, PaneInfo.NO_FIRING)));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(harness.getOutput()), Matchers.emptyIterable());
        harness.close();
    }

    /**
     * Uses the stateful harness from {@code getHarness} over 10 ms fixed windows with zero allowed
     * lateness. Verifies the element outputs, the number of keyed state entries after two keys
     * were processed, and the timer outputs produced once the watermark is advanced beyond the
     * end of the window.
     */
    @Test
    public void testStateGCForStatefulFn() throws Exception {
        WindowingStrategy windowingStrategy = WindowingStrategy.of(FixedWindows.of(new Duration(10L))).withAllowedLateness(Duration.ZERO);
        // User timer fires at the window's max timestamp; 5000 is added to every element value and
        // 4093 is the value emitted from the timer callback.
        KeyedOneInputStreamOperatorTestHarness<ByteBuffer, WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> harness =
                getHarness(windowingStrategy, 5000, window -> new Instant(window.maxTimestamp()), 4093);
        harness.open();
        harness.processWatermark(0L);
        Assert.assertEquals(0L, harness.numKeyedStateEntries());

        IntervalWindow firstWindow = new IntervalWindow(new Instant(0L), Duration.millis(10L));
        harness.processElement(new StreamRecord(WindowedValue.of(KV.of("key1", 5), new Instant(1L), firstWindow, PaneInfo.NO_FIRING)));
        harness.processElement(new StreamRecord(WindowedValue.of(KV.of("key2", 7), new Instant(3L), firstWindow, PaneInfo.NO_FIRING)));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(harness.getOutput()), IsIterableContainingInOrder.contains(new WindowedValue[]{WindowedValue.of(KV.of("key1", 5005), new Instant(1L), firstWindow, PaneInfo.NO_FIRING), WindowedValue.of(KV.of("key2", 5007), new Instant(3L), firstWindow, PaneInfo.NO_FIRING)}));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        harness.getOutput().clear();

        // Window end + allowed lateness + 1 ms, plus one more millisecond.
        long pastWindowExpiry = firstWindow.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).plus(1L).getMillis() + 1;
        harness.processWatermark(pastWindowExpiry);
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(harness.getOutput()), IsIterableContainingInOrder.contains(new WindowedValue[]{WindowedValue.of(KV.of("key1", 4093), new Instant(9L), firstWindow, PaneInfo.NO_FIRING), WindowedValue.of(KV.of("key2", 4093), new Instant(9L), firstWindow, PaneInfo.NO_FIRING)}));
        harness.close();
    }

    /**
     * Uses the stateful harness from {@code getHarness} in the global window, with the user timer
     * always set to timestamp 50. Tracks the number of event-time timers and keyed state entries
     * as the watermark moves past 50, then to one and two milliseconds after the global window's
     * max timestamp, and finally after one more element followed by {@code close()}.
     */
    @Test
    public void testGCForGlobalWindow() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<ByteBuffer, WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> harness =
                getHarness(WindowingStrategy.globalDefault(), 5000, window -> new Instant(50L), 4092);
        final Instant globalWindowEnd = GlobalWindow.INSTANCE.maxTimestamp();

        harness.open();
        harness.processWatermark(0L);
        Assert.assertEquals(0L, harness.numKeyedStateEntries());

        // Two keys, each setting one timer.
        harness.processElement(new StreamRecord(WindowedValue.of(KV.of("key1", 5), new Instant(23L), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)));
        harness.processElement(new StreamRecord(WindowedValue.of(KV.of("key2", 6), new Instant(42L), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)));
        MatcherAssert.assertThat(Integer.valueOf(harness.numEventTimeTimers()), Matchers.is(2));
        MatcherAssert.assertThat(Integer.valueOf(harness.numKeyedStateEntries()), Matchers.is(4));

        // Past the timer timestamp: no timers remain, two state entries remain.
        harness.processWatermark(51L);
        MatcherAssert.assertThat(Integer.valueOf(harness.numEventTimeTimers()), Matchers.is(0));
        MatcherAssert.assertThat(Integer.valueOf(harness.numKeyedStateEntries()), Matchers.is(2));

        // One millisecond past the end of the global window: counts are unchanged.
        harness.processWatermark(globalWindowEnd.plus(1L).getMillis());
        MatcherAssert.assertThat(Integer.valueOf(harness.numEventTimeTimers()), Matchers.is(0));
        MatcherAssert.assertThat(Integer.valueOf(harness.numKeyedStateEntries()), Matchers.is(2));

        // Two milliseconds past the end of the global window: all state is gone.
        harness.processWatermark(globalWindowEnd.plus(2L).getMillis());
        MatcherAssert.assertThat(Integer.valueOf(harness.numEventTimeTimers()), Matchers.is(0));
        MatcherAssert.assertThat(Integer.valueOf(harness.numKeyedStateEntries()), Matchers.is(0));

        // An element processed this late, followed by close(), leaves neither timers nor state.
        harness.processElement(new StreamRecord(WindowedValue.of(KV.of("key2", 6), new Instant(42L), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)));
        harness.close();
        MatcherAssert.assertThat(Integer.valueOf(harness.numEventTimeTimers()), Matchers.is(0));
        MatcherAssert.assertThat(Integer.valueOf(harness.numKeyedStateEntries()), Matchers.is(0));
    }

    /**
     * Builds a keyed test harness around a stateful DoFn that, for every element, sets an
     * event-time timer, writes the element's key to value state and outputs the element with
     * {@code i} added to its value. When the timer fires, the stored key is output together with
     * {@code i2}.
     *
     * @param windowingStrategy windowing strategy handed to the operator; its window coder is used
     *     for the element coder
     * @param i offset added to each element's value before it is output
     * @param function computes the timestamp the timer is set to from the element's window
     * @param i2 value emitted from the timer callback
     * @return an unopened harness keyed by the encoded string key of each element
     * @throws Exception if the harness cannot be constructed
     */
    private static KeyedOneInputStreamOperatorTestHarness<ByteBuffer, WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> getHarness(WindowingStrategy windowingStrategy, final int i, final Function<BoundedWindow, Instant> function, final int i2) throws Exception {
        DoFn<KV<String, Integer>, KV<String, Integer>> doFn = new DoFn<KV<String, Integer>, KV<String, Integer>>() {

            @DoFn.TimerId("boo")
            private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

            @DoFn.StateId("dazzle")
            private final StateSpec<ValueState<String>> stateSpec = StateSpecs.value(StringUtf8Coder.of());

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, Integer>, KV<String, Integer>>.ProcessContext processContext, @DoFn.TimerId("boo") Timer timer, @DoFn.StateId("dazzle") ValueState<String> valueState, BoundedWindow boundedWindow) {
                timer.set(function.apply(boundedWindow));
                KV<String, Integer> element = processContext.element();
                valueState.write(element.getKey());
                processContext.output(KV.of(element.getKey(), element.getValue() + i));
            }

            @DoFn.OnTimer("boo")
            public void onTimer(DoFn<KV<String, Integer>, KV<String, Integer>>.OnTimerContext onTimerContext, @DoFn.StateId("dazzle") ValueState<String> valueState) {
                onTimerContext.output(KV.of(valueState.read(), i2));
            }
        };

        // Input and output share the same element type, so one coder serves both.
        WindowedValue.FullWindowedValueCoder elementCoder = WindowedValue.getFullCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), windowingStrategy.getWindowFn().windowCoder());
        TupleTag mainOutput = new TupleTag("main-output");
        // The selector must be parameterized: with a raw KeySelector target the lambda parameter
        // would be typed Object and getValue() would not resolve.
        KeySelector<WindowedValue<KV<String, Integer>>, ByteBuffer> keySelector =
                windowedValue -> FlinkKeyUtils.encodeKey(windowedValue.getValue().getKey(), StringUtf8Coder.of());
        DoFnOperator doFnOperator =
                new DoFnOperator(
                        doFn,
                        "stepName",
                        elementCoder,
                        Collections.emptyMap(),
                        mainOutput,
                        Collections.emptyList(),
                        new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, elementCoder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
                        windowingStrategy,
                        new HashMap(),
                        Collections.emptyList(),
                        FlinkPipelineOptions.defaults(),
                        StringUtf8Coder.of(),
                        keySelector,
                        DoFnSchemaInformation.create(),
                        Collections.emptyMap());
        return new KeyedOneInputStreamOperatorTestHarness<>(doFnOperator, keySelector, new CoderTypeInformation(FlinkKeyUtils.ByteBufferCoder.of(), FlinkPipelineOptions.defaults()));
    }

    /** Runs the shared side-input scenario with the keyed flag switched off. */
    @Test
    public void testNormalParDoSideInputs() throws Exception {
        final boolean keyed = false;
        testSideInputs(keyed);
    }

    /** Runs the shared side-input scenario with the keyed flag switched on. */
    @Test
    public void testKeyedParDoSideInputs() throws Exception {
        final boolean keyed = true;
        testSideInputs(keyed);
    }

    /**
     * Shared body of the side-input tests: runs {@code IdentityDoFn} with {@code view1} and
     * {@code view2} as side inputs on a two-input harness, feeds side-input data (input 2) and
     * main-input elements (input 1), and expects both main-input elements on the output in order.
     *
     * @param z when {@code true}, the operator is given a key coder and key selector and is run
     *     on a keyed two-input harness; otherwise both are {@code null} and the plain two-input
     *     harness is used
     * @throws Exception if the harness or operator fails
     */
    void testSideInputs(boolean z) throws Exception {
        WindowedValue.ValueOnlyWindowedValueCoder stringCoder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
        TupleTag mainOutput = new TupleTag("main-output");
        // The integer tags must match the RawUnionValue tags used for the side-input records below.
        ImmutableMap sideInputMapping = ImmutableMap.builder().put(1, this.view1).put(2, this.view2).build();
        Coder<String> keyCoder = StringUtf8Coder.of();
        // The selector must be parameterized: with a raw KeySelector target the lambda parameter
        // would be typed Object and getValue() would not resolve. It stays null when not keyed.
        KeySelector<WindowedValue<String>, ByteBuffer> keySelector = null;
        if (z) {
            keySelector = windowedValue -> FlinkKeyUtils.encodeKey(windowedValue.getValue(), keyCoder);
        }
        DoFnOperator doFnOperator =
                new DoFnOperator(
                        new IdentityDoFn(),
                        "stepName",
                        stringCoder,
                        Collections.emptyMap(),
                        mainOutput,
                        Collections.emptyList(),
                        new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, stringCoder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
                        WindowingStrategy.of(FixedWindows.of(Duration.millis(WINDOW_MSECS_1))),
                        sideInputMapping,
                        ImmutableList.of(this.view1, this.view2),
                        FlinkPipelineOptions.defaults(),
                        z ? keyCoder : null,
                        keySelector,
                        DoFnSchemaInformation.create(),
                        Collections.emptyMap());

        // Declared as the base harness type so that either the plain or the keyed harness can be
        // assigned; the keyed harness is a subclass of the plain one.
        TwoInputStreamOperatorTestHarness harness = new TwoInputStreamOperatorTestHarness(doFnOperator);
        if (z) {
            // No key selector is supplied for the second (side-input) stream.
            harness = new KeyedTwoInputStreamOperatorTestHarness(doFnOperator, keySelector, (KeySelector) null, new CoderTypeInformation(FlinkKeyUtils.ByteBufferCoder.of(), FlinkPipelineOptions.defaults()));
        }
        harness.open();

        IntervalWindow shortWindow = new IntervalWindow(new Instant(0L), new Instant(WINDOW_MSECS_1));
        IntervalWindow longWindow = new IntervalWindow(new Instant(0L), new Instant(WINDOW_MSECS_2));

        // Side-input data for both views, delivered on the second input.
        harness.processElement2(new StreamRecord(new RawUnionValue(1, valuesInWindow(PCollectionViewTesting.materializeValuesFor(this.view1.getPipeline().getOptions(), View.asIterable(), new Object[]{"hello", "ciao"}), new Instant(0L), shortWindow))));
        harness.processElement2(new StreamRecord(new RawUnionValue(2, valuesInWindow(PCollectionViewTesting.materializeValuesFor(this.view2.getPipeline().getOptions(), View.asIterable(), new Object[]{"foo", "bar"}), new Instant(0L), longWindow))));

        // Main-input elements, delivered on the first input.
        WindowedValue helloElement = valueInWindow("Hello", new Instant(0L), shortWindow);
        WindowedValue worldElement = valueInWindow("World", new Instant(1000L), shortWindow);
        harness.processElement1(new StreamRecord(helloElement));
        harness.processElement1(new StreamRecord(worldElement));

        // A second round of side-input data after the main input has been processed.
        harness.processElement2(new StreamRecord(new RawUnionValue(1, valuesInWindow(PCollectionViewTesting.materializeValuesFor(this.view1.getPipeline().getOptions(), View.asIterable(), new Object[]{"hello", "ciao"}), new Instant(1000L), shortWindow))));
        harness.processElement2(new StreamRecord(new RawUnionValue(2, valuesInWindow(PCollectionViewTesting.materializeValuesFor(this.view2.getPipeline().getOptions(), View.asIterable(), new Object[]{"foo", "bar"}), new Instant(1000L), longWindow))));

        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(harness.getOutput()), IsIterableContainingInOrder.contains(new WindowedValue[]{helloElement, worldElement}));
        harness.close();
    }

    /**
     * Checks that keyed {@code ValueState} survives a snapshot/restore cycle: two elements are
     * counted before the snapshot, and an operator restored from it must keep counting from there.
     */
    @Test
    public void testStateRestore() throws Exception {
        // Bumps a per-key counter for every element and emits the element only when its value
        // equals the updated count.
        DoFn<KV<String, Long>, KV<String, Long>> countingFn = new DoFn<KV<String, Long>, KV<String, Long>>() {

            @DoFn.StateId("counter")
            private final StateSpec<ValueState<Long>> counterSpec = StateSpecs.value(VarLongCoder.of());

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, Long>, KV<String, Long>>.ProcessContext context, @DoFn.StateId("counter") ValueState<Long> counter) {
                long updatedCount = ((Long) Optional.ofNullable((Long) counter.read()).orElse(0L)).longValue() + 1;
                counter.write(Long.valueOf(updatedCount));
                KV element = (KV) context.element();
                boolean valueMatchesCount = updatedCount == ((Long) element.getValue()).longValue();
                if (valueMatchesCount) {
                    context.output(element);
                }
            }
        };

        WindowingStrategy<Object, ?> windowing = WindowingStrategy.globalDefault();
        TupleTag mainOutput = new TupleTag("main-output");
        StringUtf8Coder keyCoder = StringUtf8Coder.of();
        KvToByteBufferKeySelector keySelector = new KvToByteBufferKeySelector(keyCoder, (SerializablePipelineOptions) null);
        WindowedValue.FullWindowedValueCoder elementCoder = WindowedValue.getFullCoder(
                KvCoder.of(keyCoder, VarLongCoder.of()), windowing.getWindowFn().windowCoder());
        CoderTypeInformation keyTypeInfo = new CoderTypeInformation(FlinkKeyUtils.ByteBufferCoder.of(), FlinkPipelineOptions.defaults());

        // Value carried by the elements that are not expected in the output.
        Long fillerValue = Long.valueOf(WINDOW_MSECS_1);

        // First operator: see two elements for key "a", then snapshot and shut down.
        OneInputStreamOperatorTestHarness original = createTestHarness(
                windowing, countingFn, elementCoder, elementCoder, keyCoder, mainOutput, keyTypeInfo, keySelector);
        original.open();
        original.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow(KV.of("a", fillerValue))));
        original.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow(KV.of("a", fillerValue))));
        OperatorSubtaskState snapshot = original.snapshot(0L, 0L);
        original.close();

        // Second operator: restore from the snapshot and continue with the same key.
        OneInputStreamOperatorTestHarness restored = createTestHarness(
                windowing, countingFn, elementCoder, elementCoder, keyCoder, mainOutput, keyTypeInfo, keySelector);
        restored.initializeState(snapshot);
        restored.open();
        restored.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow(KV.of("a", fillerValue))));
        restored.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow(KV.of("a", 4L))));
        restored.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow(KV.of("a", 5L))));
        restored.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow(KV.of("a", fillerValue))));

        // 4 and 5 are the second and third elements after the restore, so they only match the
        // counter if the two pre-snapshot increments were restored.
        MatcherAssert.assertThat(
                StreamRecordStripper.stripStreamRecordFromWindowedValue(restored.getOutput()),
                IsIterableContainingInOrder.contains(new WindowedValue[]{
                        WindowedValue.valueInGlobalWindow(KV.of("a", 4L)),
                        WindowedValue.valueInGlobalWindow(KV.of("a", 5L))}));
        restored.close();
    }

    /**
     * Runs {@code sideInputCheckpointing} with a plain two-input harness whose operator is built
     * without a key coder or key selector.
     */
    @Test
    public void nonKeyedParDoSideInputCheckpointing() throws Exception {
        sideInputCheckpointing(() -> {
            WindowedValue.FullWindowedValueCoder stringCoder = WindowedValue.getFullCoder(StringUtf8Coder.of(), IntervalWindow.getCoder());
            TupleTag mainOutput = new TupleTag("main-output");
            DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(
                    mainOutput, stringCoder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
            DoFnOperator operator = new DoFnOperator(
                    new IdentityDoFn(),
                    "stepName",
                    stringCoder,
                    Collections.emptyMap(),
                    mainOutput,
                    Collections.emptyList(),
                    outputManagerFactory,
                    WindowingStrategy.globalDefault(),
                    ImmutableMap.builder().put(1, this.view1).put(2, this.view2).build(),
                    ImmutableList.of(this.view1, this.view2),
                    FlinkPipelineOptions.defaults(),
                    (Coder) null,
                    (KeySelector) null,
                    DoFnSchemaInformation.create(),
                    Collections.emptyMap());
            return new TwoInputStreamOperatorTestHarness(operator);
        });
    }

    /**
     * Runs {@code sideInputCheckpointing} with a keyed two-input harness; the main input is keyed
     * by its own string value, encoded with the string coder.
     */
    @Test
    public void keyedParDoSideInputCheckpointing() throws Exception {
        sideInputCheckpointing(() -> {
            StringUtf8Coder keyCoder = StringUtf8Coder.of();
            WindowedValue.FullWindowedValueCoder stringCoder = WindowedValue.getFullCoder(keyCoder, IntervalWindow.getCoder());
            TupleTag mainOutput = new TupleTag("main-output");
            KeySelector mainInputKeySelector = value -> FlinkKeyUtils.encodeKey((String) value.getValue(), keyCoder);
            DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(
                    mainOutput, stringCoder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
            DoFnOperator operator = new DoFnOperator(
                    new IdentityDoFn(),
                    "stepName",
                    stringCoder,
                    Collections.emptyMap(),
                    mainOutput,
                    Collections.emptyList(),
                    outputManagerFactory,
                    WindowingStrategy.of(FixedWindows.of(Duration.millis(WINDOW_MSECS_1))),
                    ImmutableMap.builder().put(1, this.view1).put(2, this.view2).build(),
                    ImmutableList.of(this.view1, this.view2),
                    FlinkPipelineOptions.defaults(),
                    keyCoder,
                    mainInputKeySelector,
                    DoFnSchemaInformation.create(),
                    Collections.emptyMap());
            // Only the first (main) input gets a key selector; the second input's selector is null.
            return new KeyedTwoInputStreamOperatorTestHarness(
                    operator,
                    mainInputKeySelector,
                    (KeySelector) null,
                    new CoderTypeInformation(FlinkKeyUtils.ByteBufferCoder.of(), FlinkPipelineOptions.defaults()));
        });
    }

    /**
     * Sends side-input data to a first harness, snapshots it, and then checks that a second harness
     * initialized from that snapshot forwards main-input elements to its output in order.
     *
     * @param testHarnessFactory creates the harness under test; invoked once per operator instance
     */
    void sideInputCheckpointing(TestHarnessFactory<TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, WindowedValue<String>>> testHarnessFactory) throws Exception {
        TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, WindowedValue<String>> beforeSnapshot = testHarnessFactory.create();
        beforeSnapshot.open();

        IntervalWindow firstWindow = new IntervalWindow(new Instant(0L), new Instant(WINDOW_MSECS_1));
        IntervalWindow secondWindow = new IntervalWindow(new Instant(0L), new Instant(WINDOW_MSECS_2));

        // Side input for view1 (union tag 1) in the first window.
        StreamRecord view1Data = new StreamRecord(new RawUnionValue(1, valuesInWindow(
                PCollectionViewTesting.materializeValuesFor(this.view1.getPipeline().getOptions(), View.asIterable(), new Object[]{"hello", "ciao"}),
                new Instant(0L),
                firstWindow)));
        beforeSnapshot.processElement2(view1Data);

        // Side input for view2 (union tag 2) in the second window.
        StreamRecord view2Data = new StreamRecord(new RawUnionValue(2, valuesInWindow(
                PCollectionViewTesting.materializeValuesFor(this.view2.getPipeline().getOptions(), View.asIterable(), new Object[]{"foo", "bar"}),
                new Instant(0L),
                secondWindow)));
        beforeSnapshot.processElement2(view2Data);

        OperatorSubtaskState snapshot = beforeSnapshot.snapshot(0L, 0L);

        // A fresh harness restored from the snapshot only ever receives main-input elements.
        TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, WindowedValue<String>> afterRestore = testHarnessFactory.create();
        afterRestore.initializeState(snapshot);
        afterRestore.open();

        WindowedValue hello = valueInWindow("Hello", new Instant(0L), firstWindow);
        WindowedValue world = valueInWindow("World", new Instant(1000L), firstWindow);
        afterRestore.processElement1(new StreamRecord(hello));
        afterRestore.processElement1(new StreamRecord(world));

        MatcherAssert.assertThat(
                StreamRecordStripper.stripStreamRecordFromWindowedValue(afterRestore.getOutput()),
                IsIterableContainingInOrder.contains(new WindowedValue[]{hello, world}));
        afterRestore.close();
    }

    /**
     * Runs {@code pushbackDataCheckpointing} with a plain two-input harness whose operator is built
     * without a key coder or key selector.
     */
    @Test
    public void nonKeyedParDoPushbackDataCheckpointing() throws Exception {
        pushbackDataCheckpointing(() -> {
            WindowedValue.FullWindowedValueCoder stringCoder = WindowedValue.getFullCoder(StringUtf8Coder.of(), IntervalWindow.getCoder());
            TupleTag mainOutput = new TupleTag("main-output");
            DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(
                    mainOutput, stringCoder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
            DoFnOperator operator = new DoFnOperator(
                    new IdentityDoFn(),
                    "stepName",
                    stringCoder,
                    Collections.emptyMap(),
                    mainOutput,
                    Collections.emptyList(),
                    outputManagerFactory,
                    WindowingStrategy.of(FixedWindows.of(Duration.millis(WINDOW_MSECS_1))),
                    ImmutableMap.builder().put(1, this.view1).put(2, this.view2).build(),
                    ImmutableList.of(this.view1, this.view2),
                    FlinkPipelineOptions.defaults(),
                    (Coder) null,
                    (KeySelector) null,
                    DoFnSchemaInformation.create(),
                    Collections.emptyMap());
            return new TwoInputStreamOperatorTestHarness(operator);
        });
    }

    /**
     * Runs {@code pushbackDataCheckpointing} with a keyed two-input harness; the main input is
     * keyed by its own string value, encoded with the string coder.
     */
    @Test
    public void keyedParDoPushbackDataCheckpointing() throws Exception {
        pushbackDataCheckpointing(() -> {
            StringUtf8Coder keyCoder = StringUtf8Coder.of();
            WindowedValue.FullWindowedValueCoder stringCoder = WindowedValue.getFullCoder(keyCoder, IntervalWindow.getCoder());
            TupleTag mainOutput = new TupleTag("main-output");
            KeySelector mainInputKeySelector = value -> FlinkKeyUtils.encodeKey((String) value.getValue(), keyCoder);
            DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(
                    mainOutput, stringCoder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
            DoFnOperator operator = new DoFnOperator(
                    new IdentityDoFn(),
                    "stepName",
                    stringCoder,
                    Collections.emptyMap(),
                    mainOutput,
                    Collections.emptyList(),
                    outputManagerFactory,
                    WindowingStrategy.of(FixedWindows.of(Duration.millis(WINDOW_MSECS_1))),
                    ImmutableMap.builder().put(1, this.view1).put(2, this.view2).build(),
                    ImmutableList.of(this.view1, this.view2),
                    FlinkPipelineOptions.defaults(),
                    keyCoder,
                    mainInputKeySelector,
                    DoFnSchemaInformation.create(),
                    Collections.emptyMap());
            // Only the first (main) input gets a key selector; the second input's selector is null.
            return new KeyedTwoInputStreamOperatorTestHarness(
                    operator,
                    mainInputKeySelector,
                    (KeySelector) null,
                    new CoderTypeInformation(FlinkKeyUtils.ByteBufferCoder.of(), FlinkPipelineOptions.defaults()));
        });
    }

    /**
     * Sends main-input elements to a first harness before any side-input data, snapshots it, and
     * then checks that a second harness initialized from that snapshot emits those elements once
     * the side-input data arrives.
     *
     * @param testHarnessFactory creates the harness under test; invoked once per operator instance
     */
    void pushbackDataCheckpointing(TestHarnessFactory<TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, WindowedValue<String>>> testHarnessFactory) throws Exception {
        TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, WindowedValue<String>> beforeSnapshot = testHarnessFactory.create();
        beforeSnapshot.open();

        IntervalWindow firstWindow = new IntervalWindow(new Instant(0L), new Instant(WINDOW_MSECS_1));
        IntervalWindow secondWindow = new IntervalWindow(new Instant(0L), new Instant(WINDOW_MSECS_2));

        // Main-input elements go in first, while no side-input data has been sent yet.
        WindowedValue hello = valueInWindow("Hello", new Instant(0L), firstWindow);
        WindowedValue world = valueInWindow("World", new Instant(1000L), firstWindow);
        beforeSnapshot.processElement1(new StreamRecord(hello));
        beforeSnapshot.processElement1(new StreamRecord(world));

        OperatorSubtaskState snapshot = beforeSnapshot.snapshot(0L, 0L);

        // The restored harness only ever receives side-input data.
        TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, WindowedValue<String>> afterRestore = testHarnessFactory.create();
        afterRestore.initializeState(snapshot);
        afterRestore.open();

        StreamRecord view1Data = new StreamRecord(new RawUnionValue(1, valuesInWindow(
                PCollectionViewTesting.materializeValuesFor(this.view1.getPipeline().getOptions(), View.asIterable(), new Object[]{"hello", "ciao"}),
                new Instant(0L),
                firstWindow)));
        afterRestore.processElement2(view1Data);

        StreamRecord view2Data = new StreamRecord(new RawUnionValue(2, valuesInWindow(
                PCollectionViewTesting.materializeValuesFor(this.view2.getPipeline().getOptions(), View.asIterable(), new Object[]{"foo", "bar"}),
                new Instant(0L),
                secondWindow)));
        afterRestore.processElement2(view2Data);

        // Emission order of the two elements is not asserted.
        MatcherAssert.assertThat(
                StreamRecordStripper.stripStreamRecordFromWindowedValue(afterRestore.getOutput()),
                Matchers.containsInAnyOrder(new WindowedValue[]{hello, world}));
        afterRestore.close();
    }

    /**
     * Checks that an event-time timer set before a snapshot fires in an operator restored from
     * that snapshot once the watermark passes the timer's timestamp.
     */
    @Test
    public void testTimersRestore() throws Exception {
        final Instant timerTimestamp = new Instant(1000L);
        WindowingStrategy<Object, ?> windowing = WindowingStrategy.of(FixedWindows.of(new Duration(10000L)));

        // Sets the event-time timer on every element and emits "Timer fired" when it goes off.
        DoFn<Integer, String> timerFn = new DoFn<Integer, String>() {
            private static final String EVENT_TIMER_ID = "eventTimer";

            @DoFn.TimerId(EVENT_TIMER_ID)
            private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

            @DoFn.ProcessElement
            public void processElement(DoFn<Integer, String>.ProcessContext context, @DoFn.TimerId("eventTimer") Timer timer) {
                timer.set(timerTimestamp);
            }

            @DoFn.OnTimer(EVENT_TIMER_ID)
            public void onEventTime(DoFn<Integer, String>.OnTimerContext context) {
                Assert.assertEquals("Timer timestamp must match set timestamp.", timerTimestamp, context.timestamp());
                context.outputWithTimestamp("Timer fired", context.timestamp());
            }
        };

        VarIntCoder keyCoder = VarIntCoder.of();
        WindowedValue.FullWindowedValueCoder inputCoder = WindowedValue.getFullCoder(keyCoder, windowing.getWindowFn().windowCoder());
        WindowedValue.FullWindowedValueCoder outputCoder = WindowedValue.getFullCoder(StringUtf8Coder.of(), windowing.getWindowFn().windowCoder());
        TupleTag mainOutput = new TupleTag("main-output");
        CoderTypeSerializer outputSerializer = new CoderTypeSerializer(outputCoder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
        CoderTypeInformation keyTypeInfo = new CoderTypeInformation(FlinkKeyUtils.ByteBufferCoder.of(), FlinkPipelineOptions.defaults());
        KeySelector keySelector = value -> FlinkKeyUtils.encodeKey((Integer) value.getValue(), keyCoder);

        // First operator: one element sets the timer; nothing is emitted before the snapshot.
        OneInputStreamOperatorTestHarness original = createTestHarness(
                windowing, timerFn, inputCoder, outputCoder, keyCoder, mainOutput, keyTypeInfo, keySelector);
        original.setup(outputSerializer);
        original.open();
        original.processWatermark(0L);
        IntervalWindow window = new IntervalWindow(new Instant(0L), Duration.millis(10000L));
        original.processElement(new StreamRecord(WindowedValue.of(13, new Instant(0L), window, PaneInfo.NO_FIRING)));
        MatcherAssert.assertThat(
                StreamRecordStripper.stripStreamRecordFromWindowedValue(original.getOutput()),
                Matchers.emptyIterable());
        OperatorSubtaskState snapshot = original.snapshot(0L, 0L);
        original.close();

        // Second operator: restore, then move the watermark just past the timer's timestamp.
        OneInputStreamOperatorTestHarness restored = createTestHarness(
                windowing, timerFn, inputCoder, outputCoder, VarIntCoder.of(), mainOutput, keyTypeInfo, keySelector);
        restored.setup(outputSerializer);
        restored.initializeState(snapshot);
        restored.open();
        restored.processWatermark(timerTimestamp.getMillis() + 1);
        MatcherAssert.assertThat(
                StreamRecordStripper.stripStreamRecordFromWindowedValue(restored.getOutput()),
                IsIterableContainingInOrder.contains(new WindowedValue[]{
                        WindowedValue.of("Timer fired", new Instant(timerTimestamp), window, PaneInfo.NO_FIRING)}));
        restored.close();
    }

    /**
     * Builds a keyed one-input test harness around a {@code DoFnOperator} for the given DoFn.
     *
     * <p>The operator is created with step name "stepName", no side inputs, no additional outputs
     * and default Flink pipeline options.
     *
     * @param windowingStrategy windowing strategy handed to the operator
     * @param doFn the function the operator runs
     * @param fullWindowedValueCoder coder handed to the operator for its input elements
     * @param fullWindowedValueCoder2 coder handed to the output manager factory for the main output
     * @param coder key coder handed to the operator
     * @param tupleTag tag of the main output
     * @param typeInformation type information of the key, handed to the harness
     * @param keySelector key selector handed to both the operator and the harness
     * @return the harness wrapping the newly created operator
     */
    private <K, InT, OutT> OneInputStreamOperatorTestHarness<WindowedValue<InT>, WindowedValue<OutT>> createTestHarness(WindowingStrategy<Object, ?> windowingStrategy, DoFn<InT, OutT> doFn, WindowedValue.FullWindowedValueCoder<InT> fullWindowedValueCoder, WindowedValue.FullWindowedValueCoder<OutT> fullWindowedValueCoder2, Coder<?> coder, TupleTag<OutT> tupleTag, TypeInformation<K> typeInformation, KeySelector<WindowedValue<InT>, K> keySelector) throws Exception {
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(
                tupleTag, fullWindowedValueCoder2, new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
        DoFnOperator operator = new DoFnOperator(
                doFn,
                "stepName",
                fullWindowedValueCoder,
                Collections.emptyMap(),
                tupleTag,
                Collections.emptyList(),
                outputManagerFactory,
                windowingStrategy,
                new HashMap(),
                Collections.emptyList(),
                FlinkPipelineOptions.defaults(),
                coder,
                keySelector,
                DoFnSchemaInformation.create(),
                Collections.emptyMap());
        return new KeyedOneInputStreamOperatorTestHarness(operator, keySelector, typeInformation);
    }

    /**
     * Exercises bundling on a non-keyed operator with a max bundle size of 2 and a max bundle time
     * of 10: checks the output order around {@code finishBundle}, the element held by the
     * pushed-back-elements handler after a snapshot, and the output of a restored operator.
     */
    @Test
    public void testBundle() throws Exception {
        WindowedValue.ValueOnlyWindowedValueCoder inputCoder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
        TupleTag mainOutput = new TupleTag("main-output");
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setMaxBundleSize(2L);
        options.setMaxBundleTimeMills(10L);

        // Identity function that additionally emits a marker element whenever a bundle finishes.
        IdentityDoFn<String> bundleMarkingFn = new IdentityDoFn<String>() {
            @DoFn.FinishBundle
            public void finishBundle(DoFn<String, String>.FinishBundleContext context) {
                context.output("finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
            }
        };

        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(
                mainOutput,
                WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE),
                new SerializablePipelineOptions(options));

        DoFnOperator firstOperator = new DoFnOperator(bundleMarkingFn, "stepName", inputCoder, Collections.emptyMap(), mainOutput, Collections.emptyList(), outputManagerFactory, WindowingStrategy.globalDefault(), new HashMap(), Collections.emptyList(), options, (Coder) null, (KeySelector) null, DoFnSchemaInformation.create(), Collections.emptyMap());
        OneInputStreamOperatorTestHarness firstHarness = new OneInputStreamOperatorTestHarness(firstOperator);
        firstHarness.open();
        firstHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("a")));
        firstHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("b")));
        firstHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("c")));

        // With a bundle size of 2 the marker is expected right after "b"; "c" opens the next bundle.
        MatcherAssert.assertThat(
                StreamRecordStripper.stripStreamRecordFromWindowedValue(firstHarness.getOutput()),
                IsIterableContainingInOrder.contains(new WindowedValue[]{
                        WindowedValue.valueInGlobalWindow("a"),
                        WindowedValue.valueInGlobalWindow("b"),
                        WindowedValue.valueInGlobalWindow("finishBundle"),
                        WindowedValue.valueInGlobalWindow("c")}));

        OperatorSubtaskState snapshot = firstHarness.snapshot(0L, 0L);

        // After the snapshot the handler is expected to hold exactly one "finishBundle" element.
        PushedBackElementsHandler bufferedOutput = firstOperator.outputManager.pushedBackElementsHandler;
        MatcherAssert.assertThat(bufferedOutput, Matchers.instanceOf(NonKeyedPushedBackElementsHandler.class));
        MatcherAssert.assertThat(
                (List) bufferedOutput.getElements().collect(Collectors.toList()),
                IsIterableContainingInOrder.contains(new KV[]{KV.of(0, WindowedValue.valueInGlobalWindow("finishBundle"))}));
        firstHarness.close();

        DoFnOperator restoredOperator = new DoFnOperator(bundleMarkingFn, "stepName", inputCoder, Collections.emptyMap(), mainOutput, Collections.emptyList(), outputManagerFactory, WindowingStrategy.globalDefault(), new HashMap(), Collections.emptyList(), options, (Coder) null, (KeySelector) null, DoFnSchemaInformation.create(), Collections.emptyMap());
        OneInputStreamOperatorTestHarness restoredHarness = new OneInputStreamOperatorTestHarness(restoredOperator);
        restoredHarness.initializeState(snapshot);
        restoredHarness.open();
        restoredHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("d")));
        restoredHarness.setProcessingTime(10L);

        // The same output is expected at three points: after the processing-time advance, after
        // close(), and after dispose() -- i.e. neither close() nor dispose() may add elements.
        WindowedValue[] expectedRestoredOutput = new WindowedValue[]{
                WindowedValue.valueInGlobalWindow("finishBundle"),
                WindowedValue.valueInGlobalWindow("d"),
                WindowedValue.valueInGlobalWindow("finishBundle")};
        MatcherAssert.assertThat(
                StreamRecordStripper.stripStreamRecordFromWindowedValue(restoredHarness.getOutput()),
                IsIterableContainingInOrder.contains(expectedRestoredOutput));
        restoredHarness.close();
        MatcherAssert.assertThat(
                StreamRecordStripper.stripStreamRecordFromWindowedValue(restoredHarness.getOutput()),
                IsIterableContainingInOrder.contains(expectedRestoredOutput));
        restoredOperator.dispose();
        MatcherAssert.assertThat(
                StreamRecordStripper.stripStreamRecordFromWindowedValue(restoredHarness.getOutput()),
                IsIterableContainingInOrder.contains(expectedRestoredOutput));
    }

    /**
     * Keyed variant of the bundling test: the operator runs with a key coder and key selector on
     * {@code KV<String, String>} input and emits each element's value, with a max bundle size of 2
     * and a max bundle time of 10.
     */
    @Test
    public void testBundleKeyed() throws Exception {
        StringUtf8Coder keyCoder = StringUtf8Coder.of();
        KvToByteBufferKeySelector keySelector = new KvToByteBufferKeySelector(keyCoder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));
        KvCoder kvCoder = KvCoder.of(keyCoder, StringUtf8Coder.of());
        WindowedValue.ValueOnlyWindowedValueCoder inputCoder = WindowedValue.getValueOnlyCoder(kvCoder);
        TupleTag mainOutput = new TupleTag("main-output");
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setMaxBundleSize(2L);
        options.setMaxBundleTimeMills(10L);

        // Emits the value of each KV and a marker element whenever a bundle finishes.
        DoFn<KV<String, String>, String> valueExtractingFn = new DoFn<KV<String, String>, String>() {
            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, String>, String>.ProcessContext context) {
                context.output((String) ((KV) context.element()).getValue());
            }

            @DoFn.FinishBundle
            public void finishBundle(DoFn<KV<String, String>, String>.FinishBundleContext context) {
                context.output("finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
            }
        };

        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(
                mainOutput,
                WindowedValue.getFullCoder(kvCoder.getValueCoder(), GlobalWindow.Coder.INSTANCE),
                new SerializablePipelineOptions(options));

        DoFnOperator firstOperator = new DoFnOperator(valueExtractingFn, "stepName", inputCoder, Collections.emptyMap(), mainOutput, Collections.emptyList(), outputManagerFactory, WindowingStrategy.globalDefault(), new HashMap(), Collections.emptyList(), options, keyCoder, keySelector, DoFnSchemaInformation.create(), Collections.emptyMap());
        KeyedOneInputStreamOperatorTestHarness firstHarness = new KeyedOneInputStreamOperatorTestHarness(firstOperator, keySelector, keySelector.getProducedType());
        firstHarness.open();
        firstHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow(KV.of("key", "a"))));
        firstHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow(KV.of("key", "b"))));
        firstHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow(KV.of("key", "c"))));

        // With a bundle size of 2 the marker is expected right after "b"; "c" opens the next bundle.
        MatcherAssert.assertThat(
                StreamRecordStripper.stripStreamRecordFromWindowedValue(firstHarness.getOutput()),
                IsIterableContainingInOrder.contains(new WindowedValue[]{
                        WindowedValue.valueInGlobalWindow("a"),
                        WindowedValue.valueInGlobalWindow("b"),
                        WindowedValue.valueInGlobalWindow("finishBundle"),
                        WindowedValue.valueInGlobalWindow("c")}));

        OperatorSubtaskState snapshot = firstHarness.snapshot(0L, 0L);

        // The test expects the non-keyed handler type here even though the operator is keyed.
        PushedBackElementsHandler bufferedOutput = firstOperator.outputManager.pushedBackElementsHandler;
        MatcherAssert.assertThat(bufferedOutput, Matchers.instanceOf(NonKeyedPushedBackElementsHandler.class));
        MatcherAssert.assertThat(
                (List) bufferedOutput.getElements().collect(Collectors.toList()),
                IsIterableContainingInOrder.contains(new KV[]{KV.of(0, WindowedValue.valueInGlobalWindow("finishBundle"))}));
        firstHarness.close();

        DoFnOperator restoredOperator = new DoFnOperator(valueExtractingFn, "stepName", inputCoder, Collections.emptyMap(), mainOutput, Collections.emptyList(), outputManagerFactory, WindowingStrategy.globalDefault(), new HashMap(), Collections.emptyList(), options, keyCoder, keySelector, DoFnSchemaInformation.create(), Collections.emptyMap());
        KeyedOneInputStreamOperatorTestHarness restoredHarness = new KeyedOneInputStreamOperatorTestHarness(restoredOperator, keySelector, keySelector.getProducedType());
        restoredHarness.initializeState(snapshot);
        restoredHarness.open();
        restoredHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow(KV.of("key", "d"))));
        restoredHarness.setProcessingTime(10L);

        // Expected: the marker carried over from the snapshot, then "d", then a new marker.
        MatcherAssert.assertThat(
                StreamRecordStripper.stripStreamRecordFromWindowedValue(restoredHarness.getOutput()),
                IsIterableContainingInOrder.contains(new WindowedValue[]{
                        WindowedValue.valueInGlobalWindow("finishBundle"),
                        WindowedValue.valueInGlobalWindow("d"),
                        WindowedValue.valueInGlobalWindow("finishBundle")}));
        restoredHarness.close();
    }

    /**
     * Checks output buffering when further bundles are started from the bundle-finished callback
     * that runs during a snapshot: elements produced inside the callback must not appear in the
     * output until later, and must be replayed by an operator restored from that snapshot.
     */
    @Test
    public void testCheckpointBufferingWithMultipleBundles() throws Exception {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setMaxBundleSize(10L);
        options.setCheckpointingInterval(1L);
        TupleTag mainOutput = new TupleTag("main-output");
        WindowedValue.ValueOnlyWindowedValueCoder inputCoder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(
                mainOutput,
                WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE),
                new SerializablePipelineOptions(options));

        // Produces a fresh, identically configured operator for each harness.
        Supplier operatorSupplier = () -> new DoFnOperator(new IdentityDoFn(), "stepName", inputCoder, Collections.emptyMap(), mainOutput, Collections.emptyList(), outputManagerFactory, WindowingStrategy.globalDefault(), new HashMap(), Collections.emptyList(), options, (Coder) null, (KeySelector) null, DoFnSchemaInformation.create(), Collections.emptyMap());

        DoFnOperator firstOperator = (DoFnOperator) operatorSupplier.get();
        OneInputStreamOperatorTestHarness firstHarness = new OneInputStreamOperatorTestHarness(firstOperator);
        firstHarness.open();
        firstHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("regular element")));

        // One-shot callback: it unregisters itself first, then pushes two more elements with an
        // explicit finishBundle call in between.
        firstOperator.setBundleFinishedCallback(() -> {
            try {
                firstOperator.setBundleFinishedCallback((Runnable) null);
                firstHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("trigger another bundle")));
                firstOperator.invokeFinishBundle();
                firstHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("check that the previous element is not flushed")));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        OperatorSubtaskState snapshot = firstHarness.snapshot(0L, 0L);

        // Right after the snapshot only the element processed before it is visible.
        MatcherAssert.assertThat(
                StreamRecordStripper.stripStreamRecordFromWindowedValue(firstHarness.getOutput()),
                IsIterableContainingInOrder.contains(new WindowedValue[]{
                        WindowedValue.valueInGlobalWindow("regular element")}));

        // After the final watermark the two callback elements show up as well.
        firstHarness.processWatermark(Long.MAX_VALUE);
        MatcherAssert.assertThat(
                StreamRecordStripper.stripStreamRecordFromWindowedValue(firstHarness.getOutput()),
                IsIterableContainingInOrder.contains(new WindowedValue[]{
                        WindowedValue.valueInGlobalWindow("regular element"),
                        WindowedValue.valueInGlobalWindow("trigger another bundle"),
                        WindowedValue.valueInGlobalWindow("check that the previous element is not flushed")}));
        firstHarness.close();

        // A restored operator is expected to emit the two callback elements before new input.
        OneInputStreamOperatorTestHarness restoredHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator) operatorSupplier.get());
        restoredHarness.initializeState(snapshot);
        restoredHarness.open();
        restoredHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("after restore")));
        MatcherAssert.assertThat(
                StreamRecordStripper.stripStreamRecordFromWindowedValue(restoredHarness.getOutput()),
                IsIterableContainingInOrder.contains(new WindowedValue[]{
                        WindowedValue.valueInGlobalWindow("trigger another bundle"),
                        WindowedValue.valueInGlobalWindow("check that the previous element is not flushed"),
                        WindowedValue.valueInGlobalWindow("after restore")}));
    }

    /**
     * Checks input buffering for a DoFn whose {@code processElement} is annotated with
     * {@code @RequiresStableInput}, on a non-keyed operator.
     *
     * <p>The assertions pin the following sequence: no output and no {@code startBundle} call
     * before the checkpoint completes; one bundle ("a", "b", "finishBundle") once
     * {@code notifyCheckpointComplete(0)} is called; the same bundle again on an operator restored
     * from the snapshot; and no further bundle for a later checkpoint with no new input.
     */
    @Test
    public void testExactlyOnceBuffering() throws Exception {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setMaxBundleSize(2L);
        defaults.setCheckpointingInterval(1L);
        TupleTag tupleTag = new TupleTag("main-output");
        WindowedValue.ValueOnlyWindowedValueCoder valueOnlyCoder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
        this.numStartBundleCalled = 0;
        DoFn<String, String> doFn = new DoFn<String, String>() {
            @DoFn.StartBundle
            public void startBundle(DoFn<String, String>.StartBundleContext startBundleContext) {
                // Count bundle starts on the enclosing test instance. This was a call to the
                // compiler-generated accessor access$112, which cannot be referenced from source;
                // the direct compound assignment is the source-level equivalent.
                DoFnOperatorTest.this.numStartBundleCalled += 1;
            }

            @DoFn.ProcessElement
            @DoFn.RequiresStableInput
            public void processElement(DoFn<String, String>.ProcessContext processContext) {
                processContext.output((String) processContext.element());
            }

            @DoFn.FinishBundle
            public void finishBundle(DoFn<String, String>.FinishBundleContext finishBundleContext) {
                finishBundleContext.output("finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
            }
        };
        DoFnOperator.MultiOutputOutputManagerFactory multiOutputOutputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(tupleTag, WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE), new SerializablePipelineOptions(defaults));
        // Produces a fresh, identically configured operator for each harness.
        Supplier supplier = () -> {
            return new DoFnOperator(doFn, "stepName", valueOnlyCoder, Collections.emptyMap(), tupleTag, Collections.emptyList(), multiOutputOutputManagerFactory, WindowingStrategy.globalDefault(), new HashMap(), Collections.emptyList(), defaults, (Coder) null, (KeySelector) null, DoFnSchemaInformation.create(), Collections.emptyMap());
        };
        DoFnOperator doFnOperator = (DoFnOperator) supplier.get();
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(doFnOperator);
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("a")));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("b")));
        // Nothing may have been processed yet: no output, no bundle started.
        MatcherAssert.assertThat(Integer.valueOf(Iterables.size(oneInputStreamOperatorTestHarness.getOutput())), Matchers.is(0));
        MatcherAssert.assertThat(Integer.valueOf(this.numStartBundleCalled), Matchers.is(0));
        OperatorSubtaskState snapshot = oneInputStreamOperatorTestHarness.snapshot(0L, 0L);
        // Completing checkpoint 0 is expected to start exactly one bundle and emit its elements.
        doFnOperator.notifyCheckpointComplete(0L);
        MatcherAssert.assertThat(Integer.valueOf(this.numStartBundleCalled), Matchers.is(1));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(oneInputStreamOperatorTestHarness.getOutput()), IsIterableContainingInOrder.contains(new WindowedValue[]{WindowedValue.valueInGlobalWindow("a"), WindowedValue.valueInGlobalWindow("b"), WindowedValue.valueInGlobalWindow("finishBundle")}));
        // Restore a second operator from the snapshot; it must emit the same elements once
        // checkpoint 0 is reported complete.
        DoFnOperator doFnOperator2 = (DoFnOperator) supplier.get();
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness2 = new OneInputStreamOperatorTestHarness(doFnOperator2);
        oneInputStreamOperatorTestHarness2.initializeState(snapshot);
        oneInputStreamOperatorTestHarness2.open();
        doFnOperator2.notifyCheckpointComplete(0L);
        MatcherAssert.assertThat(Integer.valueOf(this.numStartBundleCalled), Matchers.is(2));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(oneInputStreamOperatorTestHarness2.getOutput()), IsIterableContainingInOrder.contains(new WindowedValue[]{WindowedValue.valueInGlobalWindow("a"), WindowedValue.valueInGlobalWindow("b"), WindowedValue.valueInGlobalWindow("finishBundle")}));
        // A later checkpoint with no new input must neither start a bundle nor add output.
        doFnOperator2.notifyCheckpointComplete(1L);
        MatcherAssert.assertThat(Integer.valueOf(this.numStartBundleCalled), Matchers.is(2));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(oneInputStreamOperatorTestHarness2.getOutput()), IsIterableContainingInOrder.contains(new WindowedValue[]{WindowedValue.valueInGlobalWindow("a"), WindowedValue.valueInGlobalWindow("b"), WindowedValue.valueInGlobalWindow("finishBundle")}));
    }

    /**
     * Checks output buffering of a keyed DoFn annotated with {@code @RequiresStableInput} across a
     * snapshot and a restore.
     *
     * <p>Four elements are pushed through the operator and no output is expected before the
     * checkpoint. After the snapshot and {@code notifyCheckpointComplete(0)} the four elements plus
     * the finish-bundle element must be emitted and exactly one bundle must have been started. A
     * second operator restored from the snapshot must emit the same output once checkpoint 0 is
     * acknowledged, and acknowledging checkpoint 1 must change neither the output nor the bundle
     * counter.
     */
    @Test
    public void testExactlyOnceBufferingKeyed() throws Exception {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setMaxBundleSize(2L);
        options.setCheckpointingInterval(1L);

        TupleTag outputTag = new TupleTag("main-output");
        StringUtf8Coder keyCoder = StringUtf8Coder.of();
        KvToByteBufferKeySelector keySelector =
                new KvToByteBufferKeySelector(keyCoder, new SerializablePipelineOptions(options));
        KvCoder<String, String> kvCoder = KvCoder.of(keyCoder, StringUtf8Coder.of());
        WindowedValue.ValueOnlyWindowedValueCoder inputCoder = WindowedValue.getValueOnlyCoder(kvCoder);

        DoFn<KV<String, String>, KV<String, String>> doFn = new DoFn<KV<String, String>, KV<String, String>>() {
            @DoFn.StartBundle
            public void startBundle(DoFn<KV<String, String>, KV<String, String>>.StartBundleContext startBundleContext) {
                // Count every bundle start on the enclosing test instance.
                DoFnOperatorTest.this.numStartBundleCalled++;
            }

            @DoFn.ProcessElement
            @DoFn.RequiresStableInput
            public void processElement(DoFn<KV<String, String>, KV<String, String>>.ProcessContext processContext) {
                processContext.output(processContext.element());
            }

            @DoFn.FinishBundle
            public void finishBundle(DoFn<KV<String, String>, KV<String, String>>.FinishBundleContext finishBundleContext) {
                finishBundleContext.output(
                        KV.of("key3", "finishBundle"), BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
            }
        };

        // NOTE(review): the output coder is built from StringUtf8Coder although the DoFn emits
        // KV<String, String> - confirm this is intended.
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory =
                new DoFnOperator.MultiOutputOutputManagerFactory(
                        outputTag,
                        WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE),
                        new SerializablePipelineOptions(options));

        // Each call builds a fresh operator with identical configuration.
        Supplier<DoFnOperator> operatorFactory = () -> new DoFnOperator(
                doFn,
                "stepName",
                inputCoder,
                Collections.emptyMap(),
                outputTag,
                Collections.emptyList(),
                outputManagerFactory,
                WindowingStrategy.globalDefault(),
                new HashMap(),
                Collections.emptyList(),
                options,
                keyCoder,
                keySelector,
                DoFnSchemaInformation.create(),
                Collections.emptyMap());

        // The same sequence is expected from the first operator and from the restored one.
        WindowedValue[] expectedOutput = new WindowedValue[] {
            WindowedValue.valueInGlobalWindow(KV.of("key", "a")),
            WindowedValue.valueInGlobalWindow(KV.of("key", "b")),
            WindowedValue.valueInGlobalWindow(KV.of("key2", "c")),
            WindowedValue.valueInGlobalWindow(KV.of("key2", "d")),
            WindowedValue.valueInGlobalWindow(KV.of("key3", "finishBundle"))
        };

        DoFnOperator firstOperator = operatorFactory.get();
        KeyedOneInputStreamOperatorTestHarness firstHarness =
                new KeyedOneInputStreamOperatorTestHarness(firstOperator, keySelector, keySelector.getProducedType());
        firstHarness.open();

        KV[] inputs = {KV.of("key", "a"), KV.of("key", "b"), KV.of("key2", "c"), KV.of("key2", "d")};
        for (KV input : inputs) {
            firstHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow(input)));
        }

        // Nothing may be emitted before the checkpoint.
        MatcherAssert.assertThat(Iterables.size(firstHarness.getOutput()), Matchers.is(0));

        OperatorSubtaskState snapshot = firstHarness.snapshot(0L, 0L);
        firstOperator.notifyCheckpointComplete(0L);
        MatcherAssert.assertThat(this.numStartBundleCalled, Matchers.is(1));
        MatcherAssert.assertThat(
                StreamRecordStripper.stripStreamRecordFromWindowedValue(firstHarness.getOutput()),
                IsIterableContainingInOrder.contains(expectedOutput));

        // Restore a second operator from the snapshot and acknowledge the same checkpoint.
        DoFnOperator restoredOperator = operatorFactory.get();
        KeyedOneInputStreamOperatorTestHarness restoredHarness =
                new KeyedOneInputStreamOperatorTestHarness(restoredOperator, keySelector, keySelector.getProducedType());
        restoredHarness.initializeState(snapshot);
        restoredHarness.open();
        restoredOperator.notifyCheckpointComplete(0L);
        MatcherAssert.assertThat(this.numStartBundleCalled, Matchers.is(2));
        MatcherAssert.assertThat(
                StreamRecordStripper.stripStreamRecordFromWindowedValue(restoredHarness.getOutput()),
                IsIterableContainingInOrder.contains(expectedOutput));

        // A later checkpoint acknowledgement must not start a bundle or emit anything further.
        restoredOperator.notifyCheckpointComplete(1L);
        MatcherAssert.assertThat(this.numStartBundleCalled, Matchers.is(2));
        MatcherAssert.assertThat(
                StreamRecordStripper.stripStreamRecordFromWindowedValue(restoredHarness.getOutput()),
                IsIterableContainingInOrder.contains(expectedOutput));
    }

    /**
     * Verifies that constructing a {@link DoFnOperator} for a DoFn annotated with
     * {@code @RequiresStableInput} fails with {@link IllegalStateException} when the checkpointing
     * interval in the pipeline options is set to {@code -1}.
     *
     * <p>The expectation is scoped to the constructor call alone, so an exception thrown while
     * preparing the fixtures cannot make the test pass by accident.
     */
    @Test
    public void testFailOnRequiresStableInputAndDisabledCheckpointing() {
        TupleTag tupleTag = new TupleTag("main-output");
        StringUtf8Coder keyCoder = StringUtf8Coder.of();
        KvToByteBufferKeySelector keySelector =
                new KvToByteBufferKeySelector(keyCoder, (SerializablePipelineOptions) null);
        WindowedValue.ValueOnlyWindowedValueCoder valueOnlyCoder =
                WindowedValue.getValueOnlyCoder(KvCoder.of(keyCoder, StringUtf8Coder.of()));

        DoFn<String, String> doFn = new DoFn<String, String>() {
            @DoFn.ProcessElement
            @DoFn.RequiresStableInput
            public void processElement(DoFn<String, String>.ProcessContext processContext) {
                processContext.output((String) processContext.element());
            }
        };

        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory =
                new DoFnOperator.MultiOutputOutputManagerFactory(
                        tupleTag,
                        WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE),
                        new SerializablePipelineOptions(FlinkPipelineOptions.defaults()));

        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        // A negative interval is the configuration under test.
        options.setCheckpointingInterval(-1L);

        // Only the constructor is expected to reject the configuration.
        Assert.assertThrows(
                IllegalStateException.class,
                () -> new DoFnOperator(
                        doFn,
                        "stepName",
                        valueOnlyCoder,
                        Collections.emptyMap(),
                        tupleTag,
                        Collections.emptyList(),
                        outputManagerFactory,
                        WindowingStrategy.globalDefault(),
                        new HashMap(),
                        Collections.emptyList(),
                        options,
                        keyCoder,
                        keySelector,
                        DoFnSchemaInformation.create(),
                        Collections.emptyMap()));
    }

    /**
     * Verifies that an exception thrown from {@code @FinishBundle} surfaces as an {@link Error}
     * when a snapshot is taken after one element has been processed.
     */
    @Test
    public void testBundleProcessingExceptionIsFatalDuringCheckpointing() throws Exception {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setMaxBundleSize(10L);
        options.setCheckpointingInterval(1L);
        TupleTag outputTag = new TupleTag("main-output");

        // Identity DoFn whose bundle finalization always fails.
        IdentityDoFn failingDoFn = new IdentityDoFn() {
            @DoFn.FinishBundle
            public void finishBundle() {
                throw new RuntimeException("something went wrong here");
            }
        };

        DoFnOperator operator = new DoFnOperator(
                failingDoFn,
                "stepName",
                WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
                Collections.emptyMap(),
                outputTag,
                Collections.emptyList(),
                new DoFnOperator.MultiOutputOutputManagerFactory(
                        outputTag,
                        WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE),
                        new SerializablePipelineOptions(options)),
                WindowingStrategy.globalDefault(),
                new HashMap(),
                Collections.emptyList(),
                options,
                (Coder) null,
                (KeySelector) null,
                DoFnSchemaInformation.create(),
                Collections.emptyMap());

        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness(operator);
        harness.open();
        harness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("regular element")));

        // Taking the snapshot is the step expected to fail.
        Assert.assertThrows(Error.class, () -> harness.snapshot(0L, 0L));
    }

    /**
     * Verifies that {@code registerMetricsForPipelineResult()} is invoked exactly twice on the
     * operator's metric container once the harness has been closed and the operator disposed.
     */
    @Test
    public void testAccumulatorRegistrationOnOperatorClose() throws Exception {
        DoFnOperator operator = getOperatorForCleanupInspection();
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness(operator);
        harness.open();

        // Swap the operator's metric container for a spy so the calls can be counted.
        FlinkMetricContainer originalContainer =
                (FlinkMetricContainer) Whitebox.getInternalState(operator, "flinkMetricContainer");
        FlinkMetricContainer containerSpy = Mockito.spy(originalContainer);
        Whitebox.setInternalState(operator, "flinkMetricContainer", containerSpy);

        harness.close();
        operator.dispose();

        Mockito.verify(containerSpy, Mockito.times(2)).registerMetricsForPipelineResult();
    }

    /**
     * Verifies that the {@code _typeCache} of Jackson's default {@link TypeFactory} is non-empty
     * before the operator runs and empty after the harness has been opened and closed.
     */
    @Test
    public void testRemoveCachedClassReferences() throws Exception {
        // The operator is created before the cache is inspected; presumably this is what
        // populates the cache, since the first assertion relies on it being non-empty.
        DoFnOperator operator = getOperatorForCleanupInspection();
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness(operator);

        TypeFactory typeFactory = TypeFactory.defaultInstance();
        LRUMap typeCache = (LRUMap) Whitebox.getInternalState(typeFactory, "_typeCache");
        MatcherAssert.assertThat(typeCache.size(), Matchers.greaterThan(0));

        harness.open();
        harness.close();

        MatcherAssert.assertThat(typeCache.size(), Matchers.is(0));
    }

    /**
     * Builds a non-keyed {@link DoFnOperator} over String elements, configured with parallelism 4,
     * whose DoFn emits a {@code "finishBundle"} element when a bundle finishes.
     *
     * @return a freshly constructed operator
     */
    private static DoFnOperator getOperatorForCleanupInspection() {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setParallelism(4);

        TupleTag outputTag = new TupleTag("main-output");
        WindowedValue.ValueOnlyWindowedValueCoder inputCoder =
                WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());

        IdentityDoFn<String> doFn = new IdentityDoFn<String>() {
            @DoFn.FinishBundle
            public void finishBundle(DoFn<String, String>.FinishBundleContext finishBundleContext) {
                finishBundleContext.output("finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
            }
        };

        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory =
                new DoFnOperator.MultiOutputOutputManagerFactory(
                        outputTag,
                        WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE),
                        new SerializablePipelineOptions(options));

        return new DoFnOperator(
                doFn,
                "stepName",
                inputCoder,
                Collections.emptyMap(),
                outputTag,
                Collections.emptyList(),
                outputManagerFactory,
                WindowingStrategy.globalDefault(),
                new HashMap(),
                Collections.emptyList(),
                options,
                (Coder) null,
                (KeySelector) null,
                DoFnSchemaInformation.create(),
                Collections.emptyMap());
    }

    /**
     * Extracts the windowed values carried by the {@link StreamRecord}s in the given output.
     *
     * <p>Elements that are not {@code StreamRecord}s are dropped; the value of every remaining
     * record is returned, cast to {@code WindowedValue<String>}, in the original order.
     *
     * @param iterable operator output that may mix stream records with other elements
     * @return a lazy view over the values of the contained stream records
     */
    private Iterable<WindowedValue<String>> stripStreamRecord(Iterable<?> iterable) {
        return FluentIterable.from(iterable)
                .filter(element -> element instanceof StreamRecord)
                .transform(new Function<Object, WindowedValue<String>>() {
                    // Must be named apply to implement the functional interface.
                    @Override
                    public WindowedValue<String> apply(Object element) {
                        if (element instanceof StreamRecord) {
                            return (WindowedValue) ((StreamRecord) element).getValue();
                        }
                        // The preceding filter lets only StreamRecord instances through.
                        throw new RuntimeException("unreachable");
                    }
                });
    }

    /**
     * Wraps an iterable as a single windowed value with the given timestamp and window, using
     * {@code PaneInfo.NO_FIRING} as pane.
     *
     * @param iterable the value to wrap
     * @param instant the element timestamp
     * @param boundedWindow the window the value belongs to
     * @return the windowed value holding {@code iterable}
     */
    private WindowedValue<Iterable<?>> valuesInWindow(Iterable<?> iterable, Instant instant, BoundedWindow boundedWindow) {
        // Same construction as the single-value helper, with the element type fixed to Iterable<?>.
        return this.<Iterable<?>>valueInWindow(iterable, instant, boundedWindow);
    }

    /**
     * Wraps a value as a windowed value with the given timestamp and window, using
     * {@code PaneInfo.NO_FIRING} as pane.
     *
     * @param t the value to wrap
     * @param instant the element timestamp
     * @param boundedWindow the window the value belongs to
     * @return the windowed value holding {@code t}
     */
    private <T> WindowedValue<T> valueInWindow(T t, Instant instant, BoundedWindow boundedWindow) {
        final PaneInfo pane = PaneInfo.NO_FIRING;
        return WindowedValue.of(t, instant, boundedWindow, pane);
    }

    /**
     * Re-creates a serialized lambda of this class from its {@link SerializedLambda} description.
     *
     * <p>The implementation method name is first mapped to a selector (by hash code, then by exact
     * name). The second switch then checks the implementation method kind, the functional interface
     * ({@code KeySelector.getKey}), the implementing class and the implementation signature, and
     * returns an equivalent lambda, restoring the captured coder where there is one.
     *
     * <p>NOTE(review): this looks like compiler-generated code recovered by a decompiler. The
     * selector {@code z} is declared {@code boolean} yet is assigned {@code -1} and {@code 2..6},
     * and the second switch repeats {@code case true}. The cases presumably correspond to selector
     * values 0..6 in order - confirm against the original source before editing this method.
     *
     * @param serializedLambda description of the lambda to re-create
     * @return a lambda equivalent to the serialized one
     * @throws IllegalArgumentException if the description matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // -1 means "no known lambda matched".
        boolean z = -1;
        // First stage: narrow by hash code, then confirm with equals() to rule out collisions.
        switch (implMethodName.hashCode()) {
            case -1980673785:
                if (implMethodName.equals("lambda$testTimersRestore$14396367$1")) {
                    z = 2;
                    break;
                }
                break;
            case -1677030095:
                if (implMethodName.equals("lambda$testLateDroppingForStatefulFn$14396367$1")) {
                    z = 6;
                    break;
                }
                break;
            case -1668913538:
                if (implMethodName.equals("lambda$keyedParDoSideInputCheckpointing$f7a0d5c1$1")) {
                    z = 4;
                    break;
                }
                break;
            case -535231039:
                if (implMethodName.equals("lambda$testSideInputs$2e46429e$1")) {
                    z = 3;
                    break;
                }
                break;
            case 948625169:
                if (implMethodName.equals("lambda$testWatermarkContract$14396367$1")) {
                    z = 5;
                    break;
                }
                break;
            case 1286212367:
                if (implMethodName.equals("lambda$getHarness$b88e741f$1")) {
                    z = false;
                    break;
                }
                break;
            case 2132442188:
                if (implMethodName.equals("lambda$keyedParDoPushbackDataCheckpointing$f7a0d5c1$1")) {
                    z = true;
                    break;
                }
                break;
        }
        // Second stage: validate the full lambda description and rebuild the key selector.
        switch (z) {
            case false:
                // No captured argument: encodes the String key of a KV element with StringUtf8Coder.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/util/WindowedValue;)Ljava/nio/ByteBuffer;")) {
                    return windowedValue -> {
                        return FlinkKeyUtils.encodeKey((String) ((KV) windowedValue.getValue()).getKey(), StringUtf8Coder.of());
                    };
                }
                break;
            case true:
                // Captured StringUtf8Coder: encodes the String element value itself as the key.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/coders/StringUtf8Coder;Lorg/apache/beam/sdk/util/WindowedValue;)Ljava/nio/ByteBuffer;")) {
                    StringUtf8Coder stringUtf8Coder = (StringUtf8Coder) serializedLambda.getCapturedArg(0);
                    return windowedValue2 -> {
                        return FlinkKeyUtils.encodeKey((String) windowedValue2.getValue(), stringUtf8Coder);
                    };
                }
                break;
            case true:
                // Captured VarIntCoder: encodes the Integer element value as the key.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/coders/VarIntCoder;Lorg/apache/beam/sdk/util/WindowedValue;)Ljava/nio/ByteBuffer;")) {
                    VarIntCoder varIntCoder = (VarIntCoder) serializedLambda.getCapturedArg(0);
                    return windowedValue3 -> {
                        return FlinkKeyUtils.encodeKey((Integer) windowedValue3.getValue(), varIntCoder);
                    };
                }
                break;
            case true:
                // Captured generic Coder: encodes the String element value as the key.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/coders/Coder;Lorg/apache/beam/sdk/util/WindowedValue;)Ljava/nio/ByteBuffer;")) {
                    Coder coder = (Coder) serializedLambda.getCapturedArg(0);
                    return windowedValue4 -> {
                        return FlinkKeyUtils.encodeKey((String) windowedValue4.getValue(), coder);
                    };
                }
                break;
            case true:
                // Captured StringUtf8Coder: encodes the String element value as the key.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/coders/StringUtf8Coder;Lorg/apache/beam/sdk/util/WindowedValue;)Ljava/nio/ByteBuffer;")) {
                    StringUtf8Coder stringUtf8Coder2 = (StringUtf8Coder) serializedLambda.getCapturedArg(0);
                    return windowedValue5 -> {
                        return FlinkKeyUtils.encodeKey((String) windowedValue5.getValue(), stringUtf8Coder2);
                    };
                }
                break;
            case true:
                // Captured VarIntCoder: encodes the Integer element value as the key.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/coders/VarIntCoder;Lorg/apache/beam/sdk/util/WindowedValue;)Ljava/nio/ByteBuffer;")) {
                    VarIntCoder varIntCoder2 = (VarIntCoder) serializedLambda.getCapturedArg(0);
                    return windowedValue6 -> {
                        return FlinkKeyUtils.encodeKey((Integer) windowedValue6.getValue(), varIntCoder2);
                    };
                }
                break;
            case true:
                // Captured VarIntCoder: encodes the Integer element value as the key.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/coders/VarIntCoder;Lorg/apache/beam/sdk/util/WindowedValue;)Ljava/nio/ByteBuffer;")) {
                    VarIntCoder varIntCoder3 = (VarIntCoder) serializedLambda.getCapturedArg(0);
                    return windowedValue7 -> {
                        return FlinkKeyUtils.encodeKey((Integer) windowedValue7.getValue(), varIntCoder3);
                    };
                }
                break;
        }
        // Reached when the name was unknown or the description failed validation.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    /**
     * Adds {@code i} to the {@code numStartBundleCalled} counter of the given test instance.
     *
     * @param doFnOperatorTest the test instance whose counter is updated
     * @param i the amount to add
     * @return the counter value after the addition
     */
    static /* synthetic */ int access$112(DoFnOperatorTest doFnOperatorTest, int i) {
        doFnOperatorTest.numStartBundleCalled += i;
        return doFnOperatorTest.numStartBundleCalled;
    }

    /**
     * Increments the {@code numStartBundleCalled} counter of the given test instance by one.
     *
     * @param doFnOperatorTest the test instance whose counter is updated
     * @return the counter value before the increment
     */
    static /* synthetic */ int access$108(DoFnOperatorTest doFnOperatorTest) {
        return doFnOperatorTest.numStartBundleCalled++;
    }
}
