package cascading.pipe.cogroup;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.tuple.Fields;
import cascading.tuple.IndexTuple;
import cascading.tuple.SpillableTupleList;
import cascading.tuple.Tuple;
import java.util.Iterator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;

/* loaded from: input_file:cascading/pipe/cogroup/CoGroupClosure.class */
/**
 * A {@link GroupClosure} that keeps one {@link SpillableTupleList} per co-grouped
 * position and fills those lists from the incoming {@link IndexTuple} values each
 * time {@link #reset(Joiner, Tuple, Iterator)} is called.
 *
 * <p>The spill threshold and the optional compression codec handed to every list
 * are read from the flow process properties {@link #SPILL_THRESHOLD},
 * {@link #SPILL_COMPRESS} and {@link #SPILL_CODECS}.
 *
 * <p>NOTE(review): the constructor and the codec lookup cast the given
 * {@code FlowProcess} to {@code HadoopFlowProcess}; any other implementation
 * will fail with a {@code ClassCastException}.
 */
public class CoGroupClosure extends GroupClosure {
    /** Property naming the threshold passed to each {@link SpillableTupleList}. */
    public static final String SPILL_THRESHOLD = "cascading.cogroup.spill.threshold";
    /** Threshold used when {@link #SPILL_THRESHOLD} is unset or blank. */
    private static final int defaultThreshold = 10000;
    /** Property that disables codec lookup when set to anything {@code Boolean.parseBoolean} reads as false. */
    public static final String SPILL_COMPRESS = "cascading.cogroup.spill.compress";
    /** Property holding a comma and/or whitespace separated list of codec class names. */
    public static final String SPILL_CODECS = "cascading.cogroup.spill.codecs";
    /** Codec class names tried, in order, when {@link #SPILL_CODECS} is unset or empty. */
    private static final String defaultCodecs = "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec";
    private static final Logger LOG = Logger.getLogger(CoGroupClosure.class);

    /** One list per position; with self joins, positions 1..numSelfJoins alias position 0. */
    SpillableTupleList[] groups;
    private final int numSelfJoins;
    private final CompressionCodec codec;
    private final long threshold;
    private final JobConf conf;

    /**
     * Creates the closure and allocates its tuple lists.
     *
     * @param flowProcess current flow process; must be a {@code HadoopFlowProcess}
     * @param i           number of self joins
     * @param fieldsArr   forwarded to the {@link GroupClosure} constructor as its first fields array
     * @param fieldsArr2  forwarded to the {@link GroupClosure} constructor as its second fields array
     * @throws NumberFormatException if {@link #SPILL_THRESHOLD} is set but is not a valid long
     */
    public CoGroupClosure(FlowProcess flowProcess, int i, Fields[] fieldsArr, Fields[] fieldsArr2) {
        super(flowProcess, fieldsArr, fieldsArr2);
        this.numSelfJoins = i;
        // NOTE(review): getCompressionCodec is public and overridable yet is invoked
        // during construction; kept as-is so existing subclasses behave the same.
        this.codec = getCompressionCodec(flowProcess);
        this.threshold = getLong(flowProcess, SPILL_THRESHOLD, defaultThreshold);
        this.conf = ((HadoopFlowProcess) flowProcess).getJobConf();
        initLists(flowProcess);
    }

    /**
     * @return the number of group positions held by this closure
     */
    @Override // cascading.pipe.cogroup.GroupClosure
    public int size() {
        return this.groups.length;
    }

    /**
     * Returns an iterator over the tuples collected for the given position.
     *
     * @param i zero-based group position
     * @return the list's iterator wrapped by {@code makeIterator}
     * @throws IllegalArgumentException if {@code i} is outside {@code [0, size())}
     */
    @Override // cascading.pipe.cogroup.GroupClosure
    public Iterator<Tuple> getIterator(int i) {
        if (i < 0 || i >= this.groups.length) {
            throw new IllegalArgumentException("invalid group position: " + i);
        }
        return makeIterator(i, this.groups[i].iterator());
    }

    /**
     * Reports whether the list at the given position is empty.
     *
     * @param i zero-based group position (not range checked here)
     * @return {@code true} if the list at {@code i} reports itself empty
     */
    public boolean isEmpty(int i) {
        return this.groups[i].isEmpty();
    }

    /**
     * Resets the superclass state and then rebuilds every group list from the
     * newly supplied values.
     */
    @Override // cascading.pipe.cogroup.GroupClosure
    public void reset(Joiner joiner, Tuple tuple, Iterator it) {
        super.reset(joiner, tuple, it);
        build();
    }

    /**
     * Clears all lists and redistributes the current values by their index.
     *
     * <p>When there are no self joins and a tuple for position 0 is seen, that
     * tuple and the remaining values iterator are handed to list 0 via
     * {@code setIterator} and building stops. Otherwise each tuple is added to
     * the list for its index.
     */
    private void build() {
        clearGroups();
        while (this.values.hasNext()) {
            IndexTuple indexTuple = (IndexTuple) this.values.next();
            int index = indexTuple.getIndex();
            if (this.numSelfJoins == 0 && index == 0) {
                this.groups[index].setIterator(indexTuple, this.values);
                return;
            }
            // A true result from add() is treated as "this add caused a spill"; the
            // modulo keeps the log to the 1st, 11th, 21st, ... spill file per list.
            if (this.groups[index].add(indexTuple.getTuple()) && (this.groups[index].getNumFiles() - 1) % 10 == 0) {
                LOG.info("spilled group: " + this.groupingFields[index].printVerbose() + ", on grouping: " + getGrouping().print());
                Runtime runtime = Runtime.getRuntime();
                LOG.info("mem on spill (mb), free: " + ((runtime.freeMemory() / 1024) / 1024) + ", total: " + ((runtime.totalMemory() / 1024) / 1024) + ", max: " + ((runtime.maxMemory() / 1024) / 1024));
            }
        }
    }

    /** Clears every list in {@link #groups}. */
    private void clearGroups() {
        for (SpillableTupleList spillableTupleList : this.groups) {
            spillableTupleList.clear();
        }
    }

    /**
     * Allocates {@link #groups}: one new list per grouping-fields entry, then
     * points positions {@code 1..numSelfJoins} at the list in position 0.
     *
     * @param flowProcess passed through to each {@link SpillableTupleList}
     */
    private void initLists(FlowProcess flowProcess) {
        int length = this.groupingFields.length;
        this.groups = new SpillableTupleList[Math.max(length, this.numSelfJoins + 1)];
        for (int i = 0; i < length; i++) {
            this.groups[i] = new SpillableTupleList(this.threshold, this.conf, this.codec, flowProcess);
        }
        for (int i2 = 1; i2 < this.numSelfJoins + 1; i2++) {
            this.groups[i2] = this.groups[0];
        }
    }

    /**
     * Reads a long valued property from the flow process.
     *
     * @param flowProcess source of the property
     * @param str         property name
     * @param j           value returned when the property is unset or blank
     * @return the parsed value, or {@code j}
     * @throws NumberFormatException if the property is set to a non-numeric value
     */
    private long getLong(FlowProcess flowProcess, String str, long j) {
        String raw = (String) flowProcess.getProperty(str);
        if (raw == null) {
            return j;
        }
        // Tolerate surrounding whitespace, which Long.parseLong itself rejects.
        String trimmed = raw.trim();
        return trimmed.length() == 0 ? j : Long.parseLong(trimmed);
    }

    /**
     * Resolves the compression codec to hand to the tuple lists.
     *
     * <p>Returns {@code null} when {@link #SPILL_COMPRESS} is set to a value that
     * {@code Boolean.parseBoolean} reads as false. Otherwise each class name from
     * {@link #SPILL_CODECS} (or the built-in default list) is tried in order, and
     * the first one that loads and is a {@link CompressionCodec} is instantiated.
     * Names that cannot be loaded, or that load but are not codecs, are skipped.
     *
     * @param flowProcess source of the properties; must be a {@code HadoopFlowProcess}
     *                    when a codec is found
     * @return a codec instance, or {@code null} if compression is disabled or no
     *         configured codec could be loaded
     */
    public CompressionCodec getCompressionCodec(FlowProcess flowProcess) {
        String compress = (String) flowProcess.getProperty(SPILL_COMPRESS);
        if (compress != null && !Boolean.parseBoolean(compress)) {
            return null;
        }
        String codecs = (String) flowProcess.getProperty(SPILL_CODECS);
        if (codecs == null || codecs.length() == 0) {
            codecs = defaultCodecs;
        }
        Class<? extends CompressionCodec> codecClass = null;
        for (String codecName : codecs.split("[,\\s]+")) {
            if (codecName.length() == 0) {
                // split() yields an empty first token when the list starts with a separator
                continue;
            }
            try {
                LOG.info("attempting to load codec: " + codecName);
                codecClass = Thread.currentThread().getContextClassLoader().loadClass(codecName).asSubclass(CompressionCodec.class);
            } catch (ClassNotFoundException ignored) {
                // Expected when a codec is not on the classpath; fall through to the next name.
                LOG.debug("codec not found: " + codecName);
            } catch (ClassCastException ignored) {
                // The class exists but is not a CompressionCodec; treat it like a missing codec
                // rather than failing construction of the closure.
                LOG.warn("codec does not implement CompressionCodec, skipping: " + codecName);
            }
            if (codecClass != null) {
                LOG.info("found codec: " + codecName);
                break;
            }
        }
        if (codecClass == null) {
            LOG.warn("codecs set, but unable to load any: " + codecs);
            return null;
        }
        return (CompressionCodec) ReflectionUtils.newInstance(codecClass, ((HadoopFlowProcess) flowProcess).getJobConf());
    }
}
