🧱 Chunk is a download manager for slow and unstable servers
at main 5.6 kB view raw
1package chunk 2 3import ( 4 "crypto/md5" 5 "encoding/gob" 6 "fmt" 7 "log/slog" 8 "os" 9 "path/filepath" 10 "sync" 11 "sync/atomic" 12 13 gap "github.com/muesli/go-app-paths" 14) 15 16// DefaultChunkDir is the directory where Chunk keeps track of each chunk 17// downloaded of each file. It us created under the user's cache directory 18// by default. 19// It can be replaced by a full path in the environment variable CHUNK_DIR. 20const DefaultChunkDir = "chunk" 21 22// get the chunk directory under user's home directory or using the envvar 23func getChunkProgressDir(dir string) (string, error) { 24 if dir == "" { 25 dir = os.Getenv("CHUNK_DIR") 26 } 27 if dir == "" { 28 c, err := gap.NewScope(gap.User, "app").CacheDir() 29 if err != nil { 30 return "", fmt.Errorf("error getting user home directory: %w", err) 31 } 32 dir = filepath.Join(c, DefaultChunkDir) 33 } 34 if err := os.MkdirAll(dir, 0755); err != nil { 35 return "", fmt.Errorf("could not create chunk's directory %s: %w", dir, err) 36 } 37 return dir, nil 38} 39 40type progress struct { 41 // path for persistence of this progress file 42 path string 43 lock sync.Mutex 44 45 // download fields so the encoder/decoder has access to them 46 URL string 47 Path string 48 ChunkSize int64 49 Chunks []int64 50} 51 52// it loads a download progress from a file if restart is set to false 53func (p *progress) load(restart bool) error { 54 p.lock.Lock() 55 defer p.lock.Unlock() 56 if restart { 57 err := os.Remove(p.path) 58 if err != nil && !os.IsNotExist(err) { 59 return fmt.Errorf("could not delete %s: %w", p.path, err) 60 } 61 return nil 62 } 63 f, err := os.Open(p.path) 64 if os.IsNotExist(err) { 65 return nil 66 } 67 if err != nil { 68 return fmt.Errorf("error opening %s: %w", p.path, err) 69 } 70 defer func() { 71 if err := f.Close(); err != nil { 72 slog.Warn("error closing progress file", "path", p.path, "error", err) 73 } 74 }() 75 d := gob.NewDecoder(f) 76 var got progress 77 if err := d.Decode(&got); err != nil { 
78 return fmt.Errorf("error decoding progress file %s: %w", p.path, err) 79 } 80 if got.URL != p.URL { 81 return fmt.Errorf("download progress file %s has unexpected url %s, expected %s", p.path, got.URL, p.URL) 82 } 83 if got.Path != p.Path { 84 return fmt.Errorf("download progress file %s has unexpected path %s, expected %s", p.path, got.Path, p.Path) 85 } 86 if got.ChunkSize != p.ChunkSize { 87 return fmt.Errorf("download progress file %s has unexpected chunk size %d, expected %d", p.path, got.ChunkSize, p.ChunkSize) 88 } 89 if len(got.Chunks) != len(p.Chunks) { 90 return fmt.Errorf("download progress file %s has unexpected number of chunks %d, expected %d", p.path, len(got.Chunks), len(p.Chunks)) 91 } 92 p.Chunks = got.Chunks 93 return nil 94} 95 96// checks if `idx` is a valid index for `p.Chunks` array 97func (p *progress) isValidIndex(idx int) bool { return idx >= 0 && idx < len(p.Chunks) } 98 99// checks if the chunk number `idx` should be downloaded (ie is not downloaded yet) 100func (p *progress) shouldDownload(idx int) (bool, error) { 101 if !p.isValidIndex(idx) { 102 return false, fmt.Errorf("%s does not have chunk #%d", p.Path, idx+1) 103 } 104 return p.Chunks[idx] == 0, nil 105} 106 107// marks the chunk number `idx` as done (ie successfully downloaded) 108func (p *progress) done(idx int, bytes int64) error { 109 if !p.isValidIndex(idx) { 110 return fmt.Errorf("%s does not have chunk #%d", p.Path, idx+1) 111 } 112 p.lock.Lock() 113 defer p.lock.Unlock() 114 atomic.StoreInt64(&p.Chunks[idx], bytes) 115 f, err := os.Create(p.path) 116 if err != nil { 117 return fmt.Errorf("error opening progress file %s: %w", p.path, err) 118 } 119 defer func() { 120 if err := f.Close(); err != nil { 121 slog.Warn("error closing progress file", "path", p.path, "error", err) 122 } 123 }() 124 e := gob.NewEncoder(f) 125 if err := e.Encode(p); err != nil { 126 return fmt.Errorf("error encoding progress file %s: %w", p.path, err) 127 } 128 return nil 129} 130 131// check is 
all the chunks of the current download are done 132func (p *progress) isDone() (bool, error) { 133 for idx := range p.Chunks { 134 s, err := p.shouldDownload(idx) 135 if err != nil { 136 return false, fmt.Errorf("error checking if chunk #%d for %s done: %w", idx+1, p.URL, err) 137 } 138 if s { 139 return false, nil 140 } 141 } 142 return true, nil 143} 144 145// removes this progress file it the download is done 146func (p *progress) close() error { 147 done, err := p.isDone() 148 if err != nil { 149 return fmt.Errorf("error checking if %s is done: %w", p.URL, err) 150 } 151 if done { 152 if err := os.Remove(p.path); err != nil && !os.IsNotExist(err) { 153 return fmt.Errorf("error cleaning up progress file %s: %w", p.path, err) 154 } 155 } 156 return nil // Either not empty or error, suits both cases 157} 158 159// calculates the number of bytes downloaded 160func (p *progress) downloadedBytes() int64 { 161 var downloaded int64 162 for _, c := range p.Chunks { 163 downloaded += c 164 } 165 return downloaded 166} 167 168func newProgress(path, dir string, url string, chunkSize int64, chunks int, restart bool) (*progress, error) { 169 dir, err := getChunkProgressDir(dir) 170 if err != nil { 171 return nil, fmt.Errorf("could not get chunk's directory: %w", err) 172 } 173 abs, err := filepath.Abs(path) 174 if err != nil { 175 return nil, fmt.Errorf("could not get the download absolute path: %w", err) 176 } 177 hash := fmt.Sprintf("%x", md5.Sum(fmt.Appendf(nil, "%s|%s", url, abs))) 178 179 // file name is a hash of the URL and local file path, plus the file name 180 // in an human-readable way for debugging purposes 181 name := fmt.Sprintf("%s-%s", hash, filepath.Base(path)) 182 p := progress{ 183 path: filepath.Join(dir, name), 184 URL: url, 185 Path: abs, 186 ChunkSize: chunkSize, 187 Chunks: make([]int64, chunks), 188 } 189 if err := p.load(restart); err != nil { 190 return nil, fmt.Errorf("error loading existing progress file: %w", err) 191 } 192 return &p, nil 193}