package torrent

import (
        "bufio"
        "bytes"
        "fmt"
        "io"
        "math"
        "math/rand"
        "net"
        "strconv"
        "strings"
        "sync"
        "time"

        "github.com/anacrolix/dht"
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo"
        "github.com/anacrolix/missinggo/bitmap"
        "github.com/anacrolix/missinggo/iter"
        "github.com/anacrolix/missinggo/prioritybitmap"
        "github.com/pkg/errors"

        "github.com/anacrolix/torrent/bencode"
        "github.com/anacrolix/torrent/mse"
        pp "github.com/anacrolix/torrent/peer_protocol"
)

type peerSource string

const (
        peerSourceTracker         = "Tr"
        peerSourceIncoming        = "I"
        peerSourceDHTGetPeers     = "Hg" // Peers we found by searching a DHT.
        peerSourceDHTAnnouncePeer = "Ha" // Peers that were announced to us by a DHT.
        peerSourcePEX             = "X"
)

// Maintains the state of a connection with a peer.
type connection struct {
        // First to ensure 64-bit alignment for atomics. See #262.
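        // On 32-bit platforms, 64-bit atomic operations require the value to
        // be 8-byte aligned, which Go guarantees only for the first word of
        // an allocated struct.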
        stats ConnStats

        t *Torrent
        // The actual Conn, used for closing, and setting socket options.
        conn     net.Conn
        outgoing bool
        // The Reader and Writer for this Conn, with hooks installed for stats,
        // limiting, deadlines etc.
        w io.Writer
        r io.Reader
        // True if the connection is operating over MSE obfuscation.
        headerEncrypted bool
        cryptoMethod    mse.CryptoMethod
        Discovery       peerSource
        closed          missinggo.Event
        // Set true after we've added our ConnStats generated during handshake to
        // other ConnStat instances as determined when the *Torrent became known.
        reconciledHandshakeStats bool

        lastMessageReceived     time.Time
        completedHandshake      time.Time
        lastUsefulChunkReceived time.Time
        lastChunkSent           time.Time

        // Stuff controlled by the local peer.
        Interested           bool
        lastBecameInterested time.Time
        priorInterest        time.Duration

        lastStartedExpectingToReceiveChunks time.Time
        cumulativeExpectedToReceiveChunks   time.Duration
        chunksReceivedWhileExpecting        int64

        Choked           bool
        requests         map[request]struct{}
        requestsLowWater int
        // Chunks that we might reasonably expect to receive from the peer. Due to
        // latency, buffering, and implementation differences, we may receive
        // chunks that are no longer in the set of requests we actually want.
        validReceiveChunks map[request]struct{}
        // Indexed by metadata piece, set to true if posted and pending a
        // response.
        metadataRequests []bool
        sentHaves        bitmap.Bitmap

        // Stuff controlled by the remote peer.
        PeerID             PeerID
        PeerInterested     bool
        PeerChoked         bool
        PeerRequests       map[request]struct{}
        PeerExtensionBytes pp.PeerExtensionBits
        // The pieces the peer has claimed to have.
        peerPieces bitmap.Bitmap
        // The peer has everything. This can occur due to a special message, when
        // we may not even know the number of pieces in the torrent yet.
        peerSentHaveAll bool
        // The highest possible number of pieces the torrent could have based on
        // communication with the peer. Generally only useful until we have the
        // torrent info.
        peerMinPieces pieceIndex
        // Pieces we've accepted chunks for from the peer.
        peerTouchedPieces map[pieceIndex]struct{}
        peerAllowedFast   bitmap.Bitmap

        PeerMaxRequests  int // Maximum pending requests the peer allows.
        PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber
        PeerClientName   string

        pieceInclination  []int
        pieceRequestOrder prioritybitmap.PriorityBitmap

        writeBuffer *bytes.Buffer
        uploadTimer *time.Timer
        writerCond  sync.Cond
}

func (cn *connection) updateExpectingChunks() {
        if cn.expectingChunks() {
                if cn.lastStartedExpectingToReceiveChunks.IsZero() {
                        cn.lastStartedExpectingToReceiveChunks = time.Now()
                }
        } else {
                if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
                        cn.cumulativeExpectedToReceiveChunks += time.Since(cn.lastStartedExpectingToReceiveChunks)
                        cn.lastStartedExpectingToReceiveChunks = time.Time{}
                }
        }
}

func (cn *connection) expectingChunks() bool {
        return cn.Interested && !cn.PeerChoked
}

// Returns true if the connection is over IPv6.
func (cn *connection) ipv6() bool {
        ip := missinggo.AddrIP(cn.remoteAddr())
        if ip.To4() != nil {
                return false
        }
        return len(ip) == net.IPv6len
}

// Returns true if the dialer has the lower client peer ID. TODO: Find the
// specification for this.
func (cn *connection) isPreferredDirection() bool {
        return bytes.Compare(cn.t.cl.peerID[:], cn.PeerID[:]) < 0 == cn.outgoing
}

// Returns whether the left connection should be preferred over the right one,
// considering only their networking properties. If ok is false, we can't
// decide.
func (l *connection) hasPreferredNetworkOver(r *connection) (left, ok bool) {
        var ml multiLess
        ml.NextBool(l.isPreferredDirection(), r.isPreferredDirection())
        ml.NextBool(!l.utp(), !r.utp())
        ml.NextBool(l.ipv6(), r.ipv6())
        return ml.FinalOk()
}
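// A hedged reading of the tie-breaking above, assuming multiLess decides on
// the first differing pair: preferred dial direction wins outright; if that
// ties, a non-uTP (i.e. TCP) transport is preferred; if that also ties, IPv6
// wins. If every pair ties, ok is false and the caller must break the tie
// some other way.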

func (cn *connection) cumInterest() time.Duration {
        ret := cn.priorInterest
        if cn.Interested {
                ret += time.Since(cn.lastBecameInterested)
        }
        return ret
}

func (cn *connection) peerHasAllPieces() (all bool, known bool) {
        if cn.peerSentHaveAll {
                return true, true
        }
        if !cn.t.haveInfo() {
                return false, false
        }
        return bitmap.Flip(cn.peerPieces, 0, bitmap.BitIndex(cn.t.numPieces())).IsEmpty(), true
}

func (cn *connection) mu() sync.Locker {
        return &cn.t.cl.mu
}

func (cn *connection) remoteAddr() net.Addr {
        return cn.conn.RemoteAddr()
}

func (cn *connection) localAddr() net.Addr {
        return cn.conn.LocalAddr()
}

func (cn *connection) supportsExtension(ext pp.ExtensionName) bool {
        _, ok := cn.PeerExtensionIDs[ext]
        return ok
}

// The best guess at number of pieces in the torrent for this peer.
func (cn *connection) bestPeerNumPieces() pieceIndex {
        if cn.t.haveInfo() {
                return cn.t.numPieces()
        }
        return cn.peerMinPieces
}

func (cn *connection) completedString() string {
        have := pieceIndex(cn.peerPieces.Len())
        if cn.peerSentHaveAll {
                have = cn.bestPeerNumPieces()
        }
        return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
}

// Correct the PeerPieces slice length. Returns an error if the existing
// slice is invalid, such as after receiving a badly sized BITFIELD or
// invalid HAVE messages.
func (cn *connection) setNumPieces(num pieceIndex) error {
        cn.peerPieces.RemoveRange(bitmap.BitIndex(num), bitmap.ToEnd)
        cn.peerPiecesChanged()
        return nil
}

func eventAgeString(t time.Time) string {
        if t.IsZero() {
                return "never"
        }
        return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}

func (cn *connection) connectionFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
        if cn.cryptoMethod == mse.CryptoMethodRC4 {
                c('E')
        } else if cn.headerEncrypted {
                c('e')
        }
        ret += string(cn.Discovery)
        if cn.utp() {
                c('U')
        }
        return
}

func (cn *connection) utp() bool {
        return strings.Contains(cn.remoteAddr().Network(), "utp")
}

// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
func (cn *connection) statusFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
        if cn.Interested {
                c('i')
        }
        if cn.Choked {
                c('c')
        }
        c('-')
        ret += cn.connectionFlags()
        c('-')
        if cn.PeerInterested {
                c('i')
        }
        if cn.PeerChoked {
                c('c')
        }
        return
}
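// For example, a connection where we are interested but not choking, found
// via a tracker, running over uTP with full RC4 obfuscation, whose peer is
// interested and choking us, renders as "i-ETrU-ic".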

// func (cn *connection) String() string {
//      var buf bytes.Buffer
//      cn.WriteStatus(&buf, nil)
//      return buf.String()
// }

func (cn *connection) downloadRate() float64 {
        return float64(cn.stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds()
}

func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
        // \t isn't preserved in <pre> blocks?
        fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr())
        fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
                eventAgeString(cn.lastMessageReceived),
                eventAgeString(cn.completedHandshake),
                eventAgeString(cn.lastHelpful()),
                cn.cumInterest(),
                cn.totalExpectingTime(),
        )
        fmt.Fprintf(w,
                "    %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
                cn.completedString(),
                len(cn.peerTouchedPieces),
                &cn.stats.ChunksReadUseful,
                &cn.stats.ChunksRead,
                &cn.stats.ChunksWritten,
                cn.requestsLowWater,
                cn.numLocalRequests(),
                cn.nominalMaxRequests(),
                len(cn.PeerRequests),
                cn.statusFlags(),
                cn.downloadRate()/(1<<10),
        )
        fmt.Fprintf(w, "    next pieces: %v%s\n",
                iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
                func() string {
                        if cn.shouldRequestWithoutBias() {
                                return " (fastest)"
                        } else {
                                return ""
                        }
                }())
}

func (cn *connection) Close() {
        if !cn.closed.Set() {
                return
        }
        cn.tickleWriter()
        cn.discardPieceInclination()
        cn.pieceRequestOrder.Clear()
        if cn.conn != nil {
                go cn.conn.Close()
        }
}

func (cn *connection) PeerHasPiece(piece pieceIndex) bool {
        return cn.peerSentHaveAll || cn.peerPieces.Contains(bitmap.BitIndex(piece))
}

// Writes a message into the write buffer.
func (cn *connection) Post(msg pp.Message) {
        torrent.Add(fmt.Sprintf("messages posted of type %s", msg.Type.String()), 1)
        // We don't need to track bytes here because a connection.w Writer wrapper
        // takes care of that (although there's some delay between us recording
        // the message and the connection writer flushing it out).
        cn.writeBuffer.Write(msg.MustMarshalBinary())
        // Last I checked only Piece messages affect stats, and we don't post
        // those.
        cn.wroteMsg(&msg)
        cn.tickleWriter()
}

func (cn *connection) requestMetadataPiece(index int) {
        eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata]
        if eID == 0 {
                return
        }
        if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
                return
        }
        cn.Post(pp.Message{
                Type:       pp.Extended,
                ExtendedID: eID,
                ExtendedPayload: func() []byte {
                        b, err := bencode.Marshal(map[string]int{
                                "msg_type": pp.RequestMetadataExtensionMsgType,
                                "piece":    index,
                        })
                        if err != nil {
                                panic(err)
                        }
                        return b
                }(),
        })
        for index >= len(cn.metadataRequests) {
                cn.metadataRequests = append(cn.metadataRequests, false)
        }
        cn.metadataRequests[index] = true
}

func (cn *connection) requestedMetadataPiece(index int) bool {
        return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}

// The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) {
        if cn.t.requestStrategy == 3 {
                expectingTime := int64(cn.totalExpectingTime())
                if expectingTime == 0 {
                        expectingTime = math.MaxInt64
                } else {
                        expectingTime *= 2
                }
                return int(clamp(
                        1,
                        int64(cn.PeerMaxRequests),
                        max(
                                // It makes sense to always pipeline at least one request,
                                // since latency must be non-zero.
                                2,
                                // Request only as many as we expect to receive in the
                                // duplicateRequestTimeout window. We are trying to avoid
                                // having to duplicate requests.
                                cn.chunksReceivedWhileExpecting*int64(cn.t.duplicateRequestTimeout)/expectingTime,
                        ),
                ))
        }
        return int(clamp(
                1,
                int64(cn.PeerMaxRequests),
                max(64,
                        cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64()))))
}
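// A hedged worked example of the requestStrategy == 3 sizing above: with
// chunksReceivedWhileExpecting == 60, duplicateRequestTimeout == 5s and
// totalExpectingTime() == 30s, expectingTime doubles to 60s and the target
// is 60 * 5s / 60s == 5 outstanding requests, floored at 2 and clamped to
// [1, PeerMaxRequests].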

func (cn *connection) totalExpectingTime() (ret time.Duration) {
        ret = cn.cumulativeExpectedToReceiveChunks
        if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
                ret += time.Since(cn.lastStartedExpectingToReceiveChunks)
        }
        return
}

func (cn *connection) onPeerSentCancel(r request) {
        if _, ok := cn.PeerRequests[r]; !ok {
                torrent.Add("unexpected cancels received", 1)
                return
        }
        if cn.fastEnabled() {
                cn.reject(r)
        } else {
                delete(cn.PeerRequests, r)
        }
}

func (cn *connection) Choke(msg messageWriter) (more bool) {
        if cn.Choked {
                return true
        }
        cn.Choked = true
        more = msg(pp.Message{
                Type: pp.Choke,
        })
        if cn.fastEnabled() {
                for r := range cn.PeerRequests {
                        // TODO: Don't reject pieces in allowed fast set.
                        cn.reject(r)
                }
        } else {
                cn.PeerRequests = nil
        }
        return
}

func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
        if !cn.Choked {
                return true
        }
        cn.Choked = false
        return msg(pp.Message{
                Type: pp.Unchoke,
        })
}

func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
        if cn.Interested == interested {
                return true
        }
        cn.Interested = interested
        if interested {
                cn.lastBecameInterested = time.Now()
        } else if !cn.lastBecameInterested.IsZero() {
                cn.priorInterest += time.Since(cn.lastBecameInterested)
        }
        cn.updateExpectingChunks()
        // log.Printf("%p: setting interest: %v", cn, interested)
        return msg(pp.Message{
                Type: func() pp.MessageType {
                        if interested {
                                return pp.Interested
                        } else {
                                return pp.NotInterested
                        }
                }(),
        })
}

// The function takes a message to be sent, and returns true if more messages
// may follow.
type messageWriter func(pp.Message) bool

// Proxies the messageWriter's response.
func (cn *connection) request(r request, mw messageWriter) bool {
        if _, ok := cn.requests[r]; ok {
                panic("chunk already requested")
        }
        if !cn.PeerHasPiece(r.Index) {
                panic("requesting piece peer doesn't have")
        }
        if _, ok := cn.t.conns[cn]; !ok {
                panic("requesting but not in active conns")
        }
        if cn.closed.IsSet() {
                panic("requesting when connection is closed")
        }
        if cn.PeerChoked {
                if cn.peerAllowedFast.Get(int(r.Index)) {
                        torrent.Add("allowed fast requests sent", 1)
                } else {
                        panic("requesting while choked and not allowed fast")
                }
        }
        if cn.t.hashingPiece(pieceIndex(r.Index)) {
                panic("piece is being hashed")
        }
        if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) {
                panic("piece is queued for hash")
        }
        if cn.requests == nil {
                cn.requests = make(map[request]struct{})
        }
        cn.requests[r] = struct{}{}
        if cn.validReceiveChunks == nil {
                cn.validReceiveChunks = make(map[request]struct{})
        }
        cn.validReceiveChunks[r] = struct{}{}
        cn.t.pendingRequests[r]++
        cn.t.lastRequested[r] = time.AfterFunc(cn.t.duplicateRequestTimeout, func() {
                torrent.Add("duplicate request timeouts", 1)
                cn.mu().Lock()
                defer cn.mu().Unlock()
                delete(cn.t.lastRequested, r)
                for cn := range cn.t.conns {
                        if cn.PeerHasPiece(pieceIndex(r.Index)) {
                                cn.updateRequests()
                        }
                }
        })
        cn.updateExpectingChunks()
        return mw(pp.Message{
                Type:   pp.Request,
                Index:  r.Index,
                Begin:  r.Begin,
                Length: r.Length,
        })
}

func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
        if !cn.t.networkingEnabled {
                if !cn.SetInterested(false, msg) {
                        return
                }
                if len(cn.requests) != 0 {
                        for r := range cn.requests {
                                cn.deleteRequest(r)
                                // log.Printf("%p: cancelling request: %v", cn, r)
                                if !msg(makeCancelMessage(r)) {
                                        return
                                }
                        }
                }
        }
        if len(cn.requests) <= cn.requestsLowWater {
                filledBuffer := false
                cn.iterPendingPieces(func(pieceIndex pieceIndex) bool {
                        cn.iterPendingRequests(pieceIndex, func(r request) bool {
                                if !cn.SetInterested(true, msg) {
                                        filledBuffer = true
                                        return false
                                }
                                if len(cn.requests) >= cn.nominalMaxRequests() {
                                        return false
                                }
                                // Choking is looked at here because our interest is dependent
                                // on whether we'd make requests in its absence.
                                if cn.PeerChoked {
                                        if !cn.peerAllowedFast.Get(bitmap.BitIndex(r.Index)) {
                                                return false
                                        }
                                }
                                if _, ok := cn.requests[r]; ok {
                                        return true
                                }
                                filledBuffer = !cn.request(r, msg)
                                return !filledBuffer
                        })
                        return !filledBuffer
                })
                if filledBuffer {
                        // If we didn't completely top up the requests, we shouldn't mark
                        // the low water, since we'll want to top up the requests as soon
                        // as we have more write buffer space.
                        return
                }
                cn.requestsLowWater = len(cn.requests) / 2
        }

        cn.upload(msg)
}

// Routine that writes to the peer. Some of what to write is buffered by
// activity elsewhere in the Client, and some is determined locally when the
// connection is writable.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
        var (
                lastWrite      time.Time = time.Now()
                keepAliveTimer *time.Timer
        )
        keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
                cn.mu().Lock()
                defer cn.mu().Unlock()
                if time.Since(lastWrite) >= keepAliveTimeout {
                        cn.tickleWriter()
                }
                keepAliveTimer.Reset(keepAliveTimeout)
        })
        cn.mu().Lock()
        defer cn.mu().Unlock()
        defer cn.Close()
        defer keepAliveTimer.Stop()
        frontBuf := new(bytes.Buffer)
        for {
                if cn.closed.IsSet() {
                        return
                }
                if cn.writeBuffer.Len() == 0 {
                        cn.fillWriteBuffer(func(msg pp.Message) bool {
                                cn.wroteMsg(&msg)
                                cn.writeBuffer.Write(msg.MustMarshalBinary())
                                torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
                                return cn.writeBuffer.Len() < 1<<16 // 64KiB
                        })
                }
                if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
                        cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
                        postedKeepalives.Add(1)
                }
                if cn.writeBuffer.Len() == 0 {
                        // TODO: Minimize wakeups....
                        cn.writerCond.Wait()
                        continue
                }
                // Flip the buffers.
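                // Swapping to a front buffer lets us drop the client lock while
                // the potentially blocking socket write proceeds; Post and
                // fillWriteBuffer can keep appending to writeBuffer meanwhile.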
                frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
                cn.mu().Unlock()
                n, err := cn.w.Write(frontBuf.Bytes())
                cn.mu().Lock()
                if n != 0 {
                        lastWrite = time.Now()
                        keepAliveTimer.Reset(keepAliveTimeout)
                }
                if err != nil {
                        return
                }
                if n != frontBuf.Len() {
                        panic("short write")
                }
                frontBuf.Reset()
        }
}

func (cn *connection) Have(piece pieceIndex) {
        if cn.sentHaves.Get(bitmap.BitIndex(piece)) {
                return
        }
        cn.Post(pp.Message{
                Type:  pp.Have,
                Index: pp.Integer(piece),
        })
        cn.sentHaves.Add(bitmap.BitIndex(piece))
}

func (cn *connection) PostBitfield() {
        if cn.sentHaves.Len() != 0 {
                panic("bitfield must be first have-related message sent")
        }
        if !cn.t.haveAnyPieces() {
                return
        }
        cn.Post(pp.Message{
                Type:     pp.Bitfield,
                Bitfield: cn.t.bitfield(),
        })
        cn.sentHaves = cn.t.completedPieces.Copy()
}

func (cn *connection) updateRequests() {
        // log.Print("update requests")
        cn.tickleWriter()
}

// Emits the indices in the Bitmaps bms in order, never repeating any index.
// skip is mutated during execution, and its initial values will never be
// emitted.
func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
        return func(cb iter.Callback) {
                for _, bm := range bms {
                        if !iter.All(func(i interface{}) bool {
                                skip.Add(i.(int))
                                return cb(i)
                        }, bitmap.Sub(bm, *skip).Iter) {
                                return
                        }
                }
        }
}
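// A worked example: with skip initially {2} and bms [{0, 1, 2}, {1, 3}], the
// iterator emits 0, 1, then 3. 2 is never emitted, and 1 is not repeated,
// because each emitted index is added to skip before later bitmaps are
// visited.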

func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece pieceIndex) bool) bool {
        now, readahead := cn.t.readerPiecePriorities()
        var skip bitmap.Bitmap
        if !cn.peerSentHaveAll {
                // Pieces to skip include pieces the peer doesn't have.
                skip = bitmap.Flip(cn.peerPieces, 0, bitmap.BitIndex(cn.t.numPieces()))
        }
        // And pieces that we already have.
        skip.Union(cn.t.completedPieces)
        skip.Union(cn.t.piecesQueuedForHash)
        // Return an iterator over the different priority classes, minus the skip
        // pieces.
        return iter.All(
                func(_piece interface{}) bool {
                        i := _piece.(bitmap.BitIndex)
                        if cn.t.hashingPiece(pieceIndex(i)) {
                                return true
                        }
                        return f(pieceIndex(i))
                },
                iterBitmapsDistinct(&skip, now, readahead),
                func(cb iter.Callback) {
                        cn.t.pendingPieces.IterTyped(func(piece int) bool {
                                if skip.Contains(piece) {
                                        return true
                                }
                                more := cb(piece)
                                skip.Add(piece)
                                return more
                        })
                },
        )
}

// The connection should download highest priority pieces first, without any
// inclination toward avoiding wastage. Generally we might do this if there's
// a single connection, or this is the fastest connection, and we have active
// readers that signal an ordering preference. It's conceivable that the best
// connection should do this, since it's least likely to waste our time if
// assigned to the highest priority pieces, and assigning more than one this
// role would cause significant wasted bandwidth.
func (cn *connection) shouldRequestWithoutBias() bool {
        if cn.t.requestStrategy != 2 {
                return false
        }
        if len(cn.t.readers) == 0 {
                return false
        }
        if len(cn.t.conns) == 1 {
                return true
        }
        if cn == cn.t.fastestConn {
                return true
        }
        return false
}

func (cn *connection) iterPendingPieces(f func(pieceIndex) bool) bool {
        if !cn.t.haveInfo() {
                return false
        }
        if cn.t.requestStrategy == 3 {
                return cn.iterUnbiasedPieceRequestOrder(f)
        }
        if cn.shouldRequestWithoutBias() {
                return cn.iterUnbiasedPieceRequestOrder(f)
        } else {
                return cn.pieceRequestOrder.IterTyped(func(i int) bool {
                        return f(pieceIndex(i))
                })
        }
}

func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) {
        cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) })
}

func (cn *connection) iterPendingRequests(piece pieceIndex, f func(request) bool) bool {
        return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
                r := request{pp.Integer(piece), cs}
                if cn.t.requestStrategy == 3 {
                        if _, ok := cn.t.lastRequested[r]; ok {
                                // This piece has been requested on another connection, and
                                // the duplicate request timer is still running.
                                return true
                        }
                }
                return f(r)
        })
}

func iterUndirtiedChunks(piece pieceIndex, t *Torrent, f func(chunkSpec) bool) bool {
        chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
        // TODO: Use "math/rand".Shuffle once Go 1.10 is the minimum supported version.
        return iter.ForPerm(len(chunkIndices), func(i int) bool {
                return f(t.chunkIndexSpec(pieceIndex(chunkIndices[i]), piece))
        })
}

// TODO: Check that callers call updateRequests as needed.
func (cn *connection) stopRequestingPiece(piece pieceIndex) bool {
        return cn.pieceRequestOrder.Remove(bitmap.BitIndex(piece))
}

// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection and is
// used to pseudorandomly avoid connections always requesting the same pieces
// and thus wasting effort.
func (cn *connection) updatePiecePriority(piece pieceIndex) bool {
        tpp := cn.t.piecePriority(piece)
        if !cn.PeerHasPiece(piece) {
                tpp = PiecePriorityNone
        }
        if tpp == PiecePriorityNone {
                return cn.stopRequestingPiece(piece)
        }
        prio := cn.getPieceInclination()[piece]
        switch cn.t.requestStrategy {
        case 1:
                switch tpp {
                case PiecePriorityNormal:
                case PiecePriorityReadahead:
                        prio -= int(cn.t.numPieces())
                case PiecePriorityNext, PiecePriorityNow:
                        prio -= 2 * int(cn.t.numPieces())
                default:
                        panic(tpp)
                }
                prio += int(piece / 3)
        default:
        }
        return cn.pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias()
}
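// A hedged worked example for requestStrategy == 1: a readahead-priority
// piece 9 in a 99-piece torrent with a random inclination of 42 gets
// prio = 42 - 99 + 9/3 = -54. Assuming lower values sort earlier in the
// PriorityBitmap, readahead pieces beat normal-priority ones, while the
// inclination and piece/3 terms stagger the order across connections.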

func (cn *connection) getPieceInclination() []int {
        if cn.pieceInclination == nil {
                cn.pieceInclination = cn.t.getConnPieceInclination()
        }
        return cn.pieceInclination
}

func (cn *connection) discardPieceInclination() {
        if cn.pieceInclination == nil {
                return
        }
        cn.t.putPieceInclination(cn.pieceInclination)
        cn.pieceInclination = nil
}

func (cn *connection) peerPiecesChanged() {
        if cn.t.haveInfo() {
                prioritiesChanged := false
                for i := pieceIndex(0); i < cn.t.numPieces(); i++ {
                        if cn.updatePiecePriority(i) {
                                prioritiesChanged = true
                        }
                }
                if prioritiesChanged {
                        cn.updateRequests()
                }
        }
}

func (cn *connection) raisePeerMinPieces(newMin pieceIndex) {
        if newMin > cn.peerMinPieces {
                cn.peerMinPieces = newMin
        }
}

func (cn *connection) peerSentHave(piece pieceIndex) error {
        if cn.t.haveInfo() && piece >= cn.t.numPieces() || piece < 0 {
                return errors.New("invalid piece")
        }
        if cn.PeerHasPiece(piece) {
                return nil
        }
        cn.raisePeerMinPieces(piece + 1)
        cn.peerPieces.Set(bitmap.BitIndex(piece), true)
        if cn.updatePiecePriority(piece) {
                cn.updateRequests()
        }
        return nil
}

func (cn *connection) peerSentBitfield(bf []bool) error {
        cn.peerSentHaveAll = false
        if len(bf)%8 != 0 {
                panic("expected bitfield length divisible by 8")
        }
        // We know that the last byte means that at most the last 7 bits are
        // wasted.
        cn.raisePeerMinPieces(pieceIndex(len(bf) - 7))
        if cn.t.haveInfo() && len(bf) > int(cn.t.numPieces()) {
                // Ignore known excess pieces.
                bf = bf[:cn.t.numPieces()]
        }
        for i, have := range bf {
                if have {
                        cn.raisePeerMinPieces(pieceIndex(i) + 1)
                }
                cn.peerPieces.Set(i, have)
        }
        cn.peerPiecesChanged()
        return nil
}
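// For example, a 24-bit bitfield implies at least 17 pieces, since only the
// last 7 bits can be padding; a set bit at index 20 then raises the minimum
// to 21.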

func (cn *connection) onPeerSentHaveAll() error {
        cn.peerSentHaveAll = true
        cn.peerPieces.Clear()
        cn.peerPiecesChanged()
        return nil
}

func (cn *connection) peerSentHaveNone() error {
        cn.peerPieces.Clear()
        cn.peerSentHaveAll = false
        cn.peerPiecesChanged()
        return nil
}

func (c *connection) requestPendingMetadata() {
        if c.t.haveInfo() {
                return
        }
        if c.PeerExtensionIDs[pp.ExtensionNameMetadata] == 0 {
                // Peer doesn't support this.
                return
        }
        // Request metadata pieces that we don't have in a random order.
        var pending []int
        for index := 0; index < c.t.metadataPieceCount(); index++ {
                if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
                        pending = append(pending, index)
                }
        }
        for _, i := range rand.Perm(len(pending)) {
                c.requestMetadataPiece(pending[i])
        }
}

func (cn *connection) wroteMsg(msg *pp.Message) {
        torrent.Add(fmt.Sprintf("messages written of type %s", msg.Type.String()), 1)
        cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
}

func (cn *connection) readMsg(msg *pp.Message) {
        cn.allStats(func(cs *ConnStats) { cs.readMsg(msg) })
}

// After handshake, we know what Torrent and Client stats to include for a
// connection.
func (cn *connection) postHandshakeStats(f func(*ConnStats)) {
        t := cn.t
        f(&t.stats)
        f(&t.cl.stats)
}

// All ConnStats that include this connection. Some objects are not known
// until the handshake is complete, after which it's expected to reconcile the
// differences.
func (cn *connection) allStats(f func(*ConnStats)) {
        f(&cn.stats)
        if cn.reconciledHandshakeStats {
                cn.postHandshakeStats(f)
        }
}

func (cn *connection) wroteBytes(n int64) {
        cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten }))
}

func (cn *connection) readBytes(n int64) {
        cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
}

// Returns whether the connection could be useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
func (c *connection) useful() bool {
        t := c.t
        if c.closed.IsSet() {
                return false
        }
        if !t.haveInfo() {
                return c.supportsExtension("ut_metadata")
        }
        if t.seeding() && c.PeerInterested {
                return true
        }
        if c.peerHasWantedPieces() {
                return true
        }
        return false
}

func (c *connection) lastHelpful() (ret time.Time) {
        ret = c.lastUsefulChunkReceived
        if c.t.seeding() && c.lastChunkSent.After(ret) {
                ret = c.lastChunkSent
        }
        return
}

func (c *connection) fastEnabled() bool {
        return c.PeerExtensionBytes.SupportsFast() && c.t.cl.extensionBytes.SupportsFast()
}

func (c *connection) reject(r request) {
        if !c.fastEnabled() {
                panic("fast not enabled")
        }
        c.Post(r.ToMsg(pp.Reject))
        delete(c.PeerRequests, r)
}

func (c *connection) onReadRequest(r request) error {
        requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
        if r.Begin+r.Length > c.t.pieceLength(r.Index) {
                torrent.Add("bad requests received", 1)
                return errors.New("bad request")
        }
        if _, ok := c.PeerRequests[r]; ok {
                torrent.Add("duplicate requests received", 1)
                return nil
        }
        if c.Choked {
                torrent.Add("requests received while choking", 1)
                if c.fastEnabled() {
                        torrent.Add("requests rejected while choking", 1)
                        c.reject(r)
                }
                return nil
        }
        if len(c.PeerRequests) >= maxRequests {
                torrent.Add("requests received while queue full", 1)
                if c.fastEnabled() {
                        c.reject(r)
                }
                // BEP 6 says we may close here if we choose.
                return nil
        }
        if !c.t.havePiece(r.Index) {
                // This isn't necessarily them screwing up. We can drop pieces
                // from our storage, and can't communicate this to peers
                // except by reconnecting.
                requestsReceivedForMissingPieces.Add(1)
                return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
        }
        if c.PeerRequests == nil {
                c.PeerRequests = make(map[request]struct{}, maxRequests)
        }
        c.PeerRequests[r] = struct{}{}
        c.tickleWriter()
        return nil
}

// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() (err error) {
        defer func() {
                if err != nil {
                        torrent.Add("connection.mainReadLoop returned with error", 1)
                } else {
                        torrent.Add("connection.mainReadLoop returned with no error", 1)
                }
        }()
        t := c.t
        cl := t.cl

        decoder := pp.Decoder{
                R:         bufio.NewReaderSize(c.r, 1<<17),
                MaxLength: 256 * 1024,
                Pool:      t.chunkPool,
        }
        for {
                var msg pp.Message
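                // Drop the client lock while blocked on the wire, so other
                // goroutines can make progress; it is reacquired before the
                // message is acted on.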
                func() {
                        cl.mu.Unlock()
                        defer cl.mu.Lock()
                        err = decoder.Decode(&msg)
                }()
                if t.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
                        return nil
                }
                if err != nil {
                        return err
                }
                c.readMsg(&msg)
                c.lastMessageReceived = time.Now()
                if msg.Keepalive {
                        receivedKeepalives.Add(1)
                        continue
                }
                messageTypesReceived.Add(msg.Type.String(), 1)
                if msg.Type.FastExtension() && !c.fastEnabled() {
                        return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
                }
                switch msg.Type {
                case pp.Choke:
                        c.PeerChoked = true
                        c.deleteAllRequests()
                        // We can then reset our interest.
                        c.updateRequests()
                        c.updateExpectingChunks()
                case pp.Reject:
                        c.deleteRequest(newRequestFromMessage(&msg))
                        delete(c.validReceiveChunks, newRequestFromMessage(&msg))
                case pp.Unchoke:
                        c.PeerChoked = false
                        c.tickleWriter()
                        c.updateExpectingChunks()
                case pp.Interested:
                        c.PeerInterested = true
                        c.tickleWriter()
                case pp.NotInterested:
                        c.PeerInterested = false
                        // We don't clear their requests since it isn't clear in the spec.
                        // We'll probably choke them for this, which will clear them if
                        // appropriate, and is clearly specified.
                case pp.Have:
                        err = c.peerSentHave(msg.Index)
                case pp.Request:
                        r := newRequestFromMessage(&msg)
                        err = c.onReadRequest(r)
                case pp.Cancel:
                        req := newRequestFromMessage(&msg)
                        c.onPeerSentCancel(req)
                case pp.Bitfield:
                        err = c.peerSentBitfield(msg.Bitfield)
                case pp.HaveAll:
                        err = c.onPeerSentHaveAll()
                case pp.HaveNone:
                        err = c.peerSentHaveNone()
                case pp.Piece:
                        err = c.receiveChunk(&msg)
                        if len(msg.Piece) == int(t.chunkSize) {
                                t.chunkPool.Put(&msg.Piece)
                        }
                        if err != nil {
                                err = fmt.Errorf("receiving chunk: %s", err)
                        }
                case pp.Extended:
                        err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
                case pp.Port:
                        pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
                        if err != nil {
                                panic(err)
                        }
                        if msg.Port != 0 {
                                pingAddr.Port = int(msg.Port)
                        }
                        cl.eachDhtServer(func(s *dht.Server) {
                                go s.Ping(pingAddr, nil)
                        })
                case pp.AllowedFast:
                        torrent.Add("allowed fasts received", 1)
                        log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c, debugLogValue).Log(c.t.logger)
                        c.peerAllowedFast.Add(int(msg.Index))
                        c.updateRequests()
                case pp.Suggest:
                        torrent.Add("suggests received", 1)
                        log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index, debugLogValue).Log(c.t.logger)
                        c.updateRequests()
                default:
                        err = fmt.Errorf("received unknown message type: %#v", msg.Type)
                }
                if err != nil {
                        return err
                }
        }
}

func (c *connection) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err error) {
        defer func() {
                // TODO: Should we still do this?
                if err != nil {
                        // These clients use their own extension IDs for outgoing message
                        // types, which is incorrect.
                        if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) || strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
                                err = nil
                        }
                }
        }()
        t := c.t
        cl := t.cl
        switch id {
        case pp.HandshakeExtendedID:
                var d pp.ExtendedHandshakeMessage
                if err := bencode.Unmarshal(payload, &d); err != nil {
                        log.Print(err)
                        return errors.Wrap(err, "unmarshalling extended handshake payload")
                }
                if d.Reqq != 0 {
                        c.PeerMaxRequests = d.Reqq
                }
                c.PeerClientName = d.V
                if c.PeerExtensionIDs == nil {
                        c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber, len(d.M))
                }
                for name, id := range d.M {
                        if _, ok := c.PeerExtensionIDs[name]; !ok {
                                torrent.Add(fmt.Sprintf("peers supporting extension %q", name), 1)
                        }
                        c.PeerExtensionIDs[name] = id
                }
                if d.MetadataSize != 0 {
                        if err = t.setMetadataSize(d.MetadataSize); err != nil {
                                return errors.Wrapf(err, "setting metadata size to %d", d.MetadataSize)
                        }
                }
                if _, ok := c.PeerExtensionIDs[pp.ExtensionNameMetadata]; ok {
                        c.requestPendingMetadata()
                }
                return nil
        case metadataExtendedId:
                err := cl.gotMetadataExtensionMsg(payload, t, c)
                if err != nil {
                        return fmt.Errorf("error handling metadata extension message: %s", err)
                }
                return nil
        case pexExtendedId:
                if cl.config.DisablePEX {
                        // TODO: Maybe close the connection. Check that we're not
                        // advertising that we support PEX if it's disabled.
                        return nil
                }
                var pexMsg pp.PexMsg
                err := bencode.Unmarshal(payload, &pexMsg)
                if err != nil {
                        return fmt.Errorf("error unmarshalling PEX message: %s", err)
                }
                torrent.Add("pex added6 peers received", int64(len(pexMsg.Added6)))
                var peers Peers
                peers.AppendFromPex(pexMsg.Added6, pexMsg.Added6Flags)
                peers.AppendFromPex(pexMsg.Added, pexMsg.AddedFlags)
                t.addPeers(peers)
                return nil
        default:
                return fmt.Errorf("unexpected extended message ID: %v", id)
        }
}

// Set both the Reader and Writer for the connection from a single ReadWriter.
func (cn *connection) setRW(rw io.ReadWriter) {
        cn.r = rw
        cn.w = rw
}

// Returns the Reader and Writer as a combined ReadWriter.
func (cn *connection) rw() io.ReadWriter {
        return struct {
                io.Reader
                io.Writer
        }{cn.r, cn.w}
}
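// A hedged usage note: handshake and setup code can layer encryption or
// accounting by wrapping the current pair, e.g. building an io.ReadWriter
// around cn.rw() and handing it back to cn.setRW; the concrete wrappers
// (stats hooks, rate limiting, MSE) live elsewhere in the package.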

// Handle a received chunk from a peer.
func (c *connection) receiveChunk(msg *pp.Message) error {
        t := c.t
        cl := t.cl
        torrent.Add("chunks received", 1)

        req := newRequestFromMessage(msg)

        if c.PeerChoked {
                torrent.Add("chunks received while choked", 1)
        }

        if _, ok := c.validReceiveChunks[req]; !ok {
                torrent.Add("chunks received unexpected", 1)
                return errors.New("received unexpected chunk")
        }
        delete(c.validReceiveChunks, req)

        if c.PeerChoked && c.peerAllowedFast.Get(int(req.Index)) {
                torrent.Add("chunks received due to allowed fast", 1)
        }

        // Request has been satisfied.
        if c.deleteRequest(req) {
                if c.expectingChunks() {
                        c.chunksReceivedWhileExpecting++
                }
        } else {
                torrent.Add("chunks received unwanted", 1)
        }

        // Do we actually want this chunk?
        if t.haveChunk(req) {
                torrent.Add("chunks received wasted", 1)
                c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
                return nil
        }

        piece := &t.pieces[req.Index]

        c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
        c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
        c.lastUsefulChunkReceived = time.Now()
        // if t.fastestConn != c {
        // log.Printf("setting fastest connection %p", c)
        // }
        t.fastestConn = c

        // Need to record that it hasn't been written yet, before we attempt to do
        // anything with it.
        piece.incrementPendingWrites()
        // Record that we have the chunk, so we aren't trying to download it while
        // waiting for it to be written to storage.
        piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))

        // Cancel pending requests for this chunk.
        for c := range t.conns {
                c.postCancel(req)
        }

        err := func() error {
                cl.mu.Unlock()
                defer cl.mu.Lock()
                // Write the chunk out. Note that the upper bound on chunk writing
                // concurrency will be the number of connections. We write inline with
                // receiving the chunk (with this lock dance), because we want to
                // handle errors synchronously and I haven't thought of a nice way to
                // defer any concurrency to the storage and have that notify the
                // client of errors. TODO: Do that instead.
                return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
        }()

        piece.decrementPendingWrites()

        if err != nil {
                log.Printf("%s (%s): error writing chunk %v: %s", t, t.infoHash, req, err)
                t.pendRequest(req)
                t.updatePieceCompletion(msg.Index)
                return nil
        }

        // It's important that the piece is potentially queued before we check if
        // the piece is still wanted, because if it is queued, it won't be wanted.
        if t.pieceAllDirty(req.Index) {
                t.queuePieceCheck(req.Index)
                t.pendAllChunkSpecs(req.Index)
        }

        c.onDirtiedPiece(req.Index)

        cl.event.Broadcast()
        t.publishPieceChange(req.Index)

        return nil
}

func (c *connection) onDirtiedPiece(piece pieceIndex) {
        if c.peerTouchedPieces == nil {
                c.peerTouchedPieces = make(map[pieceIndex]struct{})
        }
        c.peerTouchedPieces[piece] = struct{}{}
        ds := &c.t.pieces[piece].dirtiers
        if *ds == nil {
                *ds = make(map[*connection]struct{})
        }
        (*ds)[c] = struct{}{}
}

func (c *connection) uploadAllowed() bool {
        if c.t.cl.config.NoUpload {
                return false
        }
        if c.t.seeding() {
                return true
        }
        if !c.peerHasWantedPieces() {
                return false
        }
        // Don't upload more than 100 KiB more than we download.
        if c.stats.BytesWrittenData.Int64() >= c.stats.BytesReadData.Int64()+100<<10 {
                return false
        }
        return true
}

func (c *connection) setRetryUploadTimer(delay time.Duration) {
        if c.uploadTimer == nil {
                c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
        } else {
                c.uploadTimer.Reset(delay)
        }
}

// Also handles choking and unchoking of the remote peer.
func (c *connection) upload(msg func(pp.Message) bool) bool {
        // Breaking or completing this loop means we don't want to upload to the
        // peer anymore, and we choke them.
another:
        for c.uploadAllowed() {
                // We want to upload to the peer.
                if !c.Unchoke(msg) {
                        return false
                }
                for r := range c.PeerRequests {
                        res := c.t.cl.config.UploadRateLimiter.ReserveN(time.Now(), int(r.Length))
                        if !res.OK() {
                                panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
                        }
                        delay := res.Delay()
                        if delay > 0 {
                                res.Cancel()
                                c.setRetryUploadTimer(delay)
                                // Hard to say what to return here.
                                return true
                        }
                        more, err := c.sendChunk(r, msg)
                        if err != nil {
                                i := r.Index
                                if c.t.pieceComplete(i) {
                                        c.t.updatePieceCompletion(i)
                                        if !c.t.pieceComplete(i) {
                                                // We had the piece, but not anymore.
                                                break another
                                        }
                                }
                                log.Str("error sending chunk to peer").AddValues(c, r, err).Log(c.t.logger)
                                // If we failed to send a chunk, choke the peer to ensure they
                                // flush all their requests. We've probably dropped a piece,
                                // but there's no way to communicate this to the peer. If they
                                // ask for it again, we'll kick them to allow us to send them
                                // an updated bitfield.
                                break another
                        }
                        delete(c.PeerRequests, r)
                        if !more {
                                return false
                        }
                        goto another
                }
                return true
        }
        return c.Choke(msg)
}
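// A hedged sketch of the reservation pattern used above, with the
// golang.org/x/time/rate API (limiter and retryTimer are hypothetical
// stand-ins): reserve the tokens for a chunk up front, and if they aren't
// available yet, cancel the reservation and arm a retry timer instead of
// blocking the writer goroutine:
//
//      res := limiter.ReserveN(time.Now(), n)
//      if d := res.Delay(); d > 0 {
//              res.Cancel()
//              retryTimer.Reset(d)
//              return
//      }
//      // Tokens consumed; proceed to send n bytes.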

func (cn *connection) Drop() {
        cn.t.dropConnection(cn)
}

func (cn *connection) netGoodPiecesDirtied() int64 {
        return cn.stats.PiecesDirtiedGood.Int64() - cn.stats.PiecesDirtiedBad.Int64()
}

func (c *connection) peerHasWantedPieces() bool {
        return !c.pieceRequestOrder.IsEmpty()
}

func (c *connection) numLocalRequests() int {
        return len(c.requests)
}

func (c *connection) deleteRequest(r request) bool {
        if _, ok := c.requests[r]; !ok {
                return false
        }
        delete(c.requests, r)
        c.updateExpectingChunks()
        if t, ok := c.t.lastRequested[r]; ok {
                t.Stop()
                delete(c.t.lastRequested, r)
        }
        pr := c.t.pendingRequests
        pr[r]--
        n := pr[r]
        if n == 0 {
                delete(pr, r)
        }
        if n < 0 {
                panic(n)
        }
        c.updateRequests()
        // TODO: Other connections that aren't interested, and can provide this
        // chunk might be wakeable?
        return true
}

func (c *connection) deleteAllRequests() {
        for r := range c.requests {
                c.deleteRequest(r)
        }
        if len(c.requests) != 0 {
                panic(len(c.requests))
        }
        // for c := range c.t.conns {
        //      c.tickleWriter()
        // }
}

func (c *connection) tickleWriter() {
        c.writerCond.Broadcast()
}

func (c *connection) postCancel(r request) bool {
        if !c.deleteRequest(r) {
                return false
        }
        c.Post(makeCancelMessage(r))
        return true
}

func (c *connection) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) {
        // Count the chunk being sent, even if it isn't.
        b := make([]byte, r.Length)
        p := c.t.info.Piece(int(r.Index))
        n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
        if n != len(b) {
                if err == nil {
                        panic("expected error")
                }
                return
        } else if err == io.EOF {
                err = nil
        }
        more = msg(pp.Message{
                Type:  pp.Piece,
                Index: r.Index,
                Begin: r.Begin,
                Piece: b,
        })
        c.lastChunkSent = time.Now()
        return
}

func (c *connection) setTorrent(t *Torrent) {
        if c.t != nil {
                panic("connection already associated with a torrent")
        }
        c.t = t
        t.reconcileHandshakeStats(c)
}

func (c *connection) peerPriority() peerPriority {
        return bep40PriorityIgnoreError(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp()))
}

func (c *connection) remoteIp() net.IP {
        return missinggo.AddrIP(c.remoteAddr())
}

func (c *connection) remoteIpPort() ipPort {
        return ipPort{missinggo.AddrIP(c.remoteAddr()), uint16(missinggo.AddrPort(c.remoteAddr()))}
}