package cascading.local.tap.kafka;

import cascading.flow.FlowProcess;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import java.io.IOException;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:cascading/local/tap/kafka/TextKafkaScheme.class */
public class TextKafkaScheme extends KafkaScheme<String, String, Context, Context> {
    public static final Fields TOPIC_FIELDS = new Fields("topic", String.class);
    public static final Fields OFFSET_FIELDS = new Fields("offset", Long.TYPE);
    public static final Fields KEY_FIELDS = new Fields("key", String.class);
    public static final Fields VALUE_FIELDS = new Fields("value", String.class);
    public static final Fields TIMESTAMP_FIELDS = new Fields("timestamp", Long.TYPE);
    public static final Fields TIMESTAMP_TYPE_FIELDS = new Fields("timestampType", String.class);
    public static final Fields DEFAULT_SOURCE_FIELDS = TOPIC_FIELDS.append(OFFSET_FIELDS).append(KEY_FIELDS).append(VALUE_FIELDS).append(TIMESTAMP_FIELDS).append(TIMESTAMP_TYPE_FIELDS);

    /* loaded from: input_file:cascading/local/tap/kafka/TextKafkaScheme$Context.class */
    class Context {
        String[] topics;

        public Context(String[] strArr) {
            this.topics = strArr;
        }
    }

    public TextKafkaScheme() {
        super(DEFAULT_SOURCE_FIELDS);
    }

    public TextKafkaScheme(Fields fields) {
        super(fields);
        if (fields.size() != 6) {
            throw new IllegalArgumentException("wrong number of source fields, requires 6, got: " + fields);
        }
    }

    public void sourceConfInit(FlowProcess<? extends Properties> flowProcess, Tap<Properties, Iterator<ConsumerRecord<String, String>>, Producer<String, String>> tap, Properties properties) {
        properties.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
    }

    public void sinkConfInit(FlowProcess<? extends Properties> flowProcess, Tap<Properties, Iterator<ConsumerRecord<String, String>>, Producer<String, String>> tap, Properties properties) {
        properties.setProperty("key.serializer", StringSerializer.class.getCanonicalName());
        properties.setProperty("value.serializer", StringSerializer.class.getCanonicalName());
    }

    public void sourcePrepare(FlowProcess<? extends Properties> flowProcess, SourceCall<Context, Iterator<ConsumerRecord<String, String>>> sourceCall) throws IOException {
        sourceCall.setContext(new Context(((KafkaTap) sourceCall.getTap()).getTopics()));
    }

    public void sinkPrepare(FlowProcess<? extends Properties> flowProcess, SinkCall<Context, Producer<String, String>> sinkCall) throws IOException {
        sinkCall.setContext(new Context(((KafkaTap) sinkCall.getTap()).getTopics()));
    }

    public boolean source(FlowProcess<? extends Properties> flowProcess, SourceCall<Context, Iterator<ConsumerRecord<String, String>>> sourceCall) throws IOException {
        Iterator it = (Iterator) sourceCall.getInput();
        if (!it.hasNext()) {
            return false;
        }
        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
        TupleEntry incomingEntry = sourceCall.getIncomingEntry();
        incomingEntry.setObject(0, consumerRecord.topic());
        incomingEntry.setObject(1, Long.valueOf(consumerRecord.offset()));
        incomingEntry.setObject(2, consumerRecord.key());
        incomingEntry.setObject(3, consumerRecord.value());
        incomingEntry.setObject(4, Long.valueOf(consumerRecord.timestamp()));
        incomingEntry.setObject(5, consumerRecord.timestampType());
        return true;
    }

    public void sink(FlowProcess<? extends Properties> flowProcess, SinkCall<Context, Producer<String, String>> sinkCall) throws IOException {
        String string = sinkCall.getOutgoingEntry().getString(0);
        String string2 = sinkCall.getOutgoingEntry().getString(1);
        for (String str : ((Context) sinkCall.getContext()).topics) {
            ((Producer) sinkCall.getOutput()).send(new ProducerRecord(str, string, string2));
        }
    }

    public /* bridge */ /* synthetic */ void sinkConfInit(FlowProcess flowProcess, Tap tap, Object obj) {
        sinkConfInit((FlowProcess<? extends Properties>) flowProcess, (Tap<Properties, Iterator<ConsumerRecord<String, String>>, Producer<String, String>>) tap, (Properties) obj);
    }

    public /* bridge */ /* synthetic */ void sourceConfInit(FlowProcess flowProcess, Tap tap, Object obj) {
        sourceConfInit((FlowProcess<? extends Properties>) flowProcess, (Tap<Properties, Iterator<ConsumerRecord<String, String>>, Producer<String, String>>) tap, (Properties) obj);
    }
}
