package delta.mongo;

import com.mongodb.async.SingleResultCallback;
import com.mongodb.async.client.MongoCollection;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.UpdateResult;
import delta.Snapshot;
import delta.process.NonBlockingCASWrites;
import delta.process.NonBlockingRecursiveUpsert;
import delta.process.RecursiveUpsert;
import delta.process.SnapshotUpdate;
import delta.process.StreamProcessStore;
import java.util.ArrayList;
import java.util.List;
import org.bson.Document;
import scala.Function0;
import scala.Function1;
import scala.Function3;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.BuildFrom$;
import scala.collection.Iterable;
import scala.collection.IterableOnceOps;
import scala.collection.JavaConverters$;
import scala.collection.immutable.Map;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.package;
import scala.reflect.ClassTag;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.util.Either;
import scuff.Codec;
import scuff.concurrent.package$ScuffScalaFuture$;

/* compiled from: MongoStreamProcessStore.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005=f\u0001\u0002\t\u0012\u0001YA\u0001B\u000e\u0001\u0003\u0002\u0003\u0006Ia\u000e\u0005\t\u000b\u0002\u0011\t\u0011)A\u0005\r\"A!\u000b\u0001B\u0002B\u0003-1\u000b\u0003\u0005Z\u0001\t\u0005\t\u0015a\u0003[\u0011\u0015\u0001\u0007\u0001\"\u0001b\u0011\u0019A\u0007\u0001)A\u0005S\")!\u0010\u0001C\u0001w\"9\u0011\u0011\u0001\u0001\u0005\n\u0005\r\u0001bBA\r\u0001\u0011\u0005\u00111\u0004\u0005\b\u0003\u000b\u0002A\u0011AA$\u0011\u001d\ty\u0006\u0001C\u0005\u0003CBq!!\u001e\u0001\t\u0003\t9\bC\u0004\u0002\u0004\u0002!\t!!\"\t\u000f\u0005M\u0005\u0001\"\u0005\u0002\u0016\"9\u0011\u0011\u0015\u0001\u0005\u0012\u0005\r&aF'p]\u001e|7\u000b\u001e:fC6\u0004&o\\2fgN\u001cFo\u001c:f\u0015\t\u00112#A\u0003n_:<wNC\u0001\u0015\u0003\u0015!W\r\u001c;b\u0007\u0001)2a\u0006\u0010,'\u0011\u0001\u0001$L\u001a\u0011\teQBDK\u0007\u0002#%\u00111$\u0005\u0002\u0013\u001b>twm\\*oCB\u001c\bn\u001c;Ti>\u0014X\r\u0005\u0002\u001e=1\u0001A!B\u0010\u0001\u0005\u0004\u0001#!A&\u0012\u0005\u0005:\u0003C\u0001\u0012&\u001b\u0005\u0019#\"\u0001\u0013\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0019\u001a#a\u0002(pi\"Lgn\u001a\t\u0003E!J!!K\u0012\u0003\u0007\u0005s\u0017\u0010\u0005\u0002\u001eW\u0011)A\u0006\u0001b\u0001A\t\ta\u000b\u0005\u0003/cqQS\"A\u0018\u000b\u0005A\u001a\u0012a\u00029s_\u000e,7o]\u0005\u0003e=\u0012!c\u0015;sK\u0006l\u0007K]8dKN\u001c8\u000b^8sKB!a\u0006\u000e\u000f+\u0013\t)tF\u0001\u000bO_:\u0014En\\2lS:<7)Q*Xe&$Xm]\u0001\u000eg:\f\u0007o\u001d5pi\u000e{G-Z2\u0011\taZ$&P\u0007\u0002s)\t!(A\u0003tGV4g-\u0003\u0002=s\t)1i\u001c3fGB\u0011ahQ\u0007\u0002\u007f)\u0011\u0001)Q\u0001\u0005EN|gNC\u0001C\u0003\ry'oZ\u0005\u0003\t~\u0012\u0001\u0002R8dk6,g\u000e^\u0001\u0005G>dG\u000eE\u0002H!vj\u0011\u0001\u0013\u0006\u0003\u0013*\u000baa\u00197jK:$(BA&M\u0003\u0015\t7/\u001f8d\u0015\tie*A\u0004n_:<w\u000e\u001a2\u000b\u0003=\u000b1aY8n\u0013\t\t\u0006JA\bN_:<wnQ8mY\u0016\u001cG/[8o\u0003))g/\u001b3f]\u000e,G%\r\t\u0004)^cR\"A+\u000b\u0005Y\u001b\u0013a\u0002:fM2,7\r^\u0005\u00031V\u0013\u0001b\u00117bgN$\u0016mZ\u0001\u0003K\u000e\u0004\"a\u00170\u000e\u0003qS!!X\u0012\u0002\u0015\r|gnY;se\u0016tG/\u0003\u0002`9\n\u0001R\t_3dkRLwN\\\"p]R,\u0007\u0010^\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0007\t4w\rF\u0002dI\u0016\u0004B!\u0007\u0001\u001dU!)!+\u0002a\u0002'\")\u0011,\u0002a\u00025\")a'\u0002a\u0001o!)Q)\u0002a\u0001\r\u0006yA/[2l\u0013:$W\r\u001f$viV\u0014X\rE\u0002\\U2L!a\u001b/\u0003\r\u0019+H/\u001e:f!\r\u0011Sn\\\u0005\u0003]\u000e\u0012aa\u00149uS>t\u0007C\u00019x\u001d\t\tX\u000f\u0005\u0002sG5\t1O\u0003\u0002u+\u00051AH]8pizJ!A^\u0012\u0002\rA\u0013X\rZ3g\u0013\tA\u0018P\u0001\u0004TiJLgn\u001a\u0006\u0003m\u000e\nQ\u0002^5dW^\u000bG/\u001a:nCJ\\W#\u0001?\u0011\u0007\tjW\u0010\u0005\u0002#}&\u0011qp\t\u0002\u0005\u0019>tw-A\u0002`S\u0012$2!PA\u0003\u0011\u001d\t9\u0001\u0003a\u0001\u0003\u0013\tAa[3zgB)\u00111BA\u000b95\u0011\u0011Q\u0002\u0006\u0005\u0003\u001f\t\t\"\u0001\u0003vi&d'BAA\n\u0003\u0011Q\u0017M^1\n\t\u0005]\u0011Q\u0002\u0002\u0005\u0019&\u001cH/A\u0005sK\u0006$')\u0019;dQR!\u0011QDA\u0019!\u0011Y&.a\b\u0011\rA\f\t\u0003HA\u0013\u0013\r\t\u0019#\u001f\u0002\u0004\u001b\u0006\u0004\b\u0003BA\u0014\u0003Si\u0011\u0001A\u0005\u0005\u0003W\tiC\u0001\u0005T]\u0006\u00048\u000f[8u\u0013\r\tyc\u0005\u0002\u000e':\f\u0007o\u001d5piN#xN]3\t\u000f\u0005\u001d\u0011\u00021\u0001\u00024A)\u0011QGA 99!\u0011qGA\u001e\u001d\r\u0011\u0018\u0011H\u0005\u0002I%\u0019\u0011QH\u0012\u0002\u000fA\f7m[1hK&!\u0011\u0011IA\"\u0005!IE/\u001a:bE2,'bAA\u001fG\u0005QqO]5uK\n\u000bGo\u00195\u0015\t\u0005%\u0013\u0011\u000b\t\u00057*\fY\u0005E\u0002#\u0003\u001bJ1!a\u0014$\u0005\u0011)f.\u001b;\t\u000f\u0005M#\u00021\u0001\u0002V\u0005I1O\\1qg\"|Go\u001d\t\b\u0003/\ni\u0006HA\u0013\u001b\t\tIFC\u0002\u0002\\\r\n!bY8mY\u0016\u001cG/[8o\u0013\u0011\t\u0019#!\u0017\u0002\u000f\u0015D\u0018m\u0019;msR9Q(a\u0019\u0002h\u0005E\u0004BBA3\u0017\u0001\u0007A$A\u0002lKfDq!!\u001b\f\u0001\u0004\tY'\u0001\u0004pY\u0012\u0014VM\u001e\t\u0004E\u00055\u0014bAA8G\t\u0019\u0011J\u001c;\t\r\u0005M4\u00021\u0001~\u0003\u001dyG\u000e\u001a+jG.\fqA]3ge\u0016\u001c\b\u000e\u0006\u0005\u0002J\u0005e\u00141PA@\u0011\u0019\t)\u0007\u0004a\u00019!9\u0011Q\u0010\u0007A\u0002\u0005-\u0014\u0001\u0003:fm&\u001c\u0018n\u001c8\t\r\u0005\u0005E\u00021\u0001~\u0003\u0011!\u0018nY6\u0002\u0019I,gM]3tQ\n\u000bGo\u00195\u0015\t\u0005%\u0013q\u0011\u0005\b\u0003\u0013k\u0001\u0019AAF\u0003%\u0011XM^5tS>t7\u000fE\u0004\u0002X\u0005uC$!$\u0011\r\t\ny)a\u001b~\u0013\r\t\tj\t\u0002\u0007)V\u0004H.\u001a\u001a\u0002\u001b]\u0014\u0018\u000e^3JM\u0006\u00137/\u001a8u)\u0019\t9*a'\u0002\u001eB!1L[AM!\u0011\u0011S.!\n\t\r\u0005\u0015d\u00021\u0001\u001d\u0011\u001d\tyJ\u0004a\u0001\u0003K\t\u0001b\u001d8baNDw\u000e^\u0001\u0011oJLG/\u001a*fa2\f7-Z7f]R$\u0002\"a&\u0002&\u0006\u001d\u00161\u0016\u0005\u0007\u0003Kz\u0001\u0019\u0001\u000f\t\u000f\u0005%v\u00021\u0001\u0002&\u0005Yq\u000e\u001c3T]\u0006\u00048\u000f[8u\u0011\u001d\tik\u0004a\u0001\u0003K\t1B\\3x':\f\u0007o\u001d5pi\u0002")
/* loaded from: input_file:delta/mongo/MongoStreamProcessStore.class */
public class MongoStreamProcessStore<K, V> extends MongoSnapshotStore<K, V> implements StreamProcessStore<K, V>, NonBlockingCASWrites<K, V> {
    private final Codec<V, Document> snapshotCodec;
    private final MongoCollection<Document> coll;
    public final ExecutionContext delta$mongo$MongoStreamProcessStore$$ec;
    private final Future<Option<String>> tickIndexFuture;

    public <R> Future<Tuple2<Option<SnapshotUpdate<V>>, R>> upsert(K k, Function1<Option<Snapshot<V>>, Future<Tuple2<Option<Snapshot<V>>, R>>> function1, ExecutionContext executionContext) {
        return NonBlockingCASWrites.upsert$(this, k, function1, executionContext);
    }

    public final <R> Future<Tuple2<Option<SnapshotUpdate<V>>, R>> upsertRecursive(K k, Option<Snapshot<V>> option, Function1<Option<Snapshot<V>>, Future<Tuple2<Option<Snapshot<V>>, R>>> function1, Function3<K, Option<Snapshot<V>>, Snapshot<V>, Future<Either<Snapshot<V>, Object>>> function3, int i, ExecutionContext executionContext) {
        return NonBlockingRecursiveUpsert.upsertRecursive$(this, k, option, function1, function3, i, executionContext);
    }

    public int upsertRetryLimit() {
        return RecursiveUpsert.upsertRetryLimit$(this);
    }

    public Nothing$ retriesExhausted(K k, Snapshot<V> snapshot, Snapshot<V> snapshot2) {
        return RecursiveUpsert.retriesExhausted$(this, k, snapshot, snapshot2);
    }

    public Option<Object> tickWatermark() {
        Future flatMap = this.tickIndexFuture.flatMap(option -> {
            return package$.MODULE$.withFutureCallback(function0 -> {
                $anonfun$tickWatermark$2(this, function0);
                return BoxedUnit.UNIT;
            }).map(option -> {
                return option.map(document -> {
                    return BoxesRunTime.boxToLong($anonfun$tickWatermark$4(document));
                });
            }, this.delta$mongo$MongoStreamProcessStore$$ec);
        }, this.delta$mongo$MongoStreamProcessStore$$ec);
        return (Option) scala.concurrent.package$.MODULE$.blocking(() -> {
            Future ScuffScalaFuture = scuff.concurrent.package$.MODULE$.ScuffScalaFuture(flatMap);
            return (Option) package$ScuffScalaFuture$.MODULE$.await$extension(ScuffScalaFuture, new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(11)).seconds(), package$ScuffScalaFuture$.MODULE$.await$default$2$extension(ScuffScalaFuture));
        });
    }

    private Document _id(List<K> list) {
        return new Document("_id", new Document("$in", list));
    }

    public Future<Map<K, Snapshot<V>>> readBatch(Iterable<K> iterable) {
        return package$.MODULE$.withFutureCallback(function0 -> {
            $anonfun$readBatch$1(this, iterable, function0);
            return BoxedUnit.UNIT;
        }).map(option -> {
            Map empty;
            if (option instanceof Some) {
                empty = (Map) ((IterableOnceOps) JavaConverters$.MODULE$.asScalaBufferConverter((ArrayList) ((Some) option).value()).asScala()).foldLeft(Predef$.MODULE$.Map().empty(), (map, document) -> {
                    return map.updated(document.get("_id", this.keyClass()), new Snapshot(this.snapshotCodec.decode(document.get("data", Document.class)), Predef$.MODULE$.Integer2int(document.getInteger("revision")), Predef$.MODULE$.Long2long(document.getLong("tick"))));
                });
            } else {
                empty = Predef$.MODULE$.Map().empty();
            }
            return empty;
        }, this.delta$mongo$MongoStreamProcessStore$$ec);
    }

    public Future<BoxedUnit> writeBatch(scala.collection.Map<K, Snapshot<V>> map) {
        return Future$.MODULE$.sequence((Iterable) map.map(tuple2 -> {
            if (tuple2 != null) {
                return this.write(tuple2._1(), (Snapshot) tuple2._2());
            }
            throw new MatchError((Object) null);
        }), BuildFrom$.MODULE$.buildFromIterableOps(), this.delta$mongo$MongoStreamProcessStore$$ec).map(iterable -> {
            $anonfun$writeBatch$2(iterable);
            return BoxedUnit.UNIT;
        }, this.delta$mongo$MongoStreamProcessStore$$ec);
    }

    private Document exactly(K k, int i, long j) {
        return _id((MongoStreamProcessStore<K, V>) k).append("revision", BoxesRunTime.boxToInteger(i)).append("tick", BoxesRunTime.boxToLong(j));
    }

    public Future<BoxedUnit> refresh(K k, int i, long j) {
        return package$.MODULE$.withFutureCallback(function0 -> {
            $anonfun$refresh$1(this, i, j, k, function0);
            return BoxedUnit.UNIT;
        }).map(option -> {
            $anonfun$refresh$2(option);
            return BoxedUnit.UNIT;
        }, this.delta$mongo$MongoStreamProcessStore$$ec);
    }

    public Future<BoxedUnit> refreshBatch(scala.collection.Map<K, Tuple2<Object, Object>> map) {
        return Future$.MODULE$.sequence((Iterable) map.map(tuple2 -> {
            if (tuple2 != null) {
                Object _1 = tuple2._1();
                Tuple2 tuple2 = (Tuple2) tuple2._2();
                if (tuple2 != null) {
                    return this.refresh(_1, tuple2._1$mcI$sp(), tuple2._2$mcJ$sp());
                }
            }
            throw new MatchError(tuple2);
        }), BuildFrom$.MODULE$.buildFromIterableOps(), this.delta$mongo$MongoStreamProcessStore$$ec).map(iterable -> {
            $anonfun$refreshBatch$2(iterable);
            return BoxedUnit.UNIT;
        }, this.delta$mongo$MongoStreamProcessStore$$ec);
    }

    public Future<Option<Snapshot<V>>> writeIfAbsent(K k, Snapshot<V> snapshot) {
        return package$.MODULE$.withFutureCallback(function0 -> {
            $anonfun$writeIfAbsent$1(this, k, snapshot, function0);
            return BoxedUnit.UNIT;
        }).map(option -> {
            return None$.MODULE$;
        }, this.delta$mongo$MongoStreamProcessStore$$ec).recoverWith(new MongoStreamProcessStore$$anonfun$writeIfAbsent$3(this, k), this.delta$mongo$MongoStreamProcessStore$$ec);
    }

    public Future<Option<Snapshot<V>>> writeReplacement(K k, Snapshot<V> snapshot, Snapshot<V> snapshot2) {
        return package$.MODULE$.withFutureCallback(function0 -> {
            $anonfun$writeReplacement$1(this, snapshot2, k, snapshot, function0);
            return BoxedUnit.UNIT;
        }).flatMap(option -> {
            Future map;
            boolean z = false;
            Some some = null;
            if (option instanceof Some) {
                z = true;
                some = (Some) option;
                UpdateResult updateResult = (UpdateResult) some.value();
                if (updateResult.wasAcknowledged() && updateResult.getModifiedCount() == 1) {
                    map = Future$.MODULE$.successful(None$.MODULE$);
                    return map;
                }
            }
            if (z && ((UpdateResult) some.value()).getMatchedCount() == 0) {
                map = this.read(k).map(option -> {
                    if (option instanceof Some) {
                        return (Some) option;
                    }
                    throw scala.sys.package$.MODULE$.error(new StringBuilder(58).append("Failed to replace old snapshot ").append(k).append(", because it doesn't exist!").toString());
                }, this.delta$mongo$MongoStreamProcessStore$$ec);
                return map;
            }
            if (!z) {
                throw new MatchError(option);
            }
            throw scala.sys.package$.MODULE$.error(new StringBuilder(20).append("Unexepected result: ").append((UpdateResult) some.value()).toString());
        }, this.delta$mongo$MongoStreamProcessStore$$ec);
    }

    public static final /* synthetic */ void $anonfun$tickIndexFuture$1(MongoStreamProcessStore mongoStreamProcessStore, Function0 function0) {
        mongoStreamProcessStore.coll.createIndex(new Document("tick", BoxesRunTime.boxToInteger(-1)), (SingleResultCallback) function0.apply());
    }

    public static final /* synthetic */ void $anonfun$tickWatermark$2(MongoStreamProcessStore mongoStreamProcessStore, Function0 function0) {
        mongoStreamProcessStore.coll.find().sort(new Document("tick", BoxesRunTime.boxToInteger(-1))).limit(1).projection(new Document("_id", BoxesRunTime.boxToInteger(0)).append("tick", BoxesRunTime.boxToInteger(1))).first((SingleResultCallback) function0.apply());
    }

    public static final /* synthetic */ long $anonfun$tickWatermark$4(Document document) {
        return document.getLong("tick").longValue();
    }

    public static final /* synthetic */ void $anonfun$readBatch$1(MongoStreamProcessStore mongoStreamProcessStore, Iterable iterable, Function0 function0) {
        List<K> list = (List) JavaConverters$.MODULE$.seqAsJavaListConverter(iterable.toSeq()).asJava();
        mongoStreamProcessStore.coll.find(mongoStreamProcessStore._id((List) list)).into(new ArrayList(list.size()), (SingleResultCallback) function0.apply());
    }

    public static final /* synthetic */ void $anonfun$writeBatch$2(Iterable iterable) {
    }

    public static final /* synthetic */ void $anonfun$refresh$1(MongoStreamProcessStore mongoStreamProcessStore, int i, long j, Object obj, Function0 function0) {
        UpdateOptions upsert = new UpdateOptions().upsert(false);
        mongoStreamProcessStore.coll.updateOne(mongoStreamProcessStore.where(obj, i, j), new Document("$set", new Document("revision", BoxesRunTime.boxToInteger(i)).append("tick", BoxesRunTime.boxToLong(j))), upsert, (SingleResultCallback) function0.apply());
    }

    public static final /* synthetic */ void $anonfun$refresh$2(Option option) {
    }

    public static final /* synthetic */ void $anonfun$refreshBatch$2(Iterable iterable) {
    }

    public static final /* synthetic */ void $anonfun$writeIfAbsent$1(MongoStreamProcessStore mongoStreamProcessStore, Object obj, Snapshot snapshot, Function0 function0) {
        mongoStreamProcessStore.coll.insertOne(mongoStreamProcessStore._id((MongoStreamProcessStore) obj).append("data", mongoStreamProcessStore.snapshotCodec.encode(snapshot.content())).append("revision", BoxesRunTime.boxToInteger(snapshot.revision())).append("tick", BoxesRunTime.boxToLong(snapshot.tick())), (SingleResultCallback) function0.apply());
    }

    public static final /* synthetic */ void $anonfun$writeReplacement$1(MongoStreamProcessStore mongoStreamProcessStore, Snapshot snapshot, Object obj, Snapshot snapshot2, Function0 function0) {
        mongoStreamProcessStore.coll.updateOne(mongoStreamProcessStore.exactly(obj, snapshot2.revision(), snapshot2.tick()), new Document("$set", new Document().append("data", mongoStreamProcessStore.snapshotCodec.encode(snapshot.content())).append("revision", BoxesRunTime.boxToInteger(snapshot.revision())).append("tick", BoxesRunTime.boxToLong(snapshot.tick()))), (SingleResultCallback) function0.apply());
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public MongoStreamProcessStore(Codec<V, Document> codec, MongoCollection<Document> mongoCollection, ClassTag<K> classTag, ExecutionContext executionContext) {
        super(codec, mongoCollection, classTag, executionContext);
        this.snapshotCodec = codec;
        this.coll = mongoCollection;
        this.delta$mongo$MongoStreamProcessStore$$ec = executionContext;
        RecursiveUpsert.$init$(this);
        NonBlockingRecursiveUpsert.$init$(this);
        NonBlockingCASWrites.$init$(this);
        this.tickIndexFuture = package$.MODULE$.withFutureCallback(function0 -> {
            $anonfun$tickIndexFuture$1(this, function0);
            return BoxedUnit.UNIT;
        });
    }
}
