From 91c2c1f5c740a4fd801fd2830a1de022a3f2c815 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 3 Dec 2014 01:07:50 -0600 Subject: [PATCH] Piece priorities, torrent read interface and many fixes --- client.go | 147 +++++++++++++++++++++++++++++++++--------------- client_test.go | 2 +- connection.go | 21 +++++++ fs/torrentfs.go | 35 ++++++------ misc.go | 11 ++++ torrent.go | 7 ++- worst_conns.go | 4 +- 7 files changed, 158 insertions(+), 69 deletions(-) diff --git a/client.go b/client.go index 7b3a8967..26dc55a4 100644 --- a/client.go +++ b/client.go @@ -243,15 +243,10 @@ func (cl *Client) WriteStatus(_w io.Writer) { // Read torrent data at the given offset. Returns ErrDataNotReady if the data // isn't available. -func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) { - cl.mu.RLock() - defer cl.mu.RUnlock() - t := cl.torrent(ih) - if t == nil { - err = errors.New("unknown torrent") - return - } - index := pp.Integer(off / int64(t.UsualPieceSize())) +func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err error) { + cl.mu.Lock() + defer cl.mu.Unlock() + index := int(off / int64(t.UsualPieceSize())) // Reading outside the bounds of a file is an error. if index < 0 { err = os.ErrInvalid @@ -263,7 +258,7 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er } piece := t.Pieces[index] pieceOff := pp.Integer(off % int64(t.UsualPieceSize())) - pieceLeft := int(t.PieceLength(index) - pieceOff) + pieceLeft := int(t.PieceLength(pp.Integer(index)) - pieceOff) if pieceLeft <= 0 { err = io.EOF return @@ -271,24 +266,16 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er if len(p) > pieceLeft { p = p[:pieceLeft] } - for cs, _ := range piece.PendingChunkSpecs { - chunkOff := int64(pieceOff) - int64(cs.Begin) - if chunkOff >= int64(t.PieceLength(index)) { - panic(chunkOff) - } - if 0 <= chunkOff && chunkOff < int64(cs.Length) { - // read begins in a pending chunk - err = ErrDataNotReady - return - } - // pending chunk caps available data - if chunkOff < 0 && int64(len(p)) > -chunkOff { - p = p[:-chunkOff] - } - } if len(p) == 0 { panic(len(p)) } + cl.prioritizePiece(t, index, piecePriorityHigh) + for i := index + 1; i < index+7 && i < t.NumPieces(); i++ { + cl.prioritizePiece(t, i, piecePriorityNormal) + } + for !piece.Complete() { + piece.Event.Wait() + } return t.Data.ReadAt(p, off) } @@ -296,6 +283,30 @@ func (cl *Client) configDir() string { return filepath.Join(os.Getenv("HOME"), ".config/torrent") } +func (cl *Client) ConfigDir() string { + return cl.configDir() +} + +func (t *torrent) connPendPiece(c *connection, piece int) { + c.pendPiece(piece, t.Pieces[piece].Priority) +} + +func (cl *Client) prioritizePiece(t *torrent, piece int, priority piecePriority) { + if t.havePiece(piece) { + return + } + cl.queueFirstHash(t, piece) + t.Pieces[piece].Priority = priority + if t.wantPiece(piece) { + for _, c := range t.Conns { + if c.PeerHasPiece(pp.Integer(piece)) { + t.connPendPiece(c, piece) + cl.replenishConnRequests(t, c) + } + } + } +} + func (cl *Client) setEnvBlocklist() (err error) { filename := os.Getenv("TORRENT_BLOCKLIST_FILE") defaultBlocklist := filename == "" @@ -915,7 +926,7 @@ func (t *torrent) initRequestOrdering(c *connection) { if !t.wantPiece(i) { continue } - c.pieceRequestOrder.SetPiece(i, c.piecePriorities[i]) + t.connPendPiece(c, i) } } @@ -931,7 +942,7 @@ func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) { } c.PeerPieces[piece] = true if t.wantPiece(piece) { - c.pieceRequestOrder.SetPiece(piece, c.piecePriorities[piece]) + t.connPendPiece(c, piece) me.replenishConnRequests(t, c) } } @@ -1318,6 +1329,9 @@ func (me *Client) addConnection(t *torrent, c *connection) bool { return false default: } + if !me.wantConns(t) { + return false + } for _, c0 := range t.Conns { if c.PeerID == c0.PeerID { // Already connected to a client with that ID. @@ -1334,21 +1348,60 @@ func (me *Client) addConnection(t *torrent, c *connection) bool { return true } +func (t *torrent) needData() bool { + if !t.haveInfo() { + return true + } + for i := range t.Pieces { + if t.wantPiece(i) { + return true + } + } + return false +} + +// TODO: I'm sure there's something here to do with seeding. +func (t *torrent) badConn(c *connection) bool { + if time.Now().Sub(c.completedHandshake) < 30*time.Second { + return false + } + if !t.haveInfo() { + return !c.supportsExtension("ut_metadata") + } + return !t.connHasWantedPieces(c) +} + +func (t *torrent) numGoodConns() (num int) { + for _, c := range t.Conns { + if !t.badConn(c) { + num++ + } + } + return +} + +func (me *Client) wantConns(t *torrent) bool { + if !t.needData() && me.noUpload { + return false + } + if t.numGoodConns() >= socketsPerTorrent { + return false + } + return true +} + func (me *Client) openNewConns(t *torrent) { select { case <-t.ceasingNetworking: return default: } - if t.haveInfo() && !me.downloadStrategy.PendingData(t) { - return - } for len(t.Peers) != 0 { - if len(t.Conns) >= socketsPerTorrent { - break + if !me.wantConns(t) { + return } if len(t.HalfOpen)+me.handshaking >= me.halfOpenLimit { - break + return } var ( k peersKey @@ -1414,7 +1467,7 @@ func (cl *Client) saveTorrentFile(t *torrent) error { } func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) { - err = t.setMetadata(md, cl.dataDir, bytes) + err = t.setMetadata(md, cl.dataDir, bytes, &cl.mu) if err != nil { return } @@ -1527,19 +1580,16 @@ func (t Torrent) AddPeers(pp []Peer) error { func (t Torrent) DownloadAll() { t.cl.mu.Lock() for i := 0; i < t.NumPieces(); i++ { - t.cl.queueFirstHash(t.torrent, i) + // TODO: Leave higher priorities as they were? + t.cl.prioritizePiece(t.torrent, i, piecePriorityNormal) } + t.cl.prioritizePiece(t.torrent, 0, piecePriorityHigh) + t.cl.prioritizePiece(t.torrent, t.NumPieces()-1, piecePriorityHigh) t.cl.mu.Unlock() } func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) { - err = me.cl.PrioritizeDataRegion(me.InfoHash, off, int64(len(p))) - if err != nil { - err = fmt.Errorf("error prioritizing: %s", err) - return - } - <-me.cl.DataWaiter(me.InfoHash, off) - return me.cl.TorrentReadAt(me.InfoHash, off, p) + return me.cl.torrentReadAt(me.torrent, off, p) } func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) { @@ -1799,7 +1849,7 @@ func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, go func(tr tracker.Client) { err := cl.announceTorrentSingleTracker(tr, req, t) if err != nil { - log.Printf("error announcing to %s: %s", tr, err) + log.Printf("error announcing %q to %s: %s", t, tr, err) } oks <- err == nil }(tr) @@ -2004,7 +2054,9 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) { } p.EverHashed = true if correct { + p.Priority = piecePriorityNone p.PendingChunkSpecs = nil + p.Event.Broadcast() me.downloadStrategy.TorrentGotPiece(t, int(piece)) me.dataReady(t, request{ pp.Integer(piece), @@ -2014,6 +2066,9 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) { if len(p.PendingChunkSpecs) == 0 { t.pendAllChunkSpecs(piece) } + if p.Priority != piecePriorityNone { + me.openNewConns(t) + } } t.PieceBytesLeftChanged(int(piece)) for _, conn := range t.Conns { @@ -2031,7 +2086,7 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) { conn.pieceRequestOrder.RemovePiece(int(piece)) } if t.wantPiece(int(piece)) && conn.PeerHasPiece(piece) { - conn.pieceRequestOrder.SetPiece(int(piece), conn.piecePriorities[piece]) + conn.pendPiece(int(piece), t.Pieces[piece].Priority) } } if t.haveAllPieces() && me.noUpload { @@ -2064,10 +2119,10 @@ func (cl *Client) verifyPiece(t *torrent, index pp.Integer) { cl.pieceHashed(t, index, sum == p.Hash) } -func (me *Client) Torrents() (ret []*torrent) { +func (me *Client) Torrents() (ret []Torrent) { me.mu.Lock() for _, t := range me.torrents { - ret = append(ret, t) + ret = append(ret, Torrent{me, t}) } me.mu.Unlock() return diff --git a/client_test.go b/client_test.go index e5ee79d2..4db505df 100644 --- a/client_test.go +++ b/client_test.go @@ -48,7 +48,7 @@ func TestTorrentInitialState(t *testing.T) { if err != nil { t.Fatal(err) } - err = tor.setMetadata(mi.Info.Info, dir, mi.Info.Bytes) + err = tor.setMetadata(mi.Info.Info, dir, mi.Info.Bytes, nil) if err != nil { t.Fatal(err) } diff --git a/connection.go b/connection.go index 16ffedb7..1d55dd57 100644 --- a/connection.go +++ b/connection.go @@ -8,6 +8,7 @@ import ( "expvar" "fmt" "io" + "log" "net" "sync" "time" @@ -87,6 +88,26 @@ func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP b return } +func (cn *connection) pendPiece(piece int, priority piecePriority) { + if priority == piecePriorityNone { + cn.pieceRequestOrder.RemovePiece(piece) + return + } + key := cn.piecePriorities[piece] + if priority == piecePriorityHigh { + key = -key + } + if piece == 0 { + log.Print(key) + } + cn.pieceRequestOrder.SetPiece(piece, key) +} + +func (cn *connection) supportsExtension(ext string) bool { + _, ok := cn.PeerExtensionIDs[ext] + return ok +} + func (cn *connection) completedString() string { if cn.PeerPieces == nil { return "?" diff --git a/fs/torrentfs.go b/fs/torrentfs.go index 835db462..6fd721b7 100644 --- a/fs/torrentfs.go +++ b/fs/torrentfs.go @@ -53,7 +53,7 @@ type node struct { path []string metadata *torrent.MetaInfo FS *TorrentFS - InfoHash torrent.InfoHash + t torrent.Torrent } type fileNode struct { @@ -72,25 +72,32 @@ func (n *node) fsPath() string { return "/" + strings.Join(append([]string{n.metadata.Name}, n.path...), "/") } -func blockingRead(fs *TorrentFS, ih torrent.InfoHash, off int64, p []byte, intr fusefs.Intr) (n int, err fuse.Error) { - dataWaiter := fs.Client.DataWaiter(ih, off) +func blockingRead(fs *TorrentFS, t torrent.Torrent, off int64, p []byte, intr fusefs.Intr) (n int, err fuse.Error) { + var ( + _n int + _err fuse.Error + ) + readDone := make(chan struct{}) + go func() { + _n, _err = t.ReadAt(p, off) + close(readDone) + }() select { - case <-dataWaiter: + case <-readDone: + n = _n + err = _err case <-fs.destroyed: err = fuse.EIO - return case <-intr: err = fuse.EINTR - return } - n, err = fs.Client.TorrentReadAt(ih, off, p) return } -func readFull(fs *TorrentFS, ih torrent.InfoHash, off int64, p []byte, intr fusefs.Intr) (n int, err fuse.Error) { +func readFull(fs *TorrentFS, t torrent.Torrent, off int64, p []byte, intr fusefs.Intr) (n int, err fuse.Error) { for len(p) != 0 { var nn int - nn, err = blockingRead(fs, ih, off, p, intr) + nn, err = blockingRead(fs, t, off, p, intr) if err != nil { break } @@ -126,14 +133,8 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus if len(resp.Data) == 0 { return nil } - infoHash := fn.InfoHash torrentOff := fn.TorrentOffset + req.Offset - go func() { - if err := fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(size)); err != nil { - log.Printf("error prioritizing %s: %s", fn.fsPath(), err) - } - }() - n, err := readFull(fn.FS, infoHash, torrentOff, resp.Data, intr) + n, err := readFull(fn.FS, fn.t, torrentOff, resp.Data, intr) if err != nil { return err } @@ -231,7 +232,7 @@ func (me rootNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err __node := node{ metadata: t.Info, FS: me.fs, - InfoHash: t.InfoHash, + t: t, } if t.Info.SingleFile() { _node = fileNode{__node, uint64(t.Info.Length), 0} diff --git a/misc.go b/misc.go index 7e8b70fb..96723ed5 100644 --- a/misc.go +++ b/misc.go @@ -7,6 +7,7 @@ import ( "math/rand" "os" "path/filepath" + "sync" "time" "bitbucket.org/anacrolix/go.torrent/mmap_span" @@ -37,12 +38,22 @@ func (ih *InfoHash) HexString() string { return fmt.Sprintf("%x", ih[:]) } +type piecePriority byte + +const ( + piecePriorityNone piecePriority = iota + piecePriorityNormal + piecePriorityHigh +) + type piece struct { Hash pieceSum PendingChunkSpecs map[chunkSpec]struct{} Hashing bool QueuedForHash bool EverHashed bool + Event sync.Cond + Priority piecePriority } func (p *piece) shuffledPendingChunkSpecs() (css []chunkSpec) { diff --git a/torrent.go b/torrent.go index e4114e57..07f7b0b6 100644 --- a/torrent.go +++ b/torrent.go @@ -200,7 +200,7 @@ func infoPieceHashes(info *metainfo.Info) (ret []string) { } // Called when metadata for a torrent becomes available. -func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte) (err error) { +func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte, eventLocker sync.Locker) (err error) { t.Info = newMetaInfo(&md) t.MetaData = infoBytes t.metadataHave = nil @@ -223,6 +223,7 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte }) for index, hash := range infoPieceHashes(&md) { piece := &torrentPiece{} + piece.Event.L = eventLocker util.CopyExact(piece.Hash[:], hash) t.Pieces = append(t.Pieces, piece) piece.bytesLeftElement = t.IncompletePiecesByBytesLeft.Insert(index) @@ -280,6 +281,8 @@ func (t *torrent) pieceStatusChar(index int) byte { return 'H' case !p.EverHashed: return '?' + case p.Priority == piecePriorityHigh: + return '!' case t.PiecePartiallyDownloaded(index): return 'P' default: @@ -629,7 +632,7 @@ func (t *torrent) wantPiece(index int) bool { return false } p := t.Pieces[index] - return p.EverHashed && len(p.PendingChunkSpecs) != 0 + return p.EverHashed && len(p.PendingChunkSpecs) != 0 && p.Priority != piecePriorityNone } func (t *torrent) connHasWantedPieces(c *connection) bool { diff --git a/worst_conns.go b/worst_conns.go index 3176ba2a..69f67e93 100644 --- a/worst_conns.go +++ b/worst_conns.go @@ -44,9 +44,7 @@ func (me worstConns) key(i int) (key worstConnsSortKey) { // Peer has had time to declare what they have. if time.Now().Sub(c.completedHandshake) >= 30*time.Second { if !me.t.haveInfo() { - if _, ok := c.PeerExtensionIDs["ut_metadata"]; !ok { - key.useless = true - } + key.useless = !c.supportsExtension("ut_metadata") } else { if !me.t.connHasWantedPieces(c) { key.useless = true -- 2.44.0