5.Hadoop (Yarn)

5.Hadoop (Yarn)

  • 5.Hadoop (Yarn)
  • Chapter 1 Yarn Resource Scheduler
    • 1.1 Yarn infrastructure
    • 1.2 Yarn working mechanism
    • 1.3 The whole process of homework submission
    • 1.4 Yarn scheduler and scheduling algorithm
      • 1.4.1 First-in-first-out scheduler (FIFO)
      • 1.4.2 Capacity Scheduler
      • 1.4.3 Fair Scheduler
  • Chapter 2 Yarn case practice
    • 2.1 Yarn production environment core parameter configuration case
    • 2.2 Capacity scheduler multi-queue submission case
      • 2.2.1 Requirements
      • 2.2.2 Configuring a multi-queue capacity scheduler
      • 2.2.3 Submitting tasks to the Hive queue

5.Hadoop (Yarn)

Chapter 1 Yarn Resource Scheduler

think:

1) How to manage cluster resources?

2) How to reasonably allocate resources to tasks?

Yarn is a resource scheduling platform that is responsible for providing server computing resources for computing programs. It is equivalent to a distributed operating system platform, while computing programs such as MapReduce are equivalent to applications running on the operating system.

1.1 Yarn infrastructure

YARN is mainly composed of ResourceManager, NodeManager, ApplicationMaster and Container components.

img

1.2 Yarn working mechanism

img

? (1) The MR program is submitted to the node where the client is located.

? (2) YarnRunner applies for an Application from ResourceManager.

? (3) RM returns the resource path of the application to YarnRunner.

? (4) The program submits the resources required for operation to HDFS.

? (5) After the program resources are submitted, apply to run mrAppMaster.

? (6) RM initializes the user’s request into a Task.

? (7) One of the NodeManagers receives the Task task.

? (8) The NodeManager creates a container Container and generates MRAppmaster.

? (9) Container copies resources from HDFS to local.

? (10) MRAppmaster applies to RM for running MapTask resources.

? (11) RM assigns the task of running MapTask to two other NodeManagers, and the other two NodeManagers receive the tasks and create containers respectively.

? (12) MR sends the program startup script to the two NodeManagers that received the task. The two NodeManagers start MapTask respectively, and MapTask sorts the data partitions.

(13) MrAppMaster waits for all MapTasks to finish running, then applies for a container from RM and runs the ReduceTask.

? (14) ReduceTask obtains the data of the corresponding partition from MapTask.

? (15) After the program is completed, MR will apply to RM to cancel itself.

1.3 The whole process of homework submission

img

img

img

Detailed explanation of the entire homework submission process

(1) Homework submission

Step 1: Client calls the job.waitForCompletion method to submit MapReduce jobs to the entire cluster.

Step 2: Client applies for a job ID from RM.

Step 3: RM returns the submission path and job ID of the job resource to the Client.

Step 4: Client submits the jar package, slicing information and configuration files to the specified resource submission path.

Step 5: After the client submits the resources, it applies to RM to run MrAppMaster.

(2) Job initialization

Step 6: After RM receives the Client’s request, it adds the job to the capacity scheduler.

Step 7: An idle NM receives the job.

Step 8: The NM creates Container and generates MRAppmaster.

Step 9: Download the resources submitted by the Client to the local area.

(3) Task allocation

Step 10: MrAppMaster applies to RM for resources to run multiple MapTask tasks.

Step 11: RM assigns the task of running MapTask to two other NodeManagers, and the other two NodeManagers receive the tasks and create containers respectively.

(4) Task running

Step 12: MR sends the program startup script to the two NodeManagers that received the task. The two NodeManagers start MapTask respectively, and MapTask sorts the data partitions.

Step 13: MrAppMaster waits for all MapTasks to finish running, then applies for a container from RM and runs the ReduceTask.

Step 14: ReduceTask obtains the data of the corresponding partition from MapTask.

Step 15: After the program is finished running, MR will apply to RM to cancel itself.

(5) Progress and status updates

Tasks in YARN return their progress and status (including counter) to the application manager. The client requests progress updates from the application manager every second (set through mapreduce.client.progressmonitor.pollinterval) and displays them to the user.

(6) Homework completed

In addition to requesting job progress from the application manager, the client checks whether the job is completed by calling waitForCompletion() every 5 seconds. The time interval can be set through mapreduce.client.completion.pollinterval. After the job is completed, the application manager and Container will clean up the job status. Job information will be stored by the job history server for later user verification.

1.4 Yarn scheduler and scheduling algorithm

Currently, there are three main types of Hadoop job schedulers: FIFO, capacity (Capacity Scheduler) and fairness (Fair Scheduler). The default resource scheduler of Apache Hadoop3.1.3 is Capacity Scheduler.

The default scheduler of the CDH framework is Fair Scheduler.

For detailed settings, see: yarn-default.xml file

The class to use as the resource scheduler.

yarn.resourcemanager.scheduler.class

org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler

1.4.1 First-in-first-out scheduler (FIFO)

FIFO scheduler (First In First Out): Single queue, first come first served according to the order in which jobs are submitted.

img

Advantages: Simple and easy to understand.

Disadvantages: Does not support multiple queues and is rarely used in production environments.

1.4.2 Capacity Scheduler

Capacity Scheduler is a multi-user scheduler developed by Yahoo.

img

img

1.4.3 Fair Scheduler

Fair Schedulere is a multi-user scheduler developed by Facebook.

img

img

img

img

img

img

Chapter 2 Yarn Case Practice

Note: Try to take a Linux snapshot before adjusting the following parameters, otherwise you will need to prepare the cluster again for subsequent cases.

2.1 Yarn production environment core parameter configuration case

1) Requirement: From 1G data, count the number of occurrences of each word. There are 3 servers, each equipped with 4G memory, 4-core CPU, and 4 threads.

2) Demand analysis:

1G / 128m = 8 MapTask; 1 ReduceTask; 1 mrAppMaster

On average, each node runs 10 / 3 units ≈ 3 tasks (4 3 3)

3) Modify the yarn-site.xml configuration parameters as follows:

<!-- Select scheduler, default capacity -->
<property>
<description>The class to use as the resource scheduler.</description>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>

<!-- The number of threads used by ResourceManager to process scheduler requests, the default is 50; if the number of submitted tasks is greater than 50, you can increase this value, but it cannot exceed 3 * 4 threads = 12 threads (the actual number cannot exceed 8 if other applications are removed) -->
<property>
<description>Number of threads to handle scheduler interface.</description>
<name>yarn.resourcemanager.scheduler.client.thread-count</name>
<value>8</value>
</property>


<!--
Whether to treat the virtual core number as the CPU core number. The default is false and the physical CPU core number is used.
-->
<property>
<description>Flag to determine if logical processors(such as
hyperthreads) should be counted as cores. Only applicable on Linux
when yarn.nodemanager.resource.cpu-vcores is set to -1 and
yarn.nodemanager.resource.detect-hardware-capabilities is true.
</description>
<name>yarn.nodemanager.resource.count-logical-processors-as-cores</name>
<value>false</value>
</property>

<!-- Whether to let yarn automatically detect hardware for configuration. The default is false. If the node has many other applications, manual configuration is recommended. If there are no other applications on the node, you can use automatic -->
<property>
<description>Enable auto-detection of node capabilities such as
memory and CPU.
</description>
<name>yarn.nodemanager.resource.detect-hardware-capabilities</name>
<value>false</value>
</property>


<!--
The number of cores converted into Vcores (the multiplier of the number of virtual cores and the number of physical cores, the default is 1.0)
The vcore in hadoop is not a real core. Usually the number of vcores is set to 1 to 5 times the number of logical CPUs.
-->
<property>
<description>Multiplier to determine how to convert phyiscal cores to vcores. This value is used if
yarn.nodemanager.resource.cpu-vcores is set to -1(which implies auto-calculate vcores) and
yarn.nodemanager.resource.detect-hardware-capabilities is set to true. The number of vcores will be calculated as number of CPUs * multiplier.
</description>
<name>yarn.nodemanager.resource.pcores-vcores-multiplier</name>
<value>1.0</value>
</property>

<!-- The amount of memory used by NodeManager, the default is 8G, change it to 4G memory -->
<property>
<description>Amount of physical memory, in MB, that can be allocated
for containers. If set to -1 and
yarn.nodemanager.resource.detect-hardware-capabilities is true, it is
automatically calculated (in case of Windows and Linux).
In other cases, the default is 8192MB.
</description>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>4096</value>
</property>

<!-- The number of CPU cores of nodemanager, if it is not automatically set according to the hardware environment, the default is 8, modified to 4 -->
<property>
<description>Number of vcores that can be allocated
for containers. This is used by the RM scheduler when allocating
resources for containers. This is not used to limit the number of
CPUs used by YARN containers. If it is set to -1 and
yarn.nodemanager.resource.detect-hardware-capabilities is true, it is
automatically determined from the hardware in case of Windows and Linux.
In other cases, number of vcores is 8 by default.</description>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>4</value>
</property>

<!-- Minimum memory of container, default 1G -->
<property>
<description>The minimum allocation for every container request at the RM in MBs. Memory requests lower than this will be set to the value of this property. Additionally, a node manager that is configured to have less memory than this value will be shut down by the resource manager.
</description>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>1024</value>
</property>

<!-- The maximum memory of the container, the default is 8G, modified to 2G -->
<property>
<description>The maximum allocation for every container request at the RM in MBs. Memory requests higher than this will throw an InvalidResourceRequestException.
</description>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>2048</value>
</property>

<!-- Minimum number of CPU cores for the container, default 1 -->
<property>
<description>The minimum allocation for every container request at the RM in terms of virtual CPU cores. Requests lower than this will be set to the value of this property. Additionally, a node manager that is configured to have fewer virtual cores than this value will be shut down by the resource manager.
</description>
<name>yarn.scheduler.minimum-allocation-vcores</name>
<value>1</value>
</property>

<!-- The maximum number of CPU cores in the container, the default is 4, modified to 2 -->
<property>
<description>The maximum allocation for every container request at the RM in terms of virtual CPU cores. Requests higher than this will throw an
InvalidResourceRequestException.</description>
<name>yarn.scheduler.maximum-allocation-vcores</name>
<value>2</value>
</property>

<!-- Virtual memory check, turned on by default, modified to off -->
<property>
<description>Whether virtual memory limits will be enforced for
containers.</description>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

<!-- The ratio of virtual memory to physical memory settings, default 2.1 -->
<property>
<description>Ratio between virtual memory to physical memory when setting memory limits for containers. Container allocations are expressed in terms of physical memory, and virtual memory usage is allowed to exceed this allocation by this ratio.
</description>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>2.1</value>
</property>

img

4) Distribution configuration

Note: If the hardware resources of the cluster are inconsistent, each NodeManager must be configured separately.

5) Restart the cluster

[root@hadoop102 hadoop-3.1.3]$ sbin/stop-yarn.sh

[root@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh

6) Execute the WordCount program

[root@hadoop102 hadoop-3.1.3]$ hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /input /output

7) Observe the Yarn task execution page

http://hadoop103:8088/cluster/apps

2.2 Capacity scheduler multi-queue submission case

1) How to create a queue in a production environment?

(1) The scheduler has only one default queue by default, which cannot meet production requirements.

(2) According to the framework: hive/spark/flink, the tasks of each framework are placed in the designated queue (not used by many enterprises).

(3) According to the business module: login registration, shopping cart, order, business department 1, business department 2.

2) What are the benefits of creating multiple queues?

(1) Because I am worried that employees will accidentally write recursive and infinite loop code and exhaust all resources.

(2) Realize the complete downgrading and use of tasks, and ensure sufficient resources for important task queues during special periods. 11.11 6.18

Business department 1 (important) => Business department 2 (more important) => Order (general) => Shopping cart (general) => Log in and register (minor).

2.2.1 Requirements

? Requirement 1: The default queue accounts for 40% of the total memory, the maximum resource capacity accounts for 60% of the total resources, the hive queue accounts for 60% of the total memory, and the maximum resource capacity accounts for 80% of the total resources.

2.2.2 Configuring a multi-queue capacity scheduler

1) Configure as follows in capacity-scheduler.xml

? (1) Modify the following configuration

<!-- Specify multiple queues and add hive queue -->
<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default,hive</value>
    <description>
      The queues at the this level (root is the root queue).
    </description>
</property>

<!-- Reduce the rated capacity of the default queue resource to 40%, the default is 100% -->
<property>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <value>40</value>
</property>

<!-- Reduce the maximum capacity of the default queue resource to 60%, the default is 100% -->
<property>
    <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
    <value>60</value>
</property>
(2) Add necessary attributes to the new queue
<!-- Specify the resource rated capacity of the hive queue -->
<property>
    <name>yarn.scheduler.capacity.root.hive.capacity</name>
    <value>60</value>
</property>

<!-- The maximum number of queue resources a user can use, 1 means all -->
<property>
    <name>yarn.scheduler.capacity.root.hive.user-limit-factor</name>
    <value>1</value>
</property>

<!-- Specify the maximum resource capacity of the hive queue -->
<property>
    <name>yarn.scheduler.capacity.root.hive.maximum-capacity</name>
    <value>80</value>
</property>

<!-- Start hive queue -->
<property>
    <name>yarn.scheduler.capacity.root.hive.state</name>
    <value>RUNNING</value>
</property>

<!-- Which users have the right to submit jobs to the queue -->
<property>
    <name>yarn.scheduler.capacity.root.hive.acl_submit_applications</name>
    <value>*</value>
</property>

<!-- Which users have the right to operate the queue, administrator rights (view/kill) -->
<property>
    <name>yarn.scheduler.capacity.root.hive.acl_administer_queue</name>
    <value>*</value>
</property>

<!-- Which users have the authority to configure submission task priority -->
<property>
    <name>yarn.scheduler.capacity.root.hive.acl_application_max_priority</name>
    <value>*</value>
</property>

<!-- Task timeout setting: yarn application -appId appId -updateLifetime Timeout
Reference: https://blog.cloudera.com/enforcing-application-lifetime-slas-yarn/ -->

<!-- If the application specifies a timeout, the maximum timeout that can be specified by the application submitted to the queue cannot exceed this value.
-->
<property>
    <name>yarn.scheduler.capacity.root.hive.maximum-application-lifetime</name>
    <value>-1</value>
</property>

<!-- If the application does not specify a timeout, default-application-lifetime is used as the default value -->
<property>
    <name>yarn.scheduler.capacity.root.hive.default-application-lifetime</name>
    <value>-1</value>
</property>

2) Distribute configuration files

3) Restart Yarn or execute yarn rmadmin -refreshQueues to refresh the queue, and you will see two queues.

[root@hadoop102 hadoop-3.1.3]$ yarn rmadmin -refreshQueues

img

2.2.3 Submitting tasks to the Hive queue

1) Hadoop jar method

[root@hadoop102 hadoop-3.1.3]$ hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -D mapreduce.job.queuename=hive /input /output

Note: -D means changing parameter values during runtime

2) How to create a jar package

The default task submissions are submitted to the default queue. If you want to submit tasks to other queues, you need to declare in the Driver:

public class WcDrvier {<!-- -->

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {<!-- -->

        Configuration conf = new Configuration();

        conf.set("mapreduce.job.queuename","hive");

        //1. Get a Job instance
        Job job = Job.getInstance(conf);

        . . . . . .

        //6. Submit Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

In this way, when the task is submitted to the cluster, it will be submitted to the hive queue:

img