package bixo.operations;

import bixo.config.FetcherPolicy;
import bixo.config.UserAgent;
import bixo.fetcher.BaseFetcher;
import bixo.fetcher.SimpleHttpFetcher;
import bixo.hadoop.FetchCounters;
import bixo.utils.ThreadedExecutor;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryCollector;
import com.bixolabs.cascading.LoggingFlowProcess;
import com.bixolabs.cascading.LoggingFlowReporter;
import com.bixolabs.cascading.NullContext;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.fa.PersianAnalyzer;

/* loaded from: input_file:bixo/operations/UrlLengthener.class */
/**
 * Cascading function that expands ("lengthens") URLs produced by known URL-shortening services.
 *
 * <p>Each incoming tuple's first argument is treated as a URL. If it matches
 * {@code ^http://host...} and {@code host} is in the shortener list loaded from the
 * {@code /url-shorteners.txt} classpath resource, a {@code ResolveRedirectsTask} is submitted
 * to a thread pool to resolve it. Otherwise the URL is emitted unchanged. If the pool rejects
 * the task, the URL is also emitted unchanged and {@link FetchCounters#URLS_REJECTED} is
 * incremented.
 *
 * <p>NOTE(review): what {@code ResolveRedirectsTask} emits (and whether it synchronizes on the
 * collector the same way {@link #emitTuple(String)} does) is defined elsewhere — confirm there.
 */
public class UrlLengthener extends BaseOperation<NullContext> implements Function<NullContext> {
    private static final int MAX_CONTENT_SIZE = 2048;
    private static final int MAX_REDIRECTS = 1;
    // NOTE(review): the three REDIRECT_* constants below are not referenced in this class.
    private static final int REDIRECT_CONNECTION_TIMEOUT = 20000;
    private static final int REDIRECT_SOCKET_TIMEOUT = 10000;
    private static final int REDIRECT_RETRY_COUNT = 1;
    /** Per-command timeout handed to the {@link ThreadedExecutor}, in milliseconds. */
    private static final long COMMAND_TIMEOUT = 30000;
    /** How long {@link #cleanup} waits for outstanding tasks before a hard shutdown, in ms. */
    private static final long TERMINATE_TIMEOUT = 60000;

    /** Classpath resource listing shortener hostnames, one per line. */
    private static final String URL_SHORTENERS_RESOURCE = "/url-shorteners.txt";
    /** Marks a full-line or trailing comment in the shortener list. */
    private static final char COMMENT_CHAR = '#';

    private final BaseFetcher _fetcher;
    private final int _maxThreads;
    private final Set<String> _urlShorteners;

    private transient LoggingFlowProcess _flowProcess;
    private transient TupleEntryCollector _collector;
    private transient ThreadedExecutor _executor;

    private static final Logger LOGGER = Logger.getLogger(UrlLengthener.class);
    /** Captures the hostname (3+ chars, up to the first '/', ':' or '?') of an http URL. */
    private static final Pattern HOSTNAME_PATTERN = Pattern.compile("^http://([^/:?]{3,})");

    public static final String URL_FN = "url";
    private static final Fields DEFAULT_FIELD = new Fields(URL_FN);

    /**
     * Builds a fetcher configured for resolving shortened URLs.
     *
     * <p>Redirects are not followed automatically ({@code FOLLOW_NONE}), content is capped at
     * {@value #MAX_CONTENT_SIZE} bytes, and no compressed encoding is requested.
     *
     * @param maxThreads number of fetcher threads, also used as the per-host connection limit
     * @param userAgent user agent to send with requests
     * @return the configured fetcher
     */
    public static BaseFetcher makeFetcher(int maxThreads, UserAgent userAgent) {
        FetcherPolicy fetcherPolicy = new FetcherPolicy();
        fetcherPolicy.setRedirectMode(FetcherPolicy.RedirectMode.FOLLOW_NONE);
        fetcherPolicy.setMaxRedirects(MAX_REDIRECTS);
        fetcherPolicy.setMaxConnectionsPerHost(maxThreads);

        SimpleHttpFetcher simpleHttpFetcher = new SimpleHttpFetcher(maxThreads, fetcherPolicy, userAgent);
        simpleHttpFetcher.setDefaultMaxContentSize(MAX_CONTENT_SIZE);
        // Empty Accept-Encoding: don't ask the server for compressed content.
        simpleHttpFetcher.setAcceptEncoding("");
        return simpleHttpFetcher;
    }

    /**
     * Creates a lengthener that writes its result to the default {@value #URL_FN} field.
     *
     * @param baseFetcher fetcher used to resolve redirects
     * @throws IOException if the shortener list cannot be read
     */
    public UrlLengthener(BaseFetcher baseFetcher) throws IOException {
        this(baseFetcher, DEFAULT_FIELD);
    }

    /**
     * Creates a lengthener that writes its result to the given field.
     *
     * @param baseFetcher fetcher used to resolve redirects; its thread count sizes the pool
     * @param fields output declaration; must contain exactly one field
     * @throws IllegalArgumentException if {@code fields} does not contain exactly one field
     * @throws IOException if the shortener list cannot be read
     */
    public UrlLengthener(BaseFetcher baseFetcher, Fields fields) throws IOException {
        super(fields);
        if (fields.size() != 1) {
            throw new IllegalArgumentException("resultField must contain a single field");
        }
        this._fetcher = baseFetcher;
        this._maxThreads = baseFetcher.getMaxThreads();
        this._urlShorteners = loadUrlShorteners();
    }

    /** Sets up counter logging and the thread pool used for redirect resolution. */
    @Override // cascading.operation.BaseOperation, cascading.operation.Operation
    public void prepare(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
        super.prepare(flowProcess, operationCall);
        this._flowProcess = new LoggingFlowProcess((HadoopFlowProcess) flowProcess);
        this._flowProcess.addReporter(new LoggingFlowReporter());
        this._executor = new ThreadedExecutor(this._maxThreads, COMMAND_TIMEOUT);
    }

    /**
     * Waits up to {@value #TERMINATE_TIMEOUT} ms for outstanding tasks and then dumps counters.
     *
     * <p>Both steps are skipped if {@link #prepare} did not get far enough to create them.
     */
    @Override // cascading.operation.BaseOperation, cascading.operation.Operation
    public void cleanup(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
        if (this._executor != null) {
            try {
                if (!this._executor.terminate(TERMINATE_TIMEOUT)) {
                    LOGGER.warn("Had to do a hard shutdown of redirect resolution");
                }
            } catch (InterruptedException e) {
                LOGGER.warn("Interrupted while waiting for termination");
                Thread.currentThread().interrupt();
            }
        }
        if (this._flowProcess != null) {
            this._flowProcess.dumpCounters();
        }
        super.cleanup(flowProcess, operationCall);
    }

    /**
     * Emits non-shortened URLs immediately and queues shortened ones for redirect resolution.
     *
     * <p>If the pool rejects the task (or submission throws anything), the original URL is
     * emitted unchanged so no input is lost.
     */
    @Override // cascading.operation.Function
    public void operate(FlowProcess flowProcess, FunctionCall<NullContext> functionCall) {
        this._collector = functionCall.getOutputCollector();
        String url = functionCall.getArguments().getTuple().getString(0);

        Matcher matcher = HOSTNAME_PATTERN.matcher(url);
        if (!matcher.find() || !this._urlShorteners.contains(matcher.group(1))) {
            emitTuple(url);
            return;
        }

        try {
            this._executor.execute(new ResolveRedirectsTask(url, this._fetcher, this._collector));
        } catch (RejectedExecutionException e) {
            LOGGER.error("Redirection handling pool rejected our request for " + url);
            this._flowProcess.increment(FetchCounters.URLS_REJECTED, 1);
            emitTuple(url);
        } catch (Throwable th) {
            // Boundary catch: never drop an input URL because submission failed.
            LOGGER.error("Caught an unexpected throwable - redirection code rejected our request for " + url, th);
            this._flowProcess.increment(FetchCounters.URLS_REJECTED, 1);
            emitTuple(url);
        }
    }

    /**
     * Loads the set of shortener hostnames from the {@value #URL_SHORTENERS_RESOURCE} resource.
     *
     * <p>The file is read as UTF-8. Blank lines and lines starting with {@code #} are ignored,
     * and anything after a {@code #} on a line is treated as a trailing comment.
     *
     * @return a mutable set of hostnames
     * @throws IOException if the resource is missing or cannot be read
     */
    public static Set<String> loadUrlShorteners() throws IOException {
        InputStream in = UrlLengthener.class.getResourceAsStream(URL_SHORTENERS_RESOURCE);
        if (in == null) {
            throw new IOException("Can't find resource " + URL_SHORTENERS_RESOURCE);
        }

        List<String> lines;
        try {
            lines = IOUtils.readLines(in, "UTF-8");
        } finally {
            IOUtils.closeQuietly(in);
        }

        Set<String> result = new HashSet<String>();
        for (String line : lines) {
            String entry = line.trim();
            if (entry.length() == 0 || entry.charAt(0) == COMMENT_CHAR) {
                continue;
            }
            int commentStart = entry.indexOf(COMMENT_CHAR);
            if (commentStart != -1) {
                entry = entry.substring(0, commentStart).trim();
            }
            result.add(entry);
        }
        return result;
    }

    /**
     * Adds a single-value tuple to the current collector.
     *
     * <p>Synchronizes on the collector, presumably because the redirect tasks (which receive
     * the same collector) write to it from pool threads — confirm in ResolveRedirectsTask.
     */
    private void emitTuple(String str) {
        synchronized (this._collector) {
            this._collector.add(new Tuple(str));
        }
    }
}
