package org.apache.flink.test.checkpointing;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/test/checkpointing/StateCheckpointedITCase.class */
public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(StateCheckpointedITCase.class);
    final long NUM_STRINGS = 10000000;

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StateCheckpointedITCase$OnceFailingAggregator.class */
    /**
     * Aggregates {@code PrefixCount} records per prefix and deliberately throws once to force a
     * recovery.
     *
     * <p>The failure is raised by the subtask with index 1 as soon as that subtask has processed at
     * least {@code failurePos} records, and only while the JVM-wide {@link #hasFailed} flag is
     * still unset. The per-prefix aggregation map is the state handed to checkpoints.
     */
    private static class OnceFailingAggregator
            extends RichFlatMapFunction<StreamFaultToleranceTestBase.PrefixCount, StreamFaultToleranceTestBase.PrefixCount>
            implements Checkpointed<HashMap<String, StreamFaultToleranceTestBase.PrefixCount>>, CheckpointListener {

        /**
         * Value of {@link #wasCheckpointed} at the moment the induced failure was thrown.
         *
         * <p>Written inside {@code flatMap} and read by the enclosing test's {@code postSubmit()};
         * volatile (like {@link #hasFailed}) so the write is visible if those run on different
         * threads.
         */
        static volatile boolean wasCheckpointedBeforeFailure = false;

        /** Set once the induced failure has been thrown, so it is never thrown again. */
        private static volatile boolean hasFailed = false;

        /** Running aggregate per prefix; returned as the checkpoint snapshot. */
        private final HashMap<String, StreamFaultToleranceTestBase.PrefixCount> aggregationMap = new HashMap<>();

        /** Number of records after which subtask 1 throws. */
        private final long failurePos;

        /** Records seen by this instance since {@code open}; not part of the checkpointed state. */
        private long count;

        /** True once this instance has been notified of a completed checkpoint. */
        private boolean wasCheckpointed;

        OnceFailingAggregator(long failurePos) {
            this.failurePos = failurePos;
        }

        /** Resets the per-instance record counter. */
        public void open(Configuration parameters) {
            count = 0L;
        }

        /**
         * Adds the record's count to the aggregate for its prefix and emits the aggregate.
         *
         * @throws Exception the single induced "Test Failure" (see class comment)
         */
        public void flatMap(StreamFaultToleranceTestBase.PrefixCount value, Collector<StreamFaultToleranceTestBase.PrefixCount> out) throws Exception {
            count++;
            if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 1) {
                // Remember whether a checkpoint had completed before failing, for postSubmit().
                wasCheckpointedBeforeFailure = wasCheckpointed;
                hasFailed = true;
                throw new Exception("Test Failure");
            }

            StreamFaultToleranceTestBase.PrefixCount aggregate = aggregationMap.get(value.prefix);
            if (aggregate == null) {
                // First record for this prefix: the record object itself becomes the aggregate.
                aggregationMap.put(value.prefix, value);
                out.collect(value);
            } else {
                aggregate.count += value.count;
                out.collect(aggregate);
            }
        }

        /**
         * Implements {@code Checkpointed#snapshotState}: returns the live aggregation map.
         *
         * <p>The decompiler had renamed this method ({@code m563snapshotState}), which left the
         * interface method unimplemented.
         */
        public HashMap<String, StreamFaultToleranceTestBase.PrefixCount> snapshotState(long checkpointId, long checkpointTimestamp) {
            return aggregationMap;
        }

        /** Merges the restored aggregates into this instance's map. */
        public void restoreState(HashMap<String, StreamFaultToleranceTestBase.PrefixCount> state) {
            aggregationMap.putAll(state);
        }

        /** Records that a checkpoint completed while this instance was alive. */
        public void notifyCheckpointComplete(long checkpointId) {
            wasCheckpointed = true;
        }

        // No explicit flatMap(Object, Collector) bridge: the compiler generates it, and declaring
        // it by hand clashes by erasure with the generic flatMap above.
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StateCheckpointedITCase$StatefulCounterFunction.class */
    /**
     * Identity map that counts the records it forwards.
     *
     * <p>The counter is the checkpointed state; on {@code close} each subtask publishes its final
     * count into {@link #counts} at its subtask index.
     */
    private static class StatefulCounterFunction
            extends RichMapFunction<StreamFaultToleranceTestBase.PrefixCount, StreamFaultToleranceTestBase.PrefixCount>
            implements Checkpointed<Long> {

        /** Final per-subtask counts, indexed by subtask index (sized for 12 subtasks). */
        static final long[] counts = new long[12];

        /** Records forwarded so far, including any count restored from a checkpoint. */
        private long count;

        /** Counts the record and passes it through unchanged. */
        public StreamFaultToleranceTestBase.PrefixCount map(StreamFaultToleranceTestBase.PrefixCount value) throws Exception {
            count++;
            return value;
        }

        /** Publishes this subtask's count for later verification. */
        public void close() throws IOException {
            counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
        }

        /**
         * Implements {@code Checkpointed#snapshotState}: returns the current count.
         *
         * <p>The decompiler had renamed this method ({@code m565snapshotState}), which left the
         * interface method unimplemented.
         */
        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
            return count;
        }

        /** Resumes counting from the restored value. */
        public void restoreState(Long state) {
            count = state;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StateCheckpointedITCase$StringGeneratingSourceFunction.class */
    /**
     * Parallel source that emits random strings until its position reaches {@code numElements}.
     *
     * <p>Each subtask starts at its own subtask index and advances by the number of parallel
     * subtasks. The first character of every string is {@code (index % 40) + 40}; the rest is
     * random. The current position is the checkpointed state.
     */
    private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
            implements CheckpointedAsynchronously<Integer> {

        /** Exclusive upper bound for {@link #index}. */
        private final long numElements;

        /** Position of the next element to emit; 0 doubles as "nothing restored yet". */
        private int index;

        /** Cleared by {@link #cancel()} to stop the emit loop. */
        private volatile boolean isRunning = true;

        StringGeneratingSourceFunction(long numElements) {
            this.numElements = numElements;
        }

        /** Emits strings until cancelled or until the position reaches {@code numElements}. */
        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            final Object lock = ctx.getCheckpointLock();
            final Random rnd = new Random();
            final StringBuilder builder = new StringBuilder();
            final int step = getRuntimeContext().getNumberOfParallelSubtasks();

            // A position of 0 means no state was restored: begin at this subtask's own offset.
            if (index == 0) {
                index = getRuntimeContext().getIndexOfThisSubtask();
            }

            while (isRunning && index < numElements) {
                final char first = (char) ((index % 40) + 40);
                builder.setLength(0);
                builder.append(first);
                final String element = randomString(builder, rnd);

                // Advance the position and emit under the checkpoint lock so that a snapshot
                // never observes one without the other.
                synchronized (lock) {
                    index += step;
                    ctx.collect(element);
                }
            }
        }

        /** Requests the emit loop in {@link #run} to stop. */
        public void cancel() {
            isRunning = false;
        }

        /**
         * Appends between 5 and 14 random characters (code points 33..20032) to {@code bld} and
         * returns the resulting string.
         */
        private static String randomString(StringBuilder bld, Random rnd) {
            final int len = rnd.nextInt(10) + 5;
            for (int i = 0; i < len; i++) {
                bld.append((char) (rnd.nextInt(20000) + 33));
            }
            return bld.toString();
        }

        /**
         * Implements {@code CheckpointedAsynchronously#snapshotState}: returns the current position.
         *
         * <p>The decompiler had renamed this method ({@code m566snapshotState}), which left the
         * interface method unimplemented.
         */
        public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
            return index;
        }

        /** Resumes emitting from the restored position. */
        public void restoreState(Integer state) {
            index = state;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StateCheckpointedITCase$StringPrefixCountRichMapFunction.class */
    /**
     * Maps each string to a {@code PrefixCount} keyed by its first character, with a count of 1,
     * while counting the records it has seen.
     *
     * <p>The counter is the checkpointed state; on {@code close} each subtask publishes its final
     * count into {@link #counts} at its subtask index.
     */
    private static class StringPrefixCountRichMapFunction
            extends RichMapFunction<String, StreamFaultToleranceTestBase.PrefixCount>
            implements CheckpointedAsynchronously<Long> {

        /** Final per-subtask counts, indexed by subtask index (sized for 12 subtasks). */
        static final long[] counts = new long[12];

        /** Records mapped so far, including any count restored from a checkpoint. */
        private long count;

        /** Counts the record and wraps it with its one-character prefix. */
        public StreamFaultToleranceTestBase.PrefixCount map(String value) {
            count++;
            return new StreamFaultToleranceTestBase.PrefixCount(value.substring(0, 1), value, 1L);
        }

        /** Publishes this subtask's count for later verification. */
        public void close() {
            counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
        }

        /**
         * Implements {@code CheckpointedAsynchronously#snapshotState}: returns the current count.
         *
         * <p>The decompiler had renamed this method ({@code m568snapshotState}), which left the
         * interface method unimplemented.
         */
        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
            return count;
        }

        /** Resumes counting from the restored value. */
        public void restoreState(Long state) {
            count = state;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StateCheckpointedITCase$StringRichFilterFunction.class */
    /**
     * Filter that keeps strings shorter than 100 characters while counting every record it sees.
     *
     * <p>The counter is the checkpointed state; on {@code close} each subtask publishes its final
     * count into {@link #counts} at its subtask index.
     */
    private static class StringRichFilterFunction extends RichFilterFunction<String>
            implements Checkpointed<Long> {

        /** Final per-subtask counts, indexed by subtask index (sized for 12 subtasks). */
        static final long[] counts = new long[12];

        /** Records inspected so far, including any count restored from a checkpoint. */
        private long count;

        /** Counts the record and accepts it when it is shorter than 100 characters. */
        public boolean filter(String value) throws Exception {
            count++;
            return value.length() < 100;
        }

        /** Publishes this subtask's count for later verification. */
        public void close() {
            counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
        }

        /**
         * Implements {@code Checkpointed#snapshotState}: returns the current count.
         *
         * <p>The decompiler had renamed this method ({@code m570snapshotState}), which left the
         * interface method unimplemented.
         */
        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
            return count;
        }

        /** Resumes counting from the restored value. */
        public void restoreState(Long state) {
            count = state;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StateCheckpointedITCase$ValidatingSink.class */
    /**
     * Sink that remembers, per prefix character, the largest count it has received.
     *
     * <p>The per-prefix map is the checkpointed state; on {@code close} each subtask publishes its
     * map into {@link #maps} at its subtask index.
     */
    private static class ValidatingSink extends RichSinkFunction<StreamFaultToleranceTestBase.PrefixCount>
            implements Checkpointed<HashMap<Character, Long>> {

        /** Final per-subtask results, indexed by subtask index (sized for 12 subtasks). */
        // Generic arrays cannot be created directly; the array only ever holds Map<Character, Long>.
        @SuppressWarnings("unchecked")
        private static final Map<Character, Long>[] maps = (Map<Character, Long>[]) new Map<?, ?>[12];

        /** Highest count seen so far for each prefix character. */
        private final HashMap<Character, Long> counts = new HashMap<>();

        /** Keeps the maximum of the stored and the incoming count for the record's prefix. */
        public void invoke(StreamFaultToleranceTestBase.PrefixCount value) {
            final Character prefix = value.prefix.charAt(0);
            final Long previous = counts.get(prefix);
            final long updated = (previous == null) ? value.count : Math.max(previous, value.count);
            counts.put(prefix, updated);
        }

        /** Publishes this subtask's map for later verification. */
        public void close() throws Exception {
            maps[getRuntimeContext().getIndexOfThisSubtask()] = counts;
        }

        /**
         * Implements {@code Checkpointed#snapshotState}: returns the live per-prefix map.
         *
         * <p>The decompiler had renamed this method ({@code m572snapshotState}), which left the
         * interface method unimplemented.
         */
        public HashMap<Character, Long> snapshotState(long checkpointId, long checkpointTimestamp) {
            return counts;
        }

        /** Merges the restored per-prefix counts into this instance's map. */
        public void restoreState(HashMap<Character, Long> state) {
            counts.putAll(state);
        }
    }

    /**
     * Builds the job under test: string source, counting filter, prefix mapper, counting identity
     * map (in a new chain), then a keyed once-failing aggregator feeding the validating sink.
     * Checkpointing is enabled with a 200 ms interval.
     *
     * @param streamExecutionEnvironment environment the topology is added to
     */
    @Override
    public void testProgram(StreamExecutionEnvironment streamExecutionEnvironment) {
        // The source derives prefixes from (index % 40) and postSubmit() expects the same count
        // for every prefix, so the element count must be a multiple of 40.
        Assert.assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);

        // Failure position is drawn from [333333, 583333). NOTE(review): these equal
        // floor(0.4 * 10,000,000 / 12) and a window of 250,000; confirm against the base class
        // parallelism if NUM_STRINGS is ever changed.
        final long failurePosMin = 333333L;
        final int failurePosRange = 250000;
        // nextInt(bound) is never negative, whereas nextLong() % bound can be and would move the
        // failure below the intended minimum.
        final long failurePos = failurePosMin + new Random().nextInt(failurePosRange);

        streamExecutionEnvironment.enableCheckpointing(200L);

        streamExecutionEnvironment
                .addSource(new StringGeneratingSourceFunction(NUM_STRINGS))
                .filter(new StringRichFilterFunction())
                .map(new StringPrefixCountRichMapFunction())
                .startNewChain()
                .map(new StatefulCounterFunction())
                .keyBy("prefix")
                .flatMap(new OnceFailingAggregator(failurePos))
                .addSink(new ValidatingSink());
    }

    /**
     * Verifies the results published by the operators once the job has finished.
     *
     * <p>Logs a warning when the induced failure happened before any checkpoint completed, then
     * checks that each counting operator saw exactly {@code NUM_STRINGS} records in total and that
     * every prefix recorded by every sink subtask reached {@code NUM_STRINGS / 40}.
     */
    @Override
    public void postSubmit() {
        if (!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
            LOG.warn("Test inconclusive: failure occurred before first checkpoint");
        }

        Assert.assertEquals(NUM_STRINGS, sumCounts(StringRichFilterFunction.counts));
        Assert.assertEquals(NUM_STRINGS, sumCounts(StringPrefixCountRichMapFunction.counts));
        Assert.assertEquals(NUM_STRINGS, sumCounts(StatefulCounterFunction.counts));

        // The source cycles through 40 prefix characters, so each prefix occurs NUM_STRINGS / 40 times.
        final long expectedPerPrefix = NUM_STRINGS / 40;
        for (Map<Character, Long> prefixCounts : ValidatingSink.maps) {
            for (Long count : prefixCounts.values()) {
                Assert.assertEquals(expectedPerPrefix, count.longValue());
            }
        }
    }

    /** Returns the sum of all per-subtask counts in {@code perSubtaskCounts}. */
    private static long sumCounts(long[] perSubtaskCounts) {
        long total = 0;
        for (long c : perSubtaskCounts) {
            total += c;
        }
        return total;
    }
}
