package nl.tradecloud.kafka;

import akka.Done;
import akka.NotUsed;
import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorLogging;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.event.LoggingAdapter;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerMessage$CommittableOffsetBatch$;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions$;
import akka.kafka.scaladsl.Consumer$;
import akka.pattern.package$;
import akka.stream.FanInShape2;
import akka.stream.FanOutShape2;
import akka.stream.FlowShape;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches$;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy$;
import akka.stream.UniqueKillSwitch;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.GraphDSL$;
import akka.stream.scaladsl.GraphDSL$Implicits$;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Unzip$;
import akka.stream.scaladsl.Zip$;
import nl.tradecloud.kafka.command.Subscribe;
import nl.tradecloud.kafka.config.KafkaConfig;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.PartialFunction$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: KafkaSubscriberActor.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005mg!B\u0001\u0003\u0001\tA!\u0001F&bM.\f7+\u001e2tGJL'-\u001a:BGR|'O\u0003\u0002\u0004\t\u0005)1.\u00194lC*\u0011QAB\u0001\u000biJ\fG-Z2m_V$'\"A\u0004\u0002\u00059d7\u0003\u0002\u0001\n\u001f]\u0001\"AC\u0007\u000e\u0003-Q\u0011\u0001D\u0001\u0006g\u000e\fG.Y\u0005\u0003\u001d-\u0011a!\u00118z%\u00164\u0007C\u0001\t\u0016\u001b\u0005\t\"B\u0001\n\u0014\u0003\u0015\t7\r^8s\u0015\u0005!\u0012\u0001B1lW\u0006L!AF\t\u0003\u000b\u0005\u001bGo\u001c:\u0011\u0005AA\u0012BA\r\u0012\u00051\t5\r^8s\u0019><w-\u001b8h\u0011!Y\u0002A!A!\u0002\u0013i\u0012aC6bM.\f7i\u001c8gS\u001e\u001c\u0001\u0001\u0005\u0002\u001fC5\tqD\u0003\u0002!\u0005\u000511m\u001c8gS\u001eL!AI\u0010\u0003\u0017-\u000bgm[1D_:4\u0017n\u001a\u0005\tI\u0001\u0011\t\u0011)A\u0005K\u0005I1/\u001e2tGJL'-\u001a\t\u0003M%j\u0011a\n\u0006\u0003Q\t\tqaY8n[\u0006tG-\u0003\u0002+O\tI1+\u001e2tGJL'-\u001a\u0005\tY\u0001\u0011\t\u0011)A\u0005[\u0005!a\r\\8xa\tq\u0003\tE\u00030iYRd(D\u00011\u0015\t\t$'\u0001\u0005tG\u0006d\u0017\rZ:m\u0015\t\u00194#\u0001\u0004tiJ,\u0017-\\\u0005\u0003kA\u0012AA\u00127poB\u0011q\u0007O\u0007\u0002\u0005%\u0011\u0011H\u0001\u0002\r\u0017\u000647.Y'fgN\fw-\u001a\t\u0003wqj\u0011aE\u0005\u0003{M\u0011A\u0001R8oKB\u0011q\b\u0011\u0007\u0001\t%\t5&!A\u0001\u0002\u000b\u0005!IA\u0002`IE\n\"a\u0011$\u0011\u0005)!\u0015BA#\f\u0005\u001dqu\u000e\u001e5j]\u001e\u0004\"AC$\n\u0005![!aA!os\"A!\n\u0001B\u0001B\u0003%1*\u0001\td_:\u001cX/\\3s'\u0016$H/\u001b8hgB!AJ\u0014)\\\u001b\u0005i%BA\u0002\u0014\u0013\tyUJ\u0001\tD_:\u001cX/\\3s'\u0016$H/\u001b8hgB\u0011\u0011\u000b\u0017\b\u0003%Z\u0003\"aU\u0006\u000e\u0003QS!!\u0016\u000f\u0002\rq\u0012xn\u001c;?\u0013\t96\"\u0001\u0004Qe\u0016$WMZ\u0005\u00033j\u0013aa\u0015;sS:<'BA,\f!\rQALX\u0005\u0003;.\u0011Q!\u0011:sCf\u0004\"AC0\n\u0005\u0001\\!\u0001\u0002\"zi\u0016D\u0001B\u0019\u0001\u0003\u0002\u0003\u0006IaY\u0001\u0010gR\u0014X-Y7D_6\u0004H.\u001a;fIB\u0019Am\u001a\u001e\u000e\u0003\u0015T!AZ\u0006\u0002\u0015\r|gnY;se\u0016tG/\u0003\u0002iK\n9\u0001K]8nSN,\u0007\u0002\u00036\u0001\u0005\u0003\u0005\u000b1B6\u0002\u00075\fG\u000f\u0005\u0002m[6\t!'\u0003\u0002oe\taQ*\u0019;fe&\fG.\u001b>fe\"A\u0001\u000f\u0001B\u0001B\u0003-\u0011/\u0001\u0002fGB\u0011AM]\u0005\u0003g\u0016\u0014\u0001#\u0012=fGV$\u0018n\u001c8D_:$X\r\u001f;\t\u000bU\u0004A\u0011\u0001<\u0002\rqJg.\u001b;?)!98\u0010`?\u0002\u0006\u0005\u001dAc\u0001=zuB\u0011q\u0007\u0001\u0005\u0006UR\u0004\u001da\u001b\u0005\u0006aR\u0004\u001d!\u001d\u0005\u00067Q\u0004\r!\b\u0005\u0006IQ\u0004\r!\n\u0005\u0006YQ\u0004\rA \u0019\u0004\u007f\u0006\r\u0001CB\u00185mi\n\t\u0001E\u0002@\u0003\u0007!\u0011\"Q?\u0002\u0002\u0003\u0005)\u0011\u0001\"\t\u000b)#\b\u0019A&\t\u000b\t$\b\u0019A2\t\u0013\u0005-\u0001A1A\u0005\n\u00055\u0011A\u00049sK\u001aL\u00070\u001a3U_BL7m]\u000b\u0003\u0003\u001f\u0001B!UA\t!&\u0019\u00111\u0003.\u0003\u0007M+G\u000f\u0003\u0005\u0002\u0018\u0001\u0001\u000b\u0011BA\b\u0003=\u0001(/\u001a4jq\u0016$Gk\u001c9jGN\u0004\u0003\"CA\u000e\u0001\u0001\u0007I\u0011BA\u000f\u0003!\u0019\b.\u001e;e_^tWCAA\u0010!\u0015Q\u0011\u0011EA\u0013\u0013\r\t\u0019c\u0003\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u00071\f9#C\u0002\u0002*I\u0012!bS5mYN;\u0018\u000e^2i\u0011%\ti\u0003\u0001a\u0001\n\u0013\ty#\u0001\u0007tQV$Hm\\<o?\u0012*\u0017\u000f\u0006\u0003\u00022\u0005]\u0002c\u0001\u0006\u00024%\u0019\u0011QG\u0006\u0003\tUs\u0017\u000e\u001e\u0005\u000b\u0003s\tY#!AA\u0002\u0005}\u0011a\u0001=%c!A\u0011Q\b\u0001!B\u0013\ty\"A\u0005tQV$Hm\\<oA!9\u0011\u0011\t\u0001\u0005B\u0005\r\u0013\u0001\u00039sKN#\u0018M\u001d;\u0015\u0005\u0005E\u0002bBA$\u0001\u0011\u0005\u00131I\u0001\ta>\u001cHo\u0015;pa\"9\u00111\n\u0001\u0005\n\u00055\u0013a\u0002:v]:LgnZ\u000b\u0003\u0003\u001f\u0002B!!\u0015\u0002T5\t\u0001!C\u0002\u0002VU\u0011qAU3dK&4X\rC\u0004\u0002Z\u0001!\t%!\u0014\u0002\u000fI,7-Z5wK\"9\u0011Q\f\u0001\u0005\n\u0005\r\u0013a\u0001:v]\"9\u0011\u0011\r\u0001\u0005\n\u0005\r\u0014aC1u\u0019\u0016\f7\u000f^(oG\u0016$B!!\u001a\u0002tA\"\u0011qMA8!\u0019y\u0013\u0011\u000e\u001e\u0002n%\u0019\u00111\u000e\u0019\u0003\rM{WO]2f!\ry\u0014q\u000e\u0003\f\u0003c\ny&!A\u0001\u0002\u000b\u0005!IA\u0002`IMBq\u0001LA0\u0001\u0004\t)\b\r\u0003\u0002x\u0005m\u0004CB\u00185mi\nI\bE\u0002@\u0003w\"1\"! \u0002t\u0005\u0005\t\u0011!B\u0001\u0005\n\u0019q\f\n\u001a\b\u000f\u0005\u0005%\u0001#\u0001\u0002\u0004\u0006!2*\u00194lCN+(m]2sS\n,'/Q2u_J\u00042aNAC\r\u0019\t!\u0001#\u0001\u0002\bN\u0019\u0011QQ\u0005\t\u000fU\f)\t\"\u0001\u0002\fR\u0011\u00111\u0011\u0005\t\u0003\u001f\u000b)\t\"\u0001\u0002\u0012\u0006)\u0001O]8qgRa\u00111SAP\u0003C\u000b\u0019+a,\u00022R1\u0011QSAN\u0003;\u00032\u0001EAL\u0013\r\tI*\u0005\u0002\u0006!J|\u0007o\u001d\u0005\u0007U\u00065\u00059A6\t\rA\fi\tq\u0001r\u0011\u0019Y\u0012Q\u0012a\u0001;!1A%!$A\u0002\u0015Bq\u0001LAG\u0001\u0004\t)\u000b\r\u0003\u0002(\u0006-\u0006CB\u00185mi\nI\u000bE\u0002@\u0003W#1\"!,\u0002$\u0006\u0005\t\u0011!B\u0001\u0005\n\u0019q\f\n\u001b\t\r)\u000bi\t1\u0001L\u0011\u0019\u0011\u0017Q\u0012a\u0001G\"A\u0011QWAC\t\u0003\t9,A\beKN,'/[1mSj,g\t\\8x)\u0019\tI,!1\u0002LB1q\u0006N.7\u0003w\u00032aOA_\u0013\r\tyl\u0005\u0002\b\u001d>$Xk]3e\u0011!\t\u0019-a-A\u0002\u0005\u0015\u0017AB:zgR,W\u000eE\u0002\u0011\u0003\u000fL1!!3\u0012\u0005-\t5\r^8s'f\u001cH/Z7\t\u0011\u00055\u00171\u0017a\u0001\u0003\u001f\f1\u0001\\8h!\u0011\t\t.a6\u000e\u0005\u0005M'bAAk'\u0005)QM^3oi&!\u0011\u0011\\Aj\u00059aunZ4j]\u001e\fE-\u00199uKJ\u0004")
/* loaded from: input_file:nl/tradecloud/kafka/KafkaSubscriberActor.class */
public class KafkaSubscriberActor implements Actor, ActorLogging {
    private final KafkaConfig kafkaConfig;
    private final Subscribe subscribe;
    private final Flow<KafkaMessage, Done, ?> flow;
    private final ConsumerSettings<String, byte[]> consumerSettings;
    public final Promise<Done> nl$tradecloud$kafka$KafkaSubscriberActor$$streamCompleted;
    private final Materializer mat;
    private final ExecutionContext ec;
    private final Set<String> nl$tradecloud$kafka$KafkaSubscriberActor$$prefixedTopics;
    private Option<KillSwitch> shutdown;
    private LoggingAdapter akka$actor$ActorLogging$$_log;
    private final ActorContext context;
    private final ActorRef self;

    public static Flow<byte[], KafkaMessage, NotUsed> deserializeFlow(ActorSystem actorSystem, LoggingAdapter loggingAdapter) {
        return KafkaSubscriberActor$.MODULE$.deserializeFlow(actorSystem, loggingAdapter);
    }

    public static Props props(KafkaConfig kafkaConfig, Subscribe subscribe, Flow<KafkaMessage, Done, ?> flow, ConsumerSettings<String, byte[]> consumerSettings, Promise<Done> promise, Materializer materializer, ExecutionContext executionContext) {
        return KafkaSubscriberActor$.MODULE$.props(kafkaConfig, subscribe, flow, consumerSettings, promise, materializer, executionContext);
    }

    public LoggingAdapter log() {
        return ActorLogging.log$(this);
    }

    public final ActorRef sender() {
        return Actor.sender$(this);
    }

    public void aroundReceive(PartialFunction<Object, BoxedUnit> partialFunction, Object obj) {
        Actor.aroundReceive$(this, partialFunction, obj);
    }

    public void aroundPreStart() {
        Actor.aroundPreStart$(this);
    }

    public void aroundPostStop() {
        Actor.aroundPostStop$(this);
    }

    public void aroundPreRestart(Throwable th, Option<Object> option) {
        Actor.aroundPreRestart$(this, th, option);
    }

    public void aroundPostRestart(Throwable th) {
        Actor.aroundPostRestart$(this, th);
    }

    public SupervisorStrategy supervisorStrategy() {
        return Actor.supervisorStrategy$(this);
    }

    public void preRestart(Throwable th, Option<Object> option) throws Exception {
        Actor.preRestart$(this, th, option);
    }

    public void postRestart(Throwable th) throws Exception {
        Actor.postRestart$(this, th);
    }

    public void unhandled(Object obj) {
        Actor.unhandled$(this, obj);
    }

    public LoggingAdapter akka$actor$ActorLogging$$_log() {
        return this.akka$actor$ActorLogging$$_log;
    }

    public void akka$actor$ActorLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$actor$ActorLogging$$_log = loggingAdapter;
    }

    public ActorContext context() {
        return this.context;
    }

    public final ActorRef self() {
        return this.self;
    }

    public void akka$actor$Actor$_setter_$context_$eq(ActorContext actorContext) {
        this.context = actorContext;
    }

    public final void akka$actor$Actor$_setter_$self_$eq(ActorRef actorRef) {
        this.self = actorRef;
    }

    public Set<String> nl$tradecloud$kafka$KafkaSubscriberActor$$prefixedTopics() {
        return this.nl$tradecloud$kafka$KafkaSubscriberActor$$prefixedTopics;
    }

    private Option<KillSwitch> shutdown() {
        return this.shutdown;
    }

    private void shutdown_$eq(Option<KillSwitch> option) {
        this.shutdown = option;
    }

    public void preStart() {
        run();
    }

    public void postStop() {
        shutdown().foreach(killSwitch -> {
            killSwitch.shutdown();
            return BoxedUnit.UNIT;
        });
    }

    private PartialFunction<Object, BoxedUnit> running() {
        return new KafkaSubscriberActor$$anonfun$running$1(this);
    }

    public PartialFunction<Object, BoxedUnit> receive() {
        return PartialFunction$.MODULE$.empty();
    }

    private void run() {
        Tuple2 tuple2 = (Tuple2) atLeastOnce(this.flow).viaMat(KillSwitches$.MODULE$.single(), Keep$.MODULE$.right()).toMat(Sink$.MODULE$.ignore(), Keep$.MODULE$.both()).run(this.mat);
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Tuple2 tuple22 = new Tuple2((UniqueKillSwitch) tuple2._1(), (Future) tuple2._2());
        UniqueKillSwitch uniqueKillSwitch = (UniqueKillSwitch) tuple22._1();
        Future future = (Future) tuple22._2();
        shutdown_$eq(new Some(uniqueKillSwitch));
        package$.MODULE$.pipe(future, this.ec).pipeTo(self(), self());
        context().become(running());
    }

    private Source<Done, ?> atLeastOnce(Flow<KafkaMessage, Done, ?> flow) {
        return Consumer$.MODULE$.committableSource(this.consumerSettings, Subscriptions$.MODULE$.topics(nl$tradecloud$kafka$KafkaSubscriberActor$$prefixedTopics())).map(committableMessage -> {
            return new Tuple2(committableMessage.committableOffset(), committableMessage.record().value());
        }).via(Flow$.MODULE$.fromGraph(GraphDSL$.MODULE$.create(flow, builder -> {
            return flowShape -> {
                FanOutShape2 add = builder.add(Unzip$.MODULE$.apply());
                FanInShape2 add2 = builder.add(Zip$.MODULE$.apply());
                FlowShape add3 = builder.add(Flow$.MODULE$.apply().groupedWithin(this.subscribe.batchingSize(), this.subscribe.batchingInterval()).map(seq -> {
                    return (ConsumerMessage.CommittableOffsetBatch) seq.foldLeft(ConsumerMessage$CommittableOffsetBatch$.MODULE$.empty(), (committableOffsetBatch, tuple2) -> {
                        return committableOffsetBatch.updated((ConsumerMessage.CommittableOffset) tuple2._1());
                    });
                }).mapAsync(3, committableOffsetBatch -> {
                    return committableOffsetBatch.commitScaladsl();
                }));
                FlowShape add4 = builder.add(KafkaSubscriberActor$.MODULE$.deserializeFlow(this.context().system(), this.log()));
                GraphDSL$Implicits$.MODULE$.port2flow(add.out0(), builder).$tilde$greater(Flow$.MODULE$.apply().buffer(this.subscribe.offsetBuffer(), OverflowStrategy$.MODULE$.backpressure()), builder).$tilde$greater(add2.in0(), builder);
                GraphDSL$Implicits$.MODULE$.port2flow(add.out1(), builder).$tilde$greater(add4, builder).$tilde$greater(flowShape, builder).$tilde$greater(add2.in1(), builder);
                GraphDSL$Implicits$.MODULE$.port2flow(add2.out(), builder).$tilde$greater(add3.in(), builder);
                return new FlowShape(add.in(), add3.out());
            };
        })));
    }

    public KafkaSubscriberActor(KafkaConfig kafkaConfig, Subscribe subscribe, Flow<KafkaMessage, Done, ?> flow, ConsumerSettings<String, byte[]> consumerSettings, Promise<Done> promise, Materializer materializer, ExecutionContext executionContext) {
        this.kafkaConfig = kafkaConfig;
        this.subscribe = subscribe;
        this.flow = flow;
        this.consumerSettings = consumerSettings;
        this.nl$tradecloud$kafka$KafkaSubscriberActor$$streamCompleted = promise;
        this.mat = materializer;
        this.ec = executionContext;
        Actor.$init$(this);
        ActorLogging.$init$(this);
        this.nl$tradecloud$kafka$KafkaSubscriberActor$$prefixedTopics = (Set) subscribe.topics().map(str -> {
            return this.kafkaConfig.topicPrefix() + str;
        }, Set$.MODULE$.canBuildFrom());
        log().debug("Kafka subscriber started for topics {}", nl$tradecloud$kafka$KafkaSubscriberActor$$prefixedTopics().mkString(", "));
        this.shutdown = None$.MODULE$;
    }
}
