// are okay.
type messageWriter func(pp.Message) bool
-func (cn *Peer) request(r Request) (more bool, err error) {
+func (cn *Peer) request(r Request) error {
if _, ok := cn.requests[r]; ok {
- return true, nil
+ return nil
}
if cn.numLocalRequests() >= cn.nominalMaxRequests() {
- return true, errors.New("too many outstanding requests")
+ return errors.New("too many outstanding requests")
}
if !cn.peerHasPiece(pieceIndex(r.Index)) {
- return true, errors.New("requesting piece peer doesn't have")
+ return errors.New("requesting piece peer doesn't have")
}
if !cn.t.peerIsActive(cn) {
panic("requesting but not in active conns")
if cn.closed.IsSet() {
panic("requesting when connection is closed")
}
- if cn.peerChoking {
- if cn.peerAllowedFast.Get(int(r.Index)) {
- torrent.Add("allowed fast requests sent", 1)
- } else {
- return cn.setInterested(true), errors.New("requesting while choked and not allowed fast")
- }
- }
if cn.t.hashingPiece(pieceIndex(r.Index)) {
panic("piece is being hashed")
}
if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) {
panic("piece is queued for hash")
}
+ if !cn.setInterested(true) {
+ return errors.New("write buffer full after expressing interest")
+ }
+ if cn.peerChoking {
+ if cn.peerAllowedFast.Get(int(r.Index)) {
+ torrent.Add("allowed fast requests sent", 1)
+ } else {
+ errors.New("peer choking and piece not in allowed fast set")
+ }
+ }
if cn.requests == nil {
cn.requests = make(map[Request]struct{})
}
for _, f := range cn.callbacks.SentRequest {
f(PeerRequestEvent{cn, r})
}
- return cn.peerImpl.request(r), nil
+ cn.peerImpl._request(r)
+ return nil
}
-func (me *PeerConn) request(r Request) bool {
- return me.write(pp.Message{
+func (me *PeerConn) _request(r Request) {
+ me.write(pp.Message{
Type: pp.Request,
Index: r.Index,
Begin: r.Begin,
})
}
-func (me *PeerConn) cancel(r Request) bool {
- return me.write(makeCancelMessage(r))
+func (me *Peer) cancel(r Request) {
+ if me.deleteRequest(r) {
+ me.peerImpl._cancel(r)
+ }
+}
+
+func (me *PeerConn) _cancel(r Request) {
+ me.write(makeCancelMessage(r))
}
func (cn *PeerConn) fillWriteBuffer() {
if p == c {
return
}
- p.postCancel(req)
+ p.cancel(req)
})
err := func() error {
if n < 0 {
panic(n)
}
- // If a request fails, updating the requests for the current peer first may miss the opportunity
- // to try other peers for that request instead, depending on the request strategy. This might
- // only affect webseed peers though, since they synchronously issue new requests: PeerConns do
- // it in the writer routine.
- const updateCurrentConnRequestsFirst = false
- if updateCurrentConnRequestsFirst {
- c.updateRequests()
- }
- // Give other conns a chance to pick up the request.
- c.t.iterPeers(func(_c *Peer) {
- // We previously checked that the peer wasn't interested to to only wake connections that
- // were unable to issue requests due to starvation by the request strategy. There could be
- // performance ramifications.
- if _c != c && c.peerHasPiece(pieceIndex(r.Index)) {
- _c.updateRequests()
- }
- })
- if !updateCurrentConnRequestsFirst {
- c.updateRequests()
- }
return true
}
c.writerCond.Broadcast()
}
-func (c *Peer) postCancel(r Request) bool {
- if !c.deleteRequest(r) {
- return false
- }
- c.peerImpl._postCancel(r)
- return true
-}
-
-func (c *PeerConn) _postCancel(r Request) {
- c.post(makeCancelMessage(r))
-}
-
func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
c.lastChunkSent = time.Now()
return msg(pp.Message{