17 "github.com/anacrolix/missinggo"
18 "github.com/anacrolix/missinggo/bitmap"
19 "github.com/anacrolix/missinggo/iter"
20 "github.com/anacrolix/missinggo/prioritybitmap"
22 "github.com/anacrolix/torrent/bencode"
23 "github.com/anacrolix/torrent/mse"
24 pp "github.com/anacrolix/torrent/peer_protocol"
type peerSource string

const (
	peerSourceTracker         = "T" // It's the default.
	peerSourceIncoming        = "I"
	peerSourceDHTGetPeers     = "Hg"
	peerSourceDHTAnnouncePeer = "Ha"
	peerSourcePEX             = "X"
)
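// For example, a connection discovered via a DHT get_peers response has
// Discovery == peerSourceDHTGetPeers, so "Hg" appears in its status flags
// (see connectionFlags below).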
// Maintains the state of a connection with a peer.
type connection struct {
	t *Torrent
	// The actual Conn, used for closing, and setting socket options.
	conn net.Conn
	// The Reader and Writer for this Conn, with hooks installed for stats,
	// limiting, deadlines etc.
	w io.Writer
	r io.Reader
	// True if the connection is operating over MSE obfuscation.
	headerEncrypted bool
	cryptoMethod    mse.CryptoMethod
	Discovery       peerSource
	closed          missinggo.Event

	stats                  ConnStats
	UnwantedChunksReceived int
	UsefulChunksReceived   int
	chunksSent             int
	goodPiecesDirtied      int
	badPiecesDirtied       int

	lastMessageReceived     time.Time
	completedHandshake      time.Time
	lastUsefulChunkReceived time.Time
	lastChunkSent           time.Time

	// Stuff controlled by the local peer.
	Interested       bool
	Choked           bool
	requests         map[request]struct{}
	requestsLowWater int
	// Indexed by metadata piece, set to true if posted and pending a
	// response.
	metadataRequests []bool
	sentHaves        []bool

	// Stuff controlled by the remote peer.
	PeerID             [20]byte
	PeerInterested     bool
	PeerChoked         bool
	PeerRequests       map[request]struct{}
	PeerExtensionBytes peerExtensionBytes
	// The pieces the peer has claimed to have.
	peerPieces bitmap.Bitmap
	// The peer has everything. This can occur due to a special message, when
	// we may not even know the number of pieces in the torrent yet.
	peerHasAll bool
	// The highest possible number of pieces the torrent could have based on
	// communication with the peer. Generally only useful until we have the
	// metainfo.
	peerMinPieces int
	// Pieces we've accepted chunks for from the peer.
	peerTouchedPieces map[int]struct{}

	PeerMaxRequests  int // Maximum pending requests the peer allows.
	PeerExtensionIDs map[string]byte
	PeerClientName   string

	pieceInclination  []int
	pieceRequestOrder prioritybitmap.PriorityBitmap

	postedBuffer bytes.Buffer
	uploadTimer  *time.Timer
	writerCond   sync.Cond
}
func (cn *connection) mu() sync.Locker {
	return &cn.t.cl.mu
}

func (cn *connection) remoteAddr() net.Addr {
	return cn.conn.RemoteAddr()
}

func (cn *connection) localAddr() net.Addr {
	return cn.conn.LocalAddr()
}

func (cn *connection) supportsExtension(ext string) bool {
	_, ok := cn.PeerExtensionIDs[ext]
	return ok
}

// The best guess at number of pieces in the torrent for this peer.
func (cn *connection) bestPeerNumPieces() int {
	if cn.t.haveInfo() {
		return cn.t.numPieces()
	}
	return cn.peerMinPieces
}

func (cn *connection) completedString() string {
	return fmt.Sprintf("%d/%d", cn.peerPieces.Len(), cn.bestPeerNumPieces())
}

// Correct the PeerPieces slice length. Returns an error if the existing state
// is invalid, such as from receiving a badly sized BITFIELD or an invalid
// HAVE message.
func (cn *connection) setNumPieces(num int) error {
	cn.peerPieces.RemoveRange(num, -1)
	cn.peerPiecesChanged()
	return nil
}
func eventAgeString(t time.Time) string {
	if t.IsZero() {
		return "never"
	}
	return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}

func (cn *connection) connectionFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.cryptoMethod == mse.CryptoMethodRC4 {
		c('E')
	} else if cn.headerEncrypted {
		c('e')
	}
	ret += string(cn.Discovery)
	return
}

// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
func (cn *connection) statusFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.Interested {
		c('i')
	}
	c('-')
	ret += cn.connectionFlags()
	c('-')
	if cn.PeerInterested {
		c('i')
	}
	return
}

func (cn *connection) String() string {
	var buf bytes.Buffer
	cn.WriteStatus(&buf, nil)
	return buf.String()
}

func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
	// \t isn't preserved in <pre> blocks?
	fmt.Fprintf(w, "%-40s: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
	fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
		eventAgeString(cn.lastMessageReceived),
		eventAgeString(cn.completedHandshake),
		eventAgeString(cn.lastUsefulChunkReceived))
	fmt.Fprintf(w,
		"    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
		cn.completedString(),
		len(cn.peerTouchedPieces),
		cn.UsefulChunksReceived,
		cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
		cn.chunksSent,
		cn.numLocalRequests(),
		len(cn.PeerRequests),
		cn.statusFlags())
	roi := cn.pieceRequestOrderIter()
	fmt.Fprintf(w, "    next pieces: %v%s\n",
		iter.ToSlice(iter.Head(10, roi)),
		func() string {
			if cn.shouldRequestWithoutBias() {
				return " (fastest)"
			}
			return ""
		}())
}
func (cn *connection) Close() {
	if !cn.closed.Set() {
		return
	}
	cn.discardPieceInclination()
	cn.pieceRequestOrder.Clear()
	if cn.conn != nil {
		go cn.conn.Close()
	}
}

func (cn *connection) PeerHasPiece(piece int) bool {
	return cn.peerHasAll || cn.peerPieces.Contains(piece)
}

func (cn *connection) Post(msg pp.Message) {
	messageTypesPosted.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
	cn.postedBuffer.Write(msg.MustMarshalBinary())
	cn.tickleWriter()
}

func (cn *connection) requestMetadataPiece(index int) {
	eID := cn.PeerExtensionIDs["ut_metadata"]
	if eID == 0 {
		return
	}
	if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
		return
	}
	cn.Post(pp.Message{
		Type:       pp.Extended,
		ExtendedID: eID,
		ExtendedPayload: func() []byte {
			b, err := bencode.Marshal(map[string]int{
				"msg_type": pp.RequestMetadataExtensionMsgType,
				"piece":    index,
			})
			if err != nil {
				panic(err)
			}
			return b
		}(),
	})
	for index >= len(cn.metadataRequests) {
		cn.metadataRequests = append(cn.metadataRequests, false)
	}
	cn.metadataRequests[index] = true
}

func (cn *connection) requestedMetadataPiece(index int) bool {
	return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}

// The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) {
	ret = cn.PeerMaxRequests
	if ret > 64 {
		ret = 64
	}
	return
}
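// Illustrative numbers: if a peer's extended handshake advertised reqq=500,
// the cap above still limits us to 64 outstanding requests on this
// connection; a peer that never sends reqq keeps whatever PeerMaxRequests
// default was set when the connection was created.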
// Returns true if an unsatisfied request was canceled.
func (cn *connection) PeerCancel(r request) bool {
	if cn.PeerRequests == nil {
		return false
	}
	if _, ok := cn.PeerRequests[r]; !ok {
		return false
	}
	delete(cn.PeerRequests, r)
	return true
}

func (cn *connection) Choke(msg func(pp.Message) bool) bool {
	if cn.Choked {
		return true
	}
	cn.PeerRequests = nil
	cn.Choked = true
	return msg(pp.Message{
		Type: pp.Choke,
	})
}

func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
	if !cn.Choked {
		return true
	}
	cn.Choked = false
	return msg(pp.Message{
		Type: pp.Unchoke,
	})
}

func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
	if cn.Interested == interested {
		return true
	}
	cn.Interested = interested
	// log.Printf("%p: setting interest: %v", cn, interested)
	return msg(pp.Message{
		Type: func() pp.MessageType {
			if interested {
				return pp.Interested
			}
			return pp.NotInterested
		}(),
	})
}

// The function takes a message to be sent, and returns true if more messages
// can be sent.
type messageWriter func(pp.Message) bool
func (cn *connection) request(r request, mw messageWriter) bool {
	if cn.requests == nil {
		cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
	}
	if _, ok := cn.requests[r]; ok {
		panic("chunk already requested")
	}
	if !cn.PeerHasPiece(r.Index.Int()) {
		panic("requesting piece peer doesn't have")
	}
	cn.requests[r] = struct{}{}
	return mw(pp.Message{
		Type:   pp.Request,
		Index:  r.Index,
		Begin:  r.Begin,
		Length: r.Length,
	})
}

func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
	numFillBuffers.Add(1)
	cancel, new, i := cn.desiredRequestState()
	if !cn.SetInterested(i, msg) {
		return
	}
	if cancel && len(cn.requests) != 0 {
		fillBufferSentCancels.Add(1)
		for r := range cn.requests {
			cn.deleteRequest(r)
			// log.Printf("%p: cancelling request: %v", cn, r)
			if !msg(makeCancelMessage(r)) {
				return
			}
		}
	}
	if len(new) != 0 {
		fillBufferSentRequests.Add(1)
		for _, r := range new {
			if !cn.request(r, msg) {
				// If we didn't completely top up the requests, we shouldn't
				// mark the low water, since we'll want to top up the requests
				// as soon as we have more write buffer space.
				return
			}
		}
		cn.requestsLowWater = len(cn.requests) / 2
	}
	cn.upload(msg)
}
// Routine that writes to the peer. Some of what to write is buffered by
// activity elsewhere in the Client, and some is determined locally when the
// connection is writable.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
	var (
		buf       bytes.Buffer
		lastWrite time.Time = time.Now()
	)
	var keepAliveTimer *time.Timer
	keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
		cn.mu().Lock()
		defer cn.mu().Unlock()
		if time.Since(lastWrite) >= keepAliveTimeout {
			cn.tickleWriter()
		}
		keepAliveTimer.Reset(keepAliveTimeout)
	})
	cn.mu().Lock()
	defer cn.mu().Unlock()
	defer cn.Close()
	defer keepAliveTimer.Stop()
	for {
		buf.Write(cn.postedBuffer.Bytes())
		cn.postedBuffer.Reset()
		if buf.Len() == 0 {
			cn.fillWriteBuffer(func(msg pp.Message) bool {
				cn.wroteMsg(&msg)
				buf.Write(msg.MustMarshalBinary())
				return buf.Len() < 1<<16
			})
		}
		if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
			buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
			postedKeepalives.Add(1)
		}
		if buf.Len() == 0 {
			// TODO: Minimize wakeups....
			cn.writerCond.Wait()
			continue
		}
		cn.mu().Unlock()
		// log.Printf("writing %d bytes", buf.Len())
		n, err := cn.w.Write(buf.Bytes())
		cn.mu().Lock()
		lastWrite = time.Now()
		keepAliveTimer.Reset(keepAliveTimeout)
		if err != nil {
			return
		}
		if n != buf.Len() {
			panic("short write")
		}
		buf.Reset()
	}
}
func (cn *connection) Have(piece int) {
	for piece >= len(cn.sentHaves) {
		cn.sentHaves = append(cn.sentHaves, false)
	}
	if cn.sentHaves[piece] {
		return
	}
	cn.Post(pp.Message{
		Type:  pp.Have,
		Index: pp.Integer(piece),
	})
	cn.sentHaves[piece] = true
}

func (cn *connection) Bitfield(haves []bool) {
	if cn.sentHaves != nil {
		panic("bitfield must be first have-related message sent")
	}
	cn.Post(pp.Message{
		Type:     pp.Bitfield,
		Bitfield: haves,
	})
	// Make a copy of haves, as that's read when the message is marshalled
	// without the lock. Also it obviously shouldn't change in the Msg due to
	// changes in .sentHaves.
	cn.sentHaves = append([]bool(nil), haves...)
}
func nextRequestState(
	networkingEnabled bool,
	currentRequests map[request]struct{},
	peerChoking bool,
	requestPieces iter.Func,
	pendingChunks func(piece int, f func(chunkSpec) bool) bool,
	requestsLowWater int,
	requestsHighWater int,
) (
	cancelExisting bool, // Cancel all our pending requests
	newRequests []request, // Chunks to request that we currently aren't
	interested bool, // Whether we should indicate interest, even if we don't request anything
) {
	if !networkingEnabled {
		return true, nil, false
	}
	if len(currentRequests) > requestsLowWater {
		return false, nil, true
	}
	requestPieces(func(_piece interface{}) bool {
		interested = true
		if peerChoking {
			return false
		}
		piece := _piece.(int)
		return pendingChunks(piece, func(cs chunkSpec) bool {
			r := request{pp.Integer(piece), cs}
			if _, ok := currentRequests[r]; !ok {
				if newRequests == nil {
					newRequests = make([]request, 0, requestsHighWater-len(currentRequests))
				}
				newRequests = append(newRequests, r)
			}
			return len(currentRequests)+len(newRequests) < requestsHighWater
		})
	})
	return false, newRequests, interested
}
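// A hedged, standalone sketch (not from the original file; the helper values
// are invented) of how the water marks behave: with no outstanding requests
// and a high water mark of 4, both pending chunks of the offered piece get
// requested, and interest is reported.
func exampleNextRequestState() {
	requestPieces := func(cb iter.Callback) {
		cb(0) // Offer a single piece, index 0.
	}
	pendingChunks := func(piece int, f func(chunkSpec) bool) bool {
		// Pretend the piece has two 16 KiB chunks left to request.
		for _, cs := range []chunkSpec{{0, 1 << 14}, {1 << 14, 1 << 14}} {
			if !f(cs) {
				return false
			}
		}
		return true
	}
	_, newRequests, interested := nextRequestState(
		true,                   // networking enabled
		map[request]struct{}{}, // no current requests
		false,                  // peer isn't choking us
		requestPieces,
		pendingChunks,
		1, // low water
		4, // high water
	)
	fmt.Println(len(newRequests), interested) // 2 true
}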
func (cn *connection) updateRequests() {
	cn.tickleWriter()
}

// Emits the indices in the Bitmaps bms in order, never repeating any index.
// skip is mutated during execution, and its initial values will never be
// emitted.
func iterBitmapsDistinct(skip bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
	return func(cb iter.Callback) {
		for _, bm := range bms {
			if !iter.All(func(i interface{}) bool {
				skip.Add(i.(int))
				return cb(i)
			}, bitmap.Sub(bm, skip).Iter) {
				return
			}
		}
	}
}
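// Hedged usage sketch (invented values): iterate two priority classes,
// emitting each index once and never emitting anything already in skip.
func exampleIterBitmapsDistinct() {
	var now, readahead, skip bitmap.Bitmap
	now.Add(1, 2)
	readahead.Add(2, 3) // 2 is also in now, so it's emitted only once.
	skip.Add(3)         // 3 is never emitted at all.
	iterBitmapsDistinct(skip, now, readahead)(func(i interface{}) bool {
		fmt.Println(i) // 1, then 2
		return true
	})
}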
func (cn *connection) unbiasedPieceRequestOrder() iter.Func {
	now, readahead := cn.t.readerPiecePriorities()
	// Pieces to skip include pieces the peer doesn't have
	skip := bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces())
	// And pieces that we already have.
	skip.Union(cn.t.completedPieces)
	// Return an iterator over the different priority classes, minus the skip
	// pieces.
	return iter.Chain(
		iterBitmapsDistinct(skip, now, readahead),
		func(cb iter.Callback) {
			cn.t.pendingPieces.IterTyped(func(piece int) bool {
				if skip.Contains(piece) {
					return true
				}
				return cb(piece)
			})
		},
	)
}
// The connection should download highest priority pieces first, without any
// inclination toward avoiding wastage. Generally we might do this if there's
// a single connection, or this is the fastest connection, and we have active
// readers that signal an ordering preference. It's conceivable that the best
// connection should do this, since it's least likely to waste our time if
// assigned to the highest priority pieces, and assigning more than one this
// role would cause significant wasted bandwidth.
func (cn *connection) shouldRequestWithoutBias() bool {
	if cn.t.requestStrategy != 2 {
		return false
	}
	if len(cn.t.readers) == 0 {
		return false
	}
	if len(cn.t.conns) == 1 {
		return true
	}
	if cn == cn.t.fastestConn {
		return true
	}
	return false
}

func (cn *connection) pieceRequestOrderIter() iter.Func {
	if cn.shouldRequestWithoutBias() {
		return cn.unbiasedPieceRequestOrder()
	}
	return cn.pieceRequestOrder.Iter
}
func (cn *connection) desiredRequestState() (bool, []request, bool) {
	return nextRequestState(
		cn.t.networkingEnabled,
		cn.requests,
		cn.PeerChoked,
		cn.pieceRequestOrderIter(),
		func(piece int, f func(chunkSpec) bool) bool {
			return undirtiedChunks(piece, cn.t, f)
		},
		cn.requestsLowWater,
		cn.nominalMaxRequests(),
	)
}

func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
	chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
	return iter.ForPerm(len(chunkIndices), func(i int) bool {
		return f(t.chunkIndexSpec(chunkIndices[i], piece))
	})
}
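// Note that iter.ForPerm visits the chunk indices in a random permutation,
// presumably so that multiple connections working on the same piece don't
// all request its chunks in the same order.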
// TODO: Check that callers call updateRequests as needed.
func (cn *connection) stopRequestingPiece(piece int) bool {
	return cn.pieceRequestOrder.Remove(piece)
}

// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection and is
// used to pseudorandomly avoid connections always requesting the same pieces
// and thus wasting effort.
func (cn *connection) updatePiecePriority(piece int) bool {
	tpp := cn.t.piecePriority(piece)
	if !cn.PeerHasPiece(piece) {
		tpp = PiecePriorityNone
	}
	if tpp == PiecePriorityNone {
		return cn.stopRequestingPiece(piece)
	}
	prio := cn.getPieceInclination()[piece]
	switch cn.t.requestStrategy {
	case 1:
		switch tpp {
		case PiecePriorityNormal:
		case PiecePriorityReadahead:
			prio -= cn.t.numPieces()
		case PiecePriorityNext, PiecePriorityNow:
			prio -= 2 * cn.t.numPieces()
		default:
			panic(tpp)
		}
	default:
	}
	return cn.pieceRequestOrder.Set(piece, prio)
}
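// Worked example (illustrative numbers): with 100 pieces and an inclination
// of 40 for some piece, a readahead piece sorts at 40-100 = -60 and a
// Next/Now piece at 40-200 = -160. Lower values are requested first, so
// priority classes never interleave, while the inclination pseudorandomly
// spreads connections across pieces within a class.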
func (cn *connection) getPieceInclination() []int {
	if cn.pieceInclination == nil {
		cn.pieceInclination = cn.t.getConnPieceInclination()
	}
	return cn.pieceInclination
}

func (cn *connection) discardPieceInclination() {
	if cn.pieceInclination == nil {
		return
	}
	cn.t.putPieceInclination(cn.pieceInclination)
	cn.pieceInclination = nil
}

func (cn *connection) peerPiecesChanged() {
	if cn.t.haveInfo() {
		prioritiesChanged := false
		for i := range iter.N(cn.t.numPieces()) {
			if cn.updatePiecePriority(i) {
				prioritiesChanged = true
			}
		}
		if prioritiesChanged {
			cn.updateRequests()
		}
	}
}
func (cn *connection) raisePeerMinPieces(newMin int) {
	if newMin > cn.peerMinPieces {
		cn.peerMinPieces = newMin
	}
}

func (cn *connection) peerSentHave(piece int) error {
	if (cn.t.haveInfo() && piece >= cn.t.numPieces()) || piece < 0 {
		return errors.New("invalid piece")
	}
	if cn.PeerHasPiece(piece) {
		return nil
	}
	cn.raisePeerMinPieces(piece + 1)
	cn.peerPieces.Set(piece, true)
	if cn.updatePiecePriority(piece) {
		cn.updateRequests()
	}
	return nil
}

func (cn *connection) peerSentBitfield(bf []bool) error {
	cn.peerHasAll = false
	if len(bf)%8 != 0 {
		panic("expected bitfield length divisible by 8")
	}
	// We know that the last byte means that at most the last 7 bits are
	// wasted, e.g. a 24-bit bitfield implies at least 17 pieces.
	cn.raisePeerMinPieces(len(bf) - 7)
	if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
		// Ignore known excess pieces.
		bf = bf[:cn.t.numPieces()]
	}
	for i, have := range bf {
		if have {
			cn.raisePeerMinPieces(i + 1)
		}
		cn.peerPieces.Set(i, have)
	}
	cn.peerPiecesChanged()
	return nil
}

func (cn *connection) peerSentHaveAll() error {
	cn.peerHasAll = true
	cn.peerPieces.Clear()
	cn.peerPiecesChanged()
	return nil
}

func (cn *connection) peerSentHaveNone() error {
	cn.peerPieces.Clear()
	cn.peerHasAll = false
	cn.peerPiecesChanged()
	return nil
}
func (c *connection) requestPendingMetadata() {
	if c.t.haveInfo() {
		return
	}
	if c.PeerExtensionIDs["ut_metadata"] == 0 {
		// Peer doesn't support this.
		return
	}
	// Request metadata pieces that we don't have in a random order.
	var pending []int
	for index := 0; index < c.t.metadataPieceCount(); index++ {
		if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
			pending = append(pending, index)
		}
	}
	for _, i := range rand.Perm(len(pending)) {
		c.requestMetadataPiece(pending[i])
	}
}
func (cn *connection) wroteMsg(msg *pp.Message) {
	messageTypesSent.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
	cn.stats.wroteMsg(msg)
	cn.t.stats.wroteMsg(msg)
}

func (cn *connection) readMsg(msg *pp.Message) {
	cn.stats.readMsg(msg)
	cn.t.stats.readMsg(msg)
}

func (cn *connection) wroteBytes(n int64) {
	cn.stats.wroteBytes(n)
	if cn.t != nil {
		cn.t.stats.wroteBytes(n)
	}
}

func (cn *connection) readBytes(n int64) {
	cn.stats.readBytes(n)
	if cn.t != nil {
		cn.t.stats.readBytes(n)
	}
}
// Returns whether the connection is currently useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
func (c *connection) useful() bool {
	t := c.t
	if c.closed.IsSet() {
		return false
	}
	if !t.haveInfo() {
		return c.supportsExtension("ut_metadata")
	}
	if t.seeding() {
		return c.PeerInterested
	}
	return c.peerHasWantedPieces()
}

func (c *connection) lastHelpful() (ret time.Time) {
	ret = c.lastUsefulChunkReceived
	if c.t.seeding() && c.lastChunkSent.After(ret) {
		ret = c.lastChunkSent
	}
	return
}
// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() error {
	t := c.t
	cl := t.cl

	decoder := pp.Decoder{
		R:         bufio.NewReaderSize(c.r, 1<<17),
		MaxLength: 256 * 1024,
		Pool:      t.chunkPool,
	}
	for {
		var (
			msg pp.Message
			err error
		)
		func() {
			cl.mu.Unlock()
			defer cl.mu.Lock()
			err = decoder.Decode(&msg)
		}()
		if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		c.lastMessageReceived = time.Now()
		if msg.Keepalive {
			receivedKeepalives.Add(1)
			continue
		}
		messageTypesReceived.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
		switch msg.Type {
		case pp.Choke:
			c.PeerChoked = true
			c.requests = nil
			// We can then reset our interest.
			c.updateRequests()
		case pp.Reject:
			if c.deleteRequest(newRequest(msg.Index, msg.Begin, msg.Length)) {
				c.updateRequests()
			}
		case pp.Unchoke:
			c.PeerChoked = false
			c.tickleWriter()
		case pp.Interested:
			c.PeerInterested = true
			c.tickleWriter()
		case pp.NotInterested:
			c.PeerInterested = false
		case pp.Have:
			err = c.peerSentHave(int(msg.Index))
		case pp.Request:
			if c.Choked {
				break
			}
			if len(c.PeerRequests) >= maxRequests {
				// TODO: Should we drop them or Choke them instead?
				break
			}
			if !t.havePiece(msg.Index.Int()) {
				// This isn't necessarily them screwing up. We can drop pieces
				// from our storage, and can't communicate this to peers
				// except by reconnecting.
				requestsReceivedForMissingPieces.Add(1)
				err = fmt.Errorf("peer requested piece we don't have: %v", msg.Index.Int())
				break
			}
			if c.PeerRequests == nil {
				c.PeerRequests = make(map[request]struct{}, maxRequests)
			}
			c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
			c.tickleWriter()
		case pp.Cancel:
			req := newRequest(msg.Index, msg.Begin, msg.Length)
			if !c.PeerCancel(req) {
				unexpectedCancels.Add(1)
			}
		case pp.Bitfield:
			err = c.peerSentBitfield(msg.Bitfield)
		case pp.HaveAll:
			err = c.peerSentHaveAll()
		case pp.HaveNone:
			err = c.peerSentHaveNone()
		case pp.Piece:
			c.receiveChunk(&msg)
			if len(msg.Piece) == int(t.chunkSize) {
				t.chunkPool.Put(&msg.Piece)
			}
		case pp.Extended:
			switch msg.ExtendedID {
			case pp.HandshakeExtendedID:
				// TODO: Create a bencode struct for this.
				var d map[string]interface{}
				err = bencode.Unmarshal(msg.ExtendedPayload, &d)
				if err != nil {
					err = fmt.Errorf("error decoding extended message payload: %s", err)
					break
				}
				// log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
				if reqq, ok := d["reqq"]; ok {
					if i, ok := reqq.(int64); ok {
						c.PeerMaxRequests = int(i)
					}
				}
				if v, ok := d["v"]; ok {
					c.PeerClientName = v.(string)
				}
				if m, ok := d["m"]; ok {
					mTyped, ok := m.(map[string]interface{})
					if !ok {
						err = errors.New("handshake m value is not dict")
						break
					}
					if c.PeerExtensionIDs == nil {
						c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
					}
					for name, v := range mTyped {
						id, ok := v.(int64)
						if !ok {
							log.Printf("bad handshake m item extension ID type: %T", v)
							continue
						}
						if id == 0 {
							delete(c.PeerExtensionIDs, name)
						} else {
							if c.PeerExtensionIDs[name] == 0 {
								supportedExtensionMessages.Add(name, 1)
							}
							c.PeerExtensionIDs[name] = byte(id)
						}
					}
				}
				metadata_sizeUntyped, ok := d["metadata_size"]
				if ok {
					metadata_size, ok := metadata_sizeUntyped.(int64)
					if !ok {
						log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
					} else {
						err = t.setMetadataSize(metadata_size)
						if err != nil {
							err = fmt.Errorf("error setting metadata size to %d", metadata_size)
							break
						}
					}
				}
				if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
					c.requestPendingMetadata()
				}
			case metadataExtendedId:
				err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
				if err != nil {
					err = fmt.Errorf("error handling metadata extension message: %s", err)
				}
			case pexExtendedId:
				if cl.config.DisablePEX {
					break
				}
				var pexMsg peerExchangeMessage
				err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
				if err != nil {
					err = fmt.Errorf("error unmarshalling PEX message: %s", err)
					break
				}
				t.addPeers(func() (ret []Peer) {
					for i, cp := range pexMsg.Added {
						p := Peer{
							IP:     make([]byte, 4),
							Port:   cp.Port,
							Source: peerSourcePEX,
						}
						if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
							p.SupportsEncryption = true
						}
						missinggo.CopyExact(p.IP, cp.IP[:])
						ret = append(ret, p)
					}
					return
				}())
			default:
				err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
			}
			if err != nil {
				// That client uses its own extension IDs for outgoing message
				// types, which is incorrect.
				if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
					strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
					err = nil
				}
			}
		case pp.Port:
			pingAddr, err := net.ResolveUDPAddr("udp", c.remoteAddr().String())
			if err != nil {
				panic(err)
			}
			if msg.Port != 0 {
				pingAddr.Port = int(msg.Port)
			}
			go cl.dHT.Ping(pingAddr, nil)
		default:
			err = fmt.Errorf("received unknown message type: %#v", msg.Type)
		}
		if err != nil {
			return err
		}
	}
}
// Set both the Reader and Writer for the connection from a single ReadWriter.
func (cn *connection) setRW(rw io.ReadWriter) {
	cn.r = rw
	cn.w = rw
}

// Returns the Reader and Writer as a combined ReadWriter.
func (cn *connection) rw() io.ReadWriter {
	return struct {
		io.Reader
		io.Writer
	}{cn.r, cn.w}
}
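// Hedged usage sketch (wrapper type invented): setRW and rw make it easy to
// interpose hooks around the underlying socket, e.g.
//
//	cn.setRW(statsReadWriter{cn.rw(), cn})
//
// where statsReadWriter would call cn.readBytes and cn.wroteBytes around
// each Read and Write it forwards.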
// Handle a received chunk from a peer.
func (c *connection) receiveChunk(msg *pp.Message) {
	t := c.t
	cl := t.cl
	chunksReceived.Add(1)

	req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))

	// Request has been satisfied.
	if c.deleteRequest(req) {
		c.updateRequests()
	} else {
		unexpectedChunksReceived.Add(1)
	}

	// Do we actually want this chunk?
	if !t.wantPiece(req) {
		unwantedChunksReceived.Add(1)
		c.UnwantedChunksReceived++
		return
	}

	index := int(req.Index)
	piece := &t.pieces[index]

	c.UsefulChunksReceived++
	c.lastUsefulChunkReceived = time.Now()
	// if t.fastestConn != c {
	// 	log.Printf("setting fastest connection %p", c)
	// }
	t.fastestConn = c

	// Need to record that it hasn't been written yet, before we attempt to do
	// anything with it.
	piece.incrementPendingWrites()
	// Record that we have the chunk, so we aren't trying to download it while
	// waiting for it to be written to storage.
	piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))

	// Cancel pending requests for this chunk.
	for c := range t.conns {
		c.postCancel(req)
	}

	err := func() error {
		cl.mu.Unlock()
		defer cl.mu.Lock()
		// Write the chunk out. Note that the upper bound on chunk writing
		// concurrency will be the number of connections. We write inline with
		// receiving the chunk (with this lock dance), because we want to
		// handle errors synchronously and I haven't thought of a nice way to
		// defer any concurrency to the storage and have that notify the
		// client of errors. TODO: Do that instead.
		return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
	}()

	piece.decrementPendingWrites()

	if err != nil {
		log.Printf("%s (%s): error writing chunk %v: %s", t, t.infoHash, req, err)
		t.pendRequest(req)
		t.updatePieceCompletion(int(msg.Index))
		return
	}

	// It's important that the piece is potentially queued before we check if
	// the piece is still wanted, because if it is queued, it won't be wanted.
	if t.pieceAllDirty(index) {
		t.queuePieceCheck(int(req.Index))
		t.pendAllChunkSpecs(index)
	}

	if c.peerTouchedPieces == nil {
		c.peerTouchedPieces = make(map[int]struct{})
	}
	c.peerTouchedPieces[index] = struct{}{}

	cl.event.Broadcast()
	t.publishPieceChange(int(req.Index))
}
// Also handles choking and unchoking of the remote peer.
func (c *connection) upload(msg func(pp.Message) bool) bool {
	t := c.t
	cl := t.cl
	if cl.config.NoUpload {
		return true
	}
	if !c.PeerInterested {
		return true
	}
	seeding := t.seeding()
	if !seeding && !c.peerHasWantedPieces() {
		// There's no reason to upload to this peer.
		return true
	}
	// Breaking or completing this loop means we don't want to upload to the
	// peer anymore, and we choke them.
another:
	for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
		// We want to upload to the peer.
		if !c.Unchoke(msg) {
			return false
		}
		for r := range c.PeerRequests {
			res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
			if !res.OK() {
				panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
			}
			delay := res.Delay()
			if delay > 0 {
				res.Cancel()
				if c.uploadTimer == nil {
					c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
				} else {
					c.uploadTimer.Reset(delay)
				}
				// Hard to say what to return here.
				return true
			}
			more, err := cl.sendChunk(t, c, r, msg)
			if err != nil {
				i := int(r.Index)
				if t.pieceComplete(i) {
					t.updatePieceCompletion(i)
					if !t.pieceComplete(i) {
						// We had the piece, but not anymore.
						break another
					}
				}
				log.Printf("error sending chunk %+v to peer: %s", r, err)
				// If we failed to send a chunk, choke the peer to ensure they
				// flush all their requests. We've probably dropped a piece,
				// but there's no way to communicate this to the peer. If they
				// ask for it again, we'll kick them to allow us to send them
				// an updated bitfield.
				break another
			}
			delete(c.PeerRequests, r)
			if !more {
				return false
			}
			goto another
		}
		return true
	}
	return c.Choke(msg)
}
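// The reservation dance above follows golang.org/x/time/rate semantics
// (assuming cl.uploadLimit is a *rate.Limiter, as its use suggests). A
// minimal standalone sketch of the same pattern:
func exampleReserve(lim *rate.Limiter, n int) bool {
	res := lim.ReserveN(time.Now(), n)
	if !res.OK() {
		// n exceeds the limiter's burst size; this can never succeed.
		return false
	}
	if delay := res.Delay(); delay > 0 {
		// Not allowed yet: return the tokens and arrange to retry later.
		res.Cancel()
		time.AfterFunc(delay, func() { /* retry the send */ })
		return false
	}
	return true // Tokens consumed; send now.
}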
func (cn *connection) Drop() {
	cn.t.dropConnection(cn)
}

func (cn *connection) netGoodPiecesDirtied() int {
	return cn.goodPiecesDirtied - cn.badPiecesDirtied
}

func (c *connection) peerHasWantedPieces() bool {
	return !c.pieceRequestOrder.IsEmpty()
}

func (c *connection) numLocalRequests() int {
	return len(c.requests)
}

func (c *connection) deleteRequest(r request) bool {
	if _, ok := c.requests[r]; !ok {
		return false
	}
	delete(c.requests, r)
	return true
}

func (c *connection) tickleWriter() {
	c.writerCond.Broadcast()
}

func (c *connection) postCancel(r request) bool {
	if !c.deleteRequest(r) {
		return false
	}
	c.Post(makeCancelMessage(r))
	return true
}