From 6942eed15f46be93a83a7f017040aeda72d5b92c Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 15 Sep 2014 03:25:53 +1000 Subject: [PATCH] Lazily hash pieces where possible --- client.go | 40 +++++++++++++++++++++++++++++----------- download_strategies.go | 28 +++++++++++++++++++++++----- 2 files changed, 52 insertions(+), 16 deletions(-) diff --git a/client.go b/client.go index 115a9904..39dc0a1e 100644 --- a/client.go +++ b/client.go @@ -1067,16 +1067,20 @@ func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err e if err != nil { return } - // Queue all pieces for hashing. This is done sequentially to avoid - // spamming goroutines. - for _, p := range t.Pieces { - p.QueuedForHash = true - } - go func() { - for i := range t.Pieces { - cl.verifyPiece(t, pp.Integer(i)) + // If the client intends to upload, it needs to know what state pieces are + // in. + if !cl.noUpload { + // Queue all pieces for hashing. This is done sequentially to avoid + // spamming goroutines. + for _, p := range t.Pieces { + p.QueuedForHash = true } - }() + go func() { + for i := range t.Pieces { + cl.verifyPiece(t, pp.Integer(i)) + } + }() + } cl.downloadStrategy.TorrentStarted(t) return @@ -1313,7 +1317,17 @@ newAnnounce: func (cl *Client) allTorrentsCompleted() bool { for _, t := range cl.torrents { - if !t.haveAllPieces() { + if !t.haveInfo() { + return false + } + for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() { + i := e.Value.(int) + if t.Pieces[i].Complete() { + continue + } + // If the piece isn't complete, make sure it's not because it's + // never been hashed. + cl.queueFirstHash(t, i) return false } } @@ -1358,7 +1372,11 @@ func (me *Client) replenishConnRequests(t *torrent, c *connection) { if !t.haveInfo() { return } - me.downloadStrategy.FillRequests(t, c) + for _, p := range me.downloadStrategy.FillRequests(t, c) { + // Make sure the state of pieces that would have been requested is + // known. + me.queueFirstHash(t, p) + } //me.assertRequestHeat() if len(c.Requests) == 0 && !c.PeerChoked { c.SetInterested(false) diff --git a/download_strategies.go b/download_strategies.go index 569ea381..bc728759 100644 --- a/download_strategies.go +++ b/download_strategies.go @@ -10,7 +10,10 @@ import ( ) type DownloadStrategy interface { - FillRequests(*torrent, *connection) + // Tops up the outgoing pending requests. Returns the indices of pieces + // that would be requested. This is used to determine if pieces require + // hashing so the completed state is known. + FillRequests(*torrent, *connection) (pieces []int) TorrentStarted(*torrent) TorrentStopped(*torrent) DeleteRequest(*torrent, request) @@ -33,7 +36,7 @@ func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) { func (me *DefaultDownloadStrategy) WriteStatus(w io.Writer) {} -func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) { +func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) (pieces []int) { if c.Interested { if c.PeerChoked { return @@ -66,6 +69,11 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) { for _, heatThreshold := range []int{1, 4, 15, 60} { for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() { pieceIndex := pp.Integer(e.Value.(int)) + piece := t.Pieces[pieceIndex] + if !piece.EverHashed { + pieces = append(pieces, int(pieceIndex)) + return + } for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs { r := request{pieceIndex, chunkSpec} if th[r] >= heatThreshold { @@ -77,6 +85,7 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) { } } } + return } func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) { @@ -160,10 +169,14 @@ type requestFiller struct { c *connection t *torrent s *responsiveDownloadStrategy + + // The set of pieces that were considered for requesting. + pieces map[int]struct{} } // Wrapper around connection.request that tracks request heat. func (me *requestFiller) request(req request) bool { + me.pieces[int(req.Index)] = struct{}{} if me.c.RequestPending(req) { return true } @@ -265,7 +278,7 @@ func (me *requestFiller) pendingReadaheadChunks() (ret []request) { } ret = make([]request, 0, (me.s.Readahead+chunkSize-1)/chunkSize) for pi := int(lastReadOffset / int64(t.UsualPieceSize())); pi < t.NumPieces() && int64(pi)*int64(t.UsualPieceSize()) < lastReadOffset+me.s.Readahead; pi++ { - if !t.wantPiece(pi) || !me.c.PeerHasPiece(pp.Integer(pi)) { + if t.havePiece(pi) || !me.c.PeerHasPiece(pp.Integer(pi)) { continue } for cs := range t.Pieces[pi].PendingChunkSpecs { @@ -316,8 +329,13 @@ func (me *requestFiller) readahead() bool { return true } -func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { - (requestFiller{c, t, me}).Run() +func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) (pieces []int) { + rf := requestFiller{c, t, me, make(map[int]struct{}, t.NumPieces())} + rf.Run() + for p := range rf.pieces { + pieces = append(pieces, p) + } + return } func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) { -- 2.48.1