mathRand "math/rand"
"net"
"os"
+ "strings"
"sync"
"syscall"
"time"
go cl.verifyPiece(t, pieceIndex)
}
+func (cl *Client) queueFirstHash(t *torrent, piece int) {
+ p := t.Pieces[piece]
+ if p.EverHashed || p.Hashing || p.QueuedForHash {
+ return
+ }
+ cl.queuePieceCheck(t, pp.Integer(piece))
+}
+
// Queues the torrent data for the given region for download. The beginning of
// the region is given highest priority to allow a subsequent read at the same
// offset to return data ASAP.
return nil
}
-type dataSpec struct {
- InfoHash
- request
+type dataWait struct {
+ offset int64
+ ready chan struct{}
}
type Client struct {
handshaking int
torrents map[InfoHash]*torrent
- dataWaiterMutex sync.Mutex
- dataWaiter chan struct{}
+ dataWaits map[*torrent][]dataWait
}
func (me *Client) ListenAddr() net.Addr {
return 100 * (1 - float32(t.BytesLeft())/float32(t.Length()))
}
}())
+ fmt.Fprint(w, "Blocked reads:")
+ for _, dw := range cl.dataWaits[t] {
+ fmt.Fprintf(w, " %d", dw.offset)
+ }
+ fmt.Fprintln(w)
t.WriteStatus(w)
fmt.Fprintln(w)
}
quit: make(chan struct{}),
torrents: make(map[InfoHash]*torrent),
+
+ dataWaits: make(map[*torrent][]dataWait),
}
cl.event.L = &cl.mu
cl.mu.Init(2)
}
delete(me.torrents, infoHash)
me.downloadStrategy.TorrentStopped(t)
+ for _, dw := range me.dataWaits[t] {
+ close(dw.ready)
+ }
+ delete(me.dataWaits, t)
return
}
// Record that we have the chunk.
delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
- me.dataReady(dataSpec{t.InfoHash, req})
+ me.dataReady(t, req)
if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
me.queuePieceCheck(t, req.Index)
}
return nil
}
-func (cl *Client) dataReady(ds dataSpec) {
- cl.dataWaiterMutex.Lock()
- if cl.dataWaiter != nil {
- close(cl.dataWaiter)
+func (cl *Client) dataReady(t *torrent, r request) {
+ dws := cl.dataWaits[t]
+ begin := t.requestOffset(r)
+ end := begin + int64(r.Length)
+ for i := 0; i < len(dws); {
+ dw := dws[i]
+ if begin <= dw.offset && dw.offset < end {
+ close(dw.ready)
+ dws[i] = dws[len(dws)-1]
+ dws = dws[:len(dws)-1]
+ } else {
+ i++
+ }
}
- cl.dataWaiter = nil
- cl.dataWaiterMutex.Unlock()
+ cl.dataWaits[t] = dws
}
// Returns a channel that is closed when new data has become available in the
// client.
-func (me *Client) DataWaiter() (ret <-chan struct{}) {
- me.dataWaiterMutex.Lock()
- if me.dataWaiter == nil {
- me.dataWaiter = make(chan struct{})
+func (me *Client) DataWaiter(ih InfoHash, off int64) (ret <-chan struct{}) {
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ ch := make(chan struct{})
+ ret = ch
+ t := me.torrents[ih]
+ if t == nil {
+ close(ch)
+ return
+ }
+ if r, ok := t.offsetRequest(off); !ok || t.haveChunk(r) {
+ close(ch)
+ return
}
- ret = me.dataWaiter
- me.dataWaiterMutex.Unlock()
+ me.dataWaits[t] = append(me.dataWaits[t], dataWait{
+ offset: off,
+ ready: ch,
+ })
return
}