package bixo.pipes;

import bixo.datum.FetchedDatum;
import bixo.datum.ParsedDatum;
import bixo.parser.BaseParser;
import bixo.parser.ParserCounters;
import bixo.parser.SimpleParser;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;
import com.bixolabs.cascading.LoggingFlowProcess;
import com.bixolabs.cascading.LoggingFlowReporter;
import com.bixolabs.cascading.NullContext;
import org.apache.log4j.Logger;

/**
 * Sub-assembly that turns each incoming {@link FetchedDatum} tuple into a
 * {@link ParsedDatum} tuple using a {@link BaseParser}.
 *
 * <p>The assembly has exactly one tail pipe, named {@link #PARSE_PIPE_NAME}.
 * Only the parse function's results ({@link ParsedDatum#FIELDS}) are emitted,
 * because the {@link Each} uses {@link Fields#RESULTS}.
 *
 * <p>Documents whose parsing throws are logged at WARN, counted under
 * {@link ParserCounters#DOCUMENTS_FAILED}, and produce no output tuple.
 */
public class ParsePipe extends SubAssembly {
    private static final Logger LOGGER = Logger.getLogger(ParsePipe.class);

    /** Name of the single tail pipe created by this assembly. */
    public static final String PARSE_PIPE_NAME = "parse_pipe";

    /**
     * Cascading function that parses one fetched document per input tuple.
     */
    private static class ParseFunction extends BaseOperation<NullContext> implements Function<NullContext> {
        // Built per task in prepare(); transient, so it is not serialized with the operation.
        private transient LoggingFlowProcess _flowProcess;
        private final BaseParser _parser;

        /**
         * @param baseParser parser applied to every incoming {@link FetchedDatum}
         */
        public ParseFunction(BaseParser baseParser) {
            super(ParsedDatum.FIELDS);
            _parser = baseParser;
        }

        /**
         * Wraps the task's flow process in a {@link LoggingFlowProcess} with a
         * {@link LoggingFlowReporter} attached, used for counter updates.
         *
         * <p>NOTE(review): the cast requires a {@link HadoopFlowProcess}; any other
         * flow process type will fail here with a ClassCastException.
         */
        @Override
        public void prepare(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
            super.prepare(flowProcess, operationCall);
            _flowProcess = new LoggingFlowProcess((HadoopFlowProcess) flowProcess);
            _flowProcess.addReporter(new LoggingFlowReporter());
        }

        /**
         * Reports this operation as not safe for repeated execution on the same input.
         * Presumably this keeps the planner from running parsing more than once per
         * record; confirm against Cascading's {@code Operation#isSafe} contract.
         */
        @Override
        public boolean isSafe() {
            return false;
        }

        /**
         * Parses the argument tuple as a {@link FetchedDatum} and emits the resulting
         * {@link ParsedDatum} tuple. Any exception is logged and counted as a failure.
         * In that case nothing is emitted for this input.
         */
        @Override
        public void operate(FlowProcess flowProcess, FunctionCall<NullContext> functionCall) {
            FetchedDatum fetchedDatum = new FetchedDatum(functionCall.getArguments());
            try {
                ParsedDatum parsedDatum = _parser.parse(fetchedDatum);
                _flowProcess.increment(ParserCounters.DOCUMENTS_PARSED, 1);
                functionCall.getOutputCollector().add(parsedDatum.getTuple());
            } catch (Exception e) {
                // Deliberately best-effort: one bad document must not fail the whole task.
                LOGGER.warn("Error processing " + fetchedDatum.getUrl(), e);
                _flowProcess.increment(ParserCounters.DOCUMENTS_FAILED, 1);
            }
        }

        /**
         * Dumps the accumulated counters, then delegates to the base cleanup.
         * The null check avoids masking an earlier prepare() failure with an NPE.
         */
        @Override
        public void cleanup(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
            if (_flowProcess != null) {
                _flowProcess.dumpCounters();
            }
            super.cleanup(flowProcess, operationCall);
        }
    }

    /**
     * Creates a parse pipe that uses a {@link SimpleParser}.
     *
     * @param pipe upstream pipe carrying {@link FetchedDatum} tuples
     */
    public ParsePipe(Pipe pipe) {
        this(pipe, new SimpleParser());
    }

    /**
     * Creates a parse pipe that uses the given parser.
     *
     * @param pipe       upstream pipe carrying {@link FetchedDatum} tuples
     * @param baseParser parser applied to each fetched document
     */
    public ParsePipe(Pipe pipe, BaseParser baseParser) {
        Pipe parsePipe = new Pipe(PARSE_PIPE_NAME, pipe);
        setTails(new Each(parsePipe, new ParseFunction(baseParser), Fields.RESULTS));
    }

    /**
     * Returns the single tail pipe of this assembly.
     *
     * @return the tail pipe named {@link #PARSE_PIPE_NAME}
     * @throws IllegalStateException if there is not exactly one tail, or if it has an
     *         unexpected name
     */
    public Pipe getTailPipe() {
        String[] tailNames = getTailNames();
        if (tailNames.length != 1) {
            throw new IllegalStateException("Unexpected number of tail pipes!");
        }
        // Constant-first comparison, so a null tail name yields the intended error rather than an NPE.
        if (!PARSE_PIPE_NAME.equals(tailNames[0])) {
            throw new IllegalStateException("Unexpected name for tail pipe");
        }
        return getTails()[0];
    }
}
