]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Improve the data waiter system
authorMatt Joiner <anacrolix@gmail.com>
Sat, 13 Sep 2014 17:50:15 +0000 (03:50 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 13 Sep 2014 17:50:15 +0000 (03:50 +1000)
client.go
torrent.go

index 1ad69cab2508c6588f4620301db435cb3f181880..5b161293023a6cf163f76f034169d6c54900bd56 100644 (file)
--- a/client.go
+++ b/client.go
@@ -29,6 +29,7 @@ import (
        mathRand "math/rand"
        "net"
        "os"
+       "strings"
        "sync"
        "syscall"
        "time"
@@ -77,6 +78,14 @@ func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
        go cl.verifyPiece(t, pieceIndex)
 }
 
+func (cl *Client) queueFirstHash(t *torrent, piece int) {
+       p := t.Pieces[piece]
+       if p.EverHashed || p.Hashing || p.QueuedForHash {
+               return
+       }
+       cl.queuePieceCheck(t, pp.Integer(piece))
+}
+
 // Queues the torrent data for the given region for download. The beginning of
 // the region is given highest priority to allow a subsequent read at the same
 // offset to return data ASAP.
@@ -97,9 +106,9 @@ func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
        return nil
 }
 
-type dataSpec struct {
-       InfoHash
-       request
+type dataWait struct {
+       offset int64
+       ready  chan struct{}
 }
 
 type Client struct {
@@ -120,8 +129,7 @@ type Client struct {
        handshaking int
        torrents    map[InfoHash]*torrent
 
-       dataWaiterMutex sync.Mutex
-       dataWaiter      chan struct{}
+       dataWaits map[*torrent][]dataWait
 }
 
 func (me *Client) ListenAddr() net.Addr {
@@ -153,6 +161,11 @@ func (cl *Client) WriteStatus(w io.Writer) {
                                return 100 * (1 - float32(t.BytesLeft())/float32(t.Length()))
                        }
                }())
+               fmt.Fprint(w, "Blocked reads:")
+               for _, dw := range cl.dataWaits[t] {
+                       fmt.Fprintf(w, " %d", dw.offset)
+               }
+               fmt.Fprintln(w)
                t.WriteStatus(w)
                fmt.Fprintln(w)
        }
@@ -223,6 +236,8 @@ func NewClient(cfg *Config) (cl *Client, err error) {
 
                quit:     make(chan struct{}),
                torrents: make(map[InfoHash]*torrent),
+
+               dataWaits: make(map[*torrent][]dataWait),
        }
        cl.event.L = &cl.mu
        cl.mu.Init(2)
@@ -1128,6 +1143,10 @@ func (me *Client) DropTorrent(infoHash InfoHash) (err error) {
        }
        delete(me.torrents, infoHash)
        me.downloadStrategy.TorrentStopped(t)
+       for _, dw := range me.dataWaits[t] {
+               close(dw.ready)
+       }
+       delete(me.dataWaits, t)
        return
 }
 
@@ -1369,7 +1388,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
 
        // Record that we have the chunk.
        delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
-       me.dataReady(dataSpec{t.InfoHash, req})
+       me.dataReady(t, req)
        if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
                me.queuePieceCheck(t, req.Index)
        }
@@ -1390,24 +1409,43 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
        return nil
 }
 
-func (cl *Client) dataReady(ds dataSpec) {
-       cl.dataWaiterMutex.Lock()
-       if cl.dataWaiter != nil {
-               close(cl.dataWaiter)
+func (cl *Client) dataReady(t *torrent, r request) {
+       dws := cl.dataWaits[t]
+       begin := t.requestOffset(r)
+       end := begin + int64(r.Length)
+       for i := 0; i < len(dws); {
+               dw := dws[i]
+               if begin <= dw.offset && dw.offset < end {
+                       close(dw.ready)
+                       dws[i] = dws[len(dws)-1]
+                       dws = dws[:len(dws)-1]
+               } else {
+                       i++
+               }
        }
-       cl.dataWaiter = nil
-       cl.dataWaiterMutex.Unlock()
+       cl.dataWaits[t] = dws
 }
 
 // Returns a channel that is closed when new data has become available in the
 // client.
-func (me *Client) DataWaiter() (ret <-chan struct{}) {
-       me.dataWaiterMutex.Lock()
-       if me.dataWaiter == nil {
-               me.dataWaiter = make(chan struct{})
+func (me *Client) DataWaiter(ih InfoHash, off int64) (ret <-chan struct{}) {
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       ch := make(chan struct{})
+       ret = ch
+       t := me.torrents[ih]
+       if t == nil {
+               close(ch)
+               return
+       }
+       if r, ok := t.offsetRequest(off); !ok || t.haveChunk(r) {
+               close(ch)
+               return
        }
-       ret = me.dataWaiter
-       me.dataWaiterMutex.Unlock()
+       me.dataWaits[t] = append(me.dataWaits[t], dataWait{
+               offset: off,
+               ready:  ch,
+       })
        return
 }
 
index 6ac23114a1ba67b5a1de976763321550bc4d894e..19b5f053a7a34b8fcfa71c50bcf5d2243e4a9306 100644 (file)
@@ -533,6 +533,19 @@ func (me *torrent) haveAnyPieces() bool {
        return false
 }
 
+func (t *torrent) havePiece(index int) bool {
+       return t.haveInfo() && t.Pieces[index].Complete()
+}
+
+func (t *torrent) haveChunk(r request) bool {
+       p := t.Pieces[r.Index]
+       if !p.EverHashed {
+               return false
+       }
+       _, ok := p.PendingChunkSpecs[r.chunkSpec]
+       return !ok
+}
+
 func (t *torrent) wantChunk(r request) bool {
        if !t.wantPiece(int(r.Index)) {
                return false