package org.apache.flink.test.recovery;

import java.io.File;
import java.io.IOException;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FileStateHandle;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.junit.Assert;

/* loaded from: input_file:org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.class */
public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailureRecoveryTest {
    private static final int DATA_COUNT = 10000;

    /* loaded from: input_file:org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase$CheckpointedSink.class */
    private static class CheckpointedSink extends RichSinkFunction<Long> implements Checkpointed<Long> {
        private long stepSize;
        private long congruence;
        private long toCollect;
        private long collected = 0;
        private long end;

        public CheckpointedSink(long j) {
            this.end = j;
        }

        public void open(Configuration configuration) throws IOException {
            this.stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
            this.congruence = getRuntimeContext().getIndexOfThisSubtask();
            this.toCollect = this.end % this.stepSize > this.congruence ? (this.end / this.stepSize) + 1 : this.end / this.stepSize;
        }

        public void invoke(Long l) throws Exception {
            long j = (this.collected * this.stepSize) + this.congruence;
            Assert.assertTrue("Value did not match expected value. " + j + " != " + l, l.equals(Long.valueOf(j)));
            this.collected++;
            if (this.collected > this.toCollect) {
                Assert.fail("Collected <= toCollect: " + this.collected + " > " + this.toCollect);
            }
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public Long m542snapshotState(long j, long j2) throws Exception {
            return Long.valueOf(this.collected);
        }

        public void restoreState(Long l) {
            this.collected = l.longValue();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase$SleepyDurableGenerateSequence.class */
    public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long> implements Checkpointed<Long> {
        private static final long serialVersionUID = 1;
        private static final long SLEEP_TIME = 50;
        private final File coordinateDir;
        private final long end;
        private long collected;
        private volatile boolean isRunning = true;

        public SleepyDurableGenerateSequence(File file, long j) {
            this.coordinateDir = file;
            this.end = j;
        }

        public void run(SourceFunction.SourceContext<Long> sourceContext) throws Exception {
            Object checkpointLock = sourceContext.getCheckpointLock();
            StreamingRuntimeContext runtimeContext = getRuntimeContext();
            long numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks();
            long indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
            long j = this.end % numberOfParallelSubtasks > indexOfThisSubtask ? (this.end / numberOfParallelSubtasks) + serialVersionUID : this.end / numberOfParallelSubtasks;
            File file = new File(this.coordinateDir, "proceed");
            boolean z = true;
            while (this.isRunning && this.collected < j) {
                if (z) {
                    if (file.exists()) {
                        z = false;
                    } else {
                        Thread.sleep(SLEEP_TIME);
                    }
                }
                synchronized (checkpointLock) {
                    sourceContext.collect(Long.valueOf((this.collected * numberOfParallelSubtasks) + indexOfThisSubtask));
                    this.collected += serialVersionUID;
                }
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public Long m543snapshotState(long j, long j2) throws Exception {
            return Long.valueOf(this.collected);
        }

        public void restoreState(Long l) {
            this.collected = l.longValue();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase$StatefulMapper.class */
    public static class StatefulMapper extends RichMapFunction<Long, Long> implements Checkpointed<Integer> {
        private File coordinateDir;
        private boolean markerCreated = false;
        private boolean restored = false;

        public StatefulMapper(File file) {
            this.coordinateDir = file;
        }

        public Long map(Long l) throws Exception {
            if (!this.markerCreated) {
                AbstractProcessFailureRecoveryTest.touchFile(new File(this.coordinateDir, "ready_" + getRuntimeContext().getIndexOfThisSubtask()));
                this.markerCreated = true;
            }
            return l;
        }

        public void close() {
            if (this.restored) {
                return;
            }
            Assert.fail();
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public Integer m544snapshotState(long j, long j2) throws Exception {
            return 1;
        }

        public void restoreState(Integer num) {
            this.restored = true;
        }
    }

    /**
     * Builds and runs the checkpointed streaming job against a remote environment.
     *
     * <p>The pipeline is {@link SleepyDurableGenerateSequence} -> identity map (starting a new
     * chain) -> {@link StatefulMapper} -> {@link CheckpointedSink}. Checkpoints are written to a
     * freshly created, uniquely named directory that is deleted again once execution returns,
     * whether it succeeded or failed.
     *
     * @param jobManagerPort port on {@code localhost} the remote environment connects to
     * @param coordinateDir directory the source and the mapper use for their marker files
     * @throws Exception if the job fails or the checkpoint directory cannot be removed
     */
    @Override // org.apache.flink.test.recovery.AbstractProcessFailureRecoveryTest
    public void testProgram(int jobManagerPort, File coordinateDir) throws Exception {
        final File tempCheckpointDir = new File(
                new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH),
                UUID.randomUUID().toString());
        Assert.assertTrue("Cannot create directory for checkpoints", tempCheckpointDir.mkdirs());

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
        env.setParallelism(4);
        env.getConfig().disableSysoutLogging();
        env.setNumberOfExecutionRetries(1);
        env.enableCheckpointing(200L);
        env.setStateHandleProvider(FileStateHandle.createProvider(tempCheckpointDir.getAbsolutePath()));

        env.addSource(new SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT))
                // Identity map placed in its own chain, ahead of the stateful mapper.
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) throws Exception {
                        return value;
                    }
                })
                .startNewChain()
                .map(new StatefulMapper(coordinateDir))
                .addSink(new CheckpointedSink(DATA_COUNT));

        try {
            env.execute();
        } finally {
            // Single cleanup path for both the success and the failure case.
            if (tempCheckpointDir.exists()) {
                FileUtils.deleteDirectory(tempCheckpointDir);
            }
        }
    }
}
