package kafka.tools;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.InputStreamReader;
import java.util.Properties;
import java.util.Random;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import kafka.consumer.ZookeeperConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.serializer.StringDecoder$;
import kafka.utils.CommandLineUtils$;
import kafka.utils.IteratorTemplate;
import org.apache.commons.compress.archivers.ArchiveStreamFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.python.apache.xerces.impl.xs.SchemaSymbols;
import org.tukaani.xz.common.Util;
import scala.Array$;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.io.Codec$;
import scala.io.Source$;
import scala.math.Ordering$String$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong;

/* JADX WARN: Classes with same name are omitted:
  input_file:lib/core-plugins-1.2.0.jar:lib/kafka_2.10-0.8.2.2.jar:kafka/tools/TestLogCleaning$.class
 */
/* compiled from: TestLogCleaning.scala */
/* loaded from: input_file:lib/kafka_2.10-0.8.2.2.jar:kafka/tools/TestLogCleaning$.class */
public final class TestLogCleaning$ {
    public static final TestLogCleaning$ MODULE$ = null;

    static {
        new TestLogCleaning$();
    }

    public void main(String[] strArr) {
        OptionParser optionParser = new OptionParser();
        ArgumentAcceptingOptionSpec defaultsTo = optionParser.accepts("messages", "The number of messages to send or consume.").withRequiredArg().describedAs("count").ofType(Long.class).defaultsTo(Predef$.MODULE$.long2Long(Util.VLI_MAX), new Long[0]);
        ArgumentAcceptingOptionSpec defaultsTo2 = optionParser.accepts("duplicates", "The number of duplicates for each key.").withRequiredArg().describedAs("count").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(5), new Integer[0]);
        OptionSpec ofType = optionParser.accepts("broker", "Url to connect to.").withRequiredArg().describedAs("url").ofType(String.class);
        ArgumentAcceptingOptionSpec defaultsTo3 = optionParser.accepts("topics", "The number of topics to test.").withRequiredArg().describedAs("count").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(1), new Integer[0]);
        ArgumentAcceptingOptionSpec defaultsTo4 = optionParser.accepts("percent-deletes", "The percentage of updates that are deletes.").withRequiredArg().describedAs("percent").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(0), new Integer[0]);
        OptionSpec ofType2 = optionParser.accepts("zk", "Zk url.").withRequiredArg().describedAs("url").ofType(String.class);
        ArgumentAcceptingOptionSpec defaultsTo5 = optionParser.accepts("sleep", "Time to sleep between production and consumption.").withRequiredArg().describedAs("ms").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(0), new Integer[0]);
        OptionSpec<?> ofType3 = optionParser.accepts(ArchiveStreamFactory.DUMP, "Dump the message contents of a topic partition that contains test data from this test to standard out.").withRequiredArg().describedAs("directory").ofType(String.class);
        OptionSet parse = optionParser.parse(strArr);
        if (strArr.length == 0) {
            CommandLineUtils$.MODULE$.printUsageAndDie(optionParser, "An integration test for log cleaning.");
        }
        if (parse.has(ofType3)) {
            dumpLog(new File((String) parse.valueOf(ofType3)));
            System.exit(0);
        }
        CommandLineUtils$.MODULE$.checkRequiredArgs(optionParser, parse, Predef$.MODULE$.wrapRefArray(new OptionSpec[]{ofType, ofType2, defaultsTo}));
        long longValue = ((Long) parse.valueOf(defaultsTo)).longValue();
        int intValue = ((Integer) parse.valueOf(defaultsTo4)).intValue();
        int intValue2 = ((Integer) parse.valueOf(defaultsTo2)).intValue();
        String str = (String) parse.valueOf(ofType);
        int intValue3 = ((Integer) parse.valueOf(defaultsTo3)).intValue();
        String str2 = (String) parse.valueOf(ofType2);
        int intValue4 = ((Integer) parse.valueOf(defaultsTo5)).intValue();
        String[] strArr2 = (String[]) ((TraversableOnce) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), intValue3).map(new TestLogCleaning$$anonfun$1(new Random().nextInt(Integer.MAX_VALUE)), IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(String.class));
        Predef$.MODULE$.println(new StringOps(Predef$.MODULE$.augmentString("Producing %d messages...")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(longValue)})));
        File produceMessages = produceMessages(str, strArr2, longValue, intValue2, intValue);
        Predef$.MODULE$.println(new StringOps(Predef$.MODULE$.augmentString("Sleeping for %d seconds...")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(intValue4)})));
        Thread.sleep(intValue4 * 1000);
        Predef$.MODULE$.println("Consuming messages...");
        File consumeMessages = consumeMessages(str2, strArr2);
        int lineCount = lineCount(produceMessages);
        int lineCount2 = lineCount(consumeMessages);
        Predef$.MODULE$.println(new StringOps(Predef$.MODULE$.augmentString("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(lineCount), BoxesRunTime.boxToInteger(lineCount2), BoxesRunTime.boxToDouble(100 * (1.0d - (lineCount2 / lineCount)))})));
        Predef$.MODULE$.println("De-duplicating and validating output files...");
        validateOutput(produceMessages, consumeMessages);
        produceMessages.delete();
        consumeMessages.delete();
    }

    public void dumpLog(File file) {
        Predef$.MODULE$.require(file.exists(), new TestLogCleaning$$anonfun$dumpLog$1(file));
        Predef$.MODULE$.refArrayOps((Object[]) Predef$.MODULE$.refArrayOps(file.list()).mo3897sorted(Ordering$String$.MODULE$)).withFilter(new TestLogCleaning$$anonfun$dumpLog$2()).foreach(new TestLogCleaning$$anonfun$dumpLog$3(file));
    }

    public int lineCount(File file) {
        return Source$.MODULE$.fromFile(file, Codec$.MODULE$.fallbackSystemCodec()).getLines().size();
    }

    public void validateOutput(File file, File file2) {
        BufferedReader externalSort = externalSort(file);
        BufferedReader externalSort2 = externalSort(file2);
        IteratorTemplate<TestRecord> valuesIterator = valuesIterator(externalSort);
        IteratorTemplate<TestRecord> valuesIterator2 = valuesIterator(externalSort2);
        File file3 = new File(new StringBuilder().append((Object) file.getAbsolutePath()).append((Object) ".deduped").toString());
        BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(file3), 1048576);
        File file4 = new File(new StringBuilder().append((Object) file2.getAbsolutePath()).append((Object) ".deduped").toString());
        BufferedWriter bufferedWriter2 = new BufferedWriter(new FileWriter(file4), 1048576);
        int i = 0;
        int i2 = 0;
        while (valuesIterator.hasNext() && valuesIterator2.hasNext()) {
            TestRecord next = valuesIterator.next();
            bufferedWriter.write(next.toString());
            bufferedWriter.newLine();
            TestRecord next2 = valuesIterator2.next();
            bufferedWriter2.write(next2.toString());
            bufferedWriter2.newLine();
            if (next == null) {
                if (next2 == null) {
                    i++;
                }
                i2++;
                i++;
            } else {
                if (next.equals(next2)) {
                    i++;
                }
                i2++;
                i++;
            }
        }
        bufferedWriter.close();
        bufferedWriter2.close();
        Predef$.MODULE$.require(!valuesIterator.hasNext(), new TestLogCleaning$$anonfun$validateOutput$1());
        Predef$.MODULE$.require(!valuesIterator2.hasNext(), new TestLogCleaning$$anonfun$validateOutput$2());
        Predef$.MODULE$.println(new StringBuilder().append((Object) "Validated ").append(BoxesRunTime.boxToInteger(i)).append((Object) " values, ").append(BoxesRunTime.boxToInteger(i2)).append((Object) " mismatches.").toString());
        Predef$.MODULE$.require(i2 == 0, new TestLogCleaning$$anonfun$validateOutput$3());
        file3.delete();
        file4.delete();
    }

    public IteratorTemplate<TestRecord> valuesIterator(final BufferedReader bufferedReader) {
        return new IteratorTemplate<TestRecord>(bufferedReader) { // from class: kafka.tools.TestLogCleaning$$anon$1
            private final BufferedReader reader$1;

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // kafka.utils.IteratorTemplate
            public TestRecord makeNext() {
                TestRecord testRecord;
                TestRecord readNext = TestLogCleaning$.MODULE$.readNext(this.reader$1);
                while (true) {
                    testRecord = readNext;
                    if (testRecord == null || !testRecord.delete()) {
                        break;
                    }
                    readNext = TestLogCleaning$.MODULE$.readNext(this.reader$1);
                }
                return testRecord == null ? allDone() : testRecord;
            }

            {
                this.reader$1 = bufferedReader;
            }
        };
    }

    /* JADX WARN: Code restructure failed: missing block: B:17:0x005d, code lost:
    
        return r7;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public kafka.tools.TestRecord readNext(java.io.BufferedReader r5) {
        /*
            r4 = this;
            r0 = r5
            java.lang.String r0 = r0.readLine()
            r6 = r0
            r0 = r6
            if (r0 != 0) goto Ld
            r0 = 0
            r0 = 0
            return r0
        Ld:
            kafka.tools.TestRecord r0 = new kafka.tools.TestRecord
            r1 = r0
            r2 = r6
            r1.<init>(r2)
            r7 = r0
        L16:
            r0 = r4
            r1 = r5
            java.lang.String r0 = r0.peekLine(r1)
            r6 = r0
            r0 = r6
            if (r0 != 0) goto L22
            r0 = r7
            return r0
        L22:
            kafka.tools.TestRecord r0 = new kafka.tools.TestRecord
            r1 = r0
            r2 = r6
            r1.<init>(r2)
            r8 = r0
            r0 = r8
            if (r0 == 0) goto L5c
            r0 = r8
            java.lang.String r0 = r0.topicAndKey()
            r1 = r7
            java.lang.String r1 = r1.topicAndKey()
            r9 = r1
            r1 = r0
            if (r1 != 0) goto L49
        L41:
            r0 = r9
            if (r0 == 0) goto L51
            goto L5c
        L49:
            r1 = r9
            boolean r0 = r0.equals(r1)
            if (r0 == 0) goto L5c
        L51:
            r0 = r8
            r7 = r0
            r0 = r5
            java.lang.String r0 = r0.readLine()
            goto L16
        L5c:
            r0 = r7
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: kafka.tools.TestLogCleaning$.readNext(java.io.BufferedReader):kafka.tools.TestRecord");
    }

    public String peekLine(BufferedReader bufferedReader) {
        bufferedReader.mark(4096);
        String readLine = bufferedReader.readLine();
        bufferedReader.reset();
        return readLine;
    }

    /* JADX WARN: Type inference failed for: r0v3, types: [kafka.tools.TestLogCleaning$$anon$2] */
    public BufferedReader externalSort(File file) {
        final Process start = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", new StringBuilder().append((Object) "--temporary-directory=").append((Object) System.getProperty("java.io.tmpdir")).toString(), file.getAbsolutePath()).start();
        new Thread(start) { // from class: kafka.tools.TestLogCleaning$$anon$2
            private final Process process$1;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                if (this.process$1.waitFor() != 0) {
                    System.err.println("Process exited abnormally.");
                    while (this.process$1.getErrorStream().available() > 0) {
                        System.err.write(this.process$1.getErrorStream().read());
                    }
                }
            }

            {
                this.process$1 = start;
            }
        }.start();
        return new BufferedReader(new InputStreamReader(start.getInputStream()), 10485760);
    }

    public File produceMessages(String str, String[] strArr, long j, int i, int i2) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, SchemaSymbols.ATTVAL_TRUE);
        properties.setProperty("bootstrap.servers", str);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        File createTempFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt");
        Predef$.MODULE$.println(new StringBuilder().append((Object) "Logging produce requests to ").append((Object) createTempFile.getAbsolutePath()).toString());
        BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(createTempFile), 1048576);
        new RichLong(Predef$.MODULE$.longWrapper(0L)).until((Object) BoxesRunTime.boxToLong(j * strArr.length)).foreach(new TestLogCleaning$$anonfun$produceMessages$1(strArr, i2, kafkaProducer, new Random(1L), (int) (j / i), bufferedWriter));
        bufferedWriter.close();
        kafkaProducer.close();
        return createTempFile;
    }

    public ZookeeperConsumerConnector makeConsumer(String str, String[] strArr) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, new StringBuilder().append((Object) "log-cleaner-test-").append(BoxesRunTime.boxToInteger(new Random().nextInt(Integer.MAX_VALUE))).toString());
        properties.setProperty("zookeeper.connect", str);
        properties.setProperty("consumer.timeout.ms", BoxesRunTime.boxToInteger(20000).toString());
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
        return new ZookeeperConsumerConnector(new kafka.consumer.ConsumerConfig(properties));
    }

    public File consumeMessages(String str, String[] strArr) {
        ZookeeperConsumerConnector makeConsumer = makeConsumer(str, strArr);
        Map createMessageStreams = makeConsumer.createMessageStreams(Predef$.MODULE$.refArrayOps((Object[]) Predef$.MODULE$.refArrayOps(strArr).map(new TestLogCleaning$$anonfun$2(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class)))).toMap(Predef$.MODULE$.conforms()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
        File createTempFile = File.createTempFile("kafka-log-cleaner-consumed-", ".txt");
        Predef$.MODULE$.println(new StringBuilder().append((Object) "Logging consumed messages to ").append((Object) createTempFile.getAbsolutePath()).toString());
        BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(createTempFile));
        Predef$.MODULE$.refArrayOps(strArr).foreach(new TestLogCleaning$$anonfun$consumeMessages$1(createMessageStreams, bufferedWriter));
        bufferedWriter.close();
        makeConsumer.shutdown();
        return createTempFile;
    }

    private TestLogCleaning$() {
        MODULE$ = this;
    }
}
