Hadoop MapReduce API: WordCount, local and cluster code

Local code (run natively in the IDE)

package com.example.hadoop.api.mr;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCount {
    /**
     * Text is Hadoop's writable type for strings (there is no StringWritable; Text plays that role).
     * (LongWritable, Text) map-side input: for plain text input these two types never change.
     *   LongWritable is the byte offset of the line within its split, Text is the line itself.
     *
     * (Text, IntWritable) map-side output: chosen according to the job's needs.
     */
    public static class MapTask extends Mapper<LongWritable, Text, Text, IntWritable> {
        /**
         * Called once for every line of input.
         * Sample data:
         * hadoop,hadoop,spark,spark,spark,
         * hive,hadoop,spark,spark,spark,
         * spark,hadoop,hive,spark,spark,
         * @param key the byte offset of the line within the split
         * @param value the text of the line
         * @param context used to emit (word, 1) pairs, e.g. (hadoop,1) (spark,1)
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split(",");
            for (String word : words) {
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

    /**
     * The output of the map side is the input of the reduce side.
     */
    public static class ReduceTask extends Reducer<Text, IntWritable, Text, IntWritable> {
        /**
         * Called once per key, with all of the values emitted for that key
         * @param key the word
         * @param values all counts emitted for this word (each one is 1)
         * @param context used to emit the final (word, total) pair
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                // every value is 1, so counting occurrences yields the word's total
                count++;
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Local mode: the Job object submits the task
        Job job = Job.getInstance();

        // Register the two inner classes as mapper and reducer
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);

        // Declare the output key/value types; only the output types need to be declared
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job,new Path("mr/wordcount.txt"));
        FileOutputFormat.setOutputPath(job,new Path("mr/outwordCount"));

        boolean b = job.waitForCompletion(true);
        System.out.println(b ? "Success" : "Failed, please find the bug");
    }
}
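When running locally on Windows, Hadoop looks for winutils.exe via the HADOOP_HOME environment variable (or the hadoop.home.dir JVM property). If that variable is not set system-wide, a minimal workaround sketch, assuming Hadoop is unpacked at the D:\hadoop-2.9.2 path used in the steps below, is to set the property at the top of main() before creating the Job:

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Assumption: the Windows Hadoop distribution (with winutils.exe under bin) is unpacked at D:\hadoop-2.9.2
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.9.2");

        Job job = Job.getInstance();
        // ... the rest of the job setup stays exactly as shown above
    }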

Running the main method locally in IDEA reports an error.

Double-clicking winutils.exe in the local D:\hadoop-2.9.2\bin directory reports that msvcr100.dll cannot be found, which means the Visual C++ runtime is missing. msvcr100.dll belongs to the Visual C++ 2010 runtime; my machine is x64, so choose the download that matches your own architecture and install it directly.

https://www.microsoft.com/en-us/download/details.aspx?id=26999

After the installation finishes, to be on the safe side you can also copy C:\Windows\System32\msvcr100.dll into the bin directory of the Hadoop installation, D:\hadoop-2.9.2\bin.

Run the WordCount.java main method again, and the following error appears.

This time the hadoop.dll file is missing, so download that file separately.

https://github.com/steveloughran/winutils

Choose the version closest to your own Hadoop release, download hadoop.dll, and copy it into the bin directory of the Hadoop installation.

Restart the computer and the job runs successfully.

Word-count results after the run:
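For reference, assuming mr/wordcount.txt contains exactly the three sample lines shown in the Mapper comment, the output file mr/outwordCount/part-r-00000 would contain:

hadoop	4
hive	2
spark	9

Keys come out sorted, and each key and count are separated by a tab, the default separator of TextOutputFormat.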

Cluster code

package com.example.hadoop.api.mr;

import com.example.hadoop.util.SystemUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;

/**
 * @author wangmeiyan
 * @Date 2023/11/02 17:10:00
 * Cluster MapReduce
 */
public class WordCountColony {
    /**
     * Text is Hadoop's writable type for strings (there is no StringWritable; Text plays that role).
     * (LongWritable, Text) map-side input: for plain text input these two types never change.
     *   LongWritable is the byte offset of the line within its split, Text is the line itself.
     *
     * (Text, IntWritable) map-side output: chosen according to the job's needs.
     */
    public static class MapTask extends Mapper<LongWritable, Text, Text, IntWritable> {
        /**
         * Called once for every line of input.
         * Sample data:
         * hadoop,hadoop,spark,spark,spark,
         * hive,hadoop,spark,spark,spark,
         * spark,hadoop,hive,spark,spark,
         *
         * @param key the byte offset of the line within the split
         * @param value the text of the line
         * @param context used to emit (word, 1) pairs, e.g. (hadoop,1) (spark,1)
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split(",");
            for (String word : words) {
                context.write(new Text(word), new IntWritable(1));
            }

        }
    }

    /**
     * The output of the map side is the input of the reduce side.
     */
    public static class ReduceTask extends Reducer<Text, IntWritable, Text, IntWritable> {
        /**
         * Called once per key, with all of the values emitted for that key
         * @param key the word
         * @param values all counts emitted for this word (each one is 1)
         * @param context used to emit the final (word, total) pair
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                // every value is 1, so counting occurrences yields the word's total
                count++;
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Cluster mode: configure the HDFS address and let the Job object submit the task
        Configuration configuration = new Configuration();
        String hdfsUrl = SystemUtil.getProperties().getProperty("spring.hdfs.url");
        configuration.set("fs.defaultFS",hdfsUrl);
        Job job = Job.getInstance(configuration);

        // Register this class's two inner classes as mapper and reducer
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);

        // Declare the output key/value types; only the output types need to be declared
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);


        FileInputFormat.addInputPath(job,new Path(SystemUtil.getProperties().getProperty("spring.hdfs.input")));
        // If the output directory already exists, delete it
        Path output = new Path(SystemUtil.getProperties().getProperty("spring.hdfs.output"));
        FileSystem fileSystem = FileSystem.get(configuration);

        if(fileSystem.exists(output)){
            fileSystem.delete(output,true);
        }
        FileOutputFormat.setOutputPath(job,output);

        boolean b = job.waitForCompletion(true);
        System.out.println(b ? "Success" : "Failed, please find the bug");
    }
}
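SystemUtil is a helper class from the author's own project, not part of Hadoop. A minimal sketch of what such a helper could look like, assuming the spring.hdfs.* values are kept in an application.properties file on the classpath (the file name and implementation here are assumptions, not the author's actual code):

package com.example.hadoop.util;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class SystemUtil {
    private static final Properties PROPERTIES = new Properties();

    static {
        // Assumption: spring.hdfs.url, spring.hdfs.input and spring.hdfs.output
        // are defined in an application.properties file on the classpath
        try (InputStream in = SystemUtil.class.getClassLoader()
                .getResourceAsStream("application.properties")) {
            if (in != null) {
                PROPERTIES.load(in);
            }
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static Properties getProperties() {
        return PROPERTIES;
    }
}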

Prepare the input directory and the file to be counted on the Hadoop (HDFS) web page.
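If you prefer to prepare the input from code rather than from the web page, a minimal sketch, assuming a local wordcount.txt in the project working directory and the same spring.hdfs.* properties (the class name UploadWordCountInput is made up for this example), uses the FileSystem API:

package com.example.hadoop.api.mr;

import com.example.hadoop.util.SystemUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadWordCountInput {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", SystemUtil.getProperties().getProperty("spring.hdfs.url"));

        // Assumption: wordcount.txt sits in the project working directory and
        // spring.hdfs.input points at the HDFS directory the job will read from
        try (FileSystem fileSystem = FileSystem.get(configuration)) {
            Path input = new Path(SystemUtil.getProperties().getProperty("spring.hdfs.input"));
            fileSystem.mkdirs(input);
            fileSystem.copyFromLocalFile(new Path("wordcount.txt"), input);
        }
    }
}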

Run the code directly and view the word-count results in the HDFS output directory.