first version
This commit is contained in:
267
runtime/runtime.go
Normal file
267
runtime/runtime.go
Normal file
@@ -0,0 +1,267 @@
|
||||
package runtime
|
||||
|
||||
import (
|
||||
"context"
|
||||
"docker-updater/contextutil"
|
||||
"docker-updater/docker/helper"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
|
||||
"github.com/robfig/cron/v3"
|
||||
)
|
||||
|
||||
type (
|
||||
EventLoop struct {
|
||||
mu sync.Mutex
|
||||
|
||||
docker *helper.DockerHelper
|
||||
|
||||
cr *cron.Cron
|
||||
executors map[string]cron.EntryID
|
||||
|
||||
ctx context.Context
|
||||
eventMu sync.Mutex
|
||||
eventBuffer []helper.ContainerEvent
|
||||
eventTrigger chan struct{}
|
||||
}
|
||||
)
|
||||
|
||||
func NewEventLoop(ctx context.Context, docker *helper.DockerHelper) (*EventLoop, error) {
|
||||
loop := &EventLoop{
|
||||
docker: docker,
|
||||
cr: cron.New(),
|
||||
ctx: contextutil.WithThreadName(context.Background(), "event_loop"),
|
||||
executors: make(map[string]cron.EntryID),
|
||||
}
|
||||
|
||||
if err := loop.firstRun(ctx); err != nil {
|
||||
return nil, fmt.Errorf("unable to initialize the event loop: %s", err)
|
||||
}
|
||||
|
||||
docker.SetContainersEventCallback(loop.onEventReceived)
|
||||
go loop.startEventLoop()
|
||||
|
||||
return loop, nil
|
||||
}
|
||||
|
||||
func (el *EventLoop) Close() error {
|
||||
ctx := el.cr.Stop()
|
||||
<-ctx.Done()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (el *EventLoop) Execute() {
|
||||
el.cr.Run()
|
||||
slog.Info("event loop stopped", "thread", "main")
|
||||
}
|
||||
|
||||
func (el *EventLoop) firstRun(ctx context.Context) error {
|
||||
el.mu.Lock()
|
||||
defer el.mu.Unlock()
|
||||
|
||||
containers, err := el.docker.RunningContainers(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to get the list of running containers: %s", err)
|
||||
}
|
||||
|
||||
for _, container := range containers {
|
||||
if container.Enabled() {
|
||||
if err := el.register(container); err != nil {
|
||||
slog.Error("an error occured while registering a container in the scheduler",
|
||||
"thread", contextutil.ThreadName(ctx),
|
||||
"container_name", container.Name(),
|
||||
"container_id", container.ID(),
|
||||
"schedule", container.Schedule(),
|
||||
"err", err,
|
||||
)
|
||||
continue
|
||||
}
|
||||
slog.Info("a container was scheduled",
|
||||
"thread", contextutil.ThreadName(ctx),
|
||||
"container_name", container.Name(),
|
||||
"container_id", container.ID(),
|
||||
"schedule", container.Schedule(),
|
||||
)
|
||||
} else {
|
||||
slog.Debug("this container is disabled, ignoring",
|
||||
"thread", contextutil.ThreadName(ctx),
|
||||
"container_name", container.Name(),
|
||||
"container_id", container.ID(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (el *EventLoop) onEventReceived(ev helper.ContainerEvent) {
|
||||
el.eventMu.Lock()
|
||||
defer el.eventMu.Unlock()
|
||||
|
||||
slog.Debug("event from daemon", "thread", contextutil.ThreadName(ev.Context()), "event_type", ev.EventType, "container_name", ev.Data.Name())
|
||||
|
||||
wakeUpEventLoop := false
|
||||
if len(el.eventBuffer) == 0 {
|
||||
wakeUpEventLoop = true
|
||||
}
|
||||
|
||||
el.eventBuffer = append(el.eventBuffer, ev)
|
||||
|
||||
if len(el.eventBuffer) > 100 {
|
||||
slog.Warn("slow event processing", "thread", contextutil.ThreadName(ev.Context()), "buffer_size", len(el.eventBuffer))
|
||||
}
|
||||
|
||||
if wakeUpEventLoop {
|
||||
el.eventTrigger <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func (el *EventLoop) register(container helper.Container) error {
|
||||
id, err := el.cr.AddFunc(container.Schedule(), func() {
|
||||
if err := el.updateImage(container); err != nil {
|
||||
slog.Error("failed to update the container image",
|
||||
"thread", "event_loop",
|
||||
"container_name", container.Name(),
|
||||
"container_id", container.ID(),
|
||||
"image_name", container.Image().Name(),
|
||||
"image_id", container.Image().ID(),
|
||||
"err", err,
|
||||
)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("the registration of the container in the scheduler has failed: %s", err)
|
||||
}
|
||||
el.executors[container.ID()] = id
|
||||
return nil
|
||||
}
|
||||
|
||||
func (el *EventLoop) unregister(container helper.Container) {
|
||||
if id, ok := el.executors[container.ID()]; ok {
|
||||
el.cr.Remove(id)
|
||||
}
|
||||
delete(el.executors, container.ID())
|
||||
}
|
||||
|
||||
func (el *EventLoop) startEventLoop() {
|
||||
for {
|
||||
if len(el.eventBuffer) == 0 {
|
||||
<-el.eventTrigger
|
||||
}
|
||||
|
||||
el.eventMu.Lock()
|
||||
ev := el.eventBuffer[0]
|
||||
el.eventBuffer = append([]helper.ContainerEvent{}, el.eventBuffer[1:]...)
|
||||
el.eventMu.Unlock()
|
||||
|
||||
el.process(ev)
|
||||
}
|
||||
}
|
||||
|
||||
func (el *EventLoop) process(ev helper.ContainerEvent) {
|
||||
el.mu.Lock()
|
||||
defer el.mu.Unlock()
|
||||
|
||||
container := ev.Data
|
||||
switch ev.EventType {
|
||||
case helper.NewContainer:
|
||||
{
|
||||
if container.Enabled() {
|
||||
if err := el.register(container); err != nil {
|
||||
slog.Error("an error occured while registering a container in the scheduler",
|
||||
"thread", "event_loop",
|
||||
"container_name", container.Name(),
|
||||
"container_id", container.ID(),
|
||||
"schedule", container.Schedule(),
|
||||
"err", err,
|
||||
)
|
||||
return
|
||||
}
|
||||
slog.Info("a new container was scheduled",
|
||||
"thread", "event_loop",
|
||||
"container_name", container.Name(),
|
||||
"container_id", container.ID(),
|
||||
"schedule", container.Schedule(),
|
||||
)
|
||||
} else {
|
||||
slog.Debug("receiving an event for a disabled container, ignoring",
|
||||
"thread", "event_loop",
|
||||
"container_name", container.Name(),
|
||||
"container_id", container.ID(),
|
||||
)
|
||||
}
|
||||
}
|
||||
case helper.DeletedContainer:
|
||||
{
|
||||
el.unregister(container)
|
||||
}
|
||||
case helper.UpdatedContainer:
|
||||
{
|
||||
el.unregister(container)
|
||||
if container.Enabled() {
|
||||
if err := el.register(container); err != nil {
|
||||
slog.Error("an error occured while updating a container in the scheduler",
|
||||
"thread", "event_loop",
|
||||
"container_name", container.Name(),
|
||||
"container_id", container.ID(),
|
||||
"schedule", container.Schedule(),
|
||||
"err", err,
|
||||
)
|
||||
return
|
||||
}
|
||||
slog.Info("a container was updated",
|
||||
"thread", "event_loop",
|
||||
"container_name", container.Name(),
|
||||
"container_id", container.ID(),
|
||||
"schedule", container.Schedule(),
|
||||
)
|
||||
} else {
|
||||
slog.Debug("a previously enabled container is now disabled, the container will not be updated in the future",
|
||||
"thread", "event_loop",
|
||||
"container_name", container.Name(),
|
||||
"container_id", container.ID(),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (el *EventLoop) updateImage(container helper.Container) error {
|
||||
image, err := el.docker.RemoteImageMetadata(el.ctx, container.Image().Name())
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to get metadata from the registry: %s", err)
|
||||
}
|
||||
|
||||
if image.Hash() == container.Image().Hash() {
|
||||
slog.Debug("the image is already up-to-date",
|
||||
"thread", "event_loop",
|
||||
"container_name", container.Name(),
|
||||
"container_id", container.ID(),
|
||||
"image_name", container.Image().Name(),
|
||||
"image_id", container.Image().ID(),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := el.docker.StopContainer(el.ctx, container); err != nil {
|
||||
return fmt.Errorf("unable to stop the container to update the image: %s", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := el.docker.StartContainer(el.ctx, container); err != nil {
|
||||
slog.Error("unable to restart the container after the update",
|
||||
"thread", "event_loop",
|
||||
"container_name", container.Name(),
|
||||
"container_id", container.ID(),
|
||||
"image_name", container.Image().Name(),
|
||||
"image_id", container.Image().ID(),
|
||||
)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := el.docker.PullImage(el.ctx, container.Image().Name()); err != nil {
|
||||
return fmt.Errorf("failed to pull the image from the registry: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user