PeerSourceDirect = "M"
)
+type peerRequestState struct {
+ data []byte
+}
+
type peer struct {
// First to ensure 64-bit alignment for atomics. See #262.
_stats ConnStats
// Stuff controlled by the remote peer.
peerInterested bool
peerChoking bool
- peerRequests map[request]struct{}
+ peerRequests map[request]*peerRequestState
PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake
PeerListenPort int
// The pieces the peer has claimed to have.
return cn.peerSentHaveAll || cn._peerPieces.Contains(bitmap.BitIndex(piece))
}
-// Writes a message into the write buffer.
-func (cn *PeerConn) post(msg pp.Message) {
+// 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update when
+// https://github.com/pion/datachannel/issues/59 is fixed.
+const writeBufferHighWaterLen = 1 << 15
+
+// Writes a message into the write buffer. Returns whether it's okay to keep writing. Posting is
+// done asynchronously, so it may be that we're not able to honour backpressure from this method. It
+// might be possible to merge this with PeerConn.write down the track? They seem to be very similar.
+func (cn *PeerConn) post(msg pp.Message) bool {
torrent.Add(fmt.Sprintf("messages posted of type %s", msg.Type.String()), 1)
- // We don't need to track bytes here because a connection.w Writer wrapper
- // takes care of that (although there's some delay between us recording
- // the message, and the connection writer flushing it out.).
+ // We don't need to track bytes here because a connection.w Writer wrapper takes care of that
+ // (although there's some delay between us recording the message, and the connection writer
+ // flushing it out.).
cn.writeBuffer.Write(msg.MustMarshalBinary())
- // Last I checked only Piece messages affect stats, and we don't post
- // those.
+ // Last I checked only Piece messages affect stats, and we don't post those.
cn.wroteMsg(&msg)
cn.tickleWriter()
+ return cn.writeBuffer.Len() < writeBufferHighWaterLen
}
// Returns true if there's room to write more.
cn.wroteMsg(&msg)
cn.writeBuffer.Write(msg.MustMarshalBinary())
torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
- // 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update
- // when https://github.com/pion/datachannel/issues/59 is fixed.
- return cn.writeBuffer.Len() < 1<<15
+ return cn.writeBuffer.Len() < writeBufferHighWaterLen
}
func (cn *PeerConn) requestMetadataPiece(index int) {
return errors.New("bad request")
}
if c.peerRequests == nil {
- c.peerRequests = make(map[request]struct{}, maxRequests)
+ c.peerRequests = make(map[request]*peerRequestState, maxRequests)
}
- c.peerRequests[r] = struct{}{}
- c.tickleWriter()
+ value := &peerRequestState{}
+ c.peerRequests[r] = value
+ go c.peerRequestDataReader(r, value)
+ //c.tickleWriter()
return nil
}
+func (c *PeerConn) peerRequestDataReader(r request, prs *peerRequestState) {
+ b, err := readPeerRequestData(r, c)
+ c.locker().Lock()
+ defer c.locker().Unlock()
+ if err != nil {
+ c.peerRequestDataReadFailed(err, r)
+ } else {
+ if b == nil {
+ panic("data must be non-nil to trigger send")
+ }
+ prs.data = b
+ c.tickleWriter()
+ }
+}
+
+// If this is maintained correctly, we might be able to support optional synchronous reading for
+// chunk sending, the way it used to work.
+func (c *PeerConn) peerRequestDataReadFailed(err error, r request) {
+ c.logger.WithDefaultLevel(log.Warning).Printf("error reading chunk for peer request %v: %v", r, err)
+ i := pieceIndex(r.Index)
+ if c.t.pieceComplete(i) {
+ // There used to be more code here that just duplicated the following break. Piece
+ // completions are currently cached, so I'm not sure how helpful this update is, except to
+ // pull any completion changes pushed to the storage backend in failed reads that got us
+ // here.
+ c.t.updatePieceCompletion(i)
+ }
+ // If we failed to send a chunk, choke the peer to ensure they flush all their requests. We've
+ // probably dropped a piece from storage, but there's no way to communicate this to the peer. If
+ // they ask for it again, we'll kick them to allow us to send them an updated bitfield on the
+ // next connect. TODO: Support rejecting here too.
+ if c.choking {
+ c.logger.WithDefaultLevel(log.Warning).Printf("already choking peer, requests might not be rejected correctly")
+ }
+ c.choke(c.post)
+}
+
+func readPeerRequestData(r request, c *PeerConn) ([]byte, error) {
+ b := make([]byte, r.Length)
+ p := c.t.info.Piece(int(r.Index))
+ n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
+ if n == len(b) {
+ if err == io.EOF {
+ err = nil
+ }
+ } else {
+ if err == nil {
+ panic("expected error")
+ }
+ }
+ return b, err
+}
+
func runSafeExtraneous(f func()) {
if true {
go f()
if !c.unchoke(msg) {
return false
}
- for r := range c.peerRequests {
+ for r, state := range c.peerRequests {
+ if state.data == nil {
+ continue
+ }
res := c.t.cl.config.UploadRateLimiter.ReserveN(time.Now(), int(r.Length))
if !res.OK() {
panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
// Hard to say what to return here.
return true
}
- more, err := c.sendChunk(r, msg)
- if err != nil {
- c.logger.WithDefaultLevel(log.Warning).Printf("sending chunk to peer: %v", err)
- i := pieceIndex(r.Index)
- if c.t.pieceComplete(i) {
- // There used to be more code here that just duplicated the following break.
- // Piece completions are currently cached, so I'm not sure how helpful this
- // update is, except to pull any completion changes pushed to the storage
- // backend in failed reads that got us here.
- c.t.updatePieceCompletion(i)
- }
- // If we failed to send a chunk, choke the peer by breaking out of the loop here to
- // ensure they flush all their requests. We've probably dropped a piece from
- // storage, but there's no way to communicate this to the peer. If they ask for it
- // again, we'll kick them to allow us to send them an updated bitfield on the next
- // connect.
- if c.choking {
- c.logger.WithDefaultLevel(log.Warning).Printf("already choking peer, requests might not be rejected correctly")
- }
- break another
- }
+ more := c.sendChunk(r, msg, state)
delete(c.peerRequests, r)
if !more {
return false
// }
}
+// This is called when something has changed that should wake the writer, such as putting stuff into
+// the writeBuffer, or changing some state that the writer can act on.
func (c *PeerConn) tickleWriter() {
c.writerCond.Broadcast()
}
c.post(makeCancelMessage(r))
}
-func (c *PeerConn) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) {
- b := make([]byte, r.Length)
- p := c.t.info.Piece(int(r.Index))
- n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
- if n != len(b) {
- if err == nil {
- panic("expected error")
- }
- return
- } else if err == io.EOF {
- err = nil
- }
- more = msg(pp.Message{
+func (c *PeerConn) sendChunk(r request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
+ c.lastChunkSent = time.Now()
+ return msg(pp.Message{
Type: pp.Piece,
Index: r.Index,
Begin: r.Begin,
- Piece: b,
+ Piece: state.data,
})
- c.lastChunkSent = time.Now()
- return
}
func (c *PeerConn) setTorrent(t *Torrent) {