Find the average of numbers using MapReduce

I am trying to write code to find the average of numbers using MapReduce.

I am trying to use global counters to achieve my goal, but I cannot set the counter value in my Mapper's map method, and I also cannot get the counter value in my Reducer's reduce method.

Should I use a global counter in map anyway (e.g. using incrCounter(key, amount) of the provided Reporter)? Or would you suggest some other logic to get the average of the numbers?

4 answers

The logic is quite simple: if all the numbers have the same key, the mapper emits every value you want to average under that one key. Because of this, in the reducer you can sum the values in the iterator. You can keep a counter while iterating, which solves the problem of how many items are being averaged. Finally, after the iteration, you find the average by dividing the sum by the number of elements.

Be careful: this logic will not work if the combiner class is set to the same class as the reducer, because the reducer would then average partial averages instead of the raw values.
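The reducer logic above, and the reason for the combiner warning, can be sketched without Hadoop. The class and method names here (AverageReducerSketch, average) are hypothetical, and plain lists stand in for the value iterator a reducer receives for one key.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, Hadoop-free sketch: plain lists stand in for the
// Iterable of values that a reducer receives for one key.
final class AverageReducerSketch {

    // Sum the values and count them while iterating, then divide once.
    static double average(Iterable<Double> values) {
        double sum = 0.0;
        long count = 0;
        for (double v : values) {
            sum += v;
            count++;
        }
        return sum / count;
    }

    public static void main(String[] args) {
        // Correct: the reducer sees every raw value for the key.
        List<Double> all = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0);
        System.out.println(average(all)); // 3.0

        // Pitfall: reusing the same logic as a combiner averages each
        // map-side chunk first, so the reducer averages averages.
        double chunkA = average(Arrays.asList(1.0, 2.0));      // 1.5
        double chunkB = average(Arrays.asList(3.0, 4.0, 5.0)); // 4.0
        System.out.println(average(Arrays.asList(chunkA, chunkB))); // 2.75
    }
}
```

The second result differs from the true average because the two chunks have different sizes; the partial averages lose the information about how many values each one represents.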


Use all three components (Mapper, Combiner, and Reducer) to solve the problem. See the link below for the full code and an explanation.

http://alchemistviews.blogspot.com/2013/08/calculate-average-in-map-reduce-using.html


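The average is sum/size. Since the sum is sum = k1 + k2 + k3 + ..., you can divide by size either after or during the summation, so the average is also k1/size + k2/size + k3/size + ...

In Java 8: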

    public double average(List<Valuable> list) {
      final int size = list.size();
      return list
            .stream()
            .mapToDouble(element->element.someValue())
            .reduce(0,(sum,x)->sum+x/size);
    }
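The snippet above depends on a Valuable type that is not shown. As a minimal stand-alone sketch, assuming a plain List<Double> instead, the per-term division can be compared with dividing the sum once at the end. The class and method names (StreamAverageSketch, averagePerTerm, averageOfSum) are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone variant: a List<Double> replaces the
// undefined Valuable type so the snippet can be run directly.
final class StreamAverageSketch {

    // Divide each term by the size while summing (k1/size + k2/size + ...).
    static double averagePerTerm(List<Double> list) {
        final int size = list.size();
        return list.stream()
                .mapToDouble(Double::doubleValue)
                .reduce(0, (sum, x) -> sum + x / size);
    }

    // Sum first, divide once at the end (sum / size).
    static double averageOfSum(List<Double> list) {
        return list.stream().mapToDouble(Double::doubleValue).sum() / list.size();
    }

    public static void main(String[] args) {
        List<Double> values = Arrays.asList(2.0, 4.0, 6.0, 8.0);
        System.out.println(averagePerTerm(values)); // 5.0
        System.out.println(averageOfSum(values));   // 5.0
    }
}
```

Both forms need the total size up front, which a single mapper or combiner in a distributed job does not know. With arbitrary floating-point inputs the two results can also differ in the last bits.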



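The arithmetic mean is an aggregate function that is not distributive but algebraic. According to Han et al., an aggregate function is distributive if:

[...] it can be computed in a distributed manner [...]. Suppose [..] the data are partitioned into n sets. We apply the function to each partition, resulting in n aggregate values. If the result derived by applying the function to the n aggregate values is the same as that derived by applying the function to the entire data set (without partitioning), the function can be computed in a distributed manner.

In other words, it has to be associative and commutative. According to Han et al., however, an aggregate function is algebraic if:

[...] it can be computed by an algebraic function with m arguments (where m is a bounded positive integer), each of which is obtained by applying a distributive aggregate function.

For the arithmetic mean this is simply avg = sum/count. Obviously, you need to carry a count along with the sum. Using a global counter for this looks like misuse, though. The API describes org.apache.hadoop.mapreduce.Counter as follows:

A named counter that tracks the progress of a map/reduce job.

Counters should typically be used for statistics about jobs, not as part of the calculation during data processing itself.

So, within a partition, all you have to do is add up your numbers and track their count together with the sum, as a pair (sum, count); a simple approach is a string of the form <sum><separator><count>.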

In the mapper, the count is always 1 and the sum is the raw value itself. To reduce the map output, you can use a combiner that processes the aggregates as (sum_1 + ... + sum_n, count_1 + ... + count_n). The same must be repeated in the reducer, which finishes with the final calculation sum/count. Keep in mind that this approach does not depend on the key used!
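The mapper, combiner, and reducer steps just described can be sketched without Hadoop. This is only an illustration under assumptions: the names (SumCountSketch, atom, merge, average) are hypothetical, and ":" is assumed as the separator.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the (sum, count) aggregate, encoded as the
// string "<sum>:<count>"; the ":" separator is an assumption.
final class SumCountSketch {

    // Mapper step: a raw value becomes the atom "<value>:1".
    static String atom(long value) {
        return value + ":1";
    }

    // Combiner/reducer step: add sums and counts component-wise.
    static String merge(List<String> aggregates) {
        long sum = 0;
        long count = 0;
        for (String aggregate : aggregates) {
            String[] parts = aggregate.split(":");
            sum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        return sum + ":" + count;
    }

    // Final reducer step: divide the total sum by the total count.
    static double average(String aggregate) {
        String[] parts = aggregate.split(":");
        return (double) Long.parseLong(parts[0]) / Long.parseLong(parts[1]);
    }

    public static void main(String[] args) {
        // Two map-side partitions, each pre-aggregated by a combiner.
        String partA = merge(Arrays.asList(atom(10), atom(20)));
        String partB = merge(Arrays.asList(atom(30), atom(40), atom(50)));
        String total = merge(Arrays.asList(partA, partB));
        System.out.println(total);          // 150:5
        System.out.println(average(total)); // 30.0
    }
}
```

Because merge only adds components, it gives the same total no matter how the values are split across partitions or how often a combiner runs, which is what makes it safe to use on the map side.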

Finally, here is a simple example that uses LAPD statistics to calculate the "average crime time" in Los Angeles:

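// Each public class below belongs in its own file; the imports are listed once here.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Driver extends Configured implements Tool {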
    enum Counters {
        DISCARDED_ENTRY
    }

    public static void main(String[] args) throws Exception {
        // propagate the job result as the process exit code
        System.exit(ToolRunner.run(new Driver(), args));
    }

    public int run(String[] args) throws Exception {
        Configuration configuration = getConf();

        Job job = Job.getInstance(configuration);
        job.setJarByClass(Driver.class);

        job.setMapperClass(Mapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(Combiner.class);
        job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : -1;
    }
}

public class Mapper extends org.apache.hadoop.mapreduce.Mapper<
    LongWritable,
    Text,
    LongWritable,
    Text
> {

    @Override
    protected void map(
        LongWritable key,
        Text value,
        org.apache.hadoop.mapreduce.Mapper<
            LongWritable,
            Text,
            LongWritable,
            Text
        >.Context context
    ) throws IOException, InterruptedException {
            // parse the CSV line
            ArrayList<String> values = this.parse(value.toString());

            // validate the parsed values
            if (this.isValid(values)) {

                // fetch the third and the fourth column
                String time = values.get(3);
                String year = values.get(2)
                    .substring(values.get(2).length() - 4);

                // convert time to minutes (e.g. 1542 -> 942)
                int minutes = Integer.parseInt(time.substring(0, 2))
                    * 60 + Integer.parseInt(time.substring(2,4));

                // create the aggregate atom (a/n)
                // with a = time in minutes and n = 1
                context.write(
                    new LongWritable(Integer.parseInt(year)),
                    new Text(Integer.toString(minutes) + ":1")
                );
            } else {
                // invalid line format, so we increment a counter
                context.getCounter(Driver.Counters.DISCARDED_ENTRY)
                    .increment(1);
            }
    }

    protected boolean isValid(ArrayList<String> values) {
        return values.size() > 3 
            && values.get(2).length() == 10 
            && values.get(3).length() == 4;
    }

    protected ArrayList<String> parse(String line) {
        ArrayList<String> values = new ArrayList<>();
        String current = "";
        boolean escaping = false;

        for (int i = 0; i < line.length(); i++){
            char c = line.charAt(i);

            if (c == '"') {
                escaping = !escaping;
            } else if (c == ',' && !escaping) {
                values.add(current);
                current = "";
            } else {
                current += c;
            }
        }

        values.add(current);

        return values;
    }
}

public class Combiner extends org.apache.hadoop.mapreduce.Reducer<
    LongWritable,
    Text,
    LongWritable,
    Text
> {

    @Override
    protected void reduce(
        LongWritable key,
        Iterable<Text> values,
        Context context
    ) throws IOException, InterruptedException {
        // primitive longs avoid boxing on every addition
        long n = 0L;
        long a = 0L;
        Iterator<Text> iterator = values.iterator();

        // calculate intermediate aggregates
        while (iterator.hasNext()) {
            String[] atom = iterator.next().toString().split(":");
            a += Long.parseLong(atom[0]);
            n += Long.parseLong(atom[1]);
        }

        context.write(key, new Text(Long.toString(a) + ":" + Long.toString(n)));
    }
}

public class Reducer extends org.apache.hadoop.mapreduce.Reducer<
    LongWritable,
    Text,
    LongWritable,
    Text
> {

    @Override
    protected void reduce(
        LongWritable key, 
        Iterable<Text> values, 
        Context context
    ) throws IOException, InterruptedException {
        // primitive longs avoid boxing on every addition
        long n = 0L;
        long a = 0L;
        Iterator<Text> iterator = values.iterator();

        // calculate the final aggregate
        while (iterator.hasNext()) {
            String[] atom = iterator.next().toString().split(":");
            a += Long.parseLong(atom[0]);
            n += Long.parseLong(atom[1]);
        }

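        // round the average to whole minutes; the cast to double avoids
        // integer division truncating before Math.round is applied
        int average = (int) Math.round((double) a / n);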

        // convert the average minutes back to time
        context.write(
            key,
            new Text(
                // zero-pad so that e.g. 545 minutes reads 09:05, not 9:5
                String.format("%02d:%02d", average / 60, average % 60)
            )
        );
    }
}