package bixo.operations;

import bixo.config.UserAgent;
import bixo.datum.GroupedUrlDatum;
import bixo.datum.ScoredUrlDatum;
import bixo.fetcher.BaseFetcher;
import bixo.hadoop.FetchCounters;
import bixo.robots.BaseRobotsParser;
import bixo.robots.RobotUtils;
import bixo.utils.DiskQueue;
import bixo.utils.GroupingKey;
import bixo.utils.ThreadedExecutor;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.operation.OperationCall;
import cascading.tuple.TupleEntry;
import com.bixolabs.cascading.LoggingFlowProcess;
import com.bixolabs.cascading.LoggingFlowReporter;
import com.bixolabs.cascading.NullContext;
import java.util.Iterator;
import java.util.concurrent.RejectedExecutionException;
import org.apache.log4j.Logger;

/* loaded from: input_file:bixo/operations/FilterAndScoreByUrlAndRobots.class */
/**
 * Cascading {@link Buffer} that collects the URLs of one group and hands them to a
 * {@link ProcessRobotsTask} running on a thread pool.
 *
 * <p>For every group, the incoming tuples are wrapped as {@link GroupedUrlDatum}s and queued in
 * a {@link DiskQueue}. A task is then submitted to the executor together with the scorer,
 * fetcher, robots parser and the call's output collector. If the task cannot be submitted, the
 * rejection is counted and the queue is passed to {@link ProcessRobotsTask#emptyQueue} with
 * {@link GroupingKey#DEFERRED_GROUPING_KEY}.
 *
 * <p>NOTE(review): the group value at position 0 is presumably the domain (the log messages and
 * the {@code DOMAINS_REJECTED} counter suggest so) - confirm against the upstream grouping.
 */
public class FilterAndScoreByUrlAndRobots extends BaseOperation<NullContext> implements Buffer<NullContext> {
    private static final Logger LOGGER = Logger.getLogger(FilterAndScoreByUrlAndRobots.class);

    /** Timeout handed to the executor; taken from {@link RobotUtils#getMaxFetchTime()}. */
    private static final long COMMAND_TIMEOUT = RobotUtils.getMaxFetchTime();

    /** How long {@link #cleanup} waits for the executor to terminate. */
    private static final long TERMINATE_TIMEOUT = COMMAND_TIMEOUT;

    /**
     * Value passed to the {@link DiskQueue} constructor for each group.
     * Presumably the number of entries kept in memory before the queue uses disk - TODO confirm.
     */
    private static final int MAX_URLS_IN_MEMORY = 100;

    private BaseScoreGenerator _scorer;
    private BaseFetcher _fetcher;
    private BaseRobotsParser _parser;

    // Created in prepare(); transient so they are not part of the serialized operation.
    private transient ThreadedExecutor _executor;
    private transient LoggingFlowProcess _flowProcess;

    /**
     * Creates the operation with a fetcher built by
     * {@link RobotUtils#createFetcher(UserAgent, int)}.
     *
     * @param userAgent user agent passed to {@code RobotUtils.createFetcher}
     * @param i second argument of {@code RobotUtils.createFetcher}
     *          (presumably the maximum number of fetch threads - TODO confirm)
     * @param baseRobotsParser parser handed to each {@link ProcessRobotsTask}
     * @param baseScoreGenerator scorer handed to each {@link ProcessRobotsTask}
     */
    public FilterAndScoreByUrlAndRobots(UserAgent userAgent, int i, BaseRobotsParser baseRobotsParser, BaseScoreGenerator baseScoreGenerator) {
        super(ScoredUrlDatum.FIELDS);
        this._scorer = baseScoreGenerator;
        this._parser = baseRobotsParser;
        this._fetcher = RobotUtils.createFetcher(userAgent, i);
    }

    /**
     * Creates the operation with a caller-supplied fetcher.
     *
     * @param baseFetcher fetcher handed to each {@link ProcessRobotsTask}; its
     *                    {@code getMaxThreads()} also sizes the executor in {@link #prepare}
     * @param baseRobotsParser parser handed to each {@link ProcessRobotsTask}
     * @param baseScoreGenerator scorer handed to each {@link ProcessRobotsTask}
     */
    public FilterAndScoreByUrlAndRobots(BaseFetcher baseFetcher, BaseRobotsParser baseRobotsParser, BaseScoreGenerator baseScoreGenerator) {
        super(ScoredUrlDatum.FIELDS);
        this._scorer = baseScoreGenerator;
        this._parser = baseRobotsParser;
        this._fetcher = baseFetcher;
    }

    /**
     * Creates the executor and the logging flow-process wrapper used by {@link #operate}.
     *
     * @param flowProcess the current flow process; cast to {@link HadoopFlowProcess}, so any
     *                    other implementation fails with a {@link ClassCastException}
     * @param operationCall unused
     */
    @Override
    public void prepare(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
        this._executor = new ThreadedExecutor(this._fetcher.getMaxThreads(), COMMAND_TIMEOUT);
        this._flowProcess = new LoggingFlowProcess((HadoopFlowProcess) flowProcess);
        this._flowProcess.addReporter(new LoggingFlowReporter());
    }

    /**
     * Terminates the executor, waiting up to {@link #TERMINATE_TIMEOUT}, and then dumps the
     * counters.
     *
     * @param flowProcess unused
     * @param operationCall unused
     */
    @Override
    public void cleanup(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
        try {
            if (!this._executor.terminate(TERMINATE_TIMEOUT)) {
                LOGGER.warn("Had to do a hard shutdown of robots fetching");
            }
        } catch (InterruptedException e) {
            LOGGER.warn("Interrupted while waiting for termination");
            // Restore the interrupt status so callers further up can react to it.
            Thread.currentThread().interrupt();
        } finally {
            // Dump the counters even if termination fails with an unexpected exception.
            this._flowProcess.dumpCounters();
        }
    }

    /**
     * Queues all URLs of the current group and submits a {@link ProcessRobotsTask} for them.
     *
     * <p>If the task cannot be created or submitted, the group is counted as rejected and its
     * queue is handed to {@link ProcessRobotsTask#emptyQueue} with the deferred grouping key.
     * No exception leaves this method from the submission step.
     *
     * @param flowProcess unused; counters go through the wrapper created in {@link #prepare}
     * @param bufferCall supplies the group value, the argument tuples and the output collector
     */
    @Override
    public void operate(FlowProcess flowProcess, BufferCall<NullContext> bufferCall) {
        String groupKey = bufferCall.getGroup().getString(0);

        DiskQueue urls = new DiskQueue(MAX_URLS_IN_MEMORY);
        Iterator<TupleEntry> values = bufferCall.getArgumentsIterator();
        while (values.hasNext()) {
            // Each entry is copied before wrapping (presumably because the iterator may reuse
            // the same TupleEntry instance - TODO confirm).
            urls.add(new GroupedUrlDatum(new TupleEntry(values.next())));
        }

        try {
            ProcessRobotsTask task = new ProcessRobotsTask(groupKey, this._scorer, urls, this._fetcher, this._parser,
                    bufferCall.getOutputCollector(), this._flowProcess);
            this._executor.execute(task);
        } catch (RejectedExecutionException e) {
            LOGGER.error("Robots handling pool rejected our request for " + groupKey);
            rejectGroup(urls, bufferCall);
        } catch (Throwable th) {
            // Deliberately broad: any failure to submit is treated like a rejection.
            LOGGER.error("Caught an unexpected throwable - robots handling rejected our request for " + groupKey, th);
            rejectGroup(urls, bufferCall);
        }
    }

    /**
     * Records a rejected group in the counters and passes its queue to
     * {@link ProcessRobotsTask#emptyQueue} with {@link GroupingKey#DEFERRED_GROUPING_KEY}.
     *
     * @param urls queued URLs of the rejected group
     * @param bufferCall call whose output collector receives the queue contents
     */
    private void rejectGroup(DiskQueue urls, BufferCall<NullContext> bufferCall) {
        this._flowProcess.increment(FetchCounters.DOMAINS_REJECTED, 1);
        // Read the size before the queue is emptied below.
        this._flowProcess.increment(FetchCounters.URLS_REJECTED, urls.size());
        ProcessRobotsTask.emptyQueue(urls, GroupingKey.DEFERRED_GROUPING_KEY, bufferCall.getOutputCollector());
    }
}
