Implementation principles of overseer, a graceful-restart library for Go

overseer provides three main features:

1. Lossless connection shutdown

2. Smooth restart

3. Automatic restart on file changes

Let's look at each in turn.

1. Lossless connection shutdown

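The Serve loop in Go's standard net/http package does not drain connections when its listener is closed. Once the main accept goroutine exits, it does not wait for the per-connection goroutines to finish their work. (http.Server.Shutdown, available since Go 1.8, does provide graceful shutdown, but it must be called explicitly. Closing the listener alone is not enough.)

Here is the relevant part of the standard library code: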

Go/src/net/http/server.go

func (srv *Server) Serve(l net.Listener) error {
	if fn := testHookServerServe; fn != nil {
		fn(srv, l) // call hook with unwrapped listener
	}

	origListener := l
	l = &onceCloseListener{Listener: l}
	defer l.Close()

	if err := srv.setupHTTP2_Serve(); err != nil {
		return err
	}

	if !srv.trackListener(&l, true) {
		return ErrServerClosed
	}
	defer srv.trackListener(&l, false)

	baseCtx := context.Background()
	if srv.BaseContext != nil {
		baseCtx = srv.BaseContext(origListener)
		if baseCtx == nil {
			panic("BaseContext returned a nil context")
		}
	}

	var tempDelay time.Duration // how long to sleep on accept failure

	ctx := context.WithValue(baseCtx, ServerContextKey, srv)
	for {
		rw, err := l.Accept()
		if err != nil {
			if srv.shuttingDown() {
				return ErrServerClosed
			}
			if ne, ok := err.(net.Error); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay)
				time.Sleep(tempDelay)
				continue
			}
			return err
		}
		connCtx := ctx
		if cc := srv.ConnContext; cc != nil {
			connCtx = cc(connCtx, rw)
			if connCtx == nil {
				panic("ConnContext returned nil")
			}
		}
		tempDelay = 0
		c := srv.newConn(rw)
		c.setState(c.rwc, StateNew, runHooks) // before Serve can return
		go c.serve(connCtx)
	}
}

When the listening socket is closed, l.Accept() returns an error and Serve leaves the loop and returns. It does not wait for the goroutines started with go c.serve(connCtx) to finish.

overseer handles this by wrapping both the listening socket and the accepted connection sockets. It uses a sync.WaitGroup so that the main goroutine can wait for the worker goroutines to finish.

The overseer code is as follows:

overseer-v1.1.6/graceful.go

func (l *overseerListener) Accept() (net.Conn, error) {
	conn, err := l.Listener.(*net.TCPListener).AcceptTCP()
	if err != nil {
		return nil, err
	}
	conn.SetKeepAlive(true)                  // see http.tcpKeepAliveListener
	conn.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener
	uconn := overseerConn{
		Conn:   conn,
		wg:     &l.wg,
		closed: make(chan bool),
	}
	go func() {
		//connection watcher
		select {
		case <-l.closeByForce:
			uconn.Close()
		case <-uconn.closed:
			//closed manually
		}
	}()
	l.wg.Add(1)
	return uconn, nil
}

//non-blocking trigger close
func (l *overseerListener) release(timeout time.Duration) {
	//stop accepting connections - release fd
	l.closeError = l.Listener.Close()
	//start timer, close by force if deadline not met
	waited := make(chan bool)
	go func() {
		l.wg.Wait()
		waited <- true
	}()
	go func() {
		select {
		case <-time.After(timeout):
			close(l.closeByForce)
		case <-waited:
			//no need to force close
		}
	}()
}

//blocking wait for close
func (l *overseerListener) Close() error {
	l.wg.Wait()
	return l.closeError
}

func (o overseerConn) Close() error {
	err := o.Conn.Close()
	if err == nil {
		o.wg.Done()
		o.closed <- true
	}
	return err
}

In (l *overseerListener) Accept, l.wg.Add(1) is called for every accepted connection. In (o overseerConn) Close, o.wg.Done() is called when that connection is closed. Done runs only if the underlying Close succeeds, so closing a connection twice does not decrement the counter twice.

Both the asynchronous path, (l *overseerListener) release, and the synchronous path, (l *overseerListener) Close, call l.wg.Wait() to wait for the worker goroutines to finish.

The listening socket is shut down as follows:

1. The worker process receives the restart signal. The signal comes either directly or forwarded by the master process.

2. The worker's signal handling calls (l *overseerListener) release.

3. release closes the listening socket and calls l.wg.Wait() asynchronously. If the timeout expires first, release forces the remaining connections closed.

4. In (srv *Server) Serve in net/http/server.go, l.Accept() returns an error. Serve leaves its accept loop and runs defer l.Close(), that is, (l *overseerListener) Close.

5. (l *overseerListener) Close calls l.wg.Wait() synchronously and blocks until the worker connections have finished.

6. As each worker connection finishes, (o overseerConn) Close() is called, which in turn calls o.wg.Done().

7. Meanwhile, right after calling release on its listeners, the worker sends SIGUSR1 to the master process (unless restarts are disabled). It does not wait for the connections to drain first.

8. When the master process receives SIGUSR1, it sends true on the mp.descriptorsReleased channel.

9. In the master's (mp *master) fork, receiving from mp.descriptorsReleased ends the current fork call, and the master proceeds to the next fork.

2. Smooth restart

A smooth restart does not disconnect clients, and clients cannot tell that it happened. For example, connections already queued on the listening socket are not dropped. To achieve this, the master process passes the listening socket from the old worker to the new worker. The new worker does not create a listening socket of its own.

The listening socket is created by the master process:

overseer-v1.1.6/proc_master.go

func (mp *master) retreiveFileDescriptors() error {
	mp.slaveExtraFiles = make([]*os.File, len(mp.Config.Addresses))
	for i, addr := range mp.Config.Addresses {
		a, err := net.ResolveTCPAddr("tcp", addr)
		if err != nil {
			return fmt.Errorf("Invalid address %s (%s)", addr, err)
		}
		l, err := net.ListenTCP("tcp", a)
		if err != nil {
			return err
		}
		f, err := l.File()
		if err != nil {
			return fmt.Errorf("Failed to retreive fd for: %s (%s)", addr, err)
		}
		if err := l.Close(); err != nil {
			return fmt.Errorf("Failed to close listener for: %s (%s)", addr, err)
		}
		mp.slaveExtraFiles[i] = f
	}
	return nil
}

For each address in mp.Config.Addresses, the master creates a listening socket and obtains a duplicate of its file descriptor with l.File(). It stores that *os.File in mp.slaveExtraFiles.

(l *TCPListener) Close is called along the way, but it does not affect the worker processes. l.File() returned a duplicate descriptor, and Close releases only the master's original one, so the socket stays open through the duplicate. The only effect is that the master no longer has a net.Listener of its own to accept on.

For reference, here is a summary of the difference between close and shutdown on a socket:

close: closes this process's descriptor for the socket, but the connection itself stays open. Other descriptors that refer to the same socket, in this process or another, can still read from and write to it.
shutdown: shuts down the connection itself, regardless of how many descriptors refer to it. Reads may then return EOF, and writes may trigger SIGPIPE, possibly only once the socket buffer has filled. shutdown takes a how argument: 0 (SHUT_RD) disables reading, 1 (SHUT_WR) disables writing, and 2 (SHUT_RDWR) disables both.

mp.slaveExtraFiles is then passed to the child (worker) process:

overseer-v1.1.6/proc_master.go

func (mp *master) fork() error {
	mp.debugf("starting %s", mp.binPath)
	cmd := exec.Command(mp.binPath)
	//mark this new process as the "active" slave process.
	//This process is assumed to be holding the socket files.
	mp.slaveCmd = cmd
	mp.slaveID++
	//provide the slave process with some state
	e := os.Environ()
	e = append(e, envBinID+"="+hex.EncodeToString(mp.binHash))
	e = append(e, envBinPath+"="+mp.binPath)
	e = append(e, envSlaveID+"="+strconv.Itoa(mp.slaveID))
	e = append(e, envIsSlave+"=1")
	e = append(e, envNumFDs+"="+strconv.Itoa(len(mp.slaveExtraFiles)))
	cmd.Env = e
	//inherit master args/stdfiles
	cmd.Args = os.Args
	cmd.Stdin = os.Stdin
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	//include socket files
	cmd.ExtraFiles = mp.slaveExtraFiles
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("Failed to start slave process: %s", err)
	}
	//was scheduled to restart, notify success
	if mp.restarting {
		mp.restartedAt = time.Now()
		mp.restarting = false
		mp.restarted <- true
	}
	//convert wait into channel
	cmdwait := make(chan error)
	go func() {
		cmdwait <- cmd.Wait()
	}()
	//wait....
	select {
	case err := <-cmdwait:
		//program exited before releasing descriptors
		//proxy exit code out to master
		code := 0
		if err != nil {
			code = 1
			if exiterr, ok := err.(*exec.ExitError); ok {
				if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
					code = status.ExitStatus()
				}
			}
		}
		mp.debugf("prog exited with %d", code)
		//if a restarts are disabled or if it was an
		//unexpected crash, proxy this exit straight
		//through to the main process
		if mp.NoRestart || !mp.restarting {
			os.Exit(code)
		}
	case <-mp.descriptorsReleased:
		//if descriptors are released, the program
		//has yielded control of its sockets and
		//a parallel instance of the program can be
		//started safely. it should serve state.Listeners
		//to ensure downtime is kept at <1sec. The previous
		//cmd.Wait() will still be consumed though the
		//result will be discarded.
	}
	return nil
}

The statement cmd.ExtraFiles = mp.slaveExtraFiles hands the sockets to the child. When the command starts (fork and exec under the hood), the child inherits these descriptors. Entry i of ExtraFiles becomes file descriptor 3+i, right after stdin, stdout and stderr (0, 1 and 2).

The child (worker) process then picks up the inherited sockets:

overseer-v1.1.6/proc_slave.go

func (sp *slave) run() error {
	sp.id = os.Getenv(envSlaveID)
	sp.debugf("run")
	sp.state.Enabled = true
	sp.state.ID = os.Getenv(envBinID)
	sp.state.StartedAt = time.Now()
	sp.state.Address = sp.Config.Address
	sp.state.Addresses = sp.Config.Addresses
	sp.state.GracefulShutdown = make(chan bool, 1)
	sp.state.BinPath = os.Getenv(envBinPath)
	if err := sp.watchParent(); err != nil {
		return err
	}
	if err := sp.initFileDescriptors(); err != nil {
		return err
	}
	sp.watchSignal()
	//run program with state
	sp.debugf("start program")
	sp.Config.Program(sp.state)
	return nil
}

func (sp *slave) initFileDescriptors() error {
	//inspect file descriptors
	numFDs, err := strconv.Atoi(os.Getenv(envNumFDs))
	if err != nil {
		return fmt.Errorf("invalid %s integer", envNumFDs)
	}
	sp.listeners = make([]*overseerListener, numFDs)
	sp.state.Listeners = make([]net.Listener, numFDs)
	for i := 0; i < numFDs; i++ {
		f := os.NewFile(uintptr(3+i), "")
		l, err := net.FileListener(f)
		if err != nil {
			return fmt.Errorf("failed to inherit file descriptor: %d", i)
		}
		u := newOverseerListener(l)
		sp.listeners[i] = u
		sp.state.Listeners[i] = u
	}
	if len(sp.state.Listeners) > 0 {
		sp.state.Listener = sp.state.Listeners[0]
	}
	return nil
}

The child does not create any new listening sockets. It rebuilds a net.Listener from each inherited descriptor (3+i) with net.FileListener and wraps it with newOverseerListener(l). These listeners are then passed to sp.Config.Program(sp.state), which is the user's program:

overseer-v1.1.6/example/main.go

// convert your 'main()' into a 'prog(state)'
// 'prog()' is run in a child process
func prog(state overseer.State) {
	fmt.Printf("app#%s (%s) listening...\n", BuildID, state.ID)
	http.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		d, _ := time.ParseDuration(r.URL.Query().Get("d"))
		time.Sleep(d)
		fmt.Fprintf(w, "app#%s (%s) %v says hello\n", BuildID, state.ID, state.StartedAt)
	}))
	http.Serve(state.Listener, nil)
	fmt.Printf("app#%s (%s) exiting...\n", BuildID, state.ID)
}

// then create another 'main' which runs the upgrades
// 'main()' is run in the initial process
func main() {
	overseer.Run(overseer.Config{
		Program:          prog,
		Address:          ":5001",
		Fetcher:          &fetcher.File{Path: "my_app_next"},
		Debug:            true, //display log of overseer actions
		TerminateTimeout: 10 * time.Minute,
	})
}

Because the user program calls http.Serve(state.Listener, nil):

1. Accept calls go through the wrapped (l *overseerListener) Accept().

2. The deferred l.Close() in Serve likewise calls the wrapped (l *overseerListener) Close().

3. Connections returned by (l *overseerListener) Accept() are wrapped as overseerConn values, so closing them calls (o overseerConn) Close().

3. Automatic restart on file changes

overseer can watch for a new version of the program binary and start the restart process automatically when the binary changes.

When the master process starts, it checks its configuration. If mp.Config.Fetcher is set, it runs fetchLoop:

overseer-v1.1.6/proc_master.go

// fetchLoop is run in a goroutine
func (mp *master) fetchLoop() {
	min := mp.Config.MinFetchInterval
	time.Sleep(min)
	for {
		t0 := time.Now()
		mp.fetch()
		//duration fetch of fetch
		diff := time.Now().Sub(t0)
		if diff < min {
			delay := min - diff
			//ensures at least MinFetchInterval delay.
			//should be throttled by the fetcher!
			time.Sleep(delay)
		}
	}
}

mp.Config.MinFetchInterval defaults to 1 second, so by default a new fetch starts at most once per second. Because it is a time.Duration, a shorter interval can be configured.

Built-in fetchers include fetcher_file.go, fetcher_github.go, fetcher_http.go and fetcher_s3.go.

Take fetcher_file.go as an example.

1. Detecting a change:

overseer-v1.1.6/proc_master.go

//tee off to sha1
hash := sha1.New()
reader = io.TeeReader(reader, hash)
//write to a temp file
_, err = io.Copy(tmpBin, reader)
if err != nil {
	mp.warnf("failed to write temp binary: %s", err)
	return
}
//compare hash
newHash := hash.Sum(nil)
if bytes.Equal(mp.binHash, newHash) {
	mp.debugf("hash match - skip")
	return
}

Changes are detected by comparing the SHA-1 hashes of the old and new binaries. File timestamps play no part.

2. Verifying that the new file is an executable built with overseer:

overseer-v1.1.6/proc_master.go

tokenIn := token()
cmd := exec.Command(tmpBinPath)
cmd.Env = append(os.Environ(), []string{envBinCheck + "=" + tokenIn}...)
cmd.Args = os.Args
returned := false
go func() {
	time.Sleep(5 * time.Second)
	if !returned {
		mp.warnf("sanity check against fetched executable timed-out, check overseer is running")
		if cmd.Process != nil {
			cmd.Process.Kill()
		}
	}
}()
tokenOut, err := cmd.CombinedOutput()
returned = true
if err != nil {
	mp.warnf("failed to run temp binary: %s (%s) output \"%s\"", err, tmpBinPath, tokenOut)
	return
}
if tokenIn != string(tokenOut) {
	mp.warnf("sanity check failed")
	return
}

This works because overseer builds a check into every program that uses it:

overseer-v1.1.6/overseer.go

//sanityCheck returns true if a check was performed
func sanityCheck() bool {
	//sanity check
	if token := os.Getenv(envBinCheck); token != "" {
		fmt.Fprint(os.Stdout, token)
		return true
	}
	//legacy sanity check using old env var
	if token := os.Getenv(envBinCheckLegacy); token != "" {
		fmt.Fprint(os.Stdout, token)
		return true
	}
	return false
}

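overseer.Run calls sanityCheck when main starts. The master puts a fresh token in the check environment variable. If that variable is set, the program simply prints the token back unchanged, and the master treats an exact match as proof that the fetched file is a working overseer program.

3. Overwriting the old binary and triggering a restart: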

overseer-v1.1.6/proc_master.go

//overwrite!
if err := overwrite(mp.binPath, tmpBinPath); err != nil {
	mp.warnf("failed to overwrite binary: %s", err)
	return
}
mp.debugf("upgraded binary (%x -> %x)", mp.binHash[:12], newHash[:12])
mp.binHash = newHash
//binary successfully replaced
if !mp.Config.NoRestartAfterFetch {
	mp.triggerRestart()
}

The restart itself begins in (mp *master) triggerRestart:

overseer-v1.1.6/proc_master.go

func (mp *master) triggerRestart() {
	if mp.restarting {
		mp.debugf("already graceful restarting")
		return //skip
	} else if mp.slaveCmd == nil || mp.restarting {
		mp.debugf("no slave process")
		return //skip
	}
	mp.debugf("graceful restart triggered")
	mp.restarting = true
	mp.awaitingUSR1 = true
	mp.signalledAt = time.Now()
	mp.sendSignal(mp.Config.RestartSignal) //ask nicely to terminate
	select {
	case <-mp.restarted:
		//success
		mp.debugf("restart success")
	case <-time.After(mp.TerminateTimeout):
		//times up mr. process, we did ask nicely!
		mp.debugf("graceful timeout, forcing exit")
		mp.sendSignal(os.Kill)
	}
}

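triggerRestart sends mp.Config.RestartSignal to the child process. It then waits up to TerminateTimeout for the restart to succeed before sending os.Kill. When the child receives the signal, it calls release on each listener, which closes the listening sockets without waiting for connections to drain. It then sends SIGUSR1 to the master: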

overseer-v1.1.6/proc_slave.go

if len(sp.listeners) > 0 {
	//perform graceful shutdown
	for _, l := range sp.listeners {
		l.release(sp.Config.TerminateTimeout)
	}
	//signal release of held sockets, allows master to start
	//a new process before this child has actually exited.
	//early restarts not supported with restarts disabled.
	if !sp.NoRestart {
		sp.masterProc.Signal(SIGUSR1)
	}
	//listeners should be waiting on connections to close...
}

When the master receives SIGUSR1 during a restart, it sends true on the mp.descriptorsReleased channel to report that the worker has released its listening sockets:

overseer-v1.1.6/proc_master.go

//**during a restart** a SIGUSR1 signals
//to the master process that, the file
//descriptors have been released
if mp.awaitingUSR1 && s == SIGUSR1 {
	mp.debugf("signaled, sockets ready")
	mp.awaitingUSR1 = false
	mp.descriptorsReleased <- true
} else

Finally, we return to (mp *master) fork. It has been blocked waiting for either a value on mp.descriptorsReleased or the child process to exit (cmd.Wait). Once the notification arrives, fork returns and the master moves on to the next fork. It starts the new worker with the same inherited sockets while the old worker finishes draining its connections.

overseer-v1.1.6/proc_master.go

func (mp *master) fork() error {
	//... ...
	//... ...
	//... ...
	//convert wait into channel
	cmdwait := make(chan error)
	go func() {
		cmdwait <- cmd.Wait()
	}()
	//wait....
	select {
	case err := <-cmdwait:
		//program exited before releasing descriptors
		//proxy exit code out to master
		code := 0
		if err != nil {
			code = 1
			if exiterr, ok := err.(*exec.ExitError); ok {
				if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
					code = status.ExitStatus()
				}
			}
		}
		mp.debugf("prog exited with %d", code)
		//if a restarts are disabled or if it was an
		//unexpected crash, proxy this exit straight
		//through to the main process
		if mp.NoRestart || !mp.restarting {
			os.Exit(code)
		}
	case <-mp.descriptorsReleased:
		//if descriptors are released, the program
		//has yielded control of its sockets and
		//a parallel instance of the program can be
		//started safely. it should serve state.Listeners
		//to ensure downtime is kept at <1sec. The previous
		//cmd.Wait() will still be consumed though the
		//result will be discarded.
	}
	return nil
}
