Several indicator statistics computed with Spark RDD and Spark SQL in Scala, packaged, uploaded to the Spark cluster, and run in YARN mode

Requirements:

Use Spark RDD and Spark SQL to complete the following data analysis tasks, compare the performance of the two approaches using the web UI monitoring, and give a reasoned explanation of the results.
1. Count the number of users, the gender distribution, and the occupation distribution
2. Compute the age distribution of users (divided into 7 age segments)
3. Compute the occupation distribution (number of users per occupation)
4. Compute the highest rating, lowest rating, average rating, median rating, average number of ratings per user, and average number of ratings per movie
5. Compute the rating distribution
6. Count the number of ratings submitted by each user
7. Compute the distribution of movies across genres
8. Count the number of movies released in each year
9. For each movie, count the number of user ratings, the total score, and the average score
10. For each user, count the number of ratings given, the total score given, and the average score given
11. Find the 10 most-rated movies and their rating counts (movie title, number of ratings); a Spark sketch for this requirement follows the list
12. Find the 10 highest-rated movies for men and for women (gender, movie title, average rating)
13. Find the 10 movies watched most by men and by women (gender, movie title)
14. Find the 10 movies watched most by men in the "18-24" age group
15. For movieid = 2116, compute the average rating given by each of the 7 age groups (age group, average rating)
16. For the woman with the most ratings (the most avid movie watcher), find her 10 highest-rated movies and their overall average ratings (viewer, movie title, average rating)
17. Find the 10 best movies from the year with the most well-rated movies (rating >= 4.0)
18. Find the 10 highest-rated comedy movies released in 1997
19. Find the 5 highest-rated movies of each genre in the review database (genre, movie title, average rating)
20. Find the highest-rated movie genre in each year (year, genre, average rating)
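
As an illustration of the kind of job each requirement maps to, here is a minimal RDD sketch for requirement 11 (the 10 most-rated movies). It assumes MovieLens-style files ratings.dat (UserID::MovieID::Rating::Timestamp) and movies.dat (MovieID::Title::Genres) under hdfs://yang-HA/movie/; the movies.dat path and the object name are assumptions for illustration, not taken from the original job.

import org.apache.spark.sql.SparkSession

// Illustrative sketch only: requirement 11 (the 10 most-rated movies and their rating counts).
object Requirement11Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("requirement-11-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Count ratings per movie id.
    val ratingCounts = sc.textFile("hdfs://yang-HA/movie/ratings.dat")
      .map(_.split("::")(1))
      .map((_, 1L))
      .reduceByKey(_ + _)

    // Map movie id -> title (movies.dat path is an assumption).
    val titles = sc.textFile("hdfs://yang-HA/movie/movies.dat")
      .map(_.split("::"))
      .map(arr => (arr(0), arr(1)))

    // Join, keep (title, count), and take the 10 largest counts.
    ratingCounts.join(titles)
      .map { case (_, (cnt, title)) => (title, cnt) }
      .sortBy(_._2, ascending = false)
      .take(10)
      .foreach(println)

    spark.stop()
  }
}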

Build the Maven project (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.dataAnalysis</groupId>
    <artifactId>SparkRddAndSparkSQL</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.0.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <finalName>MovieDataAnalysisBySparkRDD</finalName>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>Run</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>



</project>

hive-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- MySQL connection information -->
    <!-- URL of the JDBC connection -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hadoop102:3306/metastore?useSSL=false</value>
    </property>

    <!-- Driver of the JDBC connection -->
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
    </property>

    <!-- Username of the JDBC connection -->
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>

    <!-- Password of the JDBC connection -->
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
    </property>

    <!-- Default Hive warehouse directory on HDFS -->
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>

    <!-- Port for hiveserver2 connections -->
    <property>
        <name>hive.server2.thrift.port</name>
        <value>10000</value>
    </property>

    <!-- Host for hiveserver2 connections -->
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>hadoop102</value>
    </property>

    <!-- Metadata storage authorization -->
    <property>
        <name>hive.metastore.event.db.notification.api.auth</name>
        <value>false</value>
    </property>

    <!-- Verification of the Hive metastore schema version -->
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>

    <!-- hiveserver2 high availability parameter; enabling it can improve hiveserver2 startup speed -->
    <property>
        <name>hive.server2.active.passive.ha.enable</name>
        <value>true</value>
    </property>

    <!-- Configure hiveserver2 high availability -->
    <property>
        <name>hive.server2.support.dynamic.service.discovery</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.server2.zookeeper.namespace</name>
        <value>hiveserver2_zk</value>
    </property>
    <property>
        <name>hive.zookeeper.quorum</name>
        <value>hadoop102:2181,hadoop103:2181,hadoop104:2181</value>
    </property>
    <property>
        <name>hive.zookeeper.client.port</name>
        <value>2181</value>
    </property>

    <!-- Configure metastore high availability: the metastore addresses to connect to -->
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hadoop102:9083,thrift://hadoop104:9083</value>
    </property>

    <!-- Spark dependency location (note: the HDFS address must match the namenode/nameservice address) -->
    <property>
        <name>spark.yarn.jars</name>
        <value>hdfs://yang-HA/spark-jars/*</value>
    </property>

    <!-- Hive execution engine -->
    <property>
        <name>hive.execution.engine</name>
        <value>spark</value>
    </property>

    <!-- Hive-to-Spark connection timeout -->
    <property>
        <name>hive.spark.client.connect.timeout</name>
        <value>10000ms</value>
    </property>

</configuration>
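
Before submitting the full job, it can help to verify that Spark actually reaches the HA metastore configured above. Below is a minimal sketch (the object name is assumed), assuming this hive-site.xml is on Spark's classpath, e.g. copied to $SPARK_HOME/conf, so that enableHiveSupport() picks up hive.metastore.uris.

import org.apache.spark.sql.SparkSession

// Minimal connectivity check (object name assumed, not part of the original job).
object MetastoreConnectivityCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("metastore-connectivity-check")
      .enableHiveSupport()
      .getOrCreate()

    // Should list spark_data_analysis_quota among the databases if the metastore is reachable.
    spark.sql("show databases").show(false)

    spark.stop()
  }
}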

run.scala

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/**
 * @description:
 * @author: Yu Wenzhi
 * @date 2022/5/18 10:22
 * @version 1.0
 */
object MovieDataAnalysisBySparkRDD {

  //Use Spark RDD to count the number of users, the gender distribution, and the occupation distribution.
  def demand_1_by_sparkRdd(spark: SparkSession): Unit = {

    val sc = spark.sparkContext

    //Count the number of users, gender, and occupation respectively. Record format: UserID::Gender::Age::Occupation::Zip-code
    //local[*]: default mode. Spark sets the number of threads to the number of CPU cores; e.g. with 8 cores it uses 8 threads for the computation.
    val users: RDD[String] = sc.textFile("hdfs://yang-HA/movie/users.dat")

    println("--------View the lineage dependencies of the users RDD-----------")
    println(users.toDebugString)
    //users will be reused, so cache the data
    //users.cache()

    //Use the persist method to choose the storage level; cache() is shorthand for rdd.persist(StorageLevel.MEMORY_ONLY)
    users.persist(StorageLevel.MEMORY_AND_DISK_2)

    //Set the checkpoint directory. If something goes wrong after the checkpoint, the data does not have to be recomputed from scratch, which reduces overhead.
    //checkpoint() immediately starts a new job dedicated to the checkpoint operation,
    //so it is recommended to cache() the RDD being checkpointed; the checkpoint job can then
    //read the data from the cache instead of recomputing the RDD from scratch.
    sc.setCheckpointDir("hdfs://yang-HA/spark_checkpoint")

    val quotas: ListBuffer[MovieQuota] = ListBuffer()

    sc.setJobGroup("008", "Use rdd to calculate the total number of users")
    val cnt: Long = users.filter(!_.isEmpty).count() //count() is an action operator and generates a job => job_00
    sc.setJobGroup("008", "Use rdd to calculate the total number of users")
    println("Total number of users: " + cnt)
    quotas.append(MovieQuota("001", "Total number of users", cnt.toString))


    println("---------Use the built-in accumulator to calculate the total number of users---------")
    sc.setJobGroup("007", "Use the built-in accumulator to calculate the total number of users")
    val sum: LongAccumulator = sc.longAccumulator("sum")
    users.foreach(l =>
      sum.add(1)
    )
    sc.setJobGroup("007", "Use the built-in accumulator to calculate the total number of users")
    println("Total number of users:" + sum.value)

    sc.setJobGroup("2", "Use rdd operators to calculate the number of men and women")
    val gender: RDD[(String, Int)] = users.map(line => {
      val lineArr: Array[String] = line.split("::")
      (lineArr(1), 1)
    }).reduceByKey(_ + _) //reduceByKey caches its shuffle output

    val genderCount: Array[String] = gender.map(x => {
      if (x._1.equals("M")) {
        "Number of men:" + x._2
      } else {
        "Number of women:" + x._2
      }
    }).collect() //collect() is an action operator, generating a job => job_02
    sc.setJobGroup("2", "Use rdd operators to calculate the number of men and women")
    quotas.append(MovieQuota("002", "Male and female statistics", genderCount.mkString(",")))

    sc.setJobGroup("3", "Use a custom accumulator to calculate the number of men and women")
    val accumulator = new MyAccumulator()
    sc.register(accumulator)
    users.map(data => {
      val arr: Array[String] = data.split("::")
      accumulator.add(arr(1))
    }).collect() //job_03
    sc.setJobGroup("3", "Use a custom accumulator to calculate the number of men and women")
    println(accumulator.value)

    println("---------Use a custom accumulator to calculate occupation statistics---------")

    sc.setJobGroup("4", "Use a custom accumulator to calculate occupation statistics")
    val professionAcc = new MyAccumulator()
    sc.register(professionAcc)
    users.map(line => {
      professionAcc.add(line.split("::")(3))
    }).collect() //job_04
    sc.setJobGroup("4", "Use a custom accumulator to calculate occupation statistics")
    println(professionAcc.value)


    sc.setJobGroup("b", "The checkpoint fault-tolerance mechanism starts a job that recomputes the data and stores it in hdfs")
    val professionCount: RDD[(String, Int)] = users.map(line => {
      val arr: Array[String] = line.split("::")
      (arr(3), 1)
    }).reduceByKey(_ + _)
    professionCount.cache() //Cache in memory only
    professionCount.checkpoint() //job_05
    sc.setJobGroup("b", "The checkpoint fault-tolerance mechanism starts a job that recomputes the data and stores it in hdfs")

    sc.setJobGroup("a", "Use rdd operators to calculate occupation statistics")
    val profession: RDD[String] = sc.textFile("hdfs://yang-HA/movie/profession.dat")
    val professionRelation: RDD[(String, String)] = profession.map(line => {
      val arr: Array[String] = line.split(":")
      (arr(0), arr(1))
    })
    val quotas1: Array[MovieQuota] = professionCount.join(professionRelation).map(line => {
      MovieQuota("003", "Occupation statistics", line._2._2.trim + ": " + line._2._1)
    }).collect() //job_06
    sc.setJobGroup("a", "Use rdd operators to calculate occupation statistics")

    quotas.appendAll(quotas1)

    loadDataToHiveLocation(quotas, spark)

  }

  //Use Spark SQL to count the number of users, the gender distribution, and the occupation distribution.
  def demand_1_by_sparkSql(spark: SparkSession): Unit = {

    spark.sparkContext.setJobGroup("sparksql", "sparksql")

    val ds: Dataset[String] = spark.read.textFile("hdfs://yang-HA/movie/users.dat")

    import spark.implicits._

    val userDS: Dataset[user] = ds.map(line => {
      val lineArr: Array[String] = line.split("::")
      user(lineArr(0), lineArr(1), lineArr(2), lineArr(3), lineArr(4))
    })
    val professionDS: Dataset[profession] = spark.read.textFile("hdfs://yang-HA/movie/profession.dat").map(line => {
      val lineArr: Array[String] = line.split(":")
      profession(lineArr(0), lineArr(1).trim)
    })

    userDS.groupBy("professionId").count
      .join(professionDS, List("professionId"), "left")
      .orderBy("professionId")
      .createOrReplaceTempView("tmp1")
    userDS.createOrReplaceTempView("user")

    spark.sql(
      """
        |set hive.exec.dynamic.partition.mode=nonstrict
        |""".stripMargin)

    spark.sql(
      """
        |insert into table spark_data_analysis_quota.movie_quota partition(dt)
        |select '004', 'sparkSql occupation statistics', concat_ws(':', trim(professionName), count), current_date() dt from tmp1
        |""".stripMargin)

  }

  //Statistics of the highest rating, lowest rating, average rating, median rating, average number of ratings per user, and average number of ratings per movie:
  def demand_2_by_sparkRdd(spark: SparkSession): Unit = {
    val sc = spark.sparkContext
    //Statistics of the highest rating, lowest rating, average rating, median rating, average number of ratings per user, and average number of ratings per movie:
    //Average number of ratings per user: total number of ratings / total number of users (users must be deduplicated)
    //Highest rating = the maximum of each movie's ratings; the lowest rating works the same way
    val rating: RDD[String] = sc.textFile("hdfs://yang-HA/movie/ratings.dat")
    val quotas = new ListBuffer[MovieQuota]


    /*var rank = 0
    var beforeVal = -1.0


    rating.map(line => {
      val arr: Array[String] = line.split("::")
      (arr(1), arr(2).toDouble)
    }).groupByKey().map {
      case (k, v) => {
        val sum1: Double = v.sum
        (k, sum1 / v.size)
      }
    }.collect().sortWith((kv1, kv2) => {
      kv1._2 > kv2._2
    }).map(kv => {
      if (kv._2 != beforeVal) {
        beforeVal = kv._2
        rank += 1
      }
      (kv, rank)
    }).filter(_._2 == 1).foreach(println)*/

    import spark.implicits._
    val ratingTuples: RDD[(String, String, String, String)] = rating.map(line => {
      val arr: Array[String] = line.split("::")
      (arr(0), arr(1), arr(2), arr(3))
    })
    ratingTuples.cache()
    //userId, movieID, rating, timestamp
    val ratingCnt: Long = ratingTuples.count()
    val NumberPeoples: Long = ratingTuples.map(_._1).distinct(8).count()
    val movieCnt: Long = ratingTuples.map(_._2).distinct(8).count()

    quotas.append(MovieQuota("005", "Average number of ratings per user, average number of ratings per movie", (ratingCnt / NumberPeoples + "," + ratingCnt / movieCnt)))

    ratingTuples.map(line => (line._2, line._3.toDouble)).groupByKey().map(kv => {
      //Statistics of the highest rating, lowest rating, average rating and median rating per movie
      var median = 0
      if (kv._2.size % 2 == 1) {
        //odd number of ratings
        median = (kv._2.size + 1) / 2
      } else {
        //even number of ratings
        median = kv._2.size / 2
      }
      // println(median)
      val medianVal: Double = kv._2.toList.sortWith((v1, v2) => {
        v1 > v2
      }).apply(median - 1)

      val avgVal: Double = kv._2.sum / kv._2.size

      MovieQuota("006", "Highest rating, lowest rating, average rating, median rating", (kv._1, kv._2.max, kv._2.min, f"$avgVal%.3f", medianVal).toString())
    }).toDS.createOrReplaceTempView("tmp2")

    spark.sql(
      """
        |insert into table spark_data_analysis_quota.movie_quota partition(dt)
        |select *, current_date() dt from tmp2
        |""".stripMargin)

    loadDataToHiveLocation(quotas, spark)

  }

  //Load the computed quotas into the hive table
  def loadDataToHiveLocation(quotas: Seq[MovieQuota], spark: SparkSession): Unit = {

    import spark.implicits._
    val sc: SparkContext = spark.sparkContext

    quotas.toDS.createOrReplaceTempView("quotas")

    spark.sql(
      """
        |msck repair table spark_data_analysis_quota.movie_quota
        |""".stripMargin)

    spark.sql(
      """
        |insert into table spark_data_analysis_quota.movie_quota partition(dt)
        |select *, current_date() dt from quotas
        |""".stripMargin)

    sc.setJobGroup("c", "Save the file to the location of the hive table")
  }



}

object Run {
  def main(args: Array[String]): Unit = {

    //Set the username used to access the HDFS cluster
    System.setProperty("HADOOP_USER_NAME", "atguigu")
    System.setProperty("file.encoding", "UTF-8")

    // 1 Create the context configuration object
    val conf: SparkConf = new SparkConf()
      .setAppName("movie_data_analysis")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //Replace the default serialization mechanism
      .registerKryoClasses(Array(classOf[MovieQuota])) //Register custom classes for Kryo serialization
      .setMaster("yarn")

    // 2 Create the SparkSession object
    val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

    spark.sql(
      """
        |set hive.exec.dynamic.partition.mode=nonstrict
        |""".stripMargin)

    //Use Spark RDD to count the number of users, the gender distribution, and the occupation distribution.
    MovieDataAnalysisBySparkRDD.demand_1_by_sparkRdd(spark)
    //Use Spark SQL to count the number of users, the gender distribution, and the occupation distribution.
    MovieDataAnalysisBySparkRDD.demand_1_by_sparkSql(spark)
    //Use Spark RDD to compute the highest rating, lowest rating, average rating, median rating, average number of ratings per user, and average number of ratings per movie.
    MovieDataAnalysisBySparkRDD.demand_2_by_sparkRdd(spark)

    spark.close()
  }
}

case class MovieQuota(var quota_id: String, var quota_name: String, var quota_value: String) {
  override def toString: String = {
    quota_id + '\t' + quota_name + '\t' + quota_value
  }
}


case class profession(professionId: String, professionName: String)

case class rating(userId: String, movieID: String, rating: String, timestamp: String)

case class user(userId: String, gender: String, ageGrades: String, professionId: String, postalCode: String)

case class movie(movieID: String, title: String, genres: String)


//Count the number of occurrences of each distinct input value
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {

  private val genderCountMap: mutable.Map[String, Long] = mutable.Map[String, Long]()

  override def isZero: Boolean = genderCountMap.isEmpty

  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    new MyAccumulator
  }

  override def reset(): Unit = genderCountMap.clear

  override def add(v: String): Unit = {

    if (v.equals("M")) {
      genderCountMap("male") = genderCountMap.getOrElse("male", 0L) + 1L
    } else if (v.equals("F")) {
      genderCountMap("female") = genderCountMap.getOrElse("female", 0L) + 1L
    } else {
      genderCountMap(v) = genderCountMap.getOrElse(v, 0L) + 1L
    }
  }

  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    other.value.foreach { case (key, value) =>
      genderCountMap(key) = genderCountMap.getOrElse(key, 0L) + value
    }
  }

  override def value: mutable.Map[String, Long] = this.genderCountMap
}
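
For reference, here is a small local smoke test of MyAccumulator. The object name and the local[*] master are assumptions for illustration; the real job registers the accumulator on the YARN-backed SparkContext as shown above.

import org.apache.spark.sql.SparkSession

// Hypothetical stand-alone smoke test, not part of the original job.
object MyAccumulatorSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("my-accumulator-smoke-test")
      .master("local[*]") // local mode only for a quick check
      .getOrCreate()
    val sc = spark.sparkContext

    val acc = new MyAccumulator()
    sc.register(acc, "genderAcc")

    // Feed a few sample gender fields through the accumulator on the executors.
    sc.parallelize(Seq("M", "F", "M", "F", "F")).foreach(acc.add)

    // Expected on the driver: Map(male -> 2, female -> 3)
    println(acc.value)

    spark.stop()
  }
}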

Target table DDL

CREATE TABLE `spark_data_analysis_quota.movie_quota`(
    `quota_id`    string COMMENT 'indicator id',
    `quota_name`  string COMMENT 'indicator name',
    `quota_value` string COMMENT 'indicator value')
COMMENT 'Movie indicator analysis table'
PARTITIONED BY (`dt` string)
CLUSTERED BY (quota_id) INTO 3 BUCKETS
STORED AS ORC
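
After a run, the loaded partition can be inspected with a small sketch like the one below (the object name is assumed; the table and column names come from the DDL above).

import org.apache.spark.sql.SparkSession

// Hypothetical result check, not part of the original job.
object MovieQuotaCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("movie-quota-check")
      .enableHiveSupport()
      .getOrCreate()

    // Show the quotas written into today's partition.
    spark.sql(
      """
        |select quota_id, quota_name, quota_value, dt
        |from spark_data_analysis_quota.movie_quota
        |where dt = current_date()
        |order by quota_id
        |""".stripMargin)
      .show(100, truncate = false)

    spark.stop()
  }
}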

Use mvn install to build the jar package and upload it to the Spark cluster. Start all big data cluster components and execute run_spark_job_byJar.sh:

$SPARK_HOME/bin/spark-submit \
--class Run \
--master yarn \
--deploy-mode cluster \
--queue spark \
--conf spark.executor.extraJavaOptions="-Dfile.encoding=UTF-8" \
--conf spark.driver.extraJavaOptions="-Dfile.encoding=UTF-8" \
MovieDataAnalysisBySparkRDD.jar

View the YARN web UI (ResourceManager) at http://hadoop104:8088/cluster

Click History to jump to the Spark history server at http://hadoop102:4000 (started with sbin/start-history-server.sh on hadoop102).

View the Spark job logs there.

Appendix:

Cluster start and stop script

cat hadoopHA.sh
#!/bin/bash
if [ $# -lt 1 ]
then
    echo "No Args Input..."
    exit ;
fi

start_cluster(){
        echo " =================== Start hadoop cluster ==================="
        echo " --------------- Start hdfs ---------------"
        ssh hadoop102 "/opt/module/hadoopHA/sbin/start-dfs.sh"
        echo " --------------- Start yarn ---------------"
        ssh hadoop103 "/opt/module/hadoopHA/sbin/start-yarn.sh"
        echo " --------------- Start historyserver ---------------"
        ssh hadoop102 "/opt/module/hadoopHA/bin/mapred --daemon start historyserver"
        echo " --------------- Start spark log server ---------------"
        ssh hadoop102 "/opt/module/spark/sbin/start-history-server.sh"
        echo " --------------- Start hive services ---------------"
        ssh hadoop102 "/home/atguigu/bin/hiveservices.sh start"
}


stop_cluster(){
        echo " =================== Shut down the hadoop cluster ==================="
        echo " --------------- Stop hive services ---------------"
        ssh hadoop102 "/home/atguigu/bin/hiveservices.sh stop"
        echo " --------------- Stop historyserver ---------------"
        ssh hadoop102 "/opt/module/hadoopHA/bin/mapred --daemon stop historyserver"
        echo " --------------- Stop yarn ---------------"
        ssh hadoop103 "/opt/module/hadoopHA/sbin/stop-yarn.sh"
        echo " --------------- Stop hdfs ---------------"
        ssh hadoop102 "/opt/module/hadoopHA/sbin/stop-dfs.sh"
        echo " --------------- Stop spark log server ---------------"
        ssh hadoop102 "/opt/module/spark/sbin/stop-history-server.sh"
}

case $1 in
"start")
    echo "-------- Start zookeeper ----------"
    sh /home/atguigu/bin/dataCollectSystem/zk.sh start
    echo "------- Start the big data high availability cluster -------"
    start_cluster
;;
"stop")
    stop_cluster
    echo "---------- Stop zookeeper ------------"
    sh /home/atguigu/bin/dataCollectSystem/zk.sh stop
;;
"restart")
    echo "--------- Restart the cluster ---------"
    stop_cluster
    start_cluster
;;
"status")
    echo "================ hadoopHA cluster status of each node ==========="
    echo "========== hadoop102, nn1 ========="
    n1_port=`ssh hadoop102 "jps | grep -v Jps | grep NameNode"`
    nn1=`hdfs haadmin -getServiceState nn1`
    echo ${n1_port}" "${nn1}

    echo "========== hadoop103, nn2, rm1 ========="
    n2_port=`ssh hadoop103 "jps | grep -v Jps | grep NameNode"`
    nn2=`hdfs haadmin -getServiceState nn2`
    echo ${n2_port}" "${nn2}
    rm1_port=`ssh hadoop103 "jps | grep -v Jps | grep ResourceManager"`
    rm1=`yarn rmadmin -getServiceState rm1`
    echo ${rm1_port}" "${rm1}

    echo "========== hadoop104, rm2 ========="
    rm2_port=`ssh hadoop104 "jps | grep -v Jps | grep ResourceManager"`
    rm2=`yarn rmadmin -getServiceState rm2`
    echo ${rm2_port}" "${rm2}
;;
*)
    echo "Input Args Error..."
;;
esac

cat hiveservices.sh

#!/bin/bash
HIVE_LOG_DIR=$HIVE_HOME/logs
if [ ! -d $HIVE_LOG_DIR ]
then
    mkdir -p $HIVE_LOG_DIR
fi
#Check whether a process is running normally. Parameter 1 is the process name and parameter 2 is the process port.
function check_process()
{
    pid=$(ps -ef 2>/dev/null | grep -v grep | grep -i $1 | awk '{print $2}')
    ppid=$(netstat -nltp 2>/dev/null | grep $2 | awk '{print $7}' | cut -d '/' -f 1)
    echo $pid
    [[ "$pid" =~ "$ppid" ]] && [ "$ppid" ] && return 0 || return 1
}

function hive_start()
{
    metapid=$(check_process HiveMetastore 9083)
    cmd="nohup hive --service metastore >$HIVE_LOG_DIR/metastore.log 2>&1 &"
    cmd=$cmd" sleep 4; hdfs dfsadmin -safemode wait >/dev/null 2>&1"
    [ -z "$metapid" ] && eval $cmd || echo "Metastore service has already started"
    server2pid=$(check_process HiveServer2 10000)
    cmd="nohup hive --service hiveserver2 >$HIVE_LOG_DIR/hiveServer2.log 2>&1 &"
    [ -z "$server2pid" ] && eval $cmd || echo "HiveServer2 service has already started"
}

function hive_stop()
{
    metapid=$(check_process HiveMetastore 9083)
    [ "$metapid" ] && kill $metapid || echo "Metastore service is not started"
    server2pid=$(check_process HiveServer2 10000)
    [ "$server2pid" ] && kill $server2pid || echo "HiveServer2 service is not started"
}

case $1 in
"start")
    hive_start
    ;;
"stop")
    hive_stop
    ;;
"restart")
    hive_stop
    sleep 2
    hive_start
    ;;
"status")
    check_process HiveMetastore 9083 >/dev/null && echo "The Metastore service is running normally" || echo "The Metastore service is not running normally"
    check_process HiveServer2 10000 >/dev/null && echo "The HiveServer2 service is running normally" || echo "The HiveServer2 service is not running normally"
    ;;
*)
    echo Invalid Args!
    echo 'Usage: '$(basename $0)' start|stop|restart|status'
    ;;
esac