From ef7c4f4120b0e22debca615f3627f26e57bfd26b Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 24 Jul 2014 13:42:31 +1000 Subject: [PATCH] Move priority management entirely into the download strategies --- client.go | 44 +------------- download_strategies.go | 133 +++++++++++++++++++++++++++++++++++------ torrent.go | 39 ++++++------ 3 files changed, 139 insertions(+), 77 deletions(-) diff --git a/client.go b/client.go index 0567192b..093cbe01 100644 --- a/client.go +++ b/client.go @@ -19,7 +19,6 @@ import ( "bitbucket.org/anacrolix/go.torrent/dht" "bitbucket.org/anacrolix/go.torrent/util" "bufio" - "container/list" "crypto/rand" "crypto/sha1" "errors" @@ -64,30 +63,7 @@ func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error { if !t.haveInfo() { return errors.New("missing metadata") } - newPriorities := make([]request, 0, (len_+chunkSize-1)/chunkSize) - for len_ > 0 { - req, ok := t.offsetRequest(off) - if !ok { - return errors.New("bad offset") - } - reqOff := t.requestOffset(req) - // Gain the alignment adjustment. - len_ += off - reqOff - // Lose the length of this block. - len_ -= int64(req.Length) - off = reqOff + int64(req.Length) - if !t.wantPiece(int(req.Index)) { - continue - } - newPriorities = append(newPriorities, req) - } - if len(newPriorities) == 0 { - return nil - } - t.Priorities.PushFront(newPriorities[0]) - for _, req := range newPriorities[1:] { - t.Priorities.PushBack(req) - } + me.DownloadStrategy.TorrentPrioritize(t, off, len_) for _, cn := range t.Conns { me.replenishConnRequests(t, cn) } @@ -158,7 +134,6 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er err = io.EOF return } - t.lastReadPiece = int(index) piece := t.Pieces[index] pieceOff := pp.Integer(off % int64(t.PieceLength(0))) high := int(t.PieceLength(index) - pieceOff) @@ -1142,13 +1117,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er } // Unprioritize the chunk. - var next *list.Element - for e := t.Priorities.Front(); e != nil; e = next { - next = e.Next() - if e.Value.(request) == req { - t.Priorities.Remove(e) - } - } + me.DownloadStrategy.TorrentGotChunk(t, req) // Cancel pending requests for this chunk. cancelled := false @@ -1189,14 +1158,7 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) { p.EverHashed = true if correct { p.PendingChunkSpecs = nil - // log.Printf("%s: got piece %d, (%d/%d)", t, piece, t.NumPiecesCompleted(), t.NumPieces()) - var next *list.Element - for e := t.Priorities.Front(); e != nil; e = next { - next = e.Next() - if e.Value.(request).Index == piece { - t.Priorities.Remove(e) - } - } + me.DownloadStrategy.TorrentGotPiece(t, int(piece)) me.dataReady(dataSpec{ t.InfoHash, request{ diff --git a/download_strategies.go b/download_strategies.go index 43bd1d2d..cd42ecbd 100644 --- a/download_strategies.go +++ b/download_strategies.go @@ -2,6 +2,7 @@ package torrent import ( pp "bitbucket.org/anacrolix/go.torrent/peer_protocol" + "container/list" ) type DownloadStrategy interface { @@ -9,6 +10,9 @@ type DownloadStrategy interface { TorrentStarted(t *torrent) TorrentStopped(t *torrent) DeleteRequest(t *torrent, r request) + TorrentPrioritize(t *torrent, off, _len int64) + TorrentGotChunk(t *torrent, r request) + TorrentGotPiece(t *torrent, piece int) } type DefaultDownloadStrategy struct { @@ -45,11 +49,11 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) { return } // First request prioritized chunks. - for e := t.Priorities.Front(); e != nil; e = e.Next() { - if !addRequest(e.Value.(request)) { - return - } - } + // for e := t.Priorities.Front(); e != nil; e = e.Next() { + // if !addRequest(e.Value.(request)) { + // return + // } + // } // Then finish off incomplete pieces in order of bytes remaining. for _, heatThreshold := range []int{1, 4, 15, 60} { for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() { @@ -93,31 +97,68 @@ func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) { m[r]-- } -type ResponsiveDownloadStrategy struct { +func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {} +func (me *DefaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {} +func (*DefaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {} + +func NewResponsiveDownloadStrategy(readahead int) *responsiveDownloadStrategy { + return &responsiveDownloadStrategy{ + Readahead: readahead, + lastReadPiece: make(map[*torrent]int), + priorities: make(map[*torrent]*list.List), + } +} + +type responsiveDownloadStrategy struct { // How many bytes to preemptively download starting at the beginning of // the last piece read for a given torrent. - Readahead int + Readahead int + lastReadPiece map[*torrent]int + priorities map[*torrent]*list.List } -func (ResponsiveDownloadStrategy) TorrentStarted(*torrent) {} -func (ResponsiveDownloadStrategy) TorrentStopped(*torrent) {} -func (ResponsiveDownloadStrategy) DeleteRequest(*torrent, request) {} +func (me *responsiveDownloadStrategy) TorrentStarted(t *torrent) { + me.priorities[t] = list.New() +} + +func (me *responsiveDownloadStrategy) TorrentStopped(t *torrent) { + delete(me.lastReadPiece, t) + delete(me.priorities, t) +} +func (responsiveDownloadStrategy) DeleteRequest(*torrent, request) {} -func (me *ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { - for e := t.Priorities.Front(); e != nil; e = e.Next() { +func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { + if len(c.Requests) >= (c.PeerMaxRequests+1)/2 || len(c.Requests) >= 64 { + return + } + + // Short circuit request fills at a level that might reduce receiving of + // unnecessary chunks. + requestWrapper := func(r request) bool { + if len(c.Requests) >= 64 { + return false + } + return c.Request(r) + } + + prios := me.priorities[t] + for e := prios.Front(); e != nil; e = e.Next() { req := e.Value.(request) if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok { panic(req) } - if !c.Request(e.Value.(request)) { + if !requestWrapper(e.Value.(request)) { return } } - readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize() - for i := t.lastReadPiece; i < t.lastReadPiece+readaheadPieces && i < t.NumPieces(); i++ { - for _, cs := range t.Pieces[i].shuffledPendingChunkSpecs() { - if !c.Request(request{pp.Integer(i), cs}) { - return + + if lastReadPiece, ok := me.lastReadPiece[t]; ok { + readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize() + for i := lastReadPiece; i < lastReadPiece+readaheadPieces && i < t.NumPieces(); i++ { + for _, cs := range t.Pieces[i].shuffledPendingChunkSpecs() { + if !requestWrapper(request{pp.Integer(i), cs}) { + return + } } } } @@ -131,9 +172,63 @@ func (me *ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { // Request chunks in random order to reduce overlap with other // connections. for _, cs := range t.Pieces[index].shuffledPendingChunkSpecs() { - if !c.Request(request{pp.Integer(index), cs}) { + if !requestWrapper(request{pp.Integer(index), cs}) { return } } } } + +func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) { + prios := me.priorities[t] + var next *list.Element + for e := prios.Front(); e != nil; e = next { + next = e.Next() + if e.Value.(request) == req { + prios.Remove(e) + } + } +} + +func (me *responsiveDownloadStrategy) TorrentGotPiece(t *torrent, piece int) { + var next *list.Element + prios := me.priorities[t] + for e := prios.Front(); e != nil; e = next { + next = e.Next() + if int(e.Value.(request).Index) == piece { + prios.Remove(e) + } + } +} + +func (s *responsiveDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) { + newPriorities := make([]request, 0, (_len+chunkSize-1)/chunkSize) + for _len > 0 { + req, ok := t.offsetRequest(off) + if !ok { + panic("bad offset") + } + reqOff := t.requestOffset(req) + // Gain the alignment adjustment. + _len += off - reqOff + // Lose the length of this block. + _len -= int64(req.Length) + off = reqOff + int64(req.Length) + if !t.wantPiece(int(req.Index)) { + continue + } + newPriorities = append(newPriorities, req) + } + if len(newPriorities) == 0 { + return + } + s.lastReadPiece[t] = int(newPriorities[0].Index) + if t.wantChunk(newPriorities[0]) { + s.priorities[t].PushFront(newPriorities[0]) + } + for _, req := range newPriorities[1:] { + if t.wantChunk(req) { + s.priorities[t].PushBack(req) + } + } +} diff --git a/torrent.go b/torrent.go index 7140652f..c6f26716 100644 --- a/torrent.go +++ b/torrent.go @@ -42,18 +42,16 @@ type torrent struct { PiecesByBytesLeft *OrderedList Data mmap_span.MMapSpan // Prevent mutations to Data memory maps while in use as they're not safe. - dataLock sync.RWMutex - Info *metainfo.Info - Conns []*connection - Peers []Peer - Priorities *list.List + dataLock sync.RWMutex + Info *metainfo.Info + Conns []*connection + Peers []Peer // BEP 12 Multitracker Metadata Extension. The tracker.Client instances // mirror their respective URLs from the announce-list key. - Trackers [][]tracker.Client - lastReadPiece int - DisplayName string - MetaData []byte - metadataHave []bool + Trackers [][]tracker.Client + DisplayName string + MetaData []byte + metadataHave []bool } func (t *torrent) InvalidateMetadata() { @@ -124,7 +122,6 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte piece.bytesLeftElement = t.PiecesByBytesLeft.Insert(index) t.pendAllChunkSpecs(pp.Integer(index)) } - t.Priorities = list.New() for _, conn := range t.Conns { if err := conn.setNumPieces(t.NumPieces()); err != nil { log.Printf("closing connection: %s", err) @@ -211,12 +208,12 @@ func (t *torrent) WriteStatus(w io.Writer) { fmt.Fprintf(w, "%c", t.pieceStatusChar(index)) } fmt.Fprintln(w) - fmt.Fprintln(w, "Priorities: ") - if t.Priorities != nil { - for e := t.Priorities.Front(); e != nil; e = e.Next() { - fmt.Fprintf(w, "\t%v\n", e.Value) - } - } + // fmt.Fprintln(w, "Priorities: ") + // if t.Priorities != nil { + // for e := t.Priorities.Front(); e != nil; e = e.Next() { + // fmt.Fprintf(w, "\t%v\n", e.Value) + // } + // } fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers)) for _, c := range t.Conns { c.WriteStatus(w) @@ -419,6 +416,14 @@ func (me *torrent) haveAnyPieces() bool { return false } +func (t *torrent) wantChunk(r request) bool { + if !t.wantPiece(int(r.Index)) { + return false + } + _, ok := t.Pieces[r.Index].PendingChunkSpecs[r.chunkSpec] + return ok +} + func (t *torrent) wantPiece(index int) bool { if !t.haveInfo() { return false -- 2.48.1