package runtime

import (
	"context"
	"docker-updater/contextutil"
	"docker-updater/docker/helper"
	"fmt"
	"log/slog"
	"sync"

	"github.com/robfig/cron/v3"
)

// eventBufferWarnThreshold is the number of queued, not yet processed daemon
// events above which a "slow event processing" warning is logged.
const eventBufferWarnThreshold = 100

type (
	// EventLoop keeps a cron scheduler in sync with the Docker containers.
	// Every enabled container gets a scheduled job that compares its image
	// with the registry and, when the hash differs, pulls the image and
	// restarts the container. Container events reported by the Docker helper
	// are queued and applied to the scheduler by a background goroutine.
	EventLoop struct {
		// mu guards executors and serialises scheduler (un)registrations.
		mu        sync.Mutex
		docker    *helper.DockerHelper
		cr        *cron.Cron
		executors map[string]cron.EntryID // container ID -> cron entry ID

		// ctx is the loop's own lifetime; it is cancelled by Close.
		ctx        context.Context
		cancelFunc context.CancelFunc

		// eventMu guards eventBuffer, the FIFO of events not yet processed.
		eventMu     sync.Mutex
		eventBuffer []helper.ContainerEvent
		// eventTrigger has capacity 1 and is only written with a non-blocking
		// send: at most one wake-up is pending and producers never block.
		eventTrigger chan struct{}
		// loopDone is closed when the event-processing goroutine returns.
		loopDone chan struct{}
	}
)

// NewEventLoop registers every enabled running container in a new scheduler,
// subscribes to container events and starts the goroutine that processes them.
// The scheduler itself is not started: call Execute for that.
//
// ctx only bounds the initial container listing; the loop has its own
// lifetime, which ends with Close.
func NewEventLoop(ctx context.Context, docker *helper.DockerHelper) (*EventLoop, error) {
	loopCtx, cancelFunc := context.WithCancel(contextutil.WithThreadName(context.Background(), "event_loop"))
	loop := &EventLoop{
		docker:       docker,
		cr:           cron.New(),
		ctx:          loopCtx,
		cancelFunc:   cancelFunc,
		executors:    make(map[string]cron.EntryID),
		eventTrigger: make(chan struct{}, 1),
		loopDone:     make(chan struct{}),
	}

	if err := loop.firstRun(ctx); err != nil {
		// The loop is never handed out, so release its context here.
		cancelFunc()
		return nil, fmt.Errorf("unable to initialize the event loop: %w", err)
	}

	// NOTE(review): events emitted between the container listing in firstRun
	// and this subscription are not observed unless the helper replays them —
	// confirm against helper.DockerHelper.
	docker.SetContainersEventCallback(loop.onEventReceived)
	go loop.startEventLoop()

	return loop, nil
}

// Close stops event processing and waits for the event goroutine to return.
// It then stops the scheduler and waits for the jobs that are currently
// running to finish. In-flight update jobs observe a cancelled context,
// except for the container restart step, which deliberately ignores the
// cancellation. Close always returns nil and is safe to call more than once.
func (el *EventLoop) Close() error {
	el.cancelFunc()
	<-el.loopDone
	<-el.cr.Stop().Done()
	return nil
}

// Execute runs the scheduler in the calling goroutine and returns once the
// scheduler has been stopped (see Close).
func (el *EventLoop) Execute() {
	el.cr.Run()
	slog.Info("event loop stopped", "thread", "main")
}

// firstRun lists the running containers and schedules every enabled one.
// A container that fails to register is logged and skipped. Only a failure
// to list the containers is returned as an error.
func (el *EventLoop) firstRun(ctx context.Context) error {
	el.mu.Lock()
	defer el.mu.Unlock()

	containers, err := el.docker.RunningContainers(ctx)
	if err != nil {
		return fmt.Errorf("unable to get the list of running containers: %w", err)
	}

	for _, container := range containers {
		if !container.Enabled() {
			slog.Debug("this container is disabled, ignoring",
				"thread", contextutil.ThreadName(ctx),
				"container_name", container.Name(),
				"container_id", container.ID(),
			)
			continue
		}

		if err := el.register(container); err != nil {
			slog.Error("an error occured while registering a container in the scheduler",
				"thread", contextutil.ThreadName(ctx),
				"container_name", container.Name(),
				"container_id", container.ID(),
				"schedule", container.Schedule(),
				"err", err,
			)
			continue
		}

		slog.Info("a container was scheduled",
			"thread", contextutil.ThreadName(ctx),
			"container_name", container.Name(),
			"container_id", container.ID(),
			"schedule", container.Schedule(),
		)
	}
	return nil
}

// onEventReceived is the Docker helper callback. It queues ev for the event
// goroutine and wakes that goroutine up. It never blocks on the goroutine,
// and it drops events once the loop has been closed.
func (el *EventLoop) onEventReceived(ev helper.ContainerEvent) {
	if el.ctx.Err() != nil {
		return
	}

	slog.Debug("event from daemon", "thread", contextutil.ThreadName(ev.Context()), "event_type", ev.EventType, "container_name", ev.Data.Name())

	el.eventMu.Lock()
	el.eventBuffer = append(el.eventBuffer, ev)
	bufferLen := len(el.eventBuffer)
	el.eventMu.Unlock()

	if bufferLen > eventBufferWarnThreshold {
		slog.Warn("slow event processing", "thread", contextutil.ThreadName(ev.Context()), "buffer_size", bufferLen)
	}

	// Non-blocking send, made outside eventMu: if a wake-up is already
	// pending, the loop will see this event when it drains the buffer.
	select {
	case el.eventTrigger <- struct{}{}:
	default:
	}
}

// register schedules the periodic image update of container. If the
// container was already registered, the previous cron entry is removed once
// the new one has been added, so duplicate events do not leave orphan jobs.
// The caller must hold el.mu.
func (el *EventLoop) register(container helper.Container) error {
	id, err := el.cr.AddFunc(container.Schedule(), func() {
		if err := el.updateImage(container); err != nil {
			slog.Error("failed to update the container image",
				"thread", "event_loop",
				"container_name", container.Name(),
				"container_id", container.ID(),
				"image_name", container.Image().Name(),
				"image_id", container.Image().ID(),
				"err", err,
			)
		}
	})
	if err != nil {
		return fmt.Errorf("the registration of the container in the scheduler has failed: %w", err)
	}

	if previous, ok := el.executors[container.ID()]; ok {
		el.cr.Remove(previous)
	}
	el.executors[container.ID()] = id
	return nil
}

// unregister removes the scheduled job of container, if any.
// The caller must hold el.mu.
func (el *EventLoop) unregister(container helper.Container) {
	if id, ok := el.executors[container.ID()]; ok {
		el.cr.Remove(id)
		delete(el.executors, container.ID())
	}
}

// startEventLoop processes queued events in FIFO order until el.ctx is
// cancelled. It closes el.loopDone on return.
func (el *EventLoop) startEventLoop() {
	defer close(el.loopDone)

	for {
		ev, ok := el.popEvent()
		if !ok {
			select {
			case <-el.eventTrigger:
				slog.Debug("event loop waken up", "thread", contextutil.ThreadName(el.ctx))
				continue
			case <-el.ctx.Done():
				return
			}
		}

		// Stop promptly on Close even if events are still queued.
		if el.ctx.Err() != nil {
			return
		}

		slog.Debug("processing docker event", "thread", contextutil.ThreadName(el.ctx))
		el.process(ev)
	}
}

// popEvent removes and returns the oldest queued event. It reports false
// when the queue is empty.
func (el *EventLoop) popEvent() (helper.ContainerEvent, bool) {
	el.eventMu.Lock()
	defer el.eventMu.Unlock()

	var zero helper.ContainerEvent
	if len(el.eventBuffer) == 0 {
		return zero, false
	}
	ev := el.eventBuffer[0]
	// Clear the slot so the backing array does not keep the event reachable.
	el.eventBuffer[0] = zero
	el.eventBuffer = el.eventBuffer[1:]
	return ev, true
}

// process applies one container event to the scheduler. A new container is
// registered, a deleted one is unregistered, and an updated one is
// re-registered (or only unregistered when it is now disabled).
func (el *EventLoop) process(ev helper.ContainerEvent) {
	el.mu.Lock()
	defer el.mu.Unlock()

	container := ev.Data
	switch ev.EventType {
	case helper.NewContainer:
		if !container.Enabled() {
			slog.Debug("receiving an event for a disabled container, ignoring",
				"thread", "event_loop",
				"container_name", container.Name(),
				"container_id", container.ID(),
			)
			return
		}
		if err := el.register(container); err != nil {
			slog.Error("an error occured while registering a container in the scheduler",
				"thread", "event_loop",
				"container_name", container.Name(),
				"container_id", container.ID(),
				"schedule", container.Schedule(),
				"err", err,
			)
			return
		}
		slog.Info("a new container was scheduled",
			"thread", "event_loop",
			"container_name", container.Name(),
			"container_id", container.ID(),
			"schedule", container.Schedule(),
		)

	case helper.DeletedContainer:
		el.unregister(container)

	case helper.UpdatedContainer:
		el.unregister(container)
		if !container.Enabled() {
			slog.Debug("a previously enabled container is now disabled, the container will not be updated in the future",
				"thread", "event_loop",
				"container_name", container.Name(),
				"container_id", container.ID(),
			)
			return
		}
		if err := el.register(container); err != nil {
			slog.Error("an error occured while updating a container in the scheduler",
				"thread", "event_loop",
				"container_name", container.Name(),
				"container_id", container.ID(),
				"schedule", container.Schedule(),
				"err", err,
			)
			return
		}
		slog.Info("a container was updated",
			"thread", "event_loop",
			"container_name", container.Name(),
			"container_id", container.ID(),
			"schedule", container.Schedule(),
		)
	}
}

// updateImage compares the hash of container's image with the registry's.
// When they differ, it pulls the image and then stops and restarts the
// container. A restart failure is logged rather than returned.
//
// NOTE(review): container is the value captured at registration time, so its
// Image().Hash() is not refreshed by a successful pull unless an
// UpdatedContainer event re-registers it. Also confirm that
// helper.StartContainer actually runs the newly pulled image: a plain Docker
// restart keeps the image the container was created from.
func (el *EventLoop) updateImage(container helper.Container) error {
	image, err := el.docker.RemoteImageMetadata(el.ctx, container.Image().Name())
	if err != nil {
		return fmt.Errorf("unable to get metadata from the registry: %w", err)
	}

	if image.Hash() == container.Image().Hash() {
		slog.Debug("the image is already up-to-date",
			"thread", "event_loop",
			"container_name", container.Name(),
			"container_id", container.ID(),
			"image_name", container.Image().Name(),
			"image_id", container.Image().ID(),
		)
		return nil
	}

	// Pull while the container is still running. A registry failure then
	// causes no stop/start cycle, and downtime is limited to the restart.
	if err := el.docker.PullImage(el.ctx, container.Image().Name()); err != nil {
		return fmt.Errorf("failed to pull the image from the registry: %w", err)
	}

	if err := el.docker.StopContainer(el.ctx, container); err != nil {
		return fmt.Errorf("unable to stop the container to update the image: %w", err)
	}

	// Once stopped, the container must be restarted even if Close has
	// cancelled el.ctx in the meantime.
	if err := el.docker.StartContainer(context.WithoutCancel(el.ctx), container); err != nil {
		slog.Error("unable to restart the container after the update",
			"thread", "event_loop",
			"container_name", container.Name(),
			"container_id", container.ID(),
			"image_name", container.Image().Name(),
			"image_id", container.Image().ID(),
			"err", err,
		)
	}
	return nil
}