diff --git a/peer.go b/peer.go
index 733aa018..3e9da44f 100644
--- a/peer.go
+++ b/peer.go
@@ -31,7 +31,8 @@ type (
 
 	t *Torrent
 
-	peerImpl
+	legacyPeerImpl
+	peerImpl newHotPeerImpl
 	callbacks *Callbacks
 	outgoing bool
@@ -57,7 +58,7 @@ type (
 	lastChunkSent time.Time
 
 	// Stuff controlled by the local peer.
-	needRequestUpdate string
+	needRequestUpdate updateRequestReason
 	requestState request_strategy.PeerRequestState
 	updateRequestsTimer *time.Timer
 	lastRequestUpdate time.Time
@@ -111,6 +112,8 @@ type (
 	}
 
 	peerRequests = orderedBitmap[RequestIndex]
+
+	updateRequestReason string
 )
 
 const (
@@ -124,6 +127,15 @@ const (
 	PeerSourceDirect = "M"
 )
 
+// These are grouped because we might vary update request behaviour depending on the reason. I'm not
+// sure it's right that multiple reasons can be triggered before an update runs but only the first
+// counts. Possibly we should be signalling what behaviours are appropriate in the next update
+// instead.
+const (
+	peerUpdateRequestsPeerCancelReason updateRequestReason = "Peer.cancel"
+	peerUpdateRequestsRemoteRejectReason updateRequestReason = "Peer.remoteRejectedRequest"
+)
+
 // Returns the Torrent a Peer belongs to. Shouldn't change for the lifetime of the Peer. May be nil
 // if we are the receiving end of a connection and the handshake hasn't been received or accepted
 // yet.
@@ -131,6 +143,16 @@ func (p *Peer) Torrent() *Torrent {
 	return p.t
 }
 
+func (p *Peer) BtrtrcStats() (ret PeerStats) {
+	p.locker().RLock()
+	defer p.locker().RUnlock()
+	ret.ConnStats = p._stats.Copy()
+	ret.DownloadRate = p.downloadRate()
+	ret.LastWriteUploadRate = p.peerImpl.lastWriteUploadRate()
+	ret.RemotePieceCount = p.remotePieceCount()
+	return
+}
+
 func (p *Peer) initRequestState() {
 	p.requestState.Requests = &peerRequests{}
 }
@@ -199,12 +221,17 @@ func (cn *Peer) bestPeerNumPieces() pieceIndex {
 	return cn.peerMinPieces
 }
 
-func (cn *Peer) completedString() string {
+// How many pieces we think the peer has.
+func (cn *Peer) remotePieceCount() pieceIndex {
 	have := pieceIndex(cn.peerPieces().GetCardinality())
 	if all, _ := cn.peerHasAllPieces(); all {
 		have = cn.bestPeerNumPieces()
 	}
-	return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
+	return have
+}
+
+func (cn *Peer) completedString() string {
+	return fmt.Sprintf("%d/%d", cn.remotePieceCount(), cn.bestPeerNumPieces())
 }
 
 func eventAgeString(t time.Time) string {
@@ -237,6 +264,10 @@ func (cn *Peer) statusFlags() (ret string) {
 	return
 }
 
+func (cn *Peer) StatusFlags() string {
+	return cn.statusFlags()
+}
+
 func (cn *Peer) downloadRate() float64 {
 	num := cn._stats.BytesReadUsefulData.Int64()
 	if num == 0 {
@@ -245,11 +276,14 @@ func (cn *Peer) downloadRate() float64 {
 	return float64(num) / cn.totalExpectingTime().Seconds()
 }
 
-func (p *Peer) DownloadRate() float64 {
-	p.locker().RLock()
-	defer p.locker().RUnlock()
-
-	return p.downloadRate()
+func (cn *Peer) UploadRate() float64 {
+	cn.locker().RLock()
+	defer cn.locker().RUnlock()
+	num := cn._stats.BytesWrittenData.Int64()
+	if num == 0 {
+		return 0
+	}
+	return float64(num) / time.Since(cn.completedHandshake).Seconds()
 }
 
 func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) {
@@ -325,7 +359,7 @@ func (p *Peer) close() {
 	for _, prs := range p.peerRequests {
 		prs.allocReservation.Drop()
 	}
-	p.peerImpl.onClose()
+	p.legacyPeerImpl.onClose()
 	if p.t != nil {
 		p.t.decPeerPieceAvailability(p)
 	}
@@ -465,7 +499,7 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) {
 	for _, f := range cn.callbacks.SentRequest {
 		f(PeerRequestEvent{cn, ppReq})
 	}
-	return cn.peerImpl._request(ppReq), nil
+	return cn.legacyPeerImpl._request(ppReq), nil
 }
 
 func (me *Peer) cancel(r RequestIndex) {
@@ -480,12 +514,12 @@ func (me *Peer) cancel(r RequestIndex) {
 	}
 	me.decPeakRequests()
 	if me.isLowOnRequests() {
-		me.updateRequests("Peer.cancel")
+		me.updateRequests(peerUpdateRequestsPeerCancelReason)
 	}
 }
 
 // Sets a reason to update requests, and if there wasn't already one, handle it.
-func (cn *Peer) updateRequests(reason string) {
+func (cn *Peer) updateRequests(reason updateRequestReason) {
 	if cn.needRequestUpdate != "" {
 		return
 	}
@@ -520,7 +554,7 @@ func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
 // connection.
 func (cn *Peer) postHandshakeStats(f func(*ConnStats)) {
 	t := cn.t
-	f(&t.stats)
+	f(&t.connStats)
 	f(&t.cl.connStats)
 }
 
@@ -534,6 +568,14 @@ func (cn *Peer) allStats(f func(*ConnStats)) {
 	}
 }
 
+func (cn *Peer) Stats() *ConnStats {
+	return cn.stats()
+}
+
+func (cn *Peer) CompletedString() string {
+	return cn.completedString()
+}
+
 func (cn *Peer) readBytes(n int64) {
 	cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
 }
@@ -574,7 +616,7 @@ func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
 		return false
 	}
 	if c.isLowOnRequests() {
-		c.updateRequests("Peer.remoteRejectedRequest")
+		c.updateRequests(peerUpdateRequestsRemoteRejectReason)
 	}
 	c.decExpectedChunkReceive(r)
 	return true
@@ -597,7 +639,7 @@ func (c *Peer) doChunkReadStats(size int64) {
 
 // Handle a received chunk from a peer.
func (c *Peer) receiveChunk(msg *pp.Message) error { - chunksReceived.Add("total", 1) + ChunksReceived.Add("total", 1) ppReq := newRequestFromMessage(msg) t := c.t @@ -617,17 +659,17 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { defer recordBlockForSmartBan() if c.peerChoking { - chunksReceived.Add("while choked", 1) + ChunksReceived.Add("while choked", 1) } if c.validReceiveChunks[req] <= 0 { - chunksReceived.Add("unexpected", 1) + ChunksReceived.Add("unexpected", 1) return errors.New("received unexpected chunk") } c.decExpectedChunkReceive(req) if c.peerChoking && c.peerAllowedFast.Contains(pieceIndex(ppReq.Index)) { - chunksReceived.Add("due to allowed fast", 1) + ChunksReceived.Add("due to allowed fast", 1) } // The request needs to be deleted immediately to prevent cancels occurring asynchronously when @@ -650,7 +692,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { c.updateRequests("Peer.receiveChunk deleted request") } } else { - chunksReceived.Add("unintended", 1) + ChunksReceived.Add("unintended", 1) } } @@ -659,7 +701,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { // Do we actually want this chunk? if t.haveChunk(ppReq) { // panic(fmt.Sprintf("%+v", ppReq)) - chunksReceived.Add("redundant", 1) + ChunksReceived.Add("redundant", 1) c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted })) return nil } @@ -786,7 +828,7 @@ func (c *Peer) deleteRequest(r RequestIndex) bool { return true } -func (c *Peer) deleteAllRequests(reason string) { +func (c *Peer) deleteAllRequests(reason updateRequestReason) { if c.requestState.Requests.IsEmpty() { return } @@ -843,8 +885,8 @@ type connectionTrust struct { NetGoodPiecesDirted int64 } -func (l connectionTrust) Less(r connectionTrust) bool { - return multiless.New().Bool(l.Implicit, r.Implicit).Int64(l.NetGoodPiecesDirted, r.NetGoodPiecesDirted).Less() +func (l connectionTrust) Cmp(r connectionTrust) int { + return multiless.New().Bool(l.Implicit, r.Implicit).Int64(l.NetGoodPiecesDirted, r.NetGoodPiecesDirted).OrderingInt() } // Returns a new Bitmap that includes bits for all pieces the peer could have based on their claims. @@ -866,7 +908,7 @@ func (cn *Peer) stats() *ConnStats { } func (p *Peer) TryAsPeerConn() (*PeerConn, bool) { - pc, ok := p.peerImpl.(*PeerConn) + pc, ok := p.legacyPeerImpl.(*PeerConn) return pc, ok }
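
For reviewers skimming the diff: the "only the first counts" behaviour questioned in the comment on the new reason constants comes straight from the guard at the top of updateRequests, which latches the first pending reason. Below is a minimal, self-contained Go sketch of that latch; the peer type and the short constant names are illustrative stand-ins, not the real Peer struct.

	package main

	import "fmt"

	// updateRequestReason mirrors the typed string this diff introduces. A distinct
	// type keeps reason values greppable and stops arbitrary strings from being
	// passed where a known reason is expected.
	type updateRequestReason string

	const (
		reasonPeerCancel   updateRequestReason = "Peer.cancel"
		reasonRemoteReject updateRequestReason = "Peer.remoteRejectedRequest"
	)

	// peer is a hypothetical stand-in for Peer, reduced to the latch field.
	type peer struct {
		needRequestUpdate updateRequestReason
	}

	// updateRequests latches the first reason: while one is pending, later
	// reasons are dropped, which is the behaviour the diff's comment questions.
	func (p *peer) updateRequests(reason updateRequestReason) {
		if p.needRequestUpdate != "" {
			return // an update is already pending; the first reason wins
		}
		p.needRequestUpdate = reason
		// In the real Peer the pending update is serviced elsewhere (e.g. via
		// updateRequestsTimer), which is presumably where the field is cleared.
	}

	func main() {
		var p peer
		p.updateRequests(reasonPeerCancel)
		p.updateRequests(reasonRemoteReject) // dropped: a reason is already latched
		fmt.Println(p.needRequestUpdate)     // prints: Peer.cancel
	}

The trade-off the comment raises follows directly: any behaviour keyed on the reason ignores reasons that arrive while an update is pending, so signalling the set of behaviours wanted for the next update would lose less information than a single latched string.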