package com.bixolabs.cascading;

import cascading.flow.Flow;
import cascading.flow.FlowListener;
import cascading.flow.FlowStep;
import cascading.flow.StepCounters;
import cascading.stats.CascadingStats;
import cascading.stats.FlowStats;
import cascading.stats.StepStats;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.FileWriterWithEncoding;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/bixolabs/cascading/FlowMonitor.class */
public class FlowMonitor {
    private static final Logger LOGGER = Logger.getLogger(FlowMonitor.class);
    public static final String FILENAME = "flow-monitor.html";
    public static final int DEFAULT_UPDATE_INTERVAL = 10000;
    public static final int DEFAULT_ROWS_PER_STEP = 10;
    private static final String MONITOR_TOP_HTML = "/monitor-top.html";
    private static final String MONITOR_STEP_HTML = "/monitor-step.html";
    private static final String MONITOR_ROW_HTML = "/monitor-row.html";
    private static final String DEFAULT_HADOOP_LOG_DIR = "/mnt/hadoop/logs/";
    private static final String DEFAULT_LOCAL_LOG_DIR = "./";
    private Flow _flow;
    private Throwable _flowException;
    private int _updateInterval = 10000;
    private int _timeEntriesPerStep = 10;
    private boolean _includeCascadingCounters = false;
    private File _htmlDir = null;
    private List<IMonitorTask> _tasks = new ArrayList();
    private String _htmlTopTemplate = IOUtils.toString(FlowMonitor.class.getResourceAsStream(MONITOR_TOP_HTML));
    private String _htmlStepTemplate = IOUtils.toString(FlowMonitor.class.getResourceAsStream(MONITOR_STEP_HTML));
    private String _htmlRowTemplate = IOUtils.toString(FlowMonitor.class.getResourceAsStream(MONITOR_ROW_HTML));
    private List<StepEntry> _stepEntries = new ArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bixolabs/cascading/FlowMonitor$StepEntry.class */
    public static class StepEntry {
        FlowStep _step;
        StepStats _stats;
        long _startTime;
        long _duration;
        CascadingStats.Status _status = CascadingStats.Status.PENDING;
        List<TimeEntry> _timeEntries = new ArrayList();

        public StepEntry(FlowStep flowStep) {
            this._step = flowStep;
        }

        public FlowStep getStep() {
            return this._step;
        }

        public CascadingStats.Status getStatus() {
            return this._status;
        }

        public void setStatus(CascadingStats.Status status) {
            this._status = status;
        }

        public String getName() {
            return this._step.getStepName();
        }

        public int getId() {
            return this._step.getID();
        }

        public long getStartTime() {
            return this._startTime;
        }

        public void setStartTime(long j) {
            this._startTime = j;
        }

        public long getDuration() {
            return this._duration;
        }

        public void setDuration(long j) {
            this._duration = j;
        }

        public List<TimeEntry> getTimerEntries() {
            return this._timeEntries;
        }

        public void addTimeEntry(TimeEntry timeEntry, int i) {
            this._timeEntries.add(timeEntry);
            while (this._timeEntries.size() > i) {
                this._timeEntries.remove(0);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bixolabs/cascading/FlowMonitor$TimeEntry.class */
    public static class TimeEntry {
        private long _timeDelta;
        private List<String> _counterValues;

        public TimeEntry(long j) {
            this._timeDelta = j;
            this._counterValues = new ArrayList();
        }

        public TimeEntry(long j, List<String> list) {
            this._timeDelta = j;
            this._counterValues = list;
        }

        public long getTimeDelta() {
            return this._timeDelta;
        }

        public List<String> getCounterValues() {
            return this._counterValues;
        }

        public void addCounterValue(String str) {
            this._counterValues.add(str);
        }
    }

    public FlowMonitor(Flow flow) throws IOException {
        this._flow = flow;
        Iterator<FlowStep> it = this._flow.getSteps().iterator();
        while (it.hasNext()) {
            this._stepEntries.add(new StepEntry(it.next()));
        }
    }

    public Flow getFlow() {
        return this._flow;
    }

    public File getHtmlDirectory() {
        return this._htmlDir;
    }

    public void setHtmlDirectory(String str) {
        this._htmlDir = new File(str);
    }

    public int getRowsPerStep() {
        return this._timeEntriesPerStep;
    }

    public void setRowsPerStep(int i) {
        this._timeEntriesPerStep = i;
    }

    public int getUpdateInterval() {
        return this._updateInterval;
    }

    public void setUpdateInterval(int i) {
        this._updateInterval = i;
    }

    public void setIncludeCascadingCounters(boolean z) {
        this._includeCascadingCounters = z;
    }

    public boolean isIncludeCascadingCounters() {
        return this._includeCascadingCounters;
    }

    public void addMonitorTask(IMonitorTask iMonitorTask) {
        this._tasks.add(iMonitorTask);
    }

    public boolean run(Enum... enumArr) throws Throwable {
        FlowStats flowStats;
        if (this._htmlDir == null) {
            this._htmlDir = getDefaultLogDir(this._flow.getJobConf());
        }
        this._flowException = null;
        this._flow.addListener(new FlowListener() { // from class: com.bixolabs.cascading.FlowMonitor.1
            @Override // cascading.flow.FlowListener
            public void onCompleted(Flow flow) {
            }

            @Override // cascading.flow.FlowListener
            public void onStarting(Flow flow) {
            }

            @Override // cascading.flow.FlowListener
            public void onStopping(Flow flow) {
            }

            @Override // cascading.flow.FlowListener
            public boolean onThrowable(Flow flow, Throwable th) {
                FlowMonitor.this._flowException = th;
                return true;
            }
        });
        this._flow.start();
        do {
            Thread.sleep(this._updateInterval);
            flowStats = this._flow.getFlowStats();
            for (StepStats stepStats : flowStats.getStepStats()) {
                StepEntry findStepById = findStepById(((Integer) stepStats.getID()).intValue());
                CascadingStats.Status status = findStepById.getStatus();
                CascadingStats.Status status2 = stepStats.getStatus();
                if (status != status2) {
                    findStepById.setStartTime(stepStats.getStartTime());
                    findStepById.setStatus(stepStats.getStatus());
                }
                if (status == CascadingStats.Status.RUNNING || status2 == CascadingStats.Status.RUNNING) {
                    if (stepStats.isFinished()) {
                        findStepById.setDuration(stepStats.getDuration());
                    } else if (stepStats.isRunning()) {
                        findStepById.setDuration(System.currentTimeMillis() - findStepById.getStartTime());
                    } else {
                        findStepById.setDuration(0L);
                    }
                    findStepById.addTimeEntry(makeTimeEntry(findStepById, stepStats, enumArr), this._timeEntriesPerStep);
                }
            }
            StringBuilder sb = new StringBuilder(this._htmlTopTemplate);
            replace(sb, "%flowname%", StringEscapeUtils.escapeHtml(this._flow.getName()));
            for (StepEntry stepEntry : this._stepEntries) {
                StringBuilder sb2 = new StringBuilder(this._htmlStepTemplate);
                replaceHtml(sb2, "%stepname%", stepEntry.getName());
                replaceHtml(sb2, "%stepstatus%", "" + stepEntry.getStatus());
                replaceHtml(sb2, "%stepstart%", new Date(stepEntry.getStartTime()).toString());
                replaceHtml(sb2, "%stepduration%", "" + (stepEntry.getDuration() / 1000));
                replace(sb2, "%counternames%", getTableHeader(stepEntry.getStep(), enumArr));
                if (stepEntry.getStatus() != CascadingStats.Status.PENDING) {
                    for (TimeEntry timeEntry : stepEntry.getTimerEntries()) {
                        StringBuilder sb3 = new StringBuilder(this._htmlRowTemplate);
                        replaceHtml(sb3, "%timeoffset%", "" + (timeEntry.getTimeDelta() / 1000));
                        replace(sb3, "%countervalues%", getCounterValues(timeEntry.getCounterValues()));
                        insert(sb2, "%steprows%", sb3.toString());
                    }
                }
                replace(sb2, "%steprows%", "");
                insert(sb, "%steps%", sb2.toString());
            }
            replace(sb, "%steps%", "");
            FileWriterWithEncoding fileWriterWithEncoding = new FileWriterWithEncoding(new File(this._htmlDir, FILENAME), "UTF-8");
            IOUtils.write(sb.toString(), (Writer) fileWriterWithEncoding);
            fileWriterWithEncoding.close();
        } while (!flowStats.isFinished());
        File file = new File(this._htmlDir, FILENAME);
        File file2 = new File(this._htmlDir, String.format("%s-%s", this._flow.getName(), FILENAME));
        file2.delete();
        if (!file.exists() || file2.exists()) {
            LOGGER.warn("Unable to create archive of file " + file.getAbsolutePath());
        } else {
            try {
                String iOUtils = IOUtils.toString(new FileReader(file));
                FileWriterWithEncoding fileWriterWithEncoding2 = new FileWriterWithEncoding(file2, "UTF-8");
                IOUtils.write(iOUtils, (Writer) fileWriterWithEncoding2);
                fileWriterWithEncoding2.close();
            } catch (Exception e) {
                LOGGER.warn("Unable to create archive of file " + file.getAbsolutePath(), e);
            }
        }
        if (!flowStats.isFailed() || this._flowException == null) {
            return flowStats.isSuccessful();
        }
        throw this._flowException;
    }

    private StepEntry findStepById(int i) {
        for (StepEntry stepEntry : this._stepEntries) {
            if (stepEntry.getId() == i) {
                return stepEntry;
            }
        }
        throw new RuntimeException("Can't find StepEntry with id " + i);
    }

    private TimeEntry makeTimeEntry(StepEntry stepEntry, StepStats stepStats, Enum... enumArr) {
        FlowStep step = stepEntry.getStep();
        TimeEntry timeEntry = new TimeEntry(stepEntry.getDuration());
        for (Enum r0 : enumArr) {
            timeEntry.addCounterValue("" + StepUtils.safeGetCounter(stepStats, r0));
        }
        Iterator<IMonitorTask> it = this._tasks.iterator();
        while (it.hasNext()) {
            try {
                timeEntry.addCounterValue(it.next().getValue(this._flow, step, stepStats));
            } catch (Throwable th) {
                LOGGER.error("Exception thrown by MonitorTask!", th);
                timeEntry.addCounterValue("<error>");
            }
        }
        if (this._includeCascadingCounters) {
            for (StepCounters stepCounters : StepCounters.values()) {
                timeEntry.addCounterValue("" + StepUtils.safeGetCounter(stepStats, stepCounters));
            }
        }
        return timeEntry;
    }

    private String getTableHeader(FlowStep flowStep, Enum... enumArr) {
        StringBuilder sb = new StringBuilder();
        for (Enum r0 : enumArr) {
            sb.append("<td>");
            sb.append(StringEscapeUtils.escapeHtml(r0.toString()));
            sb.append("</td>");
        }
        for (IMonitorTask iMonitorTask : this._tasks) {
            sb.append("<td>");
            sb.append(StringEscapeUtils.escapeHtml(iMonitorTask.getName(this._flow, flowStep)));
            sb.append("</td>");
        }
        if (this._includeCascadingCounters) {
            for (StepCounters stepCounters : StepCounters.values()) {
                sb.append("<td>");
                sb.append(StringEscapeUtils.escapeHtml(stepCounters.toString()));
                sb.append("</td>");
            }
        }
        return sb.toString();
    }

    private String getCounterValues(List<String> list) {
        StringBuilder sb = new StringBuilder();
        for (String str : list) {
            sb.append("<td>");
            sb.append(StringEscapeUtils.escapeHtml(str));
            sb.append("</td>");
        }
        return sb.toString();
    }

    private void replace(StringBuilder sb, String str, String str2) {
        int indexOf = sb.indexOf(str);
        if (indexOf == -1) {
            throw new RuntimeException("Key doesn't exist in template: " + str);
        }
        sb.delete(indexOf, indexOf + str.length());
        sb.insert(indexOf, str2);
    }

    private void replaceHtml(StringBuilder sb, String str, String str2) {
        replace(sb, str, StringEscapeUtils.escapeHtml(str2));
    }

    private void insert(StringBuilder sb, String str, String str2) {
        int indexOf = sb.indexOf(str);
        if (indexOf == -1) {
            throw new RuntimeException("Key doesn't exist in template: " + str);
        }
        sb.insert(indexOf, str2);
    }

    private File getDefaultLogDir(JobConf jobConf) {
        File file;
        if (isJobLocal(jobConf)) {
            file = new File(DEFAULT_LOCAL_LOG_DIR);
            if (!file.exists()) {
                file.mkdir();
            }
        } else {
            String property = System.getProperty("HADOOP_LOG_DIR");
            if (property == null) {
                property = System.getProperty("hadoop.log.dir");
            }
            if (property == null && System.getProperty("HADOOP_HOME") != null) {
                property = "/logs";
            }
            if (property == null) {
                property = DEFAULT_HADOOP_LOG_DIR;
            }
            LOGGER.info("Setting monitor output directory to: " + property);
            file = new File(property);
        }
        if (file.exists() && file.isDirectory()) {
            return file;
        }
        throw new RuntimeException("Can't find default location for HTML file: " + file);
    }

    private static boolean isJobLocal(JobConf jobConf) {
        return jobConf.get("mapred.job.tracker").equalsIgnoreCase("local");
    }
}
