Start of destination support and some refactoring

This commit is contained in:
2022-07-05 20:43:32 +09:30
parent c1c1fc1866
commit 16d9ac368c
7 changed files with 189 additions and 81 deletions

View File

@@ -25,6 +25,7 @@ type Download struct {
ExitCode int `json:"exit_code"`
State string `json:"state"`
DownloadProfile config.DownloadProfile `json:"download_profile"`
Destination *config.Destination `json:"destination"`
Finished bool `json:"finished"`
FinishedTS time.Time `json:"finished_ts"`
Files []string `json:"files"`
@@ -34,8 +35,10 @@ type Download struct {
Percent float32 `json:"percent"`
Log []string `json:"log"`
Config *config.Config
Lock sync.Mutex
}
// The Manager holds and is responsible for all Download objects.
type Manager struct {
Downloads []*Download
MaxPerDomain int
@@ -48,11 +51,10 @@ var downloadId int32 = 0
func (m *Manager) ManageQueue() {
for {
m.Lock.Lock()
m.startQueued(m.MaxPerDomain)
m.cleanup()
// m.cleanup()
m.Lock.Unlock()
time.Sleep(time.Second)
@@ -62,33 +64,44 @@ func (m *Manager) ManageQueue() {
// startQueued starts any downloads that have been queued, we would not exceed
// maxRunning. If maxRunning is 0, there is no limit.
func (m *Manager) startQueued(maxRunning int) {
active := make(map[string]int)
for _, dl := range m.Downloads {
dl.Lock.Lock()
if dl.State == "downloading" {
if dl.State == "downloading" || dl.State == "preparing to start" {
active[dl.domain()]++
}
dl.Lock.Unlock()
}
for _, dl := range m.Downloads {
dl.Lock.Lock()
if dl.State == "queued" && (maxRunning == 0 || active[dl.domain()] < maxRunning) {
dl.State = "downloading"
dl.State = "preparing to start"
active[dl.domain()]++
log.Printf("Starting download for id:%d (%s)", dl.Id, dl.Url)
go func() {
m.Begin(dl.Id)
}()
dl.Lock.Unlock()
go func(sdl *Download) {
sdl.Begin()
}(dl)
} else {
dl.Lock.Unlock()
}
}
}
// cleanup removes old downloads from the list. Hardcoded to remove them one hour
// completion.
func (m *Manager) cleanup() {
func (m *Manager) XXXcleanup() {
newDLs := []*Download{}
for _, dl := range m.Downloads {
@@ -102,59 +115,68 @@ func (m *Manager) cleanup() {
m.Downloads = newDLs
}
func (m *Manager) DlById(id int) *Download {
// GetDlById returns one of the downloads in our current list.
func (m *Manager) GetDlById(id int) (*Download, error) {
m.Lock.Lock()
defer m.Lock.Unlock()
for _, dl := range m.Downloads {
if dl.Id == id {
return dl
return dl, nil
}
}
return nil
return nil, fmt.Errorf("no download with id %d", id)
}
// Queue queues a download
func (m *Manager) Queue(id int) {
dl := m.DlById(id)
func (m *Manager) Queue(dl *Download) {
dl.Lock.Lock()
defer dl.Lock.Unlock()
dl.State = "queued"
}
func (m *Manager) NewDownload(conf *config.Config, url string) int {
func NewDownload(url string, conf *config.Config) *Download {
atomic.AddInt32(&downloadId, 1)
dl := Download{
Config: conf,
Id: int(downloadId),
Url: url,
PopupUrl: fmt.Sprintf("/fetch/%d", int(downloadId)),
State: "choose profile",
Finished: false,
Eta: "?",
Percent: 0.0,
Log: make([]string, 0, 1000),
Files: []string{},
Log: []string{},
Config: conf,
Lock: sync.Mutex{},
}
m.Downloads = append(m.Downloads, &dl)
return int(downloadId)
return &dl
}
func (m *Manager) AppendLog(id int, text string) {
dl := m.DlById(id)
dl.Log = append(dl.Log, text)
func (m *Manager) AddDownload(dl *Download) {
m.Lock.Lock()
defer m.Lock.Unlock()
m.Downloads = append(m.Downloads, dl)
return
}
// func (dl *Download) AppendLog(text string) {
// dl.Lock.Lock()
// defer dl.Lock.Unlock()
// dl.Log = append(dl.Log, text)
// }
// Stop the download.
func (m *Manager) Stop(id int) {
func (dl *Download) Stop() {
if !CanStopDownload {
log.Print("attempted to stop download on a platform that it is not currently supported on - please report this as a bug")
os.Exit(1)
}
dl := m.DlById(id)
log.Printf("stopping the download")
dl.Lock.Lock()
defer dl.Lock.Unlock()
dl.Log = append(dl.Log, "aborted by user")
dl.Process.Kill()
}
// domain returns a domain for this Download. Download should be locked.
func (dl *Download) domain() string {
url, err := url.Parse(dl.Url)
@@ -169,10 +191,8 @@ func (dl *Download) domain() string {
// Begin starts a download, by starting the command specified in the DownloadProfile.
// It blocks until the download is complete.
func (m *Manager) Begin(id int) {
m.Lock.Lock()
dl := m.DlById(id)
func (dl *Download) Begin() {
dl.Lock.Lock()
dl.State = "downloading"
cmdSlice := []string{}
@@ -192,7 +212,7 @@ func (m *Manager) Begin(id int) {
dl.Finished = true
dl.FinishedTS = time.Now()
dl.Log = append(dl.Log, fmt.Sprintf("error setting up stdout pipe: %v", err))
m.Lock.Unlock()
dl.Lock.Unlock()
return
}
@@ -203,7 +223,7 @@ func (m *Manager) Begin(id int) {
dl.Finished = true
dl.FinishedTS = time.Now()
dl.Log = append(dl.Log, fmt.Sprintf("error setting up stderr pipe: %v", err))
m.Lock.Unlock()
dl.Lock.Unlock()
return
}
@@ -215,7 +235,7 @@ func (m *Manager) Begin(id int) {
dl.Finished = true
dl.FinishedTS = time.Now()
dl.Log = append(dl.Log, fmt.Sprintf("error starting command '%s': %v", dl.DownloadProfile.Command, err))
m.Lock.Unlock()
dl.Lock.Unlock()
return
}
@@ -225,24 +245,24 @@ func (m *Manager) Begin(id int) {
wg.Add(2)
m.Lock.Unlock()
dl.Lock.Unlock()
go func() {
defer wg.Done()
m.updateDownload(dl, stdout)
dl.updateDownload(stdout)
}()
go func() {
defer wg.Done()
m.updateDownload(dl, stderr)
dl.updateDownload(stderr)
}()
wg.Wait()
cmd.Wait()
log.Printf("Process finished for id: %d (%v)", dl.Id, cmd)
dl.Lock.Lock()
m.Lock.Lock()
log.Printf("Process finished for id: %d (%v)", dl.Id, cmd)
dl.State = "complete"
dl.Finished = true
@@ -252,12 +272,14 @@ func (m *Manager) Begin(id int) {
if dl.ExitCode != 0 {
dl.State = "failed"
}
m.Lock.Unlock()
dl.Lock.Unlock()
}
func (m *Manager) updateDownload(dl *Download, r io.Reader) {
// updateDownload updates the download based on data from the reader. Expects the
// Download to be unlocked.
func (dl *Download) updateDownload(r io.Reader) {
// XXX not sure if we might get a partial line?
buf := make([]byte, 1024)
for {
@@ -272,15 +294,12 @@ func (m *Manager) updateDownload(dl *Download, r io.Reader) {
continue
}
m.Lock.Lock()
// append the raw log
dl.Lock.Lock()
dl.Log = append(dl.Log, l)
// look for the percent and eta and other metadata
dl.updateMetadata(l)
m.Lock.Unlock()
dl.Lock.Unlock()
}
}
@@ -290,6 +309,7 @@ func (m *Manager) updateDownload(dl *Download, r io.Reader) {
}
}
// updateMetadata parses some metadata and updates the Download. Download must be locked.
func (dl *Download) updateMetadata(s string) {
// [download] 49.7% of ~15.72MiB at 5.83MiB/s ETA 00:07