Handling large output values from the reduce step in Hadoop

In the reduce phase of my MapReduce program, the only operation I perform is to concatenate every value in the provided Iterator, as shown below:

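// Old (org.apache.hadoop.mapred) API: join all values for a key with commas
public void reduce(Text key, Iterator<Text> values,
                   OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
    Text next;
    Text outKey = new Text();
    Text outVal = new Text();
    StringBuilder sb = new StringBuilder();
    while (values.hasNext()) {
        next = values.next();
        sb.append(next.toString());
        // add a separator only between values, not after the last one
        if (values.hasNext())
            sb.append(',');
    }
    outKey.set(key.toString());
    // the whole concatenation must fit in memory before it is emitted
    outVal.set(sb.toString());
    output.collect(outKey, outVal);
}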

My problem is that some of the reduced output values are huge lines of text; so large that even with a very large initial capacity, the StringBuilder has to grow (double) its size several times to hold the entire contents of the iterator, which causes memory problems.
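A minimal plain-Java sketch of that growth pattern (the class `CapacityDemo` and its method are hypothetical, for illustration only). The `StringBuilder.ensureCapacity` documentation states that a resize picks the larger of the requested size and twice the old capacity plus 2; the existing characters are then copied into the new array, so at the moment of each resize both the old array and the roughly twice-as-large new one are live on the heap:

```java
public class CapacityDemo {
    // Appends `count` single characters to a builder with the given initial
    // capacity, printing each resize, and returns how many resizes happened.
    public static int countResizes(int initialCapacity, int count) {
        StringBuilder sb = new StringBuilder(initialCapacity);
        int resizes = 0;
        int lastCapacity = sb.capacity();
        for (int i = 0; i < count; i++) {
            sb.append('x');
            if (sb.capacity() != lastCapacity) {
                resizes++;
                // during the copy, the old and the new array coexist
                System.out.println("grew " + lastCapacity + " -> " + sb.capacity());
                lastCapacity = sb.capacity();
            }
        }
        return resizes;
    }

    public static void main(String[] args) {
        System.out.println("resizes: " + countResizes(16, 1000));
    }
}
```

Pre-sizing the builder only postpones the problem: once a value exceeds the initial capacity, the same copy-and-double step happens with even larger arrays.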

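In a traditional Java application, this would suggest that a buffered write to a file is the preferred way to produce output. How do you handle extremely large output key-value pairs in Hadoop? Should I stream the results directly to a file on HDFS (one file per reduce call)? Is there a way to buffer the output other than the output.collect method?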
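A minimal sketch of what a buffered, streaming write looks like in plain Java, with hypothetical names (`StreamingJoin`, `writeJoined`): each value is written to a `Writer` as soon as it is read from the iterator, so no single in-memory string ever holds the whole line. `Iterator<String>` stands in for Hadoop's `Iterator<Text>`; no Hadoop API is used here.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Arrays;
import java.util.Iterator;

public class StreamingJoin {
    // Writes the values to `out` separated by commas, one value at a time,
    // so memory use is bounded by the largest single value rather than by
    // the concatenation of all of them (assuming `out` is file-backed).
    public static void writeJoined(Iterator<String> values, Writer out) throws IOException {
        boolean first = true;
        while (values.hasNext()) {
            if (!first) {
                out.write(',');
            }
            out.write(values.next());
            first = false;
        }
        out.flush();
    }

    public static void main(String[] args) throws IOException {
        // StringWriter is only a self-contained stand-in for a file writer
        StringWriter sw = new StringWriter();
        writeJoined(Arrays.asList("value1", "value2", "value3").iterator(), sw);
        System.out.println(sw);
    }
}
```

In a real job, `out` would be something like a `BufferedWriter` wrapping a file stream; the `StringWriter` in `main` keeps everything in memory and is there only to make the demo runnable.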

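Note: I have already increased my memory/heap size as far as possible. Several sources also suggested that increasing the number of reducers can help with memory/heap issues, but the problem here was traced directly to StringBuilder expanding its capacity.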


Not that I understand why you would want such a huge value, but there is a way to do it.

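If you write your own OutputFormat, you can change the behavior of the RecordWriter.write(Key, Value) method so that it concatenates values depending on whether the key is null.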

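This way, your reducer can be written as follows (the first output for a key carries the actual key, and every output after it uses a null key):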

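public void reduce(Text key, Iterator<Text> values,
                   OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
  boolean firstKey = true;
  // values is an Iterator, not an Iterable, so iterate explicitly
  while (values.hasNext()) {
    // only the first record carries the key; the rest pass null so the
    // custom RecordWriter appends them to the same output line
    output.collect(firstKey ? key : null, values.next());
    firstKey = false;
  }
}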

The RecordWriter.write() method then contains the following logic to handle the null key / value concatenation:

    public synchronized void write(K key, V value) throws IOException {

        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }

        if (!nullKey) {
            // if we've written data before, append a new line
            if (dataWritten) {
                out.write(newline);
            }

            // write out the key and separator
            writeObject(key);
            out.write(keyValueSeparator);
        } else if (!nullValue) {
            // write out the value delimiter
            out.write(valueDelimiter);
        }

        // write out the value
        writeObject(value);

        // track that we've written some data
        dataWritten = true;
    }

    public synchronized void close(Reporter reporter) throws IOException {
        // if we've written out any data, append a closing newline
        if (dataWritten) {
            out.write(newline);
        }

        out.close();
    }

Note that the close method has also been changed to write a trailing newline after the last record.

The full code listing is on pastebin; here is the test output:

key1    value1
key2    value1,value2,value3
key3    value1,value2
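The null-key protocol can be exercised without a Hadoop cluster. The sketch below is a hypothetical plain-Java stand-in (`ConcatLineWriter` is not a Hadoop class) that follows the same `write`/`close` logic as the `RecordWriter` above, assuming a tab as the key/value separator and a comma as the value delimiter:

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;

// Plain-Java stand-in for the custom RecordWriter: a non-null key starts a
// new line, a null key appends the value to the current line.
public class ConcatLineWriter {
    private final Writer out;
    private final String keyValueSeparator;
    private final String valueDelimiter;
    private boolean dataWritten = false;

    public ConcatLineWriter(Writer out, String keyValueSeparator, String valueDelimiter) {
        this.out = out;
        this.keyValueSeparator = keyValueSeparator;
        this.valueDelimiter = valueDelimiter;
    }

    public void write(String key, String value) throws IOException {
        boolean nullKey = key == null;
        boolean nullValue = value == null;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            // a new key starts a new line, except for the very first record
            if (dataWritten) {
                out.write('\n');
            }
            out.write(key);
            out.write(keyValueSeparator);
        } else {
            // null key: continuation of the previous key's line
            out.write(valueDelimiter);
        }
        if (!nullValue) {
            out.write(value);
        }
        dataWritten = true;
    }

    public void close() throws IOException {
        // terminate the last line, mirroring close() above
        if (dataWritten) {
            out.write('\n');
        }
        out.close();
    }

    public static void main(String[] args) throws IOException {
        StringWriter sw = new StringWriter();
        ConcatLineWriter w = new ConcatLineWriter(sw, "\t", ",");
        // the calls the reducer above would make for three keys
        w.write("key1", "value1");
        w.write("key2", "value1");
        w.write(null, "value2");
        w.write(null, "value3");
        w.write("key3", "value1");
        w.write(null, "value2");
        w.close();
        System.out.print(sw);
    }
}
```

Running `main` prints the same three lines as the test output above. Because each value is passed straight to the underlying `Writer`, the writer itself never accumulates a key's values.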

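If a single output key-value pair can be larger than memory, the standard output mechanism is not suitable, since by interface design it requires passing a key-value pair rather than a stream.
I think the simplest solution is to stream the output directly to an HDFS file.
If you have reasons to pass the data through the output format, I would suggest the following: a) write to a local temporary directory; b) pass the file name as the value to the output format.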
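Spilling a key's values to a temporary file and emitting only the file name can be sketched in plain Java (the class and method names are hypothetical; the temporary directory is whatever `Files.createTempFile` uses by default). Copying the file to HDFS, for example through Hadoop's `FileSystem.create(Path)`, which returns an `FSDataOutputStream`, is deliberately left out:

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Iterator;

public class TempFileSpill {
    // Streams the comma-joined values into a new temporary file and returns
    // its path; only this short path, not the joined text, needs to be
    // emitted as the record value.
    public static Path spillToTempFile(Iterator<String> values) throws IOException {
        // created in the JVM's default temporary directory (java.io.tmpdir)
        Path tmp = Files.createTempFile("reduce-", ".txt");
        try (BufferedWriter w = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8)) {
            boolean first = true;
            while (values.hasNext()) {
                if (!first) {
                    w.write(',');
                }
                w.write(values.next());
                first = false;
            }
        } // closing the writer flushes any remaining buffered data
        return tmp;
    }

    public static void main(String[] args) throws IOException {
        Path p = spillToTempFile(Arrays.asList("value1", "value2", "value3").iterator());
        // in the reducer this would become something like
        // output.collect(key, new Text(p.toString()))
        System.out.println(p);
        System.out.println(new String(Files.readAllBytes(p), StandardCharsets.UTF_8));
        Files.delete(p);
    }
}
```

A node-local temporary path is only useful to later consumers if the file ends up on shared storage such as HDFS; on its own it is not visible to other tasks.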

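Probably the most efficient, but somewhat more complicated, solution would be to use a memory-mapped file as a buffer. It stays in memory as long as there is enough memory, and when needed the OS takes care of spilling it to disk efficiently.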
