TODO | 15 +++++++++------ client.go | 271 ++++++++++++----------------------------------------- client_test.go | 7 ++++--- connection.go | 9 +++++++-- data/blob/blob.go | 41 +++++++++++++++++++++++++++++++---------- file.go | 64 +++++++++++++++++++++++++++++++++++++++++++++++++++++ fs/torrentfs.go | 5 ++++- reader.go | 132 +++++++++++++++++++++++++++++++++++++++++++++++++++++ t.go | 17 +++++++++++++++++ torrent.go | 136 +++++++++++++++++------------------------------------ diff --git a/TODO b/TODO index 72aabfc623f3fa1e5acefc526e0fa012417735a4..d1b2c6bf094033635e92ffc5238b34e878b61c76 100644 --- a/TODO +++ b/TODO @@ -1,7 +1,10 @@ * Track upload and download data. - * Emulate a UDP server in the UDP tracker tests. - * Make use of sparse file regions in download data for faster hashing. - * If we're choked and interested, we never send not-interested if there's nothing we want? - * Don't announce torrents that don't need active peers. It spams UDP, fills memory, and publicizes what we have loaded. - * Randomize triedAddrs bloom filter to allow different Addr sets on each Announce. - * When lots of good connections, it'll do a huge readahead, then refuse to trickle new pieces because we sent not interested to them all, thereby reducing the number of unchoked connections. \ No newline at end of file + * Emulate a UDP server in the UDP tracker tests rather than communicating with the Internet. + * Make use of sparse file regions in download data for faster hashing. This is available as whence 3 and 4 on some OS? + * When we're choked and interested, are we not interested if there's no longer anything that we want? + * dht: Randomize triedAddrs bloom filter to allow different Addr sets on each Announce. + * dht: Verify that the triedAddrs bloom filter is working well, github's willf made a bunch of changes. + * Rearrange the local-peer choked/interested status flags to be more natural to read. + * Check that pruning is working correctly. worstConns sorting might need an adjustment to how it factors in the good/unwanted chunks ratio. + * data/blob: Deleting incomplete data triggers io.ErrUnexpectedEOF that isn't recovered from. + * Responsive reader needs to apply some readahead. \ No newline at end of file diff --git a/client.go b/client.go index 9b60a3a82c789f95643eee1b9286c54fe141797b..46d327919823778976bdca4c9d3aa7e0b069555c 100644 --- a/client.go +++ b/client.go @@ -257,80 +257,22 @@ fmt.Fprintln(w) } } -// Read torrent data at the given offset. Will block until it is available. -func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err error) { - cl.mu.Lock() - defer cl.mu.Unlock() - index := int(off / int64(t.usualPieceSize())) - // Reading outside the bounds of a file is an error. - if index < 0 { - err = os.ErrInvalid - return - } - if int(index) >= len(t.Pieces) { - err = io.EOF - return - } - pieceOff := pp.Integer(off % int64(t.usualPieceSize())) - pieceLeft := int(t.pieceLength(index) - pieceOff) - if pieceLeft <= 0 { - err = io.EOF - return - } - if len(p) > pieceLeft { - p = p[:pieceLeft] - } - if len(p) == 0 { - panic(len(p)) - } - // TODO: ReadAt should always try to fill the buffer. - for { - avail := cl.prepareRead(t, off) - if avail < int64(len(p)) { - p = p[:avail] - } - n, err = dataReadAt(t.data, p, off) - if n != 0 || err != io.ErrUnexpectedEOF { - break - } - // If we reach here, the data we thought was ready, isn't. So we - // prepare it again, and retry. - } - return -} - -// Sets priorities to download from the given offset. Returns when the piece -// at the given offset can be read. Returns the number of bytes that are -// immediately available from the offset. -func (cl *Client) prepareRead(t *torrent, off int64) (n int64) { - index := int(off / int64(t.usualPieceSize())) - // Reading outside the bounds of a file is an error. - if index < 0 || index >= t.numPieces() { - return - } - piece := t.Pieces[index] - cl.readRaisePiecePriorities(t, off) - for !t.pieceComplete(index) && !t.isClosed() { - // This is to prevent being starved if a piece is dropped before we - // can read it. - cl.readRaisePiecePriorities(t, off) - piece.Event.Wait() - } - return t.Info.Piece(index).Length() - off%t.Info.PieceLength -} - -func (T Torrent) prepareRead(off int64) (avail int64) { - T.cl.mu.Lock() - defer T.cl.mu.Unlock() - return T.cl.prepareRead(T.torrent, off) -} - -// Data implements a streaming interface that's more efficient than ReadAt. +// A Data that implements this has a streaming interface that should be +// preferred over ReadAt. For example, the data is stored in blocks on the +// network and have a fixed cost to open. type SectionOpener interface { + // Open a ReadCloser at the given offset into torrent data. n is how many + // bytes we intend to read. OpenSection(off, n int64) (io.ReadCloser, error) } func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) { + // defer func() { + // if err == io.ErrUnexpectedEOF && n != 0 { + // err = nil + // } + // }() + // log.Println("data read at", len(b), off) again: if ra, ok := d.(io.ReaderAt); ok { return ra.ReadAt(b, off) @@ -357,7 +299,7 @@ func readaheadPieces(readahead, pieceLength int64) int { return int((readahead+pieceLength-1)/pieceLength - 1) } -func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) { +func (cl *Client) readRaisePiecePriorities(t *torrent, off, readaheadBytes int64) { index := int(off / int64(t.usualPieceSize())) cl.raisePiecePriority(t, index, piecePriorityNow) index++ @@ -365,13 +307,37 @@ if index >= t.numPieces() { return } cl.raisePiecePriority(t, index, piecePriorityNext) - for range iter.N(readaheadPieces(5*1024*1024, t.Info.PieceLength)) { + for range iter.N(readaheadPieces(readaheadBytes, t.Info.PieceLength)) { index++ if index >= t.numPieces() { break } cl.raisePiecePriority(t, index, piecePriorityReadahead) } +} + +func (cl *Client) addUrgentRequests(t *torrent, off int64, n int) { + for n > 0 { + req, ok := t.offsetRequest(off) + if !ok { + break + } + if _, ok := t.urgent[req]; !ok && !t.haveChunk(req) { + if t.urgent == nil { + t.urgent = make(map[request]struct{}, (n+chunkSize-1)/chunkSize) + } + t.urgent[req] = struct{}{} + cl.event.Broadcast() // Why? + index := int(req.Index) + cl.queueFirstHash(t, index) + cl.pieceChanged(t, index) + } + reqOff := t.requestOffset(req) + n1 := req.Length - pp.Integer(off-reqOff) + off += int64(n1) + n -= int(n1) + } + // log.Print(t.urgent) } func (cl *Client) configDir() string { @@ -582,11 +548,11 @@ if dhtCfg.Conn == nil && cl.utpSock != nil { dhtCfg.Conn = cl.utpSock.PacketConn() } cl.dHT, err = dht.NewServer(dhtCfg) - if cl.ipBlockList != nil { - cl.dHT.SetIPBlockList(cl.ipBlockList) - } if err != nil { return + } + if cl.ipBlockList != nil { + cl.dHT.SetIPBlockList(cl.ipBlockList) } } @@ -1894,6 +1860,9 @@ cl.event.Broadcast() if err != nil { return } + for index := range iter.N(t.numPieces()) { + cl.pieceChanged(t, index) + } cl.startTorrent(t) return } @@ -1990,12 +1959,6 @@ } t.Trackers = newTrackers } -// A handle to a live torrent within a Client. -type Torrent struct { - cl *Client - *torrent -} - // Don't call this before the info is available. func (t *torrent) BytesCompleted() int64 { if !t.haveInfo() { @@ -2014,23 +1977,6 @@ t.cl.dropTorrent(t.InfoHash) t.cl.mu.Unlock() } -// Provides access to regions of torrent data that correspond to its files. -type File struct { - t Torrent - path string - offset int64 - length int64 - fi metainfo.FileInfo -} - -func (f File) FileInfo() metainfo.FileInfo { - return f.fi -} - -func (f File) Path() string { - return f.path -} - // A file-like handle to some torrent data resource. type Handle interface { io.Reader @@ -2039,114 +1985,6 @@ io.Closer io.ReaderAt } -// Implements a Handle within a subsection of another Handle. -type sectionHandle struct { - h Handle - off, n, cur int64 -} - -func (me *sectionHandle) Seek(offset int64, whence int) (ret int64, err error) { - if whence == 0 { - offset += me.off - } else if whence == 2 { - whence = 0 - offset += me.off + me.n - } - ret, err = me.h.Seek(offset, whence) - me.cur = ret - ret -= me.off - return -} - -func (me *sectionHandle) Close() error { - return me.h.Close() -} - -func (me *sectionHandle) Read(b []byte) (n int, err error) { - max := me.off + me.n - me.cur - if int64(len(b)) > max { - b = b[:max] - } - n, err = me.h.Read(b) - me.cur += int64(n) - if err != nil { - return - } - if me.cur == me.off+me.n { - err = io.EOF - } - return -} - -func (me *sectionHandle) ReadAt(b []byte, off int64) (n int, err error) { - if off >= me.n { - err = io.EOF - return - } - if int64(len(b)) >= me.n-off { - b = b[:me.n-off] - } - return me.h.ReadAt(b, me.off+off) -} - -func (f File) Open() (h Handle, err error) { - h = f.t.NewReadHandle() - _, err = h.Seek(f.offset, os.SEEK_SET) - if err != nil { - h.Close() - return - } - h = §ionHandle{h, f.offset, f.Length(), f.offset} - return -} - -func (f File) ReadAt(p []byte, off int64) (n int, err error) { - maxLen := f.length - off - if int64(len(p)) > maxLen { - p = p[:maxLen] - } - return f.t.ReadAt(p, off+f.offset) -} - -func (f *File) Length() int64 { - return f.length -} - -type FilePieceState struct { - Length int64 - State byte -} - -func (f *File) Progress() (ret []FilePieceState) { - pieceSize := int64(f.t.usualPieceSize()) - off := f.offset % pieceSize - remaining := f.length - for i := int(f.offset / pieceSize); ; i++ { - if remaining == 0 { - break - } - len1 := pieceSize - off - if len1 > remaining { - len1 = remaining - } - ret = append(ret, FilePieceState{len1, f.t.pieceStatusChar(i)}) - off = 0 - remaining -= len1 - } - return -} - -func (f *File) PrioritizeRegion(off, len int64) { - if off < 0 || off >= f.length { - return - } - if off+len > f.length { - len = f.length - off - } - off += f.offset - f.t.SetRegionPriority(off, len) -} - // Returns handles to the files in the torrent. This requires the metainfo is // available first. func (t Torrent) Files() (ret []File) { @@ -2198,10 +2036,6 @@ // Nice to have the first and last pieces sooner for various interactive // purposes. t.cl.raisePiecePriority(t.torrent, 0, piecePriorityReadahead) t.cl.raisePiecePriority(t.torrent, t.numPieces()-1, piecePriorityReadahead) -} - -func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) { - return me.cl.torrentReadAt(me.torrent, off, p) } // Returns nil metainfo if it isn't in the cache. Checks that the retrieved @@ -2612,10 +2446,16 @@ return } } addRequest := func(req request) (again bool) { + // TODO: Couldn't this check also be done *after* the request? if len(c.Requests) >= 64 { return false } return c.Request(req) + } + for req := range t.urgent { + if !addRequest(req) { + return + } } for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() { pieceIndex := e.Piece() @@ -2664,7 +2504,7 @@ piece := t.Pieces[req.Index] // Do we actually want this chunk? - if _, ok := piece.PendingChunkSpecs[req.chunkSpec]; !ok || piece.Priority == piecePriorityNone { + if !t.wantChunk(req) { unusedDownloadedChunksCount.Add(1) c.UnwantedChunksReceived++ return nil @@ -2679,8 +2519,11 @@ if err != nil { return fmt.Errorf("error writing chunk: %s", err) } + // log.Println("got chunk", req) + piece.Event.Broadcast() // Record that we have the chunk. delete(piece.PendingChunkSpecs, req.chunkSpec) + delete(t.urgent, req) if len(piece.PendingChunkSpecs) == 0 { for _, c := range t.Conns { c.pieceRequestOrder.DeletePiece(int(req.Index)) @@ -2717,18 +2560,24 @@ } me.pieceChanged(t, int(piece)) } +// TODO: Check this isn't called more than once for each piece being correct. func (me *Client) pieceChanged(t *torrent, piece int) { correct := t.pieceComplete(piece) p := t.Pieces[piece] if correct { p.Priority = piecePriorityNone p.PendingChunkSpecs = nil + for req := range t.urgent { + if int(req.Index) == piece { + delete(t.urgent, req) + } + } p.Event.Broadcast() } else { if len(p.PendingChunkSpecs) == 0 { t.pendAllChunkSpecs(int(piece)) } - if p.Priority != piecePriorityNone { + if t.wantPiece(piece) { me.openNewConns(t) } } diff --git a/client_test.go b/client_test.go index 08dbc80cd8b655647a35be4fbf58ec1e1b481ebc..05d7c90924e505c2e67ec3dfab292aa4b6a814fd 100644 --- a/client_test.go +++ b/client_test.go @@ -3,7 +3,6 @@ import ( "encoding/binary" "fmt" - "io" "io/ioutil" "log" "net" @@ -273,9 +272,11 @@ IP: util.AddrIP(seeder.ListenAddr()), Port: util.AddrPort(seeder.ListenAddr()), }, }) - _greeting, err := ioutil.ReadAll(io.NewSectionReader(leecherGreeting, 0, leecherGreeting.Length())) + r := leecherGreeting.NewReader() + defer r.Close() + _greeting, err := ioutil.ReadAll(r) if err != nil { - t.Fatal(err) + t.Fatalf("%q %s", string(_greeting), err) } greeting := string(_greeting) if greeting != testutil.GreetingFileContents { diff --git a/connection.go b/connection.go index b365f743d0e544f3834fc8dc6f727e37c6e7ec9f..25a834708a41773949bbe40312738a7078fa3422 100644 --- a/connection.go +++ b/connection.go @@ -106,8 +106,13 @@ cn.pieceRequestOrder.DeletePiece(piece) return } pp := cn.piecePriorities[piece] - // Priority goes to Now, then Next in connection order. Then Readahead in - // by piece index. Then normal again by connection order. + // Priority regions not to scale. Within each region, piece is randomized + // according to connection. + + // [ Now ] + // [ Next ] + // [ Readahead ] + // [ Normal ] key := func() int { switch priority { case piecePriorityNow: diff --git a/data/blob/blob.go b/data/blob/blob.go index a3ade8e687a6dd08547159b200af65d6eca6f84f..934bbc4826fe473f6547e0238fb4ceb813b23db8 100644 --- a/data/blob/blob.go +++ b/data/blob/blob.go @@ -3,6 +3,7 @@ import ( "encoding/hex" "io" + "log" "github.com/anacrolix/libtorgo/metainfo" ) @@ -19,16 +20,36 @@ func (me *data) Close() {} func (me *data) ReadAt(b []byte, off int64) (n int, err error) { - p := me.info.Piece(int(off / me.info.PieceLength)) - f := me.store.pieceRead(p) - if f == nil { - err = io.ErrUnexpectedEOF - return - } - defer f.Close() - n, err = f.ReadAt(b, off%me.info.PieceLength) - if err == io.EOF { - err = io.ErrUnexpectedEOF + for len(b) != 0 { + if off >= me.info.TotalLength() { + err = io.EOF + break + } + p := me.info.Piece(int(off / me.info.PieceLength)) + f := me.store.pieceRead(p) + if f == nil { + log.Println("piece not found", p) + err = io.ErrUnexpectedEOF + break + } + b1 := b + maxN1 := int(p.Length() - off%me.info.PieceLength) + if len(b1) > maxN1 { + b1 = b1[:maxN1] + } + var n1 int + n1, err = f.ReadAt(b1, off%me.info.PieceLength) + f.Close() + n += n1 + off += int64(n1) + b = b[n1:] + if err == io.EOF { + err = nil + break + } + if err != nil { + break + } } return } diff --git a/file.go b/file.go new file mode 100644 index 0000000000000000000000000000000000000000..1ec0d21dbf7d447cba0bb1c9c554235e5550c46e --- /dev/null +++ b/file.go @@ -0,0 +1,64 @@ +package torrent + +import "github.com/anacrolix/libtorgo/metainfo" + +// Provides access to regions of torrent data that correspond to its files. +type File struct { + t Torrent + path string + offset int64 + length int64 + fi metainfo.FileInfo +} + +// Data for this file begins this far into the torrent. +func (f *File) Offset() int64 { + return f.offset +} + +func (f File) FileInfo() metainfo.FileInfo { + return f.fi +} + +func (f File) Path() string { + return f.path +} + +func (f *File) Length() int64 { + return f.length +} + +type FilePieceState struct { + Length int64 + State byte +} + +func (f *File) Progress() (ret []FilePieceState) { + pieceSize := int64(f.t.usualPieceSize()) + off := f.offset % pieceSize + remaining := f.length + for i := int(f.offset / pieceSize); ; i++ { + if remaining == 0 { + break + } + len1 := pieceSize - off + if len1 > remaining { + len1 = remaining + } + ret = append(ret, FilePieceState{len1, f.t.pieceStatusChar(i)}) + off = 0 + remaining -= len1 + } + return +} + +func (f *File) PrioritizeRegion(off, len int64) { + if off < 0 || off >= f.length { + return + } + if off+len > f.length { + len = f.length - off + } + off += f.offset + f.t.SetRegionPriority(off, len) +} diff --git a/fs/torrentfs.go b/fs/torrentfs.go index 47fd6109adba4207676d78d33ce138db6f56d487..ffb9df68b11e05186c0fcb4e7f78ba8d498dd2a4 100644 --- a/fs/torrentfs.go +++ b/fs/torrentfs.go @@ -91,7 +91,10 @@ _err error ) readDone := make(chan struct{}) go func() { - _n, _err = t.ReadAt(p, off) + r := t.NewReader() + defer r.Close() + _n, _err = r.ReadAt(p, off) + log.Println(_n, p) close(readDone) }() select { diff --git a/reader.go b/reader.go new file mode 100644 index 0000000000000000000000000000000000000000..60357f4effc2d460d954c4b220ce42d98ee5d07e --- /dev/null +++ b/reader.go @@ -0,0 +1,132 @@ +package torrent + +import ( + "errors" + "io" + "os" +) + +// Accesses torrent data via a client. +type Reader struct { + t *Torrent + pos int64 + responsive bool + readahead int64 +} + +var _ io.ReadCloser = &Reader{} + +// Don't wait for pieces to complete and be verified. Read calls return as +// soon as they can when the underlying chunks become available. +func (r *Reader) SetResponsive() { + r.responsive = true +} + +func (r *Reader) SetReadahead(readahead int64) { + r.readahead = readahead +} + +func (r *Reader) raisePriorities(off int64, n int) { + if r.responsive { + r.t.cl.addUrgentRequests(r.t.torrent, off, n) + } + r.t.cl.readRaisePiecePriorities(r.t.torrent, off, int64(n)+r.readahead) +} + +func (r *Reader) readable(off int64) (ret bool) { + // log.Println("readable", off) + // defer func() { + // log.Println("readable", ret) + // }() + req, ok := r.t.offsetRequest(off) + if !ok { + panic(off) + } + if r.responsive { + return r.t.haveChunk(req) + } + return r.t.pieceComplete(int(req.Index)) +} + +// How many bytes are available to read. Max is the most we could require. +func (r *Reader) available(off, max int64) (ret int64) { + for max > 0 { + req, ok := r.t.offsetRequest(off) + if !ok { + break + } + if !r.t.haveChunk(req) { + break + } + len1 := int64(req.Length) - (off - r.t.requestOffset(req)) + max -= len1 + ret += len1 + off += len1 + } + return +} + +func (r *Reader) waitReadable(off int64) { + r.t.Pieces[off/int64(r.t.usualPieceSize())].Event.Wait() +} + +func (r *Reader) ReadAt(b []byte, off int64) (n int, err error) { + return r.readAt(b, off) +} + +func (r *Reader) Read(b []byte) (n int, err error) { + n, err = r.readAt(b, r.pos) + r.pos += int64(n) + if n != 0 && err == io.ErrUnexpectedEOF { + err = nil + } + return +} + +func (r *Reader) readAt(b []byte, pos int64) (n int, err error) { + // defer func() { + // log.Println(pos, n, err) + // }() + r.t.cl.mu.Lock() + defer r.t.cl.mu.Unlock() + maxLen := r.t.Info.TotalLength() - pos + if maxLen <= 0 { + err = io.EOF + return + } + if int64(len(b)) > maxLen { + b = b[:maxLen] + } + r.raisePriorities(pos, len(b)) + for !r.readable(pos) { + r.raisePriorities(pos, len(b)) + r.waitReadable(pos) + } + avail := r.available(pos, int64(len(b))) + // log.Println("available", avail) + if int64(len(b)) > avail { + b = b[:avail] + } + n, err = dataReadAt(r.t.data, b, pos) + return +} + +func (r *Reader) Close() error { + r.t = nil + return nil +} + +func (r *Reader) Seek(off int64, whence int) (ret int64, err error) { + switch whence { + case os.SEEK_SET: + r.pos = off + case os.SEEK_CUR: + r.pos += off + case os.SEEK_END: + r.pos = r.t.Info.TotalLength() + off + default: + err = errors.New("bad whence") + } + ret = r.pos + return +} diff --git a/t.go b/t.go new file mode 100644 index 0000000000000000000000000000000000000000..be35c8c6d7a4789c0804a1811d2cb22c05b77358 --- /dev/null +++ b/t.go @@ -0,0 +1,17 @@ +package torrent + +// The public interface for a torrent within a Client. + +// A handle to a live torrent within a Client. +type Torrent struct { + cl *Client + *torrent +} + +func (t *Torrent) NewReader() (ret *Reader) { + ret = &Reader{ + t: t, + readahead: 5 * 1024 * 1024, + } + return +} diff --git a/torrent.go b/torrent.go index 8b5dbb715a7eee3bcf154833a66eb7212721b8f3..72d0b1299e9607035d9a4bdb69da3deff4ef7d20 100644 --- a/torrent.go +++ b/torrent.go @@ -2,12 +2,10 @@ package torrent import ( "container/heap" - "errors" "fmt" "io" "log" "net" - "os" "sort" "sync" "time" @@ -66,6 +64,9 @@ ceasingNetworking chan struct{} InfoHash InfoHash Pieces []*piece + // Chunks that are wanted before all others. This is for + // responsive/streaming readers that want to unblock ASAP. + urgent map[request]struct{} // Total length of the torrent in bytes. Stored because it's not O(1) to // get this from the info dict. length int64 @@ -110,91 +111,6 @@ // assigned, which doesn't seem right. return t.data != nil && t.data.PieceComplete(piece) } -// A file-like handle to torrent data that implements SectionOpener. Opened -// sections will be reused so long as Reads and ReadAt's are contiguous. -type handle struct { - rc io.ReadCloser - rcOff int64 - curOff int64 - so SectionOpener - size int64 - t Torrent -} - -func (h *handle) Close() error { - if h.rc != nil { - return h.rc.Close() - } - return nil -} - -func (h *handle) ReadAt(b []byte, off int64) (n int, err error) { - return h.readAt(b, off) -} - -func (h *handle) readAt(b []byte, off int64) (n int, err error) { - avail := h.t.prepareRead(off) - if int64(len(b)) > avail { - b = b[:avail] - } - if int64(len(b)) > h.size-off { - b = b[:h.size-off] - } - if h.rcOff != off && h.rc != nil { - h.rc.Close() - h.rc = nil - } - if h.rc == nil { - h.rc, err = h.so.OpenSection(off, h.size-off) - if err != nil { - return - } - h.rcOff = off - } - n, err = h.rc.Read(b) - h.rcOff += int64(n) - return -} - -func (h *handle) Read(b []byte) (n int, err error) { - n, err = h.readAt(b, h.curOff) - h.curOff = h.rcOff - return -} - -func (h *handle) Seek(off int64, whence int) (newOff int64, err error) { - switch whence { - case os.SEEK_SET: - h.curOff = off - case os.SEEK_CUR: - h.curOff += off - case os.SEEK_END: - h.curOff = h.size + off - default: - err = errors.New("bad whence") - } - newOff = h.curOff - return -} - -// Implements Handle on top of an io.SectionReader. -type sectionReaderHandle struct { - *io.SectionReader -} - -func (sectionReaderHandle) Close() error { return nil } - -func (T Torrent) NewReadHandle() Handle { - if so, ok := T.data.(SectionOpener); ok { - return &handle{ - so: so, - size: T.Length(), - t: T, - } - } - return sectionReaderHandle{io.NewSectionReader(T, 0, T.Length())} -} - func (t *torrent) numConnsUnchoked() (num int) { for _, c := range t.Conns { if !c.PeerChoked { @@ -238,7 +154,9 @@ close(t.ceasingNetworking) for _, c := range t.Conns { c.Close() } - t.pruneTimer.Stop() + if t.pruneTimer != nil { + t.pruneTimer.Stop() + } } func (t *torrent) addPeer(p Peer) { @@ -502,6 +420,11 @@ fmt.Fprintf(w, "%d%c ", seq.Count, seq.Char) } fmt.Fprintln(w) } + fmt.Fprintf(w, "Urgent:") + for req := range t.urgent { + fmt.Fprintf(w, " %s", req) + } + fmt.Fprintln(w) fmt.Fprintf(w, "Trackers: ") for _, tier := range t.Trackers { for _, tr := range tier { @@ -647,6 +570,7 @@ } func (t *torrent) bitfield() (bf []bool) { for _, p := range t.Pieces { + // TODO: Check this logic. bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0) } return @@ -732,11 +656,12 @@ return t.haveInfo() && t.pieceComplete(index) } func (t *torrent) haveChunk(r request) bool { - p := t.Pieces[r.Index] - if !p.EverHashed { + if !t.haveInfo() { return false } - _, ok := p.PendingChunkSpecs[r.chunkSpec] + piece := t.Pieces[r.Index] + _, ok := piece.PendingChunkSpecs[r.chunkSpec] + // log.Println("have chunk", r, !ok) return !ok } @@ -745,16 +670,41 @@ if !t.wantPiece(int(r.Index)) { return false } _, ok := t.Pieces[r.Index].PendingChunkSpecs[r.chunkSpec] + if ok { + return true + } + _, ok = t.urgent[r] return ok } +func (t *torrent) urgentChunkInPiece(piece int) bool { + for req := range t.urgent { + if int(req.Index) == piece { + return true + } + } + return false +} + func (t *torrent) wantPiece(index int) bool { if !t.haveInfo() { return false } p := t.Pieces[index] - // Put piece complete check last, since it's the slowest! - return p.Priority != piecePriorityNone && !p.QueuedForHash && !p.Hashing && !t.pieceComplete(index) + if p.QueuedForHash { + return false + } + if p.Hashing { + return false + } + if p.Priority == piecePriorityNone { + if !t.urgentChunkInPiece(index) { + return false + } + } + // Put piece complete check last, since it's the slowest as it can involve + // calling out into external data stores. + return !t.pieceComplete(index) } func (t *torrent) connHasWantedPieces(c *connection) bool {