package backtype.storm.task;

import backtype.storm.drpc.PrepareRequest;
import backtype.storm.generated.ShellComponent;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.ShellProcess;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;

/* loaded from: input_file:backtype/storm/task/ShellBolt.class */
public class ShellBolt implements IBolt {
    public static Logger LOG = Logger.getLogger(ShellBolt.class);
    Process _subprocess;
    OutputCollector _collector;
    Map<String, Tuple> _inputs;
    private String[] _command;
    private ShellProcess _process;
    private volatile boolean _running;
    private volatile Throwable _exception;
    private LinkedBlockingQueue _pendingWrites;
    private Random _rand;
    private Thread _readerThread;
    private Thread _writerThread;

    public ShellBolt(ShellComponent shellComponent) {
        this(shellComponent.get_execution_command(), shellComponent.get_script());
    }

    public ShellBolt(String... strArr) {
        this._inputs = new ConcurrentHashMap();
        this._running = true;
        this._pendingWrites = new LinkedBlockingQueue();
        this._command = strArr;
    }

    @Override // backtype.storm.task.IBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this._rand = new Random();
        this._process = new ShellProcess(this._command);
        this._collector = outputCollector;
        try {
            LOG.info("Launched subprocess with pid " + this._process.launch(map, topologyContext));
            this._readerThread = new Thread(new Runnable() { // from class: backtype.storm.task.ShellBolt.1
                @Override // java.lang.Runnable
                public void run() {
                    while (ShellBolt.this._running) {
                        try {
                            JSONObject readMessage = ShellBolt.this._process.readMessage();
                            if (readMessage == null) {
                            }
                            String str = (String) readMessage.get("command");
                            if (str.equals("ack")) {
                                ShellBolt.this.handleAck(readMessage);
                            } else if (str.equals("fail")) {
                                ShellBolt.this.handleFail(readMessage);
                            } else if (str.equals("error")) {
                                ShellBolt.this.handleError(readMessage);
                            } else if (str.equals("log")) {
                                ShellBolt.LOG.info("Shell msg: " + ((String) readMessage.get("msg")));
                            } else if (str.equals("emit")) {
                                ShellBolt.this.handleEmit(readMessage);
                            }
                        } catch (InterruptedException e) {
                        } catch (Throwable th) {
                            ShellBolt.this.die(th);
                        }
                    }
                }
            });
            this._readerThread.start();
            this._writerThread = new Thread(new Runnable() { // from class: backtype.storm.task.ShellBolt.2
                @Override // java.lang.Runnable
                public void run() {
                    while (ShellBolt.this._running) {
                        try {
                            Object poll = ShellBolt.this._pendingWrites.poll(1L, TimeUnit.SECONDS);
                            if (poll != null) {
                                ShellBolt.this._process.writeMessage(poll);
                            }
                        } catch (InterruptedException e) {
                        } catch (Throwable th) {
                            ShellBolt.this.die(th);
                        }
                    }
                }
            });
            this._writerThread.start();
        } catch (IOException e) {
            throw new RuntimeException("Error when launching multilang subprocess\n" + this._process.getErrorsString(), e);
        }
    }

    @Override // backtype.storm.task.IBolt
    public void execute(Tuple tuple) {
        if (this._exception != null) {
            throw new RuntimeException(this._exception);
        }
        String l = Long.toString(this._rand.nextLong());
        this._inputs.put(l, tuple);
        try {
            JSONObject jSONObject = new JSONObject();
            jSONObject.put(PrepareRequest.ID_STREAM, l);
            jSONObject.put("comp", tuple.getSourceComponent());
            jSONObject.put("stream", tuple.getSourceStreamId());
            jSONObject.put("task", Integer.valueOf(tuple.getSourceTask()));
            jSONObject.put("tuple", tuple.getValues());
            this._pendingWrites.put(jSONObject);
        } catch (InterruptedException e) {
            throw new RuntimeException("Error during multilang processing", e);
        }
    }

    @Override // backtype.storm.task.IBolt
    public void cleanup() {
        this._running = false;
        this._process.destroy();
        this._inputs.clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleAck(Map map) {
        String str = (String) map.get(PrepareRequest.ID_STREAM);
        Tuple remove = this._inputs.remove(str);
        if (remove == null) {
            throw new RuntimeException("Acked a non-existent or already acked/failed id: " + str);
        }
        this._collector.ack(remove);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleFail(Map map) {
        String str = (String) map.get(PrepareRequest.ID_STREAM);
        Tuple remove = this._inputs.remove(str);
        if (remove == null) {
            throw new RuntimeException("Failed a non-existent or already acked/failed id: " + str);
        }
        this._collector.fail(remove);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleError(Map map) {
        this._collector.reportError(new Exception("Shell Process Exception: " + ((String) map.get("msg"))));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleEmit(Map map) throws InterruptedException {
        String str = (String) map.get("stream");
        if (str == null) {
            str = "default";
        }
        Long l = (Long) map.get("task");
        List<Object> list = (List) map.get("tuple");
        ArrayList arrayList = new ArrayList();
        Object obj = map.get("anchors");
        if (obj != null) {
            if (obj instanceof String) {
                obj = Arrays.asList(obj);
            }
            for (Object obj2 : (List) obj) {
                Tuple tuple = this._inputs.get((String) obj2);
                if (tuple == null) {
                    throw new RuntimeException("Anchored onto " + obj2 + " after ack/fail");
                }
                arrayList.add(tuple);
            }
        }
        if (l != null) {
            this._collector.emitDirect((int) l.longValue(), str, arrayList, list);
            return;
        }
        List<Integer> emit = this._collector.emit(str, arrayList, list);
        Object obj3 = map.get("need_task_ids");
        if (obj3 == null || ((Boolean) obj3).booleanValue()) {
            this._pendingWrites.put(emit);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void die(Throwable th) {
        this._exception = th;
    }
}
