代码运行良好,但输出不符合预期。我的代码是:

/**
 * Question's original job, kept as the exhibit the surrounding text discusses.
 *
 * NOTE(review): as written, neither map() nor reduce() below is ever called by
 * Hadoop. Both take an OutputCollector (old org.apache.hadoop.mapred API) while
 * the classes extend the new-API Mapper/Reducer, whose methods take a Context.
 * They are overloads, not overrides, so the inherited default (identity, per the
 * Hadoop Mapper/Reducer Javadoc) implementations run instead. Those emit the
 * LongWritable byte offset and the raw Text line, matching the observed
 * "offset<TAB>key<TAB>value" output. Annotating both methods with @Override
 * would have turned this into a compile error.
 */
public class Test { 
 
 public static class MapReduceMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> { 
 
    // NOTE(review): OutputCollector parameter => this does not override
    // Mapper.map(LongWritable, Text, Context); Hadoop never invokes it.
    public void map(LongWritable key, Text value, OutputCollector<IntWritable, IntWritable> output) throws IOException, InterruptedException { 
        // Scanner is never closed (harmless for a String source, but untidy).
        Scanner scanner = new Scanner(value.toString()); 
        String row; 
        String[] pre; 
        int[] tokens; 
        while (scanner.hasNext()) { 
            row = scanner.nextLine(); 
            // Splits on a single tab only. If fields are separated by spaces or carry
            // trailing whitespace (the sample input as displayed looks that way — confirm),
            // parseInt below throws NumberFormatException.
            pre = row.split("\\t"); 
            tokens = new int[pre.length]; 
 
            for(int i=0; i<pre.length;i++) { 
                tokens[i] = Integer.parseInt(pre[i]); 
            } 
 
                // Assumes at least two fields per line; otherwise tokens[1] throws.
                output.collect(new IntWritable(tokens[0]), new IntWritable(tokens[1])); 
 
        } 
    } 
 }  
 
 public static class MapReduceReducer extends Reducer<IntWritable, IntWritable, Text, NullWritable> { 
 
 // Shared NullWritable used as the (empty) output value.
 NullWritable NULL = NullWritable.get(); 
 
    // NOTE(review): OutputCollector parameter => this does not override
    // Reducer.reduce(IntWritable, Iterable<IntWritable>, Context); Hadoop never invokes it.
    public void reduce(IntWritable key, Iterable<IntWritable> values, OutputCollector<Text, NullWritable> output)  
      throws IOException, InterruptedException { 
 
        for (IntWritable val : values) { 
            // `a` is read but never used; `sum` is never accumulated.
            int a = val.get(); 
                // `count` (and `sum` below) are not declared anywhere in the visible
                // code, so this listing does not compile as shown.
                count++; 
        } 
 
        String keyValue = key.get() + ": "; 
        output.collect(new Text(keyValue + "Mean = " + (sum / count)), NULL); 
        output.collect(new Text(keyValue + "Count = " + count), NULL); 
 
 
    } 
 } 
 
 public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    FileSystem fs = FileSystem.get(conf); 
 
    Job job = new Job(conf, "mapreduce"); 
 
    // NOTE(review): the class is named `Test`; Java is case-sensitive, so `test.class`
    // does not resolve.
    job.setJarByClass(test.class); 
 
    // These output types (LongWritable, Text) match what the identity mapper/reducer
    // emit, not what the intended reducer declares (Text, NullWritable). The intended
    // map output types (IntWritable, IntWritable) are never declared via
    // setMapOutputKeyClass/setMapOutputValueClass.
    job.setOutputKeyClass(LongWritable.class); 
    job.setOutputValueClass(Text.class); 
 
    job.setMapperClass(MapReduceMapper.class); 
    job.setReducerClass(MapReduceReducer.class); 
 
    job.setInputFormatClass(TextInputFormat.class); 
    job.setOutputFormatClass(TextOutputFormat.class); 
 
    // Input path appears redacted; an empty string is likely rejected by Path — confirm.
    FileInputFormat.addInputPath(job, new Path(""));  
    String outputFile = "/home/kevmccar/mapreduce/output/";   
    Path outPath = new Path(outputFile); 
    // Recursively deletes any previous output so the job can be re-run.
    fs.delete(outPath, true); 
    FileOutputFormat.setOutputPath(job, new Path(outputFile));  
 
    job.waitForCompletion(true); 
 } 
 
} 

我正在使用的输入文件的每一行都有一个键和一个值,例如:

1    1029109 
5    289182 
6    547849 
1    389283 

我希望输出的格式为

1: Average = 12312 
1: Count = 6564  

但它看起来像

5244    8       121602 
5253    10      663603 
5263    2       32288 
5271    6       221095 
5280    10      350834 
5290    2       245710 
5299    1       318947 
5308    9       440945 
5317    4       638909 
... 

为什么输出是这样的?

请您参考如下方法:

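我将 OutputCollector 更改为 Context,起初只是因为我更熟悉 Context。这一更改确实解决了问题,原因如下。题目中的类继承的是新 API(org.apache.hadoop.mapreduce)的 Mapper/Reducer,其 map/reduce 方法的最后一个参数是 Context。而带 OutputCollector 参数的方法属于旧 API(org.apache.hadoop.mapred),签名不匹配,所以它们只是重载,并没有覆盖父类方法。因此 Hadoop 实际调用的是继承来的默认 map/reduce,它们会原样输出输入的键值对,也就是每行的字节偏移量和整行文本。这正是题目中看到的"偏移量 键 值"格式的输出。给 map/reduce 方法加上 @Override 注解,编译器就能提前发现这类错误。这是我的代码供参考: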

public class HelloWorld { 
 
   public static class MapReduceMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> { 
 
 
     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
        Scanner scanner = new Scanner(value.toString()); 
        String row; 
        String[] pre; 
        int[] tokens; 
        while (scanner.hasNext()) { 
          row = scanner.nextLine(); 
          pre = row.split("\\t"); 
          tokens = new int[pre.length]; 
 
          for(int i=0; i<pre.length;i++) { 
            tokens[i] = Integer.parseInt(pre[i]); 
          } 
          System.err.println("MapKey: " + tokens[0] + "MapValue: " + tokens[1]); 
          context.write(new IntWritable(tokens[0]), new IntWritable(tokens[1])); 
 
        } 
        scanner.close(); 
     } 
  }  
 
 public static class MapReduceReducer extends Reducer<IntWritable, IntWritable, Text, NullWritable> { 
 
   NullWritable NULL = NullWritable.get(); 
 
   public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 
    int sum = 0; 
    int count = 0; 
    int max = Integer.MIN_VALUE; 
    int min = Integer.MAX_VALUE; 
    for (IntWritable val : values) { 
        int a = val.get(); 
            if (a > max) { 
                max = a; } 
            else if (a < min) { 
                a = min; 
                        } 
            sum += a; 
            count++; 
    } 
 
    String keyValue = "Product "  + key.get() + ": "; 
    context.write(new Text(keyValue + "Mean = " + (sum / count)), NULL); 
    context.write(new Text(keyValue + "Count = " + count), NULL); 
    context.write(new Text(keyValue + "Min = " + min), NULL); 
    context.write(new Text(keyValue + "Max = " + max), NULL); 
 
   } 
 } 
 
 public static void main(String[] args) throws Exception { 
   Configuration conf = new Configuration(); 
   FileSystem fs = FileSystem.get(conf); 
 
   Job job = new Job(conf, "mapreduce"); 
 
   job.setJarByClass(HelloWorld.class); 
 
   job.setMapOutputKeyClass(IntWritable.class); 
   job.setMapOutputValueClass(IntWritable.class); 
   job.setOutputKeyClass(Text.class); 
   job.setOutputValueClass(NullWritable.class); 
 
   job.setMapperClass(MapReduceMapper.class); 
   job.setReducerClass(MapReduceReducer.class); 
 
   job.setInputFormatClass(TextInputFormat.class); 
   job.setOutputFormatClass(TextOutputFormat.class); 
 
   FileInputFormat.addInputPath(job, new Path(args[0]));  
   String outputFile = args[1];   
   Path outPath = new Path(outputFile); 
   fs.delete(outPath, true); 
   FileOutputFormat.setOutputPath(job, new Path(outputFile));  
 
   job.waitForCompletion(true); 
 } 
 
} 

示例输入:

1   1029109 
5   289182 
6   547849 

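示例输出(注意:Min = 2147483647 是这段代码原始版本中 min 更新逻辑的缺陷造成的。`else if (a < min) { a = min; }` 把赋值方向写反了,而且由于使用了 else,第一个值永远不会更新 min。对这组输入,正确的最小值应为 289182):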

Product 5: Mean = 289182 
Product 5: Count = 1 
Product 5: Min = 2147483647 
Product 5: Max = 289182 


评论关闭
IT源码网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!