return
}
+func (c *connection) onReadRequest(r request) error {
+ requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
+ if c.Choked {
+ return nil
+ }
+ if len(c.PeerRequests) >= maxRequests {
+ // TODO: Should we drop them or Choke them instead?
+ return nil
+ }
+ if !c.t.havePiece(r.Index.Int()) {
+ // This isn't necessarily them screwing up. We can drop pieces
+ // from our storage, and can't communicate this to peers
+ // except by reconnecting.
+ requestsReceivedForMissingPieces.Add(1)
+ return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
+ }
+ if c.PeerRequests == nil {
+ c.PeerRequests = make(map[request]struct{}, maxRequests)
+ }
+ c.PeerRequests[r] = struct{}{}
+ c.tickleWriter()
+ return nil
+}
+
// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() error {
// We can then reset our interest.
c.updateRequests()
case pp.Reject:
- if c.deleteRequest(newRequest(msg.Index, msg.Begin, msg.Length)) {
+ if c.deleteRequest(newRequestFromMessage(&msg)) {
c.updateRequests()
}
case pp.Unchoke:
case pp.Have:
err = c.peerSentHave(int(msg.Index))
case pp.Request:
- requestedChunkLengths.Add(strconv.FormatUint(msg.Length.Uint64(), 10), 1)
- if c.Choked {
- break
- }
- if len(c.PeerRequests) >= maxRequests {
- // TODO: Should we drop them or Choke them instead?
- break
- }
- if !t.havePiece(msg.Index.Int()) {
- // This isn't necessarily them screwing up. We can drop pieces
- // from our storage, and can't communicate this to peers
- // except by reconnecting.
- requestsReceivedForMissingPieces.Add(1)
- err = fmt.Errorf("peer requested piece we don't have: %v", msg.Index.Int())
- break
- }
- if c.PeerRequests == nil {
- c.PeerRequests = make(map[request]struct{}, maxRequests)
- }
- c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
- c.tickleWriter()
+ r := newRequestFromMessage(&msg)
+ err = c.onReadRequest(r)
case pp.Cancel:
- req := newRequest(msg.Index, msg.Begin, msg.Length)
+ req := newRequestFromMessage(&msg)
if !c.PeerCancel(req) {
unexpectedCancels.Add(1)
}
cl := t.cl
chunksReceived.Add(1)
- req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
+ req := newRequestFromMessage(msg)
// Request has been satisfied.
if c.deleteRequest(req) {
t: &Torrent{},
}
require.False(t, cn.t.haveInfo())
- assert.NotPanics(t, func() { cn.receiveChunk(&pp.Message{}) })
+ assert.NotPanics(t, func() { cn.receiveChunk(&pp.Message{Type: pp.Piece}) })
cn.t.info = &metainfo.Info{}
require.True(t, cn.t.haveInfo())
- assert.NotPanics(t, func() { cn.receiveChunk(&pp.Message{}) })
+ assert.NotPanics(t, func() { cn.receiveChunk(&pp.Message{Type: pp.Piece}) })
}
return request{index, chunkSpec{begin, length}}
}
+func newRequestFromMessage(msg *pp.Message) request {
+ switch msg.Type {
+ case pp.Request:
+ return newRequest(msg.Index, msg.Begin, msg.Length)
+ case pp.Piece:
+ return newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
+ default:
+ panic(msg.Type)
+ }
+}
+
// The size in bytes of a metadata extension piece.
func metadataPieceSize(totalSize int, piece int) int {
ret := totalSize - piece*(1<<14)