package delta.process;

import delta.EventSource;
import delta.Transaction;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import scala.Function1;
import scala.Function2;
import scala.Option;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.concurrent.TrieMap;
import scala.collection.immutable.Range;
import scala.collection.immutable.TreeMap;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scuff.concurrent.PartitionedExecutionContext;
import scuff.concurrent.PartitionedExecutionContext$;
import scuff.concurrent.Threads$;

/* compiled from: PersistentMonotonicProcessor.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005}g!B\t\u0013\u0003\u00039\u0002\u0002C\u001e\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001f\t\u0011\u0015\u0003!Q1A\u0005\u0012\u0019C\u0001B\u0013\u0001\u0003\u0002\u0003\u0006Ia\u0012\u0005\t\u0017\u0002\u0011\t\u0011)A\u0005\u0019\"AA\u000b\u0001B\u0001B\u0003%Q\u000b\u0003\u0005_\u0001\t\u0005\t\u0015!\u0003`\u0011!1\u0007AaA!\u0002\u00179\u0007\"B7\u0001\t\u0003q\u0007\"B7\u0001\t\u0003Y\bbBA#\u0001\u0011E\u0011q\t\u0005\t\u0003C\u0002\u0001\u0015!\u0003\u0002d!9\u0011Q\u0016\u0001\u0005\u0012\u0005=v!CA\\%\u0005\u0005\t\u0012AA]\r!\t\"#!A\t\u0002\u0005m\u0006BB7\u000f\t\u0003\ti\fC\u0005\u0002@:\t\n\u0011\"\u0001\u0002B\na\u0002+\u001a:tSN$XM\u001c;N_:|Go\u001c8jGB\u0013xnY3tg>\u0014(BA\n\u0015\u0003\u001d\u0001(o\\2fgNT\u0011!F\u0001\u0006I\u0016dG/Y\u0002\u0001+\u0011ARe\f\u001a\u0014\t\u0001Ir\u0004\u000f\t\u00035ui\u0011a\u0007\u0006\u00029\u0005)1oY1mC&\u0011ad\u0007\u0002\u0007\u0003:L(+\u001a4\u0011\u000b\u0001\n3EL\u0019\u000e\u0003II!A\t\n\u0003%5{gn\u001c;p]&\u001c\u0007K]8dKN\u001cxN\u001d\t\u0003I\u0015b\u0001\u0001B\u0003'\u0001\t\u0007qE\u0001\u0002J\tF\u0011\u0001f\u000b\t\u00035%J!AK\u000e\u0003\u000f9{G\u000f[5oOB\u0011!\u0004L\u0005\u0003[m\u00111!\u00118z!\t!s\u0006B\u00031\u0001\t\u0007qEA\u0002F-R\u0003\"\u0001\n\u001a\u0005\u000bM\u0002!\u0019\u0001\u001b\u0003\u0003M\u000b\"!N\u0016\u0011\u0005i1\u0014BA\u001c\u001c\u0005\u0011qU\u000f\u001c7\u0011\t\u0001J4EL\u0005\u0003uI\u0011a#T5tg&twMU3wSNLwN\\:SKBd\u0017-_\u0001\u0003KN\u0004$!\u0010\"\u0011\tyz4%Q\u0007\u0002)%\u0011\u0001\t\u0006\u0002\f\u000bZ,g\u000e^*pkJ\u001cW\r\u0005\u0002%\u0005\u0012I1)AA\u0001\u0002\u0003\u0015\t\u0001\u0012\u0002\u0004?\u0012\n\u0014C\u0001\u0018,\u00031\u0001(o\\2fgN\u001cFo\u001c:f+\u00059\u0005\u0003\u0002\u0011IGEJ!!\u0013\n\u0003%M#(/Z1n!J|7-Z:t'R|'/Z\u0001\u000eaJ|7-Z:t'R|'/\u001a\u0011\u00027I,\u0007\u000f\\1z\u001b&\u001c8/\u001b8h%\u00164\u0018n]5p]N$U\r\\1z!\ti%+D\u0001O\u0015\ty\u0005+\u0001\u0005ekJ\fG/[8o\u0015\t\t6$\u0001\u0006d_:\u001cWO\u001d:f]RL!a\u0015(\u0003\u001d\u0019Kg.\u001b;f\tV\u0014\u0018\r^5p]\u0006I1o\u00195fIVdWM\u001d\t\u0003-rk\u0011a\u0016\u0006\u0003#bS!!\u0017.\u0002\tU$\u0018\u000e\u001c\u0006\u00027\u0006!!.\u0019<b\u0013\tivK\u0001\rTG\",G-\u001e7fI\u0016CXmY;u_J\u001cVM\u001d<jG\u0016\f\u0001\u0003]1si&$\u0018n\u001c8UQJ,\u0017\rZ:\u0011\u0005\u0001$W\"A1\u000b\u0005E\u0013'\"A2\u0002\u000bM\u001cWO\u001a4\n\u0005\u0015\f'a\u0007)beRLG/[8oK\u0012,\u00050Z2vi&|gnQ8oi\u0016DH/\u0001\u0006fm&$WM\\2fIE\u00022\u0001[6/\u001b\u0005I'B\u00016\u001c\u0003\u001d\u0011XM\u001a7fGRL!\u0001\\5\u0003\u0011\rc\u0017m]:UC\u001e\fa\u0001P5oSRtDCB8sobL(\u0010\u0006\u0002qcB)\u0001\u0005A\u0012/c!)a\r\u0003a\u0002O\")1\b\u0003a\u0001gB\u0012AO\u001e\t\u0005}}\u001aS\u000f\u0005\u0002%m\u0012I1I]A\u0001\u0002\u0003\u0015\t\u0001\u0012\u0005\u0006\u000b\"\u0001\ra\u0012\u0005\u0006\u0017\"\u0001\r\u0001\u0014\u0005\u0006)\"\u0001\r!\u0016\u0005\u0006=\"\u0001\ra\u0018\u000b\ry~\fY!a\u0004\u0002\u0012\u0005M\u00111\b\u000b\u0003avDqA`\u0005\u0002\u0002\u0003\u000fq-\u0001\u0006fm&$WM\\2fIIBaaO\u0005A\u0002\u0005\u0005\u0001\u0007BA\u0002\u0003\u000f\u0001RAP $\u0003\u000b\u00012\u0001JA\u0004\t)\tIa`A\u0001\u0002\u0003\u0015\t\u0001\u0012\u0002\u0004?\u0012\u0012\u0004BBA\u0007\u0013\u0001\u0007q)A\u0003ti>\u0014X\rC\u0003L\u0013\u0001\u0007A\nC\u0003U\u0013\u0001\u0007Q\u000bC\u0004\u0002\u0016%\u0001\r!a\u0006\u0002\u001bI,\u0007o\u001c:u\r\u0006LG.\u001e:f!\u001dQ\u0012\u0011DA\u000f\u0003kI1!a\u0007\u001c\u0005%1UO\\2uS>t\u0017\u0007\u0005\u0003\u0002 \u0005=b\u0002BA\u0011\u0003WqA!a\t\u0002*5\u0011\u0011Q\u0005\u0006\u0004\u0003O1\u0012A\u0002\u001fs_>$h(C\u0001\u001d\u0013\r\ticG\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\t\t$a\r\u0003\u0013QC'o\\<bE2,'bAA\u00177A\u0019!$a\u000e\n\u0007\u0005e2D\u0001\u0003V]&$\b\"CA\u001f\u0013A\u0005\t\u0019AA \u0003E\u0001(o\\2fgNLgn\u001a+ie\u0016\fGm\u001d\t\u00045\u0005\u0005\u0013bAA\"7\t\u0019\u0011J\u001c;\u0002\u001dA\u0014xnY3tg\u000e{g\u000e^3yiR!\u0011\u0011JA/%\u0019\tY%a\u0014\u0002X\u00191\u0011Q\n\u0001\u0001\u0003\u0013\u0012A\u0002\u0010:fM&tW-\\3oiz\u0002B!!\u0015\u0002T5\t\u0001+C\u0002\u0002VA\u0013\u0001#\u0012=fGV$\u0018n\u001c8D_:$X\r\u001f;\u0011\u0007Y\u000bI&C\u0002\u0002\\]\u0013\u0001\"\u0012=fGV$xN\u001d\u0005\u0007\u0003?R\u0001\u0019A\u0012\u0002\u0005%$\u0017A\u0002:fa2\f\u0017\u0010\u0005\u0005\u001b\u0003K\u001a\u0013\u0011NA8\u0013\r\t9g\u0007\u0002\n\rVt7\r^5p]J\u0002B!a\b\u0002l%!\u0011QNA\u001a\u0005\u0015\u0011\u0016M\\4f!\u001dQ\u0012\u0011DA9\u0003k\u0001D!a\u001d\u0002*B9!$!\u0007\u0002v\u0005\u001d\u0006\u0007BA<\u0003\u007f\u0002bAPA=G\u0005u\u0014bAA>)\tYAK]1og\u0006\u001cG/[8o!\r!\u0013q\u0010\u0003\f\u0003\u0003\u000b\u0019)!A\u0001\u0002\u000b\u0005AIA\u0002`IMBq!!\"\u0017\u0001\u0004\tY)A\u0007sKBd\u0017-\u001f)s_\u000e,7o]\u0005\u0004\u0003\u0013K\u0014A\u0006:fa2\f\u00170T5tg&twMU3wSNLwN\\:1\t\u00055\u00151\u0015\t\b5\u0005e\u0011qRAQa\u0011\t\t*!'\u0011\u000fy\nI(a%\u0002\u0018B\u0019A%!&\u0005\u000b\u0019\u0002!\u0019A\u0014\u0011\u0007\u0011\nI\n\u0002\u0007\u0002\u0002\u0006\r\u0015\u0011!A\u0001\u0006\u0003\tY*E\u0002\u0002\u001e.\u00022\u0001JAP\t\u0015\u0001\u0004A1\u0001(!\r!\u00131\u0015\u0003\f\u0003K\u000b\u0019)!A\u0001\u0002\u000b\u0005qEA\u0002`IQ\u00022\u0001JAU\t-\t)+a+\u0002\u0002\u0003\u0005)\u0011A\u0014\t\u000f\u0005\u0015e\u00031\u0001\u0002r\u0005\u0011rN\\'jgNLgn\u001a*fm&\u001c\u0018n\u001c8t)\u0019\t)$!-\u00024\"1\u0011q\f\u0007A\u0002\rBq!!.\r\u0001\u0004\tI'A\u0004nSN\u001c\u0018N\\4\u00029A+'o]5ti\u0016tG/T8o_R|g.[2Qe>\u001cWm]:peB\u0011\u0001ED\n\u0003\u001de!\"!!/\u00027\u0011bWm]:j]&$He\u001a:fCR,'\u000f\n3fM\u0006,H\u000e\u001e\u00137+!\t\u0019-!7\u0002\\\u0006uWCAAcU\u0011\ty$a2,\u0005\u0005%\u0007\u0003BAf\u0003+l!!!4\u000b\t\u0005=\u0017\u0011[\u0001\nk:\u001c\u0007.Z2lK\u0012T1!a5\u001c\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0003/\fiMA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016$QA\n\tC\u0002\u001d\"Q\u0001\r\tC\u0002\u001d\"Qa\r\tC\u0002Q\u0002")
/* loaded from: input_file:delta/process/PersistentMonotonicProcessor.class */
public abstract class PersistentMonotonicProcessor<ID, EVT, S> implements MonotonicProcessor<ID, EVT, S>, MissingRevisionsReplay<ID, EVT> {
    private final EventSource<ID, ? super EVT> es;
    private final StreamProcessStore<ID, S> processStore;
    private final FiniteDuration replayMissingRevisionsDelay;
    private final ScheduledExecutorService scheduler;
    private final PartitionedExecutionContext partitionThreads;
    private final Function2<ID, Range, Function1<Function1<Transaction<ID, ? super EVT>, ?>, BoxedUnit>> replay;
    private final TrieMap<ID, Tuple2<Range, ScheduledFuture<?>>> delta$process$MissingRevisionsReplay$$outstandingReplays;
    private final TreeMap<Object, Transaction<ID, ? super EVT>> delta$process$MonotonicProcessor$$Empty;
    private final TrieMap<ID, MonotonicProcessor<ID, EVT, S>.StreamStatus> delta$process$MonotonicProcessor$$streamStatus;

    /* JADX WARN: Incorrect inner types in field signature: Ldelta/process/MonotonicProcessor<TID;TEVT;TS;>.IncompleteStream$; */
    private volatile MonotonicProcessor$IncompleteStream$ IncompleteStream$module;

    @Override // delta.process.MissingRevisionsReplay
    public void replayMissingRevisions(EventSource<ID, ? super EVT> eventSource, FiniteDuration finiteDuration, ScheduledExecutorService scheduledExecutorService, Function1<Throwable, BoxedUnit> function1, ID id, Range range, Function1<Transaction<ID, ? super EVT>, ?> function12) {
        replayMissingRevisions(eventSource, finiteDuration, scheduledExecutorService, function1, id, range, function12);
    }

    @Override // delta.process.MonotonicProcessor
    public Iterable<MonotonicProcessor<ID, EVT, S>.IncompleteStream> incompleteStreams() {
        Iterable<MonotonicProcessor<ID, EVT, S>.IncompleteStream> incompleteStreams;
        incompleteStreams = incompleteStreams();
        return incompleteStreams;
    }

    @Override // delta.process.MonotonicProcessor
    public Future<BoxedUnit> apply(Transaction<ID, ? super EVT> transaction) {
        Future<BoxedUnit> apply;
        apply = apply((Transaction) transaction);
        return apply;
    }

    public boolean apply$mcZD$sp(double d) {
        return Function1.apply$mcZD$sp$(this, d);
    }

    public double apply$mcDD$sp(double d) {
        return Function1.apply$mcDD$sp$(this, d);
    }

    public float apply$mcFD$sp(double d) {
        return Function1.apply$mcFD$sp$(this, d);
    }

    public int apply$mcID$sp(double d) {
        return Function1.apply$mcID$sp$(this, d);
    }

    public long apply$mcJD$sp(double d) {
        return Function1.apply$mcJD$sp$(this, d);
    }

    public void apply$mcVD$sp(double d) {
        Function1.apply$mcVD$sp$(this, d);
    }

    public boolean apply$mcZF$sp(float f) {
        return Function1.apply$mcZF$sp$(this, f);
    }

    public double apply$mcDF$sp(float f) {
        return Function1.apply$mcDF$sp$(this, f);
    }

    public float apply$mcFF$sp(float f) {
        return Function1.apply$mcFF$sp$(this, f);
    }

    public int apply$mcIF$sp(float f) {
        return Function1.apply$mcIF$sp$(this, f);
    }

    public long apply$mcJF$sp(float f) {
        return Function1.apply$mcJF$sp$(this, f);
    }

    public void apply$mcVF$sp(float f) {
        Function1.apply$mcVF$sp$(this, f);
    }

    public boolean apply$mcZI$sp(int i) {
        return Function1.apply$mcZI$sp$(this, i);
    }

    public double apply$mcDI$sp(int i) {
        return Function1.apply$mcDI$sp$(this, i);
    }

    public float apply$mcFI$sp(int i) {
        return Function1.apply$mcFI$sp$(this, i);
    }

    public int apply$mcII$sp(int i) {
        return Function1.apply$mcII$sp$(this, i);
    }

    public long apply$mcJI$sp(int i) {
        return Function1.apply$mcJI$sp$(this, i);
    }

    public void apply$mcVI$sp(int i) {
        Function1.apply$mcVI$sp$(this, i);
    }

    public boolean apply$mcZJ$sp(long j) {
        return Function1.apply$mcZJ$sp$(this, j);
    }

    public double apply$mcDJ$sp(long j) {
        return Function1.apply$mcDJ$sp$(this, j);
    }

    public float apply$mcFJ$sp(long j) {
        return Function1.apply$mcFJ$sp$(this, j);
    }

    public int apply$mcIJ$sp(long j) {
        return Function1.apply$mcIJ$sp$(this, j);
    }

    public long apply$mcJJ$sp(long j) {
        return Function1.apply$mcJJ$sp$(this, j);
    }

    public void apply$mcVJ$sp(long j) {
        Function1.apply$mcVJ$sp$(this, j);
    }

    public <A> Function1<A, Future<BoxedUnit>> compose(Function1<A, Transaction<ID, ? super EVT>> function1) {
        return Function1.compose$(this, function1);
    }

    public <A> Function1<Transaction<ID, ? super EVT>, A> andThen(Function1<Future<BoxedUnit>, A> function1) {
        return Function1.andThen$(this, function1);
    }

    public String toString() {
        return Function1.toString$(this);
    }

    @Override // delta.process.TransactionProcessor
    public Future<S> processAsync(Transaction<ID, ? super EVT> transaction, Option<S> option) {
        Future<S> processAsync;
        processAsync = processAsync(transaction, option);
        return processAsync;
    }

    @Override // delta.process.MissingRevisionsReplay
    public TrieMap<ID, Tuple2<Range, ScheduledFuture<?>>> delta$process$MissingRevisionsReplay$$outstandingReplays() {
        return this.delta$process$MissingRevisionsReplay$$outstandingReplays;
    }

    @Override // delta.process.MissingRevisionsReplay
    public final void delta$process$MissingRevisionsReplay$_setter_$delta$process$MissingRevisionsReplay$$outstandingReplays_$eq(TrieMap<ID, Tuple2<Range, ScheduledFuture<?>>> trieMap) {
        this.delta$process$MissingRevisionsReplay$$outstandingReplays = trieMap;
    }

    @Override // delta.process.MonotonicProcessor
    public TreeMap<Object, Transaction<ID, ? super EVT>> delta$process$MonotonicProcessor$$Empty() {
        return this.delta$process$MonotonicProcessor$$Empty;
    }

    @Override // delta.process.MonotonicProcessor
    public TrieMap<ID, MonotonicProcessor<ID, EVT, S>.StreamStatus> delta$process$MonotonicProcessor$$streamStatus() {
        return this.delta$process$MonotonicProcessor$$streamStatus;
    }

    /* JADX WARN: Incorrect inner types in method signature: ()Ldelta/process/MonotonicProcessor<TID;TEVT;TS;>.IncompleteStream$; */
    @Override // delta.process.MonotonicProcessor
    public MonotonicProcessor$IncompleteStream$ IncompleteStream() {
        if (this.IncompleteStream$module == null) {
            IncompleteStream$lzycompute$1();
        }
        return this.IncompleteStream$module;
    }

    @Override // delta.process.MonotonicProcessor
    public final void delta$process$MonotonicProcessor$_setter_$delta$process$MonotonicProcessor$$Empty_$eq(TreeMap<Object, Transaction<ID, ? super EVT>> treeMap) {
        this.delta$process$MonotonicProcessor$$Empty = treeMap;
    }

    @Override // delta.process.MonotonicProcessor
    public final void delta$process$MonotonicProcessor$_setter_$delta$process$MonotonicProcessor$$streamStatus_$eq(TrieMap<ID, MonotonicProcessor<ID, EVT, S>.StreamStatus> trieMap) {
        this.delta$process$MonotonicProcessor$$streamStatus = trieMap;
    }

    @Override // delta.process.MonotonicProcessor
    public StreamProcessStore<ID, S> processStore() {
        return this.processStore;
    }

    @Override // delta.process.MonotonicProcessor
    public ExecutionContext processContext(ID id) {
        return this.partitionThreads.singleThread(id.hashCode());
    }

    @Override // delta.process.MonotonicProcessor
    public void onMissingRevisions(ID id, Range range) {
        ((Function1) this.replay.apply(id, range)).apply(this);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5, types: [delta.process.PersistentMonotonicProcessor] */
    private final void IncompleteStream$lzycompute$1() {
        ?? r0 = this;
        synchronized (r0) {
            if (this.IncompleteStream$module == null) {
                r0 = this;
                r0.IncompleteStream$module = new MonotonicProcessor$IncompleteStream$(this);
            }
        }
    }

    public static final /* synthetic */ void $anonfun$replay$1(PersistentMonotonicProcessor persistentMonotonicProcessor, Throwable th) {
        persistentMonotonicProcessor.partitionThreads.reportFailure(th);
    }

    public static final /* synthetic */ void $anonfun$replay$3(PersistentMonotonicProcessor persistentMonotonicProcessor, Function1 function1, Object obj, Range range, Function1 function12) {
        persistentMonotonicProcessor.replayMissingRevisions(persistentMonotonicProcessor.es, persistentMonotonicProcessor.replayMissingRevisionsDelay, persistentMonotonicProcessor.scheduler, function1, obj, range, function12);
    }

    public PersistentMonotonicProcessor(EventSource<ID, ? super EVT> eventSource, StreamProcessStore<ID, S> streamProcessStore, FiniteDuration finiteDuration, ScheduledExecutorService scheduledExecutorService, PartitionedExecutionContext partitionedExecutionContext, ClassTag<EVT> classTag) {
        this.es = eventSource;
        this.processStore = streamProcessStore;
        this.replayMissingRevisionsDelay = finiteDuration;
        this.scheduler = scheduledExecutorService;
        this.partitionThreads = partitionedExecutionContext;
        TransactionProcessor.$init$(this);
        Function1.$init$(this);
        MonotonicProcessor.$init$((MonotonicProcessor) this);
        delta$process$MissingRevisionsReplay$_setter_$delta$process$MissingRevisionsReplay$$outstandingReplays_$eq(new TrieMap<>());
        Function1 function1 = th -> {
            $anonfun$replay$1(this, th);
            return BoxedUnit.UNIT;
        };
        this.replay = (obj, range) -> {
            return function12 -> {
                $anonfun$replay$3(this, function1, obj, range, function12);
                return BoxedUnit.UNIT;
            };
        };
    }

    public PersistentMonotonicProcessor(EventSource<ID, ? super EVT> eventSource, StreamProcessStore<ID, S> streamProcessStore, FiniteDuration finiteDuration, ScheduledExecutorService scheduledExecutorService, Function1<Throwable, BoxedUnit> function1, int i, ClassTag<EVT> classTag) {
        this(eventSource, streamProcessStore, finiteDuration, scheduledExecutorService, PartitionedExecutionContext$.MODULE$.apply(i, function1, Threads$.MODULE$.factory("default-replay-processor", function1)), classTag);
    }
}
