Fix recursive lock
This commit is contained in:
@@ -3,11 +3,14 @@ package download
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/url"
|
||||
"os/exec"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/tardisx/gropple/config"
|
||||
@@ -28,6 +31,7 @@ type Download struct {
|
||||
Percent float32 `json:"percent"`
|
||||
Log []string `json:"log"`
|
||||
Config *config.Config
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
type Downloads []*Download
|
||||
@@ -35,26 +39,31 @@ type Downloads []*Download
|
||||
// StartQueued starts any downloads that have been queued, we would not exceed
|
||||
// maxRunning. If maxRunning is 0, there is no limit.
|
||||
func (dls Downloads) StartQueued(maxRunning int) {
|
||||
active := 0
|
||||
queued := 0
|
||||
active := make(map[string]int)
|
||||
|
||||
for _, dl := range dls {
|
||||
|
||||
dl.mutex.Lock()
|
||||
|
||||
if dl.State == "downloading" {
|
||||
active++
|
||||
}
|
||||
if dl.State == "queued" {
|
||||
queued++
|
||||
active[dl.domain()]++
|
||||
}
|
||||
dl.mutex.Unlock()
|
||||
|
||||
}
|
||||
|
||||
// there is room, so start one
|
||||
if queued > 0 && (active < maxRunning || maxRunning == 0) {
|
||||
for _, dl := range dls {
|
||||
if dl.State == "queued" {
|
||||
dl.State = "downloading"
|
||||
go func() { dl.Begin() }()
|
||||
return
|
||||
}
|
||||
for _, dl := range dls {
|
||||
|
||||
dl.mutex.Lock()
|
||||
|
||||
if dl.State == "queued" && (maxRunning == 0 || active[dl.domain()] < maxRunning) {
|
||||
dl.State = "downloading"
|
||||
active[dl.domain()]++
|
||||
log.Printf("Starting download for %#v", dl)
|
||||
dl.mutex.Unlock()
|
||||
go func() { dl.Begin() }()
|
||||
} else {
|
||||
dl.mutex.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,23 +74,57 @@ func (dls Downloads) StartQueued(maxRunning int) {
|
||||
func (dls Downloads) Cleanup() Downloads {
|
||||
newDLs := Downloads{}
|
||||
for _, dl := range dls {
|
||||
|
||||
dl.mutex.Lock()
|
||||
|
||||
if dl.Finished && time.Since(dl.FinishedTS) > time.Duration(time.Hour) {
|
||||
// do nothing
|
||||
} else {
|
||||
newDLs = append(newDLs, dl)
|
||||
}
|
||||
dl.mutex.Unlock()
|
||||
|
||||
}
|
||||
return newDLs
|
||||
}
|
||||
|
||||
// Queue queues a download
|
||||
func (dl *Download) Queue() {
|
||||
|
||||
dl.mutex.Lock()
|
||||
defer dl.mutex.Unlock()
|
||||
|
||||
dl.State = "queued"
|
||||
|
||||
}
|
||||
|
||||
func (dl *Download) Stop() {
|
||||
log.Printf("stopping the download")
|
||||
dl.mutex.Lock()
|
||||
defer dl.mutex.Unlock()
|
||||
|
||||
syscall.Kill(dl.Pid, syscall.SIGTERM)
|
||||
}
|
||||
|
||||
func (dl *Download) domain() string {
|
||||
|
||||
// note that we expect to already have the mutex locked by the caller
|
||||
url, err := url.Parse(dl.Url)
|
||||
if err != nil {
|
||||
log.Printf("Unknown domain for url: %s", dl.Url)
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
return url.Hostname()
|
||||
|
||||
}
|
||||
|
||||
// Begin starts a download, by starting the command specified in the DownloadProfile.
|
||||
// It blocks until the download is complete.
|
||||
func (dl *Download) Begin() {
|
||||
|
||||
dl.mutex.Lock()
|
||||
|
||||
dl.State = "downloading"
|
||||
cmdSlice := []string{}
|
||||
cmdSlice = append(cmdSlice, dl.DownloadProfile.Args...)
|
||||
@@ -112,6 +155,7 @@ func (dl *Download) Begin() {
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("Starting %v", cmd)
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
dl.State = "failed"
|
||||
@@ -124,6 +168,8 @@ func (dl *Download) Begin() {
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
dl.mutex.Unlock()
|
||||
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
@@ -138,6 +184,8 @@ func (dl *Download) Begin() {
|
||||
wg.Wait()
|
||||
cmd.Wait()
|
||||
|
||||
dl.mutex.Lock()
|
||||
|
||||
dl.State = "complete"
|
||||
dl.Finished = true
|
||||
dl.FinishedTS = time.Now()
|
||||
@@ -146,6 +194,7 @@ func (dl *Download) Begin() {
|
||||
if dl.ExitCode != 0 {
|
||||
dl.State = "failed"
|
||||
}
|
||||
dl.mutex.Unlock()
|
||||
|
||||
}
|
||||
|
||||
@@ -164,9 +213,13 @@ func (dl *Download) updateDownload(r io.Reader) {
|
||||
continue
|
||||
}
|
||||
|
||||
dl.mutex.Lock()
|
||||
|
||||
// append the raw log
|
||||
dl.Log = append(dl.Log, l)
|
||||
|
||||
dl.mutex.Unlock()
|
||||
|
||||
// look for the percent and eta and other metadata
|
||||
dl.updateMetadata(l)
|
||||
}
|
||||
@@ -179,6 +232,10 @@ func (dl *Download) updateDownload(r io.Reader) {
|
||||
|
||||
func (dl *Download) updateMetadata(s string) {
|
||||
|
||||
dl.mutex.Lock()
|
||||
|
||||
defer dl.mutex.Unlock()
|
||||
|
||||
// [download] 49.7% of ~15.72MiB at 5.83MiB/s ETA 00:07
|
||||
etaRE := regexp.MustCompile(`download.+ETA +(\d\d:\d\d(?::\d\d)?)$`)
|
||||
matches := etaRE.FindStringSubmatch(s)
|
||||
|
||||
@@ -71,7 +71,7 @@ func TestQueue(t *testing.T) {
|
||||
new1 := Download{Id: 1, State: "queued", DownloadProfile: conf.DownloadProfiles[0], Config: conf}
|
||||
new2 := Download{Id: 2, State: "queued", DownloadProfile: conf.DownloadProfiles[0], Config: conf}
|
||||
new3 := Download{Id: 3, State: "queued", DownloadProfile: conf.DownloadProfiles[0], Config: conf}
|
||||
new4 := Download{Id: 4, State: "queued", DownloadProfile: conf.DownloadProfiles[0], Config: conf}
|
||||
new4 := Download{Id: 4, Url: "http://company.org/", State: "queued", DownloadProfile: conf.DownloadProfiles[0], Config: conf}
|
||||
|
||||
dls := Downloads{&new1, &new2, &new3, &new4}
|
||||
dls.StartQueued(1)
|
||||
@@ -81,6 +81,9 @@ func TestQueue(t *testing.T) {
|
||||
if dls[1].State != "queued" {
|
||||
t.Error("#2 is not queued")
|
||||
}
|
||||
if dls[3].State == "queued" {
|
||||
t.Error("#4 is not started")
|
||||
}
|
||||
|
||||
// this should start no more, as one is still going
|
||||
dls.StartQueued(1)
|
||||
@@ -93,4 +96,9 @@ func TestQueue(t *testing.T) {
|
||||
t.Error("#2 was not started but it should be")
|
||||
}
|
||||
|
||||
dls.StartQueued(2)
|
||||
if dls[3].State == "queued" {
|
||||
t.Error("#4 was not started but it should be")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user