1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "fmt"
7         "io"
8         "math"
9         "math/rand"
10         "net"
11         "strconv"
12         "strings"
13         "sync"
14         "time"
15
16         "github.com/anacrolix/dht/v2"
17         "github.com/anacrolix/log"
18         "github.com/anacrolix/missinggo"
19         "github.com/anacrolix/missinggo/bitmap"
20         "github.com/anacrolix/missinggo/iter"
21         "github.com/anacrolix/missinggo/prioritybitmap"
22         "github.com/pkg/errors"
23
24         "github.com/anacrolix/torrent/bencode"
25         "github.com/anacrolix/torrent/mse"
26         pp "github.com/anacrolix/torrent/peer_protocol"
27 )
28
29 type peerSource string
30
31 const (
32         peerSourceTracker         = "Tr"
33         peerSourceIncoming        = "I"
34         peerSourceDhtGetPeers     = "Hg" // Peers we found by searching a DHT.
35         peerSourceDhtAnnouncePeer = "Ha" // Peers that were announced to us by a DHT.
36         peerSourcePex             = "X"
37 )
38
39 // Maintains the state of a connection with a peer.
40 type connection struct {
41         // First to ensure 64-bit alignment for atomics. See #262.
42         stats ConnStats
43
44         t *Torrent
45         // The actual Conn, used for closing, and setting socket options.
46         conn       net.Conn
47         outgoing   bool
48         network    string
49         remoteAddr IpPort
50         // The Reader and Writer for this Conn, with hooks installed for stats,
51         // limiting, deadlines etc.
52         w io.Writer
53         r io.Reader
54         // True if the connection is operating over MSE obfuscation.
55         headerEncrypted bool
56         cryptoMethod    mse.CryptoMethod
57         Discovery       peerSource
58         closed          missinggo.Event
59         // Set true after we've added our ConnStats generated during handshake to
60         // other ConnStat instances as determined when the *Torrent became known.
61         reconciledHandshakeStats bool
62
63         lastMessageReceived     time.Time
64         completedHandshake      time.Time
65         lastUsefulChunkReceived time.Time
66         lastChunkSent           time.Time
67
68         // Stuff controlled by the local peer.
69         Interested           bool
70         lastBecameInterested time.Time
71         priorInterest        time.Duration
72
73         lastStartedExpectingToReceiveChunks time.Time
74         cumulativeExpectedToReceiveChunks   time.Duration
75         chunksReceivedWhileExpecting        int64
76
77         Choked           bool
78         requests         map[request]struct{}
79         requestsLowWater int
80         // Chunks that we might reasonably expect to receive from the peer. Due to
81         // latency, buffering, and implementation differences, we may receive
82         // chunks that are no longer in the set of requests we actually want.
83         validReceiveChunks map[request]struct{}
84         // Indexed by metadata piece, set to true if posted and pending a
85         // response.
86         metadataRequests []bool
87         sentHaves        bitmap.Bitmap
88
89         // Stuff controlled by the remote peer.
90         PeerID             PeerID
91         PeerInterested     bool
92         PeerChoked         bool
93         PeerRequests       map[request]struct{}
94         PeerExtensionBytes pp.PeerExtensionBits
95         // The pieces the peer has claimed to have.
96         peerPieces bitmap.Bitmap
97         // The peer has everything. This can occur due to a special message, when
98         // we may not even know the number of pieces in the torrent yet.
99         peerSentHaveAll bool
100         // The highest possible number of pieces the torrent could have based on
101         // communication with the peer. Generally only useful until we have the
102         // torrent info.
103         peerMinPieces pieceIndex
104         // Pieces we've accepted chunks for from the peer.
105         peerTouchedPieces map[pieceIndex]struct{}
106         peerAllowedFast   bitmap.Bitmap
107
108         PeerMaxRequests  int // Maximum pending requests the peer allows.
109         PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber
110         PeerClientName   string
111
112         pieceInclination  []int
113         pieceRequestOrder prioritybitmap.PriorityBitmap
114
115         writeBuffer *bytes.Buffer
116         uploadTimer *time.Timer
117         writerCond  sync.Cond
118
119         logger log.Logger
120 }
121
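// Starts or stops the clock on time spent expecting to receive chunks,
// accumulating completed periods into cumulativeExpectedToReceiveChunks.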
122 func (cn *connection) updateExpectingChunks() {
123         if cn.expectingChunks() {
124                 if cn.lastStartedExpectingToReceiveChunks.IsZero() {
125                         cn.lastStartedExpectingToReceiveChunks = time.Now()
126                 }
127         } else {
128                 if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
129                         cn.cumulativeExpectedToReceiveChunks += time.Since(cn.lastStartedExpectingToReceiveChunks)
130                         cn.lastStartedExpectingToReceiveChunks = time.Time{}
131                 }
132         }
133 }
134
135 func (cn *connection) expectingChunks() bool {
136         return cn.Interested && !cn.PeerChoked
137 }
138
139 // Returns true if the connection is over IPv6.
140 func (cn *connection) ipv6() bool {
141         ip := cn.remoteAddr.IP
142         if ip.To4() != nil {
143                 return false
144         }
145         return len(ip) == net.IPv6len
146 }
147
148 // Returns true if the dialer has the lower client peer ID. TODO: Find the
149 // specification for this.
150 func (cn *connection) isPreferredDirection() bool {
151         return bytes.Compare(cn.t.cl.peerID[:], cn.PeerID[:]) < 0 == cn.outgoing
152 }
153
154 // Returns whether the left connection should be preferred over the right one,
155 // considering only their networking properties. If ok is false, we can't
156 // decide.
157 func (l *connection) hasPreferredNetworkOver(r *connection) (left, ok bool) {
158         var ml multiLess
159         ml.NextBool(l.isPreferredDirection(), r.isPreferredDirection())
160         ml.NextBool(!l.utp(), !r.utp())
161         ml.NextBool(l.ipv6(), r.ipv6())
162         return ml.FinalOk()
163 }
164
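// Total time we've been interested in the peer, including the current
// period of interest if it is still open.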
165 func (cn *connection) cumInterest() time.Duration {
166         ret := cn.priorInterest
167         if cn.Interested {
168                 ret += time.Since(cn.lastBecameInterested)
169         }
170         return ret
171 }
172
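// Reports whether the peer has every piece. known is false when we can't
// tell yet, i.e. no HAVE ALL was received and the torrent info isn't known.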
173 func (cn *connection) peerHasAllPieces() (all bool, known bool) {
174         if cn.peerSentHaveAll {
175                 return true, true
176         }
177         if !cn.t.haveInfo() {
178                 return false, false
179         }
180         return bitmap.Flip(cn.peerPieces, 0, bitmap.BitIndex(cn.t.numPieces())).IsEmpty(), true
181 }
182
183 func (cn *connection) mu() sync.Locker {
184         return cn.t.cl.locker()
185 }
186
187 func (cn *connection) localAddr() net.Addr {
188         return cn.conn.LocalAddr()
189 }
190
191 func (cn *connection) supportsExtension(ext pp.ExtensionName) bool {
192         _, ok := cn.PeerExtensionIDs[ext]
193         return ok
194 }
195
196 // The best guess at the number of pieces in the torrent for this peer.
197 func (cn *connection) bestPeerNumPieces() pieceIndex {
198         if cn.t.haveInfo() {
199                 return cn.t.numPieces()
200         }
201         return cn.peerMinPieces
202 }
203
204 func (cn *connection) completedString() string {
205         have := pieceIndex(cn.peerPieces.Len())
206         if cn.peerSentHaveAll {
207                 have = cn.bestPeerNumPieces()
208         }
209         return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
210 }
211
212 // Correct the peerPieces bitmap length. Returns an error if the existing state
213 // is invalid, such as from receiving a badly sized BITFIELD or invalid HAVE
214 // messages.
215 func (cn *connection) setNumPieces(num pieceIndex) error {
216         cn.peerPieces.RemoveRange(bitmap.BitIndex(num), bitmap.ToEnd)
217         cn.peerPiecesChanged()
218         return nil
219 }
220
221 func eventAgeString(t time.Time) string {
222         if t.IsZero() {
223                 return "never"
224         }
225         return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
226 }
227
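// Flags describing how the connection was established: encryption, discovery
// source, and transport. For example, a header-obfuscated uTP connection found
// via DHT get_peers would render as "eHgU".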
228 func (cn *connection) connectionFlags() (ret string) {
229         c := func(b byte) {
230                 ret += string([]byte{b})
231         }
232         if cn.cryptoMethod == mse.CryptoMethodRC4 {
233                 c('E')
234         } else if cn.headerEncrypted {
235                 c('e')
236         }
237         ret += string(cn.Discovery)
238         if cn.utp() {
239                 c('U')
240         }
241         return
242 }
243
244 func (cn *connection) utp() bool {
245         return parseNetworkString(cn.network).Udp
246 }
247
248 // Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
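249 // For example, "i-TrU-c" means we're interested and not choking the peer, the
250 // connection came from a tracker over uTP, and the peer is choking us without
251 // being interested.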
249 func (cn *connection) statusFlags() (ret string) {
250         c := func(b byte) {
251                 ret += string([]byte{b})
252         }
253         if cn.Interested {
254                 c('i')
255         }
256         if cn.Choked {
257                 c('c')
258         }
259         c('-')
260         ret += cn.connectionFlags()
261         c('-')
262         if cn.PeerInterested {
263                 c('i')
264         }
265         if cn.PeerChoked {
266                 c('c')
267         }
268         return
269 }
270
271 // func (cn *connection) String() string {
272 //      var buf bytes.Buffer
273 //      cn.WriteStatus(&buf, nil)
274 //      return buf.String()
275 // }
276
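// Bytes of useful data received per second of cumulative interest in the peer.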
277 func (cn *connection) downloadRate() float64 {
278         return float64(cn.stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds()
279 }
280
281 func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
282         // \t isn't preserved in <pre> blocks?
283         fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr)
284         fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
285                 eventAgeString(cn.lastMessageReceived),
286                 eventAgeString(cn.completedHandshake),
287                 eventAgeString(cn.lastHelpful()),
288                 cn.cumInterest(),
289                 cn.totalExpectingTime(),
290         )
291         fmt.Fprintf(w,
292                 "    %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
293                 cn.completedString(),
294                 len(cn.peerTouchedPieces),
295                 &cn.stats.ChunksReadUseful,
296                 &cn.stats.ChunksRead,
297                 &cn.stats.ChunksWritten,
298                 cn.requestsLowWater,
299                 cn.numLocalRequests(),
300                 cn.nominalMaxRequests(),
301                 len(cn.PeerRequests),
302                 cn.statusFlags(),
303                 cn.downloadRate()/(1<<10),
304         )
305         fmt.Fprintf(w, "    next pieces: %v%s\n",
306                 iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
307                 func() string {
308                         if cn.shouldRequestWithoutBias() {
309                                 return " (fastest)"
310                         } else {
311                                 return ""
312                         }
313                 }())
314 }
315
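// Closes the connection and releases connection-scoped state. Subsequent calls
// are no-ops once the closed event has been set.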
316 func (cn *connection) Close() {
317         if !cn.closed.Set() {
318                 return
319         }
320         cn.tickleWriter()
321         cn.discardPieceInclination()
322         cn.pieceRequestOrder.Clear()
323         if cn.conn != nil {
324                 go cn.conn.Close()
325         }
326 }
327
328 func (cn *connection) PeerHasPiece(piece pieceIndex) bool {
329         return cn.peerSentHaveAll || cn.peerPieces.Contains(bitmap.BitIndex(piece))
330 }
331
332 // Writes a message into the write buffer.
333 func (cn *connection) Post(msg pp.Message) {
334         torrent.Add(fmt.Sprintf("messages posted of type %s", msg.Type.String()), 1)
335         // We don't need to track bytes here because a connection.w Writer wrapper
336         // takes care of that (although there's some delay between us recording
337         // the message, and the connection writer flushing it out).
338         cn.writeBuffer.Write(msg.MustMarshalBinary())
339         // Last I checked only Piece messages affect stats, and we don't post
340         // those.
341         cn.wroteMsg(&msg)
342         cn.tickleWriter()
343 }
344
345 func (cn *connection) requestMetadataPiece(index int) {
346         eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata]
347         if eID == 0 {
348                 return
349         }
350         if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
351                 return
352         }
353         cn.logger.Printf("requesting metadata piece %d", index)
354         cn.Post(pp.Message{
355                 Type:       pp.Extended,
356                 ExtendedID: eID,
357                 ExtendedPayload: func() []byte {
358                         b, err := bencode.Marshal(map[string]int{
359                                 "msg_type": pp.RequestMetadataExtensionMsgType,
360                                 "piece":    index,
361                         })
362                         if err != nil {
363                                 panic(err)
364                         }
365                         return b
366                 }(),
367         })
368         for index >= len(cn.metadataRequests) {
369                 cn.metadataRequests = append(cn.metadataRequests, false)
370         }
371         cn.metadataRequests[index] = true
372 }
373
374 func (cn *connection) requestedMetadataPiece(index int) bool {
375         return index < len(cn.metadataRequests) && cn.metadataRequests[index]
376 }
377
378 // The actual value to use as the maximum outbound requests.
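// For requestStrategy 3, the cap tracks observed throughput. With illustrative
// numbers: 100 chunks received over 10s of expecting time and a 5s
// duplicateRequestTimeout gives max(2, 100*5s/(2*10s)) = 25, clamped to
// [1, PeerMaxRequests].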
379 func (cn *connection) nominalMaxRequests() (ret int) {
380         if cn.t.requestStrategy == 3 {
381                 expectingTime := int64(cn.totalExpectingTime())
382                 if expectingTime == 0 {
383                         expectingTime = math.MaxInt64
384                 } else {
385                         expectingTime *= 2
386                 }
387                 return int(clamp(
388                         1,
389                         int64(cn.PeerMaxRequests),
390                         max(
391                                 // It makes sense to always pipeline at least a couple of requests,
392                                 // since latency must be non-zero.
393                                 2,
394                                 // Request only as many as we expect to receive in the
395                                 // duplicateRequestTimeout window. We are trying to avoid having to
396                                 // duplicate requests.
397                                 cn.chunksReceivedWhileExpecting*int64(cn.t.duplicateRequestTimeout)/expectingTime,
398                         ),
399                 ))
400         }
401         return int(clamp(
402                 1,
403                 int64(cn.PeerMaxRequests),
404                 max(64,
405                         cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64()))))
406 }
407
408 func (cn *connection) totalExpectingTime() (ret time.Duration) {
409         ret = cn.cumulativeExpectedToReceiveChunks
410         if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
411                 ret += time.Since(cn.lastStartedExpectingToReceiveChunks)
412         }
413         return
414
415 }
416
417 func (cn *connection) onPeerSentCancel(r request) {
418         if _, ok := cn.PeerRequests[r]; !ok {
419                 torrent.Add("unexpected cancels received", 1)
420                 return
421         }
422         if cn.fastEnabled() {
423                 cn.reject(r)
424         } else {
425                 delete(cn.PeerRequests, r)
426         }
427 }
428
429 func (cn *connection) Choke(msg messageWriter) (more bool) {
430         if cn.Choked {
431                 return true
432         }
433         cn.Choked = true
434         more = msg(pp.Message{
435                 Type: pp.Choke,
436         })
437         if cn.fastEnabled() {
438                 for r := range cn.PeerRequests {
439                         // TODO: Don't reject pieces in allowed fast set.
440                         cn.reject(r)
441                 }
442         } else {
443                 cn.PeerRequests = nil
444         }
445         return
446 }
447
448 func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
449         if !cn.Choked {
450                 return true
451         }
452         cn.Choked = false
453         return msg(pp.Message{
454                 Type: pp.Unchoke,
455         })
456 }
457
458 func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
459         if cn.Interested == interested {
460                 return true
461         }
462         cn.Interested = interested
463         if interested {
464                 cn.lastBecameInterested = time.Now()
465         } else if !cn.lastBecameInterested.IsZero() {
466                 cn.priorInterest += time.Since(cn.lastBecameInterested)
467         }
468         cn.updateExpectingChunks()
469         // log.Printf("%p: setting interest: %v", cn, interested)
470         return msg(pp.Message{
471                 Type: func() pp.MessageType {
472                         if interested {
473                                 return pp.Interested
474                         } else {
475                                 return pp.NotInterested
476                         }
477                 }(),
478         })
479 }
480
481 // The function takes a message to be sent, and returns true if more messages
482 // are okay.
483 type messageWriter func(pp.Message) bool
484
485 // Proxies the messageWriter's response.
486 func (cn *connection) request(r request, mw messageWriter) bool {
487         if _, ok := cn.requests[r]; ok {
488                 panic("chunk already requested")
489         }
490         if !cn.PeerHasPiece(pieceIndex(r.Index)) {
491                 panic("requesting piece peer doesn't have")
492         }
493         if _, ok := cn.t.conns[cn]; !ok {
494                 panic("requesting but not in active conns")
495         }
496         if cn.closed.IsSet() {
497                 panic("requesting when connection is closed")
498         }
499         if cn.PeerChoked {
500                 if cn.peerAllowedFast.Get(int(r.Index)) {
501                         torrent.Add("allowed fast requests sent", 1)
502                 } else {
503                         panic("requesting while choked and not allowed fast")
504                 }
505         }
506         if cn.t.hashingPiece(pieceIndex(r.Index)) {
507                 panic("piece is being hashed")
508         }
509         if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) {
510                 panic("piece is queued for hash")
511         }
512         if cn.requests == nil {
513                 cn.requests = make(map[request]struct{})
514         }
515         cn.requests[r] = struct{}{}
516         if cn.validReceiveChunks == nil {
517                 cn.validReceiveChunks = make(map[request]struct{})
518         }
519         cn.validReceiveChunks[r] = struct{}{}
520         cn.t.pendingRequests[r]++
521         cn.t.lastRequested[r] = time.AfterFunc(cn.t.duplicateRequestTimeout, func() {
522                 torrent.Add("duplicate request timeouts", 1)
523                 cn.mu().Lock()
524                 defer cn.mu().Unlock()
525                 delete(cn.t.lastRequested, r)
526                 for cn := range cn.t.conns {
527                         if cn.PeerHasPiece(pieceIndex(r.Index)) {
528                                 cn.updateRequests()
529                         }
530                 }
531         })
532         cn.updateExpectingChunks()
533         return mw(pp.Message{
534                 Type:   pp.Request,
535                 Index:  r.Index,
536                 Begin:  r.Begin,
537                 Length: r.Length,
538         })
539 }
540
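// Fills the write buffer with protocol messages: drops requests when networking
// is disabled, tops up outstanding requests once they fall to the low-water
// mark, and then considers uploading to the peer.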
541 func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
542         if !cn.t.networkingEnabled {
543                 if !cn.SetInterested(false, msg) {
544                         return
545                 }
546                 if len(cn.requests) != 0 {
547                         for r := range cn.requests {
548                                 cn.deleteRequest(r)
549                                 // log.Printf("%p: cancelling request: %v", cn, r)
550                                 if !msg(makeCancelMessage(r)) {
551                                         return
552                                 }
553                         }
554                 }
555         }
556         if len(cn.requests) <= cn.requestsLowWater {
557                 filledBuffer := false
558                 cn.iterPendingPieces(func(pieceIndex pieceIndex) bool {
559                         cn.iterPendingRequests(pieceIndex, func(r request) bool {
560                                 if !cn.SetInterested(true, msg) {
561                                         filledBuffer = true
562                                         return false
563                                 }
564                                 if len(cn.requests) >= cn.nominalMaxRequests() {
565                                         return false
566                                 }
567                                 // Choking is looked at here because our interest is dependent
568                                 // on whether we'd make requests in its absence.
569                                 if cn.PeerChoked {
570                                         if !cn.peerAllowedFast.Get(bitmap.BitIndex(r.Index)) {
571                                                 return false
572                                         }
573                                 }
574                                 if _, ok := cn.requests[r]; ok {
575                                         return true
576                                 }
577                                 filledBuffer = !cn.request(r, msg)
578                                 return !filledBuffer
579                         })
580                         return !filledBuffer
581                 })
582                 if filledBuffer {
583                         // If we didn't completely top up the requests, we shouldn't mark
584                         // the low water, since we'll want to top up the requests as soon
585                         // as we have more write buffer space.
586                         return
587                 }
588                 cn.requestsLowWater = len(cn.requests) / 2
589         }
590
591         cn.upload(msg)
592 }
593
594 // Routine that writes to the peer. Some of what to write is buffered by
595 // activity elsewhere in the Client, and some is determined locally when the
596 // connection is writable.
597 func (cn *connection) writer(keepAliveTimeout time.Duration) {
598         var (
599                 lastWrite      time.Time = time.Now()
600                 keepAliveTimer *time.Timer
601         )
602         keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
603                 cn.mu().Lock()
604                 defer cn.mu().Unlock()
605                 if time.Since(lastWrite) >= keepAliveTimeout {
606                         cn.tickleWriter()
607                 }
608                 keepAliveTimer.Reset(keepAliveTimeout)
609         })
610         cn.mu().Lock()
611         defer cn.mu().Unlock()
612         defer cn.Close()
613         defer keepAliveTimer.Stop()
614         frontBuf := new(bytes.Buffer)
615         for {
616                 if cn.closed.IsSet() {
617                         return
618                 }
619                 if cn.writeBuffer.Len() == 0 {
620                         cn.fillWriteBuffer(func(msg pp.Message) bool {
621                                 cn.wroteMsg(&msg)
622                                 cn.writeBuffer.Write(msg.MustMarshalBinary())
623                                 torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
624                                 return cn.writeBuffer.Len() < 1<<16 // 64KiB
625                         })
626                 }
627                 if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
628                         cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
629                         postedKeepalives.Add(1)
630                 }
631                 if cn.writeBuffer.Len() == 0 {
632                         // TODO: Minimize wakeups....
633                         cn.writerCond.Wait()
634                         continue
635                 }
636                 // Flip the buffers.
637                 frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
638                 cn.mu().Unlock()
639                 n, err := cn.w.Write(frontBuf.Bytes())
640                 cn.mu().Lock()
641                 if n != 0 {
642                         lastWrite = time.Now()
643                         keepAliveTimer.Reset(keepAliveTimeout)
644                 }
645                 if err != nil {
646                         return
647                 }
648                 if n != frontBuf.Len() {
649                         panic("short write")
650                 }
651                 frontBuf.Reset()
652         }
653 }
654
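// Posts a HAVE for the piece unless this peer has already been told about it.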
655 func (cn *connection) Have(piece pieceIndex) {
656         if cn.sentHaves.Get(bitmap.BitIndex(piece)) {
657                 return
658         }
659         cn.Post(pp.Message{
660                 Type:  pp.Have,
661                 Index: pp.Integer(piece),
662         })
663         cn.sentHaves.Add(bitmap.BitIndex(piece))
664 }
665
666 func (cn *connection) PostBitfield() {
667         if cn.sentHaves.Len() != 0 {
668                 panic("bitfield must be first have-related message sent")
669         }
670         if !cn.t.haveAnyPieces() {
671                 return
672         }
673         cn.Post(pp.Message{
674                 Type:     pp.Bitfield,
675                 Bitfield: cn.t.bitfield(),
676         })
677         cn.sentHaves = cn.t.completedPieces.Copy()
678 }
679
680 func (cn *connection) updateRequests() {
681         // log.Print("update requests")
682         cn.tickleWriter()
683 }
684
685 // Emits the indices in the Bitmaps bms in order, never repeating any index.
686 // skip is mutated during execution, and its initial values will never be
687 // emitted.
688 func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
689         return func(cb iter.Callback) {
690                 for _, bm := range bms {
691                         if !iter.All(func(i interface{}) bool {
692                                 skip.Add(i.(int))
693                                 return cb(i)
694                         }, bitmap.Sub(bm, *skip).Iter) {
695                                 return
696                         }
697                 }
698         }
699 }
700
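// Iterates pieces in reader-priority order (now, then readahead), followed by
// pending pieces, skipping pieces the peer doesn't have, pieces we've already
// completed, and pieces queued for or undergoing hashing.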
701 func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece pieceIndex) bool) bool {
702         now, readahead := cn.t.readerPiecePriorities()
703         var skip bitmap.Bitmap
704         if !cn.peerSentHaveAll {
705                 // Pieces to skip include pieces the peer doesn't have.
706                 skip = bitmap.Flip(cn.peerPieces, 0, bitmap.BitIndex(cn.t.numPieces()))
707         }
708         // And pieces that we already have.
709         skip.Union(cn.t.completedPieces)
710         skip.Union(cn.t.piecesQueuedForHash)
711         // Return an iterator over the different priority classes, minus the skip
712         // pieces.
713         return iter.All(
714                 func(_piece interface{}) bool {
715                         i := _piece.(bitmap.BitIndex)
716                         if cn.t.hashingPiece(pieceIndex(i)) {
717                                 return true
718                         }
719                         return f(pieceIndex(i))
720                 },
721                 iterBitmapsDistinct(&skip, now, readahead),
722                 func(cb iter.Callback) {
723                         cn.t.pendingPieces.IterTyped(func(piece int) bool {
724                                 if skip.Contains(piece) {
725                                         return true
726                                 }
727                                 more := cb(piece)
728                                 skip.Add(piece)
729                                 return more
730                         })
731                 },
732         )
733 }
734
735 // The connection should download highest priority pieces first, without any
736 // inclination toward avoiding wastage. Generally we might do this if there's
737 // a single connection, or this is the fastest connection, and we have active
738 // readers that signal an ordering preference. It's conceivable that the best
739 // connection should do this, since it's least likely to waste our time if
740 // assigned to the highest priority pieces, and assigning more than one this
741 // role would cause significant wasted bandwidth.
742 func (cn *connection) shouldRequestWithoutBias() bool {
743         if cn.t.requestStrategy != 2 {
744                 return false
745         }
746         if len(cn.t.readers) == 0 {
747                 return false
748         }
749         if len(cn.t.conns) == 1 {
750                 return true
751         }
752         if cn == cn.t.fastestConn {
753                 return true
754         }
755         return false
756 }
757
758 func (cn *connection) iterPendingPieces(f func(pieceIndex) bool) bool {
759         if !cn.t.haveInfo() {
760                 return false
761         }
762         if cn.t.requestStrategy == 3 {
763                 return cn.iterUnbiasedPieceRequestOrder(f)
764         }
765         if cn.shouldRequestWithoutBias() {
766                 return cn.iterUnbiasedPieceRequestOrder(f)
767         } else {
768                 return cn.pieceRequestOrder.IterTyped(func(i int) bool {
769                         return f(pieceIndex(i))
770                 })
771         }
772 }
773
774 func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) {
775         cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) })
776 }
777
778 func (cn *connection) iterPendingRequests(piece pieceIndex, f func(request) bool) bool {
779         return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
780                 r := request{pp.Integer(piece), cs}
781                 if cn.t.requestStrategy == 3 {
782                         if _, ok := cn.t.lastRequested[r]; ok {
783                                 // This piece has been requested on another connection, and
784                                 // the duplicate request timer is still running.
785                                 return true
786                         }
787                 }
788                 return f(r)
789         })
790 }
791
792 func iterUndirtiedChunks(piece pieceIndex, t *Torrent, f func(chunkSpec) bool) bool {
793         p := &t.pieces[piece]
794         if t.requestStrategy == 3 {
795                 for i := pp.Integer(0); i < p.numChunks(); i++ {
796                         if !p.dirtyChunks.Get(bitmap.BitIndex(i)) {
797                                 if !f(t.chunkIndexSpec(i, piece)) {
798                                         return false
799                                 }
800                         }
801                 }
802                 return true
803         }
804         chunkIndices := t.pieces[piece].undirtiedChunkIndices()
805         return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
806                 ci, err := chunkIndices.RB.Select(uint32(i))
807                 if err != nil {
808                         panic(err)
809                 }
810                 return f(t.chunkIndexSpec(pp.Integer(ci), piece))
811         })
812 }
813
814 // TODO: Check that callers call updateRequests as needed after this.
815 func (cn *connection) stopRequestingPiece(piece pieceIndex) bool {
816         return cn.pieceRequestOrder.Remove(bitmap.BitIndex(piece))
817 }
818
819 // This is distinct from Torrent piece priority, which is the user's
820 // preference. Connection piece priority is specific to a connection and is
821 // used to pseudorandomly avoid connections always requesting the same pieces
822 // and thus wasting effort.
823 func (cn *connection) updatePiecePriority(piece pieceIndex) bool {
824         tpp := cn.t.piecePriority(piece)
825         if !cn.PeerHasPiece(piece) {
826                 tpp = PiecePriorityNone
827         }
828         if tpp == PiecePriorityNone {
829                 return cn.stopRequestingPiece(piece)
830         }
831         prio := cn.getPieceInclination()[piece]
832         switch cn.t.requestStrategy {
833         case 1:
834                 switch tpp {
835                 case PiecePriorityNormal:
836                 case PiecePriorityReadahead:
837                         prio -= int(cn.t.numPieces())
838                 case PiecePriorityNext, PiecePriorityNow:
839                         prio -= 2 * int(cn.t.numPieces())
840                 default:
841                         panic(tpp)
842                 }
843                 prio += int(piece / 3)
844         default:
845         }
846         return cn.pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias()
847 }
848
849 func (cn *connection) getPieceInclination() []int {
850         if cn.pieceInclination == nil {
851                 cn.pieceInclination = cn.t.getConnPieceInclination()
852         }
853         return cn.pieceInclination
854 }
855
856 func (cn *connection) discardPieceInclination() {
857         if cn.pieceInclination == nil {
858                 return
859         }
860         cn.t.putPieceInclination(cn.pieceInclination)
861         cn.pieceInclination = nil
862 }
863
864 func (cn *connection) peerPiecesChanged() {
865         if cn.t.haveInfo() {
866                 prioritiesChanged := false
867                 for i := pieceIndex(0); i < cn.t.numPieces(); i++ {
868                         if cn.updatePiecePriority(i) {
869                                 prioritiesChanged = true
870                         }
871                 }
872                 if prioritiesChanged {
873                         cn.updateRequests()
874                 }
875         }
876 }
877
878 func (cn *connection) raisePeerMinPieces(newMin pieceIndex) {
879         if newMin > cn.peerMinPieces {
880                 cn.peerMinPieces = newMin
881         }
882 }
883
884 func (cn *connection) peerSentHave(piece pieceIndex) error {
885         if cn.t.haveInfo() && piece >= cn.t.numPieces() || piece < 0 {
886                 return errors.New("invalid piece")
887         }
888         if cn.PeerHasPiece(piece) {
889                 return nil
890         }
891         cn.raisePeerMinPieces(piece + 1)
892         cn.peerPieces.Set(bitmap.BitIndex(piece), true)
893         if cn.updatePiecePriority(piece) {
894                 cn.updateRequests()
895         }
896         return nil
897 }
898
899 func (cn *connection) peerSentBitfield(bf []bool) error {
900         cn.peerSentHaveAll = false
901         if len(bf)%8 != 0 {
902                 panic("expected bitfield length divisible by 8")
903         }
904         // We know that the last byte means that at most the last 7 bits are
905         // wasted.
906         cn.raisePeerMinPieces(pieceIndex(len(bf) - 7))
907         if cn.t.haveInfo() && len(bf) > int(cn.t.numPieces()) {
908                 // Ignore known excess pieces.
909                 bf = bf[:cn.t.numPieces()]
910         }
911         for i, have := range bf {
912                 if have {
913                         cn.raisePeerMinPieces(pieceIndex(i) + 1)
914                 }
915                 cn.peerPieces.Set(i, have)
916         }
917         cn.peerPiecesChanged()
918         return nil
919 }
920
921 func (cn *connection) onPeerSentHaveAll() error {
922         cn.peerSentHaveAll = true
923         cn.peerPieces.Clear()
924         cn.peerPiecesChanged()
925         return nil
926 }
927
928 func (cn *connection) peerSentHaveNone() error {
929         cn.peerPieces.Clear()
930         cn.peerSentHaveAll = false
931         cn.peerPiecesChanged()
932         return nil
933 }
934
935 func (c *connection) requestPendingMetadata() {
936         if c.t.haveInfo() {
937                 return
938         }
939         if c.PeerExtensionIDs[pp.ExtensionNameMetadata] == 0 {
940                 // Peer doesn't support this.
941                 return
942         }
943         // Request metadata pieces that we don't have in a random order.
944         var pending []int
945         for index := 0; index < c.t.metadataPieceCount(); index++ {
946                 if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
947                         pending = append(pending, index)
948                 }
949         }
950         rand.Shuffle(len(pending), func(i, j int) { pending[i], pending[j] = pending[j], pending[i] })
951         for _, i := range pending {
952                 c.requestMetadataPiece(i)
953         }
954 }
955
956 func (cn *connection) wroteMsg(msg *pp.Message) {
957         torrent.Add(fmt.Sprintf("messages written of type %s", msg.Type.String()), 1)
958         cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
959 }
960
961 func (cn *connection) readMsg(msg *pp.Message) {
962         cn.allStats(func(cs *ConnStats) { cs.readMsg(msg) })
963 }
964
965 // After handshake, we know what Torrent and Client stats to include for a
966 // connection.
967 func (cn *connection) postHandshakeStats(f func(*ConnStats)) {
968         t := cn.t
969         f(&t.stats)
970         f(&t.cl.stats)
971 }
972
973 // All ConnStats that include this connection. Some objects are not known
974 // until the handshake is complete, after which it's expected to reconcile the
975 // differences.
976 func (cn *connection) allStats(f func(*ConnStats)) {
977         f(&cn.stats)
978         if cn.reconciledHandshakeStats {
979                 cn.postHandshakeStats(f)
980         }
981 }
982
983 func (cn *connection) wroteBytes(n int64) {
984         cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten }))
985 }
986
987 func (cn *connection) readBytes(n int64) {
988         cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
989 }
990
991 // Returns whether the connection could be useful to us. We're seeding and
992 // they want data, we don't have metainfo and they can provide it, etc.
993 func (c *connection) useful() bool {
994         t := c.t
995         if c.closed.IsSet() {
996                 return false
997         }
998         if !t.haveInfo() {
999                 return c.supportsExtension("ut_metadata")
1000         }
1001         if t.seeding() && c.PeerInterested {
1002                 return true
1003         }
1004         if c.peerHasWantedPieces() {
1005                 return true
1006         }
1007         return false
1008 }
1009
1010 func (c *connection) lastHelpful() (ret time.Time) {
1011         ret = c.lastUsefulChunkReceived
1012         if c.t.seeding() && c.lastChunkSent.After(ret) {
1013                 ret = c.lastChunkSent
1014         }
1015         return
1016 }
1017
1018 func (c *connection) fastEnabled() bool {
1019         return c.PeerExtensionBytes.SupportsFast() && c.t.cl.extensionBytes.SupportsFast()
1020 }
1021
1022 func (c *connection) reject(r request) {
1023         if !c.fastEnabled() {
1024                 panic("fast not enabled")
1025         }
1026         c.Post(r.ToMsg(pp.Reject))
1027         delete(c.PeerRequests, r)
1028 }
1029
1030 func (c *connection) onReadRequest(r request) error {
1031         requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
1032         if _, ok := c.PeerRequests[r]; ok {
1033                 torrent.Add("duplicate requests received", 1)
1034                 return nil
1035         }
1036         if c.Choked {
1037                 torrent.Add("requests received while choking", 1)
1038                 if c.fastEnabled() {
1039                         torrent.Add("requests rejected while choking", 1)
1040                         c.reject(r)
1041                 }
1042                 return nil
1043         }
1044         if len(c.PeerRequests) >= maxRequests {
1045                 torrent.Add("requests received while queue full", 1)
1046                 if c.fastEnabled() {
1047                         c.reject(r)
1048                 }
1049                 // BEP 6 says we may close here if we choose.
1050                 return nil
1051         }
1052         if !c.t.havePiece(pieceIndex(r.Index)) {
1053                 // This isn't necessarily them screwing up. We can drop pieces
1054                 // from our storage, and can't communicate this to peers
1055                 // except by reconnecting.
1056                 requestsReceivedForMissingPieces.Add(1)
1057                 return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
1058         }
1059         // Check this after we know we have the piece, so that the piece length will be known.
1060         if r.Begin+r.Length > c.t.pieceLength(pieceIndex(r.Index)) {
1061                 torrent.Add("bad requests received", 1)
1062                 return errors.New("bad request")
1063         }
1064         if c.PeerRequests == nil {
1065                 c.PeerRequests = make(map[request]struct{}, maxRequests)
1066         }
1067         c.PeerRequests[r] = struct{}{}
1068         c.tickleWriter()
1069         return nil
1070 }
1071
1072 // Processes incoming BitTorrent wire-protocol messages. The client lock is held upon entry and
1073 // exit. Returning will end the connection.
1074 func (c *connection) mainReadLoop() (err error) {
1075         defer func() {
1076                 if err != nil {
1077                         torrent.Add("connection.mainReadLoop returned with error", 1)
1078                 } else {
1079                         torrent.Add("connection.mainReadLoop returned with no error", 1)
1080                 }
1081         }()
1082         t := c.t
1083         cl := t.cl
1084
1085         decoder := pp.Decoder{
1086                 R:         bufio.NewReaderSize(c.r, 1<<17),
1087                 MaxLength: 256 * 1024,
1088                 Pool:      t.chunkPool,
1089         }
1090         for {
1091                 var msg pp.Message
1092                 func() {
1093                         cl.unlock()
1094                         defer cl.lock()
1095                         err = decoder.Decode(&msg)
1096                 }()
1097                 if t.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
1098                         return nil
1099                 }
1100                 if err != nil {
1101                         return err
1102                 }
1103                 c.readMsg(&msg)
1104                 c.lastMessageReceived = time.Now()
1105                 if msg.Keepalive {
1106                         receivedKeepalives.Add(1)
1107                         continue
1108                 }
1109                 messageTypesReceived.Add(msg.Type.String(), 1)
1110                 if msg.Type.FastExtension() && !c.fastEnabled() {
1111                         return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
1112                 }
1113                 switch msg.Type {
1114                 case pp.Choke:
1115                         c.PeerChoked = true
1116                         c.deleteAllRequests()
1117                         // We can then reset our interest.
1118                         c.updateRequests()
1119                         c.updateExpectingChunks()
1120                 case pp.Unchoke:
1121                         c.PeerChoked = false
1122                         c.tickleWriter()
1123                         c.updateExpectingChunks()
1124                 case pp.Interested:
1125                         c.PeerInterested = true
1126                         c.tickleWriter()
1127                 case pp.NotInterested:
1128                         c.PeerInterested = false
1129                         // We don't clear their requests since it isn't clear in the spec.
1130                         // We'll probably choke them for this, which will clear them if
1131                         // appropriate, and is clearly specified.
1132                 case pp.Have:
1133                         err = c.peerSentHave(pieceIndex(msg.Index))
1134                 case pp.Bitfield:
1135                         err = c.peerSentBitfield(msg.Bitfield)
1136                 case pp.Request:
1137                         r := newRequestFromMessage(&msg)
1138                         err = c.onReadRequest(r)
1139                 case pp.Piece:
1140                         err = c.receiveChunk(&msg)
1141                         if len(msg.Piece) == int(t.chunkSize) {
1142                                 t.chunkPool.Put(&msg.Piece)
1143                         }
1144                         if err != nil {
1145                                 err = fmt.Errorf("receiving chunk: %s", err)
1146                         }
1147                 case pp.Cancel:
1148                         req := newRequestFromMessage(&msg)
1149                         c.onPeerSentCancel(req)
1150                 case pp.Port:
1151                         pingAddr := net.UDPAddr{
1152                                 IP:   c.remoteAddr.IP,
1153                                 Port: int(c.remoteAddr.Port),
1154                         }
1155                         if msg.Port != 0 {
1156                                 pingAddr.Port = int(msg.Port)
1157                         }
1158                         cl.eachDhtServer(func(s *dht.Server) {
1159                                 go s.Ping(&pingAddr, nil)
1160                         })
1161                 case pp.Suggest:
1162                         torrent.Add("suggests received", 1)
1163                         log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index, debugLogValue).Log(c.t.logger)
1164                         c.updateRequests()
1165                 case pp.HaveAll:
1166                         err = c.onPeerSentHaveAll()
1167                 case pp.HaveNone:
1168                         err = c.peerSentHaveNone()
1169                 case pp.Reject:
1170                         c.deleteRequest(newRequestFromMessage(&msg))
1171                         delete(c.validReceiveChunks, newRequestFromMessage(&msg))
1172                 case pp.AllowedFast:
1173                         torrent.Add("allowed fasts received", 1)
1174                         log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c, debugLogValue).Log(c.t.logger)
1175                         c.peerAllowedFast.Add(int(msg.Index))
1176                         c.updateRequests()
1177                 case pp.Extended:
1178                         err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
1179                 default:
1180                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1181                 }
1182                 if err != nil {
1183                         return err
1184                 }
1185         }
1186 }
1187
1188 func (c *connection) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err error) {
1189         defer func() {
1190                 // TODO: Should we still do this?
1191                 if err != nil {
1192                         // These clients use their own extension IDs for outgoing message
1193                         // types, which is incorrect.
1194                         if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) || strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1195                                 err = nil
1196                         }
1197                 }
1198         }()
1199         t := c.t
1200         cl := t.cl
1201         switch id {
1202         case pp.HandshakeExtendedID:
1203                 var d pp.ExtendedHandshakeMessage
1204                 if err := bencode.Unmarshal(payload, &d); err != nil {
1205                         c.t.logger.Printf("error parsing extended handshake message %q: %s", payload, err)
1206                         return errors.Wrap(err, "unmarshalling extended handshake payload")
1207                 }
1208                 if d.Reqq != 0 {
1209                         c.PeerMaxRequests = d.Reqq
1210                 }
1211                 c.PeerClientName = d.V
1212                 if c.PeerExtensionIDs == nil {
1213                         c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber, len(d.M))
1214                 }
1215                 for name, id := range d.M {
1216                         if _, ok := c.PeerExtensionIDs[name]; !ok {
1217                                 torrent.Add(fmt.Sprintf("peers supporting extension %q", name), 1)
1218                         }
1219                         c.PeerExtensionIDs[name] = id
1220                 }
1221                 if d.MetadataSize != 0 {
1222                         if err = t.setMetadataSize(d.MetadataSize); err != nil {
1223                                 return errors.Wrapf(err, "setting metadata size to %d", d.MetadataSize)
1224                         }
1225                 }
1226                 c.requestPendingMetadata()
1227                 return nil
1228         case metadataExtendedId:
1229                 err := cl.gotMetadataExtensionMsg(payload, t, c)
1230                 if err != nil {
1231                         return fmt.Errorf("handling metadata extension message: %w", err)
1232                 }
1233                 return nil
1234         case pexExtendedId:
1235                 if cl.config.DisablePEX {
1236                         // TODO: Maybe close the connection. Check that we're not
1237                         // advertising that we support PEX if it's disabled.
1238                         return nil
1239                 }
1240                 var pexMsg pp.PexMsg
1241                 err := bencode.Unmarshal(payload, &pexMsg)
1242                 if err != nil {
1243                         return fmt.Errorf("error unmarshalling PEX message: %s", err)
1244                 }
1245                 torrent.Add("pex added6 peers received", int64(len(pexMsg.Added6)))
1246                 var peers Peers
1247                 peers.AppendFromPex(pexMsg.Added6, pexMsg.Added6Flags)
1248                 peers.AppendFromPex(pexMsg.Added, pexMsg.AddedFlags)
1249                 t.addPeers(peers)
1250                 return nil
1251         default:
1252                 return fmt.Errorf("unexpected extended message ID: %v", id)
1253         }
1254 }
1255
1256 // Set both the Reader and Writer for the connection from a single ReadWriter.
1257 func (cn *connection) setRW(rw io.ReadWriter) {
1258         cn.r = rw
1259         cn.w = rw
1260 }
1261
1262 // Returns the Reader and Writer as a combined ReadWriter.
1263 func (cn *connection) rw() io.ReadWriter {
1264         return struct {
1265                 io.Reader
1266                 io.Writer
1267         }{cn.r, cn.w}
1268 }
1269
1270 // Handle a received chunk from a peer.
1271 func (c *connection) receiveChunk(msg *pp.Message) error {
1272         t := c.t
1273         cl := t.cl
1274         torrent.Add("chunks received", 1)
1275
1276         req := newRequestFromMessage(msg)
1277
1278         if c.PeerChoked {
1279                 torrent.Add("chunks received while choked", 1)
1280         }
1281
1282         if _, ok := c.validReceiveChunks[req]; !ok {
1283                 torrent.Add("chunks received unexpected", 1)
1284                 return errors.New("received unexpected chunk")
1285         }
1286         delete(c.validReceiveChunks, req)
1287
1288         if c.PeerChoked && c.peerAllowedFast.Get(int(req.Index)) {
1289                 torrent.Add("chunks received due to allowed fast", 1)
1290         }
1291
1292         // Request has been satisfied.
1293         if c.deleteRequest(req) {
1294                 if c.expectingChunks() {
1295                         c.chunksReceivedWhileExpecting++
1296                 }
1297         } else {
1298                 torrent.Add("chunks received unwanted", 1)
1299         }
1300
1301         // Do we actually want this chunk?
1302         if t.haveChunk(req) {
1303                 torrent.Add("chunks received wasted", 1)
1304                 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
1305                 return nil
1306         }
1307
1308         piece := &t.pieces[req.Index]
1309
1310         c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
1311         c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
1312         c.lastUsefulChunkReceived = time.Now()
1313         // if t.fastestConn != c {
1314         // log.Printf("setting fastest connection %p", c)
1315         // }
1316         t.fastestConn = c
1317
1318         // Need to record that it hasn't been written yet, before we attempt to do
1319         // anything with it.
1320         piece.incrementPendingWrites()
1321         // Record that we have the chunk, so we aren't trying to download it while
1322         // waiting for it to be written to storage.
1323         piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
1324
1325         // Cancel pending requests for this chunk.
1326         for c := range t.conns {
1327                 c.postCancel(req)
1328         }
1329
1330         err := func() error {
1331                 cl.unlock()
1332                 defer cl.lock()
1333                 concurrentChunkWrites.Add(1)
1334                 defer concurrentChunkWrites.Add(-1)
1335                 // Write the chunk out. Note that the upper bound on chunk writing
1336                 // concurrency will be the number of connections. We write inline with
1337                 // receiving the chunk (with this lock dance), because we want to
1338                 // handle errors synchronously and I haven't thought of a nice way to
1339                 // defer any concurrency to the storage and have that notify the
1340                 // client of errors. TODO: Do that instead.
1341                 return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1342         }()
1343
1344         piece.decrementPendingWrites()
1345
1346         if err != nil {
1347                 panic(fmt.Sprintf("error writing chunk: %v", err))
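                // NOTE: the statements below are unreachable while the panic above is in place.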
1348                 t.pendRequest(req)
1349                 t.updatePieceCompletion(pieceIndex(msg.Index))
1350                 return nil
1351         }
1352
1353         // It's important that the piece is potentially queued before we check if
1354         // the piece is still wanted, because if it is queued, it won't be wanted.
1355         if t.pieceAllDirty(pieceIndex(req.Index)) {
1356                 t.queuePieceCheck(pieceIndex(req.Index))
1357                 t.pendAllChunkSpecs(pieceIndex(req.Index))
1358         }
1359
1360         c.onDirtiedPiece(pieceIndex(req.Index))
1361
1362         cl.event.Broadcast()
1363         t.publishPieceChange(pieceIndex(req.Index))
1364
1365         return nil
1366 }
1367
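// Records that the peer sent us data for the piece, both in this connection's
// peerTouchedPieces and in the piece's dirtiers set.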
1368 func (c *connection) onDirtiedPiece(piece pieceIndex) {
1369         if c.peerTouchedPieces == nil {
1370                 c.peerTouchedPieces = make(map[pieceIndex]struct{})
1371         }
1372         c.peerTouchedPieces[piece] = struct{}{}
1373         ds := &c.t.pieces[piece].dirtiers
1374         if *ds == nil {
1375                 *ds = make(map[*connection]struct{})
1376         }
1377         (*ds)[c] = struct{}{}
1378 }
1379
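// Reports whether we're currently willing to upload to this peer.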
1380 func (c *connection) uploadAllowed() bool {
1381         if c.t.cl.config.NoUpload {
1382                 return false
1383         }
1384         if c.t.seeding() {
1385                 return true
1386         }
1387         if !c.peerHasWantedPieces() {
1388                 return false
1389         }
1390         // Don't upload more than 100 KiB more than we download.
1391         if c.stats.BytesWrittenData.Int64() >= c.stats.BytesReadData.Int64()+100<<10 {
1392                 return false
1393         }
1394         return true
1395 }
1396
1397 func (c *connection) setRetryUploadTimer(delay time.Duration) {
1398         if c.uploadTimer == nil {
1399                 c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
1400         } else {
1401                 c.uploadTimer.Reset(delay)
1402         }
1403 }
1404
1405 // Also handles choking and unchoking of the remote peer.
1406 func (c *connection) upload(msg func(pp.Message) bool) bool {
1407         // Breaking or completing this loop means we don't want to upload to the
1408         // peer anymore, and we choke them.
1409 another:
1410         for c.uploadAllowed() {
1411                 // We want to upload to the peer.
1412                 if !c.Unchoke(msg) {
1413                         return false
1414                 }
1415                 for r := range c.PeerRequests {
1416                         res := c.t.cl.config.UploadRateLimiter.ReserveN(time.Now(), int(r.Length))
1417                         if !res.OK() {
1418                                 panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
1419                         }
1420                         delay := res.Delay()
1421                         if delay > 0 {
1422                                 res.Cancel()
1423                                 c.setRetryUploadTimer(delay)
1424                                 // Hard to say what to return here.
1425                                 return true
1426                         }
1427                         more, err := c.sendChunk(r, msg)
1428                         if err != nil {
1429                                 i := pieceIndex(r.Index)
1430                                 if c.t.pieceComplete(i) {
1431                                         c.t.updatePieceCompletion(i)
1432                                         if !c.t.pieceComplete(i) {
1433                                                 // We had the piece, but not anymore.
1434                                                 break another
1435                                         }
1436                                 }
1437                                 log.Str("error sending chunk to peer").AddValues(c, r, err).Log(c.t.logger)
1438                                 // If we failed to send a chunk, choke the peer to ensure they
1439                                 // flush all their requests. We've probably dropped a piece,
1440                                 // but there's no way to communicate this to the peer. If they
1441                                 // ask for it again, we'll kick them to allow us to send them
1442                                 // an updated bitfield.
1443                                 break another
1444                         }
1445                         delete(c.PeerRequests, r)
1446                         if !more {
1447                                 return false
1448                         }
1449                         goto another
1450                 }
1451                 return true
1452         }
1453         return c.Choke(msg)
1454 }
1455
1456 func (cn *connection) Drop() {
1457         cn.t.dropConnection(cn)
1458 }
1459
1460 func (cn *connection) netGoodPiecesDirtied() int64 {
1461         return cn.stats.PiecesDirtiedGood.Int64() - cn.stats.PiecesDirtiedBad.Int64()
1462 }
1463
1464 func (c *connection) peerHasWantedPieces() bool {
1465         return !c.pieceRequestOrder.IsEmpty()
1466 }
1467
1468 func (c *connection) numLocalRequests() int {
1469         return len(c.requests)
1470 }
1471
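// Removes a local request and its bookkeeping: the duplicate-request timer, the
// torrent-wide pending count, and an update nudge to other connections that
// might now want the piece. Returns false if the request wasn't outstanding.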
1472 func (c *connection) deleteRequest(r request) bool {
1473         if _, ok := c.requests[r]; !ok {
1474                 return false
1475         }
1476         delete(c.requests, r)
1477         c.updateExpectingChunks()
1478         if t, ok := c.t.lastRequested[r]; ok {
1479                 t.Stop()
1480                 delete(c.t.lastRequested, r)
1481         }
1482         pr := c.t.pendingRequests
1483         pr[r]--
1484         n := pr[r]
1485         if n == 0 {
1486                 delete(pr, r)
1487         }
1488         if n < 0 {
1489                 panic(n)
1490         }
1491         c.updateRequests()
1492         for _c := range c.t.conns {
1493                 if !_c.Interested && _c != c && _c.PeerHasPiece(pieceIndex(r.Index)) {
1494                         _c.updateRequests()
1495                 }
1496         }
1497         return true
1498 }
1499
1500 func (c *connection) deleteAllRequests() {
1501         for r := range c.requests {
1502                 c.deleteRequest(r)
1503         }
1504         if len(c.requests) != 0 {
1505                 panic(len(c.requests))
1506         }
1507         // for c := range c.t.conns {
1508         //      c.tickleWriter()
1509         // }
1510 }
1511
1512 func (c *connection) tickleWriter() {
1513         c.writerCond.Broadcast()
1514 }
1515
1516 func (c *connection) postCancel(r request) bool {
1517         if !c.deleteRequest(r) {
1518                 return false
1519         }
1520         c.Post(makeCancelMessage(r))
1521         return true
1522 }
1523
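// Reads the requested chunk from storage and sends it to the peer as a PIECE
// message.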
1524 func (c *connection) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) {
1525         // Count the chunk being sent, even if it isn't.
1526         b := make([]byte, r.Length)
1527         p := c.t.info.Piece(int(r.Index))
1528         n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
1529         if n != len(b) {
1530                 if err == nil {
1531                         panic("expected error")
1532                 }
1533                 return
1534         } else if err == io.EOF {
1535                 err = nil
1536         }
1537         more = msg(pp.Message{
1538                 Type:  pp.Piece,
1539                 Index: r.Index,
1540                 Begin: r.Begin,
1541                 Piece: b,
1542         })
1543         c.lastChunkSent = time.Now()
1544         return
1545 }
1546
1547 func (c *connection) setTorrent(t *Torrent) {
1548         if c.t != nil {
1549                 panic("connection already associated with a torrent")
1550         }
1551         c.t = t
1552         c.logger.Printf("torrent=%v", t)
1553         t.reconcileHandshakeStats(c)
1554 }
1555
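// The BEP 40 canonical priority for this peer, computed from our public address
// and the peer's address.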
1556 func (c *connection) peerPriority() peerPriority {
1557         return bep40PriorityIgnoreError(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp()))
1558 }
1559
1560 func (c *connection) remoteIp() net.IP {
1561         return c.remoteAddr.IP
1562 }
1563
1564 func (c *connection) remoteIpPort() IpPort {
1565         return c.remoteAddr
1566 }
1567
1568 func (c *connection) String() string {
1569         return fmt.Sprintf("connection %p", c)
1570 }