From eb070383b1a81f8ab46d3db0a5d128413bb34b9a Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 30 Jun 2014 00:22:05 +1000 Subject: [PATCH] Move download strategies into their own file --- client.go | 126 +--------------------------------------- client_test.go | 4 +- download_strategies.go | 129 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 131 insertions(+), 128 deletions(-) create mode 100644 download_strategies.go diff --git a/client.go b/client.go index 77e9dce1..c0df7ffb 100644 --- a/client.go +++ b/client.go @@ -971,7 +971,7 @@ func (cl *Client) allTorrentsCompleted() bool { } // Returns true when all torrents are completely downloaded and false if the -// client is stopped. +// client is stopped before that. func (me *Client) WaitAll() bool { me.mu.Lock() defer me.mu.Unlock() @@ -984,17 +984,6 @@ func (me *Client) WaitAll() bool { return true } -type DownloadStrategy interface { - FillRequests(t *torrent, c *connection) - TorrentStarted(t *torrent) - TorrentStopped(t *torrent) - DeleteRequest(t *torrent, r request) -} - -type DefaultDownloadStrategy struct { - heat map[*torrent]map[request]int -} - func (cl *Client) assertRequestHeat() { dds, ok := cl.DownloadStrategy.(*DefaultDownloadStrategy) if !ok { @@ -1015,119 +1004,6 @@ func (cl *Client) assertRequestHeat() { } } -func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) { - th := s.heat[t] - addRequest := func(req request) (again bool) { - piece := t.Pieces[req.Index] - if piece.Hashing || piece.QueuedForHash { - // We can't be sure we want this. - return true - } - if piece.Complete() { - // We already have this. - return true - } - if c.RequestPending(req) { - return true - } - again = c.Request(req) - if c.RequestPending(req) { - th[req]++ - } - return - } - // First request prioritized chunks. - for e := t.Priorities.Front(); e != nil; e = e.Next() { - if !addRequest(e.Value.(request)) { - return - } - } - ppbs := t.piecesByPendingBytes() - // Then finish off incomplete pieces in order of bytes remaining. - for _, heatThreshold := range []int{0, 1, 4, 100} { - for _, pieceIndex := range ppbs { - for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() { - r := request{pieceIndex, chunkSpec} - if th[r] > heatThreshold { - continue - } - if !addRequest(request{pieceIndex, chunkSpec}) { - return - } - } - } - } -} - -func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) { - if s.heat[t] != nil { - panic("torrent already started") - } - if s.heat == nil { - s.heat = make(map[*torrent]map[request]int, 10) - } - s.heat[t] = make(map[request]int, t.ChunkCount()) -} - -func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) { - if _, ok := s.heat[t]; !ok { - panic("torrent not yet started") - } - delete(s.heat, t) -} - -func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) { - m := s.heat[t] - if m[r] <= 0 { - panic("no pending requests") - } - m[r]-- -} - -type ResponsiveDownloadStrategy struct { - // How many bytes to preemptively download starting at the beginning of - // the last piece read for a given torrent. - Readahead int -} - -func (ResponsiveDownloadStrategy) TorrentStarted(*torrent) {} -func (ResponsiveDownloadStrategy) TorrentStopped(*torrent) {} -func (ResponsiveDownloadStrategy) DeleteRequest(*torrent, request) {} - -func (me *ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { - for e := t.Priorities.Front(); e != nil; e = e.Next() { - req := e.Value.(request) - if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok { - panic(req) - } - if !c.Request(e.Value.(request)) { - return - } - } - readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize() - for i := t.lastReadPiece; i < t.lastReadPiece+readaheadPieces && i < t.NumPieces(); i++ { - for _, cs := range t.Pieces[i].shuffledPendingChunkSpecs() { - if !c.Request(request{pp.Integer(i), cs}) { - return - } - } - } - // Then finish off incomplete pieces in order of bytes remaining. - for _, index := range t.piecesByPendingBytes() { - // Stop when we're onto untouched pieces. - if !t.PiecePartiallyDownloaded(int(index)) { - break - } - // Request chunks in random order to reduce overlap with other - // connections. - for _, cs := range t.Pieces[index].shuffledPendingChunkSpecs() { - if !c.Request(request{index, cs}) { - return - } - } - } -} - func (me *Client) replenishConnRequests(t *torrent, c *connection) { me.DownloadStrategy.FillRequests(t, c) me.assertRequestHeat() diff --git a/client_test.go b/client_test.go index 3c8bc9f0..d3dd0156 100644 --- a/client_test.go +++ b/client_test.go @@ -1,11 +1,9 @@ package torrent import ( + "bitbucket.org/anacrolix/go.torrent/testutil" "github.com/anacrolix/libtorgo/bencode" "os" - - "bitbucket.org/anacrolix/go.torrent/testutil" - "testing" ) diff --git a/download_strategies.go b/download_strategies.go new file mode 100644 index 00000000..2cc7289e --- /dev/null +++ b/download_strategies.go @@ -0,0 +1,129 @@ +package torrent + +import ( + pp "bitbucket.org/anacrolix/go.torrent/peer_protocol" +) + +type DownloadStrategy interface { + FillRequests(t *torrent, c *connection) + TorrentStarted(t *torrent) + TorrentStopped(t *torrent) + DeleteRequest(t *torrent, r request) +} + +type DefaultDownloadStrategy struct { + heat map[*torrent]map[request]int +} + +func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) { + th := s.heat[t] + addRequest := func(req request) (again bool) { + piece := t.Pieces[req.Index] + if piece.Hashing || piece.QueuedForHash { + // We can't be sure we want this. + return true + } + if piece.Complete() { + // We already have this. + return true + } + if c.RequestPending(req) { + return true + } + again = c.Request(req) + if c.RequestPending(req) { + th[req]++ + } + return + } + // First request prioritized chunks. + for e := t.Priorities.Front(); e != nil; e = e.Next() { + if !addRequest(e.Value.(request)) { + return + } + } + ppbs := t.piecesByPendingBytes() + // Then finish off incomplete pieces in order of bytes remaining. + for _, heatThreshold := range []int{0, 4, 60} { + for _, pieceIndex := range ppbs { + for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() { + r := request{pieceIndex, chunkSpec} + if th[r] > heatThreshold { + continue + } + if !addRequest(request{pieceIndex, chunkSpec}) { + return + } + } + } + } +} + +func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) { + if s.heat[t] != nil { + panic("torrent already started") + } + if s.heat == nil { + s.heat = make(map[*torrent]map[request]int, 10) + } + s.heat[t] = make(map[request]int, t.ChunkCount()) +} + +func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) { + if _, ok := s.heat[t]; !ok { + panic("torrent not yet started") + } + delete(s.heat, t) +} + +func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) { + m := s.heat[t] + if m[r] <= 0 { + panic("no pending requests") + } + m[r]-- +} + +type ResponsiveDownloadStrategy struct { + // How many bytes to preemptively download starting at the beginning of + // the last piece read for a given torrent. + Readahead int +} + +func (ResponsiveDownloadStrategy) TorrentStarted(*torrent) {} +func (ResponsiveDownloadStrategy) TorrentStopped(*torrent) {} +func (ResponsiveDownloadStrategy) DeleteRequest(*torrent, request) {} + +func (me *ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { + for e := t.Priorities.Front(); e != nil; e = e.Next() { + req := e.Value.(request) + if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok { + panic(req) + } + if !c.Request(e.Value.(request)) { + return + } + } + readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize() + for i := t.lastReadPiece; i < t.lastReadPiece+readaheadPieces && i < t.NumPieces(); i++ { + for _, cs := range t.Pieces[i].shuffledPendingChunkSpecs() { + if !c.Request(request{pp.Integer(i), cs}) { + return + } + } + } + // Then finish off incomplete pieces in order of bytes remaining. + for _, index := range t.piecesByPendingBytes() { + // Stop when we're onto untouched pieces. + if !t.PiecePartiallyDownloaded(int(index)) { + break + } + // Request chunks in random order to reduce overlap with other + // connections. + for _, cs := range t.Pieces[index].shuffledPendingChunkSpecs() { + if !c.Request(request{index, cs}) { + return + } + } + } +} -- 2.48.1