package scuff.concurrent;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import scala.Function1;
import scala.Option;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.Try;
import scala.util.control.NonFatal$;
import scuff.StreamConsumer;

/* compiled from: AsyncStreamConsumer.scala */
@ScalaSignature(bytes = "\u0006\u0001I4\u0001\"\u0003\u0006\u0011\u0002\u0007\u0005q\"\u001a\u0005\u0006]\u0001!\ta\f\u0005\u0006g\u00011\t\u0002\u000e\u0005\u0006w\u00011\t\u0002\u0010\u0005\b{\u0001\u0011\r\u0015\"\u0003?\u0011\u001dA\u0005A1Q\u0005\n%CQ\u0001\u0018\u0001\u0005\u0002uCQ\u0001\u0019\u0001\u0005\u0002\u0005DQ\u0001\u001a\u0001\u0005\u0002q\u00121#Q:z]\u000e\u001cFO]3b[\u000e{gn];nKJT!a\u0003\u0007\u0002\u0015\r|gnY;se\u0016tGOC\u0001\u000e\u0003\u0015\u00198-\u001e4g\u0007\u0001)2\u0001E\u000f-'\r\u0001\u0011c\u0006\t\u0003%Ui\u0011a\u0005\u0006\u0002)\u0005)1oY1mC&\u0011ac\u0005\u0002\u0007\u0003:L(+\u001a4\u0011\taI2DJ\u0007\u0002\u0019%\u0011!\u0004\u0004\u0002\u000f'R\u0014X-Y7D_:\u001cX/\\3s!\taR\u0004\u0004\u0001\u0005\ry\u0001\u0001R1\u0001 \u0005\u0005!\u0016C\u0001\u0011$!\t\u0011\u0012%\u0003\u0002#'\t9aj\u001c;iS:<\u0007C\u0001\n%\u0013\t)3CA\u0002B]f\u00042aJ\u0015,\u001b\u0005A#BA\u0006\u0014\u0013\tQ\u0003F\u0001\u0004GkR,(/\u001a\t\u000391\"a!\f\u0001\u0005\u0006\u0004y\"!\u0001*\u0002\r\u0011Jg.\u001b;%)\u0005\u0001\u0004C\u0001\n2\u0013\t\u00114C\u0001\u0003V]&$\u0018!E2p[BdW\r^5p]RKW.Z8viV\tQ\u0007\u0005\u00027s5\tqG\u0003\u00029Q\u0005AA-\u001e:bi&|g.\u0003\u0002;o\tqa)\u001b8ji\u0016$UO]1uS>t\u0017\u0001C<iK:$uN\\3\u0015\u0003\u0019\n\u0011b]3nCBDwN]3\u0016\u0003}\u0002\"\u0001\u0011$\u000e\u0003\u0005S!a\u0003\"\u000b\u0005\r#\u0015\u0001B;uS2T\u0011!R\u0001\u0005U\u00064\u0018-\u0003\u0002H\u0003\nI1+Z7ba\"|'/Z\u0001\u0006KJ\u0014xN]\u000b\u0002\u0015B\u00191J\u0014)\u000e\u00031S!!T!\u0002\r\u0005$x.\\5d\u0013\tyEJA\bBi>l\u0017n\u0019*fM\u0016\u0014XM\\2f!\t\t\u0016L\u0004\u0002S/:\u00111KV\u0007\u0002)*\u0011QKD\u0001\u0007yI|w\u000e\u001e 
\n\u0003QI!\u0001W\n\u0002\u000fA\f7m[1hK&\u0011!l\u0017\u0002\n)\"\u0014xn^1cY\u0016T!\u0001W\n\u0002\r=tg*\u001a=u)\t\u0001d\fC\u0003`\r\u0001\u00071$A\u0001u\u0003\u001dyg.\u0012:s_J$\"\u0001\r2\t\u000b\r<\u0001\u0019\u0001)\u0002\u0005QD\u0017AB8o\t>tWME\u0002gQ*4Aa\u001a\u0001\u0001K\naAH]3gS:,W.\u001a8u}A!\u0011\u000eA\u000e,\u001b\u0005Q\u0001\u0003\u0002\nl75L!\u0001\\\n\u0003\u0013\u0019+hn\u0019;j_:\f\u0004G\u00018q!\r9\u0013f\u001c\t\u00039A$\u0011\"\u001d\u0001\u0002\u0002\u0003\u0005)\u0011A\u0010\u0003\u0007}#\u0013\u0007")
/* loaded from: input_file:scuff/concurrent/AsyncStreamConsumer.class */
/*
 * Stream consumer whose per-element work is asynchronous.
 *
 * Each element handed to onNext(Object) is passed to this instance viewed as a
 * scala.Function1 (implementations must therefore also be a Function1; the casts
 * below fail otherwise) and is expected to yield a Future. A semaphore created with
 * Integer.MAX_VALUE permits tracks the futures that have not completed yet: one permit
 * is taken per pending future and given back when that future completes. onDone() can
 * only take all Integer.MAX_VALUE permits at once when nothing is pending any more.
 *
 * T is the element type accepted by onNext; R is the value type of the future
 * returned by onDone() / whenDone().
 */
public interface AsyncStreamConsumer<T, R> extends StreamConsumer<T, Future<R>> {

    /** Installs the permit-tracking semaphore; called from {@link #$init$(AsyncStreamConsumer)}. */
    void scuff$concurrent$AsyncStreamConsumer$_setter_$scuff$concurrent$AsyncStreamConsumer$$semaphore_$eq(Semaphore semaphore);

    /** Installs the first-error holder; called from {@link #$init$(AsyncStreamConsumer)}. */
    void scuff$concurrent$AsyncStreamConsumer$_setter_$scuff$concurrent$AsyncStreamConsumer$$error_$eq(AtomicReference<Throwable> atomicReference);

    /**
     * Maximum time {@link #onDone()} waits (on a blocking thread) for pending futures
     * to complete before failing with a {@link TimeoutException}.
     */
    FiniteDuration completionTimeout();

    /** Supplies the future that {@link #onDone()} returns once no processing is pending. */
    Future<R> whenDone();

    /** Semaphore whose missing permits correspond to futures still pending from {@link #onNext(Object)}. */
    Semaphore scuff$concurrent$AsyncStreamConsumer$$semaphore();

    /** Holder of the first error recorded through {@link #onError(Throwable)}; {@code null} while none. */
    AtomicReference<Throwable> scuff$concurrent$AsyncStreamConsumer$$error();

    /**
     * Processes one element by applying this instance (as a {@code Function1}) to it.
     *
     * <p>A non-fatal exception thrown by the application is turned into a failed future;
     * anything {@code NonFatal} does not match is rethrown. If the resulting future is
     * not complete yet, a permit is taken and handed back on completion. A failure of
     * the future is forwarded to {@link #onError(Throwable)}.
     *
     * @param t the element to process
     */
    @Override // scuff.StreamConsumer
    default void onNext(T t) {
        Future<?> pending;
        try {
            pending = (Future<?>) ((Function1) this).apply(t);
        } catch (Throwable th) {
            Option<Throwable> nonFatal = NonFatal$.MODULE$.unapply(th);
            if (nonFatal.isEmpty()) {
                // Not matched by NonFatal: propagate untouched.
                throw th;
            }
            pending = Future$.MODULE$.failed(nonFatal.get());
        }
        if (!pending.isCompleted()) {
            // NOTE(review): the tryAcquire() result is ignored, yet the completion callback
            // always releases - confirm that is intended when no permit could be taken.
            scuff$concurrent$AsyncStreamConsumer$$semaphore().tryAcquire();
            pending.onComplete(result -> {
                $anonfun$onNext$1(this, result);
                return BoxedUnit.UNIT;
            }, Threads$.MODULE$.PiggyBack());
        }
        pending.failed().foreach(cause -> {
            this.onError(cause);
            return BoxedUnit.UNIT;
        }, Threads$.MODULE$.PiggyBack());
    }

    /**
     * Records {@code th} as the stream error if no error has been recorded yet;
     * subsequent errors are ignored.
     *
     * @param th the failure to record
     */
    @Override // scuff.StreamConsumer
    default void onError(Throwable th) {
        // compareAndSet, not weakCompareAndSet: the weak variant may fail spuriously,
        // which (without a retry loop) could silently drop the error.
        scuff$concurrent$AsyncStreamConsumer$$error().compareAndSet(null, th);
    }

    /**
     * Signals the end of the stream.
     *
     * <ul>
     *   <li>If an error has been recorded, returns a future failed with it.</li>
     *   <li>Otherwise, if all permits can be taken immediately (nothing pending),
     *       returns {@link #whenDone()}.</li>
     *   <li>Otherwise waits on a blocking thread, for at most
     *       {@link #completionTimeout()}, to take all permits and then continues with
     *       {@link #whenDone()}; the returned future fails with a
     *       {@link TimeoutException} if the wait times out.</li>
     * </ul>
     *
     * @return the completion future as described above
     */
    @Override // scuff.StreamConsumer
    default Future<R> onDone() {
        // NOTE(review): the error holder is only read here, on entry; an error recorded
        // while waiting below is not re-checked before whenDone() is used - confirm.
        Throwable recorded = scuff$concurrent$AsyncStreamConsumer$$error().get();
        if (recorded != null) {
            return Future$.MODULE$.failed(recorded);
        }
        if (scuff$concurrent$AsyncStreamConsumer$$semaphore().tryAcquire(Integer.MAX_VALUE)) {
            // Every permit was available, i.e. no future is still pending.
            return whenDone();
        }
        String instance = ((Function1) this).toString();
        String className = toClassName$1(toClassName$default$1$1());
        // Decompiled from Scala: the block below throws checked exceptions that Scala does not track.
        return Threads$.MODULE$.onBlockingThread("Awaiting completion of " + className + ": " + instance, Threads$.MODULE$.onBlockingThread$default$2(), Threads$.MODULE$.onBlockingThread$default$3(), () -> {
            FiniteDuration timeout = this.completionTimeout();
            if (!this.scuff$concurrent$AsyncStreamConsumer$$semaphore().tryAcquire(Integer.MAX_VALUE, timeout.length(), timeout.unit())) {
                throw new TimeoutException("Stream consumption in `" + className + "` is still not finished, " + timeout + " after stream completion, possibly due to either incomplete stream or incomplete state. Instance: " + instance);
            }
        }).flatMap(boxedUnit -> {
            return this.whenDone();
        }, Threads$.MODULE$.PiggyBack());
    }

    /** Completion callback registered by {@code onNext}: hands one permit back, whatever the outcome. */
    static /* synthetic */ void $anonfun$onNext$1(AsyncStreamConsumer<?, ?> asyncStreamConsumer, Try<?> r3) {
        asyncStreamConsumer.scuff$concurrent$AsyncStreamConsumer$$semaphore().release();
    }

    /**
     * Returns the name of {@code cls}, first walking out to the enclosing class for as
     * long as the name contains {@code "$anon$"} and an enclosing class exists.
     */
    private default String toClassName$1(Class<?> cls) {
        while (cls.getName().contains("$anon$") && cls.getEnclosingClass() != null) {
            cls = cls.getEnclosingClass();
        }
        return cls.getName();
    }

    /** Default argument for {@code toClassName$1}: the runtime class of this instance. */
    private default Class<?> toClassName$default$1$1() {
        return getClass();
    }

    /**
     * Trait initializer: gives {@code asyncStreamConsumer} a semaphore holding
     * {@code Integer.MAX_VALUE} permits and an empty error holder.
     */
    static void $init$(AsyncStreamConsumer<?, ?> asyncStreamConsumer) {
        asyncStreamConsumer.scuff$concurrent$AsyncStreamConsumer$_setter_$scuff$concurrent$AsyncStreamConsumer$$semaphore_$eq(new Semaphore(Integer.MAX_VALUE));
        asyncStreamConsumer.scuff$concurrent$AsyncStreamConsumer$_setter_$scuff$concurrent$AsyncStreamConsumer$$error_$eq(new AtomicReference<>());
    }
}
