package cascading.aws.s3.logs;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.local.tap.aws.s3.S3FileCheckpointer;
import cascading.local.tap.aws.s3.S3Tap;
import cascading.local.tap.kafka.KafkaTap;
import cascading.local.tap.kafka.TextKafkaScheme;
import cascading.operation.Debug;
import cascading.operation.regex.RegexParser;
import cascading.operation.text.DateFormatter;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.local.TextDelimited;
import cascading.scheme.local.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.local.DirTap;
import cascading.tap.local.PartitionTap;
import cascading.tap.partition.DelimitedPartition;
import cascading.tuple.Fields;
import cascading.tuple.type.DateType;
import java.io.IOException;
import java.net.URI;
import java.util.TimeZone;

/* loaded from: input_file:cascading/aws/s3/logs/Main.class */
/**
 * Command-line entry point that wires two local Cascading flows around a Kafka topic.
 *
 * <ul>
 *   <li><b>ingress</b>: reads text lines from an {@link S3Tap}, extracts a time value with
 *       {@link S3Logs#REGEX}, formats it into the {@code date} key, and writes the
 *       {@code date}/{@code line} pair to the {@code "logs"} Kafka topic.</li>
 *   <li><b>egress</b>: reads from the same {@link KafkaTap}, parses each line into
 *       {@link S3Logs#FIELDS}, and writes the result to a {@link PartitionTap} rooted at the
 *       sink path.</li>
 * </ul>
 *
 * <p>Arguments: {@code <source s3 uri> <kafka host> <sink file path> [checkpoint file path]}.
 */
public class Main {
    /** Time zone used for both the {@code date} field type and the date formatter. */
    public static final TimeZone UTC = TimeZone.getTimeZone("UTC");

    /** Date pattern used for the {@code date} key. */
    public static final String DD_MMM_YYYY = "dd-MMM-yyyy";

    /** Coercible type of the {@code date} field, built from {@link #DD_MMM_YYYY} and {@link #UTC}. */
    public static final DateType DMY = new DateType(DD_MMM_YYYY, UTC);

    /** The {@code date} key field, typed as {@link #DMY}. */
    public static final Fields KEY = new Fields("date", DMY);

    /** The raw {@code line} field, typed as {@link String}. */
    public static final Fields LINE = new Fields("line", String.class);

    /** {@link #KEY} followed by {@link #LINE}. */
    public static final Fields KEY_LINE = KEY.append(LINE);

    /**
     * Builds, starts, and waits for the ingress and egress flows.
     *
     * @param strArr {@code [0]} source S3 URI, {@code [1]} Kafka host, {@code [2]} sink file path,
     *               and optionally {@code [3]} checkpoint file path; with fewer than three
     *               arguments a usage line is printed to stderr and the method returns
     * @throws IOException declared for the tap construction performed here
     */
    public static void main(String[] strArr) throws IOException {
        if (strArr.length < 3) {
            System.err.println("usage: <source s3 uri> <kafka host> <sink file path> [checkpoint file path]");
            return;
        }

        // The checkpoint path is optional; one flag drives both the log line and the
        // checkpointer choice so the two can never disagree.
        final boolean hasCheckpointPath = strArr.length > 3;

        System.out.println("source s3 uri = " + strArr[0]);
        System.out.println("kafka host = " + strArr[1]);
        System.out.println("sink file path = " + strArr[2]);
        if (hasCheckpointPath) {
            System.out.println("checkpoint file path = " + strArr[3]);
        }

        // Use the supplied checkpoint file when present, otherwise the no-arg checkpointer.
        // (The previous condition was inverted: it indexed strArr[3] when only three
        // arguments were given and ignored the path when four were given.)
        S3FileCheckpointer checkpointer = hasCheckpointPath
                ? new S3FileCheckpointer(strArr[3])
                : new S3FileCheckpointer();
        S3Tap s3Tap = new S3Tap(new TextLine(), checkpointer, URI.create(strArr[0]));

        // Kafka records carry the topic and offset fields followed by date and line.
        Fields kafkaFields = TextKafkaScheme.TOPIC_FIELDS
                .append(TextKafkaScheme.OFFSET_FIELDS)
                .append(KEY_LINE);
        KafkaTap kafkaTap = new KafkaTap(new TextKafkaScheme(kafkaFields), strArr[1], "parsers", new String[]{"logs"});

        // Sink is partitioned on the date and operation fields, using "/" as the delimiter.
        DirTap sinkDir = new DirTap(new TextDelimited(true, ",", "\""), strArr[2], SinkMode.UPDATE);
        PartitionTap partitionTap = new PartitionTap(
                sinkDir,
                new DelimitedPartition(KEY.append(S3Logs.OPERATION), "/", "logs.csv"));

        // Ingress: line -> (time, line) -> (date, line), with a Debug operation at the tail.
        Pipe ingressPipe = new Pipe("head");
        ingressPipe = new Each(
                ingressPipe,
                new Fields("line"),
                new RegexParser(S3Logs.TIME, S3Logs.REGEX, new int[]{3}),
                new Fields("time", "line"));
        ingressPipe = new Each(ingressPipe, S3Logs.TIME, new DateFormatter(KEY, DD_MMM_YYYY, UTC), KEY_LINE);
        ingressPipe = new Each(ingressPipe, new Debug(true));

        Flow<?> ingress = new LocalFlowConnector().connect(FlowDef.flowDef()
                .setName("ingress")
                .addSource(ingressPipe, s3Tap)
                .addSink(ingressPipe, kafkaTap)
                .addTail(ingressPipe));
        ingress.start();

        // Egress: parse each line into the S3 log fields, keeping the date key in front.
        Pipe egressPipe = new Pipe("head");
        egressPipe = new Each(
                egressPipe,
                new Fields("line"),
                new RegexParser(S3Logs.FIELDS, S3Logs.REGEX),
                KEY.append(S3Logs.FIELDS));
        egressPipe = new Each(egressPipe, new Debug(true));

        Flow<?> egress = new LocalFlowConnector().connect(FlowDef.flowDef()
                .setName("egress")
                .addSource(egressPipe, kafkaTap)
                .addSink(egressPipe, partitionTap)
                .addTail(egressPipe));
        egress.start();

        // Both flows are started before either is awaited; egress is awaited first,
        // matching the original ordering.
        egress.complete();
        System.out.println("completed egress");
        ingress.complete();
        System.out.println("completed ingress");
    }
}
