package org.apache.flink.streaming.examples.statemachine.generator;

import java.io.IOException;
import org.apache.flink.streaming.examples.statemachine.event.Event;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.class */
public class StandaloneThreadedGenerator {

    /* loaded from: input_file:org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator$GeneratorThread.class */
    private static class GeneratorThread extends Thread {
        private final Collector<Event> out;
        private final int minAddress;
        private final int maxAddress;
        private long delay;
        private long count;
        private volatile boolean running = true;
        private volatile boolean injectInvalidNext;

        GeneratorThread(Collector<Event> collector, int i, int i2) {
            this.out = collector;
            this.minAddress = i;
            this.maxAddress = i2;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            EventsGenerator eventsGenerator = new EventsGenerator();
            while (this.running) {
                if (this.injectInvalidNext) {
                    this.injectInvalidNext = false;
                    Event nextInvalid = eventsGenerator.nextInvalid();
                    if (nextInvalid != null) {
                        this.out.collect(nextInvalid);
                    }
                } else {
                    this.out.collect(eventsGenerator.next(this.minAddress, this.maxAddress));
                }
                this.count++;
                if (this.delay > 0) {
                    try {
                        Thread.sleep(this.delay);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        public long currentCount() {
            return this.count;
        }

        public void shutdown() {
            this.running = false;
            interrupt();
        }

        public void setDelay(long j) {
            this.delay = j;
        }

        public void sendInvalidStateTransition() {
            this.injectInvalidNext = true;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator$ThroughputLogger.class */
    private static class ThroughputLogger extends Thread {
        private final GeneratorThread[] generators;
        private volatile boolean running = true;

        ThroughputLogger(GeneratorThread[] generatorThreadArr) {
            this.generators = generatorThreadArr;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            long j = 0;
            long currentTimeMillis = System.currentTimeMillis();
            while (this.running) {
                try {
                    Thread.sleep(1000L);
                    long currentTimeMillis2 = System.currentTimeMillis();
                    long j2 = 0;
                    for (GeneratorThread generatorThread : this.generators) {
                        j2 += generatorThread.currentCount();
                    }
                    double d = (j2 - j) / ((currentTimeMillis2 - currentTimeMillis) / 1000.0d);
                    currentTimeMillis = currentTimeMillis2;
                    j = j2;
                    System.out.println(d + " / sec");
                } catch (InterruptedException e) {
                    return;
                }
            }
        }

        public void shutdown() {
            this.running = false;
            interrupt();
        }
    }

    public static void runGenerator(Collector<Event>[] collectorArr) throws IOException {
        GeneratorThread[] generatorThreadArr = new GeneratorThread[collectorArr.length];
        int length = Integer.MAX_VALUE / collectorArr.length;
        for (int i = 0; i < generatorThreadArr.length; i++) {
            int i2 = length * i;
            GeneratorThread generatorThread = new GeneratorThread(collectorArr[i], i2, i2 + length);
            generatorThreadArr[i] = generatorThread;
            generatorThread.setName("Generator " + i);
        }
        long j = 2;
        int i3 = 0;
        boolean z = true;
        for (GeneratorThread generatorThread2 : generatorThreadArr) {
            generatorThread2.setDelay(2L);
            generatorThread2.start();
        }
        ThroughputLogger throughputLogger = new ThroughputLogger(generatorThreadArr);
        throughputLogger.start();
        System.out.println("Commands:");
        System.out.println(" -> q : Quit");
        System.out.println(" -> + : increase latency");
        System.out.println(" -> - : decrease latency");
        System.out.println(" -> e : inject invalid state transition");
        while (z) {
            switch (System.in.read()) {
                case 43:
                    j = Math.max(j * 2, 1L);
                    System.out.println("Delay is " + j);
                    for (GeneratorThread generatorThread3 : generatorThreadArr) {
                        generatorThread3.setDelay(j);
                    }
                    break;
                case 45:
                    j /= 2;
                    System.out.println("Delay is " + j);
                    for (GeneratorThread generatorThread4 : generatorThreadArr) {
                        generatorThread4.setDelay(j);
                    }
                    break;
                case 101:
                    System.out.println("Injecting erroneous transition ...");
                    generatorThreadArr[i3].sendInvalidStateTransition();
                    i3 = (i3 + 1) % generatorThreadArr.length;
                    break;
                case 113:
                    System.out.println("Quitting...");
                    z = false;
                    break;
            }
        }
        throughputLogger.shutdown();
        for (GeneratorThread generatorThread5 : generatorThreadArr) {
            generatorThread5.shutdown();
            try {
                generatorThread5.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
