package co.paralleluniverse.pulsar.async;

import co.paralleluniverse.fibers.DefaultFiberScheduler;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.fibers.Suspendable;
import co.paralleluniverse.strands.Strand;
import co.paralleluniverse.strands.StrandFactory;
import co.paralleluniverse.strands.SuspendableRunnable;
import co.paralleluniverse.strands.SuspendableUtils;
import co.paralleluniverse.strands.Timeout;
import co.paralleluniverse.strands.channels.Channel;
import co.paralleluniverse.strands.channels.Channels;
import co.paralleluniverse.strands.channels.SendPort;
import co.paralleluniverse.strands.channels.Topic;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:co/paralleluniverse/pulsar/async/ParallelTopic.class */
/**
 * A {@link Topic} whose {@code send} operations enqueue messages on an internal channel;
 * a background strand then takes each message off that channel and forwards it to every
 * current subscriber, using one newly started strand per subscriber per message.
 *
 * <p>In <em>staged</em> mode the distribution loop joins all forwarding strands of a message
 * before it receives the next message. In non-staged mode it does not wait for them.
 *
 * <p>Closing the topic stops the distribution loop and then closes all subscribers except
 * those that were subscribed with {@code closeOnTopicClose == false}. Once the loop observes
 * the closed flag it does not receive further messages from the internal channel.
 *
 * @param <Message> the type of the messages published on this topic
 */
public class ParallelTopic<Message> extends Topic<Message> {
    /** Staging mode used by the constructors that do not take an explicit flag. */
    private static final boolean stagedDefault = true;
    /** Strand factory used by the constructors that do not take an explicit factory. */
    private static final StrandFactory strandFactoryDefault = DefaultFiberScheduler.getInstance();

    private final StrandFactory strandFactory;
    /** Buffers published messages until the distribution loop picks them up. */
    private final Channel<Message> internalChannel;
    /** Subscribers that must not be closed when this topic is closed. */
    private final Collection<SendPort<? super Message>> subscribersToBeLeftOpen;
    /** Optional cause handed to subscribers when the topic is closed exceptionally. */
    private final AtomicReference<Throwable> closingException;

    /**
     * Wires the topic to the given internal channel and starts the distribution loop.
     * All fields are assigned before the loop strand is started.
     */
    private ParallelTopic(Channel<Message> internalChannel, StrandFactory strandFactory, boolean staged) {
        this.closingException = new AtomicReference<>();
        this.subscribersToBeLeftOpen = new CopyOnWriteArraySet<>();
        this.internalChannel = internalChannel;
        this.strandFactory = strandFactory;
        startDistributionLoop(staged);
    }

    /**
     * Creates a topic with full control over buffering, scheduling and staging.
     *
     * @param bufferSize    buffer size passed to {@code Channels.newChannel} for the internal channel
     * @param policy        overflow policy passed to {@code Channels.newChannel} for the internal channel
     * @param strandFactory factory used for the distribution loop and for the forwarding strands
     * @param staged        whether all forwards of a message are joined before the next message is received
     */
    public ParallelTopic(int bufferSize, Channels.OverflowPolicy policy, StrandFactory strandFactory, boolean staged) {
        this(Channels.newChannel(bufferSize, policy), strandFactory, staged);
    }

    /**
     * Creates a topic with the default staging mode.
     *
     * @param bufferSize    buffer size passed to {@code Channels.newChannel} for the internal channel
     * @param policy        overflow policy passed to {@code Channels.newChannel} for the internal channel
     * @param strandFactory factory used for the distribution loop and for the forwarding strands
     */
    public ParallelTopic(int bufferSize, Channels.OverflowPolicy policy, StrandFactory strandFactory) {
        this(bufferSize, policy, strandFactory, stagedDefault);
    }

    /**
     * Creates a topic that uses the default strand factory.
     *
     * @param bufferSize buffer size passed to {@code Channels.newChannel} for the internal channel
     * @param policy     overflow policy passed to {@code Channels.newChannel} for the internal channel
     * @param staged     whether all forwards of a message are joined before the next message is received
     */
    public ParallelTopic(int bufferSize, Channels.OverflowPolicy policy, boolean staged) {
        // Forward the flag explicitly; delegating to the 3-arg StrandFactory overload
        // would silently replace it with the default.
        this(bufferSize, policy, strandFactoryDefault, staged);
    }

    /**
     * Creates a topic that uses the default strand factory and the default staging mode.
     *
     * @param bufferSize buffer size passed to {@code Channels.newChannel} for the internal channel
     * @param policy     overflow policy passed to {@code Channels.newChannel} for the internal channel
     */
    public ParallelTopic(int bufferSize, Channels.OverflowPolicy policy) {
        this(bufferSize, policy, stagedDefault);
    }

    /**
     * Creates a topic whose internal channel is built by {@code Channels.newChannel(bufferSize)},
     * using the default strand factory.
     *
     * @param bufferSize buffer size passed to {@code Channels.newChannel}
     * @param staged     whether all forwards of a message are joined before the next message is received
     */
    public ParallelTopic(int bufferSize, boolean staged) {
        this(Channels.newChannel(bufferSize), strandFactoryDefault, staged);
    }

    /**
     * Creates a topic whose internal channel is built by {@code Channels.newChannel(bufferSize)},
     * using the default strand factory and the default staging mode.
     *
     * @param bufferSize buffer size passed to {@code Channels.newChannel}
     */
    public ParallelTopic(int bufferSize) {
        this(Channels.newChannel(bufferSize), strandFactoryDefault, stagedDefault);
    }

    /**
     * Subscribes a port and chooses whether it is closed together with this topic.
     *
     * @param sub               the port to subscribe
     * @param closeOnTopicClose {@code true} to delegate to the regular {@code subscribe};
     *                          {@code false} to register the port so that it is skipped when the
     *                          topic closes its subscribers
     * @param <T>               the concrete port type
     * @return the port that was passed in (or whatever the superclass returns when
     *         {@code closeOnTopicClose} is {@code true})
     */
    public <T extends SendPort<? super Message>> T subscribe(T sub, boolean closeOnTopicClose) {
        if (closeOnTopicClose) {
            return super.subscribe(sub);
        }
        // Mark the port as "leave open" before it becomes visible to the distribution loop.
        this.subscribersToBeLeftOpen.add(sub);
        getSubscribers().add(sub);
        return sub;
    }

    /**
     * Removes a subscriber and forgets any "leave open" marking it had.
     *
     * @param sub the port to unsubscribe
     */
    public void unsubscribe(SendPort<? super Message> sub) {
        super.unsubscribe(sub);
        this.subscribersToBeLeftOpen.remove(sub);
    }

    /** Removes all subscribers and clears all "leave open" markings. */
    public void unsubscribeAll() {
        super.unsubscribeAll();
        this.subscribersToBeLeftOpen.clear();
    }

    /**
     * Enqueues a message on the internal channel for later distribution.
     *
     * @param message the message to publish
     */
    public void send(Message message) throws SuspendExecution, InterruptedException {
        this.internalChannel.send(message);
    }

    /**
     * Enqueues a message on the internal channel, bounded by the given timeout.
     *
     * @param message the message to publish
     * @param timeout the timeout handed to the internal channel
     * @return the result of the internal channel's {@code send}
     */
    public boolean send(Message message, Timeout timeout) throws SuspendExecution, InterruptedException {
        return this.internalChannel.send(message, timeout);
    }

    /**
     * Enqueues a message on the internal channel, bounded by the given duration.
     *
     * @param message the message to publish
     * @param timeout the maximum duration handed to the internal channel
     * @param unit    the unit of {@code timeout}
     * @return the result of the internal channel's {@code send}
     */
    public boolean send(Message message, long timeout, TimeUnit unit) throws SuspendExecution, InterruptedException {
        return this.internalChannel.send(message, timeout, unit);
    }

    /**
     * Attempts to enqueue a message on the internal channel.
     *
     * @param message the message to publish
     * @return the result of the internal channel's {@code trySend}
     */
    public boolean trySend(Message message) {
        return this.internalChannel.trySend(message);
    }

    /**
     * Marks the topic as closed and closes the internal channel. The distribution loop
     * then terminates and closes the subscribers that were not asked to be left open.
     */
    public void close() {
        // Set the flag first so the loop sees it once the channel close wakes it up.
        this.sendClosed = true;
        this.internalChannel.close();
    }

    /**
     * Closes the topic and records a cause that is passed to the subscribers when they are closed.
     *
     * @param cause the exception to hand to {@code SendPort.close(Throwable)} of each closed subscriber
     */
    public void close(Throwable cause) {
        this.closingException.set(cause);
        close();
    }

    /**
     * Creates and starts the strand that distributes messages from the internal channel.
     *
     * @param staged whether all forwards of a message are joined before the next message is received
     * @return the started distribution strand
     */
    @Suspendable
    private Strand startDistributionLoop(final boolean staged) {
        return this.strandFactory.newStrand(SuspendableUtils.runnableToCallable(new SuspendableRunnable() {
            public void run() throws SuspendExecution, InterruptedException {
                // Forwarding strands of the message currently being distributed (staged mode only).
                final Collection<Strand> stage = new ArrayList<>();

                while (!sendClosed) {
                    final Message message = internalChannel.receive();
                    if (message == null) {
                        // Nothing was received; re-check the closed flag.
                        continue;
                    }

                    if (staged) {
                        stage.clear();
                    }
                    for (final SendPort<? super Message> subscriber : getSubscribers()) {
                        final Strand forwarder = strandFactory.newStrand(
                                SuspendableUtils.runnableToCallable(new SuspendableRunnable() {
                                    public void run() throws SuspendExecution, InterruptedException {
                                        try {
                                            subscriber.send(message);
                                        } catch (Throwable t) {
                                            // Report the failure and fail this forwarding strand.
                                            t.printStackTrace();
                                            throw new RuntimeException(t);
                                        }
                                    }
                                })).start();
                        if (staged) {
                            stage.add(forwarder);
                        }
                    }

                    if (staged) {
                        try {
                            for (final Strand forwarder : stage) {
                                forwarder.join();
                            }
                        } catch (ExecutionException e) {
                            // A failed forward aborts the distribution loop.
                            throw new AssertionError(e);
                        }
                    }
                }

                // The topic has been closed: close every subscriber not registered as "leave open".
                final Throwable cause = closingException.get();
                for (final SendPort<? super Message> subscriber : getSubscribers()) {
                    if (subscribersToBeLeftOpen.contains(subscriber)) {
                        continue;
                    }
                    if (cause != null) {
                        subscriber.close(cause);
                    } else {
                        subscriber.close();
                    }
                }
            }
        })).start();
    }
}
