[btrtrc.git] / connection.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "fmt"
7         "io"
8         "math"
9         "math/rand"
10         "net"
11         "strconv"
12         "strings"
13         "sync"
14         "time"
15
16         "github.com/anacrolix/dht/v2"
17         "github.com/anacrolix/log"
18         "github.com/anacrolix/missinggo"
19         "github.com/anacrolix/missinggo/bitmap"
20         "github.com/anacrolix/missinggo/iter"
21         "github.com/anacrolix/missinggo/prioritybitmap"
22         "github.com/pkg/errors"
23
24         "github.com/anacrolix/torrent/bencode"
25         "github.com/anacrolix/torrent/mse"
26         pp "github.com/anacrolix/torrent/peer_protocol"
27 )
28
29 type peerSource string
30
31 const (
32         peerSourceTracker         = "Tr"
33         peerSourceIncoming        = "I"
34         peerSourceDHTGetPeers     = "Hg" // Peers we found by searching a DHT.
35         peerSourceDHTAnnouncePeer = "Ha" // Peers that were announced to us by a DHT.
36         peerSourcePEX             = "X"
37 )
38
39 // Maintains the state of a connection with a peer.
40 type connection struct {
41         // First to ensure 64-bit alignment for atomics. See #262.
42         stats ConnStats
43
44         t *Torrent
45         // The actual Conn, used for closing, and setting socket options.
46         conn       net.Conn
47         outgoing   bool
48         network    string
49         remoteAddr IpPort
50         // The Reader and Writer for this Conn, with hooks installed for stats,
51         // limiting, deadlines etc.
52         w io.Writer
53         r io.Reader
54         // True if the connection is operating over MSE obfuscation.
55         headerEncrypted bool
56         cryptoMethod    mse.CryptoMethod
57         Discovery       peerSource
58         closed          missinggo.Event
59         // Set true after we've added the ConnStats generated during the handshake
60         // to the other ConnStats instances, as determined once the *Torrent became known.
61         reconciledHandshakeStats bool
62
63         lastMessageReceived     time.Time
64         completedHandshake      time.Time
65         lastUsefulChunkReceived time.Time
66         lastChunkSent           time.Time
67
68         // Stuff controlled by the local peer.
69         Interested           bool
70         lastBecameInterested time.Time
71         priorInterest        time.Duration
72
73         lastStartedExpectingToReceiveChunks time.Time
74         cumulativeExpectedToReceiveChunks   time.Duration
75         chunksReceivedWhileExpecting        int64
76
77         Choked           bool
78         requests         map[request]struct{}
79         requestsLowWater int
80         // Chunks that we might reasonably expect to receive from the peer. Due to
81         // latency, buffering, and implementation differences, we may receive
82         // chunks that are no longer in the set of requests we actually want.
83         validReceiveChunks map[request]struct{}
84         // Indexed by metadata piece, set to true if posted and pending a
85         // response.
86         metadataRequests []bool
87         sentHaves        bitmap.Bitmap
88
89         // Stuff controlled by the remote peer.
90         PeerID             PeerID
91         PeerInterested     bool
92         PeerChoked         bool
93         PeerRequests       map[request]struct{}
94         PeerExtensionBytes pp.PeerExtensionBits
95         // The pieces the peer has claimed to have.
96         peerPieces bitmap.Bitmap
97         // The peer has everything. This can occur due to a special message, when
98         // we may not even know the number of pieces in the torrent yet.
99         peerSentHaveAll bool
100         // The best lower bound we have on the number of pieces in the torrent,
101         // based on communication with the peer. Generally only useful until we
102         // have the torrent info.
103         peerMinPieces pieceIndex
104         // Pieces we've accepted chunks for from the peer.
105         peerTouchedPieces map[pieceIndex]struct{}
106         peerAllowedFast   bitmap.Bitmap
107
108         PeerMaxRequests  int // Maximum pending requests the peer allows.
109         PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber
110         PeerClientName   string
111
112         pieceInclination  []int
113         pieceRequestOrder prioritybitmap.PriorityBitmap
114
115         writeBuffer *bytes.Buffer
116         uploadTimer *time.Timer
117         writerCond  sync.Cond
118 }
119
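// Starts or stops the clock on how long we've been expecting to receive chunks,
// accumulating the elapsed time into cumulativeExpectedToReceiveChunks when the
// expectation ends.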
120 func (cn *connection) updateExpectingChunks() {
121         if cn.expectingChunks() {
122                 if cn.lastStartedExpectingToReceiveChunks.IsZero() {
123                         cn.lastStartedExpectingToReceiveChunks = time.Now()
124                 }
125         } else {
126                 if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
127                         cn.cumulativeExpectedToReceiveChunks += time.Since(cn.lastStartedExpectingToReceiveChunks)
128                         cn.lastStartedExpectingToReceiveChunks = time.Time{}
129                 }
130         }
131 }
132
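// We expect chunks while we're interested and the peer isn't choking us.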
133 func (cn *connection) expectingChunks() bool {
134         return cn.Interested && !cn.PeerChoked
135 }
136
137 // Returns true if the connection is over IPv6.
138 func (cn *connection) ipv6() bool {
139         ip := cn.remoteAddr.IP
140         if ip.To4() != nil {
141                 return false
142         }
143         return len(ip) == net.IPv6len
144 }
145
146 // Returns true if the dialer has the lower client peer ID. TODO: Find the
147 // specification for this.
148 func (cn *connection) isPreferredDirection() bool {
149         return bytes.Compare(cn.t.cl.peerID[:], cn.PeerID[:]) < 0 == cn.outgoing
150 }
151
152 // Returns whether the left connection should be preferred over the right one,
153 // considering only their networking properties. If ok is false, we can't
154 // decide.
155 func (l *connection) hasPreferredNetworkOver(r *connection) (left, ok bool) {
156         var ml multiLess
157         ml.NextBool(l.isPreferredDirection(), r.isPreferredDirection())
158         ml.NextBool(!l.utp(), !r.utp())
159         ml.NextBool(l.ipv6(), r.ipv6())
160         return ml.FinalOk()
161 }
162
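// Total time we've been interested in the peer, including the current interest
// period if there is one.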
163 func (cn *connection) cumInterest() time.Duration {
164         ret := cn.priorInterest
165         if cn.Interested {
166                 ret += time.Since(cn.lastBecameInterested)
167         }
168         return ret
169 }
170
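// Returns whether the peer has every piece, and whether that's known. Without
// the torrent info it can't be known unless the peer sent HaveAll.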
171 func (cn *connection) peerHasAllPieces() (all bool, known bool) {
172         if cn.peerSentHaveAll {
173                 return true, true
174         }
175         if !cn.t.haveInfo() {
176                 return false, false
177         }
178         return bitmap.Flip(cn.peerPieces, 0, bitmap.BitIndex(cn.t.numPieces())).IsEmpty(), true
179 }
180
181 func (cn *connection) mu() sync.Locker {
182         return cn.t.cl.locker()
183 }
184
185 func (cn *connection) localAddr() net.Addr {
186         return cn.conn.LocalAddr()
187 }
188
189 func (cn *connection) supportsExtension(ext pp.ExtensionName) bool {
190         _, ok := cn.PeerExtensionIDs[ext]
191         return ok
192 }
193
194 // The best guess at number of pieces in the torrent for this peer.
195 func (cn *connection) bestPeerNumPieces() pieceIndex {
196         if cn.t.haveInfo() {
197                 return cn.t.numPieces()
198         }
199         return cn.peerMinPieces
200 }
201
202 func (cn *connection) completedString() string {
203         have := pieceIndex(cn.peerPieces.Len())
204         if cn.peerSentHaveAll {
205                 have = cn.bestPeerNumPieces()
206         }
207         return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
208 }
209
210 // Correct the PeerPieces slice length. Returns an error if the existing slice
211 // is invalid, such as from receiving a badly sized BITFIELD or invalid HAVE
212 // messages.
213 func (cn *connection) setNumPieces(num pieceIndex) error {
214         cn.peerPieces.RemoveRange(bitmap.BitIndex(num), bitmap.ToEnd)
215         cn.peerPiecesChanged()
216         return nil
217 }
218
219 func eventAgeString(t time.Time) string {
220         if t.IsZero() {
221                 return "never"
222         }
223         return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
224 }
225
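// Single-character flags describing the connection: encryption, discovery
// source, and uTP transport.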
226 func (cn *connection) connectionFlags() (ret string) {
227         c := func(b byte) {
228                 ret += string([]byte{b})
229         }
230         if cn.cryptoMethod == mse.CryptoMethodRC4 {
231                 c('E')
232         } else if cn.headerEncrypted {
233                 c('e')
234         }
235         ret += string(cn.Discovery)
236         if cn.utp() {
237                 c('U')
238         }
239         return
240 }
241
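// Whether the connection's underlying network is UDP-based (uTP).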
242 func (cn *connection) utp() bool {
243         return parseNetworkString(cn.network).Udp
244 }
245
246 // Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
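// For example, "i-EHg-c" means we're interested and not choking, the connection
// is RC4 encrypted, the peer was found via a DHT get_peers search, and the peer
// is not interested and has choked us.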
247 func (cn *connection) statusFlags() (ret string) {
248         c := func(b byte) {
249                 ret += string([]byte{b})
250         }
251         if cn.Interested {
252                 c('i')
253         }
254         if cn.Choked {
255                 c('c')
256         }
257         c('-')
258         ret += cn.connectionFlags()
259         c('-')
260         if cn.PeerInterested {
261                 c('i')
262         }
263         if cn.PeerChoked {
264                 c('c')
265         }
266         return
267 }
268
269 // func (cn *connection) String() string {
270 //      var buf bytes.Buffer
271 //      cn.WriteStatus(&buf, nil)
272 //      return buf.String()
273 // }
274
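// Useful-data download rate in bytes per second, averaged over the time we've
// been interested in the peer.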
275 func (cn *connection) downloadRate() float64 {
276         return float64(cn.stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds()
277 }
278
279 func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
280         // \t isn't preserved in <pre> blocks?
281         fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr)
282         fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
283                 eventAgeString(cn.lastMessageReceived),
284                 eventAgeString(cn.completedHandshake),
285                 eventAgeString(cn.lastHelpful()),
286                 cn.cumInterest(),
287                 cn.totalExpectingTime(),
288         )
289         fmt.Fprintf(w,
290                 "    %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
291                 cn.completedString(),
292                 len(cn.peerTouchedPieces),
293                 &cn.stats.ChunksReadUseful,
294                 &cn.stats.ChunksRead,
295                 &cn.stats.ChunksWritten,
296                 cn.requestsLowWater,
297                 cn.numLocalRequests(),
298                 cn.nominalMaxRequests(),
299                 len(cn.PeerRequests),
300                 cn.statusFlags(),
301                 cn.downloadRate()/(1<<10),
302         )
303         fmt.Fprintf(w, "    next pieces: %v%s\n",
304                 iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
305                 func() string {
306                         if cn.shouldRequestWithoutBias() {
307                                 return " (fastest)"
308                         } else {
309                                 return ""
310                         }
311                 }())
312 }
313
314 func (cn *connection) Close() {
315         if !cn.closed.Set() {
316                 return
317         }
318         cn.tickleWriter()
319         cn.discardPieceInclination()
320         cn.pieceRequestOrder.Clear()
321         if cn.conn != nil {
322                 go cn.conn.Close()
323         }
324 }
325
326 func (cn *connection) PeerHasPiece(piece pieceIndex) bool {
327         return cn.peerSentHaveAll || cn.peerPieces.Contains(bitmap.BitIndex(piece))
328 }
329
330 // Writes a message into the write buffer.
331 func (cn *connection) Post(msg pp.Message) {
332         torrent.Add(fmt.Sprintf("messages posted of type %s", msg.Type.String()), 1)
333         // We don't need to track bytes here because a connection.w Writer wrapper
334         // takes care of that (although there's some delay between us recording
335         // the message and the connection writer flushing it out).
336         cn.writeBuffer.Write(msg.MustMarshalBinary())
337         // Last I checked only Piece messages affect stats, and we don't post
338         // those.
339         cn.wroteMsg(&msg)
340         cn.tickleWriter()
341 }
342
343 func (cn *connection) requestMetadataPiece(index int) {
344         eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata]
345         if eID == 0 {
346                 return
347         }
348         if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
349                 return
350         }
351         cn.Post(pp.Message{
352                 Type:       pp.Extended,
353                 ExtendedID: eID,
354                 ExtendedPayload: func() []byte {
355                         b, err := bencode.Marshal(map[string]int{
356                                 "msg_type": pp.RequestMetadataExtensionMsgType,
357                                 "piece":    index,
358                         })
359                         if err != nil {
360                                 panic(err)
361                         }
362                         return b
363                 }(),
364         })
365         for index >= len(cn.metadataRequests) {
366                 cn.metadataRequests = append(cn.metadataRequests, false)
367         }
368         cn.metadataRequests[index] = true
369 }
370
371 func (cn *connection) requestedMetadataPiece(index int) bool {
372         return index < len(cn.metadataRequests) && cn.metadataRequests[index]
373 }
374
375 // The actual value to use as the maximum outbound requests.
376 func (cn *connection) nominalMaxRequests() (ret int) {
377         if cn.t.requestStrategy == 3 {
378                 expectingTime := int64(cn.totalExpectingTime())
379                 if expectingTime == 0 {
380                         expectingTime = math.MaxInt64
381                 } else {
382                         expectingTime *= 2
383                 }
384                 return int(clamp(
385                         1,
386                         int64(cn.PeerMaxRequests),
387                         max(
388                                 // It makes sense to always pipeline at least a couple of requests,
389                                 // since latency must be non-zero.
390                                 2,
391                                 // Request only as many as we expect to receive in the
392                                 // duplicateRequestTimeout window. We are trying to avoid having to
393                                 // duplicate requests.
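                                // For example (illustrative numbers): 100 chunks received over 20s of
                                // expecting time (doubled to 40s) with a 5s duplicateRequestTimeout
                                // allows 100*5s/40s = 12 outstanding requests, before clamping to
                                // [1, PeerMaxRequests].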
394                                 cn.chunksReceivedWhileExpecting*int64(cn.t.duplicateRequestTimeout)/expectingTime,
395                         ),
396                 ))
397         }
398         return int(clamp(
399                 1,
400                 int64(cn.PeerMaxRequests),
401                 max(64,
402                         cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64()))))
403 }
404
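// Total time spent expecting chunks, including any expectation period still in
// progress.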
405 func (cn *connection) totalExpectingTime() (ret time.Duration) {
406         ret = cn.cumulativeExpectedToReceiveChunks
407         if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
408                 ret += time.Since(cn.lastStartedExpectingToReceiveChunks)
409         }
410         return
411
412 }
413
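// Handles a CANCEL from the peer: with the fast extension we answer with a
// REJECT, otherwise we silently drop the pending request.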
414 func (cn *connection) onPeerSentCancel(r request) {
415         if _, ok := cn.PeerRequests[r]; !ok {
416                 torrent.Add("unexpected cancels received", 1)
417                 return
418         }
419         if cn.fastEnabled() {
420                 cn.reject(r)
421         } else {
422                 delete(cn.PeerRequests, r)
423         }
424 }
425
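// Chokes the peer. With the fast extension their outstanding requests are
// explicitly rejected; otherwise they're just discarded.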
426 func (cn *connection) Choke(msg messageWriter) (more bool) {
427         if cn.Choked {
428                 return true
429         }
430         cn.Choked = true
431         more = msg(pp.Message{
432                 Type: pp.Choke,
433         })
434         if cn.fastEnabled() {
435                 for r := range cn.PeerRequests {
436                         // TODO: Don't reject pieces in allowed fast set.
437                         cn.reject(r)
438                 }
439         } else {
440                 cn.PeerRequests = nil
441         }
442         return
443 }
444
445 func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
446         if !cn.Choked {
447                 return true
448         }
449         cn.Choked = false
450         return msg(pp.Message{
451                 Type: pp.Unchoke,
452         })
453 }
454
455 func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
456         if cn.Interested == interested {
457                 return true
458         }
459         cn.Interested = interested
460         if interested {
461                 cn.lastBecameInterested = time.Now()
462         } else if !cn.lastBecameInterested.IsZero() {
463                 cn.priorInterest += time.Since(cn.lastBecameInterested)
464         }
465         cn.updateExpectingChunks()
466         // log.Printf("%p: setting interest: %v", cn, interested)
467         return msg(pp.Message{
468                 Type: func() pp.MessageType {
469                         if interested {
470                                 return pp.Interested
471                         } else {
472                                 return pp.NotInterested
473                         }
474                 }(),
475         })
476 }
477
478 // A messageWriter takes a message to be sent, and returns true if more
479 // messages may be written.
480 type messageWriter func(pp.Message) bool
481
482 // Proxies the messageWriter's response.
483 func (cn *connection) request(r request, mw messageWriter) bool {
484         if _, ok := cn.requests[r]; ok {
485                 panic("chunk already requested")
486         }
487         if !cn.PeerHasPiece(pieceIndex(r.Index)) {
488                 panic("requesting piece peer doesn't have")
489         }
490         if _, ok := cn.t.conns[cn]; !ok {
491                 panic("requesting but not in active conns")
492         }
493         if cn.closed.IsSet() {
494                 panic("requesting when connection is closed")
495         }
496         if cn.PeerChoked {
497                 if cn.peerAllowedFast.Get(int(r.Index)) {
498                         torrent.Add("allowed fast requests sent", 1)
499                 } else {
500                         panic("requesting while choked and not allowed fast")
501                 }
502         }
503         if cn.t.hashingPiece(pieceIndex(r.Index)) {
504                 panic("piece is being hashed")
505         }
506         if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) {
507                 panic("piece is queued for hash")
508         }
509         if cn.requests == nil {
510                 cn.requests = make(map[request]struct{})
511         }
512         cn.requests[r] = struct{}{}
513         if cn.validReceiveChunks == nil {
514                 cn.validReceiveChunks = make(map[request]struct{})
515         }
516         cn.validReceiveChunks[r] = struct{}{}
517         cn.t.pendingRequests[r]++
518         cn.t.lastRequested[r] = time.AfterFunc(cn.t.duplicateRequestTimeout, func() {
519                 torrent.Add("duplicate request timeouts", 1)
520                 cn.mu().Lock()
521                 defer cn.mu().Unlock()
522                 delete(cn.t.lastRequested, r)
523                 for cn := range cn.t.conns {
524                         if cn.PeerHasPiece(pieceIndex(r.Index)) {
525                                 cn.updateRequests()
526                         }
527                 }
528         })
529         cn.updateExpectingChunks()
530         return mw(pp.Message{
531                 Type:   pp.Request,
532                 Index:  r.Index,
533                 Begin:  r.Begin,
534                 Length: r.Length,
535         })
536 }
537
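// Fills the write buffer with outbound messages: cancels requests when
// networking is disabled, tops up piece requests when below the low-water mark,
// and then considers uploading.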
538 func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
539         if !cn.t.networkingEnabled {
540                 if !cn.SetInterested(false, msg) {
541                         return
542                 }
543                 if len(cn.requests) != 0 {
544                         for r := range cn.requests {
545                                 cn.deleteRequest(r)
546                                 // log.Printf("%p: cancelling request: %v", cn, r)
547                                 if !msg(makeCancelMessage(r)) {
548                                         return
549                                 }
550                         }
551                 }
552         }
553         if len(cn.requests) <= cn.requestsLowWater {
554                 filledBuffer := false
555                 cn.iterPendingPieces(func(pieceIndex pieceIndex) bool {
556                         cn.iterPendingRequests(pieceIndex, func(r request) bool {
557                                 if !cn.SetInterested(true, msg) {
558                                         filledBuffer = true
559                                         return false
560                                 }
561                                 if len(cn.requests) >= cn.nominalMaxRequests() {
562                                         return false
563                                 }
564                                 // Choking is looked at here because our interest is dependent
565                                 // on whether we'd make requests in its absence.
566                                 if cn.PeerChoked {
567                                         if !cn.peerAllowedFast.Get(bitmap.BitIndex(r.Index)) {
568                                                 return false
569                                         }
570                                 }
571                                 if _, ok := cn.requests[r]; ok {
572                                         return true
573                                 }
574                                 filledBuffer = !cn.request(r, msg)
575                                 return !filledBuffer
576                         })
577                         return !filledBuffer
578                 })
579                 if filledBuffer {
580                         // If we didn't completely top up the requests, we shouldn't mark
581                         // the low water, since we'll want to top up the requests as soon
582                         // as we have more write buffer space.
583                         return
584                 }
585                 cn.requestsLowWater = len(cn.requests) / 2
586         }
587
588         cn.upload(msg)
589 }
590
591 // Routine that writes to the peer. Some of what to write is buffered by
592 // activity elsewhere in the Client, and some is determined locally when the
593 // connection is writable.
594 func (cn *connection) writer(keepAliveTimeout time.Duration) {
595         var (
596                 lastWrite      time.Time = time.Now()
597                 keepAliveTimer *time.Timer
598         )
599         keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
600                 cn.mu().Lock()
601                 defer cn.mu().Unlock()
602                 if time.Since(lastWrite) >= keepAliveTimeout {
603                         cn.tickleWriter()
604                 }
605                 keepAliveTimer.Reset(keepAliveTimeout)
606         })
607         cn.mu().Lock()
608         defer cn.mu().Unlock()
609         defer cn.Close()
610         defer keepAliveTimer.Stop()
611         frontBuf := new(bytes.Buffer)
612         for {
613                 if cn.closed.IsSet() {
614                         return
615                 }
616                 if cn.writeBuffer.Len() == 0 {
617                         cn.fillWriteBuffer(func(msg pp.Message) bool {
618                                 cn.wroteMsg(&msg)
619                                 cn.writeBuffer.Write(msg.MustMarshalBinary())
620                                 torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
621                                 return cn.writeBuffer.Len() < 1<<16 // 64KiB
622                         })
623                 }
624                 if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
625                         cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
626                         postedKeepalives.Add(1)
627                 }
628                 if cn.writeBuffer.Len() == 0 {
629                         // TODO: Minimize wakeups....
630                         cn.writerCond.Wait()
631                         continue
632                 }
633                 // Flip the buffers.
634                 frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
635                 cn.mu().Unlock()
636                 n, err := cn.w.Write(frontBuf.Bytes())
637                 cn.mu().Lock()
638                 if n != 0 {
639                         lastWrite = time.Now()
640                         keepAliveTimer.Reset(keepAliveTimeout)
641                 }
642                 if err != nil {
643                         return
644                 }
645                 if n != frontBuf.Len() {
646                         panic("short write")
647                 }
648                 frontBuf.Reset()
649         }
650 }
651
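// Posts a HAVE for the piece unless we've already told this peer about it.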
652 func (cn *connection) Have(piece pieceIndex) {
653         if cn.sentHaves.Get(bitmap.BitIndex(piece)) {
654                 return
655         }
656         cn.Post(pp.Message{
657                 Type:  pp.Have,
658                 Index: pp.Integer(piece),
659         })
660         cn.sentHaves.Add(bitmap.BitIndex(piece))
661 }
662
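// Posts our bitfield. It must be the first have-related message sent, and is
// skipped if we have no pieces at all.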
663 func (cn *connection) PostBitfield() {
664         if cn.sentHaves.Len() != 0 {
665                 panic("bitfield must be first have-related message sent")
666         }
667         if !cn.t.haveAnyPieces() {
668                 return
669         }
670         cn.Post(pp.Message{
671                 Type:     pp.Bitfield,
672                 Bitfield: cn.t.bitfield(),
673         })
674         cn.sentHaves = cn.t.completedPieces.Copy()
675 }
676
677 func (cn *connection) updateRequests() {
678         // log.Print("update requests")
679         cn.tickleWriter()
680 }
681
682 // Emits the indices in the Bitmaps bms in order, never repeating any index.
683 // skip is mutated during execution, and its initial values will never be
684 // emitted.
685 func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
686         return func(cb iter.Callback) {
687                 for _, bm := range bms {
688                         if !iter.All(func(i interface{}) bool {
689                                 skip.Add(i.(int))
690                                 return cb(i)
691                         }, bitmap.Sub(bm, *skip).Iter) {
692                                 return
693                         }
694                 }
695         }
696 }
697
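// Iterates pieces in reader-priority order, skipping pieces the peer doesn't
// have, pieces we already have, and pieces queued or being hashed, then falls
// back to the torrent's pending pieces.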
698 func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece pieceIndex) bool) bool {
699         now, readahead := cn.t.readerPiecePriorities()
700         var skip bitmap.Bitmap
701         if !cn.peerSentHaveAll {
702                 // Pieces to skip include pieces the peer doesn't have.
703                 skip = bitmap.Flip(cn.peerPieces, 0, bitmap.BitIndex(cn.t.numPieces()))
704         }
705         // And pieces that we already have.
706         skip.Union(cn.t.completedPieces)
707         skip.Union(cn.t.piecesQueuedForHash)
708         // Return an iterator over the different priority classes, minus the skip
709         // pieces.
710         return iter.All(
711                 func(_piece interface{}) bool {
712                         i := _piece.(bitmap.BitIndex)
713                         if cn.t.hashingPiece(pieceIndex(i)) {
714                                 return true
715                         }
716                         return f(pieceIndex(i))
717                 },
718                 iterBitmapsDistinct(&skip, now, readahead),
719                 func(cb iter.Callback) {
720                         cn.t.pendingPieces.IterTyped(func(piece int) bool {
721                                 if skip.Contains(piece) {
722                                         return true
723                                 }
724                                 more := cb(piece)
725                                 skip.Add(piece)
726                                 return more
727                         })
728                 },
729         )
730 }
731
732 // The connection should download highest priority pieces first, without any
733 // inclination toward avoiding wastage. Generally we might do this if there's
734 // a single connection, or this is the fastest connection, and we have active
735 // readers that signal an ordering preference. It's conceivable that the best
736 // connection should do this, since it's least likely to waste our time if
737 // assigned to the highest priority pieces, and assigning this role to more
738 // than one connection would cause significant wasted bandwidth.
739 func (cn *connection) shouldRequestWithoutBias() bool {
740         if cn.t.requestStrategy != 2 {
741                 return false
742         }
743         if len(cn.t.readers) == 0 {
744                 return false
745         }
746         if len(cn.t.conns) == 1 {
747                 return true
748         }
749         if cn == cn.t.fastestConn {
750                 return true
751         }
752         return false
753 }
754
755 func (cn *connection) iterPendingPieces(f func(pieceIndex) bool) bool {
756         if !cn.t.haveInfo() {
757                 return false
758         }
759         if cn.t.requestStrategy == 3 {
760                 return cn.iterUnbiasedPieceRequestOrder(f)
761         }
762         if cn.shouldRequestWithoutBias() {
763                 return cn.iterUnbiasedPieceRequestOrder(f)
764         } else {
765                 return cn.pieceRequestOrder.IterTyped(func(i int) bool {
766                         return f(pieceIndex(i))
767                 })
768         }
769 }
770
771 func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) {
772         cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) })
773 }
774
775 func (cn *connection) iterPendingRequests(piece pieceIndex, f func(request) bool) bool {
776         return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
777                 r := request{pp.Integer(piece), cs}
778                 if cn.t.requestStrategy == 3 {
779                         if _, ok := cn.t.lastRequested[r]; ok {
780                                 // This piece has been requested on another connection, and
781                                 // the duplicate request timer is still running.
782                                 return true
783                         }
784                 }
785                 return f(r)
786         })
787 }
788
789 func iterUndirtiedChunks(piece pieceIndex, t *Torrent, f func(chunkSpec) bool) bool {
790         p := &t.pieces[piece]
791         if t.requestStrategy == 3 {
792                 for i := pp.Integer(0); i < p.numChunks(); i++ {
793                         if !p.dirtyChunks.Get(bitmap.BitIndex(i)) {
794                                 if !f(t.chunkIndexSpec(i, piece)) {
795                                         return false
796                                 }
797                         }
798                 }
799                 return true
800         }
801         chunkIndices := t.pieces[piece].undirtiedChunkIndices()
802         return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
803                 ci, err := chunkIndices.RB.Select(uint32(i))
804                 if err != nil {
805                         panic(err)
806                 }
807                 return f(t.chunkIndexSpec(pp.Integer(ci), piece))
808         })
809 }
810
811 // TODO: Check that callers call updateRequests as needed.
812 func (cn *connection) stopRequestingPiece(piece pieceIndex) bool {
813         return cn.pieceRequestOrder.Remove(bitmap.BitIndex(piece))
814 }
815
816 // This is distinct from Torrent piece priority, which is the user's
817 // preference. Connection piece priority is specific to a connection and is
818 // used to pseudorandomly avoid connections always requesting the same pieces
819 // and thus wasting effort.
820 func (cn *connection) updatePiecePriority(piece pieceIndex) bool {
821         tpp := cn.t.piecePriority(piece)
822         if !cn.PeerHasPiece(piece) {
823                 tpp = PiecePriorityNone
824         }
825         if tpp == PiecePriorityNone {
826                 return cn.stopRequestingPiece(piece)
827         }
828         prio := cn.getPieceInclination()[piece]
829         switch cn.t.requestStrategy {
830         case 1:
831                 switch tpp {
832                 case PiecePriorityNormal:
833                 case PiecePriorityReadahead:
834                         prio -= int(cn.t.numPieces())
835                 case PiecePriorityNext, PiecePriorityNow:
836                         prio -= 2 * int(cn.t.numPieces())
837                 default:
838                         panic(tpp)
839                 }
840                 prio += int(piece / 3)
841         default:
842         }
843         return cn.pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias()
844 }
845
846 func (cn *connection) getPieceInclination() []int {
847         if cn.pieceInclination == nil {
848                 cn.pieceInclination = cn.t.getConnPieceInclination()
849         }
850         return cn.pieceInclination
851 }
852
853 func (cn *connection) discardPieceInclination() {
854         if cn.pieceInclination == nil {
855                 return
856         }
857         cn.t.putPieceInclination(cn.pieceInclination)
858         cn.pieceInclination = nil
859 }
860
861 func (cn *connection) peerPiecesChanged() {
862         if cn.t.haveInfo() {
863                 prioritiesChanged := false
864                 for i := pieceIndex(0); i < cn.t.numPieces(); i++ {
865                         if cn.updatePiecePriority(i) {
866                                 prioritiesChanged = true
867                         }
868                 }
869                 if prioritiesChanged {
870                         cn.updateRequests()
871                 }
872         }
873 }
874
875 func (cn *connection) raisePeerMinPieces(newMin pieceIndex) {
876         if newMin > cn.peerMinPieces {
877                 cn.peerMinPieces = newMin
878         }
879 }
880
881 func (cn *connection) peerSentHave(piece pieceIndex) error {
882         if cn.t.haveInfo() && piece >= cn.t.numPieces() || piece < 0 {
883                 return errors.New("invalid piece")
884         }
885         if cn.PeerHasPiece(piece) {
886                 return nil
887         }
888         cn.raisePeerMinPieces(piece + 1)
889         cn.peerPieces.Set(bitmap.BitIndex(piece), true)
890         if cn.updatePiecePriority(piece) {
891                 cn.updateRequests()
892         }
893         return nil
894 }
895
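// Applies a received BITFIELD: raises the known minimum piece count and records
// which pieces the peer claims to have.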
896 func (cn *connection) peerSentBitfield(bf []bool) error {
897         cn.peerSentHaveAll = false
898         if len(bf)%8 != 0 {
899                 panic("expected bitfield length divisible by 8")
900         }
901         // We know that the last byte means that at most the last 7 bits are
902         // wasted.
903         cn.raisePeerMinPieces(pieceIndex(len(bf) - 7))
904         if cn.t.haveInfo() && len(bf) > int(cn.t.numPieces()) {
905                 // Ignore known excess pieces.
906                 bf = bf[:cn.t.numPieces()]
907         }
908         for i, have := range bf {
909                 if have {
910                         cn.raisePeerMinPieces(pieceIndex(i) + 1)
911                 }
912                 cn.peerPieces.Set(i, have)
913         }
914         cn.peerPiecesChanged()
915         return nil
916 }
917
918 func (cn *connection) onPeerSentHaveAll() error {
919         cn.peerSentHaveAll = true
920         cn.peerPieces.Clear()
921         cn.peerPiecesChanged()
922         return nil
923 }
924
925 func (cn *connection) peerSentHaveNone() error {
926         cn.peerPieces.Clear()
927         cn.peerSentHaveAll = false
928         cn.peerPiecesChanged()
929         return nil
930 }
931
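// Requests, in random order, any metadata pieces we don't already have, if the
// peer supports the metadata extension.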
932 func (c *connection) requestPendingMetadata() {
933         if c.t.haveInfo() {
934                 return
935         }
936         if c.PeerExtensionIDs[pp.ExtensionNameMetadata] == 0 {
937                 // Peer doesn't support this.
938                 return
939         }
940         // Request metadata pieces that we don't have in a random order.
941         var pending []int
942         for index := 0; index < c.t.metadataPieceCount(); index++ {
943                 if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
944                         pending = append(pending, index)
945                 }
946         }
947         for _, i := range rand.Perm(len(pending)) {
948                 c.requestMetadataPiece(pending[i])
949         }
950 }
951
952 func (cn *connection) wroteMsg(msg *pp.Message) {
953         torrent.Add(fmt.Sprintf("messages written of type %s", msg.Type.String()), 1)
954         cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
955 }
956
957 func (cn *connection) readMsg(msg *pp.Message) {
958         cn.allStats(func(cs *ConnStats) { cs.readMsg(msg) })
959 }
960
961 // After handshake, we know what Torrent and Client stats to include for a
962 // connection.
963 func (cn *connection) postHandshakeStats(f func(*ConnStats)) {
964         t := cn.t
965         f(&t.stats)
966         f(&t.cl.stats)
967 }
968
969 // All ConnStats that include this connection. Some objects are not known
970 // until the handshake is complete, after which it's expected to reconcile the
971 // differences.
972 func (cn *connection) allStats(f func(*ConnStats)) {
973         f(&cn.stats)
974         if cn.reconciledHandshakeStats {
975                 cn.postHandshakeStats(f)
976         }
977 }
978
979 func (cn *connection) wroteBytes(n int64) {
980         cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten }))
981 }
982
983 func (cn *connection) readBytes(n int64) {
984         cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
985 }
986
987 // Returns whether the connection could be useful to us. We're seeding and
988 // they want data, we don't have metainfo and they can provide it, etc.
989 func (c *connection) useful() bool {
990         t := c.t
991         if c.closed.IsSet() {
992                 return false
993         }
994         if !t.haveInfo() {
995                 return c.supportsExtension("ut_metadata")
996         }
997         if t.seeding() && c.PeerInterested {
998                 return true
999         }
1000         if c.peerHasWantedPieces() {
1001                 return true
1002         }
1003         return false
1004 }
1005
1006 func (c *connection) lastHelpful() (ret time.Time) {
1007         ret = c.lastUsefulChunkReceived
1008         if c.t.seeding() && c.lastChunkSent.After(ret) {
1009                 ret = c.lastChunkSent
1010         }
1011         return
1012 }
1013
1014 func (c *connection) fastEnabled() bool {
1015         return c.PeerExtensionBytes.SupportsFast() && c.t.cl.extensionBytes.SupportsFast()
1016 }
1017
1018 func (c *connection) reject(r request) {
1019         if !c.fastEnabled() {
1020                 panic("fast not enabled")
1021         }
1022         c.Post(r.ToMsg(pp.Reject))
1023         delete(c.PeerRequests, r)
1024 }
1025
1026 func (c *connection) onReadRequest(r request) error {
1027         requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
1028         if _, ok := c.PeerRequests[r]; ok {
1029                 torrent.Add("duplicate requests received", 1)
1030                 return nil
1031         }
1032         if c.Choked {
1033                 torrent.Add("requests received while choking", 1)
1034                 if c.fastEnabled() {
1035                         torrent.Add("requests rejected while choking", 1)
1036                         c.reject(r)
1037                 }
1038                 return nil
1039         }
1040         if len(c.PeerRequests) >= maxRequests {
1041                 torrent.Add("requests received while queue full", 1)
1042                 if c.fastEnabled() {
1043                         c.reject(r)
1044                 }
1045                 // BEP 6 says we may close here if we choose.
1046                 return nil
1047         }
1048         if !c.t.havePiece(pieceIndex(r.Index)) {
1049                 // This isn't necessarily them screwing up. We can drop pieces
1050                 // from our storage, and can't communicate this to peers
1051                 // except by reconnecting.
1052                 requestsReceivedForMissingPieces.Add(1)
1053                 return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
1054         }
1055         // Check this after we know we have the piece, so that the piece length will be known.
1056         if r.Begin+r.Length > c.t.pieceLength(pieceIndex(r.Index)) {
1057                 torrent.Add("bad requests received", 1)
1058                 return errors.New("bad request")
1059         }
1060         if c.PeerRequests == nil {
1061                 c.PeerRequests = make(map[request]struct{}, maxRequests)
1062         }
1063         c.PeerRequests[r] = struct{}{}
1064         c.tickleWriter()
1065         return nil
1066 }
1067
1068 // Processes incoming BitTorrent wire-protocol messages. The client lock is held upon entry and
1069 // exit. Returning will end the connection.
1070 func (c *connection) mainReadLoop() (err error) {
1071         defer func() {
1072                 if err != nil {
1073                         torrent.Add("connection.mainReadLoop returned with error", 1)
1074                 } else {
1075                         torrent.Add("connection.mainReadLoop returned with no error", 1)
1076                 }
1077         }()
1078         t := c.t
1079         cl := t.cl
1080
1081         decoder := pp.Decoder{
1082                 R:         bufio.NewReaderSize(c.r, 1<<17),
1083                 MaxLength: 256 * 1024,
1084                 Pool:      t.chunkPool,
1085         }
1086         for {
1087                 var msg pp.Message
1088                 func() {
1089                         cl.unlock()
1090                         defer cl.lock()
1091                         err = decoder.Decode(&msg)
1092                 }()
1093                 if t.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
1094                         return nil
1095                 }
1096                 if err != nil {
1097                         return err
1098                 }
1099                 c.readMsg(&msg)
1100                 c.lastMessageReceived = time.Now()
1101                 if msg.Keepalive {
1102                         receivedKeepalives.Add(1)
1103                         continue
1104                 }
1105                 messageTypesReceived.Add(msg.Type.String(), 1)
1106                 if msg.Type.FastExtension() && !c.fastEnabled() {
1107                         return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
1108                 }
1109                 switch msg.Type {
1110                 case pp.Choke:
1111                         c.PeerChoked = true
1112                         c.deleteAllRequests()
1113                         // We can then reset our interest.
1114                         c.updateRequests()
1115                         c.updateExpectingChunks()
1116                 case pp.Unchoke:
1117                         c.PeerChoked = false
1118                         c.tickleWriter()
1119                         c.updateExpectingChunks()
1120                 case pp.Interested:
1121                         c.PeerInterested = true
1122                         c.tickleWriter()
1123                 case pp.NotInterested:
1124                         c.PeerInterested = false
1125                         // We don't clear their requests since it isn't clear in the spec.
1126                         // We'll probably choke them for this, which will clear them if
1127                         // appropriate, and is clearly specified.
1128                 case pp.Have:
1129                         err = c.peerSentHave(pieceIndex(msg.Index))
1130                 case pp.Bitfield:
1131                         err = c.peerSentBitfield(msg.Bitfield)
1132                 case pp.Request:
1133                         r := newRequestFromMessage(&msg)
1134                         err = c.onReadRequest(r)
1135                 case pp.Piece:
1136                         err = c.receiveChunk(&msg)
1137                         if len(msg.Piece) == int(t.chunkSize) {
1138                                 t.chunkPool.Put(&msg.Piece)
1139                         }
1140                         if err != nil {
1141                                 err = fmt.Errorf("receiving chunk: %s", err)
1142                         }
1143                 case pp.Cancel:
1144                         req := newRequestFromMessage(&msg)
1145                         c.onPeerSentCancel(req)
1146                 case pp.Port:
1147                         pingAddr := net.UDPAddr{
1148                                 IP:   c.remoteAddr.IP,
1149                                 Port: int(c.remoteAddr.Port),
1150                         }
1151                         if msg.Port != 0 {
1152                                 pingAddr.Port = int(msg.Port)
1153                         }
1154                         cl.eachDhtServer(func(s *dht.Server) {
1155                                 go s.Ping(&pingAddr, nil)
1156                         })
1157                 case pp.Suggest:
1158                         torrent.Add("suggests received", 1)
1159                         log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index, debugLogValue).Log(c.t.logger)
1160                         c.updateRequests()
1161                 case pp.HaveAll:
1162                         err = c.onPeerSentHaveAll()
1163                 case pp.HaveNone:
1164                         err = c.peerSentHaveNone()
1165                 case pp.Reject:
1166                         c.deleteRequest(newRequestFromMessage(&msg))
1167                         delete(c.validReceiveChunks, newRequestFromMessage(&msg))
1168                 case pp.AllowedFast:
1169                         torrent.Add("allowed fasts received", 1)
1170                         log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c, debugLogValue).Log(c.t.logger)
1171                         c.peerAllowedFast.Add(int(msg.Index))
1172                         c.updateRequests()
1173                 case pp.Extended:
1174                         err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
1175                 default:
1176                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1177                 }
1178                 if err != nil {
1179                         return err
1180                 }
1181         }
1182 }
1183
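// Dispatches an extended-protocol message: the extended handshake, ut_metadata,
// or PEX.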
1184 func (c *connection) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err error) {
1185         defer func() {
1186                 // TODO: Should we still do this?
1187                 if err != nil {
1188                         // These clients use their own extension IDs for outgoing message
1189                         // types, which is incorrect.
1190                         if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) || strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1191                                 err = nil
1192                         }
1193                 }
1194         }()
1195         t := c.t
1196         cl := t.cl
1197         switch id {
1198         case pp.HandshakeExtendedID:
1199                 var d pp.ExtendedHandshakeMessage
1200                 if err := bencode.Unmarshal(payload, &d); err != nil {
1201                         c.t.logger.Printf("error parsing extended handshake message %q: %s", payload, err)
1202                         return errors.Wrap(err, "unmarshalling extended handshake payload")
1203                 }
1204                 if d.Reqq != 0 {
1205                         c.PeerMaxRequests = d.Reqq
1206                 }
1207                 c.PeerClientName = d.V
1208                 if c.PeerExtensionIDs == nil {
1209                         c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber, len(d.M))
1210                 }
1211                 for name, id := range d.M {
1212                         if _, ok := c.PeerExtensionIDs[name]; !ok {
1213                                 torrent.Add(fmt.Sprintf("peers supporting extension %q", name), 1)
1214                         }
1215                         c.PeerExtensionIDs[name] = id
1216                 }
1217                 if d.MetadataSize != 0 {
1218                         if err = t.setMetadataSize(d.MetadataSize); err != nil {
1219                                 return errors.Wrapf(err, "setting metadata size to %d", d.MetadataSize)
1220                         }
1221                 }
1222                 if _, ok := c.PeerExtensionIDs[pp.ExtensionNameMetadata]; ok {
1223                         c.requestPendingMetadata()
1224                 }
1225                 return nil
1226         case metadataExtendedId:
1227                 err := cl.gotMetadataExtensionMsg(payload, t, c)
1228                 if err != nil {
1229                         return fmt.Errorf("error handling metadata extension message: %s", err)
1230                 }
1231                 return nil
1232         case pexExtendedId:
1233                 if cl.config.DisablePEX {
1234                         // TODO: Maybe close the connection. Check that we're not
1235                         // advertising that we support PEX if it's disabled.
1236                         return nil
1237                 }
1238                 var pexMsg pp.PexMsg
1239                 err := bencode.Unmarshal(payload, &pexMsg)
1240                 if err != nil {
1241                         return fmt.Errorf("error unmarshalling PEX message: %s", err)
1242                 }
1243                 torrent.Add("pex added6 peers received", int64(len(pexMsg.Added6)))
1244                 var peers Peers
1245                 peers.AppendFromPex(pexMsg.Added6, pexMsg.Added6Flags)
1246                 peers.AppendFromPex(pexMsg.Added, pexMsg.AddedFlags)
1247                 t.addPeers(peers)
1248                 return nil
1249         default:
1250                 return fmt.Errorf("unexpected extended message ID: %v", id)
1251         }
1252 }
1253
1254 // Set both the Reader and Writer for the connection from a single ReadWriter.
1255 func (cn *connection) setRW(rw io.ReadWriter) {
1256         cn.r = rw
1257         cn.w = rw
1258 }
1259
1260 // Returns the Reader and Writer as a combined ReadWriter.
1261 func (cn *connection) rw() io.ReadWriter {
1262         return struct {
1263                 io.Reader
1264                 io.Writer
1265         }{cn.r, cn.w}
1266 }
1267
1268 // Handle a received chunk from a peer.
1269 func (c *connection) receiveChunk(msg *pp.Message) error {
1270         t := c.t
1271         cl := t.cl
1272         torrent.Add("chunks received", 1)
1273
1274         req := newRequestFromMessage(msg)
1275
1276         if c.PeerChoked {
1277                 torrent.Add("chunks received while choked", 1)
1278         }
1279
1280         if _, ok := c.validReceiveChunks[req]; !ok {
1281                 torrent.Add("chunks received unexpected", 1)
1282                 return errors.New("received unexpected chunk")
1283         }
1284         delete(c.validReceiveChunks, req)
1285
1286         if c.PeerChoked && c.peerAllowedFast.Get(int(req.Index)) {
1287                 torrent.Add("chunks received due to allowed fast", 1)
1288         }
1289
1290         // Request has been satisfied.
1291         if c.deleteRequest(req) {
1292                 if c.expectingChunks() {
1293                         c.chunksReceivedWhileExpecting++
1294                 }
1295         } else {
1296                 torrent.Add("chunks received unwanted", 1)
1297         }
1298
1299         // Do we actually want this chunk?
1300         if t.haveChunk(req) {
1301                 torrent.Add("chunks received wasted", 1)
1302                 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
1303                 return nil
1304         }
1305
1306         piece := &t.pieces[req.Index]
1307
1308         c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
1309         c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
1310         c.lastUsefulChunkReceived = time.Now()
1311         // if t.fastestConn != c {
1312         // log.Printf("setting fastest connection %p", c)
1313         // }
1314         t.fastestConn = c
1315
1316         // Need to record that it hasn't been written yet, before we attempt to do
1317         // anything with it.
1318         piece.incrementPendingWrites()
1319         // Record that we have the chunk, so we aren't trying to download it while
1320         // waiting for it to be written to storage.
1321         piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
1322
1323         // Cancel pending requests for this chunk.
1324         for c := range t.conns {
1325                 c.postCancel(req)
1326         }
1327
1328         err := func() error {
1329                 cl.unlock()
1330                 defer cl.lock()
1331                 concurrentChunkWrites.Add(1)
1332                 defer concurrentChunkWrites.Add(-1)
1333                 // Write the chunk out. Note that the upper bound on chunk writing
1334                 // concurrency will be the number of connections. We write inline with
1335                 // receiving the chunk (with this lock dance), because we want to
1336                 // handle errors synchronously and I haven't thought of a nice way to
1337                 // defer any concurrency to the storage and have that notify the
1338                 // client of errors. TODO: Do that instead.
1339                 return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1340         }()
1341
1342         piece.decrementPendingWrites()
1343
1344         if err != nil {
1345                 panic(fmt.Sprintf("error writing chunk: %v", err))
1346                 t.pendRequest(req)
1347                 t.updatePieceCompletion(pieceIndex(msg.Index))
1348                 return nil
1349         }
1350
1351         // It's important that the piece is potentially queued before we check if
1352         // the piece is still wanted, because if it is queued, it won't be wanted.
1353         if t.pieceAllDirty(pieceIndex(req.Index)) {
1354                 t.queuePieceCheck(pieceIndex(req.Index))
1355                 t.pendAllChunkSpecs(pieceIndex(req.Index))
1356         }
1357
1358         c.onDirtiedPiece(pieceIndex(req.Index))
1359
1360         cl.event.Broadcast()
1361         t.publishPieceChange(pieceIndex(req.Index))
1362
1363         return nil
1364 }
1365
1366 func (c *connection) onDirtiedPiece(piece pieceIndex) {
1367         if c.peerTouchedPieces == nil {
1368                 c.peerTouchedPieces = make(map[pieceIndex]struct{})
1369         }
1370         c.peerTouchedPieces[piece] = struct{}{}
1371         ds := &c.t.pieces[piece].dirtiers
1372         if *ds == nil {
1373                 *ds = make(map[*connection]struct{})
1374         }
1375         (*ds)[c] = struct{}{}
1376 }
1377
1378 func (c *connection) uploadAllowed() bool {
1379         if c.t.cl.config.NoUpload {
1380                 return false
1381         }
1382         if c.t.seeding() {
1383                 return true
1384         }
1385         if !c.peerHasWantedPieces() {
1386                 return false
1387         }
1388         // Don't upload more than 100 KiB more than we download.
1389         if c.stats.BytesWrittenData.Int64() >= c.stats.BytesReadData.Int64()+100<<10 {
1390                 return false
1391         }
1392         return true
1393 }
1394
1395 func (c *connection) setRetryUploadTimer(delay time.Duration) {
1396         if c.uploadTimer == nil {
1397                 c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
1398         } else {
1399                 c.uploadTimer.Reset(delay)
1400         }
1401 }
1402
1403 // Also handles choking and unchoking of the remote peer.
1404 func (c *connection) upload(msg func(pp.Message) bool) bool {
1405         // Breaking or completing this loop means we don't want to upload to the
1406         // peer anymore, and we choke them.
1407 another:
1408         for c.uploadAllowed() {
1409                 // We want to upload to the peer.
1410                 if !c.Unchoke(msg) {
1411                         return false
1412                 }
1413                 for r := range c.PeerRequests {
1414                         res := c.t.cl.config.UploadRateLimiter.ReserveN(time.Now(), int(r.Length))
1415                         if !res.OK() {
1416                                 panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
1417                         }
1418                         delay := res.Delay()
1419                         if delay > 0 {
1420                                 res.Cancel()
1421                                 c.setRetryUploadTimer(delay)
1422                                 // Hard to say what to return here.
1423                                 return true
1424                         }
1425                         more, err := c.sendChunk(r, msg)
1426                         if err != nil {
1427                                 i := pieceIndex(r.Index)
1428                                 if c.t.pieceComplete(i) {
1429                                         c.t.updatePieceCompletion(i)
1430                                         if !c.t.pieceComplete(i) {
1431                                                 // We had the piece, but not anymore.
1432                                                 break another
1433                                         }
1434                                 }
1435                                 log.Str("error sending chunk to peer").AddValues(c, r, err).Log(c.t.logger)
1436                                 // If we failed to send a chunk, choke the peer to ensure they
1437                                 // flush all their requests. We've probably dropped a piece,
1438                                 // but there's no way to communicate this to the peer. If they
1439                                 // ask for it again, we'll kick them to allow us to send them
1440                                 // an updated bitfield.
1441                                 break another
1442                         }
1443                         delete(c.PeerRequests, r)
1444                         if !more {
1445                                 return false
1446                         }
1447                         goto another
1448                 }
1449                 return true
1450         }
1451         return c.Choke(msg)
1452 }
1453
1454 func (cn *connection) Drop() {
1455         cn.t.dropConnection(cn)
1456 }
1457
1458 func (cn *connection) netGoodPiecesDirtied() int64 {
1459         return cn.stats.PiecesDirtiedGood.Int64() - cn.stats.PiecesDirtiedBad.Int64()
1460 }
1461
1462 func (c *connection) peerHasWantedPieces() bool {
1463         return !c.pieceRequestOrder.IsEmpty()
1464 }
1465
1466 func (c *connection) numLocalRequests() int {
1467         return len(c.requests)
1468 }
1469
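// Removes a local request, stopping its duplicate-request timer, updating the
// torrent-wide pending request count, and prompting other connections to
// reconsider their requests.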
1470 func (c *connection) deleteRequest(r request) bool {
1471         if _, ok := c.requests[r]; !ok {
1472                 return false
1473         }
1474         delete(c.requests, r)
1475         c.updateExpectingChunks()
1476         if t, ok := c.t.lastRequested[r]; ok {
1477                 t.Stop()
1478                 delete(c.t.lastRequested, r)
1479         }
1480         pr := c.t.pendingRequests
1481         pr[r]--
1482         n := pr[r]
1483         if n == 0 {
1484                 delete(pr, r)
1485         }
1486         if n < 0 {
1487                 panic(n)
1488         }
1489         c.updateRequests()
1490         for _c := range c.t.conns {
1491                 if !_c.Interested && _c != c && c.PeerHasPiece(pieceIndex(r.Index)) {
1492                         _c.updateRequests()
1493                 }
1494         }
1495         return true
1496 }
1497
1498 func (c *connection) deleteAllRequests() {
1499         for r := range c.requests {
1500                 c.deleteRequest(r)
1501         }
1502         if len(c.requests) != 0 {
1503                 panic(len(c.requests))
1504         }
1505         // for c := range c.t.conns {
1506         //      c.tickleWriter()
1507         // }
1508 }
1509
1510 func (c *connection) tickleWriter() {
1511         c.writerCond.Broadcast()
1512 }
1513
1514 func (c *connection) postCancel(r request) bool {
1515         if !c.deleteRequest(r) {
1516                 return false
1517         }
1518         c.Post(makeCancelMessage(r))
1519         return true
1520 }
1521
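// Reads the requested chunk from the torrent's storage and writes a PIECE
// message for it.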
1522 func (c *connection) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) {
1523         // Count the chunk as being sent, even if it ultimately isn't.
1524         b := make([]byte, r.Length)
1525         p := c.t.info.Piece(int(r.Index))
1526         n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
1527         if n != len(b) {
1528                 if err == nil {
1529                         panic("expected error")
1530                 }
1531                 return
1532         } else if err == io.EOF {
1533                 err = nil
1534         }
1535         more = msg(pp.Message{
1536                 Type:  pp.Piece,
1537                 Index: r.Index,
1538                 Begin: r.Begin,
1539                 Piece: b,
1540         })
1541         c.lastChunkSent = time.Now()
1542         return
1543 }
1544
1545 func (c *connection) setTorrent(t *Torrent) {
1546         if c.t != nil {
1547                 panic("connection already associated with a torrent")
1548         }
1549         c.t = t
1550         t.reconcileHandshakeStats(c)
1551 }
1552
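// The BEP 40 canonical peer priority for this connection.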
1553 func (c *connection) peerPriority() peerPriority {
1554         return bep40PriorityIgnoreError(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp()))
1555 }
1556
1557 func (c *connection) remoteIp() net.IP {
1558         return c.remoteAddr.IP
1559 }
1560
1561 func (c *connection) remoteIpPort() IpPort {
1562         return c.remoteAddr
1563 }