From 97ae59dfaf977ca0cad2e4c057ff853fa25c723a Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 14 Sep 2014 03:50:15 +1000 Subject: [PATCH] Improve the data waiter system --- client.go | 74 +++++++++++++++++++++++++++++++++++++++++------------- torrent.go | 13 ++++++++++ 2 files changed, 69 insertions(+), 18 deletions(-) diff --git a/client.go b/client.go index 1ad69cab..5b161293 100644 --- a/client.go +++ b/client.go @@ -29,6 +29,7 @@ import ( mathRand "math/rand" "net" "os" + "strings" "sync" "syscall" "time" @@ -77,6 +78,14 @@ func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) { go cl.verifyPiece(t, pieceIndex) } +func (cl *Client) queueFirstHash(t *torrent, piece int) { + p := t.Pieces[piece] + if p.EverHashed || p.Hashing || p.QueuedForHash { + return + } + cl.queuePieceCheck(t, pp.Integer(piece)) +} + // Queues the torrent data for the given region for download. The beginning of // the region is given highest priority to allow a subsequent read at the same // offset to return data ASAP. @@ -97,9 +106,9 @@ func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error { return nil } -type dataSpec struct { - InfoHash - request +type dataWait struct { + offset int64 + ready chan struct{} } type Client struct { @@ -120,8 +129,7 @@ type Client struct { handshaking int torrents map[InfoHash]*torrent - dataWaiterMutex sync.Mutex - dataWaiter chan struct{} + dataWaits map[*torrent][]dataWait } func (me *Client) ListenAddr() net.Addr { @@ -153,6 +161,11 @@ func (cl *Client) WriteStatus(w io.Writer) { return 100 * (1 - float32(t.BytesLeft())/float32(t.Length())) } }()) + fmt.Fprint(w, "Blocked reads:") + for _, dw := range cl.dataWaits[t] { + fmt.Fprintf(w, " %d", dw.offset) + } + fmt.Fprintln(w) t.WriteStatus(w) fmt.Fprintln(w) } @@ -223,6 +236,8 @@ func NewClient(cfg *Config) (cl *Client, err error) { quit: make(chan struct{}), torrents: make(map[InfoHash]*torrent), + + dataWaits: make(map[*torrent][]dataWait), } cl.event.L = &cl.mu cl.mu.Init(2) @@ -1128,6 +1143,10 @@ func (me *Client) DropTorrent(infoHash InfoHash) (err error) { } delete(me.torrents, infoHash) me.downloadStrategy.TorrentStopped(t) + for _, dw := range me.dataWaits[t] { + close(dw.ready) + } + delete(me.dataWaits, t) return } @@ -1369,7 +1388,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er // Record that we have the chunk. delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec) - me.dataReady(dataSpec{t.InfoHash, req}) + me.dataReady(t, req) if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 { me.queuePieceCheck(t, req.Index) } @@ -1390,24 +1409,43 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er return nil } -func (cl *Client) dataReady(ds dataSpec) { - cl.dataWaiterMutex.Lock() - if cl.dataWaiter != nil { - close(cl.dataWaiter) +func (cl *Client) dataReady(t *torrent, r request) { + dws := cl.dataWaits[t] + begin := t.requestOffset(r) + end := begin + int64(r.Length) + for i := 0; i < len(dws); { + dw := dws[i] + if begin <= dw.offset && dw.offset < end { + close(dw.ready) + dws[i] = dws[len(dws)-1] + dws = dws[:len(dws)-1] + } else { + i++ + } } - cl.dataWaiter = nil - cl.dataWaiterMutex.Unlock() + cl.dataWaits[t] = dws } // Returns a channel that is closed when new data has become available in the // client. -func (me *Client) DataWaiter() (ret <-chan struct{}) { - me.dataWaiterMutex.Lock() - if me.dataWaiter == nil { - me.dataWaiter = make(chan struct{}) +func (me *Client) DataWaiter(ih InfoHash, off int64) (ret <-chan struct{}) { + me.mu.Lock() + defer me.mu.Unlock() + ch := make(chan struct{}) + ret = ch + t := me.torrents[ih] + if t == nil { + close(ch) + return + } + if r, ok := t.offsetRequest(off); !ok || t.haveChunk(r) { + close(ch) + return } - ret = me.dataWaiter - me.dataWaiterMutex.Unlock() + me.dataWaits[t] = append(me.dataWaits[t], dataWait{ + offset: off, + ready: ch, + }) return } diff --git a/torrent.go b/torrent.go index 6ac23114..19b5f053 100644 --- a/torrent.go +++ b/torrent.go @@ -533,6 +533,19 @@ func (me *torrent) haveAnyPieces() bool { return false } +func (t *torrent) havePiece(index int) bool { + return t.haveInfo() && t.Pieces[index].Complete() +} + +func (t *torrent) haveChunk(r request) bool { + p := t.Pieces[r.Index] + if !p.EverHashed { + return false + } + _, ok := p.PendingChunkSpecs[r.chunkSpec] + return !ok +} + func (t *torrent) wantChunk(r request) bool { if !t.wantPiece(int(r.Index)) { return false -- 2.48.1