[btrtrc.git] / connection.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "fmt"
7         "io"
8         "math"
9         "math/rand"
10         "net"
11         "strconv"
12         "strings"
13         "sync"
14         "time"
15
16         "github.com/anacrolix/dht/v2"
17         "github.com/anacrolix/log"
18         "github.com/anacrolix/missinggo"
19         "github.com/anacrolix/missinggo/bitmap"
20         "github.com/anacrolix/missinggo/iter"
21         "github.com/anacrolix/missinggo/prioritybitmap"
22         "github.com/anacrolix/torrent/bencode"
23         "github.com/anacrolix/torrent/mse"
24         pp "github.com/anacrolix/torrent/peer_protocol"
25         "github.com/pkg/errors"
26 )
27
28 type peerSource string
29
30 const (
31         peerSourceTracker         = "Tr"
32         peerSourceIncoming        = "I"
33         peerSourceDHTGetPeers     = "Hg" // Peers we found by searching a DHT.
34         peerSourceDHTAnnouncePeer = "Ha" // Peers that were announced to us by a DHT.
35         peerSourcePEX             = "X"
36 )
37
38 // Maintains the state of a connection with a peer.
39 type connection struct {
40         // First to ensure 64-bit alignment for atomics. See #262.
41         stats ConnStats
42
43         t *Torrent
44         // The actual Conn, used for closing, and setting socket options.
45         conn       net.Conn
46         outgoing   bool
47         network    string
48         remoteAddr IpPort
49         // The Reader and Writer for this Conn, with hooks installed for stats,
50         // limiting, deadlines etc.
51         w io.Writer
52         r io.Reader
53         // True if the connection is operating over MSE obfuscation.
54         headerEncrypted bool
55         cryptoMethod    mse.CryptoMethod
56         Discovery       peerSource
57         closed          missinggo.Event
58         // Set true after we've added our ConnStats generated during handshake to
59         // other ConnStat instances as determined when the *Torrent became known.
60         reconciledHandshakeStats bool
61
62         lastMessageReceived     time.Time
63         completedHandshake      time.Time
64         lastUsefulChunkReceived time.Time
65         lastChunkSent           time.Time
66
67         // Stuff controlled by the local peer.
68         Interested           bool
69         lastBecameInterested time.Time
70         priorInterest        time.Duration
71
72         lastStartedExpectingToReceiveChunks time.Time
73         cumulativeExpectedToReceiveChunks   time.Duration
74         chunksReceivedWhileExpecting        int64
75
76         Choked           bool
77         requests         map[request]struct{}
78         requestsLowWater int
79         // Chunks that we might reasonably expect to receive from the peer. Due to
80         // latency, buffering, and implementation differences, we may receive
81         // chunks that are no longer in the set of requests we actually want.
82         validReceiveChunks map[request]struct{}
83         // Indexed by metadata piece, set to true if posted and pending a
84         // response.
85         metadataRequests []bool
86         sentHaves        bitmap.Bitmap
87
88         // Stuff controlled by the remote peer.
89         PeerID             PeerID
90         PeerInterested     bool
91         PeerChoked         bool
92         PeerRequests       map[request]struct{}
93         PeerExtensionBytes pp.PeerExtensionBits
94         // The pieces the peer has claimed to have.
95         peerPieces bitmap.Bitmap
96         // The peer has everything. This can occur due to a special message, when
97         // we may not even know the number of pieces in the torrent yet.
98         peerSentHaveAll bool
99         // The highest possible number of pieces the torrent could have based on
100         // communication with the peer. Generally only useful until we have the
101         // torrent info.
102         peerMinPieces pieceIndex
103         // Pieces we've accepted chunks for from the peer.
104         peerTouchedPieces map[pieceIndex]struct{}
105         peerAllowedFast   bitmap.Bitmap
106
107         PeerMaxRequests  int // Maximum pending requests the peer allows.
108         PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber
109         PeerClientName   string
110
111         pieceInclination  []int
112         pieceRequestOrder prioritybitmap.PriorityBitmap
113
114         writeBuffer *bytes.Buffer
115         uploadTimer *time.Timer
116         writerCond  sync.Cond
117 }
118
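// Tracks how long we've been expecting chunks (interested and not peer-choked),
// accumulating completed intervals into cumulativeExpectedToReceiveChunks.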
119 func (cn *connection) updateExpectingChunks() {
120         if cn.expectingChunks() {
121                 if cn.lastStartedExpectingToReceiveChunks.IsZero() {
122                         cn.lastStartedExpectingToReceiveChunks = time.Now()
123                 }
124         } else {
125                 if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
126                         cn.cumulativeExpectedToReceiveChunks += time.Since(cn.lastStartedExpectingToReceiveChunks)
127                         cn.lastStartedExpectingToReceiveChunks = time.Time{}
128                 }
129         }
130 }
131
132 func (cn *connection) expectingChunks() bool {
133         return cn.Interested && !cn.PeerChoked
134 }
135
136 // Returns true if the connection is over IPv6.
137 func (cn *connection) ipv6() bool {
138         ip := cn.remoteAddr.IP
139         if ip.To4() != nil {
140                 return false
141         }
142         return len(ip) == net.IPv6len
143 }
144
145 // Returns true if the dialer has the lower client peer ID. TODO: Find the
146 // specification for this.
147 func (cn *connection) isPreferredDirection() bool {
148         return bytes.Compare(cn.t.cl.peerID[:], cn.PeerID[:]) < 0 == cn.outgoing
149 }
150
151 // Returns whether the left connection should be preferred over the right one,
152 // considering only their networking properties. If ok is false, we can't
153 // decide.
154 func (l *connection) hasPreferredNetworkOver(r *connection) (left, ok bool) {
155         var ml multiLess
156         ml.NextBool(l.isPreferredDirection(), r.isPreferredDirection())
157         ml.NextBool(!l.utp(), !r.utp())
158         ml.NextBool(l.ipv6(), r.ipv6())
159         return ml.FinalOk()
160 }
161
162 func (cn *connection) cumInterest() time.Duration {
163         ret := cn.priorInterest
164         if cn.Interested {
165                 ret += time.Since(cn.lastBecameInterested)
166         }
167         return ret
168 }
169
170 func (cn *connection) peerHasAllPieces() (all bool, known bool) {
171         if cn.peerSentHaveAll {
172                 return true, true
173         }
174         if !cn.t.haveInfo() {
175                 return false, false
176         }
177         return bitmap.Flip(cn.peerPieces, 0, bitmap.BitIndex(cn.t.numPieces())).IsEmpty(), true
178 }
179
180 func (cn *connection) mu() sync.Locker {
181         return cn.t.cl.locker()
182 }
183
184 func (cn *connection) localAddr() net.Addr {
185         return cn.conn.LocalAddr()
186 }
187
188 func (cn *connection) supportsExtension(ext pp.ExtensionName) bool {
189         _, ok := cn.PeerExtensionIDs[ext]
190         return ok
191 }
192
193 // The best guess at number of pieces in the torrent for this peer.
194 func (cn *connection) bestPeerNumPieces() pieceIndex {
195         if cn.t.haveInfo() {
196                 return cn.t.numPieces()
197         }
198         return cn.peerMinPieces
199 }
200
201 func (cn *connection) completedString() string {
202         have := pieceIndex(cn.peerPieces.Len())
203         if cn.peerSentHaveAll {
204                 have = cn.bestPeerNumPieces()
205         }
206         return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
207 }
208
209 // Correct the PeerPieces slice length. Return an error if the existing slice
210 // is invalid, such as from receiving a badly sized BITFIELD or invalid HAVE
211 // messages.
212 func (cn *connection) setNumPieces(num pieceIndex) error {
213         cn.peerPieces.RemoveRange(bitmap.BitIndex(num), bitmap.ToEnd)
214         cn.peerPiecesChanged()
215         return nil
216 }
217
218 func eventAgeString(t time.Time) string {
219         if t.IsZero() {
220                 return "never"
221         }
222         return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
223 }
224
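// Single-character flags summarising transport and encryption for status
// output, e.g. an RC4-encrypted uTP connection found via a tracker yields "ETrU".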
225 func (cn *connection) connectionFlags() (ret string) {
226         c := func(b byte) {
227                 ret += string([]byte{b})
228         }
229         if cn.cryptoMethod == mse.CryptoMethodRC4 {
230                 c('E')
231         } else if cn.headerEncrypted {
232                 c('e')
233         }
234         ret += string(cn.Discovery)
235         if cn.utp() {
236                 c('U')
237         }
238         return
239 }
240
241 func (cn *connection) utp() bool {
242         return parseNetworkString(cn.network).Udp
243 }
244
245 // Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
246 func (cn *connection) statusFlags() (ret string) {
247         c := func(b byte) {
248                 ret += string([]byte{b})
249         }
250         if cn.Interested {
251                 c('i')
252         }
253         if cn.Choked {
254                 c('c')
255         }
256         c('-')
257         ret += cn.connectionFlags()
258         c('-')
259         if cn.PeerInterested {
260                 c('i')
261         }
262         if cn.PeerChoked {
263                 c('c')
264         }
265         return
266 }
267
268 // func (cn *connection) String() string {
269 //      var buf bytes.Buffer
270 //      cn.WriteStatus(&buf, nil)
271 //      return buf.String()
272 // }
273
274 func (cn *connection) downloadRate() float64 {
275         return float64(cn.stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds()
276 }
277
278 func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
279         // \t isn't preserved in <pre> blocks?
280         fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr)
281         fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
282                 eventAgeString(cn.lastMessageReceived),
283                 eventAgeString(cn.completedHandshake),
284                 eventAgeString(cn.lastHelpful()),
285                 cn.cumInterest(),
286                 cn.totalExpectingTime(),
287         )
288         fmt.Fprintf(w,
289                 "    %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
290                 cn.completedString(),
291                 len(cn.peerTouchedPieces),
292                 &cn.stats.ChunksReadUseful,
293                 &cn.stats.ChunksRead,
294                 &cn.stats.ChunksWritten,
295                 cn.requestsLowWater,
296                 cn.numLocalRequests(),
297                 cn.nominalMaxRequests(),
298                 len(cn.PeerRequests),
299                 cn.statusFlags(),
300                 cn.downloadRate()/(1<<10),
301         )
302         fmt.Fprintf(w, "    next pieces: %v%s\n",
303                 iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
304                 func() string {
305                         if cn.shouldRequestWithoutBias() {
306                                 return " (fastest)"
307                         } else {
308                                 return ""
309                         }
310                 }())
311 }
312
313 func (cn *connection) Close() {
314         if !cn.closed.Set() {
315                 return
316         }
317         cn.tickleWriter()
318         cn.discardPieceInclination()
319         cn.pieceRequestOrder.Clear()
320         if cn.conn != nil {
321                 go cn.conn.Close()
322         }
323 }
324
325 func (cn *connection) PeerHasPiece(piece pieceIndex) bool {
326         return cn.peerSentHaveAll || cn.peerPieces.Contains(bitmap.BitIndex(piece))
327 }
328
329 // Writes a message into the write buffer.
330 func (cn *connection) Post(msg pp.Message) {
331         torrent.Add(fmt.Sprintf("messages posted of type %s", msg.Type.String()), 1)
332         // We don't need to track bytes here because a connection.w Writer wrapper
333         // takes care of that (although there's some delay between us recording
334         // the message and the connection writer flushing it out).
335         cn.writeBuffer.Write(msg.MustMarshalBinary())
336         // Last I checked only Piece messages affect stats, and we don't post
337         // those.
338         cn.wroteMsg(&msg)
339         cn.tickleWriter()
340 }
341
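// Posts an extended-protocol request for the given metadata piece, if the peer
// advertises the metadata extension and we haven't already requested it.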
342 func (cn *connection) requestMetadataPiece(index int) {
343         eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata]
344         if eID == 0 {
345                 return
346         }
347         if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
348                 return
349         }
350         cn.Post(pp.Message{
351                 Type:       pp.Extended,
352                 ExtendedID: eID,
353                 ExtendedPayload: func() []byte {
354                         b, err := bencode.Marshal(map[string]int{
355                                 "msg_type": pp.RequestMetadataExtensionMsgType,
356                                 "piece":    index,
357                         })
358                         if err != nil {
359                                 panic(err)
360                         }
361                         return b
362                 }(),
363         })
364         for index >= len(cn.metadataRequests) {
365                 cn.metadataRequests = append(cn.metadataRequests, false)
366         }
367         cn.metadataRequests[index] = true
368 }
369
370 func (cn *connection) requestedMetadataPiece(index int) bool {
371         return index < len(cn.metadataRequests) && cn.metadataRequests[index]
372 }
373
374 // The actual value to use as the maximum outbound requests.
375 func (cn *connection) nominalMaxRequests() (ret int) {
376         if cn.t.requestStrategy == 3 {
377                 expectingTime := int64(cn.totalExpectingTime())
378                 if expectingTime == 0 {
379                         expectingTime = math.MaxInt64
380                 } else {
381                         expectingTime *= 2
382                 }
383                 return int(clamp(
384                         1,
385                         int64(cn.PeerMaxRequests),
386                         max(
387                                 // It makes sense to always pipeline at least a couple of
388                                 // requests, since latency must be non-zero.
389                                 2,
390                                 // Request only as many as we expect to receive in the
391                                 // duplicateRequestTimeout window. We are trying to avoid having to
392                                 // duplicate requests.
393                                 cn.chunksReceivedWhileExpecting*int64(cn.t.duplicateRequestTimeout)/expectingTime,
394                         ),
395                 ))
396         }
397         return int(clamp(
398                 1,
399                 int64(cn.PeerMaxRequests),
400                 max(64,
401                         cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64()))))
402 }
403
404 func (cn *connection) totalExpectingTime() (ret time.Duration) {
405         ret = cn.cumulativeExpectedToReceiveChunks
406         if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
407                 ret += time.Since(cn.lastStartedExpectingToReceiveChunks)
408         }
409         return
410
411 }
412
413 func (cn *connection) onPeerSentCancel(r request) {
414         if _, ok := cn.PeerRequests[r]; !ok {
415                 torrent.Add("unexpected cancels received", 1)
416                 return
417         }
418         if cn.fastEnabled() {
419                 cn.reject(r)
420         } else {
421                 delete(cn.PeerRequests, r)
422         }
423 }
424
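// Chokes the peer, rejecting their outstanding requests if the fast extension
// is enabled and dropping them otherwise.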
425 func (cn *connection) Choke(msg messageWriter) (more bool) {
426         if cn.Choked {
427                 return true
428         }
429         cn.Choked = true
430         more = msg(pp.Message{
431                 Type: pp.Choke,
432         })
433         if cn.fastEnabled() {
434                 for r := range cn.PeerRequests {
435                         // TODO: Don't reject pieces in allowed fast set.
436                         cn.reject(r)
437                 }
438         } else {
439                 cn.PeerRequests = nil
440         }
441         return
442 }
443
444 func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
445         if !cn.Choked {
446                 return true
447         }
448         cn.Choked = false
449         return msg(pp.Message{
450                 Type: pp.Unchoke,
451         })
452 }
453
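// Sets our interest in the peer, updating the interest-duration accounting,
// and sends the corresponding Interested/NotInterested message.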
454 func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
455         if cn.Interested == interested {
456                 return true
457         }
458         cn.Interested = interested
459         if interested {
460                 cn.lastBecameInterested = time.Now()
461         } else if !cn.lastBecameInterested.IsZero() {
462                 cn.priorInterest += time.Since(cn.lastBecameInterested)
463         }
464         cn.updateExpectingChunks()
465         // log.Printf("%p: setting interest: %v", cn, interested)
466         return msg(pp.Message{
467                 Type: func() pp.MessageType {
468                         if interested {
469                                 return pp.Interested
470                         } else {
471                                 return pp.NotInterested
472                         }
473                 }(),
474         })
475 }
476
477 // The function takes a message to be sent, and returns true if more messages
478 // may be sent.
479 type messageWriter func(pp.Message) bool
480
481 // Proxies the messageWriter's response.
482 func (cn *connection) request(r request, mw messageWriter) bool {
483         if _, ok := cn.requests[r]; ok {
484                 panic("chunk already requested")
485         }
486         if !cn.PeerHasPiece(pieceIndex(r.Index)) {
487                 panic("requesting piece peer doesn't have")
488         }
489         if _, ok := cn.t.conns[cn]; !ok {
490                 panic("requesting but not in active conns")
491         }
492         if cn.closed.IsSet() {
493                 panic("requesting when connection is closed")
494         }
495         if cn.PeerChoked {
496                 if cn.peerAllowedFast.Get(int(r.Index)) {
497                         torrent.Add("allowed fast requests sent", 1)
498                 } else {
499                         panic("requesting while choked and not allowed fast")
500                 }
501         }
502         if cn.t.hashingPiece(pieceIndex(r.Index)) {
503                 panic("piece is being hashed")
504         }
505         if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) {
506                 panic("piece is queued for hash")
507         }
508         if cn.requests == nil {
509                 cn.requests = make(map[request]struct{})
510         }
511         cn.requests[r] = struct{}{}
512         if cn.validReceiveChunks == nil {
513                 cn.validReceiveChunks = make(map[request]struct{})
514         }
515         cn.validReceiveChunks[r] = struct{}{}
516         cn.t.pendingRequests[r]++
517         cn.t.lastRequested[r] = time.AfterFunc(cn.t.duplicateRequestTimeout, func() {
518                 torrent.Add("duplicate request timeouts", 1)
519                 cn.mu().Lock()
520                 defer cn.mu().Unlock()
521                 delete(cn.t.lastRequested, r)
522                 for cn := range cn.t.conns {
523                         if cn.PeerHasPiece(pieceIndex(r.Index)) {
524                                 cn.updateRequests()
525                         }
526                 }
527         })
528         cn.updateExpectingChunks()
529         return mw(pp.Message{
530                 Type:   pp.Request,
531                 Index:  r.Index,
532                 Begin:  r.Begin,
533                 Length: r.Length,
534         })
535 }
536
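// Fills the write buffer with outgoing messages: cancels everything if
// networking is disabled, tops up piece requests when below the low-water
// mark, and then considers uploading.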
537 func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
538         if !cn.t.networkingEnabled {
539                 if !cn.SetInterested(false, msg) {
540                         return
541                 }
542                 if len(cn.requests) != 0 {
543                         for r := range cn.requests {
544                                 cn.deleteRequest(r)
545                                 // log.Printf("%p: cancelling request: %v", cn, r)
546                                 if !msg(makeCancelMessage(r)) {
547                                         return
548                                 }
549                         }
550                 }
551         }
552         if len(cn.requests) <= cn.requestsLowWater {
553                 filledBuffer := false
554                 cn.iterPendingPieces(func(pieceIndex pieceIndex) bool {
555                         cn.iterPendingRequests(pieceIndex, func(r request) bool {
556                                 if !cn.SetInterested(true, msg) {
557                                         filledBuffer = true
558                                         return false
559                                 }
560                                 if len(cn.requests) >= cn.nominalMaxRequests() {
561                                         return false
562                                 }
563                                 // Choking is looked at here because our interest is dependent
564                                 // on whether we'd make requests in its absence.
565                                 if cn.PeerChoked {
566                                         if !cn.peerAllowedFast.Get(bitmap.BitIndex(r.Index)) {
567                                                 return false
568                                         }
569                                 }
570                                 if _, ok := cn.requests[r]; ok {
571                                         return true
572                                 }
573                                 filledBuffer = !cn.request(r, msg)
574                                 return !filledBuffer
575                         })
576                         return !filledBuffer
577                 })
578                 if filledBuffer {
579                         // If we didn't completely top up the requests, we shouldn't mark
580                         // the low water, since we'll want to top up the requests as soon
581                         // as we have more write buffer space.
582                         return
583                 }
584                 cn.requestsLowWater = len(cn.requests) / 2
585         }
586
587         cn.upload(msg)
588 }
589
590 // Routine that writes to the peer. Some of what to write is buffered by
591 // activity elsewhere in the Client, and some is determined locally when the
592 // connection is writable.
593 func (cn *connection) writer(keepAliveTimeout time.Duration) {
594         var (
595                 lastWrite      time.Time = time.Now()
596                 keepAliveTimer *time.Timer
597         )
598         keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
599                 cn.mu().Lock()
600                 defer cn.mu().Unlock()
601                 if time.Since(lastWrite) >= keepAliveTimeout {
602                         cn.tickleWriter()
603                 }
604                 keepAliveTimer.Reset(keepAliveTimeout)
605         })
606         cn.mu().Lock()
607         defer cn.mu().Unlock()
608         defer cn.Close()
609         defer keepAliveTimer.Stop()
610         frontBuf := new(bytes.Buffer)
611         for {
612                 if cn.closed.IsSet() {
613                         return
614                 }
615                 if cn.writeBuffer.Len() == 0 {
616                         cn.fillWriteBuffer(func(msg pp.Message) bool {
617                                 cn.wroteMsg(&msg)
618                                 cn.writeBuffer.Write(msg.MustMarshalBinary())
619                                 torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
620                                 return cn.writeBuffer.Len() < 1<<16 // 64KiB
621                         })
622                 }
623                 if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
624                         cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
625                         postedKeepalives.Add(1)
626                 }
627                 if cn.writeBuffer.Len() == 0 {
628                         // TODO: Minimize wakeups....
629                         cn.writerCond.Wait()
630                         continue
631                 }
632                 // Flip the buffers.
633                 frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
634                 cn.mu().Unlock()
635                 n, err := cn.w.Write(frontBuf.Bytes())
636                 cn.mu().Lock()
637                 if n != 0 {
638                         lastWrite = time.Now()
639                         keepAliveTimer.Reset(keepAliveTimeout)
640                 }
641                 if err != nil {
642                         return
643                 }
644                 if n != frontBuf.Len() {
645                         panic("short write")
646                 }
647                 frontBuf.Reset()
648         }
649 }
650
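// Posts a HAVE for the piece unless we've already sent one.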
651 func (cn *connection) Have(piece pieceIndex) {
652         if cn.sentHaves.Get(bitmap.BitIndex(piece)) {
653                 return
654         }
655         cn.Post(pp.Message{
656                 Type:  pp.Have,
657                 Index: pp.Integer(piece),
658         })
659         cn.sentHaves.Add(bitmap.BitIndex(piece))
660 }
661
662 func (cn *connection) PostBitfield() {
663         if cn.sentHaves.Len() != 0 {
664                 panic("bitfield must be first have-related message sent")
665         }
666         if !cn.t.haveAnyPieces() {
667                 return
668         }
669         cn.Post(pp.Message{
670                 Type:     pp.Bitfield,
671                 Bitfield: cn.t.bitfield(),
672         })
673         cn.sentHaves = cn.t.completedPieces.Copy()
674 }
675
676 func (cn *connection) updateRequests() {
677         // log.Print("update requests")
678         cn.tickleWriter()
679 }
680
681 // Emits the indices in the Bitmaps bms in order, never repeating any index.
682 // skip is mutated during execution, and its initial values will never be
683 // emitted.
684 func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
685         return func(cb iter.Callback) {
686                 for _, bm := range bms {
687                         if !iter.All(func(i interface{}) bool {
688                                 skip.Add(i.(int))
689                                 return cb(i)
690                         }, bitmap.Sub(bm, *skip).Iter) {
691                                 return
692                         }
693                 }
694         }
695 }
696
697 func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece pieceIndex) bool) bool {
698         now, readahead := cn.t.readerPiecePriorities()
699         var skip bitmap.Bitmap
700         if !cn.peerSentHaveAll {
701                 // Pieces to skip include pieces the peer doesn't have.
702                 skip = bitmap.Flip(cn.peerPieces, 0, bitmap.BitIndex(cn.t.numPieces()))
703         }
704         // And pieces that we already have.
705         skip.Union(cn.t.completedPieces)
706         skip.Union(cn.t.piecesQueuedForHash)
707         // Return an iterator over the different priority classes, minus the skip
708         // pieces.
709         return iter.All(
710                 func(_piece interface{}) bool {
711                         i := _piece.(bitmap.BitIndex)
712                         if cn.t.hashingPiece(pieceIndex(i)) {
713                                 return true
714                         }
715                         return f(pieceIndex(i))
716                 },
717                 iterBitmapsDistinct(&skip, now, readahead),
718                 func(cb iter.Callback) {
719                         cn.t.pendingPieces.IterTyped(func(piece int) bool {
720                                 if skip.Contains(piece) {
721                                         return true
722                                 }
723                                 more := cb(piece)
724                                 skip.Add(piece)
725                                 return more
726                         })
727                 },
728         )
729 }
730
731 // The connection should download highest priority pieces first, without any
732 // inclination toward avoiding wastage. Generally we might do this if there's
733 // a single connection, or this is the fastest connection, and we have active
734 // readers that signal an ordering preference. It's conceivable that the best
735 // connection should do this, since it's least likely to waste our time if
736 // assigned to the highest priority pieces, and assigning more than one this
737 // role would cause significant wasted bandwidth.
738 func (cn *connection) shouldRequestWithoutBias() bool {
739         if cn.t.requestStrategy != 2 {
740                 return false
741         }
742         if len(cn.t.readers) == 0 {
743                 return false
744         }
745         if len(cn.t.conns) == 1 {
746                 return true
747         }
748         if cn == cn.t.fastestConn {
749                 return true
750         }
751         return false
752 }
753
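// Iterates the pieces we might request from this peer, either in the
// connection's own request order or in unbiased priority order, depending on
// the request strategy.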
754 func (cn *connection) iterPendingPieces(f func(pieceIndex) bool) bool {
755         if !cn.t.haveInfo() {
756                 return false
757         }
758         if cn.t.requestStrategy == 3 {
759                 return cn.iterUnbiasedPieceRequestOrder(f)
760         }
761         if cn.shouldRequestWithoutBias() {
762                 return cn.iterUnbiasedPieceRequestOrder(f)
763         } else {
764                 return cn.pieceRequestOrder.IterTyped(func(i int) bool {
765                         return f(pieceIndex(i))
766                 })
767         }
768 }
769
770 func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) {
771         cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) })
772 }
773
774 func (cn *connection) iterPendingRequests(piece pieceIndex, f func(request) bool) bool {
775         return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
776                 r := request{pp.Integer(piece), cs}
777                 if cn.t.requestStrategy == 3 {
778                         if _, ok := cn.t.lastRequested[r]; ok {
779                                 // This piece has been requested on another connection, and
780                                 // the duplicate request timer is still running.
781                                 return true
782                         }
783                 }
784                 return f(r)
785         })
786 }
787
788 func iterUndirtiedChunks(piece pieceIndex, t *Torrent, f func(chunkSpec) bool) bool {
789         p := &t.pieces[piece]
790         if t.requestStrategy == 3 {
791                 for i := pp.Integer(0); i < p.numChunks(); i++ {
792                         if !p.dirtyChunks.Get(bitmap.BitIndex(i)) {
793                                 if !f(t.chunkIndexSpec(i, piece)) {
794                                         return false
795                                 }
796                         }
797                 }
798                 return true
799         }
800         chunkIndices := t.pieces[piece].undirtiedChunkIndices()
801         return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
802                 ci, err := chunkIndices.RB.Select(uint32(i))
803                 if err != nil {
804                         panic(err)
805                 }
806                 return f(t.chunkIndexSpec(pp.Integer(ci), piece))
807         })
808 }
809
810 // TODO: Check that callers call updateRequests where needed.
811 func (cn *connection) stopRequestingPiece(piece pieceIndex) bool {
812         return cn.pieceRequestOrder.Remove(bitmap.BitIndex(piece))
813 }
814
815 // This is distinct from Torrent piece priority, which is the user's
816 // preference. Connection piece priority is specific to a connection and is
817 // used to pseudorandomly avoid connections always requesting the same pieces
818 // and thus wasting effort.
819 func (cn *connection) updatePiecePriority(piece pieceIndex) bool {
820         tpp := cn.t.piecePriority(piece)
821         if !cn.PeerHasPiece(piece) {
822                 tpp = PiecePriorityNone
823         }
824         if tpp == PiecePriorityNone {
825                 return cn.stopRequestingPiece(piece)
826         }
827         prio := cn.getPieceInclination()[piece]
828         switch cn.t.requestStrategy {
829         case 1:
830                 switch tpp {
831                 case PiecePriorityNormal:
832                 case PiecePriorityReadahead:
833                         prio -= int(cn.t.numPieces())
834                 case PiecePriorityNext, PiecePriorityNow:
835                         prio -= 2 * int(cn.t.numPieces())
836                 default:
837                         panic(tpp)
838                 }
839                 prio += int(piece / 3)
840         default:
841         }
842         return cn.pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias()
843 }
844
845 func (cn *connection) getPieceInclination() []int {
846         if cn.pieceInclination == nil {
847                 cn.pieceInclination = cn.t.getConnPieceInclination()
848         }
849         return cn.pieceInclination
850 }
851
852 func (cn *connection) discardPieceInclination() {
853         if cn.pieceInclination == nil {
854                 return
855         }
856         cn.t.putPieceInclination(cn.pieceInclination)
857         cn.pieceInclination = nil
858 }
859
860 func (cn *connection) peerPiecesChanged() {
861         if cn.t.haveInfo() {
862                 prioritiesChanged := false
863                 for i := pieceIndex(0); i < cn.t.numPieces(); i++ {
864                         if cn.updatePiecePriority(i) {
865                                 prioritiesChanged = true
866                         }
867                 }
868                 if prioritiesChanged {
869                         cn.updateRequests()
870                 }
871         }
872 }
873
874 func (cn *connection) raisePeerMinPieces(newMin pieceIndex) {
875         if newMin > cn.peerMinPieces {
876                 cn.peerMinPieces = newMin
877         }
878 }
879
880 func (cn *connection) peerSentHave(piece pieceIndex) error {
881         if cn.t.haveInfo() && piece >= cn.t.numPieces() || piece < 0 {
882                 return errors.New("invalid piece")
883         }
884         if cn.PeerHasPiece(piece) {
885                 return nil
886         }
887         cn.raisePeerMinPieces(piece + 1)
888         cn.peerPieces.Set(bitmap.BitIndex(piece), true)
889         if cn.updatePiecePriority(piece) {
890                 cn.updateRequests()
891         }
892         return nil
893 }
894
895 func (cn *connection) peerSentBitfield(bf []bool) error {
896         cn.peerSentHaveAll = false
897         if len(bf)%8 != 0 {
898                 panic("expected bitfield length divisible by 8")
899         }
900         // We know that the last byte means that at most the last 7 bits are
901         // wasted.
902         cn.raisePeerMinPieces(pieceIndex(len(bf) - 7))
903         if cn.t.haveInfo() && len(bf) > int(cn.t.numPieces()) {
904                 // Ignore known excess pieces.
905                 bf = bf[:cn.t.numPieces()]
906         }
907         for i, have := range bf {
908                 if have {
909                         cn.raisePeerMinPieces(pieceIndex(i) + 1)
910                 }
911                 cn.peerPieces.Set(i, have)
912         }
913         cn.peerPiecesChanged()
914         return nil
915 }
916
917 func (cn *connection) onPeerSentHaveAll() error {
918         cn.peerSentHaveAll = true
919         cn.peerPieces.Clear()
920         cn.peerPiecesChanged()
921         return nil
922 }
923
924 func (cn *connection) peerSentHaveNone() error {
925         cn.peerPieces.Clear()
926         cn.peerSentHaveAll = false
927         cn.peerPiecesChanged()
928         return nil
929 }
930
931 func (c *connection) requestPendingMetadata() {
932         if c.t.haveInfo() {
933                 return
934         }
935         if c.PeerExtensionIDs[pp.ExtensionNameMetadata] == 0 {
936                 // Peer doesn't support this.
937                 return
938         }
939         // Request metadata pieces that we don't have in a random order.
940         var pending []int
941         for index := 0; index < c.t.metadataPieceCount(); index++ {
942                 if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
943                         pending = append(pending, index)
944                 }
945         }
946         for _, i := range rand.Perm(len(pending)) {
947                 c.requestMetadataPiece(pending[i])
948         }
949 }
950
951 func (cn *connection) wroteMsg(msg *pp.Message) {
952         torrent.Add(fmt.Sprintf("messages written of type %s", msg.Type.String()), 1)
953         cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
954 }
955
956 func (cn *connection) readMsg(msg *pp.Message) {
957         cn.allStats(func(cs *ConnStats) { cs.readMsg(msg) })
958 }
959
960 // After handshake, we know what Torrent and Client stats to include for a
961 // connection.
962 func (cn *connection) postHandshakeStats(f func(*ConnStats)) {
963         t := cn.t
964         f(&t.stats)
965         f(&t.cl.stats)
966 }
967
968 // All ConnStats that include this connection. Some objects are not known
969 // until the handshake is complete, after which it's expected to reconcile the
970 // differences.
971 func (cn *connection) allStats(f func(*ConnStats)) {
972         f(&cn.stats)
973         if cn.reconciledHandshakeStats {
974                 cn.postHandshakeStats(f)
975         }
976 }
977
978 func (cn *connection) wroteBytes(n int64) {
979         cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten }))
980 }
981
982 func (cn *connection) readBytes(n int64) {
983         cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
984 }
985
986 // Returns whether the connection could be useful to us. We're seeding and
987 // they want data, we don't have metainfo and they can provide it, etc.
988 func (c *connection) useful() bool {
989         t := c.t
990         if c.closed.IsSet() {
991                 return false
992         }
993         if !t.haveInfo() {
994                 return c.supportsExtension("ut_metadata")
995         }
996         if t.seeding() && c.PeerInterested {
997                 return true
998         }
999         if c.peerHasWantedPieces() {
1000                 return true
1001         }
1002         return false
1003 }
1004
1005 func (c *connection) lastHelpful() (ret time.Time) {
1006         ret = c.lastUsefulChunkReceived
1007         if c.t.seeding() && c.lastChunkSent.After(ret) {
1008                 ret = c.lastChunkSent
1009         }
1010         return
1011 }
1012
1013 func (c *connection) fastEnabled() bool {
1014         return c.PeerExtensionBytes.SupportsFast() && c.t.cl.extensionBytes.SupportsFast()
1015 }
1016
1017 func (c *connection) reject(r request) {
1018         if !c.fastEnabled() {
1019                 panic("fast not enabled")
1020         }
1021         c.Post(r.ToMsg(pp.Reject))
1022         delete(c.PeerRequests, r)
1023 }
1024
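// Handles a REQUEST from the peer: validates the chunk bounds, applies choke
// state and the request-queue limit, then queues it for the writer.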
1025 func (c *connection) onReadRequest(r request) error {
1026         requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
1027         if r.Begin+r.Length > c.t.pieceLength(pieceIndex(r.Index)) {
1028                 torrent.Add("bad requests received", 1)
1029                 return errors.New("bad request")
1030         }
1031         if _, ok := c.PeerRequests[r]; ok {
1032                 torrent.Add("duplicate requests received", 1)
1033                 return nil
1034         }
1035         if c.Choked {
1036                 torrent.Add("requests received while choking", 1)
1037                 if c.fastEnabled() {
1038                         torrent.Add("requests rejected while choking", 1)
1039                         c.reject(r)
1040                 }
1041                 return nil
1042         }
1043         if len(c.PeerRequests) >= maxRequests {
1044                 torrent.Add("requests received while queue full", 1)
1045                 if c.fastEnabled() {
1046                         c.reject(r)
1047                 }
1048                 // BEP 6 says we may close here if we choose.
1049                 return nil
1050         }
1051         if !c.t.havePiece(pieceIndex(r.Index)) {
1052                 // This isn't necessarily them screwing up. We can drop pieces
1053                 // from our storage, and can't communicate this to peers
1054                 // except by reconnecting.
1055                 requestsReceivedForMissingPieces.Add(1)
1056                 return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
1057         }
1058         if c.PeerRequests == nil {
1059                 c.PeerRequests = make(map[request]struct{}, maxRequests)
1060         }
1061         c.PeerRequests[r] = struct{}{}
1062         c.tickleWriter()
1063         return nil
1064 }
1065
1066 // Processes incoming BitTorrent messages. The client lock is held upon entry
1067 // and exit. Returning will end the connection.
1068 func (c *connection) mainReadLoop() (err error) {
1069         defer func() {
1070                 if err != nil {
1071                         torrent.Add("connection.mainReadLoop returned with error", 1)
1072                 } else {
1073                         torrent.Add("connection.mainReadLoop returned with no error", 1)
1074                 }
1075         }()
1076         t := c.t
1077         cl := t.cl
1078
1079         decoder := pp.Decoder{
1080                 R:         bufio.NewReaderSize(c.r, 1<<17),
1081                 MaxLength: 256 * 1024,
1082                 Pool:      t.chunkPool,
1083         }
1084         for {
1085                 var msg pp.Message
1086                 func() {
1087                         cl.unlock()
1088                         defer cl.lock()
1089                         err = decoder.Decode(&msg)
1090                 }()
1091                 if t.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
1092                         return nil
1093                 }
1094                 if err != nil {
1095                         return err
1096                 }
1097                 c.readMsg(&msg)
1098                 c.lastMessageReceived = time.Now()
1099                 if msg.Keepalive {
1100                         receivedKeepalives.Add(1)
1101                         continue
1102                 }
1103                 messageTypesReceived.Add(msg.Type.String(), 1)
1104                 if msg.Type.FastExtension() && !c.fastEnabled() {
1105                         return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
1106                 }
1107                 switch msg.Type {
1108                 case pp.Choke:
1109                         c.PeerChoked = true
1110                         c.deleteAllRequests()
1111                         // We can then reset our interest.
1112                         c.updateRequests()
1113                         c.updateExpectingChunks()
1114                 case pp.Reject:
1115                         c.deleteRequest(newRequestFromMessage(&msg))
1116                         delete(c.validReceiveChunks, newRequestFromMessage(&msg))
1117                 case pp.Unchoke:
1118                         c.PeerChoked = false
1119                         c.tickleWriter()
1120                         c.updateExpectingChunks()
1121                 case pp.Interested:
1122                         c.PeerInterested = true
1123                         c.tickleWriter()
1124                 case pp.NotInterested:
1125                         c.PeerInterested = false
1126                         // We don't clear their requests since it isn't clear in the spec.
1127                         // We'll probably choke them for this, which will clear them if
1128                         // appropriate, and is clearly specified.
1129                 case pp.Have:
1130                         err = c.peerSentHave(pieceIndex(msg.Index))
1131                 case pp.Request:
1132                         r := newRequestFromMessage(&msg)
1133                         err = c.onReadRequest(r)
1134                 case pp.Cancel:
1135                         req := newRequestFromMessage(&msg)
1136                         c.onPeerSentCancel(req)
1137                 case pp.Bitfield:
1138                         err = c.peerSentBitfield(msg.Bitfield)
1139                 case pp.HaveAll:
1140                         err = c.onPeerSentHaveAll()
1141                 case pp.HaveNone:
1142                         err = c.peerSentHaveNone()
1143                 case pp.Piece:
1144                         err = c.receiveChunk(&msg)
1145                         if len(msg.Piece) == int(t.chunkSize) {
1146                                 t.chunkPool.Put(&msg.Piece)
1147                         }
1148                         if err != nil {
1149                                 err = fmt.Errorf("receiving chunk: %s", err)
1150                         }
1151                 case pp.Extended:
1152                         err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
1153                 case pp.Port:
1154                         pingAddr := net.UDPAddr{
1155                                 IP:   c.remoteAddr.IP,
1156                                 Port: int(c.remoteAddr.Port),
1157                         }
1158                         if msg.Port != 0 {
1159                                 pingAddr.Port = int(msg.Port)
1160                         }
1161                         cl.eachDhtServer(func(s *dht.Server) {
1162                                 go s.Ping(&pingAddr, nil)
1163                         })
1164                 case pp.AllowedFast:
1165                         torrent.Add("allowed fasts received", 1)
1166                         log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c, debugLogValue).Log(c.t.logger)
1167                         c.peerAllowedFast.Add(int(msg.Index))
1168                         c.updateRequests()
1169                 case pp.Suggest:
1170                         torrent.Add("suggests received", 1)
1171                         log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index, debugLogValue).Log(c.t.logger)
1172                         c.updateRequests()
1173                 default:
1174                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1175                 }
1176                 if err != nil {
1177                         return err
1178                 }
1179         }
1180 }
1181
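// Handles an extended (BEP 10) message: the extended handshake, the metadata
// extension, or PEX.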
1182 func (c *connection) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err error) {
1183         defer func() {
1184                 // TODO: Should we still do this?
1185                 if err != nil {
1186                         // These clients use their own extension IDs for outgoing message
1187                         // types, which is incorrect.
1188                         if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) || strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1189                                 err = nil
1190                         }
1191                 }
1192         }()
1193         t := c.t
1194         cl := t.cl
1195         switch id {
1196         case pp.HandshakeExtendedID:
1197                 var d pp.ExtendedHandshakeMessage
1198                 if err := bencode.Unmarshal(payload, &d); err != nil {
1199                         c.t.logger.Printf("error parsing extended handshake message %q: %s", payload, err)
1200                         return errors.Wrap(err, "unmarshalling extended handshake payload")
1201                 }
1202                 if d.Reqq != 0 {
1203                         c.PeerMaxRequests = d.Reqq
1204                 }
1205                 c.PeerClientName = d.V
1206                 if c.PeerExtensionIDs == nil {
1207                         c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber, len(d.M))
1208                 }
1209                 for name, id := range d.M {
1210                         if _, ok := c.PeerExtensionIDs[name]; !ok {
1211                                 torrent.Add(fmt.Sprintf("peers supporting extension %q", name), 1)
1212                         }
1213                         c.PeerExtensionIDs[name] = id
1214                 }
1215                 if d.MetadataSize != 0 {
1216                         if err = t.setMetadataSize(d.MetadataSize); err != nil {
1217                                 return errors.Wrapf(err, "setting metadata size to %d", d.MetadataSize)
1218                         }
1219                 }
1220                 if _, ok := c.PeerExtensionIDs[pp.ExtensionNameMetadata]; ok {
1221                         c.requestPendingMetadata()
1222                 }
1223                 return nil
1224         case metadataExtendedId:
1225                 err := cl.gotMetadataExtensionMsg(payload, t, c)
1226                 if err != nil {
1227                         return fmt.Errorf("error handling metadata extension message: %s", err)
1228                 }
1229                 return nil
1230         case pexExtendedId:
1231                 if cl.config.DisablePEX {
1232                         // TODO: Maybe close the connection. Check that we're not
1233                         // advertising that we support PEX if it's disabled.
1234                         return nil
1235                 }
1236                 var pexMsg pp.PexMsg
1237                 err := bencode.Unmarshal(payload, &pexMsg)
1238                 if err != nil {
1239                         return fmt.Errorf("error unmarshalling PEX message: %s", err)
1240                 }
1241                 torrent.Add("pex added6 peers received", int64(len(pexMsg.Added6)))
1242                 var peers Peers
1243                 peers.AppendFromPex(pexMsg.Added6, pexMsg.Added6Flags)
1244                 peers.AppendFromPex(pexMsg.Added, pexMsg.AddedFlags)
1245                 t.addPeers(peers)
1246                 return nil
1247         default:
1248                 return fmt.Errorf("unexpected extended message ID: %v", id)
1249         }
1250 }
1251
1252 // Set both the Reader and Writer for the connection from a single ReadWriter.
1253 func (cn *connection) setRW(rw io.ReadWriter) {
1254         cn.r = rw
1255         cn.w = rw
1256 }
1257
1258 // Returns the Reader and Writer as a combined ReadWriter.
1259 func (cn *connection) rw() io.ReadWriter {
1260         return struct {
1261                 io.Reader
1262                 io.Writer
1263         }{cn.r, cn.w}
1264 }
1265
1266 // Handle a received chunk from a peer.
1267 func (c *connection) receiveChunk(msg *pp.Message) error {
1268         t := c.t
1269         cl := t.cl
1270         torrent.Add("chunks received", 1)
1271
1272         req := newRequestFromMessage(msg)
1273
1274         if c.PeerChoked {
1275                 torrent.Add("chunks received while choked", 1)
1276         }
1277
1278         if _, ok := c.validReceiveChunks[req]; !ok {
1279                 torrent.Add("chunks received unexpected", 1)
1280                 return errors.New("received unexpected chunk")
1281         }
1282         delete(c.validReceiveChunks, req)
1283
1284         if c.PeerChoked && c.peerAllowedFast.Get(int(req.Index)) {
1285                 torrent.Add("chunks received due to allowed fast", 1)
1286         }
1287
1288         // Request has been satisfied.
1289         if c.deleteRequest(req) {
1290                 if c.expectingChunks() {
1291                         c.chunksReceivedWhileExpecting++
1292                 }
1293         } else {
1294                 torrent.Add("chunks received unwanted", 1)
1295         }
1296
1297         // Do we actually want this chunk?
1298         if t.haveChunk(req) {
1299                 torrent.Add("chunks received wasted", 1)
1300                 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
1301                 return nil
1302         }
1303
1304         piece := &t.pieces[req.Index]
1305
1306         c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
1307         c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
1308         c.lastUsefulChunkReceived = time.Now()
1309         // if t.fastestConn != c {
1310         // log.Printf("setting fastest connection %p", c)
1311         // }
1312         t.fastestConn = c
1313
1314         // Need to record that it hasn't been written yet, before we attempt to do
1315         // anything with it.
1316         piece.incrementPendingWrites()
1317         // Record that we have the chunk, so we aren't trying to download it while
1318         // waiting for it to be written to storage.
1319         piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
1320
1321         // Cancel pending requests for this chunk.
1322         for c := range t.conns {
1323                 c.postCancel(req)
1324         }
1325
1326         err := func() error {
1327                 cl.unlock()
1328                 defer cl.lock()
1329                 concurrentChunkWrites.Add(1)
1330                 defer concurrentChunkWrites.Add(-1)
1331                 // Write the chunk out. Note that the upper bound on chunk writing
1332                 // concurrency will be the number of connections. We write inline with
1333                 // receiving the chunk (with this lock dance), because we want to
1334                 // handle errors synchronously and I haven't thought of a nice way to
1335                 // defer any concurrency to the storage and have that notify the
1336                 // client of errors. TODO: Do that instead.
1337                 return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1338         }()
1339
1340         piece.decrementPendingWrites()
1341
1342         if err != nil {
1343                 panic(fmt.Sprintf("error writing chunk: %v", err))
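                 // Note: the recovery statements below are unreachable while the panic above is in place.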
1344                 t.pendRequest(req)
1345                 t.updatePieceCompletion(pieceIndex(msg.Index))
1346                 return nil
1347         }
1348
1349         // It's important that the piece is potentially queued before we check if
1350         // the piece is still wanted, because if it is queued, it won't be wanted.
1351         if t.pieceAllDirty(pieceIndex(req.Index)) {
1352                 t.queuePieceCheck(pieceIndex(req.Index))
1353                 t.pendAllChunkSpecs(pieceIndex(req.Index))
1354         }
1355
1356         c.onDirtiedPiece(pieceIndex(req.Index))
1357
1358         cl.event.Broadcast()
1359         t.publishPieceChange(pieceIndex(req.Index))
1360
1361         return nil
1362 }
1363
1364 func (c *connection) onDirtiedPiece(piece pieceIndex) {
1365         if c.peerTouchedPieces == nil {
1366                 c.peerTouchedPieces = make(map[pieceIndex]struct{})
1367         }
1368         c.peerTouchedPieces[piece] = struct{}{}
1369         ds := &c.t.pieces[piece].dirtiers
1370         if *ds == nil {
1371                 *ds = make(map[*connection]struct{})
1372         }
1373         (*ds)[c] = struct{}{}
1374 }
1375
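// Whether we may upload to this peer right now, considering NoUpload, seeding
// state, whether they want pieces we have, and a 100 KiB upload-over-download
// allowance.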
1376 func (c *connection) uploadAllowed() bool {
1377         if c.t.cl.config.NoUpload {
1378                 return false
1379         }
1380         if c.t.seeding() {
1381                 return true
1382         }
1383         if !c.peerHasWantedPieces() {
1384                 return false
1385         }
1386         // Don't upload more than 100 KiB more than we download.
1387         if c.stats.BytesWrittenData.Int64() >= c.stats.BytesReadData.Int64()+100<<10 {
1388                 return false
1389         }
1390         return true
1391 }
1392
1393 func (c *connection) setRetryUploadTimer(delay time.Duration) {
1394         if c.uploadTimer == nil {
1395                 c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
1396         } else {
1397                 c.uploadTimer.Reset(delay)
1398         }
1399 }
1400
1401 // Also handles choking and unchoking of the remote peer.
1402 func (c *connection) upload(msg func(pp.Message) bool) bool {
1403         // Breaking or completing this loop means we don't want to upload to the
1404         // peer anymore, and we choke them.
1405 another:
1406         for c.uploadAllowed() {
1407                 // We want to upload to the peer.
1408                 if !c.Unchoke(msg) {
1409                         return false
1410                 }
1411                 for r := range c.PeerRequests {
1412                         res := c.t.cl.config.UploadRateLimiter.ReserveN(time.Now(), int(r.Length))
1413                         if !res.OK() {
1414                                 panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
1415                         }
1416                         delay := res.Delay()
1417                         if delay > 0 {
1418                                 res.Cancel()
1419                                 c.setRetryUploadTimer(delay)
1420                                 // Hard to say what to return here.
1421                                 return true
1422                         }
1423                         more, err := c.sendChunk(r, msg)
1424                         if err != nil {
1425                                 i := pieceIndex(r.Index)
1426                                 if c.t.pieceComplete(i) {
1427                                         c.t.updatePieceCompletion(i)
1428                                         if !c.t.pieceComplete(i) {
1429                                                 // We had the piece, but not anymore.
1430                                                 break another
1431                                         }
1432                                 }
1433                                 log.Str("error sending chunk to peer").AddValues(c, r, err).Log(c.t.logger)
1434                                 // If we failed to send a chunk, choke the peer to ensure they
1435                                 // flush all their requests. We've probably dropped a piece,
1436                                 // but there's no way to communicate this to the peer. If they
1437                                 // ask for it again, we'll kick them to allow us to send them
1438                                 // an updated bitfield.
1439                                 break another
1440                         }
1441                         delete(c.PeerRequests, r)
1442                         if !more {
1443                                 return false
1444                         }
1445                         goto another
1446                 }
1447                 return true
1448         }
1449         return c.Choke(msg)
1450 }
1451
1452 func (cn *connection) Drop() {
1453         cn.t.dropConnection(cn)
1454 }
1455
1456 func (cn *connection) netGoodPiecesDirtied() int64 {
1457         return cn.stats.PiecesDirtiedGood.Int64() - cn.stats.PiecesDirtiedBad.Int64()
1458 }
1459
1460 func (c *connection) peerHasWantedPieces() bool {
1461         return !c.pieceRequestOrder.IsEmpty()
1462 }
1463
1464 func (c *connection) numLocalRequests() int {
1465         return len(c.requests)
1466 }
1467
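// Removes a local request: stops any duplicate-request timer, updates the
// torrent's pending-request accounting, and nudges other connections that
// might now request the piece.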
1468 func (c *connection) deleteRequest(r request) bool {
1469         if _, ok := c.requests[r]; !ok {
1470                 return false
1471         }
1472         delete(c.requests, r)
1473         c.updateExpectingChunks()
1474         if t, ok := c.t.lastRequested[r]; ok {
1475                 t.Stop()
1476                 delete(c.t.lastRequested, r)
1477         }
1478         pr := c.t.pendingRequests
1479         pr[r]--
1480         n := pr[r]
1481         if n == 0 {
1482                 delete(pr, r)
1483         }
1484         if n < 0 {
1485                 panic(n)
1486         }
1487         c.updateRequests()
1488         for _c := range c.t.conns {
1489                 if !_c.Interested && _c != c && _c.PeerHasPiece(pieceIndex(r.Index)) {
1490                         _c.updateRequests()
1491                 }
1492         }
1493         return true
1494 }
1495
1496 func (c *connection) deleteAllRequests() {
1497         for r := range c.requests {
1498                 c.deleteRequest(r)
1499         }
1500         if len(c.requests) != 0 {
1501                 panic(len(c.requests))
1502         }
1503         // for c := range c.t.conns {
1504         //      c.tickleWriter()
1505         // }
1506 }
1507
1508 func (c *connection) tickleWriter() {
1509         c.writerCond.Broadcast()
1510 }
1511
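// Deletes the request locally and posts a CANCEL for it; returns false if the
// request wasn't outstanding.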
1512 func (c *connection) postCancel(r request) bool {
1513         if !c.deleteRequest(r) {
1514                 return false
1515         }
1516         c.Post(makeCancelMessage(r))
1517         return true
1518 }
1519
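// Reads the requested chunk from torrent storage and queues a PIECE message
// carrying it.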
1520 func (c *connection) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) {
1521         // Count the chunk being sent, even if it isn't.
1522         b := make([]byte, r.Length)
1523         p := c.t.info.Piece(int(r.Index))
1524         n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
1525         if n != len(b) {
1526                 if err == nil {
1527                         panic("expected error")
1528                 }
1529                 return
1530         } else if err == io.EOF {
1531                 err = nil
1532         }
1533         more = msg(pp.Message{
1534                 Type:  pp.Piece,
1535                 Index: r.Index,
1536                 Begin: r.Begin,
1537                 Piece: b,
1538         })
1539         c.lastChunkSent = time.Now()
1540         return
1541 }
1542
1543 func (c *connection) setTorrent(t *Torrent) {
1544         if c.t != nil {
1545                 panic("connection already associated with a torrent")
1546         }
1547         c.t = t
1548         t.reconcileHandshakeStats(c)
1549 }
1550
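// The BEP 40 canonical peer priority for this connection, derived from the
// peer's address and our public address.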
1551 func (c *connection) peerPriority() peerPriority {
1552         return bep40PriorityIgnoreError(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp()))
1553 }
1554
1555 func (c *connection) remoteIp() net.IP {
1556         return c.remoteAddr.IP
1557 }
1558
1559 func (c *connection) remoteIpPort() IpPort {
1560         return c.remoteAddr
1561 }