refactoring sync
Some checks failed
CloudSave/pipeline/head There was a failure building this commit
Some checks failed
CloudSave/pipeline/head There was a failure building this commit
This commit is contained in:
@@ -7,11 +7,11 @@ import (
|
||||
"cloudsave/pkg/remote"
|
||||
"cloudsave/pkg/remote/client"
|
||||
"cloudsave/pkg/repository"
|
||||
"cloudsave/pkg/sync"
|
||||
"context"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
@@ -61,224 +61,70 @@ func (p *SyncCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{})
|
||||
return subcommands.ExitFailure
|
||||
}
|
||||
|
||||
fmt.Println()
|
||||
|
||||
pg := progressbar.New(-1)
|
||||
destroyPg := func() {
|
||||
if err := pg.Finish(); err != nil {
|
||||
slog.Error("failed to finish progressbar", "err", err)
|
||||
}
|
||||
if err := pg.Clear(); err != nil {
|
||||
slog.Error("failed to clear progressbar", "err", err)
|
||||
}
|
||||
if err := pg.Close(); err != nil {
|
||||
slog.Error("failed to close progressbar", "err", err)
|
||||
}
|
||||
pg.Finish()
|
||||
pg.Clear()
|
||||
pg.Close()
|
||||
}
|
||||
|
||||
pg.Describe(fmt.Sprintf("[%s] Checking status...", g.Name))
|
||||
exists, err := cli.Exists(r.GameID)
|
||||
if err != nil {
|
||||
slog.Error(err.Error())
|
||||
continue
|
||||
}
|
||||
syncer := sync.NewSyncer(cli, p.Service)
|
||||
|
||||
if !exists {
|
||||
pg.Describe(fmt.Sprintf("[%s] Pushing data...", g.Name))
|
||||
if err := p.push(g, cli); err != nil {
|
||||
syncer.SetStateCallback(func(s sync.State, g repository.Metadata) {
|
||||
switch s {
|
||||
case sync.FetchingMetdata:
|
||||
pg.Describe(fmt.Sprintf("%s: fetching metadata from repository", g.Name))
|
||||
case sync.Pushing:
|
||||
pg.Describe(fmt.Sprintf("%s: pushing data to the server", g.Name))
|
||||
case sync.Pulling:
|
||||
pg.Describe(fmt.Sprintf("%s: pull data from the server", g.Name))
|
||||
case sync.UpToDate:
|
||||
destroyPg()
|
||||
fmt.Fprintln(os.Stderr, "failed to push:", err)
|
||||
return subcommands.ExitFailure
|
||||
}
|
||||
pg.Describe(fmt.Sprintf("[%s] Pushing backup...", g.Name))
|
||||
if err := p.pushBackup(g, cli); err != nil {
|
||||
fmt.Println("🆗", g.Name+": already up-to-date")
|
||||
case sync.Pushed:
|
||||
destroyPg()
|
||||
slog.Warn("failed to push backup files", "err", err)
|
||||
}
|
||||
destroyPg()
|
||||
fmt.Println("⬆️", g.Name+": pushed")
|
||||
continue
|
||||
}
|
||||
|
||||
pg.Describe(fmt.Sprintf("[%s] Fetching metadata...", g.Name))
|
||||
|
||||
remoteMetadata, err := cli.Metadata(r.GameID)
|
||||
if err != nil {
|
||||
destroyPg()
|
||||
fmt.Fprintln(os.Stderr, "error: failed to get the game metadata from the remote:", err)
|
||||
continue
|
||||
}
|
||||
|
||||
pg.Describe(fmt.Sprintf("[%s] Pulling backup...", g.Name))
|
||||
if err := p.pullBackup(g, cli); err != nil {
|
||||
slog.Warn("failed to pull backup files", "err", err)
|
||||
}
|
||||
|
||||
pg.Describe(fmt.Sprintf("[%s] Pushing backup...", g.Name))
|
||||
if err := p.pushBackup(g, cli); err != nil {
|
||||
slog.Warn("failed to push backup files", "err", err)
|
||||
}
|
||||
|
||||
if g.MD5 == remoteMetadata.MD5 {
|
||||
destroyPg()
|
||||
if g.Version != remoteMetadata.Version {
|
||||
slog.Debug("version is not the same, but the hash is equal. Updating local database")
|
||||
if err := p.Service.SetVersion(r.GameID, remoteMetadata.Version); err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error: failed to synchronize version number:", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
fmt.Println("🆗", g.Name+": already up-to-date")
|
||||
continue
|
||||
}
|
||||
|
||||
if g.Version > remoteMetadata.Version {
|
||||
pg.Describe(fmt.Sprintf("[%s] Pushing data...", g.Name))
|
||||
if err := p.push(g, cli); err != nil {
|
||||
fmt.Println("⬆️", g.Name+": pushed")
|
||||
case sync.Pulled:
|
||||
destroyPg()
|
||||
fmt.Fprintln(os.Stderr, "failed to push:", err)
|
||||
return subcommands.ExitFailure
|
||||
fmt.Println("⬇️", g.Name+": pulled")
|
||||
}
|
||||
})
|
||||
|
||||
syncer.SetErrorCallback(func(err error) {
|
||||
destroyPg()
|
||||
fmt.Println("⬆️", g.Name+": pushed")
|
||||
continue
|
||||
}
|
||||
fmt.Println("❌", g.Name+": "+err.Error())
|
||||
})
|
||||
|
||||
if g.Version < remoteMetadata.Version {
|
||||
destroyPg()
|
||||
if err := p.pull(g, cli); err != nil {
|
||||
destroyPg()
|
||||
fmt.Fprintln(os.Stderr, "failed to push:", err)
|
||||
return subcommands.ExitFailure
|
||||
syncer.SetConflictCallback(func(a, b repository.Metadata) sync.ConflictResolution {
|
||||
fmt.Println()
|
||||
fmt.Println("--- ⚠️ CONFLICT ---")
|
||||
fmt.Println(a.Name, "(", a.Path, ")")
|
||||
fmt.Println("----")
|
||||
fmt.Println("Your version:", a.Date.Format(time.RFC1123))
|
||||
fmt.Println("Their version:", b.Date.Format(time.RFC1123))
|
||||
fmt.Println()
|
||||
|
||||
res := prompt.Conflict()
|
||||
|
||||
switch res {
|
||||
case prompt.Their:
|
||||
return sync.Their
|
||||
case prompt.My:
|
||||
return sync.Mine
|
||||
}
|
||||
|
||||
g.Version = remoteMetadata.Version
|
||||
g.Date = remoteMetadata.Date
|
||||
return sync.None
|
||||
})
|
||||
|
||||
if err := p.Service.UpdateMetadata(g.ID, g); err != nil {
|
||||
destroyPg()
|
||||
fmt.Fprintln(os.Stderr, "failed to push:", err)
|
||||
return subcommands.ExitFailure
|
||||
}
|
||||
fmt.Println("⬇️", g.Name+": pulled")
|
||||
continue
|
||||
}
|
||||
|
||||
destroyPg()
|
||||
|
||||
if g.Version == remoteMetadata.Version {
|
||||
if err := p.conflict(r.GameID, g, remoteMetadata, cli); err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error: failed to resolve conflict:", err)
|
||||
continue
|
||||
}
|
||||
continue
|
||||
}
|
||||
syncer.Sync()
|
||||
}
|
||||
|
||||
fmt.Println("done.")
|
||||
return subcommands.ExitSuccess
|
||||
}
|
||||
|
||||
func (p *SyncCmd) conflict(gameID string, m, remoteMetadata repository.Metadata, cli *client.Client) error {
|
||||
g, err := p.Service.One(gameID)
|
||||
if err != nil {
|
||||
slog.Warn("a conflict was found but the game is not found in the database")
|
||||
slog.Debug("debug info", "gameID", gameID)
|
||||
return nil
|
||||
}
|
||||
fmt.Println()
|
||||
fmt.Println("--- ⚠️ CONFLICT ---")
|
||||
fmt.Println(g.Name, "(", g.Path, ")")
|
||||
fmt.Println("----")
|
||||
fmt.Println("Your version:", g.Date.Format(time.RFC1123))
|
||||
fmt.Println("Their version:", remoteMetadata.Date.Format(time.RFC1123))
|
||||
fmt.Println()
|
||||
|
||||
res := prompt.Conflict()
|
||||
|
||||
switch res {
|
||||
case prompt.My:
|
||||
{
|
||||
if err := p.push(m, cli); err != nil {
|
||||
return fmt.Errorf("failed to push: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
case prompt.Their:
|
||||
{
|
||||
if err := p.pull(g, cli); err != nil {
|
||||
return fmt.Errorf("failed to push: %w", err)
|
||||
}
|
||||
g.Version = remoteMetadata.Version
|
||||
g.Date = remoteMetadata.Date
|
||||
|
||||
if err := p.Service.UpdateMetadata(g.ID, g); err != nil {
|
||||
return fmt.Errorf("failed to push: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *SyncCmd) push(m repository.Metadata, cli *client.Client) error {
|
||||
return p.Service.PushArchive(m.ID, "", cli)
|
||||
}
|
||||
|
||||
func (p *SyncCmd) pushBackup(m repository.Metadata, cli *client.Client) error {
|
||||
bs, err := p.Service.AllBackups(m.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, b := range bs {
|
||||
binfo, err := cli.ArchiveInfo(m.ID, b.UUID)
|
||||
if err != nil {
|
||||
if !errors.Is(err, client.ErrNotFound) {
|
||||
return fmt.Errorf("failed to get remote information about the backup file: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if binfo.MD5 != b.MD5 {
|
||||
if err := cli.PushBackup(b, m); err != nil {
|
||||
return fmt.Errorf("failed to push backup: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *SyncCmd) pullBackup(m repository.Metadata, cli *client.Client) error {
|
||||
bs, err := cli.ListArchives(m.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, uuid := range bs {
|
||||
rinfo, err := cli.ArchiveInfo(m.ID, uuid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
linfo, err := p.Service.Backup(m.ID, uuid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if linfo.MD5 != rinfo.MD5 {
|
||||
if err := p.Service.PullBackup(m.ID, uuid, cli); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *SyncCmd) pull(g repository.Metadata, cli *client.Client) error {
|
||||
if err := p.Service.PullArchive(g.ID, "", cli); err != nil {
|
||||
return err
|
||||
}
|
||||
return p.Service.ApplyCurrent(g.ID)
|
||||
}
|
||||
|
||||
func connect(remoteCred map[string]map[string]string, r remote.Remote) (*client.Client, error) {
|
||||
var cli *client.Client
|
||||
|
||||
|
||||
@@ -384,6 +384,10 @@ func (c *Client) All() ([]repository.Metadata, error) {
|
||||
return nil, errors.New("invalid payload sent by the server")
|
||||
}
|
||||
|
||||
func (c *Client) BaseURL() string {
|
||||
return c.baseURL
|
||||
}
|
||||
|
||||
func (c *Client) get(url string) (obj.HTTPObject, error) {
|
||||
cli := http.Client{}
|
||||
|
||||
|
||||
233
pkg/sync/sync.go
233
pkg/sync/sync.go
@@ -1,2 +1,235 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"cloudsave/pkg/data"
|
||||
"cloudsave/pkg/remote"
|
||||
"cloudsave/pkg/remote/client"
|
||||
"cloudsave/pkg/repository"
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type (
|
||||
ConflictResolution int
|
||||
State int
|
||||
decision int
|
||||
|
||||
Syncer struct {
|
||||
cli *client.Client
|
||||
service *data.Service
|
||||
|
||||
stateCallback func(s State, g repository.Metadata)
|
||||
errorCallback func(err error)
|
||||
conflictCallback func(a, b repository.Metadata) ConflictResolution
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
Their ConflictResolution = iota
|
||||
Mine
|
||||
None
|
||||
)
|
||||
|
||||
const (
|
||||
ignore decision = iota
|
||||
push
|
||||
pull
|
||||
)
|
||||
|
||||
const (
|
||||
FetchingMetdata State = iota
|
||||
Pushing
|
||||
Pulling
|
||||
Pushed
|
||||
Pulled
|
||||
UpToDate
|
||||
)
|
||||
|
||||
var (
|
||||
ErrFetching error = errors.New("failed to fetch the metadata")
|
||||
ErrPushing error = errors.New("failed to push data")
|
||||
ErrPulling error = errors.New("failed to pull data")
|
||||
ErrDatastore error = errors.New("failed to get data from local datastore")
|
||||
)
|
||||
|
||||
func NewSyncer(cli *client.Client, service *data.Service) *Syncer {
|
||||
return &Syncer{
|
||||
cli: cli,
|
||||
service: service,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Syncer) SetStateCallback(fn func(s State, g repository.Metadata)) {
|
||||
s.stateCallback = fn
|
||||
}
|
||||
|
||||
func (s *Syncer) SetErrorCallback(fn func(err error)) {
|
||||
s.errorCallback = fn
|
||||
}
|
||||
|
||||
func (s *Syncer) SetConflictCallback(fn func(a, b repository.Metadata) ConflictResolution) {
|
||||
s.conflictCallback = fn
|
||||
}
|
||||
|
||||
func (s *Syncer) Sync() {
|
||||
games, err := s.service.AllGames()
|
||||
if err != nil {
|
||||
s.errorCallback(fmt.Errorf("failed to get all games: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
for _, g := range games {
|
||||
r, err := remote.One(g.ID)
|
||||
if err != nil {
|
||||
s.errorCallback(fmt.Errorf("%w: %s", ErrDatastore, err))
|
||||
}
|
||||
if r.URL != s.cli.BaseURL() {
|
||||
continue
|
||||
}
|
||||
if err := s.sync(g); err != nil {
|
||||
s.errorCallback(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Syncer) sync(g repository.Metadata) error {
|
||||
s.stateCallback(FetchingMetdata, g)
|
||||
|
||||
remoteMetadata, err := s.cli.Metadata(g.ID)
|
||||
if err != nil {
|
||||
if errors.Is(err, client.ErrNotFound) {
|
||||
s.stateCallback(Pushing, g)
|
||||
if err := s.push(g); err != nil {
|
||||
return fmt.Errorf("%w: %s", ErrPushing, err)
|
||||
}
|
||||
s.stateCallback(Pushed, g)
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("%w: %s", ErrFetching, err)
|
||||
}
|
||||
|
||||
if g.MD5 == remoteMetadata.MD5 {
|
||||
s.stateCallback(UpToDate, g)
|
||||
return nil
|
||||
}
|
||||
|
||||
d := ignore
|
||||
|
||||
if g.Version > remoteMetadata.Version {
|
||||
d = push
|
||||
}
|
||||
|
||||
if g.Version < remoteMetadata.Version {
|
||||
d = pull
|
||||
}
|
||||
|
||||
if g.Version == remoteMetadata.Version {
|
||||
r := s.conflictCallback(g, remoteMetadata)
|
||||
switch r {
|
||||
case Mine:
|
||||
{
|
||||
d = push
|
||||
}
|
||||
case Their:
|
||||
{
|
||||
d = pull
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
switch d {
|
||||
case push:
|
||||
{
|
||||
s.stateCallback(Pushing, g)
|
||||
if err := s.push(g); err != nil {
|
||||
return fmt.Errorf("%w: %s", ErrPushing, err)
|
||||
}
|
||||
s.stateCallback(Pushed, g)
|
||||
return nil
|
||||
}
|
||||
case pull:
|
||||
{
|
||||
s.stateCallback(Pulling, g)
|
||||
if err := s.pull(g, remoteMetadata); err != nil {
|
||||
return fmt.Errorf("%w: %s", ErrPulling, err)
|
||||
}
|
||||
s.stateCallback(Pulled, g)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Syncer) push(g repository.Metadata) error {
|
||||
if err := s.service.PushArchive(g.ID, "", s.cli); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// manage backup
|
||||
bs, err := s.service.AllBackups(g.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, b := range bs {
|
||||
binfo, err := s.cli.ArchiveInfo(g.ID, b.UUID)
|
||||
if err != nil {
|
||||
if !errors.Is(err, client.ErrNotFound) {
|
||||
return fmt.Errorf("failed to get remote information about the backup file: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if binfo.MD5 != b.MD5 {
|
||||
if err := s.cli.PushBackup(b, g); err != nil {
|
||||
return fmt.Errorf("failed to push backup: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Syncer) pull(g, r repository.Metadata) error {
|
||||
g.Version = r.Version
|
||||
g.Date = r.Date
|
||||
|
||||
if err := s.service.UpdateMetadata(g.ID, g); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.service.PullArchive(g.ID, "", s.cli); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.service.ApplyCurrent(g.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// manage backup
|
||||
bs, err := s.cli.ListArchives(g.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, uuid := range bs {
|
||||
rinfo, err := s.cli.ArchiveInfo(g.ID, uuid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
linfo, err := s.service.Backup(g.ID, uuid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if linfo.MD5 != rinfo.MD5 {
|
||||
if err := s.service.PullBackup(g.ID, uuid, s.cli); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user