package scuff.concurrent;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import scala.Function1;
import scala.Option;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.Try;
import scala.util.control.NonFatal$;
import scuff.StreamConsumer;

/* compiled from: AsyncStreamConsumer.scala */
@ScalaSignature(bytes = "\u0006\u0001I4\u0001\"\u0001\u0002\u0011\u0002\u0007\u0005q!\u001a\u0002\u0014\u0003NLhnY*ue\u0016\fWnQ8ogVlWM\u001d\u0006\u0003\u0007\u0011\t!bY8oGV\u0014(/\u001a8u\u0015\u0005)\u0011!B:dk\u001a47\u0001A\u000b\u0004\u0011U!3c\u0001\u0001\n\u001fA\u0011!\"D\u0007\u0002\u0017)\tA\"A\u0003tG\u0006d\u0017-\u0003\u0002\u000f\u0017\t1\u0011I\\=SK\u001a\u0004B\u0001E\t\u0014=5\tA!\u0003\u0002\u0013\t\tq1\u000b\u001e:fC6\u001cuN\\:v[\u0016\u0014\bC\u0001\u000b\u0016\u0019\u0001!aA\u0006\u0001\t\u0006\u00049\"!\u0001+\u0012\u0005aY\u0002C\u0001\u0006\u001a\u0013\tQ2BA\u0004O_RD\u0017N\\4\u0011\u0005)a\u0012BA\u000f\f\u0005\r\te.\u001f\t\u0004?\u0005\u001aS\"\u0001\u0011\u000b\u0005\rY\u0011B\u0001\u0012!\u0005\u00191U\u000f^;sKB\u0011A\u0003\n\u0003\u0007K\u0001!)\u0019A\f\u0003\u0003ICQa\n\u0001\u0005\u0002!\na\u0001J5oSR$C#A\u0015\u0011\u0005)Q\u0013BA\u0016\f\u0005\u0011)f.\u001b;\t\u000b5\u0002a\u0011\u0003\u0018\u0002#\r|W\u000e\u001d7fi&|g\u000eV5nK>,H/F\u00010!\t\u00014'D\u00012\u0015\t\u0011\u0004%\u0001\u0005ekJ\fG/[8o\u0013\t!\u0014G\u0001\bGS:LG/\u001a#ve\u0006$\u0018n\u001c8\t\u000bY\u0002a\u0011C\u001c\u0002\u0011]DWM\u001c#p]\u0016$\u0012A\b\u0005\bs\u0001\u0011\r\u0015\"\u0003;\u0003%\u0019X-\\1qQ>\u0014X-F\u0001<!\ta$)D\u0001>\u0015\t\u0019aH\u0003\u0002@\u0001\u0006!Q\u000f^5m\u0015\u0005\t\u0015\u0001\u00026bm\u0006L!aQ\u001f\u0003\u0013M+W.\u00199i_J,\u0007bB#\u0001\u0005\u0004&IAR\u0001\u0006KJ\u0014xN]\u000b\u0002\u000fB\u0019\u0001jS'\u000e\u0003%S!AS\u001f\u0002\r\u0005$x.\\5d\u0013\ta\u0015JA\bBi>l\u0017n\u0019*fM\u0016\u0014XM\\2f!\tqeK\u0004\u0002P):\u0011\u0001kU\u0007\u0002#*\u0011!KB\u0001\u0007yI|w\u000e\u001e \n\u00031I!!V\u0006\u0002\u000fA\f7m[1hK&\u0011q\u000b\u0017\u0002\n)\"\u0014xn^1cY\u0016T!!V\u0006\t\u000bi\u0003A\u0011A.\u0002\r=tg*\u001a=u)\tIC\fC\u0003^3\u0002\u00071#A\u0001u\u0011\u0015y\u0006\u0001\"\u0001a\u0003\u001dyg.\u0012:s_J$\"!K1\t\u000b\tt\u0006\u0019A'\u0002\u0005QD\u0007\"\u00023\u0001\t\u00039\u0014AB8o\t>tWME\u0002gQ*4Aa\u001a\u0001\u0001K\naAH]3gS:,W.\u001a8u}A!\u0011\u000eA\n$\u001b\u0005\u0011\u0001\u0003\u0002\u0006l'5L!\u0001\\\u0006\u0003\u0013\u0019+hn\u0019;j_:\f\u0004G\u00018q!\ry\u0012e\u001c\t\u0003)A$\u0011\"\u001d\u0001\u0002\u0002\u0003\u0005)\u0011A\f\u0003\u0007}#\u0013\u0007")
/* loaded from: input_file:scuff/concurrent/AsyncStreamConsumer.class */
public interface AsyncStreamConsumer<T, R> extends StreamConsumer<T, Future<R>> {
    void scuff$concurrent$AsyncStreamConsumer$_setter_$scuff$concurrent$AsyncStreamConsumer$$semaphore_$eq(Semaphore semaphore);

    void scuff$concurrent$AsyncStreamConsumer$_setter_$scuff$concurrent$AsyncStreamConsumer$$error_$eq(AtomicReference<Throwable> atomicReference);

    FiniteDuration completionTimeout();

    Future<R> whenDone();

    Semaphore scuff$concurrent$AsyncStreamConsumer$$semaphore();

    AtomicReference<Throwable> scuff$concurrent$AsyncStreamConsumer$$error();

    @Override // scuff.StreamConsumer
    default void onNext(T t) {
        Future failed;
        try {
            failed = (Future) ((Function1) this).apply(t);
        } catch (Throwable th) {
            Option unapply = NonFatal$.MODULE$.unapply(th);
            if (unapply.isEmpty()) {
                throw th;
            }
            failed = Future$.MODULE$.failed((Throwable) unapply.get());
        }
        Future future = failed;
        if (!future.isCompleted()) {
            scuff$concurrent$AsyncStreamConsumer$$semaphore().tryAcquire();
            future.onComplete(r4 -> {
                $anonfun$onNext$1(this, r4);
                return BoxedUnit.UNIT;
            }, Threads$.MODULE$.PiggyBack());
        }
        future.failed().foreach(th2 -> {
            this.onError(th2);
            return BoxedUnit.UNIT;
        }, Threads$.MODULE$.PiggyBack());
    }

    @Override // scuff.StreamConsumer
    default void onError(Throwable th) {
        scuff$concurrent$AsyncStreamConsumer$$error().weakCompareAndSet(null, th);
    }

    @Override // scuff.StreamConsumer
    default Future<R> onDone() {
        Future<T> newBlockingThread;
        boolean z = false;
        Throwable th = scuff$concurrent$AsyncStreamConsumer$$error().get();
        if (th == null) {
            z = true;
            if (scuff$concurrent$AsyncStreamConsumer$$semaphore().tryAcquire(Integer.MAX_VALUE)) {
                newBlockingThread = Future$.MODULE$.successful(BoxedUnit.UNIT);
                return newBlockingThread.flatMap(boxedUnit -> {
                    return this.whenDone();
                }, Threads$.MODULE$.PiggyBack());
            }
        }
        newBlockingThread = z ? Threads$.MODULE$.newBlockingThread(new StringBuilder(23).append("Awaiting completion of ").append(getClass().getName()).toString(), Threads$.MODULE$.newBlockingThread$default$2(), Threads$.MODULE$.newBlockingThread$default$3(), () -> {
            FiniteDuration completionTimeout = this.completionTimeout();
            if (!this.scuff$concurrent$AsyncStreamConsumer$$semaphore().tryAcquire(Integer.MAX_VALUE, completionTimeout.length(), completionTimeout.unit())) {
                throw new TimeoutException(new StringBuilder(40).append("Timed out after ").append(completionTimeout).append(" awaiting completion of ").append(this.getClass().getName()).toString());
            }
        }) : Future$.MODULE$.failed(th);
        return newBlockingThread.flatMap(boxedUnit2 -> {
            return this.whenDone();
        }, Threads$.MODULE$.PiggyBack());
    }

    static /* synthetic */ void $anonfun$onNext$1(AsyncStreamConsumer asyncStreamConsumer, Try r3) {
        asyncStreamConsumer.scuff$concurrent$AsyncStreamConsumer$$semaphore().release();
    }

    static void $init$(AsyncStreamConsumer asyncStreamConsumer) {
        asyncStreamConsumer.scuff$concurrent$AsyncStreamConsumer$_setter_$scuff$concurrent$AsyncStreamConsumer$$semaphore_$eq(new Semaphore(Integer.MAX_VALUE));
        asyncStreamConsumer.scuff$concurrent$AsyncStreamConsumer$_setter_$scuff$concurrent$AsyncStreamConsumer$$error_$eq(new AtomicReference<>());
    }
}
