package torrent

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
	"math"
	"math/rand"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/anacrolix/dht"
	"github.com/anacrolix/log"

	"github.com/anacrolix/missinggo"
	"github.com/anacrolix/missinggo/bitmap"
	"github.com/anacrolix/missinggo/iter"
	"github.com/anacrolix/missinggo/prioritybitmap"

	"github.com/anacrolix/torrent/bencode"
	"github.com/anacrolix/torrent/mse"
	pp "github.com/anacrolix/torrent/peer_protocol"
)

type peerSource string

const (
	peerSourceTracker         = "Tr"
	peerSourceIncoming        = "I"
	peerSourceDHTGetPeers     = "Hg" // Peers we found by searching a DHT.
	peerSourceDHTAnnouncePeer = "Ha" // Peers that were announced to us by a DHT.
	peerSourcePEX             = "X"
)

// Maintains the state of a connection with a peer.
type connection struct {
	// First to ensure 64-bit alignment for atomics. See #262.
	stats ConnStats

	t *Torrent
	// The actual Conn, used for closing, and setting socket options.
	conn     net.Conn
	outgoing bool
	// The Reader and Writer for this Conn, with hooks installed for stats,
	// limiting, deadlines etc.
	w io.Writer
	r io.Reader
	// True if the connection is operating over MSE obfuscation.
	headerEncrypted bool
	cryptoMethod    mse.CryptoMethod
	Discovery       peerSource
	closed          missinggo.Event
	// Set true after we've added our ConnStats generated during handshake to
	// other ConnStat instances as determined when the *Torrent became known.
	reconciledHandshakeStats bool

	lastMessageReceived     time.Time
	completedHandshake      time.Time
	lastUsefulChunkReceived time.Time
	lastChunkSent           time.Time

	// Stuff controlled by the local peer.
	Interested           bool
	lastBecameInterested time.Time
	priorInterest        time.Duration

	lastStartedExpectingToReceiveChunks time.Time
	cumulativeExpectedToReceiveChunks   time.Duration
	chunksReceivedWhileExpecting        int64

	Choked           bool
	requests         map[request]struct{}
	requestsLowWater int
	// Chunks that we might reasonably expect to receive from the peer. Due to
	// latency, buffering, and implementation differences, we may receive
	// chunks that are no longer in the set of requests we actually want.
	validReceiveChunks map[request]struct{}
	// Indexed by metadata piece, set to true if posted and pending a
	// response.
	metadataRequests []bool
	sentHaves        bitmap.Bitmap

	// Stuff controlled by the remote peer.
	PeerID             PeerID
	PeerInterested     bool
	PeerChoked         bool
	PeerRequests       map[request]struct{}
	PeerExtensionBytes pp.PeerExtensionBits
	// The pieces the peer has claimed to have.
	peerPieces bitmap.Bitmap
	// The peer has everything. This can occur due to a special message, when
	// we may not even know the number of pieces in the torrent yet.
	peerSentHaveAll bool
	// The highest possible number of pieces the torrent could have based on
	// communication with the peer. Generally only useful until we have the
	// torrent info.
	peerMinPieces int
	// Pieces we've accepted chunks for from the peer.
	peerTouchedPieces map[int]struct{}
	peerAllowedFast   bitmap.Bitmap

	PeerMaxRequests  int // Maximum pending requests the peer allows.
	PeerExtensionIDs map[string]byte
	PeerClientName   string

	pieceInclination  []int
	pieceRequestOrder prioritybitmap.PriorityBitmap

	writeBuffer *bytes.Buffer
	uploadTimer *time.Timer
	writerCond  sync.Cond
}

func (cn *connection) updateExpectingChunks() {
	if cn.expectingChunks() {
		if cn.lastStartedExpectingToReceiveChunks.IsZero() {
			cn.lastStartedExpectingToReceiveChunks = time.Now()
		}
	} else {
		if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
			cn.cumulativeExpectedToReceiveChunks += time.Since(cn.lastStartedExpectingToReceiveChunks)
			cn.lastStartedExpectingToReceiveChunks = time.Time{}
		}
	}
}

func (cn *connection) expectingChunks() bool {
	return cn.Interested && !cn.PeerChoked
}
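
// A worked example of the bookkeeping above (illustrative numbers only): if
// we become interested in an unchoked peer at t=0, are choked at t=30s, and
// unchoked again at t=45s, then by t=60s cumulativeExpectedToReceiveChunks
// holds 30s, lastStartedExpectingToReceiveChunks holds the t=45s timestamp,
// and totalExpectingTime (below) reports 30s + 15s = 45s.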

// Returns true if the connection is over IPv6.
func (cn *connection) ipv6() bool {
	ip := missinggo.AddrIP(cn.remoteAddr())
	if ip.To4() != nil {
		return false
	}
	return len(ip) == net.IPv6len
}

// Returns true if the dialer has the lower client peer ID. TODO: Find the
// specification for this.
func (cn *connection) isPreferredDirection() bool {
	return bytes.Compare(cn.t.cl.peerID[:], cn.PeerID[:]) < 0 == cn.outgoing
}
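
// For example (hypothetical peer IDs, for illustration only): with our ID
// "-GT0001-aaaaaaaaaaaa" and a peer ID "-GT0001-bbbbbbbbbbbb", bytes.Compare
// yields -1, so an outgoing connection to that peer is the preferred
// direction, and an incoming one from it is not.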

// Returns whether the left connection should be preferred over the right one,
// considering only their networking properties. If ok is false, we can't
// decide.
func (l *connection) hasPreferredNetworkOver(r *connection) (left, ok bool) {
	var ml multiLess
	ml.NextBool(l.isPreferredDirection(), r.isPreferredDirection())
	ml.NextBool(!l.utp(), !r.utp())
	ml.NextBool(l.ipv6(), r.ipv6())
	return ml.FinalOk()
}

func (cn *connection) cumInterest() time.Duration {
	ret := cn.priorInterest
	if cn.Interested {
		ret += time.Since(cn.lastBecameInterested)
	}
	return ret
}

func (cn *connection) peerHasAllPieces() (all bool, known bool) {
	if cn.peerSentHaveAll {
		return true, true
	}
	if !cn.t.haveInfo() {
		return false, false
	}
	return bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces()).IsEmpty(), true
}

func (cn *connection) mu() sync.Locker {
	return &cn.t.cl.mu
}

func (cn *connection) remoteAddr() net.Addr {
	return cn.conn.RemoteAddr()
}

func (cn *connection) localAddr() net.Addr {
	return cn.conn.LocalAddr()
}

func (cn *connection) supportsExtension(ext string) bool {
	_, ok := cn.PeerExtensionIDs[ext]
	return ok
}

// The best guess at number of pieces in the torrent for this peer.
func (cn *connection) bestPeerNumPieces() int {
	if cn.t.haveInfo() {
		return cn.t.numPieces()
	}
	return cn.peerMinPieces
}

func (cn *connection) completedString() string {
	have := cn.peerPieces.Len()
	if cn.peerSentHaveAll {
		have = cn.bestPeerNumPieces()
	}
	return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
}

// Correct the PeerPieces slice length. Returns an error if the existing
// slice is invalid, such as by receiving a badly sized BITFIELD or invalid
// HAVE messages.
func (cn *connection) setNumPieces(num int) error {
	cn.peerPieces.RemoveRange(num, bitmap.ToEnd)
	cn.peerPiecesChanged()
	return nil
}

func eventAgeString(t time.Time) string {
	if t.IsZero() {
		return "never"
	}
	return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}

func (cn *connection) connectionFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.cryptoMethod == mse.CryptoMethodRC4 {
		c('E')
	} else if cn.headerEncrypted {
		c('e')
	}
	ret += string(cn.Discovery)
	if cn.utp() {
		c('U')
	}
	return
}

func (cn *connection) utp() bool {
	return strings.Contains(cn.remoteAddr().Network(), "utp")
}

// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
func (cn *connection) statusFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.Interested {
		c('i')
	}
	if cn.Choked {
		c('c')
	}
	c('-')
	ret += cn.connectionFlags()
	c('-')
	if cn.PeerInterested {
		c('i')
	}
	if cn.PeerChoked {
		c('c')
	}
	return
}
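
// For example, a connection on which we are interested and not choking, that
// was discovered via a DHT get_peers search ("Hg"), uses full MSE encryption
// ('E') and uTP ('U'), and whose peer is choking us but not interested,
// renders as "i-EHgU-c".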

// func (cn *connection) String() string {
// 	var buf bytes.Buffer
// 	cn.WriteStatus(&buf, nil)
// 	return buf.String()
// }

func (cn *connection) downloadRate() float64 {
	return float64(cn.stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds()
}

func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
	// \t isn't preserved in <pre> blocks?
	fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr())
	fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
		eventAgeString(cn.lastMessageReceived),
		eventAgeString(cn.completedHandshake),
		eventAgeString(cn.lastHelpful()),
		cn.cumInterest(),
		cn.totalExpectingTime(),
	)
	fmt.Fprintf(w,
		"    %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
		cn.completedString(),
		len(cn.peerTouchedPieces),
		&cn.stats.ChunksReadUseful,
		&cn.stats.ChunksRead,
		&cn.stats.ChunksWritten,
		cn.requestsLowWater,
		cn.numLocalRequests(),
		cn.nominalMaxRequests(),
		len(cn.PeerRequests),
		cn.statusFlags(),
		cn.downloadRate()/(1<<10),
	)
	fmt.Fprintf(w, "    next pieces: %v%s\n",
		iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
		func() string {
			if cn.shouldRequestWithoutBias() {
				return " (fastest)"
			} else {
				return ""
			}
		}())
}

func (cn *connection) Close() {
	if !cn.closed.Set() {
		return
	}
	cn.tickleWriter()
	cn.discardPieceInclination()
	cn.pieceRequestOrder.Clear()
	if cn.conn != nil {
		go cn.conn.Close()
	}
}

func (cn *connection) PeerHasPiece(piece int) bool {
	return cn.peerSentHaveAll || cn.peerPieces.Contains(piece)
}

// Writes a message into the write buffer.
func (cn *connection) Post(msg pp.Message) {
	torrent.Add(fmt.Sprintf("messages posted of type %s", msg.Type.String()), 1)
	// We don't need to track bytes here because a connection.w Writer wrapper
	// takes care of that (although there's some delay between us recording
	// the message, and the connection writer flushing it out).
	cn.writeBuffer.Write(msg.MustMarshalBinary())
	// Last I checked only Piece messages affect stats, and we don't post
	// those.
	cn.wroteMsg(&msg)
	cn.tickleWriter()
}

func (cn *connection) requestMetadataPiece(index int) {
	eID := cn.PeerExtensionIDs["ut_metadata"]
	if eID == 0 {
		return
	}
	if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
		return
	}
	cn.Post(pp.Message{
		Type:       pp.Extended,
		ExtendedID: eID,
		ExtendedPayload: func() []byte {
			b, err := bencode.Marshal(map[string]int{
				"msg_type": pp.RequestMetadataExtensionMsgType,
				"piece":    index,
			})
			if err != nil {
				panic(err)
			}
			return b
		}(),
	})
	for index >= len(cn.metadataRequests) {
		cn.metadataRequests = append(cn.metadataRequests, false)
	}
	cn.metadataRequests[index] = true
}

func (cn *connection) requestedMetadataPiece(index int) bool {
	return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}

// The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) {
	if cn.t.requestStrategy == 3 {
		expectingTime := int64(cn.totalExpectingTime())
		if expectingTime == 0 {
			expectingTime = math.MaxInt64
		} else {
			expectingTime *= 2
		}
		return int(clamp(
			1,
			int64(cn.PeerMaxRequests),
			max(
				// It makes sense to always pipeline at least one request,
				// since latency must be non-zero.
				2,
				// Request only as many as we expect to receive in the
				// duplicateRequestTimeout window. We are trying to avoid
				// having to duplicate requests.
				cn.chunksReceivedWhileExpecting*int64(cn.t.duplicateRequestTimeout)/expectingTime,
			),
		))
	}
	return int(clamp(
		1,
		int64(cn.PeerMaxRequests),
		max(64,
			cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64()))))
}
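
// A worked example for the requestStrategy == 3 branch (illustrative numbers
// only): with duplicateRequestTimeout = 5s and 100 chunks received over 10s
// of expecting time, expectingTime doubles to 20s, giving a target pipeline
// of 100 * 5s / 20s = 25 requests; the surrounding max keeps at least 2 in
// flight, and the clamp caps the result at PeerMaxRequests.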

func (cn *connection) totalExpectingTime() (ret time.Duration) {
	ret = cn.cumulativeExpectedToReceiveChunks
	if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
		ret += time.Since(cn.lastStartedExpectingToReceiveChunks)
	}
	return
}

func (cn *connection) onPeerSentCancel(r request) {
	if _, ok := cn.PeerRequests[r]; !ok {
		torrent.Add("unexpected cancels received", 1)
		return
	}
	if cn.fastEnabled() {
		cn.reject(r)
	} else {
		delete(cn.PeerRequests, r)
	}
}

func (cn *connection) Choke(msg messageWriter) (more bool) {
	if cn.Choked {
		return true
	}
	cn.Choked = true
	more = msg(pp.Message{
		Type: pp.Choke,
	})
	if cn.fastEnabled() {
		for r := range cn.PeerRequests {
			// TODO: Don't reject pieces in allowed fast set.
			cn.reject(r)
		}
	} else {
		cn.PeerRequests = nil
	}
	return
}

func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
	if !cn.Choked {
		return true
	}
	cn.Choked = false
	return msg(pp.Message{
		Type: pp.Unchoke,
	})
}

func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
	if cn.Interested == interested {
		return true
	}
	cn.Interested = interested
	if interested {
		cn.lastBecameInterested = time.Now()
	} else if !cn.lastBecameInterested.IsZero() {
		cn.priorInterest += time.Since(cn.lastBecameInterested)
	}
	cn.updateExpectingChunks()
	// log.Printf("%p: setting interest: %v", cn, interested)
	return msg(pp.Message{
		Type: func() pp.MessageType {
			if interested {
				return pp.Interested
			} else {
				return pp.NotInterested
			}
		}(),
	})
}

// The function takes a message to be sent, and returns true if more messages
// are okay.
type messageWriter func(pp.Message) bool
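
// An illustrative, hypothetical messageWriter, showing the contract: it
// consumes each message and returns true while more are welcome. The real
// writers used by the client serialize messages into connection.writeBuffer
// instead (see fillWriteBuffer and writer).
func collectMessages(out *[]pp.Message) messageWriter {
	return func(m pp.Message) bool {
		*out = append(*out, m)
		return true // Never signals back-pressure.
	}
}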

// Proxies the messageWriter's response.
func (cn *connection) request(r request, mw messageWriter) bool {
	if _, ok := cn.requests[r]; ok {
		panic("chunk already requested")
	}
	if !cn.PeerHasPiece(r.Index.Int()) {
		panic("requesting piece peer doesn't have")
	}
	if _, ok := cn.t.conns[cn]; !ok {
		panic("requesting but not in active conns")
	}
	if cn.closed.IsSet() {
		panic("requesting when connection is closed")
	}
	if cn.PeerChoked {
		if cn.peerAllowedFast.Get(int(r.Index)) {
			torrent.Add("allowed fast requests sent", 1)
		} else {
			panic("requesting while choked and not allowed fast")
		}
	}
	if cn.t.hashingPiece(pieceIndex(r.Index)) {
		panic("piece is being hashed")
	}
	if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) {
		panic("piece is queued for hash")
	}
	if cn.requests == nil {
		cn.requests = make(map[request]struct{})
	}
	cn.requests[r] = struct{}{}
	if cn.validReceiveChunks == nil {
		cn.validReceiveChunks = make(map[request]struct{})
	}
	cn.validReceiveChunks[r] = struct{}{}
	cn.t.pendingRequests[r]++
	cn.t.lastRequested[r] = time.AfterFunc(cn.t.duplicateRequestTimeout, func() {
		torrent.Add("duplicate request timeouts", 1)
		cn.mu().Lock()
		defer cn.mu().Unlock()
		delete(cn.t.lastRequested, r)
		for cn := range cn.t.conns {
			if cn.PeerHasPiece(pieceIndex(r.Index)) {
				cn.updateRequests()
			}
		}
	})
	cn.updateExpectingChunks()
	return mw(pp.Message{
		Type:   pp.Request,
		Index:  r.Index,
		Begin:  r.Begin,
		Length: r.Length,
	})
}

func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
	if !cn.t.networkingEnabled {
		if !cn.SetInterested(false, msg) {
			return
		}
		if len(cn.requests) != 0 {
			for r := range cn.requests {
				cn.deleteRequest(r)
				// log.Printf("%p: cancelling request: %v", cn, r)
				if !msg(makeCancelMessage(r)) {
					return
				}
			}
		}
	}
	if len(cn.requests) <= cn.requestsLowWater {
		filledBuffer := false
		cn.iterPendingPieces(func(pieceIndex int) bool {
			cn.iterPendingRequests(pieceIndex, func(r request) bool {
				if !cn.SetInterested(true, msg) {
					filledBuffer = true
					return false
				}
				if len(cn.requests) >= cn.nominalMaxRequests() {
					return false
				}
				// Choking is looked at here because our interest is dependent
				// on whether we'd make requests in its absence.
				if cn.PeerChoked {
					if !cn.peerAllowedFast.Get(bitmap.BitIndex(r.Index)) {
						return false
					}
				}
				if _, ok := cn.requests[r]; ok {
					return true
				}
				filledBuffer = !cn.request(r, msg)
				return !filledBuffer
			})
			return !filledBuffer
		})
		if filledBuffer {
			// If we didn't completely top up the requests, we shouldn't mark
			// the low water, since we'll want to top up the requests as soon
			// as we have more write buffer space.
			return
		}
		cn.requestsLowWater = len(cn.requests) / 2
	}

	cn.upload(msg)
}

// Routine that writes to the peer. Some of what to write is buffered by
// activity elsewhere in the Client, and some is determined locally when the
// connection is writable.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
	var (
		lastWrite      time.Time = time.Now()
		keepAliveTimer *time.Timer
	)
	keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
		cn.mu().Lock()
		defer cn.mu().Unlock()
		if time.Since(lastWrite) >= keepAliveTimeout {
			cn.tickleWriter()
		}
		keepAliveTimer.Reset(keepAliveTimeout)
	})
	cn.mu().Lock()
	defer cn.mu().Unlock()
	defer cn.Close()
	defer keepAliveTimer.Stop()
	frontBuf := new(bytes.Buffer)
	for {
		if cn.closed.IsSet() {
			return
		}
		if cn.writeBuffer.Len() == 0 {
			cn.fillWriteBuffer(func(msg pp.Message) bool {
				cn.wroteMsg(&msg)
				cn.writeBuffer.Write(msg.MustMarshalBinary())
				torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
				return cn.writeBuffer.Len() < 1<<16 // 64KiB
			})
		}
		if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
			cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
			postedKeepalives.Add(1)
		}
		if cn.writeBuffer.Len() == 0 {
			// TODO: Minimize wakeups....
			cn.writerCond.Wait()
			continue
		}
		// Flip the buffers.
		frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
		cn.mu().Unlock()
		n, err := cn.w.Write(frontBuf.Bytes())
		cn.mu().Lock()
		if n != 0 {
			lastWrite = time.Now()
			keepAliveTimer.Reset(keepAliveTimeout)
		}
		if err != nil {
			return
		}
		if n != frontBuf.Len() {
			panic("short write")
		}
		frontBuf.Reset()
	}
}
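
// The buffer flip above is a small double-buffering scheme: the writer swaps
// the shared writeBuffer with its private frontBuf so that the client lock
// can be released during the (potentially blocking) socket write, while other
// goroutines keep posting messages into the fresh writeBuffer.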

func (cn *connection) Have(piece int) {
	if cn.sentHaves.Get(piece) {
		return
	}
	cn.Post(pp.Message{
		Type:  pp.Have,
		Index: pp.Integer(piece),
	})
	cn.sentHaves.Add(piece)
}

func (cn *connection) PostBitfield() {
	if cn.sentHaves.Len() != 0 {
		panic("bitfield must be first have-related message sent")
	}
	if !cn.t.haveAnyPieces() {
		return
	}
	cn.Post(pp.Message{
		Type:     pp.Bitfield,
		Bitfield: cn.t.bitfield(),
	})
	cn.sentHaves = cn.t.completedPieces.Copy()
}

func (cn *connection) updateRequests() {
	// log.Print("update requests")
	cn.tickleWriter()
}

// Emits the indices in the Bitmaps bms in order, never repeating any index.
// skip is mutated during execution, and its initial values will never be
// emitted.
func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
	return func(cb iter.Callback) {
		for _, bm := range bms {
			if !iter.All(func(i interface{}) bool {
				skip.Add(i.(int))
				return cb(i)
			}, bitmap.Sub(bm, *skip).Iter) {
				return
			}
		}
	}
}
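
// An illustrative, hypothetical use of iterBitmapsDistinct: with a = {1, 2}
// and b = {2, 3}, the callback sees 1, 2, 3; the second 2 is suppressed
// because the pass over a already added it to skip.
func exampleIterBitmapsDistinct() (out []int) {
	var a, b, skip bitmap.Bitmap
	a.Add(1)
	a.Add(2)
	b.Add(2)
	b.Add(3)
	iterBitmapsDistinct(&skip, a, b)(func(i interface{}) bool {
		out = append(out, i.(int))
		return true
	})
	return // [1 2 3]
}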

func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece int) bool) bool {
	now, readahead := cn.t.readerPiecePriorities()
	var skip bitmap.Bitmap
	if !cn.peerSentHaveAll {
		// Pieces to skip include pieces the peer doesn't have.
		skip = bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces())
	}
	// And pieces that we already have.
	skip.Union(cn.t.completedPieces)
	skip.Union(cn.t.piecesQueuedForHash)
	// Iterate over the different priority classes, minus the skipped
	// pieces.
	return iter.All(
		func(_piece interface{}) bool {
			i := _piece.(pieceIndex)
			if cn.t.hashingPiece(i) {
				return true
			}
			return f(i)
		},
		iterBitmapsDistinct(&skip, now, readahead),
		func(cb iter.Callback) {
			cn.t.pendingPieces.IterTyped(func(piece int) bool {
				if skip.Contains(piece) {
					return true
				}
				more := cb(piece)
				skip.Add(piece)
				return more
			})
		},
	)
}

// The connection should download highest priority pieces first, without any
// inclination toward avoiding wastage. Generally we might do this if there's
// a single connection, or this is the fastest connection, and we have active
// readers that signal an ordering preference. It's conceivable that the best
// connection should do this, since it's least likely to waste our time if
// assigned to the highest priority pieces, and assigning more than one this
// role would cause significant wasted bandwidth.
func (cn *connection) shouldRequestWithoutBias() bool {
	if cn.t.requestStrategy != 2 {
		return false
	}
	if len(cn.t.readers) == 0 {
		return false
	}
	if len(cn.t.conns) == 1 {
		return true
	}
	if cn == cn.t.fastestConn {
		return true
	}
	return false
}

func (cn *connection) iterPendingPieces(f func(int) bool) bool {
	if !cn.t.haveInfo() {
		return false
	}
	if cn.t.requestStrategy == 3 {
		return cn.iterUnbiasedPieceRequestOrder(f)
	}
	if cn.shouldRequestWithoutBias() {
		return cn.iterUnbiasedPieceRequestOrder(f)
	} else {
		return cn.pieceRequestOrder.IterTyped(f)
	}
}

func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) {
	cn.iterPendingPieces(func(i int) bool { return f(i) })
}

func (cn *connection) iterPendingRequests(piece int, f func(request) bool) bool {
	return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
		r := request{pp.Integer(piece), cs}
		if cn.t.requestStrategy == 3 {
			if _, ok := cn.t.lastRequested[r]; ok {
				// This piece has been requested on another connection, and
				// the duplicate request timer is still running.
				return true
			}
		}
		return f(r)
	})
}

func iterUndirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
	chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
	// TODO: Use "math/rand".Shuffle >= Go 1.10
	return iter.ForPerm(len(chunkIndices), func(i int) bool {
		return f(t.chunkIndexSpec(chunkIndices[i], piece))
	})
}

// TODO: Check that callers of this also call updateRequests as needed.
func (cn *connection) stopRequestingPiece(piece int) bool {
	return cn.pieceRequestOrder.Remove(piece)
}

// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection and is
// used to pseudorandomly avoid connections always requesting the same pieces
// and thus wasting effort.
func (cn *connection) updatePiecePriority(piece int) bool {
	tpp := cn.t.piecePriority(piece)
	if !cn.PeerHasPiece(piece) {
		tpp = PiecePriorityNone
	}
	if tpp == PiecePriorityNone {
		return cn.stopRequestingPiece(piece)
	}
	prio := cn.getPieceInclination()[piece]
	switch cn.t.requestStrategy {
	case 1:
		switch tpp {
		case PiecePriorityNormal:
		case PiecePriorityReadahead:
			prio -= cn.t.numPieces()
		case PiecePriorityNext, PiecePriorityNow:
			prio -= 2 * cn.t.numPieces()
		default:
			panic(tpp)
		}
		prio += piece / 3
	default:
	}
	return cn.pieceRequestOrder.Set(piece, prio) || cn.shouldRequestWithoutBias()
}
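
// A worked example of the requestStrategy == 1 arithmetic (illustrative
// numbers only): for a 1000-piece torrent, piece 300 with inclination 400 at
// PiecePriorityReadahead gets prio = 400 - 1000 + 300/3 = -500. Subtracting
// multiples of numPieces keeps higher-urgency classes sorting ahead of lower
// ones, while the inclination and piece/3 terms break ties within a class.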

func (cn *connection) getPieceInclination() []int {
	if cn.pieceInclination == nil {
		cn.pieceInclination = cn.t.getConnPieceInclination()
	}
	return cn.pieceInclination
}

func (cn *connection) discardPieceInclination() {
	if cn.pieceInclination == nil {
		return
	}
	cn.t.putPieceInclination(cn.pieceInclination)
	cn.pieceInclination = nil
}

func (cn *connection) peerPiecesChanged() {
	if cn.t.haveInfo() {
		prioritiesChanged := false
		for i := range iter.N(cn.t.numPieces()) {
			if cn.updatePiecePriority(i) {
				prioritiesChanged = true
			}
		}
		if prioritiesChanged {
			cn.updateRequests()
		}
	}
}

func (cn *connection) raisePeerMinPieces(newMin int) {
	if newMin > cn.peerMinPieces {
		cn.peerMinPieces = newMin
	}
}

func (cn *connection) peerSentHave(piece int) error {
	if cn.t.haveInfo() && piece >= cn.t.numPieces() || piece < 0 {
		return errors.New("invalid piece")
	}
	if cn.PeerHasPiece(piece) {
		return nil
	}
	cn.raisePeerMinPieces(piece + 1)
	cn.peerPieces.Set(piece, true)
	if cn.updatePiecePriority(piece) {
		cn.updateRequests()
	}
	return nil
}

func (cn *connection) peerSentBitfield(bf []bool) error {
	cn.peerSentHaveAll = false
	if len(bf)%8 != 0 {
		panic("expected bitfield length divisible by 8")
	}
	// We know that the last byte means that at most the last 7 bits are
	// wasted.
	cn.raisePeerMinPieces(len(bf) - 7)
	if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
		// Ignore known excess pieces.
		bf = bf[:cn.t.numPieces()]
	}
	for i, have := range bf {
		if have {
			cn.raisePeerMinPieces(i + 1)
		}
		cn.peerPieces.Set(i, have)
	}
	cn.peerPiecesChanged()
	return nil
}
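
// A worked example of the bitfield bounds (illustrative numbers only): a
// 4-byte BITFIELD decodes to 32 bools, so before looking at any bits the
// torrent must have at least 32 - 7 = 25 pieces. If the highest set bit is
// index 30, the loop then raises peerMinPieces to 31.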

func (cn *connection) onPeerSentHaveAll() error {
	cn.peerSentHaveAll = true
	cn.peerPieces.Clear()
	cn.peerPiecesChanged()
	return nil
}

func (cn *connection) peerSentHaveNone() error {
	cn.peerPieces.Clear()
	cn.peerSentHaveAll = false
	cn.peerPiecesChanged()
	return nil
}

func (c *connection) requestPendingMetadata() {
	if c.t.haveInfo() {
		return
	}
	if c.PeerExtensionIDs["ut_metadata"] == 0 {
		// Peer doesn't support this.
		return
	}
	// Request metadata pieces that we don't have in a random order.
	var pending []int
	for index := 0; index < c.t.metadataPieceCount(); index++ {
		if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
			pending = append(pending, index)
		}
	}
	for _, i := range rand.Perm(len(pending)) {
		c.requestMetadataPiece(pending[i])
	}
}

func (cn *connection) wroteMsg(msg *pp.Message) {
	torrent.Add(fmt.Sprintf("messages written of type %s", msg.Type.String()), 1)
	cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
}

func (cn *connection) readMsg(msg *pp.Message) {
	cn.allStats(func(cs *ConnStats) { cs.readMsg(msg) })
}

// After handshake, we know what Torrent and Client stats to include for a
// connection.
func (cn *connection) postHandshakeStats(f func(*ConnStats)) {
	t := cn.t
	f(&t.stats)
	f(&t.cl.stats)
}

// All ConnStats that include this connection. Some objects are not known
// until the handshake is complete, after which it's expected to reconcile the
// differences.
func (cn *connection) allStats(f func(*ConnStats)) {
	f(&cn.stats)
	if cn.reconciledHandshakeStats {
		cn.postHandshakeStats(f)
	}
}

func (cn *connection) wroteBytes(n int64) {
	cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten }))
}

func (cn *connection) readBytes(n int64) {
	cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
}

// Returns whether the connection could be useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
func (c *connection) useful() bool {
	t := c.t
	if c.closed.IsSet() {
		return false
	}
	if !t.haveInfo() {
		return c.supportsExtension("ut_metadata")
	}
	if t.seeding() && c.PeerInterested {
		return true
	}
	if c.peerHasWantedPieces() {
		return true
	}
	return false
}

func (c *connection) lastHelpful() (ret time.Time) {
	ret = c.lastUsefulChunkReceived
	if c.t.seeding() && c.lastChunkSent.After(ret) {
		ret = c.lastChunkSent
	}
	return
}

func (c *connection) fastEnabled() bool {
	return c.PeerExtensionBytes.SupportsFast() && c.t.cl.extensionBytes.SupportsFast()
}

func (c *connection) reject(r request) {
	if !c.fastEnabled() {
		panic("fast not enabled")
	}
	c.Post(r.ToMsg(pp.Reject))
	delete(c.PeerRequests, r)
}

func (c *connection) onReadRequest(r request) error {
	requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
	if r.Begin+r.Length > c.t.pieceLength(int(r.Index)) {
		torrent.Add("bad requests received", 1)
		return errors.New("bad request")
	}
	if _, ok := c.PeerRequests[r]; ok {
		torrent.Add("duplicate requests received", 1)
		return nil
	}
	if c.Choked {
		torrent.Add("requests received while choking", 1)
		if c.fastEnabled() {
			torrent.Add("requests rejected while choking", 1)
			c.reject(r)
		}
		return nil
	}
	if len(c.PeerRequests) >= maxRequests {
		torrent.Add("requests received while queue full", 1)
		if c.fastEnabled() {
			c.reject(r)
		}
		// BEP 6 says we may close here if we choose.
		return nil
	}
	if !c.t.havePiece(r.Index.Int()) {
		// This isn't necessarily them screwing up. We can drop pieces
		// from our storage, and can't communicate this to peers
		// except by reconnecting.
		requestsReceivedForMissingPieces.Add(1)
		return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
	}
	if c.PeerRequests == nil {
		c.PeerRequests = make(map[request]struct{}, maxRequests)
	}
	c.PeerRequests[r] = struct{}{}
	c.tickleWriter()
	return nil
}

// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() (err error) {
	defer func() {
		if err != nil {
			torrent.Add("connection.mainReadLoop returned with error", 1)
		} else {
			torrent.Add("connection.mainReadLoop returned with no error", 1)
		}
	}()
	t := c.t
	cl := t.cl

	decoder := pp.Decoder{
		R:         bufio.NewReaderSize(c.r, 1<<17),
		MaxLength: 256 * 1024,
		Pool:      t.chunkPool,
	}
	for {
		var msg pp.Message
		func() {
			cl.mu.Unlock()
			defer cl.mu.Lock()
			err = decoder.Decode(&msg)
		}()
		if t.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		c.readMsg(&msg)
		c.lastMessageReceived = time.Now()
		if msg.Keepalive {
			receivedKeepalives.Add(1)
			continue
		}
		messageTypesReceived.Add(msg.Type.String(), 1)
		if msg.Type.FastExtension() && !c.fastEnabled() {
			return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
		}
		switch msg.Type {
		case pp.Choke:
			c.PeerChoked = true
			c.deleteAllRequests()
			// We can then reset our interest.
			c.updateRequests()
			c.updateExpectingChunks()
		case pp.Reject:
			c.deleteRequest(newRequestFromMessage(&msg))
			delete(c.validReceiveChunks, newRequestFromMessage(&msg))
		case pp.Unchoke:
			c.PeerChoked = false
			c.tickleWriter()
			c.updateExpectingChunks()
		case pp.Interested:
			c.PeerInterested = true
			c.tickleWriter()
		case pp.NotInterested:
			c.PeerInterested = false
			// We don't clear their requests since it isn't clear in the spec.
			// We'll probably choke them for this, which will clear them if
			// appropriate, and is clearly specified.
		case pp.Have:
			err = c.peerSentHave(int(msg.Index))
		case pp.Request:
			r := newRequestFromMessage(&msg)
			err = c.onReadRequest(r)
		case pp.Cancel:
			req := newRequestFromMessage(&msg)
			c.onPeerSentCancel(req)
		case pp.Bitfield:
			err = c.peerSentBitfield(msg.Bitfield)
		case pp.HaveAll:
			err = c.onPeerSentHaveAll()
		case pp.HaveNone:
			err = c.peerSentHaveNone()
		case pp.Piece:
			err = c.receiveChunk(&msg)
			if len(msg.Piece) == int(t.chunkSize) {
				t.chunkPool.Put(&msg.Piece)
			}
			if err != nil {
				err = fmt.Errorf("receiving chunk: %s", err)
			}
		case pp.Extended:
			err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
		case pp.Port:
			pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
			if err != nil {
				panic(err)
			}
			if msg.Port != 0 {
				pingAddr.Port = int(msg.Port)
			}
			cl.eachDhtServer(func(s *dht.Server) {
				go s.Ping(pingAddr, nil)
			})
		case pp.AllowedFast:
			torrent.Add("allowed fasts received", 1)
			log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c, debugLogValue).Log(c.t.logger)
			c.peerAllowedFast.Add(int(msg.Index))
			c.updateRequests()
		case pp.Suggest:
			torrent.Add("suggests received", 1)
			log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index, debugLogValue).Log(c.t.logger)
			c.updateRequests()
		default:
			err = fmt.Errorf("received unknown message type: %#v", msg.Type)
		}
		if err != nil {
			return err
		}
	}
}

func (c *connection) onReadExtendedMsg(id byte, payload []byte) (err error) {
	defer func() {
		// TODO: Should we still do this?
		if err != nil {
			// These clients use their own extension IDs for outgoing message
			// types, which is incorrect.
			if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) || strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
				err = nil
			}
		}
	}()
	t := c.t
	cl := t.cl
	switch id {
	case pp.HandshakeExtendedID:
		// TODO: Create a bencode struct for this.
		var d map[string]interface{}
		err := bencode.Unmarshal(payload, &d)
		if err != nil {
			return fmt.Errorf("error decoding extended message payload: %s", err)
		}
		// log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
		if reqq, ok := d["reqq"]; ok {
			if i, ok := reqq.(int64); ok {
				c.PeerMaxRequests = int(i)
			}
		}
		if v, ok := d["v"]; ok {
			c.PeerClientName = v.(string)
		}
		if m, ok := d["m"]; ok {
			mTyped, ok := m.(map[string]interface{})
			if !ok {
				return errors.New("handshake m value is not dict")
			}
			if c.PeerExtensionIDs == nil {
				c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
			}
			for name, v := range mTyped {
				id, ok := v.(int64)
				if !ok {
					log.Printf("bad handshake m item extension ID type: %T", v)
					continue
				}
				if id == 0 {
					delete(c.PeerExtensionIDs, name)
				} else {
					if c.PeerExtensionIDs[name] == 0 {
						supportedExtensionMessages.Add(name, 1)
					}
					c.PeerExtensionIDs[name] = byte(id)
				}
			}
		}
		metadata_sizeUntyped, ok := d["metadata_size"]
		if ok {
			metadata_size, ok := metadata_sizeUntyped.(int64)
			if !ok {
				log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
			} else {
				err = t.setMetadataSize(metadata_size)
				if err != nil {
					return fmt.Errorf("error setting metadata size to %d", metadata_size)
				}
			}
		}
		if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
			c.requestPendingMetadata()
		}
		return nil
	case metadataExtendedId:
		err := cl.gotMetadataExtensionMsg(payload, t, c)
		if err != nil {
			return fmt.Errorf("error handling metadata extension message: %s", err)
		}
		return nil
	case pexExtendedId:
		if cl.config.DisablePEX {
			// TODO: Maybe close the connection. Check that we're not
			// advertising that we support PEX if it's disabled.
			return nil
		}
		var pexMsg peerExchangeMessage
		err := bencode.Unmarshal(payload, &pexMsg)
		if err != nil {
			return fmt.Errorf("error unmarshalling PEX message: %s", err)
		}
		torrent.Add("pex added6 peers received", int64(len(pexMsg.Added6)))
		t.addPeers(pexMsg.AddedPeers())
		return nil
	default:
		return fmt.Errorf("unexpected extended message ID: %v", id)
	}
}
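
// For reference, an illustrative (hypothetical) extended handshake dict, as
// decoded from bencode, showing the keys handled above:
//
//	{"m": {"ut_metadata": 3, "ut_pex": 1}, "reqq": 250,
//	 "v": "ExampleClient 1.0", "metadata_size": 31235}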

// Set both the Reader and Writer for the connection from a single ReadWriter.
func (cn *connection) setRW(rw io.ReadWriter) {
	cn.r = rw
	cn.w = rw
}

// Returns the Reader and Writer as a combined ReadWriter.
func (cn *connection) rw() io.ReadWriter {
	return struct {
		io.Reader
		io.Writer
	}{cn.r, cn.w}
}

// Handle a received chunk from a peer.
func (c *connection) receiveChunk(msg *pp.Message) error {
	t := c.t
	cl := t.cl
	torrent.Add("chunks received", 1)

	req := newRequestFromMessage(msg)

	if c.PeerChoked {
		torrent.Add("chunks received while choked", 1)
	}

	if _, ok := c.validReceiveChunks[req]; !ok {
		torrent.Add("chunks received unexpected", 1)
		return errors.New("received unexpected chunk")
	}
	delete(c.validReceiveChunks, req)

	if c.PeerChoked && c.peerAllowedFast.Get(int(req.Index)) {
		torrent.Add("chunks received due to allowed fast", 1)
	}

	// Request has been satisfied.
	if c.deleteRequest(req) {
		if c.expectingChunks() {
			c.chunksReceivedWhileExpecting++
		}
	} else {
		torrent.Add("chunks received unwanted", 1)
	}

	// Do we actually want this chunk?
	if t.haveChunk(req) {
		torrent.Add("chunks received wasted", 1)
		c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
		return nil
	}

	index := int(req.Index)
	piece := &t.pieces[index]

	c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
	c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
	c.lastUsefulChunkReceived = time.Now()
	// if t.fastestConn != c {
	// log.Printf("setting fastest connection %p", c)
	// }
	t.fastestConn = c

	// Need to record that it hasn't been written yet, before we attempt to do
	// anything with it.
	piece.incrementPendingWrites()
	// Record that we have the chunk, so we aren't trying to download it while
	// waiting for it to be written to storage.
	piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))

	// Cancel pending requests for this chunk.
	for c := range t.conns {
		c.postCancel(req)
	}

	err := func() error {
		cl.mu.Unlock()
		defer cl.mu.Lock()
		// Write the chunk out. Note that the upper bound on chunk writing
		// concurrency will be the number of connections. We write inline with
		// receiving the chunk (with this lock dance), because we want to
		// handle errors synchronously and I haven't thought of a nice way to
		// defer any concurrency to the storage and have that notify the
		// client of errors. TODO: Do that instead.
		return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
	}()

	piece.decrementPendingWrites()

	if err != nil {
		log.Printf("%s (%s): error writing chunk %v: %s", t, t.infoHash, req, err)
		t.pendRequest(req)
		t.updatePieceCompletion(int(msg.Index))
		return nil
	}

	// It's important that the piece is potentially queued before we check if
	// the piece is still wanted, because if it is queued, it won't be wanted.
	if t.pieceAllDirty(index) {
		t.queuePieceCheck(int(req.Index))
		t.pendAllChunkSpecs(index)
	}

	c.onDirtiedPiece(index)

	cl.event.Broadcast()
	t.publishPieceChange(int(req.Index))

	return nil
}

func (c *connection) onDirtiedPiece(piece int) {
	if c.peerTouchedPieces == nil {
		c.peerTouchedPieces = make(map[int]struct{})
	}
	c.peerTouchedPieces[piece] = struct{}{}
	ds := &c.t.pieces[piece].dirtiers
	if *ds == nil {
		*ds = make(map[*connection]struct{})
	}
	(*ds)[c] = struct{}{}
}

func (c *connection) uploadAllowed() bool {
	if c.t.cl.config.NoUpload {
		return false
	}
	if c.t.seeding() {
		return true
	}
	if !c.peerHasWantedPieces() {
		return false
	}
	// Don't upload more than 100 KiB more than we download.
	if c.stats.BytesWrittenData.Int64() >= c.stats.BytesReadData.Int64()+100<<10 {
		return false
	}
	return true
}

func (c *connection) setRetryUploadTimer(delay time.Duration) {
	if c.uploadTimer == nil {
		c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
	} else {
		c.uploadTimer.Reset(delay)
	}
}

// Also handles choking and unchoking of the remote peer.
func (c *connection) upload(msg func(pp.Message) bool) bool {
	// Breaking or completing this loop means we don't want to upload to the
	// peer anymore, and we choke them.
another:
	for c.uploadAllowed() {
		// We want to upload to the peer.
		if !c.Unchoke(msg) {
			return false
		}
		for r := range c.PeerRequests {
			res := c.t.cl.config.UploadRateLimiter.ReserveN(time.Now(), int(r.Length))
			if !res.OK() {
				panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
			}
			delay := res.Delay()
			if delay > 0 {
				res.Cancel()
				c.setRetryUploadTimer(delay)
				// Hard to say what to return here.
				return true
			}
			more, err := c.sendChunk(r, msg)
			if err != nil {
				i := int(r.Index)
				if c.t.pieceComplete(i) {
					c.t.updatePieceCompletion(i)
					if !c.t.pieceComplete(i) {
						// We had the piece, but not anymore.
						break another
					}
				}
				log.Str("error sending chunk to peer").AddValues(c, r, err).Log(c.t.logger)
				// If we failed to send a chunk, choke the peer to ensure they
				// flush all their requests. We've probably dropped a piece,
				// but there's no way to communicate this to the peer. If they
				// ask for it again, we'll kick them to allow us to send them
				// an updated bitfield.
				break another
			}
			delete(c.PeerRequests, r)
			if !more {
				return false
			}
			goto another
		}
		return true
	}
	return c.Choke(msg)
}
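
// Note on the rate-limiter dance above: ReserveN reserves r.Length tokens
// immediately. If the limiter's burst is smaller than the chunk, the
// reservation can never succeed, hence the panic. If the tokens aren't
// available yet, the reservation is cancelled (returning the tokens) and the
// upload timer retries after res.Delay(), rather than blocking the writer.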

func (cn *connection) Drop() {
	cn.t.dropConnection(cn)
}

func (cn *connection) netGoodPiecesDirtied() int64 {
	return cn.stats.PiecesDirtiedGood.Int64() - cn.stats.PiecesDirtiedBad.Int64()
}

func (c *connection) peerHasWantedPieces() bool {
	return !c.pieceRequestOrder.IsEmpty()
}

func (c *connection) numLocalRequests() int {
	return len(c.requests)
}

func (c *connection) deleteRequest(r request) bool {
	if _, ok := c.requests[r]; !ok {
		return false
	}
	delete(c.requests, r)
	c.updateExpectingChunks()
	if t, ok := c.t.lastRequested[r]; ok {
		t.Stop()
		delete(c.t.lastRequested, r)
	}
	pr := c.t.pendingRequests
	pr[r]--
	n := pr[r]
	if n == 0 {
		delete(pr, r)
	}
	if n < 0 {
		panic(n)
	}
	c.updateRequests()
	return true
}

func (c *connection) deleteAllRequests() {
	for r := range c.requests {
		c.deleteRequest(r)
	}
	if len(c.requests) != 0 {
		panic(len(c.requests))
	}
	// for c := range c.t.conns {
	// 	c.tickleWriter()
	// }
}

func (c *connection) tickleWriter() {
	c.writerCond.Broadcast()
}

func (c *connection) postCancel(r request) bool {
	if !c.deleteRequest(r) {
		return false
	}
	c.Post(makeCancelMessage(r))
	return true
}

func (c *connection) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) {
	// Count the chunk being sent, even if it isn't.
	b := make([]byte, r.Length)
	p := c.t.info.Piece(int(r.Index))
	n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
	if n != len(b) {
		if err == nil {
			panic("expected error")
		}
		return
	} else if err == io.EOF {
		err = nil
	}
	more = msg(pp.Message{
		Type:  pp.Piece,
		Index: r.Index,
		Begin: r.Begin,
		Piece: b,
	})
	c.lastChunkSent = time.Now()
	return
}

func (c *connection) setTorrent(t *Torrent) {
	if c.t != nil {
		panic("connection already associated with a torrent")
	}
	c.t = t
	t.reconcileHandshakeStats(c)
}