package gr.ilsp.fmc.pipes;

import bixo.datum.StatusDatum;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;
import com.bixolabs.cascading.LoggingFlowProcess;
import com.bixolabs.cascading.LoggingFlowReporter;
import com.bixolabs.cascading.NullContext;
import gr.ilsp.fmc.datums.StatusOutputDatum;
import gr.ilsp.fmc.operations.StatusFilter;
import gr.ilsp.fmc.operations.StatusOutputCounters;
import org.apache.log4j.Logger;

/* loaded from: input_file:gr/ilsp/fmc/pipes/StatusOutputPipe.class */
public class StatusOutputPipe extends SubAssembly {
    private static final long serialVersionUID = -8701420333680102479L;
    private static final Logger LOGGER = Logger.getLogger(StatusOutputPipe.class);
    public static final String STATUSOUTPUT_PIPE_NAME = "status_pipe";

    /* loaded from: input_file:gr/ilsp/fmc/pipes/StatusOutputPipe$StatusFilterFunction.class */
    private static class StatusFilterFunction extends BaseOperation<NullContext> implements Function<NullContext> {
        private static final long serialVersionUID = -3868403647589066686L;
        private transient LoggingFlowProcess _flowProcess;
        private StatusFilter _statusFilter;

        public StatusFilterFunction(StatusFilter statusFilter) {
            super(StatusOutputDatum.FIELDS);
            this._statusFilter = statusFilter;
        }

        @Override // cascading.operation.BaseOperation, cascading.operation.Operation
        public void prepare(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
            super.prepare(flowProcess, operationCall);
            this._flowProcess = new LoggingFlowProcess((HadoopFlowProcess) flowProcess);
            this._flowProcess.addReporter(new LoggingFlowReporter());
        }

        @Override // cascading.operation.BaseOperation, cascading.operation.Operation
        public boolean isSafe() {
            return false;
        }

        @Override // cascading.operation.Function
        public void operate(FlowProcess flowProcess, FunctionCall<NullContext> functionCall) {
            StatusDatum statusDatum = new StatusDatum(functionCall.getArguments());
            try {
                StatusOutputDatum filter = this._statusFilter.filter(statusDatum);
                this._flowProcess.increment(StatusOutputCounters.STATUS_FILTERED, 1);
                functionCall.getOutputCollector().add(filter.getTuple());
            } catch (Exception e) {
                StatusOutputPipe.LOGGER.warn("Error processing " + statusDatum.getUrl(), e);
                this._flowProcess.increment(StatusOutputCounters.STATUS_FAILED, 1);
            }
        }

        @Override // cascading.operation.BaseOperation, cascading.operation.Operation
        public void cleanup(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
            this._flowProcess.dumpCounters();
            super.cleanup(flowProcess, operationCall);
        }
    }

    public StatusOutputPipe(Pipe pipe) {
        this(pipe, new StatusFilter());
    }

    public StatusOutputPipe(Pipe pipe, StatusFilter statusFilter) {
        setTails(new Each(new Pipe(STATUSOUTPUT_PIPE_NAME, pipe), new StatusFilterFunction(statusFilter), Fields.RESULTS));
    }

    public Pipe getTailPipe() {
        String[] tailNames = getTailNames();
        if (tailNames.length != 1) {
            throw new RuntimeException("Unexpected number of tail pipes!");
        }
        if (tailNames[0].equals(STATUSOUTPUT_PIPE_NAME)) {
            return getTails()[0];
        }
        throw new RuntimeException("Unexpected name for tail pipe");
    }
}
