package xin.manong.stream.framework.common;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xin.manong.weapon.base.common.Context;
import xin.manong.weapon.base.log.JSONLogger;
import xin.manong.weapon.base.record.KVRecord;

/* loaded from: input_file:xin/manong/stream/framework/common/StreamManager.class */
public class StreamManager {
    private static final String STREAM_LOGGER_FILE = "./logs/stream.log";
    private static JSONLogger streamLogger;
    private static final Logger logger = LoggerFactory.getLogger(StreamManager.class);
    private static final Set<String> BASE_LOGGER_KEYS = buildBaseLoggerKeys();

    public static void buildStreamLogger(String str, List<String> list) {
        HashSet hashSet = new HashSet(list);
        hashSet.addAll(BASE_LOGGER_KEYS);
        streamLogger = new JSONLogger(StringUtils.isEmpty(str) ? STREAM_LOGGER_FILE : str, hashSet);
    }

    public static void commitLog(KVRecord kVRecord) {
        HashMap hashMap = new HashMap(kVRecord.getFieldMap());
        hashMap.put("__STREAM_RECORD_ID__", kVRecord.getId());
        hashMap.put("__STREAM_RECORD_TYPE__", kVRecord.getRecordType().name());
        if (streamLogger != null) {
            streamLogger.commit(hashMap);
        } else {
            logger.warn("stream logger not init");
        }
    }

    public static void commitLog(Context context) {
        if (streamLogger != null) {
            streamLogger.commit(context.getFeatureMap());
        } else {
            logger.warn("stream logger not init");
        }
    }

    public static void keepWatchRecord(KVRecord kVRecord, Context context) {
        Set hashSet = context.contains("__STREAM_KEEP_WATCH__") ? (Set) context.get("__STREAM_KEEP_WATCH__") : new HashSet();
        if (hashSet.contains(kVRecord)) {
            return;
        }
        String str = (String) context.get("__STREAM_TRACE_ID__");
        String str2 = (String) context.get("__STREAM_PROCESSOR__");
        String str3 = (String) context.get("__STREAM_RECEIVER__");
        if (!StringUtils.isEmpty(str)) {
            kVRecord.put("__STREAM_TRACE_ID__", str);
        }
        if (!StringUtils.isEmpty(str2)) {
            kVRecord.put("__STREAM_BIRTH_PROCESSOR__", str2);
        }
        if (!StringUtils.isEmpty(str3) && !kVRecord.has("__STREAM_RECEIVER__")) {
            kVRecord.put("__STREAM_RECEIVER__", str3);
        }
        appendStreamHistory(kVRecord, context);
        hashSet.add(kVRecord);
        if (context.contains("__STREAM_KEEP_WATCH__")) {
            return;
        }
        context.put("__STREAM_KEEP_WATCH__", hashSet);
    }

    public static void removeStreamHistory(KVRecord kVRecord) {
        if (kVRecord == null || kVRecord.isEmpty()) {
            return;
        }
        Iterator it = kVRecord.getFieldMap().entrySet().iterator();
        while (it.hasNext()) {
            String str = (String) ((Map.Entry) it.next()).getKey();
            if (!str.equals("__STREAM_HISTORY__") && str.startsWith("__STREAM_")) {
                it.remove();
            }
        }
    }

    private static void appendStreamHistory(KVRecord kVRecord, Context context) {
        String str = (String) context.get("__STREAM_RECEIVER__");
        if (StringUtils.isEmpty(str)) {
            logger.warn("receiver name is not found from context");
            return;
        }
        String str2 = kVRecord.has("__STREAM_TRACE_ID__") ? (String) kVRecord.get("__STREAM_TRACE_ID__") : null;
        String str3 = kVRecord.has("__STREAM_BIRTH_PROCESSOR__") ? (String) kVRecord.get("__STREAM_BIRTH_PROCESSOR__") : null;
        if (StringUtils.isEmpty(str2) && StringUtils.isEmpty(str3)) {
            logger.warn("missing fields[{}] and [{}]", "__STREAM_TRACE_ID__", "__STREAM_BIRTH_PROCESSOR__");
            return;
        }
        JSONObject jSONObject = new JSONObject();
        jSONObject.put("__STREAM_RECEIVER__", str);
        if (!StringUtils.isEmpty(str2)) {
            jSONObject.put("__STREAM_TRACE_ID__", str2);
        }
        if (!StringUtils.isEmpty(str3)) {
            jSONObject.put("__STREAM_BIRTH_PROCESSOR__", str3);
        }
        JSONArray streamHistory = getStreamHistory(kVRecord);
        streamHistory.add(jSONObject);
        kVRecord.put("__STREAM_HISTORY__", streamHistory);
    }

    private static JSONArray getStreamHistory(KVRecord kVRecord) {
        try {
            Object jSONArray = kVRecord.has("__STREAM_HISTORY__") ? kVRecord.get("__STREAM_HISTORY__") : new JSONArray();
            if (jSONArray instanceof String) {
                jSONArray = JSON.parseArray((String) jSONArray, new Feature[0]);
            }
            if (jSONArray instanceof JSONArray) {
                return (JSONArray) jSONArray;
            }
            throw new Exception(String.format("invalid type[%s] for %s", jSONArray.getClass().getName(), "__STREAM_HISTORY__"));
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
            return new JSONArray();
        }
    }

    private static Set<String> buildBaseLoggerKeys() {
        HashSet hashSet = new HashSet();
        hashSet.add("__STREAM_DEBUG_MESSAGE__");
        hashSet.add("__STREAM_EXCEPTION_PROCESSOR__");
        hashSet.add("__STREAM_PROCESS_TIME__");
        hashSet.add("__STREAM_RECEIVER__");
        hashSet.add("__STREAM_PROCESS_TRACE__");
        hashSet.add("__STREAM_PROCESSOR_TIME__");
        hashSet.add("__STREAM_START_PROCESS_TIME__");
        hashSet.add("__STREAM_EXCEPTION_RECEIVER__");
        hashSet.add("__STREAM_BIRTH_PROCESSOR__");
        hashSet.add("__STREAM_RECORD_TYPE__");
        hashSet.add("__STREAM_RECORD_ID__");
        hashSet.add("__STREAM_TRACE_ID__");
        hashSet.add("__STREAM_MESSAGE_ID__");
        hashSet.add("__STREAM_MESSAGE_KEY__");
        hashSet.add("__STREAM_MESSAGE_TOPIC__");
        hashSet.add("__STREAM_MESSAGE_TAG__");
        hashSet.add("__STREAM_MESSAGE_PARTITION__");
        hashSet.add("__STREAM_MESSAGE_OFFSET__");
        hashSet.add("__STREAM_MESSAGE_TIMESTAMP__");
        return hashSet;
    }
}
