]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Move Client.connectionLoop to connection.mainReadLoop
authorMatt Joiner <anacrolix@gmail.com>
Sun, 11 Sep 2016 04:32:56 +0000 (14:32 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 11 Sep 2016 04:32:56 +0000 (14:32 +1000)
client.go
connection.go

index d0d34ca8ef4be6ef862836ccabd88f8fc705756f..79409d721d1d51e65434e48b4d4174a20108264d 100644 (file)
--- a/client.go
+++ b/client.go
@@ -929,7 +929,7 @@ func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
        defer t.dropConnection(c)
        go c.writer(time.Minute)
        cl.sendInitialMessages(c, t)
-       err := cl.connectionLoop(t, c)
+       err := c.mainReadLoop()
        if err != nil && cl.config.Debug {
                log.Printf("error during connection loop: %s", err)
        }
@@ -1118,214 +1118,6 @@ func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
        return nil
 }
 
-// Processes incoming bittorrent messages. The client lock is held upon entry
-// and exit. Returning will end the connection.
-func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
-       decoder := pp.Decoder{
-               R:         bufio.NewReader(c.rw),
-               MaxLength: 256 * 1024,
-       }
-       for {
-               cl.mu.Unlock()
-               var msg pp.Message
-               err := decoder.Decode(&msg)
-               cl.mu.Lock()
-               if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
-                       return nil
-               }
-               if err != nil {
-                       return err
-               }
-               c.readMsg(&msg)
-               c.lastMessageReceived = time.Now()
-               if msg.Keepalive {
-                       receivedKeepalives.Add(1)
-                       continue
-               }
-               receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
-               switch msg.Type {
-               case pp.Choke:
-                       c.PeerChoked = true
-                       c.Requests = nil
-                       // We can then reset our interest.
-                       c.updateRequests()
-               case pp.Reject:
-                       cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
-                       c.updateRequests()
-               case pp.Unchoke:
-                       c.PeerChoked = false
-                       cl.peerUnchoked(t, c)
-               case pp.Interested:
-                       c.PeerInterested = true
-                       cl.upload(t, c)
-               case pp.NotInterested:
-                       c.PeerInterested = false
-                       c.Choke()
-               case pp.Have:
-                       err = c.peerSentHave(int(msg.Index))
-               case pp.Request:
-                       if c.Choked {
-                               break
-                       }
-                       if !c.PeerInterested {
-                               err = errors.New("peer sent request but isn't interested")
-                               break
-                       }
-                       if !t.havePiece(msg.Index.Int()) {
-                               // This isn't necessarily them screwing up. We can drop pieces
-                               // from our storage, and can't communicate this to peers
-                               // except by reconnecting.
-                               requestsReceivedForMissingPieces.Add(1)
-                               err = errors.New("peer requested piece we don't have")
-                               break
-                       }
-                       if c.PeerRequests == nil {
-                               c.PeerRequests = make(map[request]struct{}, maxRequests)
-                       }
-                       c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
-                       cl.upload(t, c)
-               case pp.Cancel:
-                       req := newRequest(msg.Index, msg.Begin, msg.Length)
-                       if !c.PeerCancel(req) {
-                               unexpectedCancels.Add(1)
-                       }
-               case pp.Bitfield:
-                       err = c.peerSentBitfield(msg.Bitfield)
-               case pp.HaveAll:
-                       err = c.peerSentHaveAll()
-               case pp.HaveNone:
-                       err = c.peerSentHaveNone()
-               case pp.Piece:
-                       cl.downloadedChunk(t, c, &msg)
-               case pp.Extended:
-                       switch msg.ExtendedID {
-                       case pp.HandshakeExtendedID:
-                               // TODO: Create a bencode struct for this.
-                               var d map[string]interface{}
-                               err = bencode.Unmarshal(msg.ExtendedPayload, &d)
-                               if err != nil {
-                                       err = fmt.Errorf("error decoding extended message payload: %s", err)
-                                       break
-                               }
-                               // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
-                               if reqq, ok := d["reqq"]; ok {
-                                       if i, ok := reqq.(int64); ok {
-                                               c.PeerMaxRequests = int(i)
-                                       }
-                               }
-                               if v, ok := d["v"]; ok {
-                                       c.PeerClientName = v.(string)
-                               }
-                               m, ok := d["m"]
-                               if !ok {
-                                       err = errors.New("handshake missing m item")
-                                       break
-                               }
-                               mTyped, ok := m.(map[string]interface{})
-                               if !ok {
-                                       err = errors.New("handshake m value is not dict")
-                                       break
-                               }
-                               if c.PeerExtensionIDs == nil {
-                                       c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
-                               }
-                               for name, v := range mTyped {
-                                       id, ok := v.(int64)
-                                       if !ok {
-                                               log.Printf("bad handshake m item extension ID type: %T", v)
-                                               continue
-                                       }
-                                       if id == 0 {
-                                               delete(c.PeerExtensionIDs, name)
-                                       } else {
-                                               if c.PeerExtensionIDs[name] == 0 {
-                                                       supportedExtensionMessages.Add(name, 1)
-                                               }
-                                               c.PeerExtensionIDs[name] = byte(id)
-                                       }
-                               }
-                               metadata_sizeUntyped, ok := d["metadata_size"]
-                               if ok {
-                                       metadata_size, ok := metadata_sizeUntyped.(int64)
-                                       if !ok {
-                                               log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
-                                       } else {
-                                               err = t.setMetadataSize(metadata_size)
-                                               if err != nil {
-                                                       err = fmt.Errorf("error setting metadata size to %d", metadata_size)
-                                                       break
-                                               }
-                                       }
-                               }
-                               if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
-                                       c.requestPendingMetadata()
-                               }
-                       case metadataExtendedId:
-                               err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
-                               if err != nil {
-                                       err = fmt.Errorf("error handling metadata extension message: %s", err)
-                               }
-                       case pexExtendedId:
-                               if cl.config.DisablePEX {
-                                       break
-                               }
-                               var pexMsg peerExchangeMessage
-                               err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
-                               if err != nil {
-                                       err = fmt.Errorf("error unmarshalling PEX message: %s", err)
-                                       break
-                               }
-                               go func() {
-                                       cl.mu.Lock()
-                                       t.addPeers(func() (ret []Peer) {
-                                               for i, cp := range pexMsg.Added {
-                                                       p := Peer{
-                                                               IP:     make([]byte, 4),
-                                                               Port:   cp.Port,
-                                                               Source: peerSourcePEX,
-                                                       }
-                                                       if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
-                                                               p.SupportsEncryption = true
-                                                       }
-                                                       missinggo.CopyExact(p.IP, cp.IP[:])
-                                                       ret = append(ret, p)
-                                               }
-                                               return
-                                       }())
-                                       cl.mu.Unlock()
-                               }()
-                       default:
-                               err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
-                       }
-                       if err != nil {
-                               // That client uses its own extension IDs for outgoing message
-                               // types, which is incorrect.
-                               if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
-                                       strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
-                                       return nil
-                               }
-                       }
-               case pp.Port:
-                       if cl.dHT == nil {
-                               break
-                       }
-                       pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
-                       if err != nil {
-                               panic(err)
-                       }
-                       if msg.Port != 0 {
-                               pingAddr.Port = int(msg.Port)
-                       }
-                       cl.dHT.Ping(pingAddr)
-               default:
-                       err = fmt.Errorf("received unknown message type: %#v", msg.Type)
-               }
-               if err != nil {
-                       return err
-               }
-       }
-}
-
 func (cl *Client) openNewConns(t *Torrent) {
        defer t.updateWantPeersEvent()
        for len(t.peers) != 0 {
index 3e29e3197786cda84c7a8f06190e35de51672cce..d98b1c273001f0e32e697637485c94f493bcde10 100644 (file)
@@ -8,9 +8,11 @@ import (
        "expvar"
        "fmt"
        "io"
+       "log"
        "math/rand"
        "net"
        "strconv"
+       "strings"
        "sync"
        "time"
 
@@ -690,3 +692,213 @@ func (c *connection) lastHelpful() (ret time.Time) {
        }
        return
 }
+
+// Processes incoming bittorrent messages. The client lock is held upon entry
+// and exit. Returning will end the connection.
+func (c *connection) mainReadLoop() error {
+       t := c.t
+       cl := t.cl
+       decoder := pp.Decoder{
+               R:         bufio.NewReader(c.rw),
+               MaxLength: 256 * 1024,
+       }
+       for {
+               cl.mu.Unlock()
+               var msg pp.Message
+               err := decoder.Decode(&msg)
+               cl.mu.Lock()
+               if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
+                       return nil
+               }
+               if err != nil {
+                       return err
+               }
+               c.readMsg(&msg)
+               c.lastMessageReceived = time.Now()
+               if msg.Keepalive {
+                       receivedKeepalives.Add(1)
+                       continue
+               }
+               receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
+               switch msg.Type {
+               case pp.Choke:
+                       c.PeerChoked = true
+                       c.Requests = nil
+                       // We can then reset our interest.
+                       c.updateRequests()
+               case pp.Reject:
+                       cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
+                       c.updateRequests()
+               case pp.Unchoke:
+                       c.PeerChoked = false
+                       cl.peerUnchoked(t, c)
+               case pp.Interested:
+                       c.PeerInterested = true
+                       cl.upload(t, c)
+               case pp.NotInterested:
+                       c.PeerInterested = false
+                       c.Choke()
+               case pp.Have:
+                       err = c.peerSentHave(int(msg.Index))
+               case pp.Request:
+                       if c.Choked {
+                               break
+                       }
+                       if !c.PeerInterested {
+                               err = errors.New("peer sent request but isn't interested")
+                               break
+                       }
+                       if !t.havePiece(msg.Index.Int()) {
+                               // This isn't necessarily them screwing up. We can drop pieces
+                               // from our storage, and can't communicate this to peers
+                               // except by reconnecting.
+                               requestsReceivedForMissingPieces.Add(1)
+                               err = errors.New("peer requested piece we don't have")
+                               break
+                       }
+                       if c.PeerRequests == nil {
+                               c.PeerRequests = make(map[request]struct{}, maxRequests)
+                       }
+                       c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
+                       cl.upload(t, c)
+               case pp.Cancel:
+                       req := newRequest(msg.Index, msg.Begin, msg.Length)
+                       if !c.PeerCancel(req) {
+                               unexpectedCancels.Add(1)
+                       }
+               case pp.Bitfield:
+                       err = c.peerSentBitfield(msg.Bitfield)
+               case pp.HaveAll:
+                       err = c.peerSentHaveAll()
+               case pp.HaveNone:
+                       err = c.peerSentHaveNone()
+               case pp.Piece:
+                       cl.downloadedChunk(t, c, &msg)
+               case pp.Extended:
+                       switch msg.ExtendedID {
+                       case pp.HandshakeExtendedID:
+                               // TODO: Create a bencode struct for this.
+                               var d map[string]interface{}
+                               err = bencode.Unmarshal(msg.ExtendedPayload, &d)
+                               if err != nil {
+                                       err = fmt.Errorf("error decoding extended message payload: %s", err)
+                                       break
+                               }
+                               // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
+                               if reqq, ok := d["reqq"]; ok {
+                                       if i, ok := reqq.(int64); ok {
+                                               c.PeerMaxRequests = int(i)
+                                       }
+                               }
+                               if v, ok := d["v"]; ok {
+                                       c.PeerClientName = v.(string)
+                               }
+                               m, ok := d["m"]
+                               if !ok {
+                                       err = errors.New("handshake missing m item")
+                                       break
+                               }
+                               mTyped, ok := m.(map[string]interface{})
+                               if !ok {
+                                       err = errors.New("handshake m value is not dict")
+                                       break
+                               }
+                               if c.PeerExtensionIDs == nil {
+                                       c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
+                               }
+                               for name, v := range mTyped {
+                                       id, ok := v.(int64)
+                                       if !ok {
+                                               log.Printf("bad handshake m item extension ID type: %T", v)
+                                               continue
+                                       }
+                                       if id == 0 {
+                                               delete(c.PeerExtensionIDs, name)
+                                       } else {
+                                               if c.PeerExtensionIDs[name] == 0 {
+                                                       supportedExtensionMessages.Add(name, 1)
+                                               }
+                                               c.PeerExtensionIDs[name] = byte(id)
+                                       }
+                               }
+                               metadata_sizeUntyped, ok := d["metadata_size"]
+                               if ok {
+                                       metadata_size, ok := metadata_sizeUntyped.(int64)
+                                       if !ok {
+                                               log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
+                                       } else {
+                                               err = t.setMetadataSize(metadata_size)
+                                               if err != nil {
+                                                       err = fmt.Errorf("error setting metadata size to %d", metadata_size)
+                                                       break
+                                               }
+                                       }
+                               }
+                               if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
+                                       c.requestPendingMetadata()
+                               }
+                       case metadataExtendedId:
+                               err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
+                               if err != nil {
+                                       err = fmt.Errorf("error handling metadata extension message: %s", err)
+                               }
+                       case pexExtendedId:
+                               if cl.config.DisablePEX {
+                                       break
+                               }
+                               var pexMsg peerExchangeMessage
+                               err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
+                               if err != nil {
+                                       err = fmt.Errorf("error unmarshalling PEX message: %s", err)
+                                       break
+                               }
+                               go func() {
+                                       cl.mu.Lock()
+                                       t.addPeers(func() (ret []Peer) {
+                                               for i, cp := range pexMsg.Added {
+                                                       p := Peer{
+                                                               IP:     make([]byte, 4),
+                                                               Port:   cp.Port,
+                                                               Source: peerSourcePEX,
+                                                       }
+                                                       if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
+                                                               p.SupportsEncryption = true
+                                                       }
+                                                       missinggo.CopyExact(p.IP, cp.IP[:])
+                                                       ret = append(ret, p)
+                                               }
+                                               return
+                                       }())
+                                       cl.mu.Unlock()
+                               }()
+                       default:
+                               err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
+                       }
+                       if err != nil {
+                               // That client uses its own extension IDs for outgoing message
+                               // types, which is incorrect.
+                               if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
+                                       strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
+                                       return nil
+                               }
+                       }
+               case pp.Port:
+                       if cl.dHT == nil {
+                               break
+                       }
+                       pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
+                       if err != nil {
+                               panic(err)
+                       }
+                       if msg.Port != 0 {
+                               pingAddr.Port = int(msg.Port)
+                       }
+                       cl.dHT.Ping(pingAddr)
+               default:
+                       err = fmt.Errorf("received unknown message type: %#v", msg.Type)
+               }
+               if err != nil {
+                       return err
+               }
+       }
+}