package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.RichFoldFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.class */
public class StreamGroupedFoldTest {

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamGroupedFoldTest$MyFolder.class */
    private static class MyFolder implements FoldFunction<Integer, String> {
        private MyFolder() {
        }

        public String fold(String str, Integer num) throws Exception {
            return str + num.toString();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamGroupedFoldTest$TestOpenCloseFoldFunction.class */
    private static class TestOpenCloseFoldFunction extends RichFoldFunction<Integer, String> {
        private static final long serialVersionUID = 1;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        private TestOpenCloseFoldFunction() {
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            if (closeCalled) {
                Assert.fail("Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail("Open was not called before close.");
            }
            closeCalled = true;
        }

        public String fold(String str, Integer num) throws Exception {
            if (!openCalled) {
                Assert.fail("Open was not called before run.");
            }
            return str + num;
        }
    }

    @Test
    public void testGroupedFold() throws Exception {
        KeySelector<Integer, String> keySelector = new KeySelector<Integer, String>() { // from class: org.apache.flink.streaming.api.operators.StreamGroupedFoldTest.1
            public String getKey(Integer num) {
                return num.toString();
            }
        };
        StreamGroupedFold streamGroupedFold = new StreamGroupedFold(new MyFolder(), "100");
        streamGroupedFold.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
        KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(streamGroupedFold, keySelector, BasicTypeInfo.STRING_TYPE_INFO);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        keyedOneInputStreamOperatorTestHarness.open();
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(1, 0 + 1));
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(1, 0 + 2));
        keyedOneInputStreamOperatorTestHarness.processWatermark(new Watermark(0 + 2));
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(2, 0 + 3));
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(2, 0 + 4));
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(3, 0 + 5));
        concurrentLinkedQueue.add(new StreamRecord("1001", 0 + 1));
        concurrentLinkedQueue.add(new StreamRecord("10011", 0 + 2));
        concurrentLinkedQueue.add(new Watermark(0 + 2));
        concurrentLinkedQueue.add(new StreamRecord("1002", 0 + 3));
        concurrentLinkedQueue.add(new StreamRecord("10022", 0 + 4));
        concurrentLinkedQueue.add(new StreamRecord("1003", 0 + 5));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, keyedOneInputStreamOperatorTestHarness.getOutput());
    }

    @Test
    public void testOpenClose() throws Exception {
        KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() { // from class: org.apache.flink.streaming.api.operators.StreamGroupedFoldTest.2
            public Integer getKey(Integer num) {
                return num;
            }
        };
        StreamGroupedFold streamGroupedFold = new StreamGroupedFold(new TestOpenCloseFoldFunction(), "init");
        streamGroupedFold.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
        KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(streamGroupedFold, keySelector, BasicTypeInfo.INT_TYPE_INFO);
        keyedOneInputStreamOperatorTestHarness.open();
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(1, 0L));
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(2, 0L));
        keyedOneInputStreamOperatorTestHarness.close();
        Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseFoldFunction.closeCalled);
        Assert.assertTrue("Output contains no elements.", keyedOneInputStreamOperatorTestHarness.getOutput().size() > 0);
    }
}
