package groovyx.gpars.actor.impl;

import groovy.time.Duration;
import groovyx.gpars.actor.Actor;
import groovyx.gpars.actor.ActorMessage;
import groovyx.gpars.remote.RemoteConnection;
import groovyx.gpars.remote.RemoteHost;
import groovyx.gpars.serial.RemoteSerialized;
import groovyx.gpars.serial.SerialMsg;
import groovyx.gpars.serial.WithSerialId;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

/* loaded from: input_file:WEB-INF/lib/gpars-1.2.1.jar:groovyx/gpars/actor/impl/MessageStream.class */
public abstract class MessageStream extends WithSerialId {
    private static final long serialVersionUID = 7644465423857532477L;

    /* loaded from: input_file:WEB-INF/lib/gpars-1.2.1.jar:groovyx/gpars/actor/impl/MessageStream$RemoteMessageStream.class */
    /**
     * A {@link MessageStream} whose {@link #send(Object)} forwards every message to a
     * {@link RemoteHost} instead of handling it locally.
     *
     * <p>Each message is packaged into a {@link SendTo} addressed to this stream and handed to
     * {@code RemoteHost.write}.
     */
    public static class RemoteMessageStream extends MessageStream implements RemoteSerialized {
        private static final long serialVersionUID = 3936054469565089659L;

        /** Host that every outgoing message is written to. */
        private final RemoteHost remoteHost;

        /**
         * @param remoteHost the host that {@link #send(Object)} writes to
         */
        public RemoteMessageStream(RemoteHost remoteHost) {
            this.remoteHost = remoteHost;
        }

        /**
         * Writes the message to the remote host.
         *
         * <p>An argument that is not already an {@link ActorMessage} is wrapped in one together
         * with {@code Actor.threadBoundActor()} (presumably the sender a reply should go to —
         * confirm against {@code ActorMessage}).
         *
         * @param obj the payload, or a ready-made {@link ActorMessage}
         * @return this stream
         */
        @Override // groovyx.gpars.actor.impl.MessageStream
        public MessageStream send(Object obj) {
            final ActorMessage envelope = (obj instanceof ActorMessage)
                    ? (ActorMessage) obj
                    : new ActorMessage(obj, Actor.threadBoundActor());
            remoteHost.write(new SendTo(this, envelope));
            return this;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/gpars-1.2.1.jar:groovyx/gpars/actor/impl/MessageStream$ResultWaiter.class */
    /**
     * One-shot reply slot backing the blocking {@code sendAndWait} calls.
     *
     * <p>The {@code value} field does double duty: the constructor stores the creating thread in
     * it, and {@link #send(Object)} later overwrites it with the reply. The {@code isSet} flag
     * tells the two states apart.
     *
     * @param <V> type that the untimed {@link #getResult()} casts the reply to
     */
    public static class ResultWaiter<V> extends MessageStream {
        private static final long serialVersionUID = 6512046150496489148L;

        /** Holds the waiting thread until a reply is sent, then the reply itself. */
        private volatile Object value;

        /** Set to {@code true} by {@link #send(Object)} once {@code value} holds the reply. */
        private volatile boolean isSet;

        /** Records the constructing thread as the one to wake when the reply arrives. */
        private ResultWaiter() {
            this.value = Thread.currentThread();
        }

        /**
         * Stores the reply and wakes the thread that created this waiter.
         *
         * <p>If the argument is an {@link ActorMessage} only its payload is kept; anything else
         * is stored as-is.
         *
         * @param obj the reply, optionally wrapped in an {@link ActorMessage}
         * @return this waiter
         * @throws IllegalStateException if a reply has already been recorded
         */
        @Override // groovyx.gpars.actor.impl.MessageStream
        public MessageStream send(Object obj) {
            if (isSet) {
                throw new IllegalStateException("A reply has already been sent. The originator does not expect more than one reply.");
            }
            // Grab the waiting thread before the field is overwritten with the reply.
            final Thread waiter = (Thread) value;
            value = (obj instanceof ActorMessage) ? ((ActorMessage) obj).getPayLoad() : obj;
            // The reply is written before the flag is raised, and the waiter is woken last.
            isSet = true;
            LockSupport.unpark(waiter);
            return this;
        }

        /**
         * Parks the calling thread until a reply has been recorded, then returns it.
         *
         * @return the reply, cast to {@code V}
         * @throws InterruptedException if the calling thread is found interrupted after waking
         *         from a park; the interrupt flag is left set
         * @throws RuntimeException if the reply is a {@link Throwable}
         *         (see {@link #rethrowException()})
         */
        @SuppressWarnings("unchecked") // the reply's type is known only to the caller
        public V getResult() throws InterruptedException {
            // Loop rather than park once: the flag is re-checked after every wake-up.
            while (!isSet) {
                LockSupport.park();
                MessageStream.reInterrupt();
            }
            rethrowException();
            return (V) value;
        }

        /**
         * Waits for a reply for at most the given time.
         *
         * @param j        maximum time to wait
         * @param timeUnit unit of {@code j}
         * @return the reply, or {@code null} if the time ran out before one was recorded
         * @throws InterruptedException if the calling thread is found interrupted after waking
         *         from a park; the interrupt flag is left set
         * @throws RuntimeException if the reply is a {@link Throwable}
         *         (see {@link #rethrowException()})
         */
        public Object getResult(long j, TimeUnit timeUnit) throws InterruptedException {
            final long deadline = System.nanoTime() + timeUnit.toNanos(j);
            while (!isSet) {
                final long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return null;
                }
                LockSupport.parkNanos(remaining);
                MessageStream.reInterrupt();
            }
            rethrowException();
            return value;
        }

        /**
         * Throws the stored reply if it is a {@link Throwable}: a {@link RuntimeException} is
         * thrown unchanged, any other throwable is wrapped in a new {@link RuntimeException}.
         * Does nothing for any other reply.
         */
        private void rethrowException() {
            if (!(this.value instanceof Throwable)) {
                return;
            }
            if (this.value instanceof RuntimeException) {
                throw ((RuntimeException) this.value);
            }
            throw new RuntimeException((Throwable) this.value);
        }

        /**
         * Reports a failed delivery by recording an {@link IllegalStateException} as the reply,
         * which the waiting {@code getResult} call then rethrows.
         *
         * <p>Because this goes through {@link #send(Object)}, it throws
         * {@link IllegalStateException} itself if a reply was already recorded.
         *
         * @param obj ignored
         */
        public void onDeliveryError(Object obj) {
            send(new IllegalStateException("Delivery error. Maybe target actor is not active"));
        }
    }

    /* loaded from: input_file:WEB-INF/lib/gpars-1.2.1.jar:groovyx/gpars/actor/impl/MessageStream$SendTo.class */
    /**
     * A {@link SerialMsg} that pairs an {@link ActorMessage} with the {@link MessageStream} it is
     * addressed to. Executing it delivers the message by calling {@code send} on that stream.
     */
    public static class SendTo extends SerialMsg {
        private static final long serialVersionUID = 1989120447646342520L;

        /** Stream the message is delivered to when this object is executed. */
        private final MessageStream to;

        /** The message to deliver. */
        private final ActorMessage message;

        /**
         * @param messageStream the stream to deliver to
         * @param actorMessage  the message to deliver
         */
        public SendTo(final MessageStream messageStream, final ActorMessage actorMessage) {
            to = messageStream;
            message = actorMessage;
        }

        /** @return the stream this message is addressed to */
        public MessageStream getTo() {
            return to;
        }

        /** @return the message being carried */
        public ActorMessage getMessage() {
            return message;
        }

        /**
         * Delivers the carried message to the target stream.
         *
         * @param remoteConnection not used by this implementation
         */
        @Override // groovyx.gpars.serial.SerialMsg
        public void execute(final RemoteConnection remoteConnection) {
            to.send(message);
        }
    }

    /**
     * Sends a message to this stream. All other {@code send}, {@code leftShift}, {@code call}
     * and {@code sendAndWait} variants in this class delegate to this method.
     *
     * @param obj the message; callers in this class pass either a raw payload or an
     *            {@link ActorMessage}
     * @return a stream for chaining; the implementations in this file return the receiver itself
     */
    public abstract MessageStream send(Object obj);

    /**
     * Sends a message with no meaningful content: a freshly created {@code Object}.
     *
     * @return whatever {@link #send(Object)} returns
     */
    public final MessageStream send() {
        final Object placeholder = new Object();
        return send(placeholder);
    }

    /**
     * Wraps the payload and the given stream in an {@link ActorMessage} and sends it.
     *
     * @param t             the payload
     * @param messageStream the stream passed as the second {@link ActorMessage} constructor
     *                      argument (presumably where replies should go — confirm against
     *                      {@code ActorMessage})
     * @param <T>           payload type
     * @return whatever {@link #send(Object)} returns
     */
    public final <T> MessageStream send(T t, MessageStream messageStream) {
        final ActorMessage envelope = new ActorMessage(t, messageStream);
        return send(envelope);
    }

    /**
     * Alias for {@link #send(Object)}. The name matches the method Groovy invokes for the
     * {@code <<} operator, so Groovy code can write {@code stream << message}.
     *
     * @param t   the message
     * @param <T> message type
     * @return whatever {@link #send(Object)} returns
     */
    public final <T> MessageStream leftShift(T t) {
        return this.send(t);
    }

    /**
     * Alias for {@link #send(Object)}. The name matches the method Groovy invokes for
     * call syntax, so Groovy code can write {@code stream(message)}.
     *
     * @param t   the message
     * @param <T> message type
     * @return whatever {@link #send(Object)} returns
     */
    public final <T> MessageStream call(T t) {
        return this.send(t);
    }

    /**
     * Sends a message and blocks the calling thread until a reply has been recorded.
     *
     * <p>The payload is wrapped in an {@link ActorMessage} together with a fresh
     * {@link ResultWaiter} bound to the calling thread; the reply is whatever is later sent to
     * that waiter.
     *
     * @param t   the payload
     * @param <T> payload type
     * @param <V> type the caller expects the reply to have; not checked at runtime
     * @return the reply
     * @throws InterruptedException if the calling thread is found interrupted while waiting
     * @throws RuntimeException if the reply is a {@link Throwable}: a {@link RuntimeException}
     *         reply is thrown as-is, any other throwable is wrapped
     */
    public final <T, V> V sendAndWait(T t) throws InterruptedException {
        // Parameterized instead of raw, so getResult() already returns V and no
        // unchecked cast is needed here.
        final ResultWaiter<V> resultWaiter = new ResultWaiter<V>();
        send(new ActorMessage(t, resultWaiter));
        return resultWaiter.getResult();
    }

    /**
     * Sends a message and blocks the calling thread until a reply has been recorded or the
     * timeout elapses.
     *
     * <p>The payload is wrapped in an {@link ActorMessage} together with a fresh
     * {@link ResultWaiter} bound to the calling thread.
     *
     * @param t        the payload
     * @param j        maximum time to wait for the reply
     * @param timeUnit unit of {@code j}
     * @param <T>      payload type
     * @return the reply, or {@code null} if none was recorded in time (a {@code null} reply is
     *         therefore indistinguishable from a timeout)
     * @throws InterruptedException if the calling thread is found interrupted while waiting
     * @throws RuntimeException if the reply is a {@link Throwable}: a {@link RuntimeException}
     *         reply is thrown as-is, any other throwable is wrapped
     */
    public final <T> Object sendAndWait(T t, long j, TimeUnit timeUnit) throws InterruptedException {
        // Parameterized instead of raw; the timed getResult returns Object regardless of the type argument.
        final ResultWaiter<Object> resultWaiter = new ResultWaiter<Object>();
        send(new ActorMessage(t, resultWaiter));
        return resultWaiter.getResult(j, timeUnit);
    }

    /**
     * Same as {@link #sendAndWait(Object, long, TimeUnit)}, with the timeout given as a Groovy
     * {@link Duration} that is converted to milliseconds.
     *
     * @param t        the payload
     * @param duration maximum time to wait for the reply
     * @param <T>      payload type
     * @return the reply, or {@code null} if none was recorded in time
     * @throws InterruptedException if the calling thread is found interrupted while waiting
     */
    public final <T> Object sendAndWait(T t, Duration duration) throws InterruptedException {
        final long timeoutMillis = duration.toMilliseconds();
        return sendAndWait(t, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Names {@link RemoteMessageStream} as the remote counterpart class of this stream.
     * NOTE(review): presumably the class instantiated on the remote side to stand in for this
     * object — confirm against {@code WithSerialId}.
     *
     * @return {@code RemoteMessageStream.class}
     */
    @Override // groovyx.gpars.serial.WithSerialId
    public Class<RemoteMessageStream> getRemoteClass() {
        return RemoteMessageStream.class;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Converts a pending interrupt on the current thread into an {@link InterruptedException}.
     *
     * <p>The check uses {@code isInterrupted()}, which does not clear the flag, so the thread is
     * still marked interrupted when the exception propagates.
     *
     * @throws InterruptedException if the current thread's interrupt flag is set
     */
    public static void reInterrupt() throws InterruptedException {
        final boolean interruptPending = Thread.currentThread().isInterrupted();
        if (!interruptPending) {
            return;
        }
        throw new InterruptedException();
    }
}
