package delta.redis;

import delta.Snapshot;
import delta.process.AsyncCodec;
import delta.process.BlockingCASWrites;
import delta.process.BlockingRecursiveUpsert;
import delta.process.RecursiveUpsert;
import delta.process.StreamProcessStore;
import delta.process.Update;
import delta.process.UpdateCodec;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import redis.clients.jedis.BinaryJedis;
import redis.clients.jedis.Transaction;
import scala.Array$;
import scala.Function1;
import scala.Function2;
import scala.Function3;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Map;
import scala.collection.StringOps$;
import scala.collection.immutable.HashMap$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.FiniteDuration;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.ObjectRef;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;
import scala.util.Either;
import scuff.Codec;
import scuff.package$;
import scuff.package$ScuffByteArray$;
import scuff.package$ScuffString$;

/* compiled from: BinaryRedisStreamProcessStore.scala */
@ScalaSignature(bytes = "\u0006\u0005\t5c\u0001B\r\u001b\u0001}A\u0001b\u0013\u0001\u0003\u0002\u0003\u0006I\u0001\u0014\u0005\t5\u0002\u0011\t\u0011)A\u00057\"A\u0001\r\u0001B\u0001B\u0003%\u0011\r\u0003\u0005j\u0001\t\u0005\t\u0015!\u0003k\u0011!\u0001\bA!A!\u0002\u0013\t\b\u0002C;\u0001\u0005\u000b\u0007I1\u0003<\t\u0011i\u0004!\u0011!Q\u0001\n]DQa\u001f\u0001\u0005\u0002qDq!!\u0004\u0001\t\u0003\ty\u0001C\u0004\u0002\u0012\u0001!I!a\u0005\t\u0011\u0005m\u0002\u0001)A\u0005\u0003{Aq!a\u0013\u0001\t\u0013\ti\u0005\u0003\u0005\u0002p\u0001\u0001\u000b\u0011BA\u0013\u0011\u001d\t\t\b\u0001C\u0001\u0003gBq!a\u001f\u0001\t\u0003\ni\bC\u0004\u0002\u0016\u0002!\t!a&\t\u000f\u0005m\u0006\u0001\"\u0001\u0002>\"9\u00111\u0019\u0001\u0005\u0012\u0005\u0015\u0007bBAo\u0001\u0011\u0005\u0011q\u001c\u0005\b\u0003O\u0004A\u0011BAu\u0011\u001d\t9\u000f\u0001C\u0001\u0005\u0017AqA!\u0005\u0001\t#\u0011\u0019\u0002C\u0004\u00030\u0001!\tB!\r\t\u000f\tm\u0002\u0001\"\u0005\u0003>\ti\")\u001b8bef\u0014V\rZ5t'R\u0014X-Y7Qe>\u001cWm]:Ti>\u0014XM\u0003\u0002\u001c9\u0005)!/\u001a3jg*\tQ$A\u0003eK2$\u0018m\u0001\u0001\u0016\t\u0001:C'P\n\u0005\u0001\u00052t\b\u0005\u0003#G\u0015\u001aT\"\u0001\u000e\n\u0005\u0011R\"\u0001\u0007\"j]\u0006\u0014\u0018PU3eSN\u001cf.\u00199tQ>$8\u000b^8sKB\u0011ae\n\u0007\u0001\t\u0015A\u0003A1\u0001*\u0005\u0005Y\u0015C\u0001\u00161!\tYc&D\u0001-\u0015\u0005i\u0013!B:dC2\f\u0017BA\u0018-\u0005\u001dqu\u000e\u001e5j]\u001e\u0004\"aK\u0019\n\u0005Ib#aA!osB\u0011a\u0005\u000e\u0003\u0006k\u0001\u0011\r!\u000b\u0002\u0002)B)qGO\u00134y5\t\u0001H\u0003\u0002:9\u00059\u0001O]8dKN\u001c\u0018BA\u001e9\u0005I\u0019FO]3b[B\u0013xnY3tgN#xN]3\u0011\u0005\u0019jD!\u0002 
\u0001\u0005\u0004I#!A+\u0011\r]\u0002Ue\r\u001fC\u0013\t\t\u0005HA\tCY>\u001c7.\u001b8h\u0007\u0006\u001bvK]5uKN\u0004\"aQ%\u000e\u0003\u0011S!!\u0012$\u0002\u000b),G-[:\u000b\u0005\u001dC\u0015aB2mS\u0016tGo\u001d\u0006\u00027%\u0011!\n\u0012\u0002\f\u0005&t\u0017M]=KK\u0012L7/\u0001\u0005lKf\u001cu\u000eZ3d!\riu+\n\b\u0003\u001dRs!a\u0014*\u000e\u0003AS!!\u0015\u0010\u0002\rq\u0012xn\u001c;?\u0013\u0005\u0019\u0016!B:dk\u001a4\u0017BA+W\u0003\u001d\u0001\u0018mY6bO\u0016T\u0011aU\u0005\u00031f\u0013!bU3sS\u0006d\u0017N_3s\u0015\t)f+A\u0007t]\u0006\u00048\u000f[8u\u0007>$Wm\u0019\t\u0004\u001b^c\u0006cA/_g5\tA$\u0003\u0002`9\tA1K\\1qg\"|G/\u0001\u0005iCNDg*Y7f!\t\u0011gM\u0004\u0002dIB\u0011q\nL\u0005\u0003K2\na\u0001\u0015:fI\u00164\u0017BA4i\u0005\u0019\u0019FO]5oO*\u0011Q\rL\u0001\fE2|7m[5oO\u000e#\b\u0010\u0005\u0002l]6\tAN\u0003\u0002nY\u0005Q1m\u001c8dkJ\u0014XM\u001c;\n\u0005=d'\u0001E#yK\u000e,H/[8o\u0007>tG/\u001a=u\u00035QW\rZ5t!J|g/\u001b3feB!1F\u001d;1\u0013\t\u0019HFA\u0005Gk:\u001cG/[8ocA!1F\u001d\"1\u0003-)\b\u000fZ1uK\u000e{G-Z2\u0016\u0003]\u0004Ba\u000e=4y%\u0011\u0011\u0010\u000f\u0002\f+B$\u0017\r^3D_\u0012,7-\u0001\u0007va\u0012\fG/Z\"pI\u0016\u001c\u0007%\u0001\u0004=S:LGO\u0010\u000b\n{\u0006\u0015\u0011qAA\u0005\u0003\u0017!2A`A\u0002)\ry\u0018\u0011\u0001\t\u0006E\u0001)3\u0007\u0010\u0005\u0006k\"\u0001\u001da\u001e\u0005\u0006a\"\u0001\r!\u001d\u0005\u0006\u0017\"\u0001\r\u0001\u0014\u0005\u00065\"\u0001\ra\u0017\u0005\u0006A\"\u0001\r!\u0019\u0005\u0006S\"\u0001\rA[\u0001\u0005]\u0006lW-F\u0001b\u00031qWm\u001e&V\u0011\u0006\u001c\b.T1q)\u0011\t)\"!\r\u0011\u0011\u0005]\u0011\u0011EA\u0013\u0003Ki!!!\u0007\u000b\t\u0005m\u0011QD\u0001\u0005kRLGN\u0003\u0002\u0002 
\u0005!!.\u0019<b\u0013\u0011\t\u0019#!\u0007\u0003\u000f!\u000b7\u000f['baB)1&a\n\u0002,%\u0019\u0011\u0011\u0006\u0017\u0003\u000b\u0005\u0013(/Y=\u0011\u0007-\ni#C\u0002\u000201\u0012AAQ=uK\"9\u00111\u0007\u0006A\u0002\u0005U\u0012\u0001B:ju\u0016\u00042aKA\u001c\u0013\r\tI\u0004\f\u0002\u0004\u0013:$\u0018a\u00047bgR$\u0016nY6Xe&$H/\u001a8\u0011\t\u0005}\u0012qI\u0007\u0003\u0003\u0003RA!a\u0011\u0002F\u00051\u0011\r^8nS\u000eT1!\\A\r\u0013\u0011\tI%!\u0011\u0003\u0015\u0005#x.\\5d\u0019>tw-A\u000fva\u0012\fG/\u001a'bgR$\u0016nY6Xe&$H/\u001a8JM:+W\rZ3e)\u0011\ty%!\u0016\u0011\u0007-\n\t&C\u0002\u0002T1\u0012A!\u00168ji\"9\u0011q\u000b\u0007A\u0002\u0005e\u0013a\u00028foRK7m\u001b\t\u0005\u00037\ni&D\u0001\u0001\u0013\r\tyF\u000f\u0002\u0005)&\u001c7\u000eK\u0002\r\u0003G\u0002B!!\u001a\u0002l5\u0011\u0011q\r\u0006\u0004\u0003Sb\u0013AC1o]>$\u0018\r^5p]&!\u0011QNA4\u0005\u001d!\u0018-\u001b7sK\u000e\fq\u0001V5dW.+\u00170A\u0007uS\u000e\\w+\u0019;fe6\f'o[\u000b\u0003\u0003k\u0002RaKA<\u00033J1!!\u001f-\u0005\u0019y\u0005\u000f^5p]\u0006)qO]5uKR1\u0011qPAC\u0003\u0013\u0003Ra[AA\u0003\u001fJ1!a!m\u0005\u00191U\u000f^;sK\"1\u0011qQ\bA\u0002\u0015\n1a[3z\u0011\u001d\tYi\u0004a\u0001\u0003\u001b\u000b\u0001b\u001d8baNDw\u000e\u001e\t\u0005\u00037\ny)C\u0002`\u0003#K1!a%\u001d\u00059\u0019f.\u00199tQ>$(+Z1eKJ\f\u0011B]3bI\n\u000bGo\u00195\u0015\t\u0005e\u0015q\u0015\t\u0006W\u0006\u0005\u00151\u0014\t\b\u0003;\u000b\u0019+JAG\u001b\t\tyJC\u0002\u0002\"2\n!bY8mY\u0016\u001cG/[8o\u0013\u0011\t)+a(\u0003\u00075\u000b\u0007\u000fC\u0004\u0002*B\u0001\r!a+\u0002\t-,\u0017p\u001d\t\u0006\u0003[\u000b),\n\b\u0005\u0003_\u000b\u0019LD\u0002P\u0003cK\u0011!L\u0005\u0003+2JA!a.\u0002:\nA\u0011\n^3sC\ndWM\u0003\u0002VY\u0005QqO]5uK\n\u000bGo\u00195\u0015\t\u0005}\u0014q\u0018\u0005\b\u0003\u0003\f\u0002\u0019AAN\u0003%\u0019h.\u00199tQ>$8/\u0001\u0006sK\u001a\u0014Xm\u001d5LKf$B!a2\u0002ZRA\u0011qJAe\u0003\u0017\f)\u000e\u0003\u0004\u0002\bJ\u0001\r!\n\u0005\b\u0003\u001b\u0014\u
0002\u0019AAh\u0003!\u0011XM^5tS>t\u0007\u0003BA.\u0003#L1!a5;\u0005!\u0011VM^5tS>t\u0007bBAl%\u0001\u0007\u0011\u0011L\u0001\u0005i&\u001c7\u000e\u0003\u0004\u0002\\J\u0001\rAQ\u0001\u0005G>tg.A\u0004sK\u001a\u0014Xm\u001d5\u0015\u0011\u0005}\u0014\u0011]Ar\u0003KDa!a\"\u0014\u0001\u0004)\u0003bBAg'\u0001\u0007\u0011q\u001a\u0005\b\u0003/\u001c\u0002\u0019AA-\u00031\u0011XM\u001a:fg\"\u0014\u0015\r^2i)\u0011\tYO!\u0003\u0015\r\u0005=\u0013Q^A|\u0011\u001d\ty\u000f\u0006a\u0001\u0003c\fqAY5o\u0017\u0016L8\u000f\u0005\u0004\u0002.\u0006M\u0018QE\u0005\u0005\u0003k\fILA\u0002TKFDq!!?\u0015\u0001\u0004\tY0A\u0005sKZL7/[8ogB1\u0011QVAz\u0003{\u0004raKA��\u0003k\u0011\u0019!C\u0002\u0003\u00021\u0012a\u0001V;qY\u0016\u0014\u0004cA\u0016\u0003\u0006%\u0019!q\u0001\u0017\u0003\t1{gn\u001a\u0005\u0007\u00037$\u0002\u0019\u0001\"\u0015\t\u0005}$Q\u0002\u0005\b\u0003s,\u0002\u0019\u0001B\b!\u001d\ti*a)&\u0003{\fQB]3bI\u001a{'/\u00169eCR,W\u0003\u0002B\u000b\u0005;!BAa\u0006\u0003.Q!!\u0011\u0004B\u0011!\u0015Y\u0017\u0011\u0011B\u000e!\r1#Q\u0004\u0003\u0007\u0005?1\"\u0019A\u0015\u0003\u0003ICqAa\t\u0017\u0001\u0004\u0011)#A\u0006va\u0012\fG/\u001a+ik:\\\u0007\u0003C\u0016\u0003(\t\u0013YCa\u0007\n\u0007\t%BFA\u0005Gk:\u001cG/[8oeA)1&a\u001e\u0002\u000e\"1\u0011q\u0011\fA\u0002\u0015\nQb\u001e:ji\u0016Le-\u00112tK:$H\u0003\u0002B\u001a\u0005s!bAa\u000b\u00036\t]\u0002BBAD/\u0001\u0007Q\u0005C\u0004\u0002\f^\u0001\r!!$\t\r\u0005mw\u00031\u0001C\u0003A9(/\u001b;f%\u0016\u0004H.Y2f[\u0016tG\u000f\u0006\u0003\u0003@\t-C\u0003\u0003B\u0016\u0005\u0003\u0012\u0019Ea\u0012\t\r\u0005\u001d\u0005\u00041\u0001&\u0011\u001d\u0011)\u0005\u0007a\u0001\u0003\u001b\u000b1b\u001c7e':\f\u0007o\u001d5pi\"9!\u0011\n\rA\u0002\u00055\u0015a\u00038foNs\u0017\r]:i_RDa!a7\u0019\u0001\u0004\u0011\u0005")
/* loaded from: input_file:delta/redis/BinaryRedisStreamProcessStore.class */
public class BinaryRedisStreamProcessStore<K, T, U> extends BinaryRedisSnapshotStore<K, T> implements StreamProcessStore<K, T, U>, BlockingCASWrites<K, T, U, BinaryJedis> {
    // Encodes logical keys into the binary field names used inside the Redis hash.
    private final Codec<K, byte[]> keyCodec;
    // Encodes/decodes snapshots to/from the binary values stored under those fields.
    private final Codec<Snapshot<T>, byte[]> snapshotCodec;
    // Value returned by name(); set from the constructor's string argument.
    private final String hashName;
    // Execution context on which the Future-wrapped Redis calls in this class are run.
    private final ExecutionContext blockingCtx;
    // Exposed through updateCodec(); not otherwise used in this class.
    private final UpdateCodec<T, U> updateCodec;
    // Highest tick this instance has recorded so far; starts at Long.MIN_VALUE and only grows.
    private final AtomicLong lastTickWritten;
    // Hash field under which the tick is stored: the single byte 116 (ASCII 't').
    private final byte[] TickKey;
    // Assigned through the BlockingCASWrites setter below, hence not final.
    private FiniteDuration updateThunkTimeout;

    /**
     * Forwards to {@code BlockingCASWrites.upsert$} and returns its result unchanged.
     *
     * @param k                key handed to the forwarder
     * @param function1        update thunk handed to the forwarder
     * @param executionContext execution context handed to the forwarder
     * @return whatever the forwarder returns
     */
    public <R> Future<Tuple2<Option<Update<U>>, R>> upsert(K k, Function1<Option<Snapshot<T>>, Future<Tuple2<Option<Snapshot<T>>, R>>> function1, ExecutionContext executionContext) {
        final Future<Tuple2<Option<Update<U>>, R>> outcome = BlockingCASWrites.upsert$(this, k, function1, executionContext);
        return outcome;
    }

    /**
     * Forwards every argument, in order, to {@code BlockingRecursiveUpsert.upsertRecursive$}
     * and returns its result unchanged. No logic lives in this method.
     */
    public final <R> Tuple2<Option<Update<U>>, R> upsertRecursive(K k, Option<Snapshot<T>> option, Function1<Option<Snapshot<T>>, Future<Tuple2<Option<Snapshot<T>>, R>>> function1, FiniteDuration finiteDuration, Function3<K, Option<Snapshot<T>>, Snapshot<T>, Either<Snapshot<T>, Object>> function3, int i, ExecutionContext executionContext, UpdateCodec<T, U> updateCodec) {
        final Tuple2<Option<Update<U>>, R> outcome = BlockingRecursiveUpsert.upsertRecursive$(this, k, option, function1, finiteDuration, function3, i, executionContext, updateCodec);
        return outcome;
    }

    /**
     * Returns the retry limit supplied by {@code RecursiveUpsert.upsertRetryLimit$}.
     */
    public int upsertRetryLimit() {
        final int limit = RecursiveUpsert.upsertRetryLimit$(this);
        return limit;
    }

    /**
     * Forwards to {@code RecursiveUpsert.retriesExhausted$}.
     * The declared return type is Scala's {@code Nothing$}.
     */
    public Nothing$ retriesExhausted(K k, Snapshot<Object> snapshot, Snapshot<Object> snapshot2) {
        final Nothing$ never = RecursiveUpsert.retriesExhausted$(this, k, snapshot, snapshot2);
        return never;
    }

    /**
     * Returns the textual form produced by {@code StreamProcessStore.toString$}.
     */
    @Override
    public String toString() {
        return StreamProcessStore.toString$(this);
    }

    /**
     * Forwards to {@code StreamProcessStore.adaptState$} with a synchronous state codec.
     *
     * @param codec codec between {@code W} and this store's state type {@code T}
     * @return the store returned by the forwarder
     */
    public <W> StreamProcessStore<K, W, U> adaptState(Codec<W, T> codec) {
        final StreamProcessStore<K, W, U> adapted = StreamProcessStore.adaptState$(this, codec);
        return adapted;
    }

    /**
     * Forwards to {@code StreamProcessStore.adaptState$} with an asynchronous state codec.
     *
     * @param asyncCodec       codec between {@code W} and this store's state type {@code T}
     * @param executionContext execution context handed to the forwarder
     * @return the store returned by the forwarder
     */
    public <W> StreamProcessStore<K, W, U> adaptState(AsyncCodec<W, T> asyncCodec, ExecutionContext executionContext) {
        final StreamProcessStore<K, W, U> adapted = StreamProcessStore.adaptState$(this, asyncCodec, executionContext);
        return adapted;
    }

    /**
     * Forwards to {@code StreamProcessStore.adaptKeys$}.
     *
     * @param codec codec between {@code W} and this store's key type {@code K}
     * @return the store returned by the forwarder
     */
    public <W> StreamProcessStore<W, T, U> adaptKeys(Codec<W, K> codec) {
        final StreamProcessStore<W, T, U> adapted = StreamProcessStore.adaptKeys$(this, codec);
        return adapted;
    }

    /**
     * Forwards to {@code StreamProcessStore.adapt$}, passing both a key codec and a state codec.
     *
     * @param codec            codec between {@code WK} and this store's key type {@code K}
     * @param asyncCodec       codec between {@code W} and this store's state type {@code T}
     * @param executionContext execution context handed to the forwarder
     * @return the store returned by the forwarder
     */
    public <WK, W> StreamProcessStore<WK, W, U> adapt(Codec<WK, K> codec, AsyncCodec<W, T> asyncCodec, ExecutionContext executionContext) {
        final StreamProcessStore<WK, W, U> adapted = StreamProcessStore.adapt$(this, codec, asyncCodec, executionContext);
        return adapted;
    }

    /**
     * Returns the timeout last stored by the {@code BlockingCASWrites} setter.
     */
    public FiniteDuration updateThunkTimeout() {
        return updateThunkTimeout;
    }

    /**
     * Setter for the {@code updateThunkTimeout} field (compiler-generated name).
     *
     * @param finiteDuration value to store
     */
    public void delta$process$BlockingCASWrites$_setter_$updateThunkTimeout_$eq(FiniteDuration finiteDuration) {
        updateThunkTimeout = finiteDuration;
    }

    /**
     * Returns the update codec supplied at construction.
     */
    public UpdateCodec<T, U> updateCodec() {
        return updateCodec;
    }

    /**
     * Returns the hash name supplied at construction.
     */
    public String name() {
        return hashName;
    }

    /**
     * Creates an empty {@code java.util.HashMap} with load factor 0.67 and an initial
     * capacity of {@code i / 0.67} (truncated), presumably so that {@code i} entries
     * fit without rehashing.
     *
     * @param i expected number of entries
     * @return a new, empty map
     */
    private HashMap<byte[], byte[]> newJUHashMap(int i) {
        final float loadFactor = 0.67f;
        final int initialCapacity = (int) (i / loadFactor);
        return new HashMap<>(initialCapacity, loadFactor);
    }

    /**
     * Raises {@code lastTickWritten} to {@code j} if {@code j} is greater than the
     * current value; otherwise leaves it untouched. Uses a compare-and-set retry loop,
     * re-reading the current value after each lost race.
     * <p>
     * JADX note: access modifier was widened from private by the decompiler.
     *
     * @param j candidate tick
     */
    public void updateLastTickWrittenIfNeeded(long j) {
        long seen = this.lastTickWritten.get();
        while (j > seen && !this.lastTickWritten.compareAndSet(seen, j)) {
            seen = this.lastTickWritten.get();
        }
    }

    /**
     * Reads the tick stored under {@code TickKey} in the Redis hash.
     * <p>
     * The stored bytes are parsed as a UTF-8 decimal long. When a value is present it
     * is also fed into {@link #updateLastTickWrittenIfNeeded(long)}.
     *
     * @return the stored tick (boxed), or an empty option when the field is absent
     */
    public Option<Object> tickWatermark() {
        Option<Object> option = (Option) jedis(binaryJedis -> {
            return Option$.MODULE$.apply(binaryJedis.hget(this.hash(), this.TickKey)).map(bArr -> {
                return BoxesRunTime.boxToLong($anonfun$tickWatermark$2(bArr));
            });
        });
        // The element is a boxed long: unbox it explicitly, and return BoxedUnit so the
        // lambda is a valid scala.Function1 (a void-bodied lambda is not).
        option.foreach(tick -> {
            this.updateLastTickWrittenIfNeeded(BoxesRunTime.unboxToLong(tick));
            return BoxedUnit.UNIT;
        });
        return option;
    }

    /**
     * Writes one snapshot.
     * <p>
     * When the snapshot's tick does not exceed {@code lastTickWritten}, the superclass
     * write is used as-is. Otherwise the snapshot and the new tick (stored under
     * {@code TickKey} as a UTF-8 decimal string) are sent through one pipeline on the
     * blocking execution context, after which {@code lastTickWritten} is raised.
     *
     * @param k        key of the snapshot
     * @param snapshot snapshot to store
     * @return a future completing when the write has been issued
     */
    @Override // delta.redis.BinaryRedisSnapshotStore
    public Future<BoxedUnit> write(K k, Snapshot<T> snapshot) {
        if (snapshot.tick() <= this.lastTickWritten.get()) {
            return super.write(k, snapshot);
        }
        return Future$.MODULE$.apply(() -> {
            this.pipeline(this.pipeline$default$1(), pipeline -> {
                this.write(pipeline, k, snapshot);
                return pipeline.hset(this.hash(), this.TickKey, package$ScuffString$.MODULE$.utf8$extension(package$.MODULE$.ScuffString(BoxesRunTime.boxToLong(snapshot.tick()).toString())));
            });
            this.updateLastTickWrittenIfNeeded(snapshot.tick());
            // Function0 must yield a value; Scala's Unit is BoxedUnit.UNIT.
            return BoxedUnit.UNIT;
        }, this.blockingCtx);
    }

    /**
     * Reads the snapshots for several keys with a single HMGET, on the blocking
     * execution context.
     * <p>
     * Keys are encoded with {@code keyCodec}; the returned values are paired with the
     * original keys by position, pairs with a null value are dropped, and the rest are
     * decoded with {@code snapshotCodec} into an immutable map.
     *
     * @param iterable keys to look up
     * @return future of a map holding only the keys that had a stored value
     */
    public Future<Map<K, Snapshot<T>>> readBatch(Iterable<K> iterable) {
        return Future$.MODULE$.apply(() -> {
            // Materialise the keys once so that values can be zipped back by position.
            Seq seq = iterable.toSeq();
            Seq seq2 = (Seq) seq.map(obj -> {
                return (byte[]) this.keyCodec.encode(obj);
            });
            return (scala.collection.immutable.HashMap) CollectionConverters$.MODULE$.IteratorHasAsScala(((List) this.jedis(binaryJedis -> {
                return binaryJedis.hmget(this.hash(), (byte[][]) seq2.toArray(ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Byte.TYPE))));
            })).iterator()).asScala().zip(seq.iterator()).filter(tuple2 -> {
                // Keep only (value, key) pairs whose value is non-null.
                return BoxesRunTime.boxToBoolean($anonfun$readBatch$4(tuple2));
            }).foldLeft(HashMap$.MODULE$.empty(), (hashMap, tuple22) -> {
                // NOTE(review): decompiler artifact - the local below reuses the lambda
                // parameter name tuple22. The uses of _1()/_2() further down read as the
                // (value bytes, key) pair, i.e. the lambda parameter. Confirm before reuse.
                Tuple2 tuple22 = new Tuple2(hashMap, tuple22);
                if (tuple22 == null) {
                    throw new MatchError(tuple22);
                }
                return hashMap.updated(tuple22._2(), this.snapshotCodec.decode((byte[]) tuple22._1()));
            });
        }, this.blockingCtx);
    }

    /**
     * Writes several snapshots with one HMSET.
     * <p>
     * An empty map completes immediately. Otherwise all entries are encoded into a
     * {@code java.util.HashMap} while tracking the highest snapshot tick. If that tick
     * does not exceed {@code lastTickWritten}, only the HMSET is sent; otherwise the
     * HMSET and an HSET of {@code TickKey} (UTF-8 decimal string) go through one
     * pipeline and {@code lastTickWritten} is raised afterwards.
     *
     * @param map snapshots keyed by their logical key
     * @return a future completing when the batch has been issued
     */
    public Future<BoxedUnit> writeBatch(Map<K, Snapshot<T>> map) {
        // NOTE(review): decompiler artifact - the Future body below has no return value.
        return map.isEmpty() ? Future$.MODULE$.unit() : Future$.MODULE$.apply(() -> {
            // Fold state is (encoded entries, max tick seen); max starts at Long.MIN_VALUE.
            Tuple2 tuple2 = (Tuple2) map.iterator().foldLeft(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(this.newJUHashMap(map.size())), BoxesRunTime.boxToLong(Long.MIN_VALUE)), (tuple22, tuple23) -> {
                // NOTE(review): decompiler artifact - the local tuple22 reuses the name of
                // the accumulator parameter; _1()/_2$mcJ$sp() below read as accumulator
                // accesses. Confirm before reuse.
                Tuple2 tuple22 = new Tuple2(tuple22, tuple23);
                if (tuple22 != null) {
                    HashMap hashMap = (HashMap) tuple22._1();
                    long _2$mcJ$sp = tuple22._2$mcJ$sp();
                    if (tuple23 != null) {
                        Object _1 = tuple23._1();
                        Snapshot snapshot = (Snapshot) tuple23._2();
                        hashMap.put(this.keyCodec.encode(_1), this.snapshotCodec.encode(snapshot));
                        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(hashMap), BoxesRunTime.boxToLong(RichLong$.MODULE$.max$extension(Predef$.MODULE$.longWrapper(_2$mcJ$sp), snapshot.tick())));
                    }
                }
                throw new MatchError(tuple22);
            });
            if (tuple2 == null) {
                throw new MatchError((Object) null);
            }
            HashMap hashMap = (HashMap) tuple2._1();
            long _2$mcJ$sp = tuple2._2$mcJ$sp();
            if (_2$mcJ$sp <= this.lastTickWritten.get()) {
                // No newer tick in this batch: a plain HMSET is enough.
                this.jedis(binaryJedis -> {
                    $anonfun$writeBatch$4(this, hashMap, binaryJedis);
                    return BoxedUnit.UNIT;
                });
            } else {
                // Newer tick: send the entries and the tick field in one pipeline.
                this.pipeline(this.pipeline$default$1(), pipeline -> {
                    pipeline.hmset(this.hash(), hashMap);
                    return pipeline.hset(this.hash(), this.TickKey, package$ScuffString$.MODULE$.utf8$extension(package$.MODULE$.ScuffString(BoxesRunTime.boxToLong(_2$mcJ$sp).toString())));
                });
                this.updateLastTickWrittenIfNeeded(_2$mcJ$sp);
            }
        }, this.blockingCtx);
    }

    /**
     * Rewrites the stored snapshot for {@code k} with a new revision and tick, keeping
     * its first component (taken from {@code copy$default$1()}) unchanged.
     * <p>
     * HGET returns null when the field does not exist; in that case nothing is written,
     * matching the batch refresh path, which filters out absent entries.
     *
     * @param binaryJedis connection to use
     * @param k           key of the snapshot
     * @param i           new revision
     * @param j           new tick
     */
    public void refreshKey(BinaryJedis binaryJedis, K k, int i, long j) {
        byte[] field = (byte[]) this.keyCodec.encode(k);
        byte[] stored = binaryJedis.hget(hash(), field);
        if (stored == null) {
            // Nothing stored under this key, so there is nothing to refresh.
            return;
        }
        Snapshot snapshot = (Snapshot) this.snapshotCodec.decode(stored);
        binaryJedis.hset(hash(), field, (byte[]) this.snapshotCodec.encode(snapshot.copy(snapshot.copy$default$1(), i, j)));
    }

    /**
     * Runs {@code refreshKey} for one key on the blocking execution context.
     *
     * @param k key of the snapshot
     * @param i new revision
     * @param j new tick
     * @return a future completing when the refresh has run
     */
    public Future<BoxedUnit> refresh(K k, int i, long j) {
        return Future$.MODULE$.apply(() -> {
            this.jedis(binaryJedis -> {
                // Pass the key as K; the decompiler's cast to BinaryJedis would fail for real keys.
                this.refreshKey(binaryJedis, k, i, j);
                return BoxedUnit.UNIT;
            });
            // Function0 must yield a value; Scala's Unit is BoxedUnit.UNIT.
            return BoxedUnit.UNIT;
        }, this.blockingCtx);
    }

    /**
     * Refreshes revision and tick of several stored snapshots inside a WATCH/MULTI
     * transaction, looping until the transaction helper reports success.
     * <p>
     * Each iteration: WATCH the encoded keys, HMGET their values, skip absent (null)
     * values, and collect a re-encoded copy for every snapshot whose new revision or
     * new tick is greater than the stored one. If nothing needs rewriting, UNWATCH and
     * return. Otherwise submit an HMSET (plus an HSET of {@code TickKey} when the
     * tracked tick exceeds {@code lastTickWritten}) as a transaction.
     *
     * @param binaryJedis connection to use
     * @param seq         encoded keys
     * @param seq2        (revision, tick) pairs, positionally aligned with {@code seq}
     */
    private void refreshBatch(BinaryJedis binaryJedis, Seq<byte[]> seq, Seq<Tuple2<Object, Object>> seq2) {
        while (true) {
            // NOTE(review): WATCH applies to Redis keys; the values watched here are the
            // encoded snapshot keys (hash fields), not hash(). Confirm this guards the
            // fields as intended.
            binaryJedis.watch((byte[][]) seq.toArray(ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Byte.TYPE))));
            // Elements after the two zips are ((value bytes, key bytes), (revision, tick)).
            Tuple2 tuple2 = (Tuple2) CollectionConverters$.MODULE$.IteratorHasAsScala(binaryJedis.hmget(hash(), (byte[][]) seq.toArray(ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Byte.TYPE)))).iterator()).asScala().zip(seq.iterator()).zip(seq2.iterator()).filter(tuple22 -> {
                // Drop entries whose stored value is null (field absent).
                return BoxesRunTime.boxToBoolean($anonfun$refreshBatch$1(tuple22));
            }).foldLeft(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(newJUHashMap(seq2.size())), BoxesRunTime.boxToLong(Long.MIN_VALUE)), (tuple23, tuple24) -> {
                // NOTE(review): decompiler artifact - locals tuple23/tuple24 below reuse the
                // lambda parameter names, so this does not read as valid Java as written.
                Tuple2 tuple23 = new Tuple2(tuple23, tuple24);
                if (tuple23 != null) {
                    HashMap hashMap = (HashMap) tuple23._1();
                    long _2$mcJ$sp = tuple23._2$mcJ$sp();
                    if (tuple24 != null) {
                        Tuple2 tuple24 = (Tuple2) tuple24._1();
                        Tuple2 tuple25 = (Tuple2) tuple24._2();
                        if (tuple24 != null) {
                            byte[] bArr = (byte[]) tuple24._1();
                            byte[] bArr2 = (byte[]) tuple24._2();
                            if (tuple25 != null) {
                                int _1$mcI$sp = tuple25._1$mcI$sp();
                                long _2$mcJ$sp2 = tuple25._2$mcJ$sp();
                                Snapshot snapshot = (Snapshot) this.snapshotCodec.decode(bArr);
                                // Rewrite only when the requested revision or tick is newer.
                                if (_1$mcI$sp > snapshot.revision() || _2$mcJ$sp2 > snapshot.tick()) {
                                    hashMap.put(bArr2, this.snapshotCodec.encode(snapshot.copy(snapshot.copy$default$1(), _1$mcI$sp, _2$mcJ$sp2)));
                                }
                                // NOTE(review): the running max uses the stored snapshot's tick,
                                // not the requested tick (_2$mcJ$sp2). Confirm that is intended.
                                return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(hashMap), BoxesRunTime.boxToLong(RichLong$.MODULE$.max$extension(Predef$.MODULE$.longWrapper(_2$mcJ$sp), snapshot.tick())));
                            }
                        }
                    }
                }
                throw new MatchError(tuple23);
            });
            if (tuple2 == null) {
                throw new MatchError((Object) null);
            }
            HashMap hashMap = (HashMap) tuple2._1();
            long _2$mcJ$sp = tuple2._2$mcJ$sp();
            if (hashMap.isEmpty()) {
                // Nothing to rewrite: release the WATCH and stop.
                binaryJedis.unwatch();
                return;
            }
            boolean z = _2$mcJ$sp > this.lastTickWritten.get();
            // transaction(...) is defined outside this class; a false result leads to a retry
            // below - presumably it signals that the transaction was aborted (TODO confirm).
            if (transaction(binaryJedis, transaction -> {
                $anonfun$refreshBatch$3(this, hashMap, z, _2$mcJ$sp, transaction);
                return BoxedUnit.UNIT;
            })) {
                if (z) {
                    updateLastTickWrittenIfNeeded(_2$mcJ$sp);
                    return;
                }
                return;
            } else {
                // Decompiled tail recursion: the self-assignments are no-ops; the loop retries.
                seq2 = seq2;
                seq = seq;
                binaryJedis = binaryJedis;
            }
        }
    }

    /**
     * Refreshes revision and tick for several keys.
     * <p>
     * An empty map completes immediately. Otherwise the entries are split into two
     * positionally aligned lists (encoded keys, and their (revision, tick) pairs) on the
     * calling thread, and the connection-level {@code refreshBatch} is then run on the
     * blocking execution context.
     *
     * @param map (revision, tick) pairs keyed by logical key
     * @return a future completing when the refresh has run
     */
    public Future<BoxedUnit> refreshBatch(Map<K, Tuple2<Object, Object>> map) {
        if (map.isEmpty()) {
            return Future$.MODULE$.unit();
        }
        ObjectRef create = ObjectRef.create(Nil$.MODULE$);
        ObjectRef create2 = ObjectRef.create(Nil$.MODULE$);
        map.foreach(tuple2 -> {
            $anonfun$refreshBatch$4(this, create, create2, tuple2);
            return BoxedUnit.UNIT;
        });
        return Future$.MODULE$.apply(() -> {
            this.jedis(binaryJedis -> {
                $anonfun$refreshBatch$6(this, create, create2, binaryJedis);
                return BoxedUnit.UNIT;
            });
            // Function0 must yield a value; Scala's Unit is BoxedUnit.UNIT.
            return BoxedUnit.UNIT;
        }, this.blockingCtx);
    }

    /**
     * Issues WATCH for the encoded key, reads the current snapshot on the same
     * connection, and passes both connection and snapshot to {@code function2}.
     * Runs on the blocking execution context.
     * <p>
     * NOTE(review): WATCH applies to Redis keys; the value watched here is the encoded
     * snapshot key (the hash field), not {@code hash()}. Confirm this guards the field
     * as intended.
     *
     * @param k         key to read
     * @param function2 callback receiving the connection and the snapshot, if any
     * @return future of the callback's result
     */
    public <R> Future<R> readForUpdate(K k, Function2<BinaryJedis, Option<Snapshot<T>>, R> function2) {
        return Future$.MODULE$.apply(() -> {
            return this.jedis(binaryJedis -> {
                byte[] bArr = (byte[]) this.keyCodec.encode(k);
                // watch takes byte[]...; build a proper one-element byte[][].
                binaryJedis.watch(new byte[][]{bArr});
                return function2.apply(binaryJedis, this.read(binaryJedis, bArr));
            });
        }, this.blockingCtx);
    }

    /**
     * Stores {@code snapshot} under {@code k} with HSETNX.
     *
     * @param binaryJedis connection to use
     * @param k           key of the snapshot
     * @param snapshot    snapshot to store
     * @return {@code None} when HSETNX replies 1; the result of re-reading the field
     *         when it replies 0
     * @throws MatchError for any other reply
     */
    public Option<Snapshot<T>> writeIfAbsent(BinaryJedis binaryJedis, K k, Snapshot<T> snapshot) {
        final byte[] field = (byte[]) this.keyCodec.encode(k);
        final int reply = binaryJedis.hsetnx(hash(), field, (byte[]) this.snapshotCodec.encode(snapshot)).intValue();
        if (reply == 1) {
            return None$.MODULE$;
        }
        if (reply == 0) {
            return read(binaryJedis, field);
        }
        throw new MatchError(BoxesRunTime.boxToInteger(reply));
    }

    /**
     * Writes {@code snapshot2} under {@code k} inside a transaction.
     * <p>
     * The old snapshot parameter is not consulted here. When the transaction helper
     * reports failure, the key is WATCHed again and the stored snapshot is re-read.
     * <p>
     * NOTE(review): WATCH applies to Redis keys; the value watched here is the encoded
     * snapshot key (the hash field), not {@code hash()}. Confirm this guards the field
     * as intended.
     *
     * @param binaryJedis connection to use
     * @param k           key of the snapshot
     * @param snapshot    previous snapshot (unused in this method)
     * @param snapshot2   replacement snapshot
     * @return {@code None} when the transaction helper returns true; otherwise the
     *         result of re-reading the field
     */
    public Option<Snapshot<T>> writeReplacement(BinaryJedis binaryJedis, K k, Snapshot<T> snapshot, Snapshot<T> snapshot2) {
        byte[] bArr = (byte[]) this.keyCodec.encode(k);
        byte[] bArr2 = (byte[]) this.snapshotCodec.encode(snapshot2);
        if (transaction(binaryJedis, transaction -> {
            $anonfun$writeReplacement$1(this, bArr, bArr2, transaction);
            return BoxedUnit.UNIT;
        })) {
            return None$.MODULE$;
        }
        // watch takes byte[]...; build a proper one-element byte[][].
        binaryJedis.watch(new byte[][]{bArr});
        return read(binaryJedis, bArr);
    }

    /**
     * Erased bridge for {@code writeReplacement}: narrows the connection to
     * {@code BinaryJedis} and the key to {@code K}, then delegates to the typed overload.
     */
    @SuppressWarnings("unchecked") // K is erased; the cast only restores the static type.
    public /* bridge */ /* synthetic */ Option writeReplacement(Object obj, Object obj2, Snapshot snapshot, Snapshot snapshot2) {
        return writeReplacement((BinaryJedis) obj, (K) obj2, snapshot, snapshot2);
    }

    /**
     * Erased bridge for {@code writeIfAbsent}: narrows the connection to
     * {@code BinaryJedis} and the key to {@code K}, then delegates to the typed overload.
     */
    @SuppressWarnings("unchecked") // K is erased; the cast only restores the static type.
    public /* bridge */ /* synthetic */ Option writeIfAbsent(Object obj, Object obj2, Snapshot snapshot) {
        return writeIfAbsent((BinaryJedis) obj, (K) obj2, snapshot);
    }

    /**
     * Erased bridge for {@code refreshKey}: narrows the connection to
     * {@code BinaryJedis} and the key to {@code K}, then delegates to the typed overload.
     */
    @SuppressWarnings("unchecked") // K is erased; the cast only restores the static type.
    public /* bridge */ /* synthetic */ void refreshKey(Object obj, Object obj2, int i, long j) {
        refreshKey((BinaryJedis) obj, (K) obj2, i, j);
    }

    /**
     * Lambda body for {@code tickWatermark}: decodes the bytes as UTF-8 text and
     * parses that text as a long.
     */
    public static final /* synthetic */ long $anonfun$tickWatermark$2(byte[] bArr) {
        final String text = package$ScuffByteArray$.MODULE$.utf8$extension(package$.MODULE$.ScuffByteArray(bArr));
        return StringOps$.MODULE$.toLong$extension(Predef$.MODULE$.augmentString(text));
    }

    /**
     * Filter predicate for {@code readBatch}: true when the pair's first component
     * is non-null.
     */
    public static final /* synthetic */ boolean $anonfun$readBatch$4(Tuple2 tuple2) {
        final Object value = tuple2._1();
        return value != null;
    }

    /**
     * Lambda body for {@code writeBatch}: sends the encoded entries to the store's
     * hash with one HMSET.
     */
    public static final /* synthetic */ void $anonfun$writeBatch$4(BinaryRedisStreamProcessStore binaryRedisStreamProcessStore, HashMap hashMap, BinaryJedis binaryJedis) {
        final byte[] hashKey = binaryRedisStreamProcessStore.hash();
        binaryJedis.hmset(hashKey, hashMap);
    }

    /**
     * Filter predicate for the connection-level {@code refreshBatch}: the element is a
     * nested pair, and the test is whether the first component of its first component
     * is non-null.
     */
    public static final /* synthetic */ boolean $anonfun$refreshBatch$1(Tuple2 tuple2) {
        final Tuple2 inner = (Tuple2) tuple2._1();
        return inner._1() != null;
    }

    /**
     * Transaction body for the connection-level {@code refreshBatch}: queues the HMSET
     * of rewritten snapshots and, when {@code z} is set, an HSET of the tick field
     * with {@code j} rendered as a UTF-8 decimal string.
     */
    public static final /* synthetic */ void $anonfun$refreshBatch$3(BinaryRedisStreamProcessStore binaryRedisStreamProcessStore, HashMap hashMap, boolean z, long j, Transaction transaction) {
        transaction.hmset(binaryRedisStreamProcessStore.hash(), hashMap);
        if (!z) {
            return;
        }
        transaction.hset(binaryRedisStreamProcessStore.hash(), binaryRedisStreamProcessStore.TickKey, package$ScuffString$.MODULE$.utf8$extension(package$.MODULE$.ScuffString(BoxesRunTime.boxToLong(j).toString())));
    }

    /**
     * Per-entry body for the public {@code refreshBatch}: prepends the encoded key to
     * the list held in {@code objectRef} and the entry's value to the list held in
     * {@code objectRef2}.
     *
     * @throws MatchError when the entry is null
     */
    public static final /* synthetic */ void $anonfun$refreshBatch$4(BinaryRedisStreamProcessStore binaryRedisStreamProcessStore, ObjectRef objectRef, ObjectRef objectRef2, Tuple2 tuple2) {
        if (tuple2 != null) {
            final Object key = tuple2._1();
            final Tuple2 entryValue = (Tuple2) tuple2._2();
            final scala.collection.immutable.List keysSoFar = (scala.collection.immutable.List) objectRef.elem;
            objectRef.elem = keysSoFar.$colon$colon(binaryRedisStreamProcessStore.keyCodec.encode(key));
            final scala.collection.immutable.List valuesSoFar = (scala.collection.immutable.List) objectRef2.elem;
            objectRef2.elem = valuesSoFar.$colon$colon(entryValue);
            return;
        }
        throw new MatchError((Object) null);
    }

    /**
     * Connection callback for the public {@code refreshBatch}: unwraps the two
     * accumulated lists and hands them to the connection-level {@code refreshBatch}.
     */
    public static final /* synthetic */ void $anonfun$refreshBatch$6(BinaryRedisStreamProcessStore binaryRedisStreamProcessStore, ObjectRef objectRef, ObjectRef objectRef2, BinaryJedis binaryJedis) {
        final scala.collection.immutable.List encodedKeys = (scala.collection.immutable.List) objectRef.elem;
        final scala.collection.immutable.List revisionTicks = (scala.collection.immutable.List) objectRef2.elem;
        binaryRedisStreamProcessStore.refreshBatch(binaryJedis, encodedKeys, revisionTicks);
    }

    /**
     * Transaction body for {@code writeReplacement}: queues an HSET of the field
     * {@code bArr} to the value {@code bArr2} in the store's hash.
     */
    public static final /* synthetic */ void $anonfun$writeReplacement$1(BinaryRedisStreamProcessStore binaryRedisStreamProcessStore, byte[] bArr, byte[] bArr2, Transaction transaction) {
        final byte[] hashKey = binaryRedisStreamProcessStore.hash();
        transaction.hset(hashKey, bArr, bArr2);
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    /**
     * Creates the store.
     *
     * @param codec            key codec; also passed to the superclass
     * @param codec2           snapshot codec; also passed to the superclass
     * @param str              hash name, returned by {@code name()}; also passed to the superclass
     * @param executionContext context for blocking Redis calls; also passed to the superclass
     * @param function1        forwarded to the superclass constructor only
     * @param updateCodec      returned by {@code updateCodec()}
     */
    public BinaryRedisStreamProcessStore(Codec<K, byte[]> codec, Codec<Snapshot<T>, byte[]> codec2, String str, ExecutionContext executionContext, Function1<Function1<BinaryJedis, Object>, Object> function1, UpdateCodec<T, U> updateCodec) {
        super(codec, codec2, str, executionContext, function1);
        this.keyCodec = codec;
        this.snapshotCodec = codec2;
        this.hashName = str;
        this.blockingCtx = executionContext;
        this.updateCodec = updateCodec;
        // Trait initialisers, in the order emitted by the Scala compiler.
        StreamProcessStore.$init$(this);
        RecursiveUpsert.$init$(this);
        BlockingRecursiveUpsert.$init$(this);
        // Presumably where updateThunkTimeout is assigned via the setter - TODO confirm.
        BlockingCASWrites.$init$(this);
        // No tick recorded yet.
        this.lastTickWritten = new AtomicLong(Long.MIN_VALUE);
        // Single-byte field name: 116 is ASCII 't'.
        this.TickKey = (byte[]) Array$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapByteArray(new byte[]{116}), ClassTag$.MODULE$.Byte());
        Statics.releaseFence();
    }
}
