]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Piece priorities, torrent read interface and many fixes
authorMatt Joiner <anacrolix@gmail.com>
Wed, 3 Dec 2014 07:07:50 +0000 (01:07 -0600)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 3 Dec 2014 07:07:50 +0000 (01:07 -0600)
client.go
client_test.go
connection.go
fs/torrentfs.go
misc.go
torrent.go
worst_conns.go

index 7b3a8967513fddae41f0894bb7769e5347fe4e9e..26dc55a4625589b71588d531471798f6367e584c 100644 (file)
--- a/client.go
+++ b/client.go
@@ -243,15 +243,10 @@ func (cl *Client) WriteStatus(_w io.Writer) {
 
 // Read torrent data at the given offset. Returns ErrDataNotReady if the data
 // isn't available.
-func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
-       cl.mu.RLock()
-       defer cl.mu.RUnlock()
-       t := cl.torrent(ih)
-       if t == nil {
-               err = errors.New("unknown torrent")
-               return
-       }
-       index := pp.Integer(off / int64(t.UsualPieceSize()))
+func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err error) {
+       cl.mu.Lock()
+       defer cl.mu.Unlock()
+       index := int(off / int64(t.UsualPieceSize()))
        // Reading outside the bounds of a file is an error.
        if index < 0 {
                err = os.ErrInvalid
@@ -263,7 +258,7 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er
        }
        piece := t.Pieces[index]
        pieceOff := pp.Integer(off % int64(t.UsualPieceSize()))
-       pieceLeft := int(t.PieceLength(index) - pieceOff)
+       pieceLeft := int(t.PieceLength(pp.Integer(index)) - pieceOff)
        if pieceLeft <= 0 {
                err = io.EOF
                return
@@ -271,24 +266,16 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er
        if len(p) > pieceLeft {
                p = p[:pieceLeft]
        }
-       for cs, _ := range piece.PendingChunkSpecs {
-               chunkOff := int64(pieceOff) - int64(cs.Begin)
-               if chunkOff >= int64(t.PieceLength(index)) {
-                       panic(chunkOff)
-               }
-               if 0 <= chunkOff && chunkOff < int64(cs.Length) {
-                       // read begins in a pending chunk
-                       err = ErrDataNotReady
-                       return
-               }
-               // pending chunk caps available data
-               if chunkOff < 0 && int64(len(p)) > -chunkOff {
-                       p = p[:-chunkOff]
-               }
-       }
        if len(p) == 0 {
                panic(len(p))
        }
+       cl.prioritizePiece(t, index, piecePriorityHigh)
+       for i := index + 1; i < index+7 && i < t.NumPieces(); i++ {
+               cl.prioritizePiece(t, i, piecePriorityNormal)
+       }
+       for !piece.Complete() {
+               piece.Event.Wait()
+       }
        return t.Data.ReadAt(p, off)
 }
 
@@ -296,6 +283,30 @@ func (cl *Client) configDir() string {
        return filepath.Join(os.Getenv("HOME"), ".config/torrent")
 }
 
+func (cl *Client) ConfigDir() string {
+       return cl.configDir()
+}
+
+func (t *torrent) connPendPiece(c *connection, piece int) {
+       c.pendPiece(piece, t.Pieces[piece].Priority)
+}
+
+func (cl *Client) prioritizePiece(t *torrent, piece int, priority piecePriority) {
+       if t.havePiece(piece) {
+               return
+       }
+       cl.queueFirstHash(t, piece)
+       t.Pieces[piece].Priority = priority
+       if t.wantPiece(piece) {
+               for _, c := range t.Conns {
+                       if c.PeerHasPiece(pp.Integer(piece)) {
+                               t.connPendPiece(c, piece)
+                               cl.replenishConnRequests(t, c)
+                       }
+               }
+       }
+}
+
 func (cl *Client) setEnvBlocklist() (err error) {
        filename := os.Getenv("TORRENT_BLOCKLIST_FILE")
        defaultBlocklist := filename == ""
@@ -915,7 +926,7 @@ func (t *torrent) initRequestOrdering(c *connection) {
                if !t.wantPiece(i) {
                        continue
                }
-               c.pieceRequestOrder.SetPiece(i, c.piecePriorities[i])
+               t.connPendPiece(c, i)
        }
 }
 
@@ -931,7 +942,7 @@ func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) {
        }
        c.PeerPieces[piece] = true
        if t.wantPiece(piece) {
-               c.pieceRequestOrder.SetPiece(piece, c.piecePriorities[piece])
+               t.connPendPiece(c, piece)
                me.replenishConnRequests(t, c)
        }
 }
@@ -1318,6 +1329,9 @@ func (me *Client) addConnection(t *torrent, c *connection) bool {
                return false
        default:
        }
+       if !me.wantConns(t) {
+               return false
+       }
        for _, c0 := range t.Conns {
                if c.PeerID == c0.PeerID {
                        // Already connected to a client with that ID.
@@ -1334,21 +1348,60 @@ func (me *Client) addConnection(t *torrent, c *connection) bool {
        return true
 }
 
+func (t *torrent) needData() bool {
+       if !t.haveInfo() {
+               return true
+       }
+       for i := range t.Pieces {
+               if t.wantPiece(i) {
+                       return true
+               }
+       }
+       return false
+}
+
+// TODO: I'm sure there's something here to do with seeding.
+func (t *torrent) badConn(c *connection) bool {
+       if time.Now().Sub(c.completedHandshake) < 30*time.Second {
+               return false
+       }
+       if !t.haveInfo() {
+               return !c.supportsExtension("ut_metadata")
+       }
+       return !t.connHasWantedPieces(c)
+}
+
+func (t *torrent) numGoodConns() (num int) {
+       for _, c := range t.Conns {
+               if !t.badConn(c) {
+                       num++
+               }
+       }
+       return
+}
+
+func (me *Client) wantConns(t *torrent) bool {
+       if !t.needData() && me.noUpload {
+               return false
+       }
+       if t.numGoodConns() >= socketsPerTorrent {
+               return false
+       }
+       return true
+}
+
 func (me *Client) openNewConns(t *torrent) {
        select {
        case <-t.ceasingNetworking:
                return
        default:
        }
-       if t.haveInfo() && !me.downloadStrategy.PendingData(t) {
-               return
-       }
        for len(t.Peers) != 0 {
-               if len(t.Conns) >= socketsPerTorrent {
-                       break
+               if !me.wantConns(t) {
+                       return
                }
                if len(t.HalfOpen)+me.handshaking >= me.halfOpenLimit {
-                       break
+                       return
                }
                var (
                        k peersKey
@@ -1414,7 +1467,7 @@ func (cl *Client) saveTorrentFile(t *torrent) error {
 }
 
 func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
-       err = t.setMetadata(md, cl.dataDir, bytes)
+       err = t.setMetadata(md, cl.dataDir, bytes, &cl.mu)
        if err != nil {
                return
        }
@@ -1527,19 +1580,16 @@ func (t Torrent) AddPeers(pp []Peer) error {
 func (t Torrent) DownloadAll() {
        t.cl.mu.Lock()
        for i := 0; i < t.NumPieces(); i++ {
-               t.cl.queueFirstHash(t.torrent, i)
+               // TODO: Leave higher priorities as they were?
+               t.cl.prioritizePiece(t.torrent, i, piecePriorityNormal)
        }
+       t.cl.prioritizePiece(t.torrent, 0, piecePriorityHigh)
+       t.cl.prioritizePiece(t.torrent, t.NumPieces()-1, piecePriorityHigh)
        t.cl.mu.Unlock()
 }
 
 func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) {
-       err = me.cl.PrioritizeDataRegion(me.InfoHash, off, int64(len(p)))
-       if err != nil {
-               err = fmt.Errorf("error prioritizing: %s", err)
-               return
-       }
-       <-me.cl.DataWaiter(me.InfoHash, off)
-       return me.cl.TorrentReadAt(me.InfoHash, off, p)
+       return me.cl.torrentReadAt(me.torrent, off, p)
 }
 
 func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) {
@@ -1799,7 +1849,7 @@ func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest,
                        go func(tr tracker.Client) {
                                err := cl.announceTorrentSingleTracker(tr, req, t)
                                if err != nil {
-                                       log.Printf("error announcing to %s: %s", tr, err)
+                                       log.Printf("error announcing %q to %s: %s", t, tr, err)
                                }
                                oks <- err == nil
                        }(tr)
@@ -2004,7 +2054,9 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
        }
        p.EverHashed = true
        if correct {
+               p.Priority = piecePriorityNone
                p.PendingChunkSpecs = nil
+               p.Event.Broadcast()
                me.downloadStrategy.TorrentGotPiece(t, int(piece))
                me.dataReady(t, request{
                        pp.Integer(piece),
@@ -2014,6 +2066,9 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
                if len(p.PendingChunkSpecs) == 0 {
                        t.pendAllChunkSpecs(piece)
                }
+               if p.Priority != piecePriorityNone {
+                       me.openNewConns(t)
+               }
        }
        t.PieceBytesLeftChanged(int(piece))
        for _, conn := range t.Conns {
@@ -2031,7 +2086,7 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
                        conn.pieceRequestOrder.RemovePiece(int(piece))
                }
                if t.wantPiece(int(piece)) && conn.PeerHasPiece(piece) {
-                       conn.pieceRequestOrder.SetPiece(int(piece), conn.piecePriorities[piece])
+                       conn.pendPiece(int(piece), t.Pieces[piece].Priority)
                }
        }
        if t.haveAllPieces() && me.noUpload {
@@ -2064,10 +2119,10 @@ func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
        cl.pieceHashed(t, index, sum == p.Hash)
 }
 
-func (me *Client) Torrents() (ret []*torrent) {
+func (me *Client) Torrents() (ret []Torrent) {
        me.mu.Lock()
        for _, t := range me.torrents {
-               ret = append(ret, t)
+               ret = append(ret, Torrent{me, t})
        }
        me.mu.Unlock()
        return
index e5ee79d268f70174b5ba20c24830338295ea3ef5..4db505df6398bac8efe968d3f37948a47a5ec50e 100644 (file)
@@ -48,7 +48,7 @@ func TestTorrentInitialState(t *testing.T) {
        if err != nil {
                t.Fatal(err)
        }
-       err = tor.setMetadata(mi.Info.Info, dir, mi.Info.Bytes)
+       err = tor.setMetadata(mi.Info.Info, dir, mi.Info.Bytes, nil)
        if err != nil {
                t.Fatal(err)
        }
index 16ffedb7235853c1a175630975ca399bc33a0c38..1d55dd5732458de86b8d49807f40be1018689702 100644 (file)
@@ -8,6 +8,7 @@ import (
        "expvar"
        "fmt"
        "io"
+       "log"
        "net"
        "sync"
        "time"
@@ -87,6 +88,26 @@ func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP b
        return
 }
 
+func (cn *connection) pendPiece(piece int, priority piecePriority) {
+       if priority == piecePriorityNone {
+               cn.pieceRequestOrder.RemovePiece(piece)
+               return
+       }
+       key := cn.piecePriorities[piece]
+       if priority == piecePriorityHigh {
+               key = -key
+       }
+       if piece == 0 {
+               log.Print(key)
+       }
+       cn.pieceRequestOrder.SetPiece(piece, key)
+}
+
+func (cn *connection) supportsExtension(ext string) bool {
+       _, ok := cn.PeerExtensionIDs[ext]
+       return ok
+}
+
 func (cn *connection) completedString() string {
        if cn.PeerPieces == nil {
                return "?"
index 835db4626cdc8d2b5f3ec157e8cd5580dd5d1fa7..6fd721b71a4fcefabb45f5870a7e1608cd71a2d4 100644 (file)
@@ -53,7 +53,7 @@ type node struct {
        path     []string
        metadata *torrent.MetaInfo
        FS       *TorrentFS
-       InfoHash torrent.InfoHash
+       t        torrent.Torrent
 }
 
 type fileNode struct {
@@ -72,25 +72,32 @@ func (n *node) fsPath() string {
        return "/" + strings.Join(append([]string{n.metadata.Name}, n.path...), "/")
 }
 
-func blockingRead(fs *TorrentFS, ih torrent.InfoHash, off int64, p []byte, intr fusefs.Intr) (n int, err fuse.Error) {
-       dataWaiter := fs.Client.DataWaiter(ih, off)
+func blockingRead(fs *TorrentFS, t torrent.Torrent, off int64, p []byte, intr fusefs.Intr) (n int, err fuse.Error) {
+       var (
+               _n   int
+               _err fuse.Error
+       )
+       readDone := make(chan struct{})
+       go func() {
+               _n, _err = t.ReadAt(p, off)
+               close(readDone)
+       }()
        select {
-       case <-dataWaiter:
+       case <-readDone:
+               n = _n
+               err = _err
        case <-fs.destroyed:
                err = fuse.EIO
-               return
        case <-intr:
                err = fuse.EINTR
-               return
        }
-       n, err = fs.Client.TorrentReadAt(ih, off, p)
        return
 }
 
-func readFull(fs *TorrentFS, ih torrent.InfoHash, off int64, p []byte, intr fusefs.Intr) (n int, err fuse.Error) {
+func readFull(fs *TorrentFS, t torrent.Torrent, off int64, p []byte, intr fusefs.Intr) (n int, err fuse.Error) {
        for len(p) != 0 {
                var nn int
-               nn, err = blockingRead(fs, ih, off, p, intr)
+               nn, err = blockingRead(fs, t, off, p, intr)
                if err != nil {
                        break
                }
@@ -126,14 +133,8 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus
        if len(resp.Data) == 0 {
                return nil
        }
-       infoHash := fn.InfoHash
        torrentOff := fn.TorrentOffset + req.Offset
-       go func() {
-               if err := fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(size)); err != nil {
-                       log.Printf("error prioritizing %s: %s", fn.fsPath(), err)
-               }
-       }()
-       n, err := readFull(fn.FS, infoHash, torrentOff, resp.Data, intr)
+       n, err := readFull(fn.FS, fn.t, torrentOff, resp.Data, intr)
        if err != nil {
                return err
        }
@@ -231,7 +232,7 @@ func (me rootNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err
                __node := node{
                        metadata: t.Info,
                        FS:       me.fs,
-                       InfoHash: t.InfoHash,
+                       t:        t,
                }
                if t.Info.SingleFile() {
                        _node = fileNode{__node, uint64(t.Info.Length), 0}
diff --git a/misc.go b/misc.go
index 7e8b70fb9c220fa8f02ec917593cd756d7133636..96723ed588ca6c9caa634d81d3fa83bfb7b99936 100644 (file)
--- a/misc.go
+++ b/misc.go
@@ -7,6 +7,7 @@ import (
        "math/rand"
        "os"
        "path/filepath"
+       "sync"
        "time"
 
        "bitbucket.org/anacrolix/go.torrent/mmap_span"
@@ -37,12 +38,22 @@ func (ih *InfoHash) HexString() string {
        return fmt.Sprintf("%x", ih[:])
 }
 
+type piecePriority byte
+
+const (
+       piecePriorityNone piecePriority = iota
+       piecePriorityNormal
+       piecePriorityHigh
+)
+
 type piece struct {
        Hash              pieceSum
        PendingChunkSpecs map[chunkSpec]struct{}
        Hashing           bool
        QueuedForHash     bool
        EverHashed        bool
+       Event             sync.Cond
+       Priority          piecePriority
 }
 
 func (p *piece) shuffledPendingChunkSpecs() (css []chunkSpec) {
index e4114e576ba3f9029d4401694432626ea49d1475..07f7b0b696a0b7d83efcf57e0aed89789ac42bc2 100644 (file)
@@ -200,7 +200,7 @@ func infoPieceHashes(info *metainfo.Info) (ret []string) {
 }
 
 // Called when metadata for a torrent becomes available.
-func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte) (err error) {
+func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte, eventLocker sync.Locker) (err error) {
        t.Info = newMetaInfo(&md)
        t.MetaData = infoBytes
        t.metadataHave = nil
@@ -223,6 +223,7 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte
        })
        for index, hash := range infoPieceHashes(&md) {
                piece := &torrentPiece{}
+               piece.Event.L = eventLocker
                util.CopyExact(piece.Hash[:], hash)
                t.Pieces = append(t.Pieces, piece)
                piece.bytesLeftElement = t.IncompletePiecesByBytesLeft.Insert(index)
@@ -280,6 +281,8 @@ func (t *torrent) pieceStatusChar(index int) byte {
                return 'H'
        case !p.EverHashed:
                return '?'
+       case p.Priority == piecePriorityHigh:
+               return '!'
        case t.PiecePartiallyDownloaded(index):
                return 'P'
        default:
@@ -629,7 +632,7 @@ func (t *torrent) wantPiece(index int) bool {
                return false
        }
        p := t.Pieces[index]
-       return p.EverHashed && len(p.PendingChunkSpecs) != 0
+       return p.EverHashed && len(p.PendingChunkSpecs) != 0 && p.Priority != piecePriorityNone
 }
 
 func (t *torrent) connHasWantedPieces(c *connection) bool {
index 3176ba2a85c8d210f7ee96a1fb8546009f4a67dd..69f67e93776a19e9946fd3a91f4eb65d1364b28b 100644 (file)
@@ -44,9 +44,7 @@ func (me worstConns) key(i int) (key worstConnsSortKey) {
        // Peer has had time to declare what they have.
        if time.Now().Sub(c.completedHandshake) >= 30*time.Second {
                if !me.t.haveInfo() {
-                       if _, ok := c.PeerExtensionIDs["ut_metadata"]; !ok {
-                               key.useless = true
-                       }
+                       key.useless = !c.supportsExtension("ut_metadata")
                } else {
                        if !me.t.connHasWantedPieces(c) {
                                key.useless = true