]> Sergey Matveev's repositories - btrtrc.git/blob - connection.go
Add mse.CryptoMethod type
[btrtrc.git] / connection.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "errors"
7         "fmt"
8         "io"
9         "math/rand"
10         "net"
11         "strconv"
12         "strings"
13         "sync"
14         "time"
15
16         "github.com/anacrolix/log"
17
18         "github.com/anacrolix/missinggo"
19         "github.com/anacrolix/missinggo/bitmap"
20         "github.com/anacrolix/missinggo/iter"
21         "github.com/anacrolix/missinggo/prioritybitmap"
22
23         "github.com/anacrolix/torrent/bencode"
24         "github.com/anacrolix/torrent/mse"
25         pp "github.com/anacrolix/torrent/peer_protocol"
26 )
27
// peerSource identifies how a peer was discovered; its value is the short
// flag rendered by connectionFlags.
type peerSource string

const (
	// Explicitly typed so the constants can't be silently mixed with
	// unrelated strings.
	peerSourceTracker         peerSource = "Tr"
	peerSourceIncoming        peerSource = "I"
	peerSourceDHTGetPeers     peerSource = "Hg"
	peerSourceDHTAnnouncePeer peerSource = "Ha"
	peerSourcePEX             peerSource = "X"
)
37
// Maintains the state of a connection with a peer.
type connection struct {
	t *Torrent
	// The actual Conn, used for closing, and setting socket options.
	conn net.Conn
	// The Reader and Writer for this Conn, with hooks installed for stats,
	// limiting, deadlines etc.
	w io.Writer
	r io.Reader
	// True if the connection is operating over MSE obfuscation.
	headerEncrypted bool
	// The MSE crypto method in use; RC4 is rendered as 'E' in the
	// connection flags.
	cryptoMethod mse.CryptoMethod
	// How this peer was discovered (tracker, DHT, PEX, incoming).
	Discovery peerSource
	// Whether the connection runs over uTP rather than TCP.
	uTP bool
	// Set once when the connection is closed; checked throughout to abort
	// further work.
	closed missinggo.Event

	stats ConnStats

	lastMessageReceived     time.Time
	completedHandshake      time.Time
	lastUsefulChunkReceived time.Time
	lastChunkSent           time.Time

	// Stuff controlled by the local peer.
	Interested bool
	// When Interested last became true; combined with priorInterest to
	// compute cumulative interest time (see cumInterest).
	lastBecameInterested time.Time
	// Total time spent interested in earlier interest periods.
	priorInterest time.Duration

	Choked bool
	// Requests we have outstanding to the peer.
	requests         map[request]struct{}
	requestsLowWater int
	// Indexed by metadata piece, set to true if posted and pending a
	// response.
	metadataRequests []bool
	// Pieces we've announced to the peer via HAVE/BITFIELD.
	sentHaves bitmap.Bitmap

	// Stuff controlled by the remote peer.
	PeerID             PeerID
	PeerInterested     bool
	PeerChoked         bool
	PeerRequests       map[request]struct{}
	PeerExtensionBytes peerExtensionBytes
	// The pieces the peer has claimed to have.
	peerPieces bitmap.Bitmap
	// The peer has everything. This can occur due to a special message, when
	// we may not even know the number of pieces in the torrent yet.
	peerSentHaveAll bool
	// The highest possible number of pieces the torrent could have based on
	// communication with the peer. Generally only useful until we have the
	// torrent info.
	peerMinPieces int
	// Pieces we've accepted chunks for from the peer.
	peerTouchedPieces map[int]struct{}
	// Pieces the peer permits us to request while they're choking us
	// (fast extension).
	peerAllowedFast bitmap.Bitmap

	PeerMaxRequests  int // Maximum pending requests the peer allows.
	PeerExtensionIDs map[string]byte
	PeerClientName   string

	// Per-connection piece bias; see getPieceInclination and
	// updatePiecePriority.
	pieceInclination  []int
	pieceRequestOrder prioritybitmap.PriorityBitmap

	// Outgoing messages accumulate here until the writer goroutine flushes
	// them to the peer.
	writeBuffer *bytes.Buffer
	uploadTimer *time.Timer
	// Signals the writer goroutine that there may be work to do.
	writerCond sync.Cond
}
104
105 func (cn *connection) cumInterest() time.Duration {
106         ret := cn.priorInterest
107         if cn.Interested {
108                 ret += time.Since(cn.lastBecameInterested)
109         }
110         return ret
111 }
112
// Reports whether the peer has every piece. known is false when we can't
// tell yet because we don't have the torrent info.
func (cn *connection) peerHasAllPieces() (all bool, known bool) {
	if cn.peerSentHaveAll {
		// HaveAll message makes it definite even without the info.
		return true, true
	}
	if !cn.t.haveInfo() {
		return false, false
	}
	// All pieces present iff no piece in [0, numPieces) is missing.
	return bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces()).IsEmpty(), true
}
122
// The Client-wide mutex that guards this connection's state.
func (cn *connection) mu() sync.Locker {
	return &cn.t.cl.mu
}
126
// The peer's address as reported by the underlying Conn.
func (cn *connection) remoteAddr() net.Addr {
	return cn.conn.RemoteAddr()
}
130
// Our address for this connection as reported by the underlying Conn.
func (cn *connection) localAddr() net.Addr {
	return cn.conn.LocalAddr()
}
134
135 func (cn *connection) supportsExtension(ext string) bool {
136         _, ok := cn.PeerExtensionIDs[ext]
137         return ok
138 }
139
140 // The best guess at number of pieces in the torrent for this peer.
141 func (cn *connection) bestPeerNumPieces() int {
142         if cn.t.haveInfo() {
143                 return cn.t.numPieces()
144         }
145         return cn.peerMinPieces
146 }
147
148 func (cn *connection) completedString() string {
149         have := cn.peerPieces.Len()
150         if cn.peerSentHaveAll {
151                 have = cn.bestPeerNumPieces()
152         }
153         return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
154 }
155
// Correct the peerPieces bitmap once the piece count is known, dropping any
// claimed pieces at or beyond num. Always returns nil currently; the error
// return is kept for the caller's message-handling convention.
func (cn *connection) setNumPieces(num int) error {
	cn.peerPieces.RemoveRange(num, bitmap.ToEnd)
	cn.peerPiecesChanged()
	return nil
}
164
165 func eventAgeString(t time.Time) string {
166         if t.IsZero() {
167                 return "never"
168         }
169         return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
170 }
171
172 func (cn *connection) connectionFlags() (ret string) {
173         c := func(b byte) {
174                 ret += string([]byte{b})
175         }
176         if cn.cryptoMethod == mse.CryptoMethodRC4 {
177                 c('E')
178         } else if cn.headerEncrypted {
179                 c('e')
180         }
181         ret += string(cn.Discovery)
182         if cn.uTP {
183                 c('U')
184         }
185         return
186 }
187
188 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
189 func (cn *connection) statusFlags() (ret string) {
190         c := func(b byte) {
191                 ret += string([]byte{b})
192         }
193         if cn.Interested {
194                 c('i')
195         }
196         if cn.Choked {
197                 c('c')
198         }
199         c('-')
200         ret += cn.connectionFlags()
201         c('-')
202         if cn.PeerInterested {
203                 c('i')
204         }
205         if cn.PeerChoked {
206                 c('c')
207         }
208         return
209 }
210
211 // func (cn *connection) String() string {
212 //      var buf bytes.Buffer
213 //      cn.WriteStatus(&buf, nil)
214 //      return buf.String()
215 // }
216
// Average useful download rate in bytes/s over the time we've been
// interested. NOTE(review): if cumInterest is zero this divides by zero and
// yields NaN or Inf; callers appear to use it only for display.
func (cn *connection) downloadRate() float64 {
	return float64(cn.stats.BytesReadUsefulData) / cn.cumInterest().Seconds()
}
220
// Writes a multi-line, human-readable status summary of the connection to w.
// The Torrent parameter is unused here but part of the status interface.
func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
	// \t isn't preserved in <pre> blocks?
	fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr())
	fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s\n",
		eventAgeString(cn.lastMessageReceived),
		eventAgeString(cn.completedHandshake),
		eventAgeString(cn.lastHelpful()),
		cn.cumInterest(),
	)
	// Chunk/request counters and the flag string.
	fmt.Fprintf(w,
		"    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
		cn.completedString(),
		len(cn.peerTouchedPieces),
		cn.stats.ChunksReadUseful,
		cn.stats.ChunksRead,
		cn.stats.ChunksWritten,
		cn.requestsLowWater,
		cn.numLocalRequests(),
		cn.nominalMaxRequests(),
		len(cn.PeerRequests),
		cn.statusFlags(),
		cn.downloadRate()/(1<<10),
	)
	// Preview of the next pieces we'd request, in priority order.
	roi := cn.pieceRequestOrderIter()
	fmt.Fprintf(w, "    next pieces: %v%s\n",
		iter.ToSlice(iter.Head(10, roi)),
		func() string {
			if cn.shouldRequestWithoutBias() {
				return " (fastest)"
			} else {
				return ""
			}
		}())
}
255
// Close the connection at most once, waking the writer goroutine and
// releasing request-order state.
func (cn *connection) Close() {
	if !cn.closed.Set() {
		// Already closed.
		return
	}
	// Wake the writer so it can observe closed and exit.
	cn.tickleWriter()
	cn.discardPieceInclination()
	cn.pieceRequestOrder.Clear()
	if cn.conn != nil {
		// Closed in a goroutine — presumably so a blocking Close can't
		// stall the caller, who likely holds the client lock. TODO confirm.
		go cn.conn.Close()
	}
}
267
268 func (cn *connection) PeerHasPiece(piece int) bool {
269         return cn.peerSentHaveAll || cn.peerPieces.Contains(piece)
270 }
271
// Writes a message into the write buffer and wakes the writer goroutine to
// flush it.
func (cn *connection) Post(msg pp.Message) {
	messageTypesPosted.Add(msg.Type.String(), 1)
	// We don't need to track bytes here because a connection.w Writer wrapper
	// takes care of that (although there's some delay between us recording
	// the message, and the connection writer flushing it out.).
	cn.writeBuffer.Write(msg.MustMarshalBinary())
	// Last I checked only Piece messages affect stats, and we don't post
	// those.
	cn.wroteMsg(&msg)
	cn.tickleWriter()
}
284
// Posts a ut_metadata request for the given metadata piece, unless the peer
// doesn't support the extension or we already have one pending.
func (cn *connection) requestMetadataPiece(index int) {
	eID := cn.PeerExtensionIDs["ut_metadata"]
	if eID == 0 {
		// Peer didn't advertise the metadata extension.
		return
	}
	if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
		// Already requested and awaiting a response.
		return
	}
	cn.Post(pp.Message{
		Type:       pp.Extended,
		ExtendedID: eID,
		ExtendedPayload: func() []byte {
			b, err := bencode.Marshal(map[string]int{
				"msg_type": pp.RequestMetadataExtensionMsgType,
				"piece":    index,
			})
			if err != nil {
				// Marshalling a literal map of ints can't fail in practice.
				panic(err)
			}
			return b
		}(),
	})
	// Grow the tracking slice as needed, then mark this piece as requested.
	for index >= len(cn.metadataRequests) {
		cn.metadataRequests = append(cn.metadataRequests, false)
	}
	cn.metadataRequests[index] = true
}
312
313 func (cn *connection) requestedMetadataPiece(index int) bool {
314         return index < len(cn.metadataRequests) && cn.metadataRequests[index]
315 }
316
// Bounds value to the interval [min, max]. Panics if the interval is empty.
func clamp(min, value, max int64) int64 {
	if min > max {
		panic("harumph")
	}
	switch {
	case value < min:
		return min
	case value > max:
		return max
	default:
		return value
	}
}
329
// Returns the largest of its arguments. Panics if called with none.
func max(as ...int64) int64 {
	best := as[0]
	for _, a := range as {
		if a > best {
			best = a
		}
	}
	return best
}
339
// The actual value to use as the maximum outbound requests: the peer's
// advertised limit, clamped to [1, max(64, surplus of useful over wasted
// chunks)].
func (cn *connection) nominalMaxRequests() (ret int) {
	return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful-(cn.stats.ChunksRead-cn.stats.ChunksReadUseful))))
}
344
// Handles a CANCEL from the peer for one of their outstanding requests.
func (cn *connection) onPeerSentCancel(r request) {
	if _, ok := cn.PeerRequests[r]; !ok {
		// Cancel for something we never saw, or already served.
		torrent.Add("unexpected cancels received", 1)
		return
	}
	if cn.fastEnabled() {
		// With the fast extension enabled, answer cancels with an explicit
		// REJECT (which also removes the request).
		cn.reject(r)
	} else {
		delete(cn.PeerRequests, r)
	}
}
356
// Chokes the peer, discarding (or rejecting, with the fast extension) their
// pending requests. Returns the messageWriter's verdict on whether more
// messages may follow.
func (cn *connection) Choke(msg messageWriter) (more bool) {
	if cn.Choked {
		return true
	}
	cn.Choked = true
	more = msg(pp.Message{
		Type: pp.Choke,
	})
	if cn.fastEnabled() {
		for r := range cn.PeerRequests {
			// TODO: Don't reject pieces in allowed fast set.
			cn.reject(r)
		}
	} else {
		// Without the fast extension, choking implicitly discards requests.
		cn.PeerRequests = nil
	}
	return
}
375
376 func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
377         if !cn.Choked {
378                 return true
379         }
380         cn.Choked = false
381         return msg(pp.Message{
382                 Type: pp.Unchoke,
383         })
384 }
385
386 func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
387         if cn.Interested == interested {
388                 return true
389         }
390         cn.Interested = interested
391         if interested {
392                 cn.lastBecameInterested = time.Now()
393         } else if !cn.lastBecameInterested.IsZero() {
394                 cn.priorInterest += time.Since(cn.lastBecameInterested)
395         }
396         // log.Printf("%p: setting interest: %v", cn, interested)
397         return msg(pp.Message{
398                 Type: func() pp.MessageType {
399                         if interested {
400                                 return pp.Interested
401                         } else {
402                                 return pp.NotInterested
403                         }
404                 }(),
405         })
406 }
407
// The function takes a message to be sent, and returns true if more messages
// are okay (i.e. the caller may keep writing without overfilling buffers).
type messageWriter func(pp.Message) bool
411
// Records r as requested and emits the REQUEST message. Proxies the
// messageWriter's response. The panics below are caller invariant
// violations, not peer misbehaviour.
func (cn *connection) request(r request, mw messageWriter) bool {
	if cn.requests == nil {
		cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
	}
	if _, ok := cn.requests[r]; ok {
		panic("chunk already requested")
	}
	if !cn.PeerHasPiece(r.Index.Int()) {
		panic("requesting piece peer doesn't have")
	}
	if _, ok := cn.t.conns[cn]; !ok {
		panic("requesting but not in active conns")
	}
	if cn.closed.IsSet() {
		panic("requesting when connection is closed")
	}
	if cn.PeerChoked {
		// Requesting while choked is only legal for allowed-fast pieces.
		if cn.peerAllowedFast.Get(int(r.Index)) {
			torrent.Add("allowed fast requests sent", 1)
		} else {
			panic("requesting while choked and not allowed fast")
		}
	}
	cn.requests[r] = struct{}{}
	// Torrent-wide count of connections requesting this chunk.
	cn.t.pendingRequests[r]++
	return mw(pp.Message{
		Type:   pp.Request,
		Index:  r.Index,
		Begin:  r.Begin,
		Length: r.Length,
	})
}
445
// Generates outgoing protocol traffic reflecting the desired request state:
// interest changes, cancels, new requests, and uploads. msg reports whether
// more messages may be written.
func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
	numFillBuffers.Add(1)
	cancel, new, i := cn.desiredRequestState()
	if !cn.SetInterested(i, msg) {
		return
	}
	if cancel && len(cn.requests) != 0 {
		fillBufferSentCancels.Add(1)
		for r := range cn.requests {
			cn.deleteRequest(r)
			// log.Printf("%p: cancelling request: %v", cn, r)
			if !msg(makeCancelMessage(r)) {
				return
			}
		}
	}
	if len(new) != 0 {
		fillBufferSentRequests.Add(1)
		for _, r := range new {
			if !cn.request(r, msg) {
				// If we didn't completely top up the requests, we shouldn't
				// mark the low water, since we'll want to top up the requests
				// as soon as we have more write buffer space.
				return
			}
		}
		cn.requestsLowWater = len(cn.requests) / 2
	}
	cn.upload(msg)
}
476
// Routine that writes to the peer. Some of what to write is buffered by
// activity elsewhere in the Client, and some is determined locally when the
// connection is writable.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
	var (
		lastWrite      time.Time = time.Now()
		keepAliveTimer *time.Timer
	)
	// Periodic wakeup so an idle connection can emit a keep-alive.
	keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
		cn.mu().Lock()
		defer cn.mu().Unlock()
		if time.Since(lastWrite) >= keepAliveTimeout {
			cn.tickleWriter()
		}
		keepAliveTimer.Reset(keepAliveTimeout)
	})
	cn.mu().Lock()
	defer cn.mu().Unlock()
	defer cn.Close()
	defer keepAliveTimer.Stop()
	// Double buffering: frontBuf is flushed to the wire while new messages
	// accumulate in cn.writeBuffer.
	frontBuf := new(bytes.Buffer)
	for {
		if cn.closed.IsSet() {
			return
		}
		if cn.writeBuffer.Len() == 0 {
			// Nothing queued by others; generate traffic locally.
			cn.fillWriteBuffer(func(msg pp.Message) bool {
				cn.wroteMsg(&msg)
				cn.writeBuffer.Write(msg.MustMarshalBinary())
				// Stop filling once the buffer reaches 64 KiB.
				return cn.writeBuffer.Len() < 1<<16
			})
		}
		if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
			cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
			postedKeepalives.Add(1)
		}
		if cn.writeBuffer.Len() == 0 {
			// TODO: Minimize wakeups....
			cn.writerCond.Wait()
			continue
		}
		// Flip the buffers.
		frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
		// Drop the client lock for the potentially blocking network write.
		cn.mu().Unlock()
		n, err := cn.w.Write(frontBuf.Bytes())
		cn.mu().Lock()
		if n != 0 {
			lastWrite = time.Now()
			keepAliveTimer.Reset(keepAliveTimeout)
		}
		if err != nil {
			return
		}
		if n != frontBuf.Len() {
			panic("short write")
		}
		frontBuf.Reset()
	}
}
536
537 func (cn *connection) Have(piece int) {
538         if cn.sentHaves.Get(piece) {
539                 return
540         }
541         cn.Post(pp.Message{
542                 Type:  pp.Have,
543                 Index: pp.Integer(piece),
544         })
545         cn.sentHaves.Add(piece)
546 }
547
// Posts our BITFIELD, recording what we announced. Must precede any HAVE
// messages (protocol requirement, enforced by the panic). Skipped when we
// have nothing, since an empty bitfield conveys nothing.
func (cn *connection) PostBitfield() {
	if cn.sentHaves.Len() != 0 {
		panic("bitfield must be first have-related message sent")
	}
	if !cn.t.haveAnyPieces() {
		return
	}
	cn.Post(pp.Message{
		Type:     pp.Bitfield,
		Bitfield: cn.t.bitfield(),
	})
	// Record the exact set we advertised.
	cn.sentHaves = cn.t.completedPieces.Copy()
}
561
// Determines interest and requests to send to a connected peer.
func nextRequestState(
	networkingEnabled bool,
	currentRequests map[request]struct{},
	peerChoking bool,
	iterPendingRequests func(f func(request) bool),
	requestsLowWater int,
	requestsHighWater int,
	allowedFast bitmap.Bitmap,
) (
	cancelExisting bool, // Cancel all our pending requests
	newRequests []request, // Chunks to request that we currently aren't
	interested bool, // Whether we should indicate interest, even if we don't request anything
) {
	if !networkingEnabled {
		// No traffic wanted: cancel everything, show no interest.
		return true, nil, false
	}
	if len(currentRequests) > requestsLowWater {
		// Plenty outstanding; only top up below the low-water mark.
		return false, nil, true
	}
	iterPendingRequests(func(r request) bool {
		// There's at least one chunk we want, so we're interested.
		interested = true
		if peerChoking {
			if allowedFast.IsEmpty() {
				// Choked with no allowed-fast set: nothing is requestable.
				return false
			}
			if !allowedFast.Get(int(r.Index)) {
				// While choked, only allowed-fast pieces may be requested.
				return true
			}
		}
		if len(currentRequests)+len(newRequests) >= requestsHighWater {
			return false
		}
		if _, ok := currentRequests[r]; !ok {
			if newRequests == nil {
				// Size for the maximum we could add.
				newRequests = make([]request, 0, requestsHighWater-len(currentRequests))
			}
			newRequests = append(newRequests, r)
		}
		return true
	})
	return
}
605
// Signals that the desired request state may have changed; the writer
// goroutine recomputes it when woken.
func (cn *connection) updateRequests() {
	// log.Print("update requests")
	cn.tickleWriter()
}
610
// Emits the indices in the Bitmaps bms in order, never repeating any index.
// skip is mutated during execution, and its initial values will never be
// emitted.
func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
	return func(cb iter.Callback) {
		for _, bm := range bms {
			if !iter.All(func(i interface{}) bool {
				// Record the index so later bitmaps won't re-emit it.
				skip.Add(i.(int))
				return cb(i)
			}, bitmap.Sub(bm, *skip).Iter) {
				// Consumer asked to stop.
				return
			}
		}
	}
}
626
// An iterator over pieces in pure priority-class order (reader "now", then
// readahead, then other pending pieces), skipping pieces the peer lacks and
// pieces we already have — i.e. without the per-connection inclination bias.
func (cn *connection) unbiasedPieceRequestOrder() iter.Func {
	now, readahead := cn.t.readerPiecePriorities()
	var skip bitmap.Bitmap
	if !cn.peerSentHaveAll {
		// Pieces to skip include pieces the peer doesn't have
		skip = bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces())
	}
	// And pieces that we already have.
	skip.Union(cn.t.completedPieces)
	// Return an iterator over the different priority classes, minus the skip
	// pieces.
	return iter.Chain(
		iterBitmapsDistinct(&skip, now, readahead),
		func(cb iter.Callback) {
			cn.t.pendingPieces.IterTyped(func(piece int) bool {
				if skip.Contains(piece) {
					return true
				}
				more := cb(piece)
				// Never emit the same piece twice.
				skip.Add(piece)
				return more
			})
		},
	)
}
652
653 // The connection should download highest priority pieces first, without any
654 // inclination toward avoiding wastage. Generally we might do this if there's
655 // a single connection, or this is the fastest connection, and we have active
656 // readers that signal an ordering preference. It's conceivable that the best
657 // connection should do this, since it's least likely to waste our time if
658 // assigned to the highest priority pieces, and assigning more than one this
659 // role would cause significant wasted bandwidth.
660 func (cn *connection) shouldRequestWithoutBias() bool {
661         if cn.t.requestStrategy != 2 {
662                 return false
663         }
664         if len(cn.t.readers) == 0 {
665                 return false
666         }
667         if len(cn.t.conns) == 1 {
668                 return true
669         }
670         if cn == cn.t.fastestConn {
671                 return true
672         }
673         return false
674 }
675
676 func (cn *connection) pieceRequestOrderIter() iter.Func {
677         if cn.shouldRequestWithoutBias() {
678                 return cn.unbiasedPieceRequestOrder()
679         } else {
680                 return cn.pieceRequestOrder.Iter
681         }
682 }
683
// Calls f with candidate requests in piece-priority order, expanding each
// piece into its undirtied chunks. Stops when f returns false.
func (cn *connection) iterPendingRequests(f func(request) bool) {
	cn.pieceRequestOrderIter()(func(_piece interface{}) bool {
		piece := _piece.(int)
		return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
			r := request{pp.Integer(piece), cs}
			// log.Println(r, cn.t.pendingRequests[r], cn.requests)
			// if _, ok := cn.requests[r]; !ok && cn.t.pendingRequests[r] != 0 {
			//      return true
			// }
			return f(r)
		})
	})
}
697
// Computes what this connection's request state should be: whether to cancel
// everything, which new chunks to request, and whether to show interest.
// Thin wrapper binding connection state to nextRequestState.
func (cn *connection) desiredRequestState() (bool, []request, bool) {
	return nextRequestState(
		cn.t.networkingEnabled,
		cn.requests,
		cn.PeerChoked,
		cn.iterPendingRequests,
		cn.requestsLowWater,
		cn.nominalMaxRequests(),
		cn.peerAllowedFast,
	)
}
709
// Calls f with each undirtied chunk of the piece, in a random permutation,
// until f returns false. Returns whether iteration ran to completion.
func iterUndirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
	chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
	// TODO: Use "math/rand".Shuffle >= Go 1.10
	return iter.ForPerm(len(chunkIndices), func(i int) bool {
		return f(t.chunkIndexSpec(chunkIndices[i], piece))
	})
}
717
// Removes the piece from this connection's request order. Returns whether
// anything changed, so callers know to call updateRequests.
// check callers updaterequests
func (cn *connection) stopRequestingPiece(piece int) bool {
	return cn.pieceRequestOrder.Remove(piece)
}
722
// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection and is
// used to pseudorandomly avoid connections always requesting the same pieces
// and thus wasting effort. Returns whether the request order changed.
func (cn *connection) updatePiecePriority(piece int) bool {
	tpp := cn.t.piecePriority(piece)
	if !cn.PeerHasPiece(piece) {
		// Can't request what the peer doesn't have.
		tpp = PiecePriorityNone
	}
	if tpp == PiecePriorityNone {
		return cn.stopRequestingPiece(piece)
	}
	// Base priority is this connection's inclination for the piece; lower
	// values are requested sooner.
	prio := cn.getPieceInclination()[piece]
	switch cn.t.requestStrategy {
	case 1:
		// Boost by the torrent-level priority class.
		switch tpp {
		case PiecePriorityNormal:
		case PiecePriorityReadahead:
			prio -= cn.t.numPieces()
		case PiecePriorityNext, PiecePriorityNow:
			prio -= 2 * cn.t.numPieces()
		default:
			panic(tpp)
		}
		// Mild bias toward lower-numbered pieces.
		prio += piece / 3
	default:
	}
	return cn.pieceRequestOrder.Set(piece, prio) || cn.shouldRequestWithoutBias()
}
752
753 func (cn *connection) getPieceInclination() []int {
754         if cn.pieceInclination == nil {
755                 cn.pieceInclination = cn.t.getConnPieceInclination()
756         }
757         return cn.pieceInclination
758 }
759
760 func (cn *connection) discardPieceInclination() {
761         if cn.pieceInclination == nil {
762                 return
763         }
764         cn.t.putPieceInclination(cn.pieceInclination)
765         cn.pieceInclination = nil
766 }
767
768 func (cn *connection) peerPiecesChanged() {
769         if cn.t.haveInfo() {
770                 prioritiesChanged := false
771                 for i := range iter.N(cn.t.numPieces()) {
772                         if cn.updatePiecePriority(i) {
773                                 prioritiesChanged = true
774                         }
775                 }
776                 if prioritiesChanged {
777                         cn.updateRequests()
778                 }
779         }
780 }
781
782 func (cn *connection) raisePeerMinPieces(newMin int) {
783         if newMin > cn.peerMinPieces {
784                 cn.peerMinPieces = newMin
785         }
786 }
787
// Handles an incoming HAVE message, recording the peer's piece and updating
// request priorities. Rejects out-of-range indices.
func (cn *connection) peerSentHave(piece int) error {
	if cn.t.haveInfo() && piece >= cn.t.numPieces() || piece < 0 {
		return errors.New("invalid piece")
	}
	if cn.PeerHasPiece(piece) {
		// Duplicate HAVE; nothing to do.
		return nil
	}
	// A valid HAVE proves the torrent has at least piece+1 pieces.
	cn.raisePeerMinPieces(piece + 1)
	cn.peerPieces.Set(piece, true)
	if cn.updatePiecePriority(piece) {
		cn.updateRequests()
	}
	return nil
}
802
803 func (cn *connection) peerSentBitfield(bf []bool) error {
804         cn.peerSentHaveAll = false
805         if len(bf)%8 != 0 {
806                 panic("expected bitfield length divisible by 8")
807         }
808         // We know that the last byte means that at most the last 7 bits are
809         // wasted.
810         cn.raisePeerMinPieces(len(bf) - 7)
811         if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
812                 // Ignore known excess pieces.
813                 bf = bf[:cn.t.numPieces()]
814         }
815         for i, have := range bf {
816                 if have {
817                         cn.raisePeerMinPieces(i + 1)
818                 }
819                 cn.peerPieces.Set(i, have)
820         }
821         cn.peerPiecesChanged()
822         return nil
823 }
824
825 func (cn *connection) onPeerSentHaveAll() error {
826         cn.peerSentHaveAll = true
827         cn.peerPieces.Clear()
828         cn.peerPiecesChanged()
829         return nil
830 }
831
832 func (cn *connection) peerSentHaveNone() error {
833         cn.peerPieces.Clear()
834         cn.peerSentHaveAll = false
835         cn.peerPiecesChanged()
836         return nil
837 }
838
839 func (c *connection) requestPendingMetadata() {
840         if c.t.haveInfo() {
841                 return
842         }
843         if c.PeerExtensionIDs["ut_metadata"] == 0 {
844                 // Peer doesn't support this.
845                 return
846         }
847         // Request metadata pieces that we don't have in a random order.
848         var pending []int
849         for index := 0; index < c.t.metadataPieceCount(); index++ {
850                 if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
851                         pending = append(pending, index)
852                 }
853         }
854         for _, i := range rand.Perm(len(pending)) {
855                 c.requestMetadataPiece(pending[i])
856         }
857 }
858
// Records an outgoing message in the expvar counter and the connection- and
// torrent-level stats.
func (cn *connection) wroteMsg(msg *pp.Message) {
	messageTypesSent.Add(msg.Type.String(), 1)
	cn.stats.wroteMsg(msg)
	cn.t.stats.wroteMsg(msg)
}
864
// readMsg records an incoming protocol message in the connection's and
// torrent's stats.
func (cn *connection) readMsg(msg *pp.Message) {
	cn.stats.readMsg(msg)
	cn.t.stats.readMsg(msg)
}
869
// wroteBytes records n bytes written to the peer. The torrent may be nil
// before the connection is associated with one (see setTorrent), hence the
// guard.
func (cn *connection) wroteBytes(n int64) {
	cn.stats.wroteBytes(n)
	if cn.t != nil {
		cn.t.stats.wroteBytes(n)
	}
}
876
// readBytes records n bytes received from the peer. The torrent may be nil
// before the connection is associated with one (see setTorrent), hence the
// guard.
func (cn *connection) readBytes(n int64) {
	cn.stats.readBytes(n)
	if cn.t != nil {
		cn.t.stats.readBytes(n)
	}
}
883
884 // Returns whether the connection could be useful to us. We're seeding and
885 // they want data, we don't have metainfo and they can provide it, etc.
886 func (c *connection) useful() bool {
887         t := c.t
888         if c.closed.IsSet() {
889                 return false
890         }
891         if !t.haveInfo() {
892                 return c.supportsExtension("ut_metadata")
893         }
894         if t.seeding() && c.PeerInterested {
895                 return true
896         }
897         if c.peerHasWantedPieces() {
898                 return true
899         }
900         return false
901 }
902
903 func (c *connection) lastHelpful() (ret time.Time) {
904         ret = c.lastUsefulChunkReceived
905         if c.t.seeding() && c.lastChunkSent.After(ret) {
906                 ret = c.lastChunkSent
907         }
908         return
909 }
910
911 func (c *connection) fastEnabled() bool {
912         return c.PeerExtensionBytes.SupportsFast() && c.t.cl.extensionBytes.SupportsFast()
913 }
914
// reject posts a BEP 6 Reject for the peer's request r and removes it from
// the queue. Reject is a fast-extension message, so calling this without
// fast enabled is a programming error.
func (c *connection) reject(r request) {
	if !c.fastEnabled() {
		panic("fast not enabled")
	}
	c.Post(r.ToMsg(pp.Reject))
	delete(c.PeerRequests, r)
}
922
// onReadRequest handles an incoming Request message. Malformed requests fail
// the connection (non-nil error); requests we merely can't serve right now
// are dropped, with a Reject posted when the Fast Extension allows it.
func (c *connection) onReadRequest(r request) error {
	requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
	// The requested span must lie entirely within the piece.
	if r.Begin+r.Length > c.t.pieceLength(int(r.Index)) {
		torrent.Add("bad requests received", 1)
		return errors.New("bad request")
	}
	// Ignore exact duplicates of an already-queued request.
	if _, ok := c.PeerRequests[r]; ok {
		torrent.Add("duplicate requests received", 1)
		return nil
	}
	if c.Choked {
		torrent.Add("requests received while choking", 1)
		if c.fastEnabled() {
			torrent.Add("requests rejected while choking", 1)
			c.reject(r)
		}
		return nil
	}
	if len(c.PeerRequests) >= maxRequests {
		torrent.Add("requests received while queue full", 1)
		if c.fastEnabled() {
			c.reject(r)
		}
		// BEP 6 says we may close here if we choose.
		return nil
	}
	if !c.t.havePiece(r.Index.Int()) {
		// This isn't necessarily them screwing up. We can drop pieces
		// from our storage, and can't communicate this to peers
		// except by reconnecting.
		requestsReceivedForMissingPieces.Add(1)
		return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
	}
	// Lazily allocate the queue, enqueue the request, and wake the writer to
	// service it.
	if c.PeerRequests == nil {
		c.PeerRequests = make(map[request]struct{}, maxRequests)
	}
	c.PeerRequests[r] = struct{}{}
	c.tickleWriter()
	return nil
}
963
// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() (err error) {
	defer func() {
		if err != nil {
			torrent.Add("connection.mainReadLoop returned with error", 1)
		} else {
			torrent.Add("connection.mainReadLoop returned with no error", 1)
		}
	}()
	t := c.t
	cl := t.cl

	decoder := pp.Decoder{
		// 128 KiB read buffer; MaxLength bounds a single wire message.
		R:         bufio.NewReaderSize(c.r, 1<<17),
		MaxLength: 256 * 1024,
		Pool:      t.chunkPool,
	}
	for {
		var msg pp.Message
		func() {
			// Drop the client lock while blocked on the network read, and
			// reacquire it before handling the message.
			cl.mu.Unlock()
			defer cl.mu.Lock()
			err = decoder.Decode(&msg)
		}()
		// A close (ours or the torrent's) or clean EOF ends the loop without
		// error.
		if t.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		c.readMsg(&msg)
		c.lastMessageReceived = time.Now()
		if msg.Keepalive {
			receivedKeepalives.Add(1)
			continue
		}
		messageTypesReceived.Add(msg.Type.String(), 1)
		// Fast-extension messages are only legal when both sides negotiated
		// the extension.
		if msg.Type.FastExtension() && !c.fastEnabled() {
			return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
		}
		switch msg.Type {
		case pp.Choke:
			c.PeerChoked = true
			c.deleteAllRequests()
			// We can then reset our interest.
			c.updateRequests()
		case pp.Reject:
			c.deleteRequest(newRequestFromMessage(&msg))
		case pp.Unchoke:
			c.PeerChoked = false
			c.tickleWriter()
		case pp.Interested:
			c.PeerInterested = true
			c.tickleWriter()
		case pp.NotInterested:
			c.PeerInterested = false
			// TODO: Reject?
			c.PeerRequests = nil
		case pp.Have:
			err = c.peerSentHave(int(msg.Index))
		case pp.Request:
			r := newRequestFromMessage(&msg)
			err = c.onReadRequest(r)
		case pp.Cancel:
			req := newRequestFromMessage(&msg)
			c.onPeerSentCancel(req)
		case pp.Bitfield:
			err = c.peerSentBitfield(msg.Bitfield)
		case pp.HaveAll:
			err = c.onPeerSentHaveAll()
		case pp.HaveNone:
			err = c.peerSentHaveNone()
		case pp.Piece:
			c.receiveChunk(&msg)
			// Return full-sized chunk buffers to the pool they came from.
			if len(msg.Piece) == int(t.chunkSize) {
				t.chunkPool.Put(&msg.Piece)
			}
		case pp.Extended:
			err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
		case pp.Port:
			if cl.dHT == nil {
				break
			}
			// Ping the peer's DHT node; the message's port overrides the
			// connection's remote port when non-zero.
			pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
			if err != nil {
				panic(err)
			}
			if msg.Port != 0 {
				pingAddr.Port = int(msg.Port)
			}
			go cl.dHT.Ping(pingAddr, nil)
		case pp.AllowedFast:
			torrent.Add("allowed fasts received", 1)
			log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c, debugLogValue).Log(c.t.logger)
			c.peerAllowedFast.Add(int(msg.Index))
			c.updateRequests()
		case pp.Suggest:
			torrent.Add("suggests received", 1)
			log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index, debugLogValue).Log(c.t.logger)
			c.updateRequests()
		default:
			err = fmt.Errorf("received unknown message type: %#v", msg.Type)
		}
		if err != nil {
			return err
		}
	}
}
1073
1074 func (c *connection) onReadExtendedMsg(id byte, payload []byte) (err error) {
1075         defer func() {
1076                 // TODO: Should we still do this?
1077                 if err != nil {
1078                         // These clients use their own extension IDs for outgoing message
1079                         // types, which is incorrect.
1080                         if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) || strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1081                                 err = nil
1082                         }
1083                 }
1084         }()
1085         t := c.t
1086         cl := t.cl
1087         switch id {
1088         case pp.HandshakeExtendedID:
1089                 // TODO: Create a bencode struct for this.
1090                 var d map[string]interface{}
1091                 err := bencode.Unmarshal(payload, &d)
1092                 if err != nil {
1093                         return fmt.Errorf("error decoding extended message payload: %s", err)
1094                 }
1095                 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
1096                 if reqq, ok := d["reqq"]; ok {
1097                         if i, ok := reqq.(int64); ok {
1098                                 c.PeerMaxRequests = int(i)
1099                         }
1100                 }
1101                 if v, ok := d["v"]; ok {
1102                         c.PeerClientName = v.(string)
1103                 }
1104                 if m, ok := d["m"]; ok {
1105                         mTyped, ok := m.(map[string]interface{})
1106                         if !ok {
1107                                 return errors.New("handshake m value is not dict")
1108                         }
1109                         if c.PeerExtensionIDs == nil {
1110                                 c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
1111                         }
1112                         for name, v := range mTyped {
1113                                 id, ok := v.(int64)
1114                                 if !ok {
1115                                         log.Printf("bad handshake m item extension ID type: %T", v)
1116                                         continue
1117                                 }
1118                                 if id == 0 {
1119                                         delete(c.PeerExtensionIDs, name)
1120                                 } else {
1121                                         if c.PeerExtensionIDs[name] == 0 {
1122                                                 supportedExtensionMessages.Add(name, 1)
1123                                         }
1124                                         c.PeerExtensionIDs[name] = byte(id)
1125                                 }
1126                         }
1127                 }
1128                 metadata_sizeUntyped, ok := d["metadata_size"]
1129                 if ok {
1130                         metadata_size, ok := metadata_sizeUntyped.(int64)
1131                         if !ok {
1132                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1133                         } else {
1134                                 err = t.setMetadataSize(metadata_size)
1135                                 if err != nil {
1136                                         return fmt.Errorf("error setting metadata size to %d", metadata_size)
1137                                 }
1138                         }
1139                 }
1140                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1141                         c.requestPendingMetadata()
1142                 }
1143                 return nil
1144         case metadataExtendedId:
1145                 err := cl.gotMetadataExtensionMsg(payload, t, c)
1146                 if err != nil {
1147                         return fmt.Errorf("error handling metadata extension message: %s", err)
1148                 }
1149                 return nil
1150         case pexExtendedId:
1151                 if cl.config.DisablePEX {
1152                         // TODO: Maybe close the connection. Check that we're not
1153                         // advertising that we support PEX if it's disabled.
1154                         return nil
1155                 }
1156                 var pexMsg peerExchangeMessage
1157                 err := bencode.Unmarshal(payload, &pexMsg)
1158                 if err != nil {
1159                         return fmt.Errorf("error unmarshalling PEX message: %s", err)
1160                 }
1161                 torrent.Add("pex added6 peers received", int64(len(pexMsg.Added6)))
1162                 t.addPeers(pexMsg.AddedPeers())
1163                 return nil
1164         default:
1165                 return fmt.Errorf("unexpected extended message ID: %v", id)
1166         }
1167 }
1168
// Set both the Reader and Writer for the connection from a single ReadWriter.
func (cn *connection) setRW(rw io.ReadWriter) {
	cn.r = rw
	cn.w = rw
}
1174
1175 // Returns the Reader and Writer as a combined ReadWriter.
1176 func (cn *connection) rw() io.ReadWriter {
1177         return struct {
1178                 io.Reader
1179                 io.Writer
1180         }{cn.r, cn.w}
1181 }
1182
// Handle a received chunk from a peer. Updates request and piece state, then
// writes the chunk to storage with the client lock released, and finally
// queues a piece hash check if the piece became all-dirty.
func (c *connection) receiveChunk(msg *pp.Message) {
	t := c.t
	cl := t.cl
	chunksReceived.Add(1)

	req := newRequestFromMessage(msg)

	// Request has been satisfied.
	if c.deleteRequest(req) {
		c.updateRequests()
	} else {
		unexpectedChunksReceived.Add(1)
	}

	if c.PeerChoked {
		torrent.Add("chunks received while choked", 1)
		if c.peerAllowedFast.Get(int(req.Index)) {
			torrent.Add("chunks received due to allowed fast", 1)
		}
	}

	// Do we actually want this chunk?
	if !t.wantPiece(req) {
		unwantedChunksReceived.Add(1)
		c.stats.ChunksReadUnwanted++
		c.t.stats.ChunksReadUnwanted++
		return
	}

	index := int(req.Index)
	piece := &t.pieces[index]

	c.stats.ChunksReadUseful++
	c.t.stats.ChunksReadUseful++
	c.stats.BytesReadUsefulData += int64(len(msg.Piece))
	c.t.stats.BytesReadUsefulData += int64(len(msg.Piece))
	c.lastUsefulChunkReceived = time.Now()
	// if t.fastestConn != c {
	// log.Printf("setting fastest connection %p", c)
	// }
	t.fastestConn = c

	// Need to record that it hasn't been written yet, before we attempt to do
	// anything with it.
	piece.incrementPendingWrites()
	// Record that we have the chunk, so we aren't trying to download it while
	// waiting for it to be written to storage.
	piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))

	// Cancel pending requests for this chunk. NB: the range variable shadows
	// the receiver c here.
	for c := range t.conns {
		c.postCancel(req)
	}

	err := func() error {
		cl.mu.Unlock()
		defer cl.mu.Lock()
		// Write the chunk out. Note that the upper bound on chunk writing
		// concurrency will be the number of connections. We write inline with
		// receiving the chunk (with this lock dance), because we want to
		// handle errors synchronously and I haven't thought of a nice way to
		// defer any concurrency to the storage and have that notify the
		// client of errors. TODO: Do that instead.
		return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
	}()

	piece.decrementPendingWrites()

	if err != nil {
		// The write failed: re-pend the request so the chunk is fetched
		// again, and re-evaluate piece completion.
		log.Printf("%s (%s): error writing chunk %v: %s", t, t.infoHash, req, err)
		t.pendRequest(req)
		t.updatePieceCompletion(int(msg.Index))
		return
	}

	// It's important that the piece is potentially queued before we check if
	// the piece is still wanted, because if it is queued, it won't be wanted.
	if t.pieceAllDirty(index) {
		t.queuePieceCheck(int(req.Index))
		t.pendAllChunkSpecs(index)
	}

	c.onDirtiedPiece(index)

	cl.event.Broadcast()
	t.publishPieceChange(int(req.Index))
}
1271
1272 func (c *connection) onDirtiedPiece(piece int) {
1273         if c.peerTouchedPieces == nil {
1274                 c.peerTouchedPieces = make(map[int]struct{})
1275         }
1276         c.peerTouchedPieces[piece] = struct{}{}
1277         ds := &c.t.pieces[piece].dirtiers
1278         if *ds == nil {
1279                 *ds = make(map[*connection]struct{})
1280         }
1281         (*ds)[c] = struct{}{}
1282 }
1283
1284 func (c *connection) uploadAllowed() bool {
1285         if c.t.cl.config.NoUpload {
1286                 return false
1287         }
1288         if c.t.seeding() {
1289                 return true
1290         }
1291         if !c.peerHasWantedPieces() {
1292                 return false
1293         }
1294         // Don't upload more than 100 KiB more than we download.
1295         if c.stats.BytesWrittenData >= c.stats.BytesReadData+100<<10 {
1296                 return false
1297         }
1298         return true
1299 }
1300
1301 func (c *connection) setRetryUploadTimer(delay time.Duration) {
1302         if c.uploadTimer == nil {
1303                 c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
1304         } else {
1305                 c.uploadTimer.Reset(delay)
1306         }
1307 }
1308
// Also handles choking and unchoking of the remote peer. Services the peer's
// queued requests subject to the upload rate limiter, returning msg's
// continue signal (or the Choke/Unchoke result when the queue drains or
// uploading is disallowed).
func (c *connection) upload(msg func(pp.Message) bool) bool {
	// Breaking or completing this loop means we don't want to upload to the
	// peer anymore, and we choke them.
another:
	for c.uploadAllowed() {
		// We want to upload to the peer.
		if !c.Unchoke(msg) {
			return false
		}
		for r := range c.PeerRequests {
			// Reserve bandwidth for the whole chunk before sending it.
			res := c.t.cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
			if !res.OK() {
				panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
			}
			delay := res.Delay()
			if delay > 0 {
				// Rate-limited: give the reservation back and retry via the
				// upload timer.
				res.Cancel()
				c.setRetryUploadTimer(delay)
				// Hard to say what to return here.
				return true
			}
			more, err := c.sendChunk(r, msg)
			if err != nil {
				i := int(r.Index)
				if c.t.pieceComplete(i) {
					c.t.updatePieceCompletion(i)
					if !c.t.pieceComplete(i) {
						// We had the piece, but not anymore.
						break another
					}
				}
				log.Str("error sending chunk to peer").AddValues(c, r, err).Log(c.t.logger)
				// If we failed to send a chunk, choke the peer to ensure they
				// flush all their requests. We've probably dropped a piece,
				// but there's no way to communicate this to the peer. If they
				// ask for it again, we'll kick them to allow us to send them
				// an updated bitfield.
				break another
			}
			delete(c.PeerRequests, r)
			if !more {
				return false
			}
			// Restart the outer loop so uploadAllowed is re-checked after
			// each chunk sent.
			goto another
		}
		return true
	}
	return c.Choke(msg)
}
1359
// Drop disconnects this connection from its torrent.
func (cn *connection) Drop() {
	cn.t.dropConnection(cn)
}
1363
// netGoodPiecesDirtied returns good minus bad pieces this peer contributed
// to; a negative value indicates a peer sending mostly corrupt data.
func (cn *connection) netGoodPiecesDirtied() int64 {
	return cn.stats.PiecesDirtiedGood - cn.stats.PiecesDirtiedBad
}
1367
// peerHasWantedPieces reports whether the peer has any piece we still want,
// as reflected by a non-empty piece request order.
func (c *connection) peerHasWantedPieces() bool {
	return !c.pieceRequestOrder.IsEmpty()
}
1371
// numLocalRequests returns how many of our own requests are outstanding on
// this connection.
func (c *connection) numLocalRequests() int {
	return len(c.requests)
}
1375
1376 func (c *connection) deleteRequest(r request) bool {
1377         if _, ok := c.requests[r]; !ok {
1378                 return false
1379         }
1380         delete(c.requests, r)
1381         c.t.pendingRequests[r]--
1382         c.updateRequests()
1383         return true
1384 }
1385
1386 func (c *connection) deleteAllRequests() {
1387         for r := range c.requests {
1388                 c.deleteRequest(r)
1389         }
1390         if len(c.requests) != 0 {
1391                 panic(len(c.requests))
1392         }
1393         // for c := range c.t.conns {
1394         //      c.tickleWriter()
1395         // }
1396 }
1397
// tickleWriter wakes the connection's writer goroutine so it re-evaluates
// what to send.
func (c *connection) tickleWriter() {
	c.writerCond.Broadcast()
}
1401
1402 func (c *connection) postCancel(r request) bool {
1403         if !c.deleteRequest(r) {
1404                 return false
1405         }
1406         c.Post(makeCancelMessage(r))
1407         return true
1408 }
1409
1410 func (c *connection) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) {
1411         // Count the chunk being sent, even if it isn't.
1412         b := make([]byte, r.Length)
1413         p := c.t.info.Piece(int(r.Index))
1414         n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
1415         if n != len(b) {
1416                 if err == nil {
1417                         panic("expected error")
1418                 }
1419                 return
1420         } else if err == io.EOF {
1421                 err = nil
1422         }
1423         more = msg(pp.Message{
1424                 Type:  pp.Piece,
1425                 Index: r.Index,
1426                 Begin: r.Begin,
1427                 Piece: b,
1428         })
1429         c.lastChunkSent = time.Now()
1430         return
1431 }
1432
// setTorrent associates the connection with t and registers it in t's
// connection set. A connection may be bound to at most one torrent, ever.
func (c *connection) setTorrent(t *Torrent) {
	if c.t != nil {
		panic("connection already associated with a torrent")
	}
	c.t = t
	t.conns[c] = struct{}{}
}