package torrent

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"math"
	"math/rand"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/anacrolix/dht"
	"github.com/anacrolix/log"
	"github.com/anacrolix/missinggo"
	"github.com/anacrolix/missinggo/bitmap"
	"github.com/anacrolix/missinggo/iter"
	"github.com/anacrolix/missinggo/prioritybitmap"
	"github.com/pkg/errors"

	"github.com/anacrolix/torrent/bencode"
	"github.com/anacrolix/torrent/mse"
	pp "github.com/anacrolix/torrent/peer_protocol"
)

type peerSource string

const (
	peerSourceTracker         = "Tr"
	peerSourceIncoming        = "I"
	peerSourceDHTGetPeers     = "Hg" // Peers we found by searching a DHT.
	peerSourceDHTAnnouncePeer = "Ha" // Peers that were announced to us by a DHT.
	peerSourcePEX             = "X"
)

// Maintains the state of a connection with a peer.
type connection struct {
	// First to ensure 64-bit alignment for atomics. See #262.
	stats ConnStats

	t *Torrent
	// The actual Conn, used for closing, and setting socket options.
	conn     net.Conn
	outgoing bool
	// The Reader and Writer for this Conn, with hooks installed for stats,
	// limiting, deadlines etc.
	w io.Writer
	r io.Reader
	// True if the connection is operating over MSE obfuscation.
	headerEncrypted bool
	cryptoMethod    mse.CryptoMethod
	Discovery       peerSource
	closed          missinggo.Event
	// Set true after we've added our ConnStats generated during handshake to
	// other ConnStat instances as determined when the *Torrent became known.
	reconciledHandshakeStats bool

	lastMessageReceived     time.Time
	completedHandshake      time.Time
	lastUsefulChunkReceived time.Time
	lastChunkSent           time.Time

	// Stuff controlled by the local peer.
	Interested           bool
	lastBecameInterested time.Time
	priorInterest        time.Duration

	lastStartedExpectingToReceiveChunks time.Time
	cumulativeExpectedToReceiveChunks   time.Duration
	chunksReceivedWhileExpecting        int64

	Choked           bool
	requests         map[request]struct{}
	requestsLowWater int
	// Chunks that we might reasonably expect to receive from the peer. Due to
	// latency, buffering, and implementation differences, we may receive
	// chunks that are no longer in the set of requests we actually want.
	validReceiveChunks map[request]struct{}
	// Indexed by metadata piece, set to true if posted and pending a
	// response.
	metadataRequests []bool
	sentHaves        bitmap.Bitmap

	// Stuff controlled by the remote peer.
	PeerID             PeerID
	PeerInterested     bool
	PeerChoked         bool
	PeerRequests       map[request]struct{}
	PeerExtensionBytes pp.PeerExtensionBits
	// The pieces the peer has claimed to have.
	peerPieces bitmap.Bitmap
	// The peer has everything. This can occur due to a special message, when
	// we may not even know the number of pieces in the torrent yet.
	peerSentHaveAll bool
	// The highest possible number of pieces the torrent could have based on
	// communication with the peer. Generally only useful until we have the
	// torrent info.
	peerMinPieces pieceIndex
	// Pieces we've accepted chunks for from the peer.
	peerTouchedPieces map[pieceIndex]struct{}
	peerAllowedFast   bitmap.Bitmap

	PeerMaxRequests  int // Maximum pending requests the peer allows.
	PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber
	PeerClientName   string

	pieceInclination  []int
	pieceRequestOrder prioritybitmap.PriorityBitmap

	writeBuffer *bytes.Buffer
	uploadTimer *time.Timer
	writerCond  sync.Cond
}

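// Starts or stops the clock on time spent expecting chunks, based on the
// current interest and choke state. Called whenever that state changes.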
func (cn *connection) updateExpectingChunks() {
	if cn.expectingChunks() {
		if cn.lastStartedExpectingToReceiveChunks.IsZero() {
			cn.lastStartedExpectingToReceiveChunks = time.Now()
		}
	} else {
		if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
			cn.cumulativeExpectedToReceiveChunks += time.Since(cn.lastStartedExpectingToReceiveChunks)
			cn.lastStartedExpectingToReceiveChunks = time.Time{}
		}
	}
}

func (cn *connection) expectingChunks() bool {
	return cn.Interested && !cn.PeerChoked
}

// Returns true if the connection is over IPv6.
func (cn *connection) ipv6() bool {
	ip := missinggo.AddrIP(cn.remoteAddr())
	if ip.To4() != nil {
		return false
	}
	return len(ip) == net.IPv6len
}

// Returns true if the dialer has the lower client peer ID. TODO: Find the
// specification for this.
func (cn *connection) isPreferredDirection() bool {
	return bytes.Compare(cn.t.cl.peerID[:], cn.PeerID[:]) < 0 == cn.outgoing
}

// Returns whether the left connection should be preferred over the right one,
// considering only their networking properties. If ok is false, we can't
// decide.
func (l *connection) hasPreferredNetworkOver(r *connection) (left, ok bool) {
	var ml multiLess
	ml.NextBool(l.isPreferredDirection(), r.isPreferredDirection())
	ml.NextBool(!l.utp(), !r.utp())
	ml.NextBool(l.ipv6(), r.ipv6())
	return ml.FinalOk()
}

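// The cumulative duration we've been interested in the peer, including the
// current period of interest if there is one.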
func (cn *connection) cumInterest() time.Duration {
	ret := cn.priorInterest
	if cn.Interested {
		ret += time.Since(cn.lastBecameInterested)
	}
	return ret
}

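// Reports whether the peer has all pieces. known is false if we can't tell:
// no HAVE ALL was received and we don't yet have the torrent info.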
func (cn *connection) peerHasAllPieces() (all bool, known bool) {
	if cn.peerSentHaveAll {
		return true, true
	}
	if !cn.t.haveInfo() {
		return false, false
	}
	return bitmap.Flip(cn.peerPieces, 0, bitmap.BitIndex(cn.t.numPieces())).IsEmpty(), true
}

func (cn *connection) mu() sync.Locker {
	return &cn.t.cl.mu
}

func (cn *connection) remoteAddr() net.Addr {
	return cn.conn.RemoteAddr()
}

func (cn *connection) localAddr() net.Addr {
	return cn.conn.LocalAddr()
}

func (cn *connection) supportsExtension(ext pp.ExtensionName) bool {
	_, ok := cn.PeerExtensionIDs[ext]
	return ok
}

// The best guess at number of pieces in the torrent for this peer.
func (cn *connection) bestPeerNumPieces() pieceIndex {
	if cn.t.haveInfo() {
		return cn.t.numPieces()
	}
	return cn.peerMinPieces
}

func (cn *connection) completedString() string {
	have := pieceIndex(cn.peerPieces.Len())
	if cn.peerSentHaveAll {
		have = cn.bestPeerNumPieces()
	}
	return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
}

// Correct the PeerPieces slice length. Returns an error if the existing
// slice is invalid, such as from receiving a badly sized BITFIELD or invalid
// HAVE messages.
func (cn *connection) setNumPieces(num pieceIndex) error {
	cn.peerPieces.RemoveRange(bitmap.BitIndex(num), bitmap.ToEnd)
	cn.peerPiecesChanged()
	return nil
}

func eventAgeString(t time.Time) string {
	if t.IsZero() {
		return "never"
	}
	return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}

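// A short string of transport/encryption flags for status output: 'E' for
// RC4 payload encryption, 'e' for an encrypted header only, then the peer
// source, and 'U' if the connection is over uTP.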
func (cn *connection) connectionFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.cryptoMethod == mse.CryptoMethodRC4 {
		c('E')
	} else if cn.headerEncrypted {
		c('e')
	}
	ret += string(cn.Discovery)
	if cn.utp() {
		c('U')
	}
	return
}

func (cn *connection) utp() bool {
	return isUtpNetwork(cn.remoteAddr().Network())
}

// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
func (cn *connection) statusFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.Interested {
		c('i')
	}
	if cn.Choked {
		c('c')
	}
	c('-')
	ret += cn.connectionFlags()
	c('-')
	if cn.PeerInterested {
		c('i')
	}
	if cn.PeerChoked {
		c('c')
	}
	return
}

// func (cn *connection) String() string {
//	var buf bytes.Buffer
//	cn.WriteStatus(&buf, nil)
//	return buf.String()
// }

func (cn *connection) downloadRate() float64 {
	return float64(cn.stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds()
}

func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
	// \t isn't preserved in <pre> blocks?
	fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr())
	fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
		eventAgeString(cn.lastMessageReceived),
		eventAgeString(cn.completedHandshake),
		eventAgeString(cn.lastHelpful()),
		cn.cumInterest(),
		cn.totalExpectingTime(),
	)
	fmt.Fprintf(w,
		"    %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
		cn.completedString(),
		len(cn.peerTouchedPieces),
		&cn.stats.ChunksReadUseful,
		&cn.stats.ChunksRead,
		&cn.stats.ChunksWritten,
		cn.requestsLowWater,
		cn.numLocalRequests(),
		cn.nominalMaxRequests(),
		len(cn.PeerRequests),
		cn.statusFlags(),
		cn.downloadRate()/(1<<10),
	)
	fmt.Fprintf(w, "    next pieces: %v%s\n",
		iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
		func() string {
			if cn.shouldRequestWithoutBias() {
				return " (fastest)"
			} else {
				return ""
			}
		}())
}

func (cn *connection) Close() {
	if !cn.closed.Set() {
		return
	}
	cn.tickleWriter()
	cn.discardPieceInclination()
	cn.pieceRequestOrder.Clear()
	if cn.conn != nil {
		go cn.conn.Close()
	}
}

func (cn *connection) PeerHasPiece(piece pieceIndex) bool {
	return cn.peerSentHaveAll || cn.peerPieces.Contains(bitmap.BitIndex(piece))
}

// Writes a message into the write buffer.
func (cn *connection) Post(msg pp.Message) {
	torrent.Add(fmt.Sprintf("messages posted of type %s", msg.Type.String()), 1)
	// We don't need to track bytes here because a connection.w Writer wrapper
	// takes care of that (although there's some delay between us recording
	// the message, and the connection writer flushing it out.).
	cn.writeBuffer.Write(msg.MustMarshalBinary())
	// Last I checked only Piece messages affect stats, and we don't post
	// those.
	cn.wroteMsg(&msg)
	cn.tickleWriter()
}

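// Posts an extended metadata request for the given metadata piece, if the
// peer advertises the metadata extension and the piece isn't already
// pending.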
func (cn *connection) requestMetadataPiece(index int) {
	eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata]
	if eID == 0 {
		return
	}
	if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
		return
	}
	cn.Post(pp.Message{
		Type:       pp.Extended,
		ExtendedID: eID,
		ExtendedPayload: func() []byte {
			b, err := bencode.Marshal(map[string]int{
				"msg_type": pp.RequestMetadataExtensionMsgType,
				"piece":    index,
			})
			if err != nil {
				panic(err)
			}
			return b
		}(),
	})
	for index >= len(cn.metadataRequests) {
		cn.metadataRequests = append(cn.metadataRequests, false)
	}
	cn.metadataRequests[index] = true
}

func (cn *connection) requestedMetadataPiece(index int) bool {
	return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}

// The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) {
	if cn.t.requestStrategy == 3 {
		expectingTime := int64(cn.totalExpectingTime())
		if expectingTime == 0 {
			expectingTime = math.MaxInt64
		} else {
			expectingTime *= 2
		}
		return int(clamp(
			1,
			int64(cn.PeerMaxRequests),
			max(
				// It makes sense to always pipeline at least one request,
				// since latency must be non-zero.
				2,
				// Request only as many as we expect to receive in the
				// duplicateRequestTimeout window. We are trying to avoid
				// having to duplicate requests.
				cn.chunksReceivedWhileExpecting*int64(cn.t.duplicateRequestTimeout)/expectingTime,
			),
		))
	}
	return int(clamp(
		1,
		int64(cn.PeerMaxRequests),
		max(64,
			cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64()))))
}

func (cn *connection) totalExpectingTime() (ret time.Duration) {
	ret = cn.cumulativeExpectedToReceiveChunks
	if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
		ret += time.Since(cn.lastStartedExpectingToReceiveChunks)
	}
	return
}

func (cn *connection) onPeerSentCancel(r request) {
	if _, ok := cn.PeerRequests[r]; !ok {
		torrent.Add("unexpected cancels received", 1)
		return
	}
	if cn.fastEnabled() {
		cn.reject(r)
	} else {
		delete(cn.PeerRequests, r)
	}
}

func (cn *connection) Choke(msg messageWriter) (more bool) {
	if cn.Choked {
		return true
	}
	cn.Choked = true
	more = msg(pp.Message{
		Type: pp.Choke,
	})
	if cn.fastEnabled() {
		for r := range cn.PeerRequests {
			// TODO: Don't reject pieces in allowed fast set.
			cn.reject(r)
		}
	} else {
		cn.PeerRequests = nil
	}
	return
}

func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
	if !cn.Choked {
		return true
	}
	cn.Choked = false
	return msg(pp.Message{
		Type: pp.Unchoke,
	})
}

func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
	if cn.Interested == interested {
		return true
	}
	cn.Interested = interested
	if interested {
		cn.lastBecameInterested = time.Now()
	} else if !cn.lastBecameInterested.IsZero() {
		cn.priorInterest += time.Since(cn.lastBecameInterested)
	}
	cn.updateExpectingChunks()
	// log.Printf("%p: setting interest: %v", cn, interested)
	return msg(pp.Message{
		Type: func() pp.MessageType {
			if interested {
				return pp.Interested
			} else {
				return pp.NotInterested
			}
		}(),
	})
}

// The function takes a message to be sent, and returns true if more messages
// are okay.
type messageWriter func(pp.Message) bool

// Proxies the messageWriter's response.
func (cn *connection) request(r request, mw messageWriter) bool {
	if _, ok := cn.requests[r]; ok {
		panic("chunk already requested")
	}
	if !cn.PeerHasPiece(pieceIndex(r.Index)) {
		panic("requesting piece peer doesn't have")
	}
	if _, ok := cn.t.conns[cn]; !ok {
		panic("requesting but not in active conns")
	}
	if cn.closed.IsSet() {
		panic("requesting when connection is closed")
	}
	if cn.PeerChoked {
		if cn.peerAllowedFast.Get(int(r.Index)) {
			torrent.Add("allowed fast requests sent", 1)
		} else {
			panic("requesting while choked and not allowed fast")
		}
	}
	if cn.t.hashingPiece(pieceIndex(r.Index)) {
		panic("piece is being hashed")
	}
	if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) {
		panic("piece is queued for hash")
	}
	if cn.requests == nil {
		cn.requests = make(map[request]struct{})
	}
	cn.requests[r] = struct{}{}
	if cn.validReceiveChunks == nil {
		cn.validReceiveChunks = make(map[request]struct{})
	}
	cn.validReceiveChunks[r] = struct{}{}
	cn.t.pendingRequests[r]++
	cn.t.lastRequested[r] = time.AfterFunc(cn.t.duplicateRequestTimeout, func() {
		torrent.Add("duplicate request timeouts", 1)
		cn.mu().Lock()
		defer cn.mu().Unlock()
		delete(cn.t.lastRequested, r)
		for cn := range cn.t.conns {
			if cn.PeerHasPiece(pieceIndex(r.Index)) {
				cn.updateRequests()
			}
		}
	})
	cn.updateExpectingChunks()
	return mw(pp.Message{
		Type:   pp.Request,
		Index:  r.Index,
		Begin:  r.Begin,
		Length: r.Length,
	})
}

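// Fills the write buffer with protocol messages: cancels outstanding
// requests if networking is disabled, tops requests back up once they fall
// to the low-water mark, and finally services peer requests via upload.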
func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
	if !cn.t.networkingEnabled {
		if !cn.SetInterested(false, msg) {
			return
		}
		if len(cn.requests) != 0 {
			for r := range cn.requests {
				cn.deleteRequest(r)
				// log.Printf("%p: cancelling request: %v", cn, r)
				if !msg(makeCancelMessage(r)) {
					return
				}
			}
		}
	}
	if len(cn.requests) <= cn.requestsLowWater {
		filledBuffer := false
		cn.iterPendingPieces(func(pieceIndex pieceIndex) bool {
			cn.iterPendingRequests(pieceIndex, func(r request) bool {
				if !cn.SetInterested(true, msg) {
					filledBuffer = true
					return false
				}
				if len(cn.requests) >= cn.nominalMaxRequests() {
					return false
				}
				// Choking is looked at here because our interest is dependent
				// on whether we'd make requests in its absence.
				if cn.PeerChoked {
					if !cn.peerAllowedFast.Get(bitmap.BitIndex(r.Index)) {
						return false
					}
				}
				if _, ok := cn.requests[r]; ok {
					return true
				}
				filledBuffer = !cn.request(r, msg)
				return !filledBuffer
			})
			return !filledBuffer
		})
		if filledBuffer {
			// If we didn't completely top up the requests, we shouldn't mark
			// the low water, since we'll want to top up the requests as soon
			// as we have more write buffer space.
			return
		}
		cn.requestsLowWater = len(cn.requests) / 2
	}

	cn.upload(msg)
}

// Routine that writes to the peer. Some of what to write is buffered by
// activity elsewhere in the Client, and some is determined locally when the
// connection is writable.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
	var (
		lastWrite      time.Time = time.Now()
		keepAliveTimer *time.Timer
	)
	keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
		cn.mu().Lock()
		defer cn.mu().Unlock()
		if time.Since(lastWrite) >= keepAliveTimeout {
			cn.tickleWriter()
		}
		keepAliveTimer.Reset(keepAliveTimeout)
	})
	cn.mu().Lock()
	defer cn.mu().Unlock()
	defer cn.Close()
	defer keepAliveTimer.Stop()
	frontBuf := new(bytes.Buffer)
	for {
		if cn.closed.IsSet() {
			return
		}
		if cn.writeBuffer.Len() == 0 {
			cn.fillWriteBuffer(func(msg pp.Message) bool {
				cn.wroteMsg(&msg)
				cn.writeBuffer.Write(msg.MustMarshalBinary())
				torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
				return cn.writeBuffer.Len() < 1<<16 // 64KiB
			})
		}
		if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
			cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
			postedKeepalives.Add(1)
		}
		if cn.writeBuffer.Len() == 0 {
			// TODO: Minimize wakeups.
			cn.writerCond.Wait()
			continue
		}
		// Flip the buffers.
		frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
		cn.mu().Unlock()
		n, err := cn.w.Write(frontBuf.Bytes())
		cn.mu().Lock()
		if n != 0 {
			lastWrite = time.Now()
			keepAliveTimer.Reset(keepAliveTimeout)
		}
		if err != nil {
			return
		}
		if n != frontBuf.Len() {
			panic("short write")
		}
		frontBuf.Reset()
	}
}

func (cn *connection) Have(piece pieceIndex) {
	if cn.sentHaves.Get(bitmap.BitIndex(piece)) {
		return
	}
	cn.Post(pp.Message{
		Type:  pp.Have,
		Index: pp.Integer(piece),
	})
	cn.sentHaves.Add(bitmap.BitIndex(piece))
}

func (cn *connection) PostBitfield() {
	if cn.sentHaves.Len() != 0 {
		panic("bitfield must be first have-related message sent")
	}
	if !cn.t.haveAnyPieces() {
		return
	}
	cn.Post(pp.Message{
		Type:     pp.Bitfield,
		Bitfield: cn.t.bitfield(),
	})
	cn.sentHaves = cn.t.completedPieces.Copy()
}

func (cn *connection) updateRequests() {
	// log.Print("update requests")
	cn.tickleWriter()
}

// Emits the indices in the Bitmaps bms in order, never repeating any index.
// skip is mutated during execution, and its initial values will never be
// emitted.
func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
	return func(cb iter.Callback) {
		for _, bm := range bms {
			if !iter.All(func(i interface{}) bool {
				skip.Add(i.(int))
				return cb(i)
			}, bitmap.Sub(bm, *skip).Iter) {
				return
			}
		}
	}
}

func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece pieceIndex) bool) bool {
	now, readahead := cn.t.readerPiecePriorities()
	var skip bitmap.Bitmap
	if !cn.peerSentHaveAll {
		// Pieces to skip include pieces the peer doesn't have.
		skip = bitmap.Flip(cn.peerPieces, 0, bitmap.BitIndex(cn.t.numPieces()))
	}
	// And pieces that we already have.
	skip.Union(cn.t.completedPieces)
	skip.Union(cn.t.piecesQueuedForHash)
	// Return an iterator over the different priority classes, minus the skip
	// pieces.
	return iter.All(
		func(_piece interface{}) bool {
			i := _piece.(bitmap.BitIndex)
			if cn.t.hashingPiece(pieceIndex(i)) {
				return true
			}
			return f(pieceIndex(i))
		},
		iterBitmapsDistinct(&skip, now, readahead),
		func(cb iter.Callback) {
			cn.t.pendingPieces.IterTyped(func(piece int) bool {
				if skip.Contains(piece) {
					return true
				}
				more := cb(piece)
				skip.Add(piece)
				return more
			})
		},
	)
}

// The connection should download highest priority pieces first, without any
// inclination toward avoiding wastage. Generally we might do this if there's
// a single connection, or this is the fastest connection, and we have active
// readers that signal an ordering preference. It's conceivable that the best
// connection should do this, since it's least likely to waste our time if
// assigned to the highest priority pieces, and assigning more than one this
// role would cause significant wasted bandwidth.
func (cn *connection) shouldRequestWithoutBias() bool {
	if cn.t.requestStrategy != 2 {
		return false
	}
	if len(cn.t.readers) == 0 {
		return false
	}
	if len(cn.t.conns) == 1 {
		return true
	}
	if cn == cn.t.fastestConn {
		return true
	}
	return false
}

func (cn *connection) iterPendingPieces(f func(pieceIndex) bool) bool {
	if !cn.t.haveInfo() {
		return false
	}
	if cn.t.requestStrategy == 3 {
		return cn.iterUnbiasedPieceRequestOrder(f)
	}
	if cn.shouldRequestWithoutBias() {
		return cn.iterUnbiasedPieceRequestOrder(f)
	} else {
		return cn.pieceRequestOrder.IterTyped(func(i int) bool {
			return f(pieceIndex(i))
		})
	}
}

func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) {
	cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) })
}

func (cn *connection) iterPendingRequests(piece pieceIndex, f func(request) bool) bool {
	return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
		r := request{pp.Integer(piece), cs}
		if cn.t.requestStrategy == 3 {
			if _, ok := cn.t.lastRequested[r]; ok {
				// This piece has been requested on another connection, and
				// the duplicate request timer is still running.
				return true
			}
		}
		return f(r)
	})
}

func iterUndirtiedChunks(piece pieceIndex, t *Torrent, f func(chunkSpec) bool) bool {
	p := &t.pieces[piece]
	if t.requestStrategy == 3 {
		for i := pp.Integer(0); i < p.numChunks(); i++ {
			if !p.dirtyChunks.Get(bitmap.BitIndex(i)) {
				if !f(t.chunkIndexSpec(i, piece)) {
					return false
				}
			}
		}
		return true
	}
	chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
	// TODO: Use "math/rand".Shuffle >= Go 1.10
	return iter.ForPerm(len(chunkIndices), func(i int) bool {
		return f(t.chunkIndexSpec(pp.Integer(chunkIndices[i]), piece))
	})
}
// TODO: Check that callers also call updateRequests.
func (cn *connection) stopRequestingPiece(piece pieceIndex) bool {
	return cn.pieceRequestOrder.Remove(bitmap.BitIndex(piece))
}

// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection and is
// used to pseudorandomly avoid connections always requesting the same pieces
// and thus wasting effort.
func (cn *connection) updatePiecePriority(piece pieceIndex) bool {
	tpp := cn.t.piecePriority(piece)
	if !cn.PeerHasPiece(piece) {
		tpp = PiecePriorityNone
	}
	if tpp == PiecePriorityNone {
		return cn.stopRequestingPiece(piece)
	}
	prio := cn.getPieceInclination()[piece]
	switch cn.t.requestStrategy {
	case 1:
		switch tpp {
		case PiecePriorityNormal:
		case PiecePriorityReadahead:
			prio -= int(cn.t.numPieces())
		case PiecePriorityNext, PiecePriorityNow:
			prio -= 2 * int(cn.t.numPieces())
		default:
			panic(tpp)
		}
		prio += int(piece / 3)
	default:
	}
	return cn.pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias()
}

func (cn *connection) getPieceInclination() []int {
	if cn.pieceInclination == nil {
		cn.pieceInclination = cn.t.getConnPieceInclination()
	}
	return cn.pieceInclination
}

func (cn *connection) discardPieceInclination() {
	if cn.pieceInclination == nil {
		return
	}
	cn.t.putPieceInclination(cn.pieceInclination)
	cn.pieceInclination = nil
}

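// Recomputes connection piece priorities for every piece, and wakes the
// writer if any changed. Called when the peer's claimed piece set changes
// wholesale (BITFIELD, HAVE ALL, HAVE NONE).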
func (cn *connection) peerPiecesChanged() {
	if cn.t.haveInfo() {
		prioritiesChanged := false
		for i := pieceIndex(0); i < cn.t.numPieces(); i++ {
			if cn.updatePiecePriority(i) {
				prioritiesChanged = true
			}
		}
		if prioritiesChanged {
			cn.updateRequests()
		}
	}
}

func (cn *connection) raisePeerMinPieces(newMin pieceIndex) {
	if newMin > cn.peerMinPieces {
		cn.peerMinPieces = newMin
	}
}

func (cn *connection) peerSentHave(piece pieceIndex) error {
	if cn.t.haveInfo() && piece >= cn.t.numPieces() || piece < 0 {
		return errors.New("invalid piece")
	}
	if cn.PeerHasPiece(piece) {
		return nil
	}
	cn.raisePeerMinPieces(piece + 1)
	cn.peerPieces.Set(bitmap.BitIndex(piece), true)
	if cn.updatePiecePriority(piece) {
		cn.updateRequests()
	}
	return nil
}

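// Applies a received BITFIELD. bf holds one bool per bit of the wire
// message, so its length is a multiple of 8; trailing bits beyond the known
// piece count are ignored.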
func (cn *connection) peerSentBitfield(bf []bool) error {
	cn.peerSentHaveAll = false
	if len(bf)%8 != 0 {
		panic("expected bitfield length divisible by 8")
	}
	// We know that the last byte means that at most the last 7 bits are
	// wasted.
	cn.raisePeerMinPieces(pieceIndex(len(bf) - 7))
	if cn.t.haveInfo() && len(bf) > int(cn.t.numPieces()) {
		// Ignore known excess pieces.
		bf = bf[:cn.t.numPieces()]
	}
	for i, have := range bf {
		if have {
			cn.raisePeerMinPieces(pieceIndex(i) + 1)
		}
		cn.peerPieces.Set(i, have)
	}
	cn.peerPiecesChanged()
	return nil
}

func (cn *connection) onPeerSentHaveAll() error {
	cn.peerSentHaveAll = true
	cn.peerPieces.Clear()
	cn.peerPiecesChanged()
	return nil
}

func (cn *connection) peerSentHaveNone() error {
	cn.peerPieces.Clear()
	cn.peerSentHaveAll = false
	cn.peerPiecesChanged()
	return nil
}

func (c *connection) requestPendingMetadata() {
	if c.t.haveInfo() {
		return
	}
	if c.PeerExtensionIDs[pp.ExtensionNameMetadata] == 0 {
		// Peer doesn't support this.
		return
	}
	// Request metadata pieces that we don't have in a random order.
	var pending []int
	for index := 0; index < c.t.metadataPieceCount(); index++ {
		if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
			pending = append(pending, index)
		}
	}
	for _, i := range rand.Perm(len(pending)) {
		c.requestMetadataPiece(pending[i])
	}
}

func (cn *connection) wroteMsg(msg *pp.Message) {
	torrent.Add(fmt.Sprintf("messages written of type %s", msg.Type.String()), 1)
	cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
}

func (cn *connection) readMsg(msg *pp.Message) {
	cn.allStats(func(cs *ConnStats) { cs.readMsg(msg) })
}

// After handshake, we know what Torrent and Client stats to include for a
// connection.
func (cn *connection) postHandshakeStats(f func(*ConnStats)) {
	t := cn.t
	f(&t.stats)
	f(&t.cl.stats)
}

// All ConnStats that include this connection. Some objects are not known
// until the handshake is complete, after which it's expected to reconcile the
// differences.
func (cn *connection) allStats(f func(*ConnStats)) {
	f(&cn.stats)
	if cn.reconciledHandshakeStats {
		cn.postHandshakeStats(f)
	}
}

func (cn *connection) wroteBytes(n int64) {
	cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten }))
}

func (cn *connection) readBytes(n int64) {
	cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
}

// Returns whether the connection could be useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
func (c *connection) useful() bool {
	t := c.t
	if c.closed.IsSet() {
		return false
	}
	if !t.haveInfo() {
		return c.supportsExtension("ut_metadata")
	}
	if t.seeding() && c.PeerInterested {
		return true
	}
	if c.peerHasWantedPieces() {
		return true
	}
	return false
}

func (c *connection) lastHelpful() (ret time.Time) {
	ret = c.lastUsefulChunkReceived
	if c.t.seeding() && c.lastChunkSent.After(ret) {
		ret = c.lastChunkSent
	}
	return
}

func (c *connection) fastEnabled() bool {
	return c.PeerExtensionBytes.SupportsFast() && c.t.cl.extensionBytes.SupportsFast()
}

func (c *connection) reject(r request) {
	if !c.fastEnabled() {
		panic("fast not enabled")
	}
	c.Post(r.ToMsg(pp.Reject))
	delete(c.PeerRequests, r)
}

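// Handles an incoming REQUEST: validates its bounds, drops duplicates,
// rejects (where the fast extension allows) requests received while choking
// or with a full queue, and otherwise queues the request for the writer.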
func (c *connection) onReadRequest(r request) error {
	requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
	if r.Begin+r.Length > c.t.pieceLength(pieceIndex(r.Index)) {
		torrent.Add("bad requests received", 1)
		return errors.New("bad request")
	}
	if _, ok := c.PeerRequests[r]; ok {
		torrent.Add("duplicate requests received", 1)
		return nil
	}
	if c.Choked {
		torrent.Add("requests received while choking", 1)
		if c.fastEnabled() {
			torrent.Add("requests rejected while choking", 1)
			c.reject(r)
		}
		return nil
	}
	if len(c.PeerRequests) >= maxRequests {
		torrent.Add("requests received while queue full", 1)
		if c.fastEnabled() {
			c.reject(r)
		}
		// BEP 6 says we may close here if we choose.
		return nil
	}
	if !c.t.havePiece(pieceIndex(r.Index)) {
		// This isn't necessarily them screwing up. We can drop pieces
		// from our storage, and can't communicate this to peers
		// except by reconnecting.
		requestsReceivedForMissingPieces.Add(1)
		return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
	}
	if c.PeerRequests == nil {
		c.PeerRequests = make(map[request]struct{}, maxRequests)
	}
	c.PeerRequests[r] = struct{}{}
	c.tickleWriter()
	return nil
}

// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() (err error) {
	defer func() {
		if err != nil {
			torrent.Add("connection.mainReadLoop returned with error", 1)
		} else {
			torrent.Add("connection.mainReadLoop returned with no error", 1)
		}
	}()
	t := c.t
	cl := t.cl

	decoder := pp.Decoder{
		R:         bufio.NewReaderSize(c.r, 1<<17),
		MaxLength: 256 * 1024,
		Pool:      t.chunkPool,
	}
	for {
		var msg pp.Message
		func() {
			cl.mu.Unlock()
			defer cl.mu.Lock()
			err = decoder.Decode(&msg)
		}()
		if t.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		c.readMsg(&msg)
		c.lastMessageReceived = time.Now()
		if msg.Keepalive {
			receivedKeepalives.Add(1)
			continue
		}
		messageTypesReceived.Add(msg.Type.String(), 1)
		if msg.Type.FastExtension() && !c.fastEnabled() {
			return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
		}
		switch msg.Type {
		case pp.Choke:
			c.PeerChoked = true
			c.deleteAllRequests()
			// We can then reset our interest.
			c.updateRequests()
			c.updateExpectingChunks()
		case pp.Reject:
			c.deleteRequest(newRequestFromMessage(&msg))
			delete(c.validReceiveChunks, newRequestFromMessage(&msg))
		case pp.Unchoke:
			c.PeerChoked = false
			c.tickleWriter()
			c.updateExpectingChunks()
		case pp.Interested:
			c.PeerInterested = true
			c.tickleWriter()
		case pp.NotInterested:
			c.PeerInterested = false
			// We don't clear their requests since it isn't clear in the spec.
			// We'll probably choke them for this, which will clear them if
			// appropriate, and is clearly specified.
		case pp.Have:
			err = c.peerSentHave(pieceIndex(msg.Index))
		case pp.Request:
			r := newRequestFromMessage(&msg)
			err = c.onReadRequest(r)
		case pp.Cancel:
			req := newRequestFromMessage(&msg)
			c.onPeerSentCancel(req)
		case pp.Bitfield:
			err = c.peerSentBitfield(msg.Bitfield)
		case pp.HaveAll:
			err = c.onPeerSentHaveAll()
		case pp.HaveNone:
			err = c.peerSentHaveNone()
		case pp.Piece:
			err = c.receiveChunk(&msg)
			if len(msg.Piece) == int(t.chunkSize) {
				t.chunkPool.Put(&msg.Piece)
			}
			if err != nil {
				err = fmt.Errorf("receiving chunk: %s", err)
			}
		case pp.Extended:
			err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
		case pp.Port:
			pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
			if err != nil {
				panic(err)
			}
			if msg.Port != 0 {
				pingAddr.Port = int(msg.Port)
			}
			cl.eachDhtServer(func(s *dht.Server) {
				go s.Ping(pingAddr, nil)
			})
		case pp.AllowedFast:
			torrent.Add("allowed fasts received", 1)
			log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c, debugLogValue).Log(c.t.logger)
			c.peerAllowedFast.Add(int(msg.Index))
			c.updateRequests()
		case pp.Suggest:
			torrent.Add("suggests received", 1)
			log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index, debugLogValue).Log(c.t.logger)
			c.updateRequests()
		default:
			err = fmt.Errorf("received unknown message type: %#v", msg.Type)
		}
		if err != nil {
			return err
		}
	}
}

func (c *connection) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err error) {
	defer func() {
		// TODO: Should we still do this?
		if err != nil {
			// These clients use their own extension IDs for outgoing message
			// types, which is incorrect.
			if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) || strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
				err = nil
			}
		}
	}()
	t := c.t
	cl := t.cl
	switch id {
	case pp.HandshakeExtendedID:
		var d pp.ExtendedHandshakeMessage
		if err := bencode.Unmarshal(payload, &d); err != nil {
			log.Print(err)
			return errors.Wrap(err, "unmarshalling extended handshake payload")
		}
		if d.Reqq != 0 {
			c.PeerMaxRequests = d.Reqq
		}
		c.PeerClientName = d.V
		if c.PeerExtensionIDs == nil {
			c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber, len(d.M))
		}
		for name, id := range d.M {
			if _, ok := c.PeerExtensionIDs[name]; !ok {
				torrent.Add(fmt.Sprintf("peers supporting extension %q", name), 1)
			}
			c.PeerExtensionIDs[name] = id
		}
		if d.MetadataSize != 0 {
			if err = t.setMetadataSize(d.MetadataSize); err != nil {
				return errors.Wrapf(err, "setting metadata size to %d", d.MetadataSize)
			}
		}
		if _, ok := c.PeerExtensionIDs[pp.ExtensionNameMetadata]; ok {
			c.requestPendingMetadata()
		}
		return nil
	case metadataExtendedId:
		err := cl.gotMetadataExtensionMsg(payload, t, c)
		if err != nil {
			return fmt.Errorf("error handling metadata extension message: %s", err)
		}
		return nil
	case pexExtendedId:
		if cl.config.DisablePEX {
			// TODO: Maybe close the connection. Check that we're not
			// advertising that we support PEX if it's disabled.
			return nil
		}
		var pexMsg pp.PexMsg
		err := bencode.Unmarshal(payload, &pexMsg)
		if err != nil {
			return fmt.Errorf("error unmarshalling PEX message: %s", err)
		}
		torrent.Add("pex added6 peers received", int64(len(pexMsg.Added6)))
		var peers Peers
		peers.AppendFromPex(pexMsg.Added6, pexMsg.Added6Flags)
		peers.AppendFromPex(pexMsg.Added, pexMsg.AddedFlags)
		t.addPeers(peers)
		return nil
	default:
		return fmt.Errorf("unexpected extended message ID: %v", id)
	}
}

// Set both the Reader and Writer for the connection from a single ReadWriter.
func (cn *connection) setRW(rw io.ReadWriter) {
	cn.r = rw
	cn.w = rw
}

// Returns the Reader and Writer as a combined ReadWriter.
func (cn *connection) rw() io.ReadWriter {
	return struct {
		io.Reader
		io.Writer
	}{cn.r, cn.w}
}

// Handle a received chunk from a peer.
func (c *connection) receiveChunk(msg *pp.Message) error {
	t := c.t
	cl := t.cl
	torrent.Add("chunks received", 1)

	req := newRequestFromMessage(msg)

	if c.PeerChoked {
		torrent.Add("chunks received while choked", 1)
	}

	if _, ok := c.validReceiveChunks[req]; !ok {
		torrent.Add("chunks received unexpected", 1)
		return errors.New("received unexpected chunk")
	}
	delete(c.validReceiveChunks, req)

	if c.PeerChoked && c.peerAllowedFast.Get(int(req.Index)) {
		torrent.Add("chunks received due to allowed fast", 1)
	}

	// Request has been satisfied.
	if c.deleteRequest(req) {
		if c.expectingChunks() {
			c.chunksReceivedWhileExpecting++
		}
	} else {
		torrent.Add("chunks received unwanted", 1)
	}

	// Do we actually want this chunk?
	if t.haveChunk(req) {
		torrent.Add("chunks received wasted", 1)
		c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
		return nil
	}

	piece := &t.pieces[req.Index]

	c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
	c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
	c.lastUsefulChunkReceived = time.Now()
	// if t.fastestConn != c {
	// log.Printf("setting fastest connection %p", c)
	// }
	t.fastestConn = c

	// Need to record that it hasn't been written yet, before we attempt to do
	// anything with it.
	piece.incrementPendingWrites()
	// Record that we have the chunk, so we aren't trying to download it while
	// waiting for it to be written to storage.
	piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))

	// Cancel pending requests for this chunk.
	for c := range t.conns {
		c.postCancel(req)
	}

	err := func() error {
		cl.mu.Unlock()
		defer cl.mu.Lock()
		// Write the chunk out. Note that the upper bound on chunk writing
		// concurrency will be the number of connections. We write inline with
		// receiving the chunk (with this lock dance), because we want to
		// handle errors synchronously and I haven't thought of a nice way to
		// defer any concurrency to the storage and have that notify the
		// client of errors. TODO: Do that instead.
		return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
	}()

	piece.decrementPendingWrites()

	if err != nil {
		log.Printf("%s (%s): error writing chunk %v: %s", t, t.infoHash, req, err)
		t.pendRequest(req)
		t.updatePieceCompletion(pieceIndex(msg.Index))
		return nil
	}

	// It's important that the piece is potentially queued before we check if
	// the piece is still wanted, because if it is queued, it won't be wanted.
	if t.pieceAllDirty(pieceIndex(req.Index)) {
		t.queuePieceCheck(pieceIndex(req.Index))
		t.pendAllChunkSpecs(pieceIndex(req.Index))
	}

	c.onDirtiedPiece(pieceIndex(req.Index))

	cl.event.Broadcast()
	t.publishPieceChange(pieceIndex(req.Index))

	return nil
}

func (c *connection) onDirtiedPiece(piece pieceIndex) {
	if c.peerTouchedPieces == nil {
		c.peerTouchedPieces = make(map[pieceIndex]struct{})
	}
	c.peerTouchedPieces[piece] = struct{}{}
	ds := &c.t.pieces[piece].dirtiers
	if *ds == nil {
		*ds = make(map[*connection]struct{})
	}
	(*ds)[c] = struct{}{}
}

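// Reports whether we may upload to this peer: never with NoUpload set,
// always when seeding, and otherwise only while the peer has pieces we want
// and we haven't sent more than 100 KiB beyond what we've downloaded from
// them.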
func (c *connection) uploadAllowed() bool {
	if c.t.cl.config.NoUpload {
		return false
	}
	if c.t.seeding() {
		return true
	}
	if !c.peerHasWantedPieces() {
		return false
	}
	// Don't upload more than 100 KiB more than we download.
	if c.stats.BytesWrittenData.Int64() >= c.stats.BytesReadData.Int64()+100<<10 {
		return false
	}
	return true
}

func (c *connection) setRetryUploadTimer(delay time.Duration) {
	if c.uploadTimer == nil {
		c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
	} else {
		c.uploadTimer.Reset(delay)
	}
}

// Also handles choking and unchoking of the remote peer.
func (c *connection) upload(msg func(pp.Message) bool) bool {
	// Breaking or completing this loop means we don't want to upload to the
	// peer anymore, and we choke them.
another:
	for c.uploadAllowed() {
		// We want to upload to the peer.
		if !c.Unchoke(msg) {
			return false
		}
		for r := range c.PeerRequests {
			res := c.t.cl.config.UploadRateLimiter.ReserveN(time.Now(), int(r.Length))
			if !res.OK() {
				panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
			}
			delay := res.Delay()
			if delay > 0 {
				res.Cancel()
				c.setRetryUploadTimer(delay)
				// Hard to say what to return here.
				return true
			}
			more, err := c.sendChunk(r, msg)
			if err != nil {
				i := pieceIndex(r.Index)
				if c.t.pieceComplete(i) {
					c.t.updatePieceCompletion(i)
					if !c.t.pieceComplete(i) {
						// We had the piece, but not anymore.
						break another
					}
				}
				log.Str("error sending chunk to peer").AddValues(c, r, err).Log(c.t.logger)
				// If we failed to send a chunk, choke the peer to ensure they
				// flush all their requests. We've probably dropped a piece,
				// but there's no way to communicate this to the peer. If they
				// ask for it again, we'll kick them to allow us to send them
				// an updated bitfield.
				break another
			}
			delete(c.PeerRequests, r)
			if !more {
				return false
			}
			goto another
		}
		return true
	}
	return c.Choke(msg)
}

func (cn *connection) Drop() {
	cn.t.dropConnection(cn)
}

func (cn *connection) netGoodPiecesDirtied() int64 {
	return cn.stats.PiecesDirtiedGood.Int64() - cn.stats.PiecesDirtiedBad.Int64()
}

func (c *connection) peerHasWantedPieces() bool {
	return !c.pieceRequestOrder.IsEmpty()
}

func (c *connection) numLocalRequests() int {
	return len(c.requests)
}

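// Removes an outstanding request: stops its duplicate-request timer, drops
// it from the torrent-wide pending count, and prods other connections that
// aren't interested but have the piece, since the chunk may now be worth
// requesting from them. Returns false if the request wasn't outstanding.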
func (c *connection) deleteRequest(r request) bool {
	if _, ok := c.requests[r]; !ok {
		return false
	}
	delete(c.requests, r)
	c.updateExpectingChunks()
	if t, ok := c.t.lastRequested[r]; ok {
		t.Stop()
		delete(c.t.lastRequested, r)
	}
	pr := c.t.pendingRequests
	pr[r]--
	n := pr[r]
	if n == 0 {
		delete(pr, r)
	}
	if n < 0 {
		panic(n)
	}
	c.updateRequests()
	for _c := range c.t.conns {
		if !_c.Interested && _c != c && _c.PeerHasPiece(pieceIndex(r.Index)) {
			_c.updateRequests()
		}
	}
	return true
}

func (c *connection) deleteAllRequests() {
	for r := range c.requests {
		c.deleteRequest(r)
	}
	if len(c.requests) != 0 {
		panic(len(c.requests))
	}
	// for c := range c.t.conns {
	//	c.tickleWriter()
	// }
}

func (c *connection) tickleWriter() {
	c.writerCond.Broadcast()
}

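// Posts a CANCEL for an outstanding request. Returns false if the request
// wasn't pending on this connection.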
func (c *connection) postCancel(r request) bool {
	if !c.deleteRequest(r) {
		return false
	}
	c.Post(makeCancelMessage(r))
	return true
}

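// Reads the requested chunk from storage and sends it as a PIECE message.
// more is the messageWriter's verdict on whether further messages are okay.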
func (c *connection) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) {
	// Count the chunk being sent, even if it isn't.
	b := make([]byte, r.Length)
	p := c.t.info.Piece(int(r.Index))
	n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
	if n != len(b) {
		if err == nil {
			panic("expected error")
		}
		return
	} else if err == io.EOF {
		err = nil
	}
	more = msg(pp.Message{
		Type:  pp.Piece,
		Index: r.Index,
		Begin: r.Begin,
		Piece: b,
	})
	c.lastChunkSent = time.Now()
	return
}

func (c *connection) setTorrent(t *Torrent) {
	if c.t != nil {
		panic("connection already associated with a torrent")
	}
	c.t = t
	t.reconcileHandshakeStats(c)
}

func (c *connection) peerPriority() peerPriority {
	return bep40PriorityIgnoreError(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp()))
}

func (c *connection) remoteIp() net.IP {
	return missinggo.AddrIP(c.remoteAddr())
}

func (c *connection) remoteIpPort() ipPort {
	return ipPort{missinggo.AddrIP(c.remoteAddr()), uint16(missinggo.AddrPort(c.remoteAddr()))}
}