From 04879b0002cb90f805059412e2a1006beed81468 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 21 May 2014 17:40:54 +1000 Subject: [PATCH] Add Client.DownloadStrategy; fix priority of incomplete pieces --- client.go | 93 +++++++++++++++++++++++++++++++------------ cmd/torrentfs/main.go | 5 ++- fs/torrentfs_test.go | 3 +- torrent.go | 2 +- 4 files changed, 73 insertions(+), 30 deletions(-) diff --git a/client.go b/client.go index 3e027787..2ae3b832 100644 --- a/client.go +++ b/client.go @@ -94,11 +94,12 @@ type dataSpec struct { } type Client struct { - DataDir string - HalfOpenLimit int - PeerId [20]byte - Listener net.Listener - DisableTrackers bool + DataDir string + HalfOpenLimit int + PeerId [20]byte + Listener net.Listener + DisableTrackers bool + DownloadStrategy DownloadStrategy mu sync.Mutex event sync.Cond @@ -173,6 +174,9 @@ func (c *Client) Start() { panic("error generating peer id") } c.quit = make(chan struct{}) + if c.DownloadStrategy == nil { + c.DownloadStrategy = DefaultDownloadStrategy{} + } if c.Listener != nil { go c.acceptConnections() } @@ -672,7 +676,13 @@ func (me *Client) WaitAll() bool { return true } -func (me *Client) replenishConnRequests(torrent *torrent, conn *connection) { +type DownloadStrategy interface { + FillRequests(t *torrent, c *connection) +} + +type DefaultDownloadStrategy struct{} + +func (DefaultDownloadStrategy) FillRequests(torrent *torrent, conn *connection) { requestHeatMap := torrent.requestHeat() addRequest := func(req request) (again bool) { piece := torrent.Pieces[req.Index] @@ -697,44 +707,75 @@ func (me *Client) replenishConnRequests(torrent *torrent, conn *connection) { } } // Then finish off incomplete pieces in order of bytes remaining. - for _, index := range torrent.piecesByPendingBytesDesc() { - if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) { - continue - } + for _, index := range torrent.piecesByPendingBytes() { for chunkSpec := range torrent.Pieces[index].PendingChunkSpecs { if !addRequest(request{index, chunkSpec}) { return } } } - if len(conn.Requests) == 0 { - conn.SetInterested(false) +} + +type ResponsiveDownloadStrategy struct{} + +func (ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { + for e := t.Priorities.Front(); e != nil; e = e.Next() { + if !c.Request(e.Value.(request)) { + break + } } } -func (me *Client) downloadedChunk(torrent *torrent, msg *peer_protocol.Message) (err error) { - req := request{msg.Index, chunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}} - if _, ok := torrent.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok { - log.Printf("got unnecessary chunk: %s", req) - return +func (me *Client) replenishConnRequests(t *torrent, c *connection) { + me.DownloadStrategy.FillRequests(t, c) + if len(c.Requests) == 0 { + c.SetInterested(false) } - err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece) +} + +func (me *Client) downloadedChunk(t *torrent, c *connection, msg *peer_protocol.Message) error { + req := newRequest(msg.Index, msg.Begin, peer_protocol.Integer(len(msg.Piece))) + + // Request has been satisfied. + delete(c.Requests, req) + + defer me.replenishConnRequests(t, c) + + // Do we actually want this chunk? + if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok { + return nil + } + + // Write the chunk out. + err := t.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece) if err != nil { - return + return err + } + me.dataReady(dataSpec{t.InfoHash, req}) + + // Cancel pending requests for this chunk. + for _, c := range t.Conns { + if c.Cancel(req) { + me.replenishConnRequests(t, c) + } } - delete(torrent.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec) - if len(torrent.Pieces[req.Index].PendingChunkSpecs) == 0 { - me.queuePieceCheck(torrent, req.Index) + + // Record that we have the chunk. + delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec) + if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 { + me.queuePieceCheck(t, req.Index) } + + // Unprioritize the chunk. var next *list.Element - for e := torrent.Priorities.Front(); e != nil; e = next { + for e := t.Priorities.Front(); e != nil; e = next { next = e.Next() if e.Value.(request) == req { - torrent.Priorities.Remove(e) + t.Priorities.Remove(e) } } - me.dataReady(dataSpec{torrent.InfoHash, req}) - return + + return nil } func (cl *Client) dataReady(ds dataSpec) { diff --git a/cmd/torrentfs/main.go b/cmd/torrentfs/main.go index 5233b22b..8ffeee73 100644 --- a/cmd/torrentfs/main.go +++ b/cmd/torrentfs/main.go @@ -131,8 +131,9 @@ func main() { setSignalHandlers() defer conn.Close() client := &torrent.Client{ - DataDir: downloadDir, - DisableTrackers: *disableTrackers, + DataDir: downloadDir, + DisableTrackers: *disableTrackers, + DownloadStrategy: torrent.ResponsiveDownloadStrategy{}, } client.Start() addTorrentDir(client, torrentPath) diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go index a1ae3f82..47662c25 100644 --- a/fs/torrentfs_test.go +++ b/fs/torrentfs_test.go @@ -138,7 +138,8 @@ func TestDownloadOnDemand(t *testing.T) { defer seeder.Stop() seeder.AddTorrent(layout.Metainfo) leecher := torrent.Client{ - DataDir: filepath.Join(layout.BaseDir, "download"), + DataDir: filepath.Join(layout.BaseDir, "download"), + DownloadStrategy: torrent.ResponsiveDownloadStrategy{}, } leecher.Start() defer leecher.Stop() diff --git a/torrent.go b/torrent.go index c747986e..08bce0b8 100644 --- a/torrent.go +++ b/torrent.go @@ -66,7 +66,7 @@ func (t *torrent) Close() (err error) { return } -func (t *torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) { +func (t *torrent) piecesByPendingBytes() (indices []peer_protocol.Integer) { slice := pieceByBytesPendingSlice{ Pending: make([]peer_protocol.Integer, 0, len(t.Pieces)), Indices: make([]peer_protocol.Integer, 0, len(t.Pieces)), -- 2.48.1