}
c.quit = make(chan struct{})
if c.DownloadStrategy == nil {
- c.DownloadStrategy = DefaultDownloadStrategy{}
+ c.DownloadStrategy = &DefaultDownloadStrategy{}
}
if c.Listener != nil {
go c.acceptConnections()
me.replenishConnRequests(torrent, conn)
}
+func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) {
+ ok = cn.Cancel(r)
+ if ok {
+ cl.DownloadStrategy.DeleteRequest(t, r)
+ }
+ return
+}
+
+func (cl *Client) connDeleteRequest(t *torrent, cn *connection, r request) {
+ if !cn.RequestPending(r) {
+ return
+ }
+ cl.DownloadStrategy.DeleteRequest(t, r)
+ delete(cn.Requests, r)
+}
+
func (me *Client) connectionLoop(t *torrent, c *connection) error {
decoder := pp.Decoder{
R: bufio.NewReader(c.Socket),
switch msg.Type {
case pp.Choke:
c.PeerChoked = true
- c.Requests = nil
+ for r := range c.Requests {
+ me.connDeleteRequest(t, c, r)
+ }
case pp.Unchoke:
c.PeerChoked = false
me.peerUnchoked(t, c)
func (me *Client) dropConnection(torrent *torrent, conn *connection) {
conn.Socket.Close()
+ for r := range conn.Requests {
+ me.connDeleteRequest(torrent, conn, r)
+ }
for i0, c := range torrent.Conns {
if c != conn {
continue
return torrent.Close()
}
me.torrents[torrent.InfoHash] = torrent
+ me.DownloadStrategy.TorrentStarted(torrent)
if !me.DisableTrackers {
go me.announceTorrent(torrent)
}
type DownloadStrategy interface {
FillRequests(t *torrent, c *connection)
+ TorrentStarted(t *torrent)
+ TorrentStopped(t *torrent)
+ DeleteRequest(t *torrent, r request)
}
-type DefaultDownloadStrategy struct{}
+type DefaultDownloadStrategy struct {
+ heat map[*torrent]map[request]int
+}
-func (DefaultDownloadStrategy) FillRequests(torrent *torrent, conn *connection) {
- requestHeatMap := torrent.requestHeat()
+func (cl *Client) assertRequestHeat() {
+ dds, ok := cl.DownloadStrategy.(*DefaultDownloadStrategy)
+ if !ok {
+ return
+ }
+ for _, t := range cl.torrents {
+ m := make(map[request]int, 3000)
+ for _, cn := range t.Conns {
+ for r := range cn.Requests {
+ m[r]++
+ }
+ }
+ for r, h := range dds.heat[t] {
+ if m[r] != h {
+ panic(fmt.Sprintln(m[r], h))
+ }
+ }
+ }
+}
+
+func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
+ th := s.heat[t]
addRequest := func(req request) (again bool) {
- piece := torrent.Pieces[req.Index]
+ piece := t.Pieces[req.Index]
if piece.Hashing {
// We can't be sure we want this.
return true
// We already have this.
return true
}
- if requestHeatMap[req] > 0 {
- // We've already requested this.
+ if c.RequestPending(req) {
return true
}
- return conn.Request(req)
+ again = c.Request(req)
+ if c.RequestPending(req) {
+ th[req]++
+ }
+ return
}
// First request prioritized chunks.
- for e := torrent.Priorities.Front(); e != nil; e = e.Next() {
+ for e := t.Priorities.Front(); e != nil; e = e.Next() {
if !addRequest(e.Value.(request)) {
return
}
}
+ ppbs := t.piecesByPendingBytes()
// Then finish off incomplete pieces in order of bytes remaining.
- for _, index := range torrent.piecesByPendingBytes() {
- for chunkSpec := range torrent.Pieces[index].PendingChunkSpecs {
- if !addRequest(request{index, chunkSpec}) {
- return
+ for _, heatThreshold := range []int{0, 4, 100} {
+ for _, pieceIndex := range ppbs {
+ for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
+ r := request{pieceIndex, chunkSpec}
+ if th[r] > heatThreshold {
+ continue
+ }
+ if !addRequest(request{pieceIndex, chunkSpec}) {
+ return
+ }
}
}
}
}
+func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {
+ if s.heat[t] != nil {
+ panic("torrent already started")
+ }
+ if s.heat == nil {
+ s.heat = make(map[*torrent]map[request]int, 10)
+ }
+ s.heat[t] = make(map[request]int, t.ChunkCount())
+}
+
+func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) {
+ if _, ok := s.heat[t]; !ok {
+ panic("torrent not yet started")
+ }
+ delete(s.heat, t)
+}
+
+func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
+ m := s.heat[t]
+ if m[r] <= 0 {
+ panic("no pending requests")
+ }
+ m[r]--
+}
+
type ResponsiveDownloadStrategy struct{}
+func (ResponsiveDownloadStrategy) TorrentStarted(*torrent) {}
+func (ResponsiveDownloadStrategy) TorrentStopped(*torrent) {}
+func (ResponsiveDownloadStrategy) DeleteRequest(*torrent, request) {}
+
func (ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
for e := t.Priorities.Front(); e != nil; e = e.Next() {
if !c.Request(e.Value.(request)) {
req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
// Request has been satisfied.
- delete(c.Requests, req)
+ me.connDeleteRequest(t, c, req)
defer me.replenishConnRequests(t, c)
// Cancel pending requests for this chunk.
for _, c := range t.Conns {
- if c.Cancel(req) {
+ if me.connCancel(t, c, req) {
me.replenishConnRequests(t, c)
}
}