Partition and full sorting (WritableComparable)

1. Concept

1. Partition:

Hadoop’s default partition is obtained modulo the number of ReduceTasks based on the hashCode of the key. Users cannot control which key is stored in which partition.

 If you want to control which key is stored in which partition, you need to customize the class to inherit Partitioner<KEY, VALUE>.<br>Generic KEY and VALUE respectively correspond to the output key and value in Mapper, because partitioning is completed after map() when the ring buffer overflows.<br>?<strong>Tip</strong>: If the number of ReduceTasks is greater than the number of partitions set by getPartition() overridden in the custom class, an empty output file part-r-00000 will be generated.<br>
 If the number of ReduceTasks is less than the number of partitions set by getPartition() overridden in the custom class, some partition data has nowhere to place, and an error will be reported.<br>
 If the number of ReduceTasks is equal to 1, the custom partitioning method will not be used. The system default partition is 1, and only one partition file will be output in the end.<br>The partition number must start from 0 and increase one by one.<br><br><strong>2. Full sorting:<br></strong>The final output result is only one file, and the file is internally ordered. The way to achieve this is to set up only one ReduceTask. However, this method is extremely inefficient when processing large files.<br>Because one machine processes all files, the parallel architecture provided by MapReduce is completely lost. 
<strong>2. Project examples</strong><br>?<strong>1. Text to be processed</strong><br>?
    
    
     
     <img src="//i2.wp.com/img-blog.csdnimg.cn/img_convert/eeef12d4ab362644062457227bac33b5.gif" id="code_img_closed_0e2e8740-3bc9-4bcb-9e20-ae1b280b248f" style="outline: none ;">
    
    data.txt
 <strong>2. Requirements:</strong><br>The information of infected persons in each city is counted separately and output to the corresponding file (note: the personnel information of Wuhan is uniformly output to one file, and the personnel information of Shiyan is uniformly output to one file),<br>The output results are arranged in reverse order according to the age of the infected persons. Example of output results:
 Region Name Age Gender<br>Wuhan Zhang San 70 Female<br><br> Wuhan Li Si 50 Male
 Wuhan Wang Wu 60 Female 
 Wuhan Zhao Liu 55 Male<br>3.<strong>Person2Bean.java</strong>  

package com.jh.work8;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Person2Bean implements WritableComparable<Person2Bean> {
    private String area; // Infected area
    private String name; // Infected name
    private Integer age; // Infection age
    private String sex; // infected sex

    public Person2Bean() {
    }

    @Override
    public String toString() {
        return area + "\t" + name + "\t" + age + "\t" + sex;
    }

    public String getArea() {
        return area;
    }

    public void setArea(String area) {
        this.area = area;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    // sort
    @Override
    public int compareTo(Person2Bean o) {
        // Sort by infection age in reverse order (here is mainly sorting within the district)
        return o.getAge().compareTo(this.age);
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(area);
        out.writeUTF(name);
        out.writeUTF(sex);
        out.writeInt(age);
    }

    //Deserialize
    @Override
    public void readFields(DataInput in) throws IOException {
        area = in.readUTF();
        name = in.readUTF();
        sex = in.readUTF();
        age = in.readInt();
    }
}

Person2Bean

 4. <strong>Person2Mapper.java</strong>

package com.jh.work8;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Person2Mapper extends Mapper<LongWritable,Text,Person2Bean,NullWritable> {
    private Person2Bean bean = new Person2Bean();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get the content of each line of text and split it
        String[] split = value.toString().split("\t");

        //Assign value
        bean.setArea(split[0]);
        bean.setAge(Integer.parseInt(split[1]));
        bean.setSex(split[2]);
        bean.setName(split[3]);
        
        context.write(bean,NullWritable.get());
    }
}

Person2Mapper


 5. <strong>PersonPartition.java</strong>

package com.jh.work8;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class PersonPartition extends Partitioner<Person2Bean,NullWritable> {
    /*
        Custom partition, inherit Partitioner
        Generics correspond to the output of the Mapper side
     */
    @Override
    public int getPartition(Person2Bean person2Bean, NullWritable nullWritable, int numPartitions) {
        //Make three partitions based on infected areas
        switch (person2Bean.getArea()){
            case "Wuhan City":
                return 0;
            case "Huangshi City":
                return 1;
            default:
                return 2;
        }
    }
}

PersonPartition


 6.<strong>Person2Reduce.java</strong>

package com.jh.work8;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Person2Reduce extends Reducer<Person2Bean,NullWritable,Person2Bean,NullWritable> {
    @Override
    protected void reduce(Person2Bean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Output all
        for (NullWritable value : values) {
            context.write(key,NullWritable.get());
        }
    }
}

Person2Reduce


 7.<strong>Person2Driver.java</strong>

package com.jh.work8;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Person2Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Get the configuration file and job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        //Set the path to the jar
        job.setJarByClass(Person2Driver.class);

        //Set the mapper class and reducer class
        job.setMapperClass(Person2Mapper.class);
        job.setReducerClass(Person2Reduce.class);

        //Set the data type of key and value output by mapper
        job.setMapOutputKeyClass(Person2Bean.class);
        job.setMapOutputValueClass(NullWritable.class);

        //Set the data type of key and value output by reduce
        job.setOutputKeyClass(Person2Bean.class);
        job.setOutputValueClass(NullWritable.class);

        //Set the class of custom partition
        job.setPartitionerClass(PersonPartition.class);
        //Set the number of ReduceTasks
        job.setNumReduceTasks(3);

        //Set the input path of the file to be processed
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        //Set the output path of the calculated data file
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        // Submit calculation task (job)
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0:1);
    }
}

Person2Driver


 8. The final output is three files:<br>
    
    
     
     <img src="//i2.wp.com/img-blog.csdnimg.cn/img_convert/0109a04e7e899c3368538629c0e294c6.png" alt="" style="outline: none;">
    
    
    
    
     
     <img src="//i2.wp.com/img-blog.csdnimg.cn/img_convert/d2f644492fe8545bdcb0373a716aa325.png" alt="" style="outline: none;">
    
    
    
    
     
     <img src="//i2.wp.com/img-blog.csdnimg.cn/img_convert/e02adb320b30d9324ae747561da2ec4e.png" alt="" style="outline: none;">
    
    





<br><br><br><br><br><br>
 
<br><br><br>