HBase and Hadoop integration

1. Start the Hadoop and HBase service processes

[Command 001]:

start-all.sh

start-hbase.sh

2. Create the directory /root/experiment/datas on HDFS

[Command 002]:

hadoop fs -mkdir -p /root/experiment/datas

3. Upload the local file /root/experiment/datas/hbase/file1.txt to the /root/experiment/datas directory on HDFS

[Command 003]:

hadoop fs -put /root/experiment/datas/hbase/file1.txt /root/experiment/datas
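
The same upload can also be done programmatically with the Hadoop FileSystem API. The sketch below is only an illustration under the assumptions of this experiment (NameNode at hdfs://master:9000 and the paths used above); it is not required for the following steps, and the class name UploadFileToHdfs is hypothetical.

package experiment;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper class (not part of the experiment): uploads file1.txt with the FileSystem API.
public class UploadFileToHdfs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Connect to the HDFS NameNode assumed to run at hdfs://master:9000
        FileSystem fs = FileSystem.get(URI.create("hdfs://master:9000"), conf);
        // Copy the local file into the HDFS target directory
        fs.copyFromLocalFile(new Path("/root/experiment/datas/hbase/file1.txt"),
                new Path("/root/experiment/datas"));
        fs.close();
    }
}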

2) Experimental process

1. Double-click the “IDEA” icon on the desktop. If it is being used for the first time, the “Welcome to IntelliJ IDEA” window will pop up. Select the “Create New Project” option to start creating a new project, as shown in the figure.

2. In the “New Project” window that pops up, select “Maven” as the project type and click the “Next” button.

3. On the next “New Project” screen, enter experiment as the GroupId and hadoop2hbase as the ArtifactId, then click the “Next” button.

4. Click the “Finish” button to complete the initial establishment of the project.

5. The project window then opens. If the “Tip of the Day” window pops up, click the “Close” button to dismiss it and enter the program writing interface.

6. The following interface appears, as shown below

7. In the dialog box that pops up in the lower right corner, select “Enable Auto-Import” (skip this step if the dialog box does not appear).

8. Change the pom.xml file as shown below:

[Code 001]:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
    http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>demo</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>demo</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.2.6</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>
</project>

9. Right-click the java folder in the project, select New, and then select Package from the submenu, as shown in the figure below:

10. A dialog box pops up, as shown below:

11. Enter the package name experiment in the input box of the window, as shown below:

12. Click the OK button, as shown below:

13. Right-click the experiment package, select New, and then select Java Class from the submenu, as shown in the figure below:

14. A dialog box pops up, as shown below:

15. Enter the class name WordCountUpLoadToHBase in the pop-up window as shown below:

16. The code of class WordCountUpLoadToHBase is as follows:

[Code 002]:

package experiment;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
public class WordCountUpLoadToHBase extends Configured {
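    // Mapper: split each line of the input file into words and emit (word, 1).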
    public static class WCHBaseMapper extends Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
            StringTokenizer strs = new StringTokenizer(value.toString());
            while(strs.hasMoreTokens()){
                word.set(strs.nextToken());
                context.write(new ImmutableBytesWritable(Bytes.toBytes(word.toString())), one);
            }
        }
    }
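    // Reducer: sum the counts for each word and write the total into the content:count column of the HBase table.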
    public static class WCHBaseReducer extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable>{
        public void reduce(ImmutableBytesWritable key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
            int sum = 0;
            for(IntWritable val:values){
                sum += val.get();
            }
            Put put = new Put(key.get());
            put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(sum + ""));
            context.write(key, put);
        }
    }
    @SuppressWarnings("all")
    public static void main(String[] args) throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ClassNotFoundException, InterruptedException {
        // TODO Auto-generated method stub
        String tableName = "wordcount";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","master");
        conf.set("hbase.zookeeper.property.clientPort","2181");
        HBaseAdmin admin = new HBaseAdmin(conf);
        //If the table exists, delete it
        if(admin.tableExists(tableName)){
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        }
        HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
        HColumnDescriptor columnDescriptor =new HColumnDescriptor("content");
        tableDescriptor.addFamily(columnDescriptor);
        admin.createTable(tableDescriptor);

        Job job = new Job(conf,"upload to hbase");
        job.setJarByClass(WordCountUpLoadToHBase.class);
        job.setMapperClass(WCHBaseMapper.class);
        TableMapReduceUtil.initTableReducerJob(tableName, WCHBaseReducer.class, job,null,null,null,null,false);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        FileInputFormat.addInputPaths(job, "hdfs://master:9000/root/experiment/datas/file1.txt");
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

17. Right-click in the code and select the Run option, as shown below:

18. The running results are as shown below

19. Open the command line and enter hbase shell

[Command 004]:

hbase shell

20. Enter the following command in the HBase shell to view the table list

[Command 005]:

list

21. Enter the following command in the HBase shell to view the contents of the wordcount table

[Command 006]:

scan 'wordcount'

22. You can see that the table wordcount stores each word and the number of times the word appears.
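
The count stored for a single word can also be read back from Java with the HBase client API. The sketch below is only an illustration: the class name ReadOneWordCount and the row key "hadoop" are hypothetical examples (use any word that actually occurs in file1.txt), and it reuses the ZooKeeper settings from the code above.

package experiment;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper class (not part of the experiment): reads one row from the wordcount table.
public class ReadOneWordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("wordcount"))) {
            // Look up the row whose key is the example word "hadoop"
            Result result = table.get(new Get(Bytes.toBytes("hadoop")));
            byte[] count = result.getValue(Bytes.toBytes("content"), Bytes.toBytes("count"));
            System.out.println("hadoop -> " + (count == null ? "not found" : Bytes.toString(count)));
        }
    }
}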

23. In the same way, create the class MRReadFromHbase in the package experiment, as shown below

24. The code of class MRReadFromHbase is as follows:

[Code 003]:

package experiment;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MRReadFromHbase extends Configured {
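    // Mapper: read each row of the wordcount table and emit (word, count) as text.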
    public static class WCHBaseMapper extends TableMapper<Text, Text>{
        @Override
        public void map(ImmutableBytesWritable key,Result values,Context context) throws IOException, InterruptedException{
            StringBuffer sb =new StringBuffer("");
            for(Map.Entry<byte[], byte[]> value:values.getFamilyMap("content".getBytes()).entrySet()){
                String str =new String(value.getValue());
                System.out.println(new String(key.get()) + " " + new String(value.getKey()) + ":" + str);
                if(str!=null){
                    sb.append(str);
                }

                context.write(new Text(key.get()), new Text(sb.toString()));
            }
        }
    }
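    // Reducer: write each (word, count) pair to the HDFS output file.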
    public static class WCHBaseReducer extends Reducer<Text, Text, Text, Text>{
        private Text result =new Text();
        public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
            for(Text val:values){
                result.set(val);
                context.write(key,result);
            }
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // TODO Auto-generated method stub
        String tableName = "wordcount";
        Configuration conf =HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        Job job =new Job(conf,"read from hbase to hdfs");
        job.setJarByClass(MRReadFromHbase.class);
        job.setReducerClass(WCHBaseReducer.class);
        TableMapReduceUtil.initTableMapperJob(tableName, new Scan(), WCHBaseMapper.class, Text.class, Text.class, job);
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/root/experiment/output"));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

25. Right-click in the code and select the Run option, as shown below:

26. The running results are as shown below

27. Reopen a command line window and view the result output file

[Command 007]:

hadoop fs -cat /root/experiment/output/part-r-00000

28. You can see that the data has been read from the wordcount table in HBase and written to the output file on HDFS.
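
The same output file can also be read from Java with the Hadoop FileSystem API. The sketch below is only an illustration under the assumptions of this experiment (NameNode at hdfs://master:9000 and the output path used above); the class name CatOutputFile is hypothetical.

package experiment;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Hypothetical helper class (not part of the experiment): prints the MapReduce output file to the console.
public class CatOutputFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://master:9000"), conf);
        try (FSDataInputStream in = fs.open(new Path("/root/experiment/output/part-r-00000"))) {
            // Copy the file contents to standard output without closing System.out
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
        fs.close();
    }
}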