diff --git a/makefile b/makefile index 8d46e29..1eca05c 100644 --- a/makefile +++ b/makefile @@ -1,5 +1,5 @@ debug: - go run -tags "debug" ./... + go run -tags "debug" -race ./... all: clean build diff --git a/runtime.go b/runtime.go index f9659ca..b5db011 100644 --- a/runtime.go +++ b/runtime.go @@ -51,6 +51,7 @@ func run(ctx context.Context, debug bool) int { if err := docker.StartWatcher(ctx, config.DaemonConfiguration.PullInterval); err != nil { slog.Error("unable to start the docker watcher", "thread", "main", "err", err) + return failure } el, err := runtime.NewEventLoop(ctx, docker) @@ -61,6 +62,7 @@ func run(ctx context.Context, debug bool) int { defer el.Close() go func() { + interruptNum := 0 c := make(chan os.Signal, 10) signal.Notify(c) defer close(c) @@ -69,16 +71,21 @@ func run(ctx context.Context, debug bool) int { switch sig { case os.Interrupt: { + interruptNum++ slog.Info("the process received an interruption signal", "thread", "signal_watcher", "signal", sig.String()) el.Close() } case os.Kill: { + interruptNum++ slog.Info("the process received a kill signal", "thread", "signal_watcher", "signal", sig.String()) el.Close() } } - + if interruptNum == 3 { + slog.Info("received 3 times a stop signal, forcing the program to shut down", "thread", "signal_watch") + os.Exit(failure) + } } }() diff --git a/runtime/runtime.go b/runtime/runtime.go index fe0d254..18012a1 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -21,18 +21,22 @@ type ( executors map[string]cron.EntryID ctx context.Context - eventMu sync.Mutex + cancelFunc context.CancelFunc + eventMu sync.RWMutex eventBuffer []helper.ContainerEvent eventTrigger chan struct{} } ) func NewEventLoop(ctx context.Context, docker *helper.DockerHelper) (*EventLoop, error) { + loopCtx, cancelFunc := context.WithCancel(contextutil.WithThreadName(context.Background(), "event_loop")) loop := &EventLoop{ - docker: docker, - cr: cron.New(), - ctx: contextutil.WithThreadName(context.Background(), "event_loop"), - executors: make(map[string]cron.EntryID), + docker: docker, + cr: cron.New(), + ctx: loopCtx, + cancelFunc: cancelFunc, + executors: make(map[string]cron.EntryID), + eventTrigger: make(chan struct{}), } if err := loop.firstRun(ctx); err != nil { @@ -46,6 +50,7 @@ func NewEventLoop(ctx context.Context, docker *helper.DockerHelper) (*EventLoop, } func (el *EventLoop) Close() error { + el.cancelFunc() ctx := el.cr.Stop() <-ctx.Done() return nil @@ -146,15 +151,33 @@ func (el *EventLoop) unregister(container helper.Container) { func (el *EventLoop) startEventLoop() { for { - if len(el.eventBuffer) == 0 { - <-el.eventTrigger + el.eventMu.RLock() + eventBufferLen := len(el.eventBuffer) + el.eventMu.RUnlock() + + if eventBufferLen == 0 { + select { + case <-el.eventTrigger: + { + slog.Debug("event loop waken up", "thread", contextutil.ThreadName(el.ctx)) + } + case <-el.ctx.Done(): + { + return + } + } } + var ev helper.ContainerEvent el.eventMu.Lock() - ev := el.eventBuffer[0] - el.eventBuffer = append([]helper.ContainerEvent{}, el.eventBuffer[1:]...) + if len(el.eventBuffer) == 0 { + el.eventMu.Unlock() + continue + } + ev, el.eventBuffer = el.eventBuffer[0], el.eventBuffer[1:] el.eventMu.Unlock() + slog.Debug("processing docker event", "thread", contextutil.ThreadName(el.ctx)) el.process(ev) } }