package storm.trident.spout;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IPartitionedTridentSpout;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;
import storm.trident.topology.state.RotatingTransactionalState;
import storm.trident.topology.state.TransactionalState;

/* loaded from: input_file:storm/trident/spout/PartitionedTridentSpoutExecutor.class */
public class PartitionedTridentSpoutExecutor implements ITridentSpout<Integer> {
    IPartitionedTridentSpout _spout;

    /* loaded from: input_file:storm/trident/spout/PartitionedTridentSpoutExecutor$Coordinator.class */
    class Coordinator implements ITridentSpout.BatchCoordinator<Long> {
        private IPartitionedTridentSpout.Coordinator _coordinator;

        public Coordinator(Map map, TopologyContext topologyContext) {
            this._coordinator = PartitionedTridentSpoutExecutor.this._spout.getCoordinator(map, topologyContext);
        }

        @Override // storm.trident.spout.ITridentSpout.BatchCoordinator
        public Long initializeTransaction(long j, Long l) {
            return Long.valueOf(this._coordinator.numPartitions());
        }

        @Override // storm.trident.spout.ITridentSpout.BatchCoordinator
        public void close() {
            this._coordinator.close();
        }

        @Override // storm.trident.spout.ITridentSpout.BatchCoordinator
        public void success(long j) {
        }

        @Override // storm.trident.spout.ITridentSpout.BatchCoordinator
        public boolean isReady(long j) {
            return this._coordinator.isReady(j);
        }
    }

    /* loaded from: input_file:storm/trident/spout/PartitionedTridentSpoutExecutor$Emitter.class */
    class Emitter implements ITridentSpout.Emitter<Long> {
        private IPartitionedTridentSpout.Emitter _emitter;
        private TransactionalState _state;
        private Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap();
        private int _index;
        private int _numTasks;

        public Emitter(String str, Map map, TopologyContext topologyContext) {
            this._emitter = PartitionedTridentSpoutExecutor.this._spout.getEmitter(map, topologyContext);
            this._state = TransactionalState.newUserState(map, str);
            this._index = topologyContext.getThisTaskIndex();
            this._numTasks = topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
        }

        @Override // storm.trident.spout.ITridentSpout.Emitter
        public void emitBatch(final TransactionAttempt transactionAttempt, Long l, final TridentCollector tridentCollector) {
            int i = this._index;
            while (true) {
                final int i2 = i;
                if (i2 >= l.longValue()) {
                    return;
                }
                if (!this._partitionStates.containsKey(Integer.valueOf(i2))) {
                    this._partitionStates.put(Integer.valueOf(i2), new RotatingTransactionalState(this._state, "" + i2));
                }
                Object stateOrCreate = this._partitionStates.get(Integer.valueOf(i2)).getStateOrCreate(transactionAttempt.getTransactionId().longValue(), new RotatingTransactionalState.StateInitializer() { // from class: storm.trident.spout.PartitionedTridentSpoutExecutor.Emitter.1
                    @Override // storm.trident.topology.state.RotatingTransactionalState.StateInitializer
                    public Object init(long j, Object obj) {
                        return Emitter.this._emitter.emitPartitionBatchNew(transactionAttempt, tridentCollector, i2, obj);
                    }
                });
                if (stateOrCreate != null) {
                    this._emitter.emitPartitionBatch(transactionAttempt, tridentCollector, i2, stateOrCreate);
                }
                i = i2 + this._numTasks;
            }
        }

        @Override // storm.trident.spout.ITridentSpout.Emitter
        public void success(TransactionAttempt transactionAttempt) {
            Iterator<RotatingTransactionalState> it = this._partitionStates.values().iterator();
            while (it.hasNext()) {
                it.next().cleanupBefore(transactionAttempt.getTransactionId().longValue());
            }
        }

        @Override // storm.trident.spout.ITridentSpout.Emitter
        public void close() {
            this._state.close();
            this._emitter.close();
        }
    }

    public PartitionedTridentSpoutExecutor(IPartitionedTridentSpout iPartitionedTridentSpout) {
        this._spout = iPartitionedTridentSpout;
    }

    public IPartitionedTridentSpout getPartitionedSpout() {
        return this._spout;
    }

    @Override // storm.trident.spout.ITridentSpout
    public ITridentSpout.BatchCoordinator<Integer> getCoordinator(String str, Map map, TopologyContext topologyContext) {
        return new Coordinator(map, topologyContext);
    }

    @Override // storm.trident.spout.ITridentSpout
    public ITridentSpout.Emitter<Integer> getEmitter(String str, Map map, TopologyContext topologyContext) {
        return new Emitter(str, map, topologyContext);
    }

    @Override // storm.trident.spout.ITridentSpout
    public Map<String, Object> getComponentConfiguration() {
        return this._spout.getComponentConfiguration();
    }

    @Override // storm.trident.spout.ITridentSpout
    public Fields getOutputFields() {
        return this._spout.getOutputFields();
    }
}
