}
}
-func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
+func (cl *Client) sendChunk(t *Torrent, c *connection, r request, msg func(pp.Message) bool) (more bool, err error) {
// Count the chunk being sent, even if it isn't.
b := make([]byte, r.Length)
p := t.info.Piece(int(r.Index))
if err == nil {
panic("expected error")
}
- return err
+ return
}
- c.Post(pp.Message{
+ more = msg(pp.Message{
Type: pp.Piece,
Index: r.Index,
Begin: r.Begin,
c.chunksSent++
uploadChunksPosted.Add(1)
c.lastChunkSent = time.Now()
- return nil
+ return
}
func (cl *Client) openNewConns(t *Torrent) {
pieceRequestOrder prioritybitmap.PriorityBitmap
postedBuffer bytes.Buffer
+ uploadTimer *time.Timer
writerCond sync.Cond
}
return true
}
-func (cn *connection) Choke() {
+func (cn *connection) Choke(msg func(pp.Message) bool) bool {
if cn.Choked {
- return
+ return true
}
- cn.Post(pp.Message{
- Type: pp.Choke,
- })
cn.PeerRequests = nil
cn.Choked = true
+ return msg(pp.Message{
+ Type: pp.Choke,
+ })
}
-func (cn *connection) Unchoke() {
+func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
if !cn.Choked {
- return
+ return true
}
- cn.Post(pp.Message{
+ cn.Choked = false
+ return msg(pp.Message{
Type: pp.Unchoke,
})
- cn.Choked = false
}
func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
// have more write buffer space.
cn.requestsLowWater = len(cn.requests) / 2
}
+ cn.upload(msg)
}
// Writes buffers to the socket from the write channel.
c.tickleWriter()
case pp.Interested:
c.PeerInterested = true
- c.upload()
+ c.tickleWriter()
case pp.NotInterested:
c.PeerInterested = false
- c.Choke()
+ c.PeerRequests = nil
case pp.Have:
err = c.peerSentHave(int(msg.Index))
case pp.Request:
if c.Choked {
break
}
- if !c.PeerInterested {
- err = errors.New("peer sent request but isn't interested")
- break
- }
- if !t.havePiece(msg.Index.Int()) {
- // This isn't necessarily them screwing up. We can drop pieces
- // from our storage, and can't communicate this to peers
- // except by reconnecting.
- requestsReceivedForMissingPieces.Add(1)
- err = errors.New("peer requested piece we don't have")
+ if len(c.PeerRequests) >= maxRequests {
break
}
if c.PeerRequests == nil {
c.PeerRequests = make(map[request]struct{}, maxRequests)
}
c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
- c.upload()
+ c.tickleWriter()
case pp.Cancel:
req := newRequest(msg.Index, msg.Begin, msg.Length)
if !c.PeerCancel(req) {
c.UsefulChunksReceived++
c.lastUsefulChunkReceived = time.Now()
- c.upload()
-
// Need to record that it hasn't been written yet, before we attempt to do
// anything with it.
piece.incrementPendingWrites()
}
// Also handles choking and unchoking of the remote peer.
-func (c *connection) upload() {
+func (c *connection) upload(msg func(pp.Message) bool) bool {
t := c.t
cl := t.cl
if cl.config.NoUpload {
- return
+ return true
}
if !c.PeerInterested {
- return
+ return true
}
seeding := t.seeding()
if !seeding && !c.peerHasWantedPieces() {
// There's no reason to upload to this peer.
- return
+ return true
}
// Breaking or completing this loop means we don't want to upload to the
// peer anymore, and we choke them.
another:
for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
// We want to upload to the peer.
- c.Unchoke()
+ if !c.Unchoke(msg) {
+ return false
+ }
for r := range c.PeerRequests {
res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
+ if !res.OK() {
+ panic(res)
+ }
delay := res.Delay()
if delay > 0 {
res.Cancel()
- go func() {
- time.Sleep(delay)
- cl.mu.Lock()
- defer cl.mu.Unlock()
- c.upload()
- }()
- return
+ if c.uploadTimer == nil {
+ c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
+ } else {
+ c.uploadTimer.Reset(delay)
+ }
+ // Hard to say what to return here.
+ return true
}
- err := cl.sendChunk(t, c, r)
+ more, err := cl.sendChunk(t, c, r, msg)
if err != nil {
i := int(r.Index)
if t.pieceComplete(i) {
break another
}
delete(c.PeerRequests, r)
+ if !more {
+ return false
+ }
goto another
}
- return
+ return true
}
- c.Choke()
+ return c.Choke(msg)
}
func (cn *connection) Drop() {