overseer provides three main features:
1. Lossless closing of connections, 2. Smooth restart of connections, 3. Automatic restart on file changes.
Let's go through them in turn:
1. Lossless closing of the connection
Go's standard net/http accept loop does not close connections losslessly on its own: when the main listening goroutine exits, it does not wait for the goroutines handling the individual connections to finish.
Here is the relevant code from the Go standard library:
Go/src/net/http/server.go
```go
func (srv *Server) Serve(l net.Listener) error {
	if fn := testHookServerServe; fn != nil {
		fn(srv, l) // call hook with unwrapped listener
	}

	origListener := l
	l = &onceCloseListener{Listener: l}
	defer l.Close()

	if err := srv.setupHTTP2_Serve(); err != nil {
		return err
	}

	if !srv.trackListener(&l, true) {
		return ErrServerClosed
	}
	defer srv.trackListener(&l, false)

	baseCtx := context.Background()
	if srv.BaseContext != nil {
		baseCtx = srv.BaseContext(origListener)
		if baseCtx == nil {
			panic("BaseContext returned a nil context")
		}
	}

	var tempDelay time.Duration // how long to sleep on accept failure

	ctx := context.WithValue(baseCtx, ServerContextKey, srv)
	for {
		rw, err := l.Accept()
		if err != nil {
			if srv.shuttingDown() {
				return ErrServerClosed
			}
			if ne, ok := err.(net.Error); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay)
				time.Sleep(tempDelay)
				continue
			}
			return err
		}
		connCtx := ctx
		if cc := srv.ConnContext; cc != nil {
			connCtx = cc(connCtx, rw)
			if connCtx == nil {
				panic("ConnContext returned nil")
			}
		}
		tempDelay = 0
		c := srv.newConn(rw)
		c.setState(c.rwc, StateNew, runHooks) // before Serve can return
		go c.serve(connCtx)
	}
}
```
When the listening socket is closed, l.Accept() returns an error and Serve leaves the loop, but it does not wait for the goroutines started with go c.serve(connCtx) to finish their work.
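The behaviour can be observed with a small experiment. This is not overseer code, just a sketch that assumes loopback networking is available: it starts http.Serve, parks one request inside a slow handler, closes the listener, and checks whether Serve returned while that handler was still running.

```go
package main

import (
	"net"
	"net/http"
	"time"
)

// serveReturnsEarly reports whether http.Serve returned while a request
// handler was still in flight after the listener was closed.
func serveReturnsEarly() (bool, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return false, err
	}
	started := make(chan struct{}, 1)
	release := make(chan struct{})
	defer close(release) // always unblock the handler before returning
	finished := make(chan struct{}, 1)

	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case started <- struct{}{}:
		default:
		}
		<-release // simulate a slow request
		select {
		case finished <- struct{}{}:
		default:
		}
	})

	serveDone := make(chan struct{})
	go func() {
		http.Serve(ln, handler) // returns as soon as Accept fails
		close(serveDone)
	}()
	go http.Get("http://" + ln.Addr().String() + "/")

	select {
	case <-started: // one request is now in flight
	case <-time.After(5 * time.Second):
		ln.Close()
		return false, nil
	}
	ln.Close() // stop accepting, as a naive shutdown would

	select {
	case <-serveDone:
	case <-time.After(5 * time.Second):
		return false, nil
	}
	// The handler is still blocked on release, so finished must be empty.
	return len(finished) == 0, nil
}
```

If the function returns true, Serve has already exited while the handler is still blocked, which is exactly the gap overseer's wrappers fill.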
overseer's approach is to wrap both Go's listening socket and each connection socket, and to use a sync.WaitGroup so that the main goroutine can wait for the worker goroutines to finish.
The overseer code is as follows:
overseer-v1.1.6/graceful.go
```go
func (l *overseerListener) Accept() (net.Conn, error) {
	conn, err := l.Listener.(*net.TCPListener).AcceptTCP()
	if err != nil {
		return nil, err
	}
	conn.SetKeepAlive(true)                  // see http.tcpKeepAliveListener
	conn.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener
	uconn := overseerConn{
		Conn:   conn,
		wg:     &l.wg,
		closed: make(chan bool),
	}
	go func() {
		//connection watcher
		select {
		case <-l.closeByForce:
			uconn.Close()
		case <-uconn.closed:
			//closed manually
		}
	}()
	l.wg.Add(1)
	return uconn, nil
}

//non-blocking trigger close
func (l *overseerListener) release(timeout time.Duration) {
	//stop accepting connections - release fd
	l.closeError = l.Listener.Close()
	//start timer, close by force if deadline not met
	waited := make(chan bool)
	go func() {
		l.wg.Wait()
		waited <- true
	}()
	go func() {
		select {
		case <-time.After(timeout):
			close(l.closeByForce)
		case <-waited:
			//no need to force close
		}
	}()
}

//blocking wait for close
func (l *overseerListener) Close() error {
	l.wg.Wait()
	return l.closeError
}

func (o overseerConn) Close() error {
	err := o.Conn.Close()
	if err == nil {
		o.wg.Done()
		o.closed <- true
	}
	return err
}
```
In (l *overseerListener) Accept, l.wg.Add(1) is called for every new connection; in (o overseerConn) Close, o.wg.Done() is called each time a connection is closed.
Both the asynchronous shutdown path, (l *overseerListener) release, and the synchronous one, (l *overseerListener) Close, call l.wg.Wait() to wait for the connection goroutines to finish. release does the waiting in a background goroutine and force-closes the remaining connections once the timeout expires.
The shutdown sequence for a listening socket is:
1. The worker process receives the restart signal, or the master process receives it and forwards it to the worker.
2. The worker's signal handler calls (l *overseerListener) release.
3. release closes the listening socket and starts l.wg.Wait() in a background goroutine, force-closing any remaining connections if the timeout (TerminateTimeout) expires first.
4. In (srv *Server) Serve in the standard library's net/http/server.go, l.Accept() returns an error, the accept loop exits, and the deferred l.Close() runs, which is (l *overseerListener) Close.
5. (l *overseerListener) Close calls l.wg.Wait() synchronously and blocks until every connection has been handled. Because Serve cannot return before this Close returns, the worker process stays alive until its in-flight requests are done.
6. As each connection finishes, (o overseerConn) Close() is called, which in turn calls o.wg.Done().
7. Right after release has been called on every listener, the worker sends SIGUSR1 to the master (unless restarts are disabled). Because release does not block, this happens without waiting for the connections in steps 4 to 6 to finish; this is what allows the master to start a new worker while the old one is still draining.
8. When the master receives SIGUSR1, it sends true on the mp.descriptorsReleased channel.
9. In the master's (mp *master) fork, receiving from mp.descriptorsReleased ends the current fork call, and the master moves on to the next fork.
2. Smooth restart of connections
A smooth restart is one that does not disconnect clients and is invisible to them; for example, connections already queued on the listening socket are not dropped. To achieve this, the master process passes the same listening socket from the old worker to the new one, instead of having each new worker create its own listener.
The listening socket is created by the master process:
overseer-v1.1.6/proc_master.go
```go
func (mp *master) retreiveFileDescriptors() error {
	mp.slaveExtraFiles = make([]*os.File, len(mp.Config.Addresses))
	for i, addr := range mp.Config.Addresses {
		a, err := net.ResolveTCPAddr("tcp", addr)
		if err != nil {
			return fmt.Errorf("Invalid address %s (%s)", addr, err)
		}
		l, err := net.ListenTCP("tcp", a)
		if err != nil {
			return err
		}
		f, err := l.File()
		if err != nil {
			return fmt.Errorf("Failed to retreive fd for: %s (%s)", addr, err)
		}
		if err := l.Close(); err != nil {
			return fmt.Errorf("Failed to close listener for: %s (%s)", addr, err)
		}
		mp.slaveExtraFiles[i] = f
	}
	return nil
}
```
For each address in mp.Config.Addresses, it opens a listener and stores the listener's file handle in mp.slaveExtraFiles.
(l *TCPListener) Close is called along the way, but this has no effect on the worker. l.File() returns a duplicate of the listener's descriptor, so Close only releases the master's original descriptor; the socket itself stays open through the duplicate kept in mp.slaveExtraFiles. The only consequence is that the master no longer uses the listening socket itself.
For reference, here is the difference between close and shutdown on a network socket:
close: closes this process's descriptor for the socket, but the connection stays open. Other processes holding a descriptor for the same socket can still use the connection and read from or write to it.
shutdown: shuts the connection down regardless of how many descriptors refer to it. A read may then return EOF, and a write may raise SIGPIPE, possibly only once the socket buffer has filled. shutdown also takes a mode argument: 0 (SHUT_RD) disables reading, 1 (SHUT_WR) disables writing, and 2 (SHUT_RDWR) disables both.
mp.slaveExtraFiles is then passed to the child (worker) process:
overseer-v1.1.6/proc_master.go
```go
func (mp *master) fork() error {
	mp.debugf("starting %s", mp.binPath)
	cmd := exec.Command(mp.binPath)
	//mark this new process as the "active" slave process.
	//This process is assumed to be holding the socket files.
	mp.slaveCmd = cmd
	mp.slaveID++
	//provide the slave process with some state
	e := os.Environ()
	e = append(e, envBinID+"="+hex.EncodeToString(mp.binHash))
	e = append(e, envBinPath+"="+mp.binPath)
	e = append(e, envSlaveID+"="+strconv.Itoa(mp.slaveID))
	e = append(e, envIsSlave+"=1")
	e = append(e, envNumFDs+"="+strconv.Itoa(len(mp.slaveExtraFiles)))
	cmd.Env = e
	//inherit master args/stdfiles
	cmd.Args = os.Args
	cmd.Stdin = os.Stdin
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	//include socket files
	cmd.ExtraFiles = mp.slaveExtraFiles
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("Failed to start slave process: %s", err)
	}
	//was scheduled to restart, notify success
	if mp.restarting {
		mp.restartedAt = time.Now()
		mp.restarting = false
		mp.restarted <- true
	}
	//convert wait into channel
	cmdwait := make(chan error)
	go func() {
		cmdwait <- cmd.Wait()
	}()
	//wait....
	select {
	case err := <-cmdwait:
		//program exited before releasing descriptors
		//proxy exit code out to master
		code := 0
		if err != nil {
			code = 1
			if exiterr, ok := err.(*exec.ExitError); ok {
				if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
					code = status.ExitStatus()
				}
			}
		}
		mp.debugf("prog exited with %d", code)
		//if a restarts are disabled or if it was an
		//unexpected crash, proxy this exit straight
		//through to the main process
		if mp.NoRestart || !mp.restarting {
			os.Exit(code)
		}
	case <-mp.descriptorsReleased:
		//if descriptors are released, the program
		//has yielded control of its sockets and
		//a parallel instance of the program can be
		//started safely. it should serve state.Listeners
		//to ensure downtime is kept at <1sec. The previous
		//cmd.Wait() will still be consumed though the
		//result will be discarded.
	}
	return nil
}
```
The statement cmd.ExtraFiles = mp.slaveExtraFiles hands the sockets to the child. os/exec passes these files to the new process when it starts it (fork/exec underneath), and entry i of ExtraFiles becomes file descriptor 3+i in the child, right after stdin, stdout and stderr.
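The 3+i numbering is easy to confirm with a pipe instead of a socket. The sketch below assumes a Unix-like system with sh and cat; readViaFD3 is a hypothetical helper. It puts the read end of a pipe in ExtraFiles[0] and has the child read from descriptor 3.

```go
package main

import (
	"bytes"
	"os"
	"os/exec"
)

// readViaFD3 sends msg to a child through ExtraFiles[0] and returns what the
// child read from file descriptor 3.
func readViaFD3(msg string) (string, error) {
	r, w, err := os.Pipe()
	if err != nil {
		return "", err
	}
	var out bytes.Buffer
	cmd := exec.Command("sh", "-c", "cat <&3")
	cmd.ExtraFiles = []*os.File{r} // becomes fd 3 in the child
	cmd.Stdout = &out
	if err := cmd.Start(); err != nil {
		r.Close()
		w.Close()
		return "", err
	}
	r.Close() // the child holds its own copy now
	if _, err := w.Write([]byte(msg)); err != nil {
		w.Close()
		cmd.Wait()
		return "", err
	}
	w.Close() // gives the child's cat an EOF
	if err := cmd.Wait(); err != nil {
		return "", err
	}
	return out.String(), nil
}
```

The write end is not listed in ExtraFiles, so the child does not inherit it, which is why closing it in the parent delivers EOF.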
The child (worker) process handles the inherited sockets:
overseer-v1.1.6/proc_slave.go
```go
func (sp *slave) run() error {
	sp.id = os.Getenv(envSlaveID)
	sp.debugf("run")
	sp.state.Enabled = true
	sp.state.ID = os.Getenv(envBinID)
	sp.state.StartedAt = time.Now()
	sp.state.Address = sp.Config.Address
	sp.state.Addresses = sp.Config.Addresses
	sp.state.GracefulShutdown = make(chan bool, 1)
	sp.state.BinPath = os.Getenv(envBinPath)
	if err := sp.watchParent(); err != nil {
		return err
	}
	if err := sp.initFileDescriptors(); err != nil {
		return err
	}
	sp.watchSignal()
	//run program with state
	sp.debugf("start program")
	sp.Config.Program(sp.state)
	return nil
}

func (sp *slave) initFileDescriptors() error {
	//inspect file descriptors
	numFDs, err := strconv.Atoi(os.Getenv(envNumFDs))
	if err != nil {
		return fmt.Errorf("invalid %s integer", envNumFDs)
	}
	sp.listeners = make([]*overseerListener, numFDs)
	sp.state.Listeners = make([]net.Listener, numFDs)
	for i := 0; i < numFDs; i++ {
		f := os.NewFile(uintptr(3+i), "")
		l, err := net.FileListener(f)
		if err != nil {
			return fmt.Errorf("failed to inherit file descriptor: %d", i)
		}
		u := newOverseerListener(l)
		sp.listeners[i] = u
		sp.state.Listeners[i] = u
	}
	if len(sp.state.Listeners) > 0 {
		sp.state.Listener = sp.state.Listeners[0]
	}
	return nil
}
```
The child does not create new listeners; it only wraps the inherited sockets with u := newOverseerListener(l). These wrapped listeners are finally handed to sp.Config.Program(sp.state), the user's own program:
overseer-v1.1.6/example/main.go
```go
// convert your 'main()' into a 'prog(state)'
// 'prog()' is run in a child process
func prog(state overseer.State) {
	fmt.Printf("app#%s (%s) listening...\n", BuildID, state.ID)
	http.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		d, _ := time.ParseDuration(r.URL.Query().Get("d"))
		time.Sleep(d)
		fmt.Fprintf(w, "app#%s (%s) %v says hello\n", BuildID, state.ID, state.StartedAt)
	}))
	http.Serve(state.Listener, nil)
	fmt.Printf("app#%s (%s) exiting...\n", BuildID, state.ID)
}

// then create another 'main' which runs the upgrades
// 'main()' is run in the initial process
func main() {
	overseer.Run(overseer.Config{
		Program:          prog,
		Address:          ":5001",
		Fetcher:          &fetcher.File{Path: "my_app_next"},
		Debug:            true, //display log of overseer actions
		TerminateTimeout: 10 * time.Minute,
	})
}
```
When the user program calls http.Serve(state.Listener, nil):
1. Connections are accepted through the wrapped (l *overseerListener) Accept().
2. The deferred l.Close() inside Serve also goes through the wrapped (l *overseerListener) Close().
3. The connections returned by (l *overseerListener) Accept() are wrapped as overseerConn, so closing them calls (o overseerConn) Close().
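These three points hold for any net.Listener wrapper, which can be checked with a small experiment. countingListener below is a hypothetical wrapper, not overseer's type; it counts calls to Accept and Close. The server is stopped by closing the inner listener directly, so the one Close counted must come from http.Serve's own deferred Close. It assumes Go 1.19+ for atomic.Int32 and loopback networking.

```go
package main

import (
	"io"
	"net"
	"net/http"
	"sync/atomic"
)

// countingListener counts successful Accepts and calls to Close.
type countingListener struct {
	net.Listener
	accepted atomic.Int32
	closed   atomic.Int32
}

func (l *countingListener) Accept() (net.Conn, error) {
	c, err := l.Listener.Accept()
	if err == nil {
		l.accepted.Add(1)
	}
	return c, err
}

func (l *countingListener) Close() error {
	l.closed.Add(1)
	return l.Listener.Close()
}

// serveOnce serves one request through the wrapper and then stops the server
// by closing the inner listener, bypassing the wrapper's Close.
func serveOnce() (accepted, closed int32, err error) {
	inner, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, 0, err
	}
	cl := &countingListener{Listener: inner}
	done := make(chan struct{})
	go func() {
		http.Serve(cl, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			io.WriteString(w, "hello")
		}))
		close(done)
	}()
	resp, err := http.Get("http://" + inner.Addr().String() + "/")
	if err != nil {
		inner.Close()
		return 0, 0, err
	}
	io.Copy(io.Discard, resp.Body)
	resp.Body.Close()
	inner.Close() // Accept now fails and Serve returns
	<-done
	return cl.accepted.Load(), cl.closed.Load(), nil
}
```

Expect one accepted connection and one Close call, the latter issued by Serve itself on the way out.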
3. Automatic restart on file changes
overseer can watch the program binary for changes and automatically trigger the restart process when one is detected.
When the master process starts, it checks its configuration; if mp.Config.Fetcher is set, it runs fetchLoop:
overseer-v1.1.6/proc_master.go
```go
// fetchLoop is run in a goroutine
func (mp *master) fetchLoop() {
	min := mp.Config.MinFetchInterval
	time.Sleep(min)
	for {
		t0 := time.Now()
		mp.fetch()
		//duration fetch of fetch
		diff := time.Now().Sub(t0)
		if diff < min {
			delay := min - diff
			//ensures at least MinFetchInterval delay.
			//should be throttled by the fetcher!
			time.Sleep(delay)
		}
	}
}
```
The default value of mp.Config.MinFetchInterval is 1 second, so the master fetches at most once per second. Because the field is a time.Duration, a finer interval can be configured.
Bundled fetchers include fetcher_file.go, fetcher_github.go, fetcher_http.go and fetcher_s3.go.
Take fetcher_file.go as an example.
1. Detecting file changes:
overseer-v1.1.6/proc_master.go
```go
//tee off to sha1
hash := sha1.New()
reader = io.TeeReader(reader, hash)
//write to a temp file
_, err = io.Copy(tmpBin, reader)
if err != nil {
	mp.warnf("failed to write temp binary: %s", err)
	return
}
//compare hash
newHash := hash.Sum(nil)
if bytes.Equal(mp.binHash, newHash) {
	mp.debugf("hash match - skip")
	return
}
```
Changes are detected with SHA-1: the hash of the newly fetched binary is compared with the hash of the current one, and file timestamps are ignored.
2. Verifying that the new file is an executable that runs overseer:
overseer-v1.1.6/proc_master.go
```go
tokenIn := token()
cmd := exec.Command(tmpBinPath)
cmd.Env = append(os.Environ(), []string{envBinCheck + "=" + tokenIn}...)
cmd.Args = os.Args
returned := false
go func() {
	time.Sleep(5 * time.Second)
	if !returned {
		mp.warnf("sanity check against fetched executable timed-out, check overseer is running")
		if cmd.Process != nil {
			cmd.Process.Kill()
		}
	}
}()
tokenOut, err := cmd.CombinedOutput()
returned = true
if err != nil {
	mp.warnf("failed to run temp binary: %s (%s) output \"%s\"", err, tmpBinPath, tokenOut)
	return
}
if tokenIn != string(tokenOut) {
	mp.warnf("sanity check failed")
	return
}
```
This relies on a check that overseer builds into every program that uses it:
overseer-v1.1.6/overseer.go
```go
//sanityCheck returns true if a check was performed
func sanityCheck() bool {
	//sanity check
	if token := os.Getenv(envBinCheck); token != "" {
		fmt.Fprint(os.Stdout, token)
		return true
	}
	//legacy sanity check using old env var
	if token := os.Getenv(envBinCheckLegacy); token != "" {
		fmt.Fprint(os.Stdout, token)
		return true
	}
	return false
}
```
sanityCheck is called from overseer.Run when main starts. The master runs the candidate binary with envBinCheck set to a fresh token; a binary built with overseer prints that token unchanged to stdout, and the check passes only if the output matches the token exactly.
3. Overwriting the old binary and triggering a restart:
overseer-v1.1.6/proc_master.go
```go
//overwrite!
if err := overwrite(mp.binPath, tmpBinPath); err != nil {
	mp.warnf("failed to overwrite binary: %s", err)
	return
}
mp.debugf("upgraded binary (%x -> %x)", mp.binHash[:12], newHash[:12])
mp.binHash = newHash
//binary successfully replaced
if !mp.Config.NoRestartAfterFetch {
	mp.triggerRestart()
}
```
The restart process starts in (mp *master) triggerRestart:
overseer-v1.1.6/proc_master.go
```go
func (mp *master) triggerRestart() {
	if mp.restarting {
		mp.debugf("already graceful restarting")
		return //skip
	} else if mp.slaveCmd == nil || mp.restarting {
		mp.debugf("no slave process")
		return //skip
	}
	mp.debugf("graceful restart triggered")
	mp.restarting = true
	mp.awaitingUSR1 = true
	mp.signalledAt = time.Now()
	mp.sendSignal(mp.Config.RestartSignal) //ask nicely to terminate
	select {
	case <-mp.restarted:
		//success
		mp.debugf("restart success")
	case <-time.After(mp.TerminateTimeout):
		//times up mr. process, we did ask nicely!
		mp.debugf("graceful timeout, forcing exit")
		mp.sendSignal(os.Kill)
	}
}
```
triggerRestart sends mp.Config.RestartSignal to the child. On receiving it, the child releases its listening sockets and sends SIGUSR1 to the parent:
overseer-v1.1.6/proc_slave.go
```go
if len(sp.listeners) > 0 {
	//perform graceful shutdown
	for _, l := range sp.listeners {
		l.release(sp.Config.TerminateTimeout)
	}
	//signal release of held sockets, allows master to start
	//a new process before this child has actually exited.
	//early restarts not supported with restarts disabled.
	if !sp.NoRestart {
		sp.masterProc.Signal(SIGUSR1)
	}
	//listeners should be waiting on connections to close...
}
```
When the parent receives SIGUSR1, it sends true on the mp.descriptorsReleased channel to report that the worker has released its listening sockets:
overseer-v1.1.6/proc_master.go
```go
//**during a restart** a SIGUSR1 signals
//to the master process that, the file
//descriptors have been released
if mp.awaitingUSR1 && s == SIGUSR1 {
	mp.debugf("signaled, sockets ready")
	mp.awaitingUSR1 = false
	mp.descriptorsReleased <- true
} else
// ...
```
Finally we are back in (mp *master) fork, which has been blocked waiting for either a value on mp.descriptorsReleased or the child's exit via cmd.Wait. Once the channel delivers, fork returns and the master enters the next round of the fork loop, starting the new worker while the old one finishes its in-flight connections.
overseer-v1.1.6/proc_master.go
```go
func (mp *master) fork() error {
	// ...
	//convert wait into channel
	cmdwait := make(chan error)
	go func() {
		cmdwait <- cmd.Wait()
	}()
	//wait....
	select {
	case err := <-cmdwait:
		//program exited before releasing descriptors
		//proxy exit code out to master
		code := 0
		if err != nil {
			code = 1
			if exiterr, ok := err.(*exec.ExitError); ok {
				if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
					code = status.ExitStatus()
				}
			}
		}
		mp.debugf("prog exited with %d", code)
		//if a restarts are disabled or if it was an
		//unexpected crash, proxy this exit straight
		//through to the main process
		if mp.NoRestart || !mp.restarting {
			os.Exit(code)
		}
	case <-mp.descriptorsReleased:
		//if descriptors are released, the program
		//has yielded control of its sockets and
		//a parallel instance of the program can be
		//started safely. it should serve state.Listeners
		//to ensure downtime is kept at <1sec. The previous
		//cmd.Wait() will still be consumed though the
		//result will be discarded.
	}
	return nil
}
```