}
c.conn.SetWriteDeadline(time.Time{})
c.r = deadlineReader{c.conn, c.r}
- completedHandshakeConnectionFlags.Add(c.ConnectionFlags(), 1)
+ completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
if connIsIpv6(c.conn) {
torrent.Add("completed handshake over ipv6", 1)
}
conn: nc,
writeBuffer: new(bytes.Buffer),
}
- c.PeerImpl = c
+ c.peerImpl = c
c.logger = cl.logger.WithValues(c).WithDefaultLevel(log.Debug).WithText(func(m log.Msg) string {
return fmt.Sprintf("%v: %s", c, m.Text())
})
--- /dev/null
+package torrent
+
+import (
+ "github.com/anacrolix/torrent/metainfo"
+)
+
+// Contains implementation details that differ between peer types, like Webseeds and regular
+// BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with
+// legacy PeerConn methods.
+type peerImpl interface {
+ updateRequests()
+ writeInterested(interested bool) bool
+ cancel(request) bool
+ // Return true if there's room for more activity.
+ request(request) bool
+ connectionFlags() string
+ _close()
+ _postCancel(request)
+ onGotInfo(*metainfo.Info)
+ drop()
+}
Trusted bool
}
-func (me PeerInfo) Equal(other PeerInfo) bool {
+func (me PeerInfo) equal(other PeerInfo) bool {
return me.Id == other.Id &&
me.Addr.String() == other.Addr.String() &&
me.Source == other.Source &&
PeerSourcePex = "X"
)
-type PeerImpl interface {
- UpdateRequests()
- WriteInterested(interested bool) bool
- Cancel(request) bool
- // Return true if there's room for more activity.
- Request(request) bool
- ConnectionFlags() string
- Close()
- PostCancel(request)
- onGotInfo(*metainfo.Info)
- Drop()
-}
-
type peer struct {
// First to ensure 64-bit alignment for atomics. See #262.
_stats ConnStats
t *Torrent
- PeerImpl
+ peerImpl
connString string
outgoing bool
return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}
-func (cn *PeerConn) ConnectionFlags() (ret string) {
+func (cn *PeerConn) connectionFlags() (ret string) {
c := func(b byte) {
ret += string([]byte{b})
}
c('c')
}
c('-')
- ret += cn.ConnectionFlags()
+ ret += cn.connectionFlags()
c('-')
if cn.peerInterested {
c('i')
}
cn.discardPieceInclination()
cn._pieceRequestOrder.Clear()
- cn.PeerImpl.Close()
+ cn.peerImpl._close()
}
-func (cn *PeerConn) Close() {
+func (cn *PeerConn) _close() {
if cn.pex.IsEnabled() {
cn.pex.Close()
}
}
cn.updateExpectingChunks()
// log.Printf("%p: setting interest: %v", cn, interested)
- return cn.WriteInterested(interested)
+ return cn.writeInterested(interested)
}
-func (pc *PeerConn) WriteInterested(interested bool) bool {
+func (pc *PeerConn) writeInterested(interested bool) bool {
return pc.write(pp.Message{
Type: func() pp.MessageType {
if interested {
cn.t.pendingRequests[r]++
cn.t.requestStrategy.hooks().sentRequest(r)
cn.updateExpectingChunks()
- return cn.PeerImpl.Request(r)
+ return cn.peerImpl.request(r)
}
-func (me *PeerConn) Request(r request) bool {
+func (me *PeerConn) request(r request) bool {
return me.write(pp.Message{
Type: pp.Request,
Index: r.Index,
})
}
-func (me *PeerConn) Cancel(r request) bool {
+func (me *PeerConn) cancel(r request) bool {
return me.write(makeCancelMessage(r))
}
for r := range cn.requests {
cn.deleteRequest(r)
// log.Printf("%p: cancelling request: %v", cn, r)
- if !cn.PeerImpl.Cancel(r) {
+ if !cn.peerImpl.cancel(r) {
return false
}
}
cn.sentHaves = cn.t._completedPieces.Copy()
}
-func (cn *PeerConn) UpdateRequests() {
+func (cn *PeerConn) updateRequests() {
// log.Print("update requests")
cn.tickleWriter()
}
}
}
if prioritiesChanged {
- cn.UpdateRequests()
+ cn.updateRequests()
}
}
}
cn.raisePeerMinPieces(piece + 1)
cn._peerPieces.Set(bitmap.BitIndex(piece), true)
if cn.updatePiecePriority(piece) {
- cn.UpdateRequests()
+ cn.updateRequests()
}
return nil
}
c.deleteAllRequests()
}
// We can then reset our interest.
- c.UpdateRequests()
+ c.updateRequests()
c.updateExpectingChunks()
case pp.Unchoke:
c.peerChoking = false
case pp.Suggest:
torrent.Add("suggests received", 1)
log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index).SetLevel(log.Debug).Log(c.t.logger)
- c.UpdateRequests()
+ c.updateRequests()
case pp.HaveAll:
err = c.onPeerSentHaveAll()
case pp.HaveNone:
torrent.Add("allowed fasts received", 1)
log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
c.peerAllowedFast.Add(int(msg.Index))
- c.UpdateRequests()
+ c.updateRequests()
case pp.Extended:
err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
default:
// Cancel pending requests for this chunk.
for c := range t.conns {
- c.PostCancel(req)
+ c._postCancel(req)
}
err := func() error {
return c.choke(msg)
}
-func (cn *PeerConn) Drop() {
+func (cn *PeerConn) drop() {
cn.t.dropConnection(cn)
}
if n < 0 {
panic(n)
}
- c.UpdateRequests()
+ c.updateRequests()
c.t.iterPeers(func(_c *peer) {
if !_c.interested && _c != c && c.peerHasPiece(pieceIndex(r.Index)) {
- _c.UpdateRequests()
+ _c.updateRequests()
}
})
return true
if !c.deleteRequest(r) {
return false
}
- c.PeerImpl.PostCancel(r)
+ c.peerImpl._postCancel(r)
return true
}
-func (c *PeerConn) PostCancel(r request) {
+func (c *PeerConn) _postCancel(r request) {
c.post(makeCancelMessage(r))
}
}
if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
torrent.Add("peers replaced", 1)
- if !replaced.Equal(p) {
+ if !replaced.equal(p) {
t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
added = true
}
t.iterPeers(func(c *peer) {
if c.updatePiecePriority(piece) {
// log.Print("conn piece priority changed")
- c.UpdateRequests()
+ c.updateRequests()
}
})
t.maybeNewConns()
if len(bannableTouchers) >= 1 {
c := bannableTouchers[0]
t.cl.banPeerIP(c.remoteIp())
- c.Drop()
+ c.drop()
}
}
t.onIncompletePiece(piece)
// }
t.iterPeers(func(conn *peer) {
if conn.peerHasPiece(piece) {
- conn.UpdateRequests()
+ conn.updateRequests()
}
})
}
defer cb.t.cl.unlock()
cb.t.iterPeers(func(cn *peer) {
if cn.peerHasPiece(pieceIndex(r.Index)) {
- cn.UpdateRequests()
+ cn.updateRequests()
}
})
log.Printf("disallowing data download")
t.dataDownloadDisallowed = true
t.iterPeers(func(c *peer) {
- c.UpdateRequests()
+ c.updateRequests()
})
}
log.Printf("AllowDataDownload")
t.dataDownloadDisallowed = false
t.iterPeers(func(c *peer) {
- c.UpdateRequests()
+ c.updateRequests()
})
}
},
requests: make(map[request]webseed.Request, maxRequests),
}
- ws.peer.PeerImpl = &ws
+ ws.peer.peerImpl = &ws
if t.haveInfo() {
ws.onGotInfo(t.info)
}
return request{i, chunkSpec{b, l}}
}
-// Check the given Request is correct for various torrent offsets.
+// Check the given request is correct for various torrent offsets.
func TestTorrentRequest(t *testing.T) {
const s = 472183431 // Length of torrent.
for _, _case := range []struct {
off int64 // An offset into the torrent.
- req request // The expected Request. The zero value means !ok.
+ req request // The expected request. The zero value means !ok.
}{
// Invalid offset.
{-1, request{}},
peer peer
}
-var _ PeerImpl = (*webSeed)(nil)
+var _ peerImpl = (*webSeed)(nil)
func (ws *webSeed) onGotInfo(info *metainfo.Info) {
ws.client.FileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
ws.client.Info = info
}
-func (ws *webSeed) PostCancel(r request) {
- ws.Cancel(r)
+func (ws *webSeed) _postCancel(r request) {
+ ws.cancel(r)
}
-func (ws *webSeed) WriteInterested(interested bool) bool {
+func (ws *webSeed) writeInterested(interested bool) bool {
return true
}
-func (ws *webSeed) Cancel(r request) bool {
+func (ws *webSeed) cancel(r request) bool {
ws.requests[r].Cancel()
return true
}
return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
}
-func (ws *webSeed) Request(r request) bool {
+func (ws *webSeed) request(r request) bool {
webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
ws.requests[r] = webseedRequest
go ws.requestResultHandler(r, webseedRequest)
return true
}
-func (ws *webSeed) ConnectionFlags() string {
+func (ws *webSeed) connectionFlags() string {
return "WS"
}
-func (ws *webSeed) Drop() {
+func (ws *webSeed) drop() {
}
-func (ws *webSeed) UpdateRequests() {
+func (ws *webSeed) updateRequests() {
ws.peer.doRequestState()
}
-func (ws *webSeed) Close() {}
+func (ws *webSeed) _close() {}
func (ws *webSeed) requestResultHandler(r request, webseedRequest webseed.Request) {
result := <-webseedRequest.Result