package nl.tradecloud.kafka;

import akka.actor.ActorRef;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerSettings$;
import akka.kafka.Subscriptions$;
import akka.kafka.scaladsl.Consumer$;
import akka.pattern.AskableActorRef$;
import akka.pattern.package$;
import akka.remote.WireFormats;
import akka.stream.scaladsl.Sink$;
import akka.util.Timeout;
import java.io.NotSerializableException;
import nl.tradecloud.kafka.command.Subscribe;
import nl.tradecloud.kafka.response.SubscribeAck;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.runtime.AbstractPartialFunction;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaConsumer.scala */
/* loaded from: input_file:nl/tradecloud/kafka/KafkaConsumer$$anonfun$subscribing$1.class */
public final class KafkaConsumer$$anonfun$subscribing$1 extends AbstractPartialFunction<Object, BoxedUnit> implements Serializable {
    public static final long serialVersionUID = 0;
    private final /* synthetic */ KafkaConsumer $outer;

    /* JADX WARN: Multi-variable type inference failed */
    public final <A1, B1> B1 applyOrElse(A1 a1, Function1<A1, B1> function1) {
        Object apply;
        if (a1 instanceof Subscribe) {
            Subscribe subscribe = (Subscribe) a1;
            ActorRef sender = this.$outer.sender();
            this.$outer.log().info("Start KafkaConsumer, with group={}, topics={}, prefixedTopics={}", this.$outer.nl$tradecloud$kafka$KafkaConsumer$$group, this.$outer.nl$tradecloud$kafka$KafkaConsumer$$topics.mkString(", "), this.$outer.prefixedTopics().mkString(", "));
            this.$outer.consumer_$eq(new Some(Consumer$.MODULE$.committableSource(ConsumerSettings$.MODULE$.apply(this.$outer.context().system().settings().config().getConfig("akka.kafka.consumer"), new ByteArrayDeserializer(), new ByteArrayDeserializer()).withBootstrapServers(this.$outer.nl$tradecloud$kafka$KafkaConsumer$$config.bootstrapServers()).withGroupId(this.$outer.nl$tradecloud$kafka$KafkaConsumer$$group).withProperty("auto.offset.reset", "earliest"), Subscriptions$.MODULE$.topics(this.$outer.prefixedTopics())).map(committableMessage -> {
                this.$outer.log().debug("Received message value={}, key={}", committableMessage.record().value(), committableMessage.record().key());
                WireFormats.SerializedMessage parseFrom = WireFormats.SerializedMessage.parseFrom((byte[]) committableMessage.record().value());
                if (parseFrom != null) {
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(committableMessage), KafkaMessageSerializer$.MODULE$.deserialize(this.$outer.nl$tradecloud$kafka$KafkaConsumer$$extendedSystem, parseFrom));
                }
                throw new NotSerializableException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Unable to deserialize msg ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{committableMessage.record().value()})));
            }).mapAsync(2, tuple2 -> {
                if (tuple2 != null) {
                    ConsumerMessage.CommittableMessage committableMessage2 = (ConsumerMessage.CommittableMessage) tuple2._1();
                    Object _2 = tuple2._2();
                    if (committableMessage2 != null && _2 != null) {
                        this.$outer.log().debug("Sending msg={}", _2);
                        ActorRef ask = package$.MODULE$.ask(subscribe.ref());
                        return AskableActorRef$.MODULE$.ask$extension1(ask, _2, new Timeout(this.$outer.nl$tradecloud$kafka$KafkaConsumer$$config.acknowledgeTimeout()), AskableActorRef$.MODULE$.ask$default$3$extension(ask, _2)).flatMap(obj -> {
                            Future successful;
                            if (BoxesRunTime.equals(subscribe.acknowledgeMsg(), obj)) {
                                this.$outer.log().debug("Committing offset={}", BoxesRunTime.boxToLong(committableMessage2.record().offset()));
                                successful = committableMessage2.committableOffset().commitScaladsl();
                            } else {
                                this.$outer.log().warning("Received invalid acknowledge msg={}", obj);
                                successful = Future$.MODULE$.successful(obj);
                            }
                            return successful;
                        }, this.$outer.context().dispatcher());
                    }
                }
                throw new MatchError(tuple2);
            }).to(Sink$.MODULE$.ignore()).run(this.$outer.materializer())));
            this.$outer.consumer().map(control -> {
                return control.isShutdown();
            }).foreach(future -> {
                $anonfun$applyOrElse$5(this, future);
                return BoxedUnit.UNIT;
            });
            this.$outer.context().become(this.$outer.running());
            this.$outer.context().watch(subscribe.ref());
            akka.actor.package$.MODULE$.actorRef2Scala(sender).$bang(new SubscribeAck(subscribe), this.$outer.self());
            apply = BoxedUnit.UNIT;
        } else {
            apply = function1.apply(a1);
        }
        return (B1) apply;
    }

    public final boolean isDefinedAt(Object obj) {
        return obj instanceof Subscribe;
    }

    public static final /* synthetic */ void $anonfun$applyOrElse$5(KafkaConsumer$$anonfun$subscribing$1 kafkaConsumer$$anonfun$subscribing$1, Future future) {
        kafkaConsumer$$anonfun$subscribing$1.$outer.nl$tradecloud$kafka$KafkaConsumer$$terminateWhenDone(future);
    }

    public KafkaConsumer$$anonfun$subscribing$1(KafkaConsumer kafkaConsumer) {
        if (kafkaConsumer == null) {
            throw null;
        }
        this.$outer = kafkaConsumer;
    }
}
