Files
docker-updater/runtime/runtime.go
2026-02-08 22:29:45 +01:00

268 lines
6.8 KiB
Go

package runtime
import (
"context"
"docker-updater/contextutil"
"docker-updater/docker/helper"
"fmt"
"log/slog"
"sync"
"github.com/robfig/cron/v3"
)
type (
EventLoop struct {
mu sync.Mutex
docker *helper.DockerHelper
cr *cron.Cron
executors map[string]cron.EntryID
ctx context.Context
eventMu sync.Mutex
eventBuffer []helper.ContainerEvent
eventTrigger chan struct{}
}
)
func NewEventLoop(ctx context.Context, docker *helper.DockerHelper) (*EventLoop, error) {
loop := &EventLoop{
docker: docker,
cr: cron.New(),
ctx: contextutil.WithThreadName(context.Background(), "event_loop"),
executors: make(map[string]cron.EntryID),
}
if err := loop.firstRun(ctx); err != nil {
return nil, fmt.Errorf("unable to initialize the event loop: %s", err)
}
docker.SetContainersEventCallback(loop.onEventReceived)
go loop.startEventLoop()
return loop, nil
}
func (el *EventLoop) Close() error {
ctx := el.cr.Stop()
<-ctx.Done()
return nil
}
func (el *EventLoop) Execute() {
el.cr.Run()
slog.Info("event loop stopped", "thread", "main")
}
func (el *EventLoop) firstRun(ctx context.Context) error {
el.mu.Lock()
defer el.mu.Unlock()
containers, err := el.docker.RunningContainers(ctx)
if err != nil {
return fmt.Errorf("unable to get the list of running containers: %s", err)
}
for _, container := range containers {
if container.Enabled() {
if err := el.register(container); err != nil {
slog.Error("an error occured while registering a container in the scheduler",
"thread", contextutil.ThreadName(ctx),
"container_name", container.Name(),
"container_id", container.ID(),
"schedule", container.Schedule(),
"err", err,
)
continue
}
slog.Info("a container was scheduled",
"thread", contextutil.ThreadName(ctx),
"container_name", container.Name(),
"container_id", container.ID(),
"schedule", container.Schedule(),
)
} else {
slog.Debug("this container is disabled, ignoring",
"thread", contextutil.ThreadName(ctx),
"container_name", container.Name(),
"container_id", container.ID(),
)
}
}
return nil
}
func (el *EventLoop) onEventReceived(ev helper.ContainerEvent) {
el.eventMu.Lock()
defer el.eventMu.Unlock()
slog.Debug("event from daemon", "thread", contextutil.ThreadName(ev.Context()), "event_type", ev.EventType, "container_name", ev.Data.Name())
wakeUpEventLoop := false
if len(el.eventBuffer) == 0 {
wakeUpEventLoop = true
}
el.eventBuffer = append(el.eventBuffer, ev)
if len(el.eventBuffer) > 100 {
slog.Warn("slow event processing", "thread", contextutil.ThreadName(ev.Context()), "buffer_size", len(el.eventBuffer))
}
if wakeUpEventLoop {
el.eventTrigger <- struct{}{}
}
}
func (el *EventLoop) register(container helper.Container) error {
id, err := el.cr.AddFunc(container.Schedule(), func() {
if err := el.updateImage(container); err != nil {
slog.Error("failed to update the container image",
"thread", "event_loop",
"container_name", container.Name(),
"container_id", container.ID(),
"image_name", container.Image().Name(),
"image_id", container.Image().ID(),
"err", err,
)
}
})
if err != nil {
return fmt.Errorf("the registration of the container in the scheduler has failed: %s", err)
}
el.executors[container.ID()] = id
return nil
}
func (el *EventLoop) unregister(container helper.Container) {
if id, ok := el.executors[container.ID()]; ok {
el.cr.Remove(id)
}
delete(el.executors, container.ID())
}
func (el *EventLoop) startEventLoop() {
for {
if len(el.eventBuffer) == 0 {
<-el.eventTrigger
}
el.eventMu.Lock()
ev := el.eventBuffer[0]
el.eventBuffer = append([]helper.ContainerEvent{}, el.eventBuffer[1:]...)
el.eventMu.Unlock()
el.process(ev)
}
}
func (el *EventLoop) process(ev helper.ContainerEvent) {
el.mu.Lock()
defer el.mu.Unlock()
container := ev.Data
switch ev.EventType {
case helper.NewContainer:
{
if container.Enabled() {
if err := el.register(container); err != nil {
slog.Error("an error occured while registering a container in the scheduler",
"thread", "event_loop",
"container_name", container.Name(),
"container_id", container.ID(),
"schedule", container.Schedule(),
"err", err,
)
return
}
slog.Info("a new container was scheduled",
"thread", "event_loop",
"container_name", container.Name(),
"container_id", container.ID(),
"schedule", container.Schedule(),
)
} else {
slog.Debug("receiving an event for a disabled container, ignoring",
"thread", "event_loop",
"container_name", container.Name(),
"container_id", container.ID(),
)
}
}
case helper.DeletedContainer:
{
el.unregister(container)
}
case helper.UpdatedContainer:
{
el.unregister(container)
if container.Enabled() {
if err := el.register(container); err != nil {
slog.Error("an error occured while updating a container in the scheduler",
"thread", "event_loop",
"container_name", container.Name(),
"container_id", container.ID(),
"schedule", container.Schedule(),
"err", err,
)
return
}
slog.Info("a container was updated",
"thread", "event_loop",
"container_name", container.Name(),
"container_id", container.ID(),
"schedule", container.Schedule(),
)
} else {
slog.Debug("a previously enabled container is now disabled, the container will not be updated in the future",
"thread", "event_loop",
"container_name", container.Name(),
"container_id", container.ID(),
)
}
}
}
}
func (el *EventLoop) updateImage(container helper.Container) error {
image, err := el.docker.RemoteImageMetadata(el.ctx, container.Image().Name())
if err != nil {
return fmt.Errorf("unable to get metadata from the registry: %s", err)
}
if image.Hash() == container.Image().Hash() {
slog.Debug("the image is already up-to-date",
"thread", "event_loop",
"container_name", container.Name(),
"container_id", container.ID(),
"image_name", container.Image().Name(),
"image_id", container.Image().ID(),
)
return nil
}
if err := el.docker.StopContainer(el.ctx, container); err != nil {
return fmt.Errorf("unable to stop the container to update the image: %s", err)
}
defer func() {
if err := el.docker.StartContainer(el.ctx, container); err != nil {
slog.Error("unable to restart the container after the update",
"thread", "event_loop",
"container_name", container.Name(),
"container_id", container.ID(),
"image_name", container.Image().Name(),
"image_id", container.Image().ID(),
)
}
}()
if err := el.docker.PullImage(el.ctx, container.Image().Name()); err != nil {
return fmt.Errorf("failed to pull the image from the registry: %s", err)
}
return nil
}