cl.queuePieceCheck(t, pp.Integer(piece))
}
-// Queues the torrent data for the given region for download. The beginning of
-// the region is given highest priority to allow a subsequent read at the same
-// offset to return data ASAP.
-func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
- me.mu.Lock()
- defer me.mu.Unlock()
- t := me.torrent(ih)
- if t == nil {
- return errors.New("no such active torrent")
- }
- if !t.haveInfo() {
- return errors.New("missing metadata")
- }
- firstIndex := int(off / int64(t.UsualPieceSize()))
- for i := 0; i < 5; i++ {
- index := firstIndex + i
- if index >= t.numPieces() {
- continue
- }
- me.queueFirstHash(t, index)
- }
- me.downloadStrategy.TorrentPrioritize(t, off, len_)
- for _, cn := range t.Conns {
- me.replenishConnRequests(t, cn)
- }
- me.openNewConns(t)
- return nil
-}
-
-type dataWait struct {
- offset int64
- ready chan struct{}
-}
-
type Client struct {
noUpload bool
dataDir string
w.WriteString("<missing metainfo>")
}
fmt.Fprint(w, "\n")
- fmt.Fprint(w, "Blocked reads:")
- for _, dw := range cl.dataWaits[t] {
- fmt.Fprintf(w, " %d", dw.offset)
- }
- fmt.Fprintln(w)
t.WriteStatus(w)
fmt.Fprintln(w)
}
quit: make(chan struct{}),
torrents: make(map[InfoHash]*torrent),
-
- dataWaits: make(map[*torrent][]dataWait),
}
- // TODO: Write my own UTP library ಠ_ಠ
- // cl.disableUTP = true
cl.event.L = &cl.mu
if !cfg.NoDefaultBlocklist {
}
}
-func (me *Client) DropTorrent(infoHash InfoHash) (err error) {
- me.mu.Lock()
- defer me.mu.Unlock()
- return me.dropTorrent(infoHash)
-}
-
func (me *Client) dropTorrent(infoHash InfoHash) (err error) {
t, ok := me.torrents[infoHash]
if !ok {
}
delete(me.torrents, infoHash)
me.downloadStrategy.TorrentStopped(t)
- for _, dw := range me.dataWaits[t] {
- close(dw.ready)
- }
- delete(me.dataWaits, t)
return
}
// Record that we have the chunk.
delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
- me.dataReady(t, req)
if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
for _, c := range t.Conns {
c.pieceRequestOrder.DeletePiece(int(req.Index))
return nil
}
-func (cl *Client) dataReady(t *torrent, r request) {
- dws := cl.dataWaits[t]
- begin := t.requestOffset(r)
- end := begin + int64(r.Length)
- for i := 0; i < len(dws); {
- dw := dws[i]
- if begin <= dw.offset && dw.offset < end {
- close(dw.ready)
- dws[i] = dws[len(dws)-1]
- dws = dws[:len(dws)-1]
- } else {
- i++
- }
- }
- cl.dataWaits[t] = dws
-}
-
-// Returns a channel that is closed when new data has become available in the
-// client.
-func (me *Client) DataWaiter(ih InfoHash, off int64) (ret <-chan struct{}) {
- me.mu.Lock()
- defer me.mu.Unlock()
- ch := make(chan struct{})
- ret = ch
- t := me.torrents[ih]
- if t == nil {
- close(ch)
- return
- }
- if r, ok := t.offsetRequest(off); !ok || t.haveChunk(r) {
- close(ch)
- return
- }
- me.dataWaits[t] = append(me.dataWaits[t], dataWait{
- offset: off,
- ready: ch,
- })
- return
-}
-
func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
p := t.Pieces[piece]
if p.EverHashed && !correct {
p.PendingChunkSpecs = nil
p.Event.Broadcast()
me.downloadStrategy.TorrentGotPiece(t, int(piece))
- me.dataReady(t, request{
- pp.Integer(piece),
- chunkSpec{0, pp.Integer(t.PieceLength(piece))},
- })
} else {
if len(p.PendingChunkSpecs) == 0 {
t.pendAllChunkSpecs(piece)