package torrent

import (
        "bufio"
        "bytes"
        "errors"
        "fmt"
        "io"
        "math"
        "math/rand"
        "net"
        "strconv"
        "strings"
        "sync"
        "time"

        "github.com/anacrolix/dht"
        "github.com/anacrolix/log"

        "github.com/anacrolix/missinggo"
        "github.com/anacrolix/missinggo/bitmap"
        "github.com/anacrolix/missinggo/iter"
        "github.com/anacrolix/missinggo/prioritybitmap"

        "github.com/anacrolix/torrent/bencode"
        "github.com/anacrolix/torrent/mse"
        pp "github.com/anacrolix/torrent/peer_protocol"
)

type peerSource string

const (
        peerSourceTracker         = "Tr"
        peerSourceIncoming        = "I"
        peerSourceDHTGetPeers     = "Hg" // Peers we found by searching a DHT.
        peerSourceDHTAnnouncePeer = "Ha" // Peers that were announced to us by a DHT.
        peerSourcePEX             = "X"
)

// Maintains the state of a connection with a peer.
type connection struct {
        // First to ensure 64-bit alignment for atomics. See #262.
        stats ConnStats

        t *Torrent
        // The actual Conn, used for closing, and setting socket options.
        conn     net.Conn
        outgoing bool
        // The Reader and Writer for this Conn, with hooks installed for stats,
        // limiting, deadlines etc.
        w io.Writer
        r io.Reader
        // True if the connection is operating over MSE obfuscation.
        headerEncrypted bool
        cryptoMethod    mse.CryptoMethod
        Discovery       peerSource
        closed          missinggo.Event
        // Set true after we've added our ConnStats generated during handshake to
        // other ConnStat instances as determined when the *Torrent became known.
        reconciledHandshakeStats bool

        lastMessageReceived     time.Time
        completedHandshake      time.Time
        lastUsefulChunkReceived time.Time
        lastChunkSent           time.Time

        // Stuff controlled by the local peer.
        Interested           bool
        lastBecameInterested time.Time
        priorInterest        time.Duration

        lastStartedExpectingToReceiveChunks time.Time
        cumulativeExpectedToReceiveChunks   time.Duration
        chunksReceivedWhileExpecting        int64

        Choked           bool
        requests         map[request]struct{}
        requestsLowWater int
        // Chunks that we might reasonably expect to receive from the peer. Due to
        // latency, buffering, and implementation differences, we may receive
        // chunks that are no longer in the set of requests we actually want.
        validReceiveChunks map[request]struct{}
        // Indexed by metadata piece, set to true if posted and pending a
        // response.
        metadataRequests []bool
        sentHaves        bitmap.Bitmap

        // Stuff controlled by the remote peer.
        PeerID             PeerID
        PeerInterested     bool
        PeerChoked         bool
        PeerRequests       map[request]struct{}
        PeerExtensionBytes peerExtensionBytes
        // The pieces the peer has claimed to have.
        peerPieces bitmap.Bitmap
        // The peer has everything. This can occur due to a special message, when
        // we may not even know the number of pieces in the torrent yet.
        peerSentHaveAll bool
        // The highest possible number of pieces the torrent could have based on
        // communication with the peer. Generally only useful until we have the
        // torrent info.
        peerMinPieces int
        // Pieces we've accepted chunks for from the peer.
        peerTouchedPieces map[int]struct{}
        peerAllowedFast   bitmap.Bitmap

        PeerMaxRequests  int // Maximum pending requests the peer allows.
        PeerExtensionIDs map[string]byte
        PeerClientName   string

        pieceInclination  []int
        pieceRequestOrder prioritybitmap.PriorityBitmap

        writeBuffer *bytes.Buffer
        uploadTimer *time.Timer
        writerCond  sync.Cond
}

func (cn *connection) updateExpectingChunks() {
        if cn.expectingChunks() {
                if cn.lastStartedExpectingToReceiveChunks.IsZero() {
                        cn.lastStartedExpectingToReceiveChunks = time.Now()
                }
        } else {
                if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
                        cn.cumulativeExpectedToReceiveChunks += time.Since(cn.lastStartedExpectingToReceiveChunks)
                        cn.lastStartedExpectingToReceiveChunks = time.Time{}
                }
        }
}

func (cn *connection) expectingChunks() bool {
        return cn.Interested && !cn.PeerChoked
}

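// updateExpectingChunks and totalExpectingTime (below) together implement a
// stopwatch: the former starts or stops it as our interest and the peer's
// choke state change, and the latter reads the accumulated time, including
// any interval still running. For example (illustrative numbers), two 5s
// stretches of being interested and unchoked accumulate 10s of expecting
// time, even when separated by a long choke.
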
// Returns true if the connection is over IPv6.
func (cn *connection) ipv6() bool {
        ip := missinggo.AddrIP(cn.remoteAddr())
        if ip.To4() != nil {
                return false
        }
        return len(ip) == net.IPv6len
}

// Returns true if the dialer has the lower client peer ID. TODO: Find the
// specification for this.
func (cn *connection) isPreferredDirection() bool {
        return bytes.Compare(cn.t.cl.peerID[:], cn.PeerID[:]) < 0 == cn.outgoing
}

// Returns whether the left connection should be preferred over the right one,
// considering only their networking properties. If ok is false, we can't
// decide.
func (l *connection) hasPreferredNetworkOver(r *connection) (left, ok bool) {
        var ml multiLess
        ml.NextBool(l.isPreferredDirection(), r.isPreferredDirection())
        ml.NextBool(!l.utp(), !r.utp())
        ml.NextBool(l.ipv6(), r.ipv6())
        return ml.FinalOk()
}

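// A minimal sketch (hypothetical, not part of the upstream API) of how a
// caller might use hasPreferredNetworkOver to pick which of two duplicate
// connections to the same peer to keep. The name preferConn is an
// illustrative assumption.
func preferConn(l, r *connection) *connection {
        if left, ok := l.hasPreferredNetworkOver(r); ok && left {
                // The comparison was decisive and favours the left connection.
                return l
        }
        // Undecided, or the right connection is preferred: keep the right one.
        return r
}
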
func (cn *connection) cumInterest() time.Duration {
        ret := cn.priorInterest
        if cn.Interested {
                ret += time.Since(cn.lastBecameInterested)
        }
        return ret
}

func (cn *connection) peerHasAllPieces() (all bool, known bool) {
        if cn.peerSentHaveAll {
                return true, true
        }
        if !cn.t.haveInfo() {
                return false, false
        }
        return bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces()).IsEmpty(), true
}

func (cn *connection) mu() sync.Locker {
        return &cn.t.cl.mu
}

func (cn *connection) remoteAddr() net.Addr {
        return cn.conn.RemoteAddr()
}

func (cn *connection) localAddr() net.Addr {
        return cn.conn.LocalAddr()
}

func (cn *connection) supportsExtension(ext string) bool {
        _, ok := cn.PeerExtensionIDs[ext]
        return ok
}

// The best guess at number of pieces in the torrent for this peer.
func (cn *connection) bestPeerNumPieces() int {
        if cn.t.haveInfo() {
                return cn.t.numPieces()
        }
        return cn.peerMinPieces
}

func (cn *connection) completedString() string {
        have := cn.peerPieces.Len()
        if cn.peerSentHaveAll {
                have = cn.bestPeerNumPieces()
        }
        return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
}

// Correct the peerPieces bitmap length. Returns an error if the existing
// state is invalid, such as from receiving a badly sized BITFIELD or an
// invalid HAVE message.
func (cn *connection) setNumPieces(num int) error {
        cn.peerPieces.RemoveRange(num, bitmap.ToEnd)
        cn.peerPiecesChanged()
        return nil
}

func eventAgeString(t time.Time) string {
        if t.IsZero() {
                return "never"
        }
        return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}

func (cn *connection) connectionFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
        if cn.cryptoMethod == mse.CryptoMethodRC4 {
                c('E')
        } else if cn.headerEncrypted {
                c('e')
        }
        ret += string(cn.Discovery)
        if cn.utp() {
                c('U')
        }
        return
}

func (cn *connection) utp() bool {
        return strings.Contains(cn.remoteAddr().Network(), "utp")
}

// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
func (cn *connection) statusFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
        if cn.Interested {
                c('i')
        }
        if cn.Choked {
                c('c')
        }
        c('-')
        ret += cn.connectionFlags()
        c('-')
        if cn.PeerInterested {
                c('i')
        }
        if cn.PeerChoked {
                c('c')
        }
        return
}

// func (cn *connection) String() string {
//      var buf bytes.Buffer
//      cn.WriteStatus(&buf, nil)
//      return buf.String()
// }

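// Illustrative example (not captured from a live session): an outgoing
// connection on which we are interested and not choking, running full RC4
// over uTP to a peer found via DHT get_peers that is interested in us but
// choking us, renders as "i-EHgU-ic".
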
func (cn *connection) downloadRate() float64 {
        return float64(cn.stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds()
}

func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
        // \t isn't preserved in <pre> blocks?
        fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr())
        fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
                eventAgeString(cn.lastMessageReceived),
                eventAgeString(cn.completedHandshake),
                eventAgeString(cn.lastHelpful()),
                cn.cumInterest(),
                cn.totalExpectingTime(),
        )
        fmt.Fprintf(w,
                "    %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
                cn.completedString(),
                len(cn.peerTouchedPieces),
                &cn.stats.ChunksReadUseful,
                &cn.stats.ChunksRead,
                &cn.stats.ChunksWritten,
                cn.requestsLowWater,
                cn.numLocalRequests(),
                cn.nominalMaxRequests(),
                len(cn.PeerRequests),
                cn.statusFlags(),
                cn.downloadRate()/(1<<10),
        )
        roi := cn.pieceRequestOrderIter()
        fmt.Fprintf(w, "    next pieces: %v%s\n",
                iter.ToSlice(iter.Head(10, roi)),
                func() string {
                        if cn.shouldRequestWithoutBias() {
                                return " (fastest)"
                        } else {
                                return ""
                        }
                }())
}

func (cn *connection) Close() {
        if !cn.closed.Set() {
                return
        }
        cn.tickleWriter()
        cn.discardPieceInclination()
        cn.pieceRequestOrder.Clear()
        if cn.conn != nil {
                go cn.conn.Close()
        }
}

func (cn *connection) PeerHasPiece(piece int) bool {
        return cn.peerSentHaveAll || cn.peerPieces.Contains(piece)
}

// Writes a message into the write buffer.
func (cn *connection) Post(msg pp.Message) {
        messageTypesPosted.Add(msg.Type.String(), 1)
        // We don't need to track bytes here because a connection.w Writer wrapper
        // takes care of that (although there's some delay between us recording
        // the message, and the connection writer flushing it out).
        cn.writeBuffer.Write(msg.MustMarshalBinary())
        // Last I checked only Piece messages affect stats, and we don't post
        // those.
        cn.wroteMsg(&msg)
        cn.tickleWriter()
}

func (cn *connection) requestMetadataPiece(index int) {
        eID := cn.PeerExtensionIDs["ut_metadata"]
        if eID == 0 {
                return
        }
        if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
                return
        }
        cn.Post(pp.Message{
                Type:       pp.Extended,
                ExtendedID: eID,
                ExtendedPayload: func() []byte {
                        b, err := bencode.Marshal(map[string]int{
                                "msg_type": pp.RequestMetadataExtensionMsgType,
                                "piece":    index,
                        })
                        if err != nil {
                                panic(err)
                        }
                        return b
                }(),
        })
        for index >= len(cn.metadataRequests) {
                cn.metadataRequests = append(cn.metadataRequests, false)
        }
        cn.metadataRequests[index] = true
}

func (cn *connection) requestedMetadataPiece(index int) bool {
        return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}

// The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) {
        if cn.t.requestStrategy == 3 {
                expectingTime := int64(cn.totalExpectingTime())
                if expectingTime == 0 {
                        expectingTime = math.MaxInt64
                }
                return int(clamp(
                        1,
                        int64(cn.PeerMaxRequests),
                        max(
                                // It makes sense to always pipeline at least one request,
                                // since latency must be non-zero.
                                2,
                                // Request only as many as we expect to receive in the
                                // duplicateRequestTimeout window. We are trying to avoid
                                // having to duplicate requests.
                                cn.chunksReceivedWhileExpecting*int64(cn.t.duplicateRequestTimeout)/expectingTime,
                        ),
                ))
        }
        return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64()))))
}

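// Worked example for the requestStrategy 3 branch above (illustrative
// numbers): a connection that received 100 chunks over 10s of cumulative
// expecting time, with a duplicateRequestTimeout of 5s, targets
// 100 * 5s / 10s = 50 outstanding requests, clamped into
// [1, PeerMaxRequests] and never below 2 so the pipeline stays primed.
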
func (cn *connection) totalExpectingTime() (ret time.Duration) {
        ret = cn.cumulativeExpectedToReceiveChunks
        if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
                ret += time.Since(cn.lastStartedExpectingToReceiveChunks)
        }
        return
}

func (cn *connection) onPeerSentCancel(r request) {
        if _, ok := cn.PeerRequests[r]; !ok {
                torrent.Add("unexpected cancels received", 1)
                return
        }
        if cn.fastEnabled() {
                cn.reject(r)
        } else {
                delete(cn.PeerRequests, r)
        }
}

func (cn *connection) Choke(msg messageWriter) (more bool) {
        if cn.Choked {
                return true
        }
        cn.Choked = true
        more = msg(pp.Message{
                Type: pp.Choke,
        })
        if cn.fastEnabled() {
                for r := range cn.PeerRequests {
                        // TODO: Don't reject pieces in allowed fast set.
                        cn.reject(r)
                }
        } else {
                cn.PeerRequests = nil
        }
        return
}

func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
        if !cn.Choked {
                return true
        }
        cn.Choked = false
        return msg(pp.Message{
                Type: pp.Unchoke,
        })
}

func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
        if cn.Interested == interested {
                return true
        }
        cn.Interested = interested
        if interested {
                cn.lastBecameInterested = time.Now()
        } else if !cn.lastBecameInterested.IsZero() {
                cn.priorInterest += time.Since(cn.lastBecameInterested)
        }
        cn.updateExpectingChunks()
        // log.Printf("%p: setting interest: %v", cn, interested)
        return msg(pp.Message{
                Type: func() pp.MessageType {
                        if interested {
                                return pp.Interested
                        } else {
                                return pp.NotInterested
                        }
                }(),
        })
}

// The function takes a message to be sent, and returns true if more messages
// are okay.
type messageWriter func(pp.Message) bool

// Proxies the messageWriter's response.
func (cn *connection) request(r request, mw messageWriter) bool {
        if _, ok := cn.requests[r]; ok {
                panic("chunk already requested")
        }
        if !cn.PeerHasPiece(r.Index.Int()) {
                panic("requesting piece peer doesn't have")
        }
        if _, ok := cn.t.conns[cn]; !ok {
                panic("requesting but not in active conns")
        }
        if cn.closed.IsSet() {
                panic("requesting when connection is closed")
        }
        if cn.PeerChoked {
                if cn.peerAllowedFast.Get(int(r.Index)) {
                        torrent.Add("allowed fast requests sent", 1)
                } else {
                        panic("requesting while choked and not allowed fast")
                }
        }
        if cn.t.hashingPiece(pieceIndex(r.Index)) {
                panic("piece is being hashed")
        }
        if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) {
                panic("piece is queued for hash")
        }
        if cn.requests == nil {
                cn.requests = make(map[request]struct{})
        }
        cn.requests[r] = struct{}{}
        if cn.validReceiveChunks == nil {
                cn.validReceiveChunks = make(map[request]struct{})
        }
        cn.validReceiveChunks[r] = struct{}{}
        cn.t.pendingRequests[r]++
        cn.t.lastRequested[r] = time.Now()
        cn.updateExpectingChunks()
        return mw(pp.Message{
                Type:   pp.Request,
                Index:  r.Index,
                Begin:  r.Begin,
                Length: r.Length,
        })
}

func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
        numFillBuffers.Add(1)
        cancel, new, i := cn.desiredRequestState()
        if !cn.SetInterested(i, msg) {
                return
        }
        if cancel && len(cn.requests) != 0 {
                fillBufferSentCancels.Add(1)
                for r := range cn.requests {
                        cn.deleteRequest(r)
                        // log.Printf("%p: cancelling request: %v", cn, r)
                        if !msg(makeCancelMessage(r)) {
                                return
                        }
                }
        }
        if len(new) != 0 {
                fillBufferSentRequests.Add(1)
                for _, r := range new {
                        if !cn.request(r, msg) {
                                // If we didn't completely top up the requests, we shouldn't
                                // mark the low water, since we'll want to top up the requests
                                // as soon as we have more write buffer space.
                                return
                        }
                }
                cn.requestsLowWater = len(cn.requests) / 2
        }
        cn.upload(msg)
}

// Routine that writes to the peer. Some of what to write is buffered by
// activity elsewhere in the Client, and some is determined locally when the
// connection is writable.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
        var (
                lastWrite      time.Time = time.Now()
                keepAliveTimer *time.Timer
        )
        keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
                cn.mu().Lock()
                defer cn.mu().Unlock()
                if time.Since(lastWrite) >= keepAliveTimeout {
                        cn.tickleWriter()
                }
                keepAliveTimer.Reset(keepAliveTimeout)
        })
        cn.mu().Lock()
        defer cn.mu().Unlock()
        defer cn.Close()
        defer keepAliveTimer.Stop()
        frontBuf := new(bytes.Buffer)
        for {
                if cn.closed.IsSet() {
                        return
                }
                if cn.writeBuffer.Len() == 0 {
                        cn.fillWriteBuffer(func(msg pp.Message) bool {
                                cn.wroteMsg(&msg)
                                cn.writeBuffer.Write(msg.MustMarshalBinary())
                                return cn.writeBuffer.Len() < 1<<16
                        })
                }
                if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
                        cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
                        postedKeepalives.Add(1)
                }
                if cn.writeBuffer.Len() == 0 {
                        // TODO: Minimize wakeups....
                        cn.writerCond.Wait()
                        continue
                }
                // Flip the buffers.
                frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
                cn.mu().Unlock()
                n, err := cn.w.Write(frontBuf.Bytes())
                cn.mu().Lock()
                if n != 0 {
                        lastWrite = time.Now()
                        keepAliveTimer.Reset(keepAliveTimeout)
                }
                if err != nil {
                        return
                }
                if n != frontBuf.Len() {
                        panic("short write")
                }
                frontBuf.Reset()
        }
}

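// The loop above flips between two buffers so that the blocking socket write
// happens with the client lock released. A minimal standalone sketch of the
// idiom, assuming only a mutex-guarded back buffer (names illustrative):
//
//      for {
//              mu.Lock()
//              front, back = back, front // swap under the lock
//              mu.Unlock()
//              w.Write(front.Bytes())    // blocking write, lock released
//              front.Reset()
//      }
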
func (cn *connection) Have(piece int) {
        if cn.sentHaves.Get(piece) {
                return
        }
        cn.Post(pp.Message{
                Type:  pp.Have,
                Index: pp.Integer(piece),
        })
        cn.sentHaves.Add(piece)
}

func (cn *connection) PostBitfield() {
        if cn.sentHaves.Len() != 0 {
                panic("bitfield must be first have-related message sent")
        }
        if !cn.t.haveAnyPieces() {
                return
        }
        cn.Post(pp.Message{
                Type:     pp.Bitfield,
                Bitfield: cn.t.bitfield(),
        })
        cn.sentHaves = cn.t.completedPieces.Copy()
}

// Determines interest and requests to send to a connected peer.
func nextRequestState(
        networkingEnabled bool,
        currentRequests map[request]struct{},
        peerChoking bool,
        iterPendingRequests func(f func(request) bool),
        requestsLowWater int,
        requestsHighWater int,
        allowedFast bitmap.Bitmap,
) (
        cancelExisting bool, // Cancel all our pending requests
        newRequests []request, // Chunks to request that we currently aren't
        interested bool, // Whether we should indicate interest, even if we don't request anything
) {
        if !networkingEnabled {
                return true, nil, false
        }
        if len(currentRequests) > requestsLowWater {
                return false, nil, true
        }
        // If we have existing requests, we had better maintain interest to
        // ensure we get them. iterPendingRequests might not iterate over
        // outstanding requests.
        interested = len(currentRequests) != 0
        iterPendingRequests(func(r request) bool {
                interested = true
                if peerChoking {
                        if allowedFast.IsEmpty() {
                                return false
                        }
                        if !allowedFast.Get(int(r.Index)) {
                                return true
                        }
                }
                if len(currentRequests)+len(newRequests) >= requestsHighWater {
                        return false
                }
                if _, ok := currentRequests[r]; !ok {
                        if newRequests == nil {
                                newRequests = make([]request, 0, requestsHighWater-len(currentRequests))
                        }
                        newRequests = append(newRequests, r)
                }
                return true
        })
        return
}

func (cn *connection) updateRequests() {
        // log.Print("update requests")
        cn.tickleWriter()
}

// Emits the indices in the Bitmaps bms in order, never repeating any index.
// skip is mutated during execution, and its initial values will never be
// emitted.
func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
        return func(cb iter.Callback) {
                for _, bm := range bms {
                        if !iter.All(func(i interface{}) bool {
                                skip.Add(i.(int))
                                return cb(i)
                        }, bitmap.Sub(bm, *skip).Iter) {
                                return
                        }
                }
        }
}

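// For example (illustrative): with skip initially empty and
// bms = [{0, 1, 2}, {1, 3}], the iterator yields 0, 1, 2, 3. The second
// bitmap's 1 is suppressed because it entered skip when the first bitmap
// emitted it, and any indices in skip beforehand are never emitted at all.
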
func (cn *connection) unbiasedPieceRequestOrder() iter.Func {
        now, readahead := cn.t.readerPiecePriorities()
        var skip bitmap.Bitmap
        if !cn.peerSentHaveAll {
                // Pieces to skip include pieces the peer doesn't have.
                skip = bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces())
        }
        // And pieces that we already have.
        skip.Union(cn.t.completedPieces)
        // Return an iterator over the different priority classes, minus the skip
        // pieces.
        return iter.Chain(
                iterBitmapsDistinct(&skip, now, readahead),
                func(cb iter.Callback) {
                        cn.t.pendingPieces.IterTyped(func(piece int) bool {
                                if skip.Contains(piece) {
                                        return true
                                }
                                more := cb(piece)
                                skip.Add(piece)
                                return more
                        })
                },
        )
}

// The connection should download highest priority pieces first, without any
// inclination toward avoiding wastage. Generally we might do this if there's
// a single connection, or this is the fastest connection, and we have active
// readers that signal an ordering preference. It's conceivable that the best
// connection should do this, since it's least likely to waste our time if
// assigned to the highest priority pieces, and assigning more than one this
// role would cause significant wasted bandwidth.
func (cn *connection) shouldRequestWithoutBias() bool {
        if cn.t.requestStrategy != 2 {
                return false
        }
        if len(cn.t.readers) == 0 {
                return false
        }
        if len(cn.t.conns) == 1 {
                return true
        }
        if cn == cn.t.fastestConn {
                return true
        }
        return false
}

func (cn *connection) pieceRequestOrderIter() iter.Func {
        if cn.t.requestStrategy == 3 {
                return cn.unbiasedPieceRequestOrder()
        }
        if cn.shouldRequestWithoutBias() {
                return cn.unbiasedPieceRequestOrder()
        } else {
                return cn.pieceRequestOrder.Iter
        }
}

func (cn *connection) iterPendingRequests(f func(request) bool) {
        cn.pieceRequestOrderIter()(func(_piece interface{}) bool {
                piece := _piece.(int)
                return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
                        r := request{pp.Integer(piece), cs}
                        if cn.t.requestStrategy == 3 {
                                lr := cn.t.lastRequested[r]
                                if !lr.IsZero() {
                                        if time.Since(lr) < cn.t.duplicateRequestTimeout {
                                                return true
                                        } else {
                                                torrent.Add("requests duplicated due to timeout", 1)
                                        }
                                }
                        }
                        return f(r)
                })
        })
}

func (cn *connection) desiredRequestState() (bool, []request, bool) {
        return nextRequestState(
                cn.t.networkingEnabled,
                cn.requests,
                cn.PeerChoked,
                cn.iterPendingRequests,
                cn.requestsLowWater,
                cn.nominalMaxRequests(),
                cn.peerAllowedFast,
        )
}

func iterUndirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
        chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
        // TODO: Use "math/rand".Shuffle >= Go 1.10
        return iter.ForPerm(len(chunkIndices), func(i int) bool {
                return f(t.chunkIndexSpec(chunkIndices[i], piece))
        })
}

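// For the TODO above: on Go 1.10+ the permutation could use the standard
// library's shuffle directly, e.g. (a sketch; unlike iter.ForPerm it has no
// early-exit semantics, so the caller would loop and break itself):
//
//      rand.Shuffle(len(chunkIndices), func(i, j int) {
//              chunkIndices[i], chunkIndices[j] = chunkIndices[j], chunkIndices[i]
//      })
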
// TODO: Check that callers call updateRequests.
func (cn *connection) stopRequestingPiece(piece int) bool {
        return cn.pieceRequestOrder.Remove(piece)
}

// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection and is
// used to pseudorandomly avoid connections always requesting the same pieces
// and thus wasting effort.
func (cn *connection) updatePiecePriority(piece int) bool {
        tpp := cn.t.piecePriority(piece)
        if !cn.PeerHasPiece(piece) {
                tpp = PiecePriorityNone
        }
        if tpp == PiecePriorityNone {
                return cn.stopRequestingPiece(piece)
        }
        prio := cn.getPieceInclination()[piece]
        switch cn.t.requestStrategy {
        case 1:
                switch tpp {
                case PiecePriorityNormal:
                case PiecePriorityReadahead:
                        prio -= cn.t.numPieces()
                case PiecePriorityNext, PiecePriorityNow:
                        prio -= 2 * cn.t.numPieces()
                default:
                        panic(tpp)
                }
                prio += piece / 3
        default:
        }
        return cn.pieceRequestOrder.Set(piece, prio) || cn.shouldRequestWithoutBias()
}

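// Worked example for the requestStrategy 1 case above (illustrative
// numbers): with 1000 pieces, an inclination of 37 for piece 300, and
// PiecePriorityReadahead, prio = 37 - 1000 + 300/3 = -863. Lower values are
// requested earlier, so readahead pieces jump ahead of normal-priority
// ones, while the inclination and piece/3 terms keep connections from all
// converging on identical orderings.
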
func (cn *connection) getPieceInclination() []int {
        if cn.pieceInclination == nil {
                cn.pieceInclination = cn.t.getConnPieceInclination()
        }
        return cn.pieceInclination
}

func (cn *connection) discardPieceInclination() {
        if cn.pieceInclination == nil {
                return
        }
        cn.t.putPieceInclination(cn.pieceInclination)
        cn.pieceInclination = nil
}

func (cn *connection) peerPiecesChanged() {
        if cn.t.haveInfo() {
                prioritiesChanged := false
                for i := range iter.N(cn.t.numPieces()) {
                        if cn.updatePiecePriority(i) {
                                prioritiesChanged = true
                        }
                }
                if prioritiesChanged {
                        cn.updateRequests()
                }
        }
}

func (cn *connection) raisePeerMinPieces(newMin int) {
        if newMin > cn.peerMinPieces {
                cn.peerMinPieces = newMin
        }
}

func (cn *connection) peerSentHave(piece int) error {
        if cn.t.haveInfo() && piece >= cn.t.numPieces() || piece < 0 {
                return errors.New("invalid piece")
        }
        if cn.PeerHasPiece(piece) {
                return nil
        }
        cn.raisePeerMinPieces(piece + 1)
        cn.peerPieces.Set(piece, true)
        if cn.updatePiecePriority(piece) {
                cn.updateRequests()
        }
        return nil
}

func (cn *connection) peerSentBitfield(bf []bool) error {
        cn.peerSentHaveAll = false
        if len(bf)%8 != 0 {
                panic("expected bitfield length divisible by 8")
        }
        // We know that the last byte means that at most the last 7 bits are
        // wasted.
        cn.raisePeerMinPieces(len(bf) - 7)
        if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
                // Ignore known excess pieces.
                bf = bf[:cn.t.numPieces()]
        }
        for i, have := range bf {
                if have {
                        cn.raisePeerMinPieces(i + 1)
                }
                cn.peerPieces.Set(i, have)
        }
        cn.peerPiecesChanged()
        return nil
}

func (cn *connection) onPeerSentHaveAll() error {
        cn.peerSentHaveAll = true
        cn.peerPieces.Clear()
        cn.peerPiecesChanged()
        return nil
}

func (cn *connection) peerSentHaveNone() error {
        cn.peerPieces.Clear()
        cn.peerSentHaveAll = false
        cn.peerPiecesChanged()
        return nil
}

func (c *connection) requestPendingMetadata() {
        if c.t.haveInfo() {
                return
        }
        if c.PeerExtensionIDs["ut_metadata"] == 0 {
                // Peer doesn't support this.
                return
        }
        // Request metadata pieces that we don't have in a random order.
        var pending []int
        for index := 0; index < c.t.metadataPieceCount(); index++ {
                if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
                        pending = append(pending, index)
                }
        }
        for _, i := range rand.Perm(len(pending)) {
                c.requestMetadataPiece(pending[i])
        }
}

func (cn *connection) wroteMsg(msg *pp.Message) {
        messageTypesSent.Add(msg.Type.String(), 1)
        cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
}

func (cn *connection) readMsg(msg *pp.Message) {
        cn.allStats(func(cs *ConnStats) { cs.readMsg(msg) })
}

// After handshake, we know what Torrent and Client stats to include for a
// connection.
func (cn *connection) postHandshakeStats(f func(*ConnStats)) {
        t := cn.t
        f(&t.stats)
        f(&t.cl.stats)
}

// All ConnStats that include this connection. Some objects are not known
// until the handshake is complete, after which it's expected to reconcile the
// differences.
func (cn *connection) allStats(f func(*ConnStats)) {
        f(&cn.stats)
        if cn.reconciledHandshakeStats {
                cn.postHandshakeStats(f)
        }
}

func (cn *connection) wroteBytes(n int64) {
        cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten }))
}

func (cn *connection) readBytes(n int64) {
        cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
}

// Returns whether the connection could be useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
func (c *connection) useful() bool {
        t := c.t
        if c.closed.IsSet() {
                return false
        }
        if !t.haveInfo() {
                return c.supportsExtension("ut_metadata")
        }
        if t.seeding() && c.PeerInterested {
                return true
        }
        if c.peerHasWantedPieces() {
                return true
        }
        return false
}

func (c *connection) lastHelpful() (ret time.Time) {
        ret = c.lastUsefulChunkReceived
        if c.t.seeding() && c.lastChunkSent.After(ret) {
                ret = c.lastChunkSent
        }
        return
}

func (c *connection) fastEnabled() bool {
        return c.PeerExtensionBytes.SupportsFast() && c.t.cl.extensionBytes.SupportsFast()
}

func (c *connection) reject(r request) {
        if !c.fastEnabled() {
                panic("fast not enabled")
        }
        c.Post(r.ToMsg(pp.Reject))
        delete(c.PeerRequests, r)
}

func (c *connection) onReadRequest(r request) error {
        requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
        if r.Begin+r.Length > c.t.pieceLength(int(r.Index)) {
                torrent.Add("bad requests received", 1)
                return errors.New("bad request")
        }
        if _, ok := c.PeerRequests[r]; ok {
                torrent.Add("duplicate requests received", 1)
                return nil
        }
        if c.Choked {
                torrent.Add("requests received while choking", 1)
                if c.fastEnabled() {
                        torrent.Add("requests rejected while choking", 1)
                        c.reject(r)
                }
                return nil
        }
        if len(c.PeerRequests) >= maxRequests {
                torrent.Add("requests received while queue full", 1)
                if c.fastEnabled() {
                        c.reject(r)
                }
                // BEP 6 says we may close here if we choose.
                return nil
        }
        if !c.t.havePiece(r.Index.Int()) {
                // This isn't necessarily them screwing up. We can drop pieces
                // from our storage, and can't communicate this to peers
                // except by reconnecting.
                requestsReceivedForMissingPieces.Add(1)
                return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
        }
        if c.PeerRequests == nil {
                c.PeerRequests = make(map[request]struct{}, maxRequests)
        }
        c.PeerRequests[r] = struct{}{}
        c.tickleWriter()
        return nil
}

// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() (err error) {
        defer func() {
                if err != nil {
                        torrent.Add("connection.mainReadLoop returned with error", 1)
                } else {
                        torrent.Add("connection.mainReadLoop returned with no error", 1)
                }
        }()
        t := c.t
        cl := t.cl

        decoder := pp.Decoder{
                R:         bufio.NewReaderSize(c.r, 1<<17),
                MaxLength: 256 * 1024,
                Pool:      t.chunkPool,
        }
        for {
                var msg pp.Message
                func() {
                        cl.mu.Unlock()
                        defer cl.mu.Lock()
                        err = decoder.Decode(&msg)
                }()
                if t.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
                        return nil
                }
                if err != nil {
                        return err
                }
                c.readMsg(&msg)
                c.lastMessageReceived = time.Now()
                if msg.Keepalive {
                        receivedKeepalives.Add(1)
                        continue
                }
                messageTypesReceived.Add(msg.Type.String(), 1)
                if msg.Type.FastExtension() && !c.fastEnabled() {
                        return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
                }
                switch msg.Type {
                case pp.Choke:
                        c.PeerChoked = true
                        c.deleteAllRequests()
                        // We can then reset our interest.
                        c.updateRequests()
                        c.updateExpectingChunks()
                case pp.Reject:
                        c.deleteRequest(newRequestFromMessage(&msg))
                        delete(c.validReceiveChunks, newRequestFromMessage(&msg))
                case pp.Unchoke:
                        c.PeerChoked = false
                        c.tickleWriter()
                        c.updateExpectingChunks()
                case pp.Interested:
                        c.PeerInterested = true
                        c.tickleWriter()
                case pp.NotInterested:
                        c.PeerInterested = false
                        // We don't clear their requests since it isn't clear in the spec.
                        // We'll probably choke them for this, which will clear them if
                        // appropriate, and is clearly specified.
                case pp.Have:
                        err = c.peerSentHave(int(msg.Index))
                case pp.Request:
                        r := newRequestFromMessage(&msg)
                        err = c.onReadRequest(r)
                case pp.Cancel:
                        req := newRequestFromMessage(&msg)
                        c.onPeerSentCancel(req)
                case pp.Bitfield:
                        err = c.peerSentBitfield(msg.Bitfield)
                case pp.HaveAll:
                        err = c.onPeerSentHaveAll()
                case pp.HaveNone:
                        err = c.peerSentHaveNone()
                case pp.Piece:
                        err = c.receiveChunk(&msg)
                        if len(msg.Piece) == int(t.chunkSize) {
                                t.chunkPool.Put(&msg.Piece)
                        }
                case pp.Extended:
                        err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
                case pp.Port:
                        pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
                        if err != nil {
                                panic(err)
                        }
                        if msg.Port != 0 {
                                pingAddr.Port = int(msg.Port)
                        }
                        cl.eachDhtServer(func(s *dht.Server) {
                                go s.Ping(pingAddr, nil)
                        })
                case pp.AllowedFast:
                        torrent.Add("allowed fasts received", 1)
                        log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c, debugLogValue).Log(c.t.logger)
                        c.peerAllowedFast.Add(int(msg.Index))
                        c.updateRequests()
                case pp.Suggest:
                        torrent.Add("suggests received", 1)
                        log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index, debugLogValue).Log(c.t.logger)
                        c.updateRequests()
                default:
                        err = fmt.Errorf("received unknown message type: %#v", msg.Type)
                }
                if err != nil {
                        return err
                }
        }
}

func (c *connection) onReadExtendedMsg(id byte, payload []byte) (err error) {
        defer func() {
                // TODO: Should we still do this?
                if err != nil {
                        // These clients use their own extension IDs for outgoing message
                        // types, which is incorrect.
                        if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) || strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
                                err = nil
                        }
                }
        }()
        t := c.t
        cl := t.cl
        switch id {
        case pp.HandshakeExtendedID:
                // TODO: Create a bencode struct for this.
                var d map[string]interface{}
                err := bencode.Unmarshal(payload, &d)
                if err != nil {
                        return fmt.Errorf("error decoding extended message payload: %s", err)
                }
                // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
                if reqq, ok := d["reqq"]; ok {
                        if i, ok := reqq.(int64); ok {
                                c.PeerMaxRequests = int(i)
                        }
                }
                if v, ok := d["v"]; ok {
                        c.PeerClientName = v.(string)
                }
                if m, ok := d["m"]; ok {
                        mTyped, ok := m.(map[string]interface{})
                        if !ok {
                                return errors.New("handshake m value is not dict")
                        }
                        if c.PeerExtensionIDs == nil {
                                c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
                        }
                        for name, v := range mTyped {
                                id, ok := v.(int64)
                                if !ok {
                                        log.Printf("bad handshake m item extension ID type: %T", v)
                                        continue
                                }
                                if id == 0 {
                                        delete(c.PeerExtensionIDs, name)
                                } else {
                                        if c.PeerExtensionIDs[name] == 0 {
                                                supportedExtensionMessages.Add(name, 1)
                                        }
                                        c.PeerExtensionIDs[name] = byte(id)
                                }
                        }
                }
                metadata_sizeUntyped, ok := d["metadata_size"]
                if ok {
                        metadata_size, ok := metadata_sizeUntyped.(int64)
                        if !ok {
                                log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
                        } else {
                                err = t.setMetadataSize(metadata_size)
                                if err != nil {
                                        return fmt.Errorf("error setting metadata size to %d", metadata_size)
                                }
                        }
                }
                if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
                        c.requestPendingMetadata()
                }
                return nil
        case metadataExtendedId:
                err := cl.gotMetadataExtensionMsg(payload, t, c)
                if err != nil {
                        return fmt.Errorf("error handling metadata extension message: %s", err)
                }
                return nil
        case pexExtendedId:
                if cl.config.DisablePEX {
                        // TODO: Maybe close the connection. Check that we're not
                        // advertising that we support PEX if it's disabled.
                        return nil
                }
                var pexMsg peerExchangeMessage
                err := bencode.Unmarshal(payload, &pexMsg)
                if err != nil {
                        return fmt.Errorf("error unmarshalling PEX message: %s", err)
                }
                torrent.Add("pex added6 peers received", int64(len(pexMsg.Added6)))
                t.addPeers(pexMsg.AddedPeers())
                return nil
        default:
                return fmt.Errorf("unexpected extended message ID: %v", id)
        }
}

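// A hypothetical typed form of the extended-handshake payload, sketched for
// the "TODO: Create a bencode struct for this" above. The field set is an
// assumption drawn only from the keys read in onReadExtendedMsg; real
// handshakes carry more keys than these.
type extendedHandshakeSketch struct {
        M            map[string]int64 `bencode:"m"`             // extension name -> message ID
        V            string           `bencode:"v"`             // client name and version
        Reqq         int64            `bencode:"reqq"`          // max outstanding requests the peer allows
        MetadataSize int64            `bencode:"metadata_size"` // ut_metadata: size of the info dict
}
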
// Set both the Reader and Writer for the connection from a single ReadWriter.
func (cn *connection) setRW(rw io.ReadWriter) {
        cn.r = rw
        cn.w = rw
}

// Returns the Reader and Writer as a combined ReadWriter.
func (cn *connection) rw() io.ReadWriter {
        return struct {
                io.Reader
                io.Writer
        }{cn.r, cn.w}
}

// Handle a received chunk from a peer.
func (c *connection) receiveChunk(msg *pp.Message) error {
        t := c.t
        cl := t.cl
        torrent.Add("chunks received", 1)

        req := newRequestFromMessage(msg)

        if c.PeerChoked {
                torrent.Add("chunks received while choked", 1)
        }

        if _, ok := c.validReceiveChunks[req]; !ok {
                torrent.Add("chunks received unexpected", 1)
                return errors.New("received unexpected chunk")
        }
        delete(c.validReceiveChunks, req)

        if c.PeerChoked && c.peerAllowedFast.Get(int(req.Index)) {
                torrent.Add("chunks received due to allowed fast", 1)
        }

        // Request has been satisfied.
        if c.deleteRequest(req) {
                if c.expectingChunks() {
                        c.chunksReceivedWhileExpecting++
                }
        } else {
                torrent.Add("chunks received unwanted", 1)
        }

        // Do we actually want this chunk?
        if t.haveChunk(req) {
                torrent.Add("chunks received wasted", 1)
                c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
                return nil
        }

        index := int(req.Index)
        piece := &t.pieces[index]

        c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
        c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
        c.lastUsefulChunkReceived = time.Now()
        // if t.fastestConn != c {
        // log.Printf("setting fastest connection %p", c)
        // }
        t.fastestConn = c

        // Need to record that it hasn't been written yet, before we attempt to do
        // anything with it.
        piece.incrementPendingWrites()
        // Record that we have the chunk, so we aren't trying to download it while
        // waiting for it to be written to storage.
        piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))

        // Cancel pending requests for this chunk.
        for c := range t.conns {
                c.postCancel(req)
        }

        err := func() error {
                cl.mu.Unlock()
                defer cl.mu.Lock()
                // Write the chunk out. Note that the upper bound on chunk writing
                // concurrency will be the number of connections. We write inline with
                // receiving the chunk (with this lock dance), because we want to
                // handle errors synchronously and I haven't thought of a nice way to
                // defer any concurrency to the storage and have that notify the
                // client of errors. TODO: Do that instead.
                return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
        }()

        piece.decrementPendingWrites()

        if err != nil {
                log.Printf("%s (%s): error writing chunk %v: %s", t, t.infoHash, req, err)
                t.pendRequest(req)
                t.updatePieceCompletion(int(msg.Index))
                return nil
        }

        // It's important that the piece is potentially queued before we check if
        // the piece is still wanted, because if it is queued, it won't be wanted.
        if t.pieceAllDirty(index) {
                t.queuePieceCheck(int(req.Index))
                t.pendAllChunkSpecs(index)
        }

        c.onDirtiedPiece(index)

        cl.event.Broadcast()
        t.publishPieceChange(int(req.Index))

        return nil
}

func (c *connection) onDirtiedPiece(piece int) {
        if c.peerTouchedPieces == nil {
                c.peerTouchedPieces = make(map[int]struct{})
        }
        c.peerTouchedPieces[piece] = struct{}{}
        ds := &c.t.pieces[piece].dirtiers
        if *ds == nil {
                *ds = make(map[*connection]struct{})
        }
        (*ds)[c] = struct{}{}
}

func (c *connection) uploadAllowed() bool {
        if c.t.cl.config.NoUpload {
                return false
        }
        if c.t.seeding() {
                return true
        }
        if !c.peerHasWantedPieces() {
                return false
        }
        // Don't upload more than 100 KiB more than we download.
        if c.stats.BytesWrittenData.Int64() >= c.stats.BytesReadData.Int64()+100<<10 {
                return false
        }
        return true
}

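// Worked example for the cap above (illustrative numbers): having read
// 1 MiB of data from a peer we aren't seeding to, we keep serving their
// requests until 1 MiB + 100 KiB has been written to them, after which
// uploadAllowed reports false and upload chokes them.
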
func (c *connection) setRetryUploadTimer(delay time.Duration) {
        if c.uploadTimer == nil {
                c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
        } else {
                c.uploadTimer.Reset(delay)
        }
}

// Also handles choking and unchoking of the remote peer.
func (c *connection) upload(msg func(pp.Message) bool) bool {
        // Breaking or completing this loop means we don't want to upload to the
        // peer anymore, and we choke them.
another:
        for c.uploadAllowed() {
                // We want to upload to the peer.
                if !c.Unchoke(msg) {
                        return false
                }
                for r := range c.PeerRequests {
                        res := c.t.cl.config.UploadRateLimiter.ReserveN(time.Now(), int(r.Length))
                        if !res.OK() {
                                panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
                        }
                        delay := res.Delay()
                        if delay > 0 {
                                res.Cancel()
                                c.setRetryUploadTimer(delay)
                                // Hard to say what to return here.
                                return true
                        }
                        more, err := c.sendChunk(r, msg)
                        if err != nil {
                                i := int(r.Index)
                                if c.t.pieceComplete(i) {
                                        c.t.updatePieceCompletion(i)
                                        if !c.t.pieceComplete(i) {
                                                // We had the piece, but not anymore.
                                                break another
                                        }
                                }
                                log.Str("error sending chunk to peer").AddValues(c, r, err).Log(c.t.logger)
                                // If we failed to send a chunk, choke the peer to ensure they
                                // flush all their requests. We've probably dropped a piece,
                                // but there's no way to communicate this to the peer. If they
                                // ask for it again, we'll kick them to allow us to send them
                                // an updated bitfield.
                                break another
                        }
                        delete(c.PeerRequests, r)
                        if !more {
                                return false
                        }
                        goto another
                }
                return true
        }
        return c.Choke(msg)
}

func (cn *connection) Drop() {
        cn.t.dropConnection(cn)
}

func (cn *connection) netGoodPiecesDirtied() int64 {
        return cn.stats.PiecesDirtiedGood.Int64() - cn.stats.PiecesDirtiedBad.Int64()
}

func (c *connection) peerHasWantedPieces() bool {
        return !c.pieceRequestOrder.IsEmpty()
}

func (c *connection) numLocalRequests() int {
        return len(c.requests)
}

func (c *connection) deleteRequest(r request) bool {
        if _, ok := c.requests[r]; !ok {
                return false
        }
        delete(c.requests, r)
        c.updateExpectingChunks()
        delete(c.t.lastRequested, r)
        pr := c.t.pendingRequests
        pr[r]--
        n := pr[r]
        if n == 0 {
                delete(pr, r)
        }
        if n < 0 {
                panic(n)
        }
        c.updateRequests()
        return true
}

func (c *connection) deleteAllRequests() {
        for r := range c.requests {
                c.deleteRequest(r)
        }
        if len(c.requests) != 0 {
                panic(len(c.requests))
        }
        // for c := range c.t.conns {
        //      c.tickleWriter()
        // }
}

func (c *connection) tickleWriter() {
        c.writerCond.Broadcast()
}

func (c *connection) postCancel(r request) bool {
        if !c.deleteRequest(r) {
                return false
        }
        c.Post(makeCancelMessage(r))
        return true
}

func (c *connection) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) {
        // Count the chunk being sent, even if it isn't.
        b := make([]byte, r.Length)
        p := c.t.info.Piece(int(r.Index))
        n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
        if n != len(b) {
                if err == nil {
                        panic("expected error")
                }
                return
        } else if err == io.EOF {
                err = nil
        }
        more = msg(pp.Message{
                Type:  pp.Piece,
                Index: r.Index,
                Begin: r.Begin,
                Piece: b,
        })
        c.lastChunkSent = time.Now()
        return
}

func (c *connection) setTorrent(t *Torrent) {
        if c.t != nil {
                panic("connection already associated with a torrent")
        }
        c.t = t
        t.reconcileHandshakeStats(c)
}