defer t.dropConnection(c)
go c.writer(time.Minute)
cl.sendInitialMessages(c, t)
- err := cl.connectionLoop(t, c)
+ err := c.mainReadLoop()
if err != nil && cl.config.Debug {
log.Printf("error during connection loop: %s", err)
}
return nil
}
-// Processes incoming bittorrent messages. The client lock is held upon entry
-// and exit. Returning will end the connection.
-func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
- decoder := pp.Decoder{
- R: bufio.NewReader(c.rw),
- MaxLength: 256 * 1024,
- }
- for {
- cl.mu.Unlock()
- var msg pp.Message
- err := decoder.Decode(&msg)
- cl.mu.Lock()
- if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
- c.readMsg(&msg)
- c.lastMessageReceived = time.Now()
- if msg.Keepalive {
- receivedKeepalives.Add(1)
- continue
- }
- receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
- switch msg.Type {
- case pp.Choke:
- c.PeerChoked = true
- c.Requests = nil
- // We can then reset our interest.
- c.updateRequests()
- case pp.Reject:
- cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
- c.updateRequests()
- case pp.Unchoke:
- c.PeerChoked = false
- cl.peerUnchoked(t, c)
- case pp.Interested:
- c.PeerInterested = true
- cl.upload(t, c)
- case pp.NotInterested:
- c.PeerInterested = false
- c.Choke()
- case pp.Have:
- err = c.peerSentHave(int(msg.Index))
- case pp.Request:
- if c.Choked {
- break
- }
- if !c.PeerInterested {
- err = errors.New("peer sent request but isn't interested")
- break
- }
- if !t.havePiece(msg.Index.Int()) {
- // This isn't necessarily them screwing up. We can drop pieces
- // from our storage, and can't communicate this to peers
- // except by reconnecting.
- requestsReceivedForMissingPieces.Add(1)
- err = errors.New("peer requested piece we don't have")
- break
- }
- if c.PeerRequests == nil {
- c.PeerRequests = make(map[request]struct{}, maxRequests)
- }
- c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
- cl.upload(t, c)
- case pp.Cancel:
- req := newRequest(msg.Index, msg.Begin, msg.Length)
- if !c.PeerCancel(req) {
- unexpectedCancels.Add(1)
- }
- case pp.Bitfield:
- err = c.peerSentBitfield(msg.Bitfield)
- case pp.HaveAll:
- err = c.peerSentHaveAll()
- case pp.HaveNone:
- err = c.peerSentHaveNone()
- case pp.Piece:
- cl.downloadedChunk(t, c, &msg)
- case pp.Extended:
- switch msg.ExtendedID {
- case pp.HandshakeExtendedID:
- // TODO: Create a bencode struct for this.
- var d map[string]interface{}
- err = bencode.Unmarshal(msg.ExtendedPayload, &d)
- if err != nil {
- err = fmt.Errorf("error decoding extended message payload: %s", err)
- break
- }
- // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
- if reqq, ok := d["reqq"]; ok {
- if i, ok := reqq.(int64); ok {
- c.PeerMaxRequests = int(i)
- }
- }
- if v, ok := d["v"]; ok {
- c.PeerClientName = v.(string)
- }
- m, ok := d["m"]
- if !ok {
- err = errors.New("handshake missing m item")
- break
- }
- mTyped, ok := m.(map[string]interface{})
- if !ok {
- err = errors.New("handshake m value is not dict")
- break
- }
- if c.PeerExtensionIDs == nil {
- c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
- }
- for name, v := range mTyped {
- id, ok := v.(int64)
- if !ok {
- log.Printf("bad handshake m item extension ID type: %T", v)
- continue
- }
- if id == 0 {
- delete(c.PeerExtensionIDs, name)
- } else {
- if c.PeerExtensionIDs[name] == 0 {
- supportedExtensionMessages.Add(name, 1)
- }
- c.PeerExtensionIDs[name] = byte(id)
- }
- }
- metadata_sizeUntyped, ok := d["metadata_size"]
- if ok {
- metadata_size, ok := metadata_sizeUntyped.(int64)
- if !ok {
- log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
- } else {
- err = t.setMetadataSize(metadata_size)
- if err != nil {
- err = fmt.Errorf("error setting metadata size to %d", metadata_size)
- break
- }
- }
- }
- if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
- c.requestPendingMetadata()
- }
- case metadataExtendedId:
- err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
- if err != nil {
- err = fmt.Errorf("error handling metadata extension message: %s", err)
- }
- case pexExtendedId:
- if cl.config.DisablePEX {
- break
- }
- var pexMsg peerExchangeMessage
- err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
- if err != nil {
- err = fmt.Errorf("error unmarshalling PEX message: %s", err)
- break
- }
- go func() {
- cl.mu.Lock()
- t.addPeers(func() (ret []Peer) {
- for i, cp := range pexMsg.Added {
- p := Peer{
- IP: make([]byte, 4),
- Port: cp.Port,
- Source: peerSourcePEX,
- }
- if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
- p.SupportsEncryption = true
- }
- missinggo.CopyExact(p.IP, cp.IP[:])
- ret = append(ret, p)
- }
- return
- }())
- cl.mu.Unlock()
- }()
- default:
- err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
- }
- if err != nil {
- // That client uses its own extension IDs for outgoing message
- // types, which is incorrect.
- if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
- strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
- return nil
- }
- }
- case pp.Port:
- if cl.dHT == nil {
- break
- }
- pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
- if err != nil {
- panic(err)
- }
- if msg.Port != 0 {
- pingAddr.Port = int(msg.Port)
- }
- cl.dHT.Ping(pingAddr)
- default:
- err = fmt.Errorf("received unknown message type: %#v", msg.Type)
- }
- if err != nil {
- return err
- }
- }
-}
-
func (cl *Client) openNewConns(t *Torrent) {
defer t.updateWantPeersEvent()
for len(t.peers) != 0 {
"expvar"
"fmt"
"io"
+ "log"
"math/rand"
"net"
"strconv"
+ "strings"
"sync"
"time"
}
return
}
+
+// Processes incoming bittorrent messages. The client lock is held upon entry
+// and exit. Returning will end the connection.
+func (c *connection) mainReadLoop() error {
+ t := c.t
+ cl := t.cl
+ decoder := pp.Decoder{
+ R: bufio.NewReader(c.rw),
+ MaxLength: 256 * 1024,
+ }
+ for {
+ cl.mu.Unlock()
+ var msg pp.Message
+ err := decoder.Decode(&msg)
+ cl.mu.Lock()
+ if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+ c.readMsg(&msg)
+ c.lastMessageReceived = time.Now()
+ if msg.Keepalive {
+ receivedKeepalives.Add(1)
+ continue
+ }
+ receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
+ switch msg.Type {
+ case pp.Choke:
+ c.PeerChoked = true
+ c.Requests = nil
+ // We can then reset our interest.
+ c.updateRequests()
+ case pp.Reject:
+ cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
+ c.updateRequests()
+ case pp.Unchoke:
+ c.PeerChoked = false
+ cl.peerUnchoked(t, c)
+ case pp.Interested:
+ c.PeerInterested = true
+ cl.upload(t, c)
+ case pp.NotInterested:
+ c.PeerInterested = false
+ c.Choke()
+ case pp.Have:
+ err = c.peerSentHave(int(msg.Index))
+ case pp.Request:
+ if c.Choked {
+ break
+ }
+ if !c.PeerInterested {
+ err = errors.New("peer sent request but isn't interested")
+ break
+ }
+ if !t.havePiece(msg.Index.Int()) {
+ // This isn't necessarily them screwing up. We can drop pieces
+ // from our storage, and can't communicate this to peers
+ // except by reconnecting.
+ requestsReceivedForMissingPieces.Add(1)
+ err = errors.New("peer requested piece we don't have")
+ break
+ }
+ if c.PeerRequests == nil {
+ c.PeerRequests = make(map[request]struct{}, maxRequests)
+ }
+ c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
+ cl.upload(t, c)
+ case pp.Cancel:
+ req := newRequest(msg.Index, msg.Begin, msg.Length)
+ if !c.PeerCancel(req) {
+ unexpectedCancels.Add(1)
+ }
+ case pp.Bitfield:
+ err = c.peerSentBitfield(msg.Bitfield)
+ case pp.HaveAll:
+ err = c.peerSentHaveAll()
+ case pp.HaveNone:
+ err = c.peerSentHaveNone()
+ case pp.Piece:
+ cl.downloadedChunk(t, c, &msg)
+ case pp.Extended:
+ switch msg.ExtendedID {
+ case pp.HandshakeExtendedID:
+ // TODO: Create a bencode struct for this.
+ var d map[string]interface{}
+ err = bencode.Unmarshal(msg.ExtendedPayload, &d)
+ if err != nil {
+ err = fmt.Errorf("error decoding extended message payload: %s", err)
+ break
+ }
+ // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
+ if reqq, ok := d["reqq"]; ok {
+ if i, ok := reqq.(int64); ok {
+ c.PeerMaxRequests = int(i)
+ }
+ }
+ if v, ok := d["v"]; ok {
+ c.PeerClientName = v.(string)
+ }
+ m, ok := d["m"]
+ if !ok {
+ err = errors.New("handshake missing m item")
+ break
+ }
+ mTyped, ok := m.(map[string]interface{})
+ if !ok {
+ err = errors.New("handshake m value is not dict")
+ break
+ }
+ if c.PeerExtensionIDs == nil {
+ c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
+ }
+ for name, v := range mTyped {
+ id, ok := v.(int64)
+ if !ok {
+ log.Printf("bad handshake m item extension ID type: %T", v)
+ continue
+ }
+ if id == 0 {
+ delete(c.PeerExtensionIDs, name)
+ } else {
+ if c.PeerExtensionIDs[name] == 0 {
+ supportedExtensionMessages.Add(name, 1)
+ }
+ c.PeerExtensionIDs[name] = byte(id)
+ }
+ }
+ metadata_sizeUntyped, ok := d["metadata_size"]
+ if ok {
+ metadata_size, ok := metadata_sizeUntyped.(int64)
+ if !ok {
+ log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
+ } else {
+ err = t.setMetadataSize(metadata_size)
+ if err != nil {
+ err = fmt.Errorf("error setting metadata size to %d", metadata_size)
+ break
+ }
+ }
+ }
+ if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
+ c.requestPendingMetadata()
+ }
+ case metadataExtendedId:
+ err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
+ if err != nil {
+ err = fmt.Errorf("error handling metadata extension message: %s", err)
+ }
+ case pexExtendedId:
+ if cl.config.DisablePEX {
+ break
+ }
+ var pexMsg peerExchangeMessage
+ err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
+ if err != nil {
+ err = fmt.Errorf("error unmarshalling PEX message: %s", err)
+ break
+ }
+ go func() {
+ cl.mu.Lock()
+ t.addPeers(func() (ret []Peer) {
+ for i, cp := range pexMsg.Added {
+ p := Peer{
+ IP: make([]byte, 4),
+ Port: cp.Port,
+ Source: peerSourcePEX,
+ }
+ if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
+ p.SupportsEncryption = true
+ }
+ missinggo.CopyExact(p.IP, cp.IP[:])
+ ret = append(ret, p)
+ }
+ return
+ }())
+ cl.mu.Unlock()
+ }()
+ default:
+ err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
+ }
+ if err != nil {
+ // That client uses its own extension IDs for outgoing message
+ // types, which is incorrect.
+ if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
+ strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
+ return nil
+ }
+ }
+ case pp.Port:
+ if cl.dHT == nil {
+ break
+ }
+ pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
+ if err != nil {
+ panic(err)
+ }
+ if msg.Port != 0 {
+ pingAddr.Port = int(msg.Port)
+ }
+ cl.dHT.Ping(pingAddr)
+ default:
+ err = fmt.Errorf("received unknown message type: %#v", msg.Type)
+ }
+ if err != nil {
+ return err
+ }
+ }
+}