package nl.tradecloud.kafka;

import akka.Done$;
import akka.NotUsed;
import akka.NotUsed$;
import akka.actor.Actor;
import akka.actor.Actor$emptyBehavior$;
import akka.actor.ActorContext;
import akka.actor.ActorLogging;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.event.LoggingAdapter;
import akka.kafka.ProducerMessage;
import akka.kafka.ProducerSettings;
import akka.kafka.scaladsl.Producer$;
import akka.pattern.package$;
import akka.stream.FanInShape2;
import akka.stream.FlowShape;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy$;
import akka.stream.UniformFanOutShape;
import akka.stream.scaladsl.Broadcast$;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.GraphDSL$;
import akka.stream.scaladsl.GraphDSL$Implicits$;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source$;
import akka.stream.scaladsl.SourceQueue;
import akka.stream.scaladsl.SourceQueueWithComplete;
import akka.stream.scaladsl.Zip$;
import nl.tradecloud.kafka.command.Publish;
import nl.tradecloud.kafka.config.KafkaConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import scala.MatchError;
import scala.Option;
import scala.PartialFunction;
import scala.Tuple2;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: KafkaPublisherActor.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015c\u0001B\u0001\u0003\u0001%\u00111cS1gW\u0006\u0004VO\u00197jg\",'/Q2u_JT!a\u0001\u0003\u0002\u000b-\fgm[1\u000b\u0005\u00151\u0011A\u0003;sC\u0012,7\r\\8vI*\tq!\u0001\u0002oY\u000e\u00011\u0003\u0002\u0001\u000b!a\u0001\"a\u0003\b\u000e\u00031Q\u0011!D\u0001\u0006g\u000e\fG.Y\u0005\u0003\u001f1\u0011a!\u00118z%\u00164\u0007CA\t\u0017\u001b\u0005\u0011\"BA\n\u0015\u0003\u0015\t7\r^8s\u0015\u0005)\u0012\u0001B1lW\u0006L!a\u0006\n\u0003\u000b\u0005\u001bGo\u001c:\u0011\u0005EI\u0012B\u0001\u000e\u0013\u00051\t5\r^8s\u0019><w-\u001b8h\u0011!a\u0002A!A!\u0002\u0013i\u0012aC6bM.\f7i\u001c8gS\u001e\u0004\"AH\u0011\u000e\u0003}Q!\u0001\t\u0002\u0002\r\r|gNZ5h\u0013\t\u0011sDA\u0006LC\u001a\\\u0017mQ8oM&<\u0007\u0002\u0003\u0013\u0001\u0005\u0003\u0005\u000b\u0011B\u0013\u0002!A\u0014x\u000eZ;dKJ\u001cV\r\u001e;j]\u001e\u001c\b\u0003\u0002\u0014)UUj\u0011a\n\u0006\u0003\u0007QI!!K\u0014\u0003!A\u0013x\u000eZ;dKJ\u001cV\r\u001e;j]\u001e\u001c\bCA\u00163\u001d\ta\u0003\u0007\u0005\u0002.\u00195\taF\u0003\u00020\u0011\u00051AH]8pizJ!!\r\u0007\u0002\rA\u0013X\rZ3g\u0013\t\u0019DG\u0001\u0004TiJLgn\u001a\u0006\u0003c1\u00012a\u0003\u001c9\u0013\t9DBA\u0003BeJ\f\u0017\u0010\u0005\u0002\fs%\u0011!\b\u0004\u0002\u0005\u0005f$X\r\u0003\u0005=\u0001\t\u0005\t\u0015a\u0003>\u0003\ri\u0017\r\u001e\t\u0003}\u0005k\u0011a\u0010\u0006\u0003\u0001R\taa\u001d;sK\u0006l\u0017B\u0001\"@\u00051i\u0015\r^3sS\u0006d\u0017N_3s\u0011!!\u0005A!A!\u0002\u0017)\u0015AA3d!\t1\u0015*D\u0001H\u0015\tAE\"\u0001\u0006d_:\u001cWO\u001d:f]RL!AS$\u0003!\u0015CXmY;uS>t7i\u001c8uKb$\b\"\u0002'\u0001\t\u0003i\u0015A\u0002\u001fj]&$h\bF\u0002O'R#2aT)S!\t\u0001\u0006!D\u0001\u0003\u0011\u0015a4\nq\u0001>\u0011\u0015!5\nq\u0001F\u0011\u0015a2\n1\u0001\u001e\u0011\u0015!3\n1\u0001&\u0011\u00151\u0006\u0001\"\u0011X\u0003!\u0001(/Z*uCJ$H#\u0001-\u0011\u0005-I\u0016B\u0001.\r\u0005\u0011)f.\u001b;\t\u000bq\u0003A\u0011A/\u0002\u000fI,7-Z5wKV\ta\f\u0005\u0002`A6\t\u0001
!\u0003\u0002b-\t9!+Z2fSZ,\u0007\"B2\u0001\t\u0003!\u0017a\u0002:v]:Lgn\u001a\u000b\u0003=\u0016DQA\u001a2A\u0002\u001d\fQ!];fk\u0016\u00042\u0001[6n\u001b\u0005I'B\u00016@\u0003!\u00198-\u00197bINd\u0017B\u00017j\u0005-\u0019v.\u001e:dKF+X-^3\u0011\u00059\fX\"A8\u000b\u0005A\u0014\u0011aB2p[6\fg\u000eZ\u0005\u0003e>\u0014q\u0001U;cY&\u001c\b.\u0002\u0003u\u0001\u0011)(\u0001F&bM.\f\u0007K]8ek\u000e,'/T3tg\u0006<W\rE\u0003ws**DP\u0004\u0002'o&\u0011\u0001pJ\u0001\u0010!J|G-^2fe6+7o]1hK&\u0011!p\u001f\u0002\b\u001b\u0016\u001c8/Y4f\u0015\tAx\u0005\u0005\u0002~}6\tA#\u0003\u0002��)\t9aj\u001c;Vg\u0016$WABA\u0002\u0001\u0011\t)AA\nLC\u001a\\\u0017\r\u0015:pIV\u001cWM\u001d*fgVdG\u000f\u0005\u0004w\u0003\u000fQS\u0007`\u0005\u0004\u0003\u0013Y(A\u0002*fgVdG\u000fC\u0005\u0002\u000e\u0001\u0011\r\u0011\"\u0003\u0002\u0010\u00051\u0002/\u001e2mSND\u0017I\u001c3D_6\u0004H.\u001a;f\r2|w/\u0006\u0002\u0002\u0012A9\u0001.a\u0005n\u0003/a\u0018bAA\u000bS\n!a\t\\8x\u001d\ri\u0018\u0011D\u0005\u0004\u00037!\u0012\u0001\u0002#p]\u0016D\u0001\"a\b\u0001A\u0003%\u0011\u0011C\u0001\u0018aV\u0014G.[:i\u0003:$7i\\7qY\u0016$XM\u00127po\u0002:q!a\t\u0003\u0011\u0003\t)#A\nLC\u001a\\\u0017\rU;cY&\u001c\b.\u001a:BGR|'\u000fE\u0002Q\u0003O1a!\u0001\u0002\t\u0002\u0005%2cAA\u0014\u0015!9A*a\n\u0005\u0002\u00055BCAA\u0013\u0011!\t\t$a\n\u0005\u0002\u0005M\u0012!\u00029s_B\u001cHCBA\u001b\u0003\u0003\n\u0019\u0005\u0006\u0004\u00028\u0005u\u0012q\b\t\u0004#\u0005e\u0012bAA\u001e%\t)\u0001K]8qg\"1A(a\fA\u0004uBa\u0001RA\u0018\u0001\b)\u0005B\u0002\u000f\u00020\u0001\u0007Q\u0004\u0003\u0004%\u0003_\u0001\r!\n")
/* loaded from: input_file:nl/tradecloud/kafka/KafkaPublisherActor.class */
/**
 * Actor that publishes {@link Publish} commands to Kafka through an Akka Streams pipeline.
 *
 * <p>On {@link #preStart()} the actor materializes a queue-backed source that runs through
 * {@code publishAndCompleteFlow} into an ignoring sink, pipes the sink's completion future to
 * itself, and switches to the {@link #running(SourceQueue)} behavior. Until then its behavior is
 * the empty behavior.
 *
 * <p>This class is a Java rendering of a Scala actor; the {@code akka$actor$...} members and the
 * {@code around*} / lifecycle forwarders exist only to satisfy the {@link Actor} and
 * {@link ActorLogging} trait encodings.
 */
public class KafkaPublisherActor implements Actor, ActorLogging {
    /** Capacity of the source queue that feeds the publishing stream. */
    private static final int QUEUE_BUFFER_SIZE = 1000;

    /** Capacity of the buffer on the pass-through branch that carries the original command to the zip stage. */
    private static final int PASS_THROUGH_BUFFER_SIZE = 10;

    private final KafkaConfig kafkaConfig;
    private final ProducerSettings<String, byte[]> producerSettings;
    private final Materializer mat;
    private final ExecutionContext ec;
    private final Flow<Publish, Done$, NotUsed> publishAndCompleteFlow;
    private LoggingAdapter akka$actor$ActorLogging$$_log;
    // Not final: both are assigned through the trait setter methods below (invoked from
    // Actor.$init$), which Java does not permit for final fields.
    private ActorContext context;
    private ActorRef self;

    /**
     * Creates the {@link Props} for this actor by delegating to the Scala companion object.
     *
     * @param kafkaConfig      configuration supplying, among others, the topic prefix
     * @param producerSettings settings for the underlying Kafka producer flow
     * @param materializer     materializer used to run the publishing stream
     * @param executionContext execution context used when piping the stream completion to the actor
     * @return the props produced by the companion object
     */
    public static Props props(KafkaConfig kafkaConfig, ProducerSettings<String, byte[]> producerSettings, Materializer materializer, ExecutionContext executionContext) {
        return KafkaPublisherActor$.MODULE$.props(kafkaConfig, producerSettings, materializer, executionContext);
    }

    /** Returns the logging adapter provided by {@link ActorLogging}. */
    public LoggingAdapter log() {
        return ActorLogging.log$(this);
    }

    /** Forwards to the {@link Actor} trait implementation of {@code sender}. */
    public final ActorRef sender() {
        return Actor.sender$(this);
    }

    /** Forwards to the {@link Actor} trait implementation of {@code aroundReceive}. */
    public void aroundReceive(PartialFunction<Object, BoxedUnit> partialFunction, Object obj) {
        Actor.aroundReceive$(this, partialFunction, obj);
    }

    /** Forwards to the {@link Actor} trait implementation of {@code aroundPreStart}. */
    public void aroundPreStart() {
        Actor.aroundPreStart$(this);
    }

    /** Forwards to the {@link Actor} trait implementation of {@code aroundPostStop}. */
    public void aroundPostStop() {
        Actor.aroundPostStop$(this);
    }

    /** Forwards to the {@link Actor} trait implementation of {@code aroundPreRestart}. */
    public void aroundPreRestart(Throwable th, Option<Object> option) {
        Actor.aroundPreRestart$(this, th, option);
    }

    /** Forwards to the {@link Actor} trait implementation of {@code aroundPostRestart}. */
    public void aroundPostRestart(Throwable th) {
        Actor.aroundPostRestart$(this, th);
    }

    /** Forwards to the {@link Actor} trait implementation of {@code supervisorStrategy}. */
    public SupervisorStrategy supervisorStrategy() {
        return Actor.supervisorStrategy$(this);
    }

    /**
     * Forwards to the {@link Actor} trait implementation of {@code postStop}.
     *
     * <p>NOTE(review): the queue materialized in {@link #preStart()} is not completed here;
     * confirm whether the stream is expected to outlive the actor.
     */
    public void postStop() throws Exception {
        Actor.postStop$(this);
    }

    /** Forwards to the {@link Actor} trait implementation of {@code preRestart}. */
    public void preRestart(Throwable th, Option<Object> option) throws Exception {
        Actor.preRestart$(this, th, option);
    }

    /** Forwards to the {@link Actor} trait implementation of {@code postRestart}. */
    public void postRestart(Throwable th) throws Exception {
        Actor.postRestart$(this, th);
    }

    /** Forwards to the {@link Actor} trait implementation of {@code unhandled}. */
    public void unhandled(Object obj) {
        Actor.unhandled$(this, obj);
    }

    /** Trait-encoding accessor for the backing field of the {@link ActorLogging} logger. */
    public LoggingAdapter akka$actor$ActorLogging$$_log() {
        return this.akka$actor$ActorLogging$$_log;
    }

    /** Trait-encoding mutator for the backing field of the {@link ActorLogging} logger. */
    public void akka$actor$ActorLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$actor$ActorLogging$$_log = loggingAdapter;
    }

    /** Returns the actor context stored by the trait setter. */
    public ActorContext context() {
        return this.context;
    }

    /** Returns this actor's own reference stored by the trait setter. */
    public final ActorRef self() {
        return this.self;
    }

    /** Trait-encoding setter that stores the actor context. */
    public void akka$actor$Actor$_setter_$context_$eq(ActorContext actorContext) {
        this.context = actorContext;
    }

    /** Trait-encoding setter that stores this actor's own reference. */
    public final void akka$actor$Actor$_setter_$self_$eq(ActorRef actorRef) {
        this.self = actorRef;
    }

    /**
     * Materializes the publishing stream and switches to the running behavior.
     *
     * <p>The stream is a backpressured source queue of {@code QUEUE_BUFFER_SIZE} elements, run
     * through {@code publishAndCompleteFlow} into an ignoring sink. The sink's completion future is
     * piped to this actor (with this actor as sender), and the queue is handed to
     * {@link #running(SourceQueue)}.
     *
     * @throws MatchError if the materialized value is {@code null}
     */
    public void preStart() {
        Tuple2 materialized = (Tuple2) Source$.MODULE$.queue(QUEUE_BUFFER_SIZE, OverflowStrategy$.MODULE$.backpressure())
                .via(publishAndCompleteFlow())
                .toMat(Sink$.MODULE$.ignore(), Keep$.MODULE$.both())
                .run(this.mat);
        if (materialized == null) {
            throw new MatchError(materialized);
        }
        SourceQueueWithComplete queue = (SourceQueueWithComplete) materialized._1();
        Future streamDone = (Future) materialized._2();
        package$.MODULE$.pipe(streamDone, this.ec).pipeTo(self(), self());
        context().become(running(queue));
    }

    /** Initial behavior: handles nothing; {@link #preStart()} replaces it with {@link #running(SourceQueue)}. */
    public PartialFunction<Object, BoxedUnit> receive() {
        return Actor$emptyBehavior$.MODULE$;
    }

    /**
     * Builds the behavior used once the stream is running.
     *
     * @param sourceQueue queue feeding the publishing stream
     * @return the partial function implemented by the compiler-generated
     *         {@code KafkaPublisherActor$$anonfun$running$1} class
     */
    public PartialFunction<Object, BoxedUnit> running(SourceQueue<Publish> sourceQueue) {
        return new KafkaPublisherActor$$anonfun$running$1(this, sourceQueue);
    }

    private Flow<Publish, Done$, NotUsed> publishAndCompleteFlow() {
        return this.publishAndCompleteFlow;
    }

    /**
     * Creates the actor, initialises the {@link Actor} and {@link ActorLogging} traits, and builds
     * the publishing flow.
     *
     * @param kafkaConfig      configuration supplying the topic prefix
     * @param producerSettings settings for the Kafka producer flow
     * @param materializer     materializer used to run the stream in {@link #preStart()}
     * @param executionContext execution context used when piping the stream completion to the actor
     */
    public KafkaPublisherActor(KafkaConfig kafkaConfig, ProducerSettings<String, byte[]> producerSettings, Materializer materializer, ExecutionContext executionContext) {
        this.kafkaConfig = kafkaConfig;
        this.producerSettings = producerSettings;
        this.mat = materializer;
        this.ec = executionContext;
        Actor.$init$(this);
        ActorLogging.$init$(this);
        // The previous template had two "{}" placeholders but no arguments, so the braces were
        // logged verbatim. The actor is not tied to one topic; log the configured prefix instead.
        log().info("Started Kafka publisher, topicPrefix={}", this.kafkaConfig.topicPrefix());
        this.publishAndCompleteFlow = buildPublishAndCompleteFlow();
    }

    /**
     * Builds the flow that publishes each command to Kafka and then signals its completion.
     *
     * <p>Each incoming {@link Publish} is broadcast to two branches. The first converts it into a
     * producer message (topic = configured prefix + command topic, value = serialized message
     * bytes) and sends it through the Kafka producer flow. The second carries the original command
     * through a small backpressured buffer. Both branches are zipped, and for every pair the
     * command's {@code completed()} handle is succeeded with {@code Done}, which is also the
     * element emitted downstream.
     */
    private Flow<Publish, Done$, NotUsed> buildPublishAndCompleteFlow() {
        return Flow$.MODULE$.fromGraph(GraphDSL$.MODULE$.create(builder -> {
            UniformFanOutShape broadcast = builder.add(Broadcast$.MODULE$.apply(2, Broadcast$.MODULE$.apply$default$2()));
            FanInShape2 zip = builder.add(Zip$.MODULE$.apply());
            // Runs after the producer result and the original command have been paired up.
            FlowShape complete = builder.add(Flow$.MODULE$.apply().map(resultAndPublish -> {
                ((Publish) resultAndPublish._2()).completed().success(Done$.MODULE$);
                return Done$.MODULE$;
            }));
            // NOTE(review): this stage calls log() and context() from inside the stream; confirm
            // that doing so outside the actor's message handling is acceptable here.
            FlowShape toProducerMessage = builder.add(Flow$.MODULE$.apply().map(publish -> {
                String prefixedTopic = this.kafkaConfig.topicPrefix() + publish.topic();
                this.log().debug("Kafka publishing cmd={}, topic={}", publish, prefixedTopic);
                return new ProducerMessage.Message(new ProducerRecord(prefixedTopic, KafkaMessageSerializer$.MODULE$.serialize(this.context().system(), publish.msg()).toByteArray()), NotUsed$.MODULE$);
            }));
            FlowShape kafkaProducer = builder.add(Producer$.MODULE$.flow(this.producerSettings));
            Flow passThroughBuffer = Flow$.MODULE$.apply().buffer(PASS_THROUGH_BUFFER_SIZE, OverflowStrategy$.MODULE$.backpressure());
            // Branch 0: command -> producer message -> Kafka -> zip.in0
            GraphDSL$Implicits$.MODULE$.port2flow(broadcast.out(0), builder).$tilde$greater(toProducerMessage, builder).$tilde$greater(kafkaProducer, builder).$tilde$greater(zip.in0(), builder);
            // Branch 1: original command, buffered -> zip.in1
            GraphDSL$Implicits$.MODULE$.port2flow(broadcast.out(1), builder).$tilde$greater(passThroughBuffer, builder).$tilde$greater(zip.in1(), builder);
            GraphDSL$Implicits$.MODULE$.port2flow(zip.out(), builder).$tilde$greater(complete, builder);
            return new FlowShape(broadcast.in(), complete.out());
        }));
    }
}
