package gr.ilsp.fmc.pipes;

import bixo.datum.UrlDatum;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.pipe.cogroup.RightJoin;
import cascading.tuple.Fields;
import com.bixolabs.cascading.LoggingFlowProcess;
import com.bixolabs.cascading.LoggingFlowReporter;
import com.bixolabs.cascading.NullContext;
import gr.ilsp.fmc.classifier.Classifier;
import gr.ilsp.fmc.classifier.ClassifierCounters;
import gr.ilsp.fmc.datums.ClassifierDatum;
import gr.ilsp.fmc.datums.ExtendedParsedDatum;
import gr.ilsp.fmc.main.SimpleCrawlHFS;
import gr.ilsp.fmc.parser.ScoreLinks;
import org.apache.log4j.Logger;

/* loaded from: input_file:gr/ilsp/fmc/pipes/ClassifierPipe.class */
/**
 * Cascading sub-assembly that classifies parsed documents and scores their outlinks.
 *
 * <p>The assembly exposes two tails:
 * <ul>
 *   <li>{@link #CLASSIFIER_PIPE_NAME}: the tuples emitted by the {@link Classifier} for the
 *       documents it accepted, grouped by URL;</li>
 *   <li>{@link #SCORED_LINKS_PIPE_NAME}: the classifier output right-joined (on the URL field)
 *       onto the original input stream and then passed through {@link ScoreLinks}.</li>
 * </ul>
 */
public class ClassifierPipe extends SubAssembly {
    private static final long serialVersionUID = 916057049645756562L;
    private static final Logger LOGGER = Logger.getLogger(ClassifierPipe.class);
    public static final String CLASSIFIER_PIPE_NAME = "classifier_pipe";
    public static final String SCORED_LINKS_PIPE_NAME = "scored_links_pipe";

    // Positions of the two tails, in the order they are registered by setTails(...) below.
    private static final int CLASSIFIER_TAIL_INDEX = 0;
    private static final int SCORED_LINKS_TAIL_INDEX = 1;

    /**
     * Runs the {@link Classifier} on each incoming {@link ExtendedParsedDatum}.
     *
     * <p>Accepted documents are emitted as the classifier's {@link ClassifierDatum} tuple.
     * Rejected documents ({@code classify} returned {@code null}) and documents whose
     * classification threw are dropped and only counted. The time spent is always added
     * to {@link ClassifierCounters#CLASSIFIER_TIME}.
     */
    private static class ClassifyFunction extends BaseOperation<NullContext> implements Function<NullContext> {
        private static final long serialVersionUID = -6664107549904861854L;
        // Rebuilt in prepare() on each task; not serialized with the operation.
        private transient LoggingFlowProcess _flowProcess;
        private final Classifier _classifier;

        /**
         * @param classifier the classifier applied to every document; declares
         *                   {@link ClassifierDatum#FIELDS} as this function's output fields
         */
        public ClassifyFunction(Classifier classifier) {
            super(ClassifierDatum.FIELDS);
            this._classifier = classifier;
        }

        /**
         * Wraps the task's flow process so counters are also reported through
         * {@link LoggingFlowReporter}. The cast means a {@link HadoopFlowProcess} is required.
         */
        @Override
        public void prepare(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
            super.prepare(flowProcess, operationCall);
            this._flowProcess = new LoggingFlowProcess((HadoopFlowProcess) flowProcess);
            this._flowProcess.addReporter(new LoggingFlowReporter());
        }

        /**
         * Not safe to invoke more than once per input: {@link #operate} bumps counters,
         * including static ones on {@link SimpleCrawlHFS}.
         */
        @Override
        public boolean isSafe() {
            return false;
        }

        /**
         * Classifies one tuple and emits the result if the classifier accepted it.
         */
        @Override
        public void operate(FlowProcess flowProcess, FunctionCall<NullContext> functionCall) {
            long startMillis = System.currentTimeMillis();
            ExtendedParsedDatum parsedDatum = new ExtendedParsedDatum(functionCall.getArguments());
            try {
                ClassifierDatum classified = this._classifier.classify(parsedDatum);
                if (classified == null) {
                    // Rejected by the classifier: nothing is emitted for this document.
                    this._flowProcess.increment(ClassifierCounters.CLASSIFIER_DOCUMENTS_FAILED, 1);
                    // NOTE(review): SimpleCrawlHFS counters are static, i.e. per JVM; confirm they
                    // are only relied on when the flow runs in a single process.
                    SimpleCrawlHFS.incrementPagesCutByClassifier();
                } else {
                    this._flowProcess.increment(ClassifierCounters.CLASSIFIER_DOCUMENTS_PASSED, 1);
                    functionCall.getOutputCollector().add(classified.getTuple());
                    SimpleCrawlHFS.incrementPagesStored();
                    SimpleCrawlHFS.incrementTokensStored(classified.getLengthInTok());
                }
            } catch (Exception e) {
                // Deliberate best effort: one failing document is logged, counted and dropped
                // instead of failing the whole flow.
                LOGGER.info("CLASSIFIER_Error processing " + parsedDatum.getUrl(), e);
                this._flowProcess.increment(ClassifierCounters.CLASSIFIER_DOCUMENTS_ABORTED, 1);
            } finally {
                this._flowProcess.increment(ClassifierCounters.CLASSIFIER_TIME, (int) (System.currentTimeMillis() - startMillis));
            }
        }

        /**
         * Dumps the accumulated counters. Guarded so that a task whose {@code prepare} never
         * completed does not fail here with a NullPointerException.
         */
        @Override
        public void cleanup(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
            if (this._flowProcess != null) {
                this._flowProcess.dumpCounters();
            }
            super.cleanup(flowProcess, operationCall);
        }
    }

    /**
     * Builds the two-tailed classification assembly.
     *
     * @param pipe       upstream pipe carrying parsed-document tuples; used both as the
     *                   classifier input and as the right-hand side of the URL join
     * @param classifier classifier applied to each document and handed to {@link ScoreLinks}
     */
    public ClassifierPipe(Pipe pipe, Classifier classifier) {
        // Tail 1: classify every document and group the accepted ones by URL.
        Pipe classifierBranch = new Pipe(CLASSIFIER_PIPE_NAME, pipe);
        classifierBranch = new Each(classifierBranch, new ClassifyFunction(classifier), Fields.RESULTS);
        GroupBy classifiedByUrl = new GroupBy(classifierBranch, new Fields(UrlDatum.URL_FN));

        // Tail 2: right-join the classifier output back onto the original stream by URL, so
        // (per Cascading's RightJoin semantics) every right-hand tuple reaches ScoreLinks even
        // when the classifier emitted nothing for it.
        Pipe joined = new CoGroup(SCORED_LINKS_PIPE_NAME,
                classifiedByUrl, new Fields(UrlDatum.URL_FN),
                pipe, new Fields(UrlDatum.URL_FN),
                scoredLinksJoinFields(),
                new RightJoin());
        Pipe scoredLinks = new Each(joined, new ScoreLinks(classifier));

        // Order matters: it defines CLASSIFIER_TAIL_INDEX and SCORED_LINKS_TAIL_INDEX.
        setTails(classifiedByUrl, scoredLinks);
    }

    /**
     * Declared output fields of the URL CoGroup: the left (classifier) side first, then the
     * right (original input) side. The left side's url/payload are declared as
     * {@code url1}/{@code payload1} so they do not clash with the right side's names.
     * NOTE(review): presumably the first seven names mirror {@link ClassifierDatum#FIELDS} and
     * the rest the upstream ExtendedParsedDatum layout; confirm if either datum changes.
     * A fresh instance is returned on every call, as the original inline construction did.
     */
    private static Fields scoredLinksJoinFields() {
        return new Fields(
                "ClassifierDatum-subclasses",
                "ClassifierDatum-subscores",
                "ClassifierDatum-totabscore",
                "ClassifierDatum-totrelscore",
                "ClassifierDatum-lengthintok",
                "url1",
                "payload1",
                "ExtendedParsedDatum-hostAddress",
                "ExtendedParsedDatum-parsedText",
                "ExtendedParsedDatum-language",
                "ExtendedParsedDatum-title",
                "ExtendedParsedDatum-outLinks",
                "ExtendedParsedDatum-parsedMeta",
                "UrlDatum-url",
                "PayloadDatum-payload");
    }

    /**
     * Legacy single-tail accessor.
     *
     * <p>This assembly always registers two tails, so the tail-count check fails and this
     * method throws.
     *
     * @return never returns normally for an assembly built by this class's constructor
     * @throws RuntimeException if the assembly does not have exactly one tail, or if that
     *                          tail is not named {@link #CLASSIFIER_PIPE_NAME}
     * @deprecated use {@link #getClassifierTailPipe()} or {@link #getScoredLinksTailPipe()}
     */
    @Deprecated
    public Pipe getTailPipe() {
        String[] tailNames = getTailNames();
        if (tailNames.length != 1) {
            throw new RuntimeException("Unexpected number of tail pipes!");
        }
        if (tailNames[0].equals(CLASSIFIER_PIPE_NAME)) {
            return getTails()[0];
        }
        throw new RuntimeException("Unexpected name for tail pipe");
    }

    /** @return the tail carrying the accepted classifier output, grouped by URL */
    public Pipe getClassifierTailPipe() {
        return getTailPipe(CLASSIFIER_PIPE_NAME);
    }

    /** @return the tail carrying the {@link ScoreLinks} output */
    public Pipe getScoredLinksTailPipe() {
        return getTailPipe(SCORED_LINKS_PIPE_NAME);
    }

    /**
     * Returns the tail registered under the given name.
     *
     * @param tailName {@link #CLASSIFIER_PIPE_NAME} or {@link #SCORED_LINKS_PIPE_NAME}
     * @return the matching tail pipe
     * @throws IllegalArgumentException if {@code tailName} is neither known name (previously
     *                                  any unknown name silently returned the scored-links tail)
     */
    public Pipe getTailPipe(String tailName) {
        if (CLASSIFIER_PIPE_NAME.equals(tailName)) {
            return getTails()[CLASSIFIER_TAIL_INDEX];
        }
        if (SCORED_LINKS_PIPE_NAME.equals(tailName)) {
            return getTails()[SCORED_LINKS_TAIL_INDEX];
        }
        throw new IllegalArgumentException("Unknown tail pipe name: " + tailName);
    }
}
