From 03b1af07cd18f4e1c4fe0bce75138b72ce6c38cb Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 15 Oct 2013 19:42:30 +1100 Subject: [PATCH] Always finish off partially downloaded pieces --- client.go | 89 +++++++++++++++++++++++++++++++++---------- cmd/torrentfs/main.go | 1 + 2 files changed, 69 insertions(+), 21 deletions(-) diff --git a/client.go b/client.go index 5509da6f..f6d07a7b 100644 --- a/client.go +++ b/client.go @@ -16,6 +16,7 @@ import ( "net" "os" "path/filepath" + "sort" ) const ( @@ -56,6 +57,13 @@ type piece struct { PendingChunkSpecs map[ChunkSpec]struct{} } +func (p piece) NumPendingBytes() (count peer_protocol.Integer) { + for cs, _ := range p.PendingChunkSpecs { + count += cs.Length + } + return +} + type ChunkSpec struct { Begin, Length peer_protocol.Integer } @@ -192,6 +200,35 @@ type Torrent struct { Priorities *list.List } +type pieceByBytesPendingSlice struct { + Torrent *Torrent + Indices []peer_protocol.Integer +} + +func (pcs pieceByBytesPendingSlice) Len() int { + return len(pcs.Indices) +} + +func (me pieceByBytesPendingSlice) Less(i, j int) bool { + return me.Torrent.Pieces[i].NumPendingBytes() < me.Torrent.Pieces[j].NumPendingBytes() +} + +func (me pieceByBytesPendingSlice) Swap(i, j int) { + me.Indices[i], me.Indices[j] = me.Indices[j], me.Indices[i] +} + +func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) { + slice := pieceByBytesPendingSlice{ + Torrent: t, + Indices: make([]peer_protocol.Integer, 0, len(t.Pieces)), + } + for i := range t.Pieces { + slice.Indices = append(slice.Indices, peer_protocol.Integer(i)) + } + sort.Sort(sort.Reverse(slice)) + return slice.Indices +} + func (t *Torrent) PrioritizeDataRegion(off, len_ int64) { newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize) for len_ > 0 { @@ -363,7 +400,6 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er n, err = t.Data.ReadAt(p, off) }) <-done - log.Println(n, err) return } @@ -460,7 +496,10 @@ func (me *Client) initiateConn(peer Peer, torrent *Torrent) { return } log.Printf("connected to %s", conn.RemoteAddr()) - me.handshake(conn, torrent, peer.Id) + err = me.handshake(conn, torrent, peer.Id) + if err != nil { + log.Print(err) + } }() } @@ -473,7 +512,7 @@ func (me *Torrent) haveAnyPieces() bool { return false } -func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) { +func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) (err error) { conn := &connection{ Socket: sock, Choked: true, @@ -491,7 +530,7 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) { conn.post <- peer_protocol.Bytes(me.PeerId[:]) } var b [28]byte - _, err := io.ReadFull(conn.Socket, b[:]) + _, err = io.ReadFull(conn.Socket, b[:]) switch err { case nil: case io.EOF: @@ -541,9 +580,7 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) { me.dropConnection(torrent, conn) }) err = me.runConnection(torrent, conn) - if err != nil { - log.Print(err) - } + return } func (me *Client) peerGotPiece(torrent *Torrent, conn *connection, piece int) { @@ -584,6 +621,7 @@ func (me *Client) runConnection(torrent *Torrent, conn *connection) error { switch msg.Type { case peer_protocol.Choke: conn.PeerChoked = true + conn.Requests = nil case peer_protocol.Unchoke: conn.PeerChoked = false me.peerUnchoked(torrent, conn) @@ -722,26 +760,33 @@ func (me *Client) withContext(f func()) { func (me *Client) replenishConnRequests(torrent *Torrent, conn *connection) { requestHeatMap := torrent.requestHeat() - if torrent.Priorities == nil { - return - } - for e := torrent.Priorities.Front(); e != nil; e = e.Next() { - req := e.Value.(Request) + addRequest := func(req Request) (again bool) { if !conn.PeerPieces[req.Index] { - continue + return true } - switch torrent.Pieces[req.Index].State { - case pieceStateUnknown: - continue - case pieceStateIncomplete: - default: - panic("prioritized chunk for invalid piece state") + if torrent.Pieces[req.Index].State != pieceStateIncomplete { + return true } if requestHeatMap[req] > 0 { + return true + } + return conn.Request(req) + } + if torrent.Priorities != nil { + for e := torrent.Priorities.Front(); e != nil; e = e.Next() { + if !addRequest(e.Value.(Request)) { + return + } + } + } + for _, index := range torrent.piecesByPendingBytesDesc() { + if torrent.Pieces[index].NumPendingBytes() == torrent.PieceLength(index) { continue } - if !conn.Request(req) { - break + for chunkSpec := range torrent.Pieces[index].PendingChunkSpecs { + if !addRequest(Request{index, chunkSpec}) { + return + } } } } @@ -752,6 +797,7 @@ func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) log.Printf("got unnecessary chunk: %s", request) return } + log.Printf("got chunk %s", request) err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece) if err != nil { return @@ -816,6 +862,7 @@ func (me *Client) pieceHashed(ih InfoHash, piece peer_protocol.Integer, correct ChunkSpec{0, peer_protocol.Integer(torrent.PieceLength(piece))}, }, }) + torrent.Pieces[piece].PendingChunkSpecs = nil } for _, conn := range torrent.Conns { if correct { diff --git a/cmd/torrentfs/main.go b/cmd/torrentfs/main.go index 8689285e..9ea5ab5b 100644 --- a/cmd/torrentfs/main.go +++ b/cmd/torrentfs/main.go @@ -118,6 +118,7 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(len(data))) for { n, err := fn.FS.Client.TorrentReadAt(infoHash, torrentOff, data) + // log.Println(torrentOff, len(data), n, err) switch err { case nil: resp.Data = data[:n] -- 2.48.1