package xin.manong.stream.framework.receiver;

import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xin.manong.stream.framework.common.StreamManager;
import xin.manong.stream.framework.processor.ProcessorConfig;
import xin.manong.stream.framework.processor.ProcessorGraph;
import xin.manong.stream.framework.processor.ProcessorGraphFactory;
import xin.manong.stream.sdk.common.UnacceptableException;
import xin.manong.stream.sdk.receiver.ReceiveConverter;
import xin.manong.stream.sdk.receiver.ReceiveProcessor;
import xin.manong.weapon.alarm.Alarm;
import xin.manong.weapon.alarm.AlarmSender;
import xin.manong.weapon.alarm.AlarmStatus;
import xin.manong.weapon.base.common.Context;
import xin.manong.weapon.base.record.KVRecord;
import xin.manong.weapon.base.record.KVRecords;
import xin.manong.weapon.base.util.RandomID;

/* loaded from: input_file:xin/manong/stream/framework/receiver/ReceiveProcessorImpl.class */
public class ReceiveProcessorImpl extends ReceiveProcessor {
    private static final Logger logger = LoggerFactory.getLogger(ReceiveProcessorImpl.class);
    private String name;
    private String appName;
    private List<String> processors;
    private List<ProcessorConfig> processorGraphConfig;
    private Queue<String> processorGraphIds;
    private ThreadLocal<ProcessorGraph> processorGraph;
    private AlarmSender alarmSender;

    public ReceiveProcessorImpl(String str, List<String> list, List<ProcessorConfig> list2, ReceiveConverter receiveConverter) {
        this.name = str;
        this.processors = list;
        this.processorGraphConfig = list2;
        this.converter = receiveConverter;
        this.processorGraphIds = new ConcurrentLinkedQueue();
        this.processorGraph = new ThreadLocal<>();
    }

    public void process(Object obj) throws Throwable {
        Context context = new Context();
        try {
            KVRecords convert = this.converter == null ? (KVRecords) obj : this.converter.convert(context, obj);
            if (convert == null) {
                throw new RuntimeException("convert record failed");
            }
            ProcessorGraph currentThreadProcessorGraph = currentThreadProcessorGraph();
            for (String str : this.processors) {
                for (int i = 0; i < convert.getRecordCount(); i++) {
                    process(str, currentThreadProcessorGraph, convert.getRecord(i).copy());
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            context.put("__STREAM_EXCEPTION_RECEIVER__", this.name);
            context.put("__STREAM_EXCEPTION_STACK__", ExceptionUtils.getStackTrace(e));
            context.put("__STREAM_DEBUG_MESSAGE__", String.format("数据转换异常[%s:%s]", e.getClass().getSimpleName(), e.getMessage()));
            StreamManager.commitLog(context);
        }
    }

    public void sweep() {
        while (!this.processorGraphIds.isEmpty()) {
            String poll = this.processorGraphIds.poll();
            if (!StringUtils.isEmpty(poll)) {
                ProcessorGraphFactory.clean(poll);
            }
        }
    }

    private void handleException(KVRecord kVRecord, Throwable th) {
        if (this.alarmSender != null) {
            AlarmStatus alarmStatus = ((th instanceof UnacceptableException) || (th instanceof Error)) ? AlarmStatus.FATAL : AlarmStatus.ERROR;
            Alarm alarm = new Alarm(String.format(alarmStatus == AlarmStatus.FATAL ? "严重错误发生[%s:%s]" : "链路异常发生[%s:%s]", th.getClass().getSimpleName(), th.getMessage()), alarmStatus);
            alarm.setAppName(this.appName).setTitle("应用异常报警");
            this.alarmSender.submit(alarm);
        }
        kVRecord.put("__STREAM_DEBUG_MESSAGE__", th.getMessage());
        kVRecord.put("__STREAM_EXCEPTION_STACK__", ExceptionUtils.getStackTrace(th));
        logger.error(th.getMessage(), th);
        logger.error("process record exception for receiver[{}]", this.name);
    }

    private void process(String str, ProcessorGraph processorGraph, KVRecord kVRecord) throws Throwable {
        long currentTimeMillis = System.currentTimeMillis();
        Context context = new Context();
        try {
            try {
                try {
                    context.put("__STREAM_TRACE_ID__", RandomID.build());
                    context.put("__STREAM_RECEIVER__", this.name);
                    kVRecord.put("__STREAM_RECEIVER__", this.name);
                    kVRecord.put("__STREAM_START_PROCESS_TIME__", Long.valueOf(currentTimeMillis));
                    KVRecords kVRecords = new KVRecords();
                    kVRecords.addRecord(kVRecord);
                    processorGraph.process(str, kVRecords, context);
                    kVRecord.put("__STREAM_PROCESS_TIME__", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                    Iterator it = ((Set) context.get("__STREAM_KEEP_WATCH__")).iterator();
                    while (it.hasNext()) {
                        StreamManager.commitLog((KVRecord) it.next());
                    }
                    context.sweep();
                } finally {
                    handleException(kVRecord, th);
                }
            } catch (UnacceptableException th) {
                throw th;
            } catch (Exception th2) {
                kVRecord.put("__STREAM_PROCESS_TIME__", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                Iterator it2 = ((Set) context.get("__STREAM_KEEP_WATCH__")).iterator();
                while (it2.hasNext()) {
                    StreamManager.commitLog((KVRecord) it2.next());
                }
                context.sweep();
            }
        } catch (Throwable th3) {
            kVRecord.put("__STREAM_PROCESS_TIME__", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            Iterator it3 = ((Set) context.get("__STREAM_KEEP_WATCH__")).iterator();
            while (it3.hasNext()) {
                StreamManager.commitLog((KVRecord) it3.next());
            }
            context.sweep();
            throw th3;
        }
    }

    private ProcessorGraph currentThreadProcessorGraph() throws UnacceptableException {
        ProcessorGraph processorGraph = this.processorGraph.get();
        if (processorGraph != null) {
            return processorGraph;
        }
        ProcessorGraph make = ProcessorGraphFactory.make(this.processorGraphConfig);
        this.processorGraphIds.add(make.getId());
        this.processorGraph.set(make);
        return make;
    }

    public void setAlarmSender(AlarmSender alarmSender) {
        this.alarmSender = alarmSender;
    }

    public void setAppName(String str) {
        this.appName = str;
    }
}
