package org.apache.flink.test.checkpointing;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/checkpointing/StreamCheckpointingITCase.class */
public class StreamCheckpointingITCase {
    private static final int NUM_TASK_MANAGERS = 2;
    private static final int NUM_TASK_SLOTS = 3;
    private static final int PARALLELISM = 6;
    private static ForkableFlinkMiniCluster cluster;

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StreamCheckpointingITCase$OnceFailingReducer.class */
    private static class OnceFailingReducer extends RichReduceFunction<PrefixCount> {
        private static volatile boolean hasFailed = false;
        private final long numElements;
        private long failurePos;
        private long count;

        OnceFailingReducer(long j) {
            this.numElements = j;
        }

        public void open(Configuration configuration) {
            long numberOfParallelSubtasks = (long) ((0.4d * this.numElements) / getRuntimeContext().getNumberOfParallelSubtasks());
            this.failurePos = (new Random().nextLong() % (((long) ((0.7d * this.numElements) / getRuntimeContext().getNumberOfParallelSubtasks())) - numberOfParallelSubtasks)) + numberOfParallelSubtasks;
            this.count = 0L;
        }

        public PrefixCount reduce(PrefixCount prefixCount, PrefixCount prefixCount2) throws Exception {
            this.count++;
            if (hasFailed || this.count < this.failurePos) {
                prefixCount.count += prefixCount2.count;
                return prefixCount;
            }
            hasFailed = true;
            throw new Exception("Test Failure");
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StreamCheckpointingITCase$PrefixCount.class */
    public static class PrefixCount {
        public String prefix;
        public String value;
        public long count;

        public PrefixCount() {
        }

        public PrefixCount(String str, String str2, long j) {
            this.prefix = str;
            this.value = str2;
            this.count = j;
        }

        public String toString() {
            return this.prefix + " / " + this.value;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StreamCheckpointingITCase$StatefulCounterFunction.class */
    private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> implements Checkpointed<Long> {
        static final long[] counts = new long[StreamCheckpointingITCase.PARALLELISM];
        private long count;

        private StatefulCounterFunction() {
            this.count = 0L;
        }

        public PrefixCount map(PrefixCount prefixCount) throws Exception {
            this.count++;
            return prefixCount;
        }

        public void close() {
            counts[getRuntimeContext().getIndexOfThisSubtask()] = this.count;
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public Long m475snapshotState(long j, long j2) {
            return Long.valueOf(this.count);
        }

        public void restoreState(Long l) {
            this.count = l.longValue();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StreamCheckpointingITCase$StringGeneratingSourceFunction.class */
    private static class StringGeneratingSourceFunction extends RichSourceFunction<String> implements Checkpointed<Long>, ParallelSourceFunction<String> {
        private final long numElements;
        private Random rnd;
        private StringBuilder stringBuilder;
        private long index;
        private int step;
        private volatile boolean isRunning;
        static final long[] counts = new long[StreamCheckpointingITCase.PARALLELISM];

        public void close() {
            counts[getRuntimeContext().getIndexOfThisSubtask()] = this.index;
        }

        StringGeneratingSourceFunction(long j) {
            this.numElements = j;
        }

        public void open(Configuration configuration) {
            this.rnd = new Random();
            this.stringBuilder = new StringBuilder();
            this.step = getRuntimeContext().getNumberOfParallelSubtasks();
            if (this.index == 0) {
                this.index = getRuntimeContext().getIndexOfThisSubtask();
            }
            this.isRunning = true;
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
            Object checkpointLock = sourceContext.getCheckpointLock();
            while (this.isRunning && this.index < this.numElements) {
                this.stringBuilder.setLength(0);
                this.stringBuilder.append((char) ((this.index % 40) + 40));
                String randomString = randomString(this.stringBuilder, this.rnd);
                synchronized (checkpointLock) {
                    this.index += this.step;
                    sourceContext.collect(randomString);
                }
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public Long m477snapshotState(long j, long j2) {
            return Long.valueOf(this.index);
        }

        public void restoreState(Long l) {
            this.index = l.longValue();
        }

        private static String randomString(StringBuilder sb, Random random) {
            int nextInt = random.nextInt(10) + 5;
            for (int i = 0; i < nextInt; i++) {
                sb.append((char) (random.nextInt(20000) + 33));
            }
            return sb.toString();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StreamCheckpointingITCase$StringPrefixCountRichMapFunction.class */
    private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements Checkpointed<Long> {
        static final long[] counts = new long[StreamCheckpointingITCase.PARALLELISM];
        private long count;

        private StringPrefixCountRichMapFunction() {
            this.count = 0L;
        }

        public PrefixCount map(String str) {
            this.count++;
            return new PrefixCount(str.substring(0, 1), str, 1L);
        }

        public void close() {
            counts[getRuntimeContext().getIndexOfThisSubtask()] = this.count;
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public Long m479snapshotState(long j, long j2) {
            return Long.valueOf(this.count);
        }

        public void restoreState(Long l) {
            this.count = l.longValue();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StreamCheckpointingITCase$StringRichFilterFunction.class */
    private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
        static final long[] counts = new long[StreamCheckpointingITCase.PARALLELISM];
        private long count;

        private StringRichFilterFunction() {
            this.count = 0L;
        }

        public boolean filter(String str) {
            this.count++;
            return str.length() < 100;
        }

        public void close() {
            counts[getRuntimeContext().getIndexOfThisSubtask()] = this.count;
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public Long m481snapshotState(long j, long j2) {
            return Long.valueOf(this.count);
        }

        public void restoreState(Long l) {
            this.count = l.longValue();
        }
    }

    @BeforeClass
    public static void startCluster() {
        try {
            Configuration configuration = new Configuration();
            configuration.setInteger("localinstancemanager.numtaskmanager", NUM_TASK_MANAGERS);
            configuration.setInteger("taskmanager.numberOfTaskSlots", NUM_TASK_SLOTS);
            configuration.setString("execution-retries.delay", "0 ms");
            configuration.setInteger("taskmanager.memory.size", 12);
            cluster = new ForkableFlinkMiniCluster(configuration, false);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Failed to start test cluster: " + e.getMessage());
        }
    }

    @AfterClass
    public static void shutdownCluster() {
        try {
            cluster.shutdown();
            cluster = null;
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Failed to stop test cluster: " + e.getMessage());
        }
    }

    @Test
    public void runCheckpointedProgram() {
        Assert.assertTrue("Broken test setup", true);
        try {
            StreamExecutionEnvironment createRemoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort(), new String[0]);
            createRemoteEnvironment.setParallelism(PARALLELISM);
            createRemoteEnvironment.enableCheckpointing(500L);
            createRemoteEnvironment.getConfig().disableSysoutLogging();
            createRemoteEnvironment.addSource(new StringGeneratingSourceFunction(10000000L)).filter(new StringRichFilterFunction()).map(new StringPrefixCountRichMapFunction()).startNewChain().map(new StatefulCounterFunction()).groupBy(new String[]{"prefix"}).reduce(new OnceFailingReducer(10000000L)).addSink(new RichSinkFunction<PrefixCount>() { // from class: org.apache.flink.test.checkpointing.StreamCheckpointingITCase.1
                private Map<Character, Long> counts = new HashMap();

                public void invoke(PrefixCount prefixCount) {
                    Character valueOf = Character.valueOf(prefixCount.prefix.charAt(0));
                    Long l = this.counts.get(valueOf);
                    if (l == null) {
                        this.counts.put(valueOf, Long.valueOf(prefixCount.count));
                    } else {
                        this.counts.put(valueOf, Long.valueOf(Math.max(l.longValue(), prefixCount.count)));
                    }
                }
            });
            createRemoteEnvironment.execute();
            long j = 0;
            for (long j2 : StringRichFilterFunction.counts) {
                j += j2;
            }
            long j3 = 0;
            for (long j4 : StringPrefixCountRichMapFunction.counts) {
                j3 += j4;
            }
            long j5 = 0;
            for (long j6 : StatefulCounterFunction.counts) {
                j5 += j6;
            }
            Assert.assertEquals(10000000L, j);
            Assert.assertEquals(10000000L, j3);
            Assert.assertEquals(10000000L, j5);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }
}
