Erlang/OTP: Building an Application – A Process Pool

  • Foreword
  • Process pool
    • Process pool supervision tree
  • Implementing the supervisors
  • Process pool server
  • Implementing the workers
  • Testing

Foreword

This hands-on article uses the OTP behaviours to build a real application: the generic server (gen_server), finite-state machine (gen_fsm), event manager (gen_event), and supervisor (supervisor).

Process pool

The idea behind a process pool is to manage and limit, in a generic way, the resources a running system uses.
A pool caps the number of worker processes that can run concurrently. When the number of running workers reaches that limit, the pool can also put new tasks in a queue; queued tasks run as soon as a worker finishes and frees a slot. Alternatively, the pool can simply refuse the task and tell the caller that it cannot run right now.
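
The bookkeeping described above, a count of free slots plus a FIFO queue, can be sketched without any processes at all. The module pool_counter and its functions are hypothetical, written only to illustrate the idea; the real pool server later in this article keeps the same two pieces of state (limit and queue) in its #state{} record.

```erlang
-module(pool_counter).
-export([new/1, request/2, release/1]).

%% Hypothetical sketch: a pool is the pair {FreeSlots, Queue}.
new(Limit) when is_integer(Limit), Limit >= 0 ->
    {Limit, queue:new()}.

%% Ask to run Task: take a free slot if there is one,
%% otherwise append the task to the FIFO queue.
request(Task, {Free, Q}) when Free > 0 ->
    {run, Task, {Free - 1, Q}};
request(Task, {0, Q}) ->
    {queued, {0, queue:in(Task, Q)}}.

%% A running task finished: hand its slot straight to the oldest
%% queued task, or give the slot back if nothing is waiting.
release({Free, Q}) ->
    case queue:out(Q) of
        {{value, Task}, Q2} -> {run, Task, {Free, Q2}};
        {empty, _}          -> {idle, {Free + 1, Q}}
    end.
```

With a limit of 1, a second request is queued, and the first release starts it directly instead of returning the slot to the pool.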

Why use a process pool? Here are a few common reasons:

  1. Limit concurrent connections: A pool can cap the number of connections a server handles at the same time. Controlling the number of workers in the pool keeps the system from over-committing its resources and avoids the performance degradation caused by too many concurrent connections.
  2. Limit the number of open files: A pool can also cap how many files an application has open at once. If every file operation is handed to a pool worker, the number of open files can never exceed the pool's size, which protects the system from resource exhaustion.
  3. Prioritize resources: Giving different subsystems pools of different sizes effectively assigns each of them a priority within the release. For example, you may want to give more workers to the processes that handle customer requests and fewer to the processes that generate management reports.
  4. Stability under load spikes: By queuing tasks and running each one as soon as a worker becomes free, a pool keeps the application stable under sudden bursts of load. When there are not enough resources to handle every task at once, the pool queues the excess and runs it in arrival order as workers free up.

Process pool supervision tree

How should the pools be organized? There are two schools of thought. Bottom-up design writes the individual components first and then assembles them as required. Top-down design assumes all the components already exist, completes the high-level design, and only then builds the components. Neither approach is inherently better; the choice depends on the situation and on personal style. To keep things easy to follow, this example takes the top-down approach.

The pool application must be started as a whole, and it can contain several pools, each with several workers. To support this, each pool needs a server that keeps track of the worker count and the task queue. But who supervises the workers?

One option is a single supervisor for all the pools, whose children are the pool servers and the worker supervisors. Each pool server knows about its worker supervisor and asks it to start new workers. Because workers are added dynamically, the worker supervisor should use the simple_one_for_one strategy.

However, with a single supervisor, if one pool's server or worker supervisor restarts too many times in a short period, the supervisor gives up and terminates, taking every other pool down with it. To avoid this, add one more layer of supervision: a top-level supervisor (ppool_supersup) that starts one supervisor per pool (ppool_sup), each of which supervises that pool's server and worker supervisor.

This design keeps the pools independent of one another, supervises and restarts the workers, confines excessive restarts to the pool that caused them, and provides good error isolation.

Implementing the supervisors

The top-level supervisor, ppool_supersup, exports a few functions: start_link/0 starts the whole application; stop/0 stops it; start_pool/3 creates a specific pool; and stop_pool/1 removes one. init/1 is the only callback the supervisor behaviour requires.

-module(ppool_supersup).
-behavior(supervisor).
-export([start_link/0, stop/0, start_pool/3, stop_pool/1]).
-export([init/1]).

%% Registers the top-level supervisor locally as ppool.
start_link() ->
    supervisor:start_link({local, ppool}, ?MODULE, []).

init([]) ->
    MaxRestart = 6,
    MaxTime = 3600,
    %% No static children: pools are added at runtime by start_pool/3.
    {ok, {{one_for_one, MaxRestart, MaxTime}, []}}.

%% Adds one pool supervisor (ppool_sup) as a child named Name.
start_pool(Name, Limit, MFA) ->
    ChildSpec = {Name,
                   {ppool_sup, start_link, [Name, Limit, MFA]},
                   permanent, 10500, supervisor, [ppool_sup]},
    supervisor:start_child(ppool, ChildSpec).

stop_pool(Name) ->
    supervisor:terminate_child(ppool, Name),
    supervisor:delete_child(ppool, Name).

%% Technically, a supervisor cannot be stopped in an easy way,
%% so we resort to brute force!
stop() ->
    case whereis(ppool) of
        P when is_pid(P) ->
            exit(P, kill);
        _ -> ok
    end.
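
Since OTP 18, supervisors also accept supervisor flags and child specifications written as maps, which name every field instead of relying on tuple positions. As a sketch, the hypothetical helper module ppool_specs builds the same child spec as start_pool/3 and the same restart intensity as init/1 in map form; it is not part of the application, and the tuple form above works as-is.

```erlang
-module(ppool_specs).
-export([pool_spec/3, supersup_flags/0]).

%% Hypothetical helper: the child spec from start_pool/3, as a map.
pool_spec(Name, Limit, MFA) ->
    #{id       => Name,
      start    => {ppool_sup, start_link, [Name, Limit, MFA]},
      restart  => permanent,
      shutdown => 10500,
      type     => supervisor,
      modules  => [ppool_sup]}.

%% Hypothetical helper: the flags from ppool_supersup:init/1, as a map.
supersup_flags() ->
    #{strategy => one_for_one, intensity => 6, period => 3600}.
```

Such a map could be passed to supervisor:start_child(ppool, ...) exactly like the tuple, and init/1 could return {ok, {ppool_specs:supersup_flags(), []}}.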

Each pool has its own supervisor, ppool_sup. At startup it only starts the pool server; the server then adds the worker supervisor to ppool_sup dynamically.

-module(ppool_sup).
-export([start_link/3, init/1]).
-behavior(supervisor).

start_link(Name, Limit, MFA) ->
  supervisor:start_link(?MODULE, {Name, Limit, MFA}).

init({Name, Limit, MFA}) ->
  MaxRestart = 1,
  MaxTime = 3600,
  %% one_for_all: if the server or the worker supervisor dies, restart both.
  %% self() is this supervisor's pid; the server uses it to add the
  %% worker supervisor as a sibling.
  {ok, {{one_for_all, MaxRestart, MaxTime},
    [{serv,
      {ppool_serv, start_link, [Name, Limit, self(), MFA]},
      permanent,
      5000, % shutdown time
      worker,
      [ppool_serv]}]}}.

Finally, write the last supervisor of the application, ppool_worker_sup, which supervises all of a pool's worker processes.

-module(ppool_worker_sup).
-export([start_link/1, init/1]).
-behavior(supervisor).

start_link(MFA = {_, _, _}) ->
  supervisor:start_link(?MODULE, MFA).

init({M, F, A}) ->
  MaxRestart = 5,
  MaxTime = 3600,
  %% simple_one_for_one: a single child template; workers are added at
  %% runtime with supervisor:start_child(Sup, ExtraArgs).
  %% temporary: a worker that exits, normally or not, is never restarted.
  {ok, {{simple_one_for_one, MaxRestart, MaxTime},
    [{ppool_worker,
      {M, F, A},
      temporary, 5000, worker, [M]}]}}.
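
Note that the A in a simple_one_for_one child spec is only a prefix: for each supervisor:start_child(Sup, ExtraArgs) call, the supervisor starts the child with apply(M, F, A ++ ExtraArgs). That is why the pool is later created with {ppool_nagger, start_link, []} and every task supplies all four arguments. The hypothetical module sofo_args only mimics that concatenation, to make the rule concrete:

```erlang
-module(sofo_args).
-export([child_call/2]).

%% Hypothetical helper: returns the {M, F, Args} call a
%% simple_one_for_one supervisor would make for start_child(Sup, ExtraArgs).
child_call({M, F, A}, ExtraArgs) when is_list(A), is_list(ExtraArgs) ->
    {M, F, A ++ ExtraArgs}.
```

For example, sofo_args:child_call({ppool_nagger, start_link, []}, ["finish the chapter!", 10000, 10, Shell]) returns {ppool_nagger, start_link, ["finish the chapter!", 10000, 10, Shell]}, which corresponds to ppool_nagger:start_link/4.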

Process pool server

The pool server must support the following operations:

  • Run a task in the pool right away; if the pool is full, reply that the task cannot run (noalloc).
  • Run a task as soon as there is room in the pool; until then, queue the task and keep the caller waiting.
  • Run a task asynchronously: start it immediately if possible; otherwise, queue it to run later without making the caller wait.

-module(ppool_serv).
-behavior(gen_server).
-export([start/4, start_link/4, run/2, sync_queue/2, async_queue/2, stop/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  code_change/3, terminate/2]).

%% limit: free worker slots; sup: pid of the worker supervisor;
%% refs: monitor references of running workers; queue: waiting tasks.
-record(state, {limit = 0,
  sup,
  refs,
  queue = queue:new()}).

%% Child spec of the worker supervisor, added to ppool_sup at runtime.
-define(SPEC(MFA),
  {worker_sup,
    {ppool_worker_sup, start_link, [MFA]},
    permanent,
    10000,
    supervisor,
    [ppool_worker_sup]}).

init({Limit, MFA, Sup}) ->
  %% We want the pid of the worker supervisor here, but starting it means
  %% calling ppool_sup, which is still waiting for init/1 to return.
  %% So we send ourselves a message and do it right after init/1.
  self() ! {start_worker_supervisor, Sup, MFA},
  {ok, #state{limit = Limit, refs = gb_sets:empty()}}.

handle_info({'DOWN', Ref, process, _Pid, _}, S = #state{refs = Refs}) ->
  io:format("received down msg~n"),
  case gb_sets:is_element(Ref, Refs) of
    true ->
      handle_down_worker(Ref, S);
    false -> %% not one of our workers: ignore it
      {noreply, S}
  end;
handle_info({start_worker_supervisor, Sup, MFA}, S = #state{}) ->
  {ok, Pid} = supervisor:start_child(Sup, ?SPEC(MFA)),
  {noreply, S#state{sup = Pid}};
handle_info(Msg, State) ->
  io:format("Unknown msg: ~p~n", [Msg]),
  {noreply, State}.

handle_call({run, Args}, _From, S = #state{limit = N, sup = Sup, refs = R}) when N > 0 ->
  {ok, Pid} = supervisor:start_child(Sup, Args),
  Ref = erlang:monitor(process, Pid),
  {reply, {ok, Pid}, S#state{limit = N - 1, refs = gb_sets:add(Ref, R)}};
handle_call({run, _Args}, _From, S = #state{limit = N}) when N =< 0 ->
  {reply, noalloc, S};
handle_call({sync, Args}, _From, S = #state{limit = N, sup = Sup, refs = R}) when N > 0 ->
  {ok, Pid} = supervisor:start_child(Sup, Args),
  Ref = erlang:monitor(process, Pid),
  {reply, {ok, Pid}, S#state{limit = N - 1, refs = gb_sets:add(Ref, R)}};
handle_call({sync, Args}, From, S = #state{queue = Q}) ->
  %% Pool full: queue the caller; it gets its reply in handle_down_worker/2.
  {noreply, S#state{queue = queue:in({From, Args}, Q)}};
handle_call(stop, _From, State) ->
  {stop, normal, ok, State};
handle_call(_Msg, _From, State) ->
  {noreply, State}.

handle_cast({async, Args}, S = #state{limit = N, sup = Sup, refs = R}) when N > 0 ->
  {ok, Pid} = supervisor:start_child(Sup, Args),
  Ref = erlang:monitor(process, Pid),
  {noreply, S#state{limit = N - 1, refs = gb_sets:add(Ref, R)}};
handle_cast({async, Args}, S = #state{limit = N, queue = Q}) when N =< 0 ->
  {noreply, S#state{queue = queue:in(Args, Q)}};
%% Ignore any other cast.
handle_cast(_Msg, State) ->
  {noreply, State}.

%% A worker finished: give its slot to the oldest queued task, if any;
%% otherwise return the slot to the pool.
handle_down_worker(Ref, S = #state{limit = L, sup = Sup, refs = Refs}) ->
  case queue:out(S#state.queue) of
    {{value, {From, Args}}, Q} ->
      %% queued by sync_queue/2: start it and unblock the waiting caller
      {ok, Pid} = supervisor:start_child(Sup, Args),
      NewRef = erlang:monitor(process, Pid),
      NewRefs = gb_sets:insert(NewRef, gb_sets:delete(Ref, Refs)),
      gen_server:reply(From, {ok, Pid}),
      {noreply, S#state{refs = NewRefs, queue = Q}};
    {{value, Args}, Q} ->
      %% queued by async_queue/2: nobody is waiting for a reply
      {ok, Pid} = supervisor:start_child(Sup, Args),
      NewRef = erlang:monitor(process, Pid),
      NewRefs = gb_sets:insert(NewRef, gb_sets:delete(Ref, Refs)),
      {noreply, S#state{refs = NewRefs, queue = Q}};
    {empty, _} ->
      {noreply, S#state{limit = L + 1, refs = gb_sets:delete(Ref, Refs)}}
  end.

code_change(_OldVsn, State, _Extra) ->
  {ok, State}.

terminate(_Reason, _State) ->
  ok.

%% The server is registered under the pool's Name.
start(Name, Limit, Sup, MFA) when is_atom(Name), is_integer(Limit) ->
  gen_server:start({local, Name}, ?MODULE, {Limit, MFA, Sup}, []).

start_link(Name, Limit, Sup, MFA) when is_atom(Name), is_integer(Limit) ->
  gen_server:start_link({local, Name}, ?MODULE, {Limit, MFA, Sup}, []).

run(Name, Args) ->
  gen_server:call(Name, {run, Args}).

%% May block until a slot frees up, hence the infinite timeout.
sync_queue(Name, Args) ->
  gen_server:call(Name, {sync, Args}, infinity).

async_queue(Name, Args) ->
  gen_server:cast(Name, {async, Args}).

stop(Name) ->
  gen_server:call(Name, stop).
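
The server's queue holds two kinds of entries: {From, Args} tuples from sync_queue/2 and bare argument lists from async_queue/2. handle_down_worker/2 can tell them apart by pattern matching only because the arguments for a simple_one_for_one child are always a list, never a tuple. The hypothetical module ppool_queue_demo drains such a queue in FIFO order and tags each entry the same way, as a small process-free illustration:

```erlang
-module(ppool_queue_demo).
-export([classify/1]).

%% Hypothetical sketch: empties a pool-server-style queue in FIFO order,
%% distinguishing entries the way handle_down_worker/2 does.
classify(Q) ->
    case queue:out(Q) of
        {{value, {_From, Args}}, Q2} -> [{sync, Args} | classify(Q2)];
        {{value, Args}, Q2}          -> [{async, Args} | classify(Q2)];
        {empty, _}                   -> []
    end.
```

Entries come out in the order they were queued, whichever API queued them.
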

The ppool module wraps all of this in a single API for users of the pool:

-module(ppool).
-export([start_link/0, stop/0, start_pool/3,
  run/2, sync_queue/2, async_queue/2, stop_pool/1]).

start_link() ->
  ppool_supersup:start_link().

stop() ->
  ppool_supersup:stop().

start_pool(Name, Limit, {M, F, A}) ->
  ppool_supersup:start_pool(Name, Limit, {M, F, A}).

stop_pool(Name) ->
  ppool_supersup:stop_pool(Name).

run(Name, Args) ->
  ppool_serv:run(Name, Args).

async_queue(Name, Args) ->
  ppool_serv:async_queue(Name, Args).

sync_queue(Name, Args) ->
  ppool_serv:sync_queue(Name, Args).

Implementing the workers

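Each task runs in its own worker. The example worker, ppool_nagger, is a gen_server that sends Task to SendTo every Delay milliseconds, up to Max times (or forever if Max is infinity), and then stops.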

-module(ppool_nagger).
-behavior(gen_server).
-export([start_link/4, stop/1]).
-export([init/1, handle_call/3, handle_cast/2,
  handle_info/2, code_change/3, terminate/2]).

start_link(Task, Delay, Max, SendTo) ->
  gen_server:start_link(?MODULE, {Task, Delay, Max, SendTo}, []).

stop(Pid) ->
  gen_server:call(Pid, stop).

%%% OTP callbacks
init({Task, Delay, Max, SendTo}) ->
  %% The third element is a gen_server timeout: if no message arrives
  %% within Delay ms, handle_info(timeout, State) is called.
  {ok, {Task, Delay, Max, SendTo}, Delay}.

handle_call(stop, _From, State) ->
  {stop, normal, ok, State};
handle_call(_Msg, _From, State) ->
  {noreply, State}.

handle_cast(_Msg, State) ->
  {noreply, State}.

%% Send the task, then either rearm the timeout or stop after Max sends.
handle_info(timeout, {Task, Delay, Max, SendTo}) ->
  SendTo ! {self(), Task},
  if Max =:= infinity ->
      {noreply, {Task, Delay, Max, SendTo}, Delay};
    Max =< 1 ->
      {stop, normal, {Task, Delay, 0, SendTo}};
    Max > 1 ->
      {noreply, {Task, Delay, Max - 1, SendTo}, Delay}
  end.
%% Do not add a catch-all handle_info clause like the one below:
%% returning {noreply, State} without a timeout cancels the Delay timer,
%% turning the process into a zombie that never sends again.
%% Crashing is the better option here.
%% handle_info(_Msg, State) ->
%%   {noreply, State}.

code_change(_OldVsn, State, _Extra) ->
  {ok, State}.

terminate(_Reason, _State) -> ok.
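
The countdown in handle_info/2 means a nagger started with an integer Max sends exactly Max messages before stopping normally. With Max = 1, as in the queue tests, each worker sends a single message and exits, which triggers the 'DOWN' message in the pool server and frees its slot. The hypothetical module nagger_countdown reproduces that decision as a pure function, so the count can be checked without timers:

```erlang
-module(nagger_countdown).
-export([next/1, total_sends/1]).

%% Hypothetical sketch of the decision ppool_nagger:handle_info/2
%% makes right after sending a message.
next(infinity) -> {continue, infinity};
next(Max) when Max =< 1 -> stop;
next(Max) -> {continue, Max - 1}.

%% Number of messages a nagger started with a finite Max sends.
total_sends(Max) when is_integer(Max), Max >= 1 ->
    total_sends(Max, 1).

%% Sent counts the message just sent on this timeout.
total_sends(Max, Sent) ->
    case next(Max) of
        stop -> Sent;
        {continue, Next} -> total_sends(Next, Sent + 1)
    end.
```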

Testing

10> ppool:start_link().
{ok,<0.93.0>}
11> ppool:start_pool(nagger, 2, {ppool_nagger, start_link, []}).
{ok,<0.95.0>}
12> ppool:run(nagger, ["finish the chapter!", 10000, 10, self()]).
{ok,<0.99.0>}
13> ppool:run(nagger, ["Watch a good movie", 10000, 10, self()]).
{ok,<0.101.0>}
14> flush().
Shell got {<0.99.0>,"finish the chapter!"}
Shell got {<0.99.0>,"finish the chapter!"}
Shell got {<0.101.0>,"Watch a good movie"}
Shell got {<0.99.0>,"finish the chapter!"}
Shell got {<0.101.0>,"Watch a good movie"}
ok
15> ppool:run(nagger, ["clean up a bit", 10000, 10, self()]).
noalloc
16> flush().
Shell got {<0.99.0>,"finish the chapter!"}
Shell got {<0.101.0>,"Watch a good movie"}
Shell got {<0.99.0>,"finish the chapter!"}
Shell got {<0.101.0>,"Watch a good movie"}
Shell got {<0.99.0>,"finish the chapter!"}
Shell got {<0.101.0>,"Watch a good movie"}
Shell got {<0.99.0>,"finish the chapter!"}
Shell got {<0.101.0>,"Watch a good movie"}
ok

For tasks started without queuing (run/2), everything works as expected: the pool starts, tasks are added, and messages reach the right destination. When we try to run more tasks than the pool allows, the request is refused with noalloc.
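
In the shell, flush() prints whatever is in the mailbox; for a scripted check, a small receive loop is more convenient. The module nag_collect is a hypothetical helper, not part of ppool: it waits for up to N messages of the {WorkerPid, Task} shape that ppool_nagger sends and gives up after Timeout milliseconds of silence.

```erlang
-module(nag_collect).
-export([messages/2]).

%% Hypothetical helper: receives up to N {Pid, Task} messages in
%% arrival order, stopping early after Timeout ms without one.
%% Messages of any other shape are left in the mailbox.
messages(0, _Timeout) ->
    [];
messages(N, Timeout) when N > 0 ->
    receive
        {Pid, Task} when is_pid(Pid) ->
            [{Pid, Task} | messages(N - 1, Timeout)]
    after Timeout ->
        []
    end.
```

After compiling it in the shell with c(nag_collect), calling nag_collect:messages(5, 15000) would return the next five reminders as a list instead of printing them.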

Now let's try the task queue with asynchronous calls (async_queue/2):

17> ppool:async_queue(nagger, ["Pay the bills", 30000, 1, self()]).
ok
18> ppool:async_queue(nagger, ["Take a shower", 30000, 1, self()]).
ok
19> ppool:async_queue(nagger, ["Plant a tree", 30000, 1, self()]).
ok
(wait a while)
20> received down msg
20> received down msg
20> received down msg
20> flush().
Shell got {<0.99.0>,"finish the chapter!"}
Shell got {<0.101.0>,"Watch a good movie"}
Shell got {<0.99.0>,"finish the chapter!"}
...
Shell got {<0.101.0>,"Watch a good movie"}
Shell got {<0.106.0>,"Pay the bills"}

Next, test the synchronous queue (sync_queue/2):

22> ppool:sync_queue(nagger, ["Pet a dog", 20000, 1, self()]).
{ok,<0.114.0>}
23> ppool:sync_queue(nagger, ["Make some noise", 20000, 1, self()]).
{ok,<0.116.0>}
24> received down msg
24> ppool:sync_queue(nagger, ["Chase a tornado", 20000, 1, self()]).
{ok,<0.118.0>}
25> received down msg
25> received down msg
25> received down msg
25> flush().
Shell got {<0.114.0>,"Pet a dog"}
Shell got {<0.116.0>,"Make some noise"}
Shell got {<0.118.0>,"Chase a tornado"}
ok

Everything works as expected.

Finally, shut everything down:

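27> ppool:stop_pool(nagger).
ok
28> ppool:stop().
** exception exit: killed

The exception is expected: ppool:start_link/0 was called from the shell, so the shell process is linked to the top-level supervisor, and when stop/0 kills that supervisor, the exit signal killed propagates to the shell.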