package storm.trident.testing;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.RegisteredGlobalState;
import backtype.storm.utils.Utils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;
import storm.trident.topology.TridentTopologyBuilder;

/* loaded from: input_file:storm/trident/testing/FeederBatchSpout.class */
public class FeederBatchSpout implements ITridentSpout, IFeeder {
    Fields _outFields;
    boolean _waitToEmit = true;
    String _id = RegisteredGlobalState.registerState(new CopyOnWriteArrayList());
    String _semaphoreId = RegisteredGlobalState.registerState(new CopyOnWriteArrayList());

    /* loaded from: input_file:storm/trident/testing/FeederBatchSpout$FeederCoordinator.class */
    public class FeederCoordinator implements ITridentSpout.BatchCoordinator<Map<Integer, List<List<Object>>>> {
        int _numPartitions;
        int _emittedIndex = 0;
        Map<Long, Integer> txIndices = new HashMap();
        int _masterEmitted = 0;

        public FeederCoordinator(int i) {
            this._numPartitions = i;
        }

        @Override // storm.trident.spout.ITridentSpout.BatchCoordinator
        public Map<Integer, List<List<Object>>> initializeTransaction(long j, Map<Integer, List<List<Object>>> map) {
            List list = (List) RegisteredGlobalState.getState(FeederBatchSpout.this._id);
            if (list.size() <= this._emittedIndex) {
                return new HashMap();
            }
            Object obj = list.get(this._emittedIndex);
            this.txIndices.put(Long.valueOf(j), Integer.valueOf(this._emittedIndex));
            this._emittedIndex++;
            if (obj instanceof Map) {
                return (Map) obj;
            }
            List list2 = (List) obj;
            HashMap hashMap = new HashMap();
            for (int i = 0; i < this._numPartitions; i++) {
                hashMap.put(Integer.valueOf(i), new ArrayList());
            }
            for (int i2 = 0; i2 < list2.size(); i2++) {
                ((List) hashMap.get(Integer.valueOf(i2 % this._numPartitions))).add((List) list2.get(i2));
            }
            return hashMap;
        }

        @Override // storm.trident.spout.ITridentSpout.BatchCoordinator
        public void close() {
        }

        @Override // storm.trident.spout.ITridentSpout.BatchCoordinator
        public void success(long j) {
            Integer num = this.txIndices.get(Long.valueOf(j));
            if (num != null) {
                ((Semaphore) ((List) RegisteredGlobalState.getState(FeederBatchSpout.this._semaphoreId)).get(num.intValue())).release();
            }
        }

        @Override // storm.trident.spout.ITridentSpout.BatchCoordinator
        public boolean isReady(long j) {
            if (!FeederBatchSpout.this._waitToEmit) {
                return true;
            }
            if (((List) RegisteredGlobalState.getState(FeederBatchSpout.this._id)).size() > this._masterEmitted) {
                this._masterEmitted++;
                return true;
            }
            Utils.sleep(2L);
            return false;
        }
    }

    /* loaded from: input_file:storm/trident/testing/FeederBatchSpout$FeederEmitter.class */
    public class FeederEmitter implements ITridentSpout.Emitter<Map<Integer, List<List<Object>>>> {
        int _index;

        public FeederEmitter(int i) {
            this._index = i;
        }

        @Override // storm.trident.spout.ITridentSpout.Emitter
        public void emitBatch(TransactionAttempt transactionAttempt, Map<Integer, List<List<Object>>> map, TridentCollector tridentCollector) {
            List<List<Object>> list = map.get(Integer.valueOf(this._index));
            if (list != null) {
                Iterator<List<Object>> it = list.iterator();
                while (it.hasNext()) {
                    tridentCollector.emit(it.next());
                }
            }
        }

        @Override // storm.trident.spout.ITridentSpout.Emitter
        public void success(TransactionAttempt transactionAttempt) {
        }

        @Override // storm.trident.spout.ITridentSpout.Emitter
        public void close() {
        }
    }

    public FeederBatchSpout(List<String> list) {
        this._outFields = new Fields(list);
    }

    public void setWaitToEmit(boolean z) {
        this._waitToEmit = z;
    }

    @Override // storm.trident.testing.IFeeder
    public void feed(Object obj) {
        Semaphore semaphore = new Semaphore(0);
        ((List) RegisteredGlobalState.getState(this._semaphoreId)).add(semaphore);
        ((List) RegisteredGlobalState.getState(this._id)).add(obj);
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // storm.trident.spout.ITridentSpout
    public Map getComponentConfiguration() {
        return null;
    }

    @Override // storm.trident.spout.ITridentSpout
    public Fields getOutputFields() {
        return this._outFields;
    }

    @Override // storm.trident.spout.ITridentSpout
    public ITridentSpout.BatchCoordinator getCoordinator(String str, Map map, TopologyContext topologyContext) {
        return new FeederCoordinator(topologyContext.getComponentTasks(TridentTopologyBuilder.spoutIdFromCoordinatorId(topologyContext.getThisComponentId())).size());
    }

    @Override // storm.trident.spout.ITridentSpout
    public ITridentSpout.Emitter getEmitter(String str, Map map, TopologyContext topologyContext) {
        return new FeederEmitter(topologyContext.getThisTaskIndex());
    }
}
