package gr.ilsp.fmc.pipes;

import bixo.datum.FetchedDatum;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;
import com.bixolabs.cascading.LoggingFlowProcess;
import com.bixolabs.cascading.LoggingFlowReporter;
import com.bixolabs.cascading.NullContext;
import gr.ilsp.fmc.datums.ExtendedParsedDatum;
import gr.ilsp.fmc.main.SimpleCrawlHFS;
import gr.ilsp.fmc.parser.SimpleNoLinksParser;
import org.apache.log4j.Logger;

/* loaded from: input_file:gr/ilsp/fmc/pipes/ExtendedParsePipe.class */
/**
 * Sub-assembly that runs a {@link SimpleNoLinksParser} over fetched documents.
 *
 * <p>The assembly wraps the incoming pipe in a pipe named {@link #PARSE_PIPE_NAME} and applies a
 * parse function to each tuple. Every input tuple is read as a {@link FetchedDatum}; when parsing
 * succeeds, the tuple of the resulting {@link ExtendedParsedDatum} is emitted. Documents that fail
 * to parse are logged and counted, and produce no output tuple.
 */
public class ExtendedParsePipe extends SubAssembly {
    private static final long serialVersionUID = -5601927304688260557L;
    private static final Logger LOGGER = Logger.getLogger(ExtendedParsePipe.class);

    /** Name given to the pipe created by this assembly; {@link #getTailPipe()} checks for it. */
    public static final String PARSE_PIPE_NAME = "parse_pipe";

    /** Counters incremented on the logging flow process while documents are parsed. */
    private enum ExtendedParserCounters {
        /** Incremented by one for each document the parser returned a result for. */
        PARSER_DOCUMENTS_PARSED,
        /** Incremented by one for each document whose processing threw an exception. */
        PARSER_DOCUMENTS_FAILED,
        /** Incremented by the elapsed wall-clock milliseconds of each {@code operate} call. */
        PARSER_TIME
    }

    /**
     * Function that parses one {@link FetchedDatum} per call and emits the parsed tuple.
     *
     * <p>The flow-process wrapper is transient and is created in
     * {@link #prepare(FlowProcess, OperationCall)} rather than in the constructor.
     */
    private static class ParseFunction extends BaseOperation<NullContext> implements Function<NullContext> {
        private static final long serialVersionUID = -8494284260038088034L;

        /** Counter/logging wrapper around the flow process; set in {@code prepare}. */
        private transient LoggingFlowProcess _flowProcess;

        /** Parser applied to every fetched document. */
        private final SimpleNoLinksParser _parser;

        /**
         * Creates the function, declaring {@link ExtendedParsedDatum#FIELDS} as its output fields.
         *
         * @param simpleNoLinksParser parser used for every document
         */
        public ParseFunction(SimpleNoLinksParser simpleNoLinksParser) {
            super(ExtendedParsedDatum.FIELDS);
            this._parser = simpleNoLinksParser;
        }

        /**
         * Wraps the supplied flow process in a {@link LoggingFlowProcess} and attaches a
         * {@link LoggingFlowReporter} to it.
         *
         * @throws ClassCastException if {@code flowProcess} is not a {@link HadoopFlowProcess}
         */
        @Override // cascading.operation.BaseOperation, cascading.operation.Operation
        public void prepare(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
            super.prepare(flowProcess, operationCall);
            this._flowProcess = new LoggingFlowProcess((HadoopFlowProcess) flowProcess);
            this._flowProcess.addReporter(new LoggingFlowReporter());
        }

        /**
         * Always reports this operation as not safe.
         * NOTE(review): presumably because each call updates counters — confirm against the
         * Cascading {@code Operation.isSafe()} contract.
         */
        @Override // cascading.operation.BaseOperation, cascading.operation.Operation
        public boolean isSafe() {
            return false;
        }

        /**
         * Parses the fetched document carried by the call's arguments and emits the parsed tuple.
         *
         * <p>Any exception raised while parsing or emitting is caught: it is logged at WARN level
         * together with the document URL, the failure counter is incremented and nothing more is
         * done for that document. The elapsed time is recorded whether or not parsing succeeded.
         */
        @Override // cascading.operation.Function
        public void operate(FlowProcess flowProcess, FunctionCall<NullContext> functionCall) {
            long startTime = System.currentTimeMillis();
            FetchedDatum fetchedDatum = new FetchedDatum(functionCall.getArguments());
            try {
                ExtendedParsedDatum parsed = this._parser.parse(fetchedDatum);
                this._flowProcess.increment(ExtendedParserCounters.PARSER_DOCUMENTS_PARSED, 1);
                functionCall.getOutputCollector().add(parsed.getTuple());
                SimpleCrawlHFS.incrementPagesVisited();
            } catch (Exception e) {
                // Pass the exception to the logger so the cause of the failure is not lost.
                LOGGER.warn("PARSER_Error processing " + fetchedDatum.getUrl(), e);
                this._flowProcess.increment(ExtendedParserCounters.PARSER_DOCUMENTS_FAILED, 1);
            } finally {
                long elapsedMillis = System.currentTimeMillis() - startTime;
                this._flowProcess.increment(ExtendedParserCounters.PARSER_TIME, (int) elapsedMillis);
            }
        }

        /** Dumps the accumulated counters before delegating to the superclass cleanup. */
        @Override // cascading.operation.BaseOperation, cascading.operation.Operation
        public void cleanup(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
            this._flowProcess.dumpCounters();
            super.cleanup(flowProcess, operationCall);
        }
    }

    /**
     * Creates the assembly with a default-constructed {@link SimpleNoLinksParser}.
     *
     * @param pipe upstream pipe supplying the fetched documents
     */
    public ExtendedParsePipe(Pipe pipe) {
        this(pipe, new SimpleNoLinksParser());
    }

    /**
     * Creates the assembly with the given parser.
     *
     * @param pipe upstream pipe supplying the fetched documents
     * @param simpleNoLinksParser parser applied to every fetched document
     */
    public ExtendedParsePipe(Pipe pipe, SimpleNoLinksParser simpleNoLinksParser) {
        Pipe parsePipe = new Pipe(PARSE_PIPE_NAME, pipe);
        setTails(new Each(parsePipe, new ParseFunction(simpleNoLinksParser), Fields.RESULTS));
    }

    /**
     * Returns the single tail pipe of this assembly.
     *
     * @return the tail pipe, whose name is {@link #PARSE_PIPE_NAME}
     * @throws RuntimeException if the assembly does not have exactly one tail, or if that tail is
     *     not named {@link #PARSE_PIPE_NAME}
     */
    public Pipe getTailPipe() {
        String[] tailNames = getTailNames();
        if (tailNames.length != 1) {
            throw new RuntimeException("Unexpected number of tail pipes!");
        }
        // Constant-first comparison so a null tail name is reported as an unexpected name.
        if (!PARSE_PIPE_NAME.equals(tailNames[0])) {
            throw new RuntimeException("Unexpected name for tail pipe");
        }
        return getTails()[0];
    }
}
