package voldemort.store.readonly.mr;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.store.readonly.ReadOnlyUtils;
import voldemort.store.readonly.checksum.CheckSum;
import voldemort.utils.ByteUtils;

/* loaded from: input_file:voldemort/store/readonly/mr/HadoopStoreBuilderReducer.class */
public class HadoopStoreBuilderReducer extends AbstractStoreBuilderConfigurable implements Reducer<BytesWritable, BytesWritable, Text, Text> {
    private static final Logger logger = Logger.getLogger(HadoopStoreBuilderReducer.class);
    private DataOutputStream indexFileStream = null;
    private DataOutputStream valueFileStream = null;
    private int position = 0;
    private String taskId = null;
    private int numChunks = -1;
    private int nodeId = -1;
    private int chunkId = -1;
    private Path taskIndexFileName;
    private Path taskValueFileName;
    private String outputDir;
    private JobConf conf;
    private CheckSum.CheckSumType checkSumType;
    private CheckSum checkSumDigestIndex;
    private CheckSum checkSumDigestValue;

    public void reduce(BytesWritable bytesWritable, Iterator<BytesWritable> it, OutputCollector<Text, Text> outputCollector, Reporter reporter) throws IOException {
        BytesWritable next = it.next();
        byte[] bArr = next.get();
        if (this.nodeId == -1) {
            this.nodeId = ByteUtils.readInt(bArr, 0);
        }
        if (this.chunkId == -1) {
            this.chunkId = ReadOnlyUtils.chunk(bytesWritable.get(), this.numChunks);
        }
        this.indexFileStream.write(bytesWritable.get(), 0, bytesWritable.getSize());
        this.indexFileStream.writeInt(this.position);
        if (this.checkSumDigestIndex != null) {
            this.checkSumDigestIndex.update(bytesWritable.get(), 0, bytesWritable.getSize());
            this.checkSumDigestIndex.update(this.position);
        }
        int size = next.getSize() - 4;
        this.valueFileStream.writeInt(size);
        this.valueFileStream.write(bArr, 4, size);
        if (this.checkSumDigestValue != null) {
            this.checkSumDigestValue.update(size);
            this.checkSumDigestValue.update(bArr, 4, size);
        }
        this.position += 4 + size;
        if (this.position < 0) {
            throw new VoldemortException("Chunk overflow exception: chunk " + this.chunkId + " has exceeded 2147483647 bytes.");
        }
        if (it.hasNext()) {
            throw new VoldemortException("Duplicate keys detected for md5 sum " + ByteUtils.toHexString(ByteUtils.copy(bytesWritable.get(), 0, bytesWritable.getSize())));
        }
    }

    @Override // voldemort.store.readonly.mr.AbstractStoreBuilderConfigurable
    public void configure(JobConf jobConf) {
        super.configure(jobConf);
        try {
            this.conf = jobConf;
            this.position = 0;
            this.numChunks = jobConf.getInt("num.chunks", -1);
            this.outputDir = jobConf.get("final.output.dir");
            this.taskId = jobConf.get("mapred.task.id");
            this.checkSumType = CheckSum.fromString(jobConf.get("checksum.type"));
            this.checkSumDigestIndex = CheckSum.getInstance(this.checkSumType);
            this.checkSumDigestValue = CheckSum.getInstance(this.checkSumType);
            this.taskIndexFileName = new Path(FileOutputFormat.getOutputPath(jobConf), getStoreName() + "." + this.taskId + ".index");
            this.taskValueFileName = new Path(FileOutputFormat.getOutputPath(jobConf), getStoreName() + "." + this.taskId + ".data");
            jobConf.getInt("store.output.replication.factor", 2);
            logger.info("Opening " + this.taskIndexFileName + " and " + this.taskValueFileName + " for writing.");
            FileSystem fileSystem = this.taskIndexFileName.getFileSystem(jobConf);
            this.indexFileStream = fileSystem.create(this.taskIndexFileName);
            this.valueFileStream = fileSystem.create(this.taskValueFileName);
        } catch (IOException e) {
            throw new RuntimeException("Failed to open Input/OutputStream", e);
        }
    }

    @Override // voldemort.store.readonly.mr.AbstractStoreBuilderConfigurable
    public void close() throws IOException {
        this.indexFileStream.close();
        this.valueFileStream.close();
        Path path = new Path(this.outputDir, "node-" + this.nodeId);
        Path path2 = new Path(path, this.chunkId + ".index");
        Path path3 = new Path(path, this.chunkId + ".data");
        FileSystem fileSystem = path2.getFileSystem(this.conf);
        fileSystem.mkdirs(path);
        if (this.checkSumType != CheckSum.CheckSumType.NONE) {
            if (this.checkSumDigestIndex == null || this.checkSumDigestValue == null) {
                throw new VoldemortException("Failed to open CheckSum digest");
            }
            Path path4 = new Path(path, this.chunkId + ".index.checksum");
            Path path5 = new Path(path, this.chunkId + ".data.checksum");
            FSDataOutputStream create = fileSystem.create(path4);
            create.write(this.checkSumDigestIndex.getCheckSum());
            create.close();
            FSDataOutputStream create2 = fileSystem.create(path5);
            create2.write(this.checkSumDigestValue.getCheckSum());
            create2.close();
        }
        logger.info("Moving " + this.taskIndexFileName + " to " + path2 + ".");
        fileSystem.rename(this.taskIndexFileName, path2);
        logger.info("Moving " + this.taskValueFileName + " to " + path3 + ".");
        fileSystem.rename(this.taskValueFileName, path3);
    }

    public /* bridge */ /* synthetic */ void reduce(Object obj, Iterator it, OutputCollector outputCollector, Reporter reporter) throws IOException {
        reduce((BytesWritable) obj, (Iterator<BytesWritable>) it, (OutputCollector<Text, Text>) outputCollector, reporter);
    }
}
