Sergey Matveev's repositories - btrtrc.git/commitdiff
Merge branch 'master' into pull-writer
author    Matt Joiner <anacrolix@gmail.com>
          Tue, 12 Sep 2017 08:32:40 +0000 (18:32 +1000)
committer Matt Joiner <anacrolix@gmail.com>
          Tue, 12 Sep 2017 08:32:40 +0000 (18:32 +1000)
client.go
client_test.go
cmd/torrent/main.go
connection.go
connection_test.go
global.go
peer_protocol/protocol.go
ratelimitreader.go
torrent.go

index d81697b85423c2bd186c7f6177c7f1653ba3f346..41fa89e289561008198d600d9530d444b90aeb3f 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1035,26 +1035,6 @@ func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
        }
 }
 
-func (cl *Client) peerUnchoked(torrent *Torrent, conn *connection) {
-       conn.updateRequests()
-}
-
-func (cl *Client) connCancel(t *Torrent, cn *connection, r request) (ok bool) {
-       ok = cn.Cancel(r)
-       if ok {
-               postedCancels.Add(1)
-       }
-       return
-}
-
-func (cl *Client) connDeleteRequest(t *Torrent, cn *connection, r request) bool {
-       if !cn.RequestPending(r) {
-               return false
-       }
-       delete(cn.Requests, r)
-       return true
-}
-
 // Process incoming ut_metadata message.
 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
        var d map[string]int
@@ -1386,6 +1366,7 @@ func (cl *Client) newConnection(nc net.Conn) (c *connection) {
                PeerChoked:      true,
                PeerMaxRequests: 250,
        }
+       c.writerCond.L = &cl.mu
        c.setRW(connStatsReadWriter{nc, &cl.mu, c})
        c.r = rateLimitedReader{cl.downloadLimit, c.r}
        return
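
Editor's note: the one-line addition to newConnection is the heart of the merge: the writer's condition variable shares the client-wide mutex, so any code path that already holds cl.mu can wake the writer with a single Broadcast. A minimal, self-contained sketch of the pattern (all names hypothetical):

package main

import (
        "fmt"
        "sync"
)

// client is a hypothetical reduction of Client: one mutex guards all
// state, and the writer's cond shares that same mutex, mirroring the
// c.writerCond.L = &cl.mu line above.
type client struct {
        mu         sync.Mutex
        writerCond sync.Cond
        pending    []string
        closed     bool
}

func (c *client) writer(done chan<- struct{}) {
        c.mu.Lock()
        defer c.mu.Unlock()
        for {
                if len(c.pending) == 0 {
                        if c.closed {
                                close(done)
                                return
                        }
                        c.writerCond.Wait() // releases c.mu while asleep
                        continue
                }
                fmt.Println("wrote:", c.pending[0])
                c.pending = c.pending[1:]
        }
}

func (c *client) post(msg string) {
        c.mu.Lock()
        c.pending = append(c.pending, msg)
        c.mu.Unlock()
        c.writerCond.Broadcast() // wake the writer
}

func main() {
        c := new(client)
        c.writerCond.L = &c.mu // the same wiring newConnection now does
        done := make(chan struct{})
        go c.writer(done)
        c.post("hello")
        c.mu.Lock()
        c.closed = true
        c.mu.Unlock()
        c.writerCond.Broadcast()
        <-done
}

connection.tickleWriter, further down in connection.go, is exactly this Broadcast.
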
index e786b3994257fb437f618e1616d2a8b0f21373c2..91ea329c93643ecea095e26da26dc2003524c0d8 100644 (file)
--- a/client_test.go
+++ b/client_test.go
@@ -167,9 +167,7 @@ func TestReducedDialTimeout(t *testing.T) {
 
 func TestUTPRawConn(t *testing.T) {
        l, err := NewUtpSocket("udp", "")
-       if err != nil {
-               t.Fatal(err)
-       }
+       require.NoError(t, err)
        defer l.Close()
        go func() {
                for {
@@ -180,17 +178,14 @@ func TestUTPRawConn(t *testing.T) {
                }
        }()
        // Connect a UTP peer to see if the RawConn will still work.
-       s, _ := NewUtpSocket("udp", "")
+       s, err := NewUtpSocket("udp", "")
+       require.NoError(t, err)
        defer s.Close()
        utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
-       if err != nil {
-               t.Fatalf("error dialing utp listener: %s", err)
-       }
+       require.NoError(t, err)
        defer utpPeer.Close()
        peer, err := net.ListenPacket("udp", ":0")
-       if err != nil {
-               t.Fatal(err)
-       }
+       require.NoError(t, err)
        defer peer.Close()
 
        msgsReceived := 0
@@ -204,27 +199,19 @@ func TestUTPRawConn(t *testing.T) {
                b := make([]byte, 500)
                for i := 0; i < N; i++ {
                        n, _, err := l.ReadFrom(b)
-                       if err != nil {
-                               t.Fatalf("error reading from raw conn: %s", err)
-                       }
+                       require.NoError(t, err)
                        msgsReceived++
                        var d int
                        fmt.Sscan(string(b[:n]), &d)
-                       if d != i {
-                               log.Printf("got wrong number: expected %d, got %d", i, d)
-                       }
+                       assert.Equal(t, i, d)
                }
        }()
        udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
-       if err != nil {
-               t.Fatal(err)
-       }
+       require.NoError(t, err)
        for i := 0; i < N; i++ {
                _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
-               if err != nil {
-                       t.Fatal(err)
-               }
-               time.Sleep(time.Microsecond)
+               require.NoError(t, err)
+               time.Sleep(time.Millisecond)
        }
        select {
        case <-readerStopped:
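
Editor's note: the test conversions follow one rule: hard preconditions become require.NoError, which aborts via t.FailNow (per the testing docs, that must run on the test goroutine), while the per-message value check becomes assert.Equal, which records a failure and lets the loop keep running. A minimal sketch of the distinction:

package torrent

import (
        "net"
        "testing"

        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
)

func TestRequireVsAssert(t *testing.T) {
        l, err := net.Listen("tcp", "localhost:0")
        // require: abort the test immediately on failure; nothing
        // below this line runs if err is non-nil.
        require.NoError(t, err)
        defer l.Close()
        // assert: record a mismatch but keep executing, useful inside
        // loops where later iterations are still informative.
        assert.Equal(t, "tcp", l.Addr().Network())
}
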
index bf21b22b5110c328bdec31cd0d88f5276133abb1..bb614264a0bd26986e123079d55bbaee075da61a 100644 (file)
--- a/cmd/torrent/main.go
+++ b/cmd/torrent/main.go
@@ -2,6 +2,7 @@
 package main
 
 import (
+       "expvar"
        "fmt"
        "log"
        "net"
@@ -11,7 +12,7 @@ import (
        "time"
 
        "github.com/anacrolix/dht"
-       "github.com/anacrolix/envpprof"
+       "github.com/anacrolix/envpprof"
        "github.com/anacrolix/tagflag"
        "github.com/dustin/go-humanize"
        "github.com/gosuri/uiprogress"
@@ -139,6 +140,7 @@ var flags = struct {
        Addr         *net.TCPAddr   `help:"network listen addr"`
        UploadRate   tagflag.Bytes  `help:"max piece bytes to send per second"`
        DownloadRate tagflag.Bytes  `help:"max bytes per second down from peers"`
+       Debug        bool
        tagflag.StartPos
        Torrent []string `arity:"+" help:"torrent file path or magnet uri"`
 }{
@@ -191,4 +193,8 @@ func main() {
        if flags.Seed {
                select {}
        }
+       expvar.Do(func(kv expvar.KeyValue) {
+               fmt.Printf("%s: %s\n", kv.Key, kv.Value)
+       })
+       envpprof.Stop()
 }
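
Editor's note: expvar.Do walks every variable in the process-wide registry, so the new exit hook dumps all the counters registered in global.go below, after which envpprof.Stop finalizes any profiling enabled through the environment. A self-contained sketch of the same dump (counter name hypothetical):

package main

import (
        "expvar"
        "fmt"
)

// chunksReceived is a hypothetical counter; every expvar.NewInt or
// NewMap in any package lands in the same global registry.
var chunksReceived = expvar.NewInt("chunksReceived")

func main() {
        chunksReceived.Add(42)
        // Walk the registry, as cmd/torrent now does on exit. Note the
        // registry also auto-publishes "cmdline" and "memstats".
        expvar.Do(func(kv expvar.KeyValue) {
                fmt.Printf("%s: %s\n", kv.Key, kv.Value)
        })
}
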
index 0b1fed8173834269fa438307604eac33ed2bc858..f519ac6e3850c5b9fbc52c0f484fa750f13a1bec 100644 (file)
--- a/connection.go
+++ b/connection.go
@@ -3,9 +3,7 @@ package torrent
 import (
        "bufio"
        "bytes"
-       "container/list"
        "errors"
-       "expvar"
        "fmt"
        "io"
        "log"
@@ -25,8 +23,6 @@ import (
        pp "github.com/anacrolix/torrent/peer_protocol"
 )
 
-var optimizedCancels = expvar.NewInt("optimizedCancels")
-
 type peerSource string
 
 const (
@@ -67,7 +63,7 @@ type connection struct {
        // Stuff controlled by the local peer.
        Interested       bool
        Choked           bool
-       Requests         map[request]struct{}
+       requests         map[request]struct{}
        requestsLowWater int
        // Indexed by metadata piece, set to true if posted and pending a
        // response.
@@ -99,8 +95,8 @@ type connection struct {
        pieceInclination  []int
        pieceRequestOrder prioritybitmap.PriorityBitmap
 
-       outgoingUnbufferedMessages         *list.List
-       outgoingUnbufferedMessagesNotEmpty missinggo.Event
+       postedBuffer bytes.Buffer
+       writerCond   sync.Cond
 }
 
 func (cn *connection) mu() sync.Locker {
@@ -205,7 +201,7 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
                cn.UsefulChunksReceived,
                cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
                cn.chunksSent,
-               len(cn.Requests),
+               cn.numLocalRequests(),
                len(cn.PeerRequests),
                cn.statusFlags(),
        )
@@ -238,28 +234,9 @@ func (cn *connection) PeerHasPiece(piece int) bool {
 }
 
 func (cn *connection) Post(msg pp.Message) {
-       switch msg.Type {
-       case pp.Cancel:
-               for e := cn.outgoingUnbufferedMessages.Back(); e != nil; e = e.Prev() {
-                       elemMsg := e.Value.(pp.Message)
-                       if elemMsg.Type == pp.Request && elemMsg.Index == msg.Index && elemMsg.Begin == msg.Begin && elemMsg.Length == msg.Length {
-                               cn.outgoingUnbufferedMessages.Remove(e)
-                               optimizedCancels.Add(1)
-                               return
-                       }
-               }
-       }
-       if cn.outgoingUnbufferedMessages == nil {
-               cn.outgoingUnbufferedMessages = list.New()
-       }
-       cn.outgoingUnbufferedMessages.PushBack(msg)
-       cn.outgoingUnbufferedMessagesNotEmpty.Set()
-       postedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
-}
-
-func (cn *connection) RequestPending(r request) bool {
-       _, ok := cn.Requests[r]
-       return ok
+       messageTypesPosted.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
+       cn.postedBuffer.Write(msg.MustMarshalBinary())
+       cn.tickleWriter()
 }
 
 func (cn *connection) requestMetadataPiece(index int) {
@@ -303,50 +280,6 @@ func (cn *connection) nominalMaxRequests() (ret int) {
        return
 }
 
-// Returns true if more requests can be sent.
-func (cn *connection) Request(chunk request) bool {
-       if len(cn.Requests) >= cn.nominalMaxRequests() {
-               return false
-       }
-       if !cn.PeerHasPiece(int(chunk.Index)) {
-               return true
-       }
-       if cn.RequestPending(chunk) {
-               return true
-       }
-       cn.SetInterested(true)
-       if cn.PeerChoked {
-               return false
-       }
-       if cn.Requests == nil {
-               cn.Requests = make(map[request]struct{}, cn.PeerMaxRequests)
-       }
-       cn.Requests[chunk] = struct{}{}
-       cn.requestsLowWater = len(cn.Requests) / 2
-       cn.Post(pp.Message{
-               Type:   pp.Request,
-               Index:  chunk.Index,
-               Begin:  chunk.Begin,
-               Length: chunk.Length,
-       })
-       return true
-}
-
-// Returns true if an unsatisfied request was canceled.
-func (cn *connection) Cancel(r request) bool {
-       if !cn.RequestPending(r) {
-               return false
-       }
-       delete(cn.Requests, r)
-       cn.Post(pp.Message{
-               Type:   pp.Cancel,
-               Index:  r.Index,
-               Begin:  r.Begin,
-               Length: r.Length,
-       })
-       return true
-}
-
 // Returns true if an unsatisfied request was canceled.
 func (cn *connection) PeerCancel(r request) bool {
        if cn.PeerRequests == nil {
@@ -380,11 +313,13 @@ func (cn *connection) Unchoke() {
        cn.Choked = false
 }
 
-func (cn *connection) SetInterested(interested bool) {
+func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
        if cn.Interested == interested {
-               return
+               return true
        }
-       cn.Post(pp.Message{
+       cn.Interested = interested
+       // log.Printf("%p: setting interest: %v", cn, interested)
+       return msg(pp.Message{
                Type: func() pp.MessageType {
                        if interested {
                                return pp.Interested
@@ -393,66 +328,105 @@ func (cn *connection) SetInterested(interested bool) {
                        }
                }(),
        })
-       cn.Interested = interested
 }
 
-var (
-       // Track connection writer buffer writes and flushes, to determine its
-       // efficiency.
-       connectionWriterFlush = expvar.NewInt("connectionWriterFlush")
-       connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
-)
+func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
+       numFillBuffers.Add(1)
+       cancel, new, i := cn.desiredRequestState()
+       if !cn.SetInterested(i, msg) {
+               return
+       }
+       if cancel && len(cn.requests) != 0 {
+               fillBufferSentCancels.Add(1)
+               for r := range cn.requests {
+                       cn.deleteRequest(r)
+                       // log.Printf("%p: cancelling request: %v", cn, r)
+                       if !msg(pp.Message{
+                               Type:   pp.Cancel,
+                               Index:  r.Index,
+                               Begin:  r.Begin,
+                               Length: r.Length,
+                       }) {
+                               return
+                       }
+               }
+       }
+       if len(new) != 0 {
+               fillBufferSentRequests.Add(1)
+               for _, r := range new {
+                       if cn.requests == nil {
+                               cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
+                       }
+                       cn.requests[r] = struct{}{}
+                       // log.Printf("%p: requesting %v", cn, r)
+                       if !msg(pp.Message{
+                               Type:   pp.Request,
+                               Index:  r.Index,
+                               Begin:  r.Begin,
+                               Length: r.Length,
+                       }) {
+                               return
+                       }
+               }
+               // If we didn't completely top up the requests, we shouldn't mark the
+               // low water, since we'll want to top up the requests as soon as we
+               // have more write buffer space.
+               cn.requestsLowWater = len(cn.requests) / 2
+       }
+}
 
 // Writes buffered messages to the socket.
 func (cn *connection) writer(keepAliveTimeout time.Duration) {
-       defer func() {
+       var (
+               buf       bytes.Buffer
+               lastWrite time.Time = time.Now()
+       )
+       var keepAliveTimer *time.Timer
+       keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
                cn.mu().Lock()
                defer cn.mu().Unlock()
-               cn.Close()
-       }()
-       // Reduce write syscalls.
-       buf := bufio.NewWriter(cn.w)
-       keepAliveTimer := time.NewTimer(keepAliveTimeout)
+               if time.Since(lastWrite) >= keepAliveTimeout {
+                       cn.tickleWriter()
+               }
+               keepAliveTimer.Reset(keepAliveTimeout)
+       })
+       cn.mu().Lock()
+       defer cn.mu().Unlock()
+       defer cn.Close()
+       defer keepAliveTimer.Stop()
        for {
-               cn.mu().Lock()
-               for cn.outgoingUnbufferedMessages != nil && cn.outgoingUnbufferedMessages.Len() != 0 {
-                       msg := cn.outgoingUnbufferedMessages.Remove(cn.outgoingUnbufferedMessages.Front()).(pp.Message)
-                       cn.mu().Unlock()
-                       b, err := msg.MarshalBinary()
-                       if err != nil {
-                               panic(err)
-                       }
-                       connectionWriterWrite.Add(1)
-                       n, err := buf.Write(b)
-                       if err != nil {
-                               return
-                       }
-                       keepAliveTimer.Reset(keepAliveTimeout)
-                       if n != len(b) {
-                               panic("short write")
-                       }
-                       cn.mu().Lock()
-                       cn.wroteMsg(&msg)
+               buf.Write(cn.postedBuffer.Bytes())
+               cn.postedBuffer.Reset()
+               if buf.Len() == 0 {
+                       cn.fillWriteBuffer(func(msg pp.Message) bool {
+                               cn.wroteMsg(&msg)
+                               buf.Write(msg.MustMarshalBinary())
+                               return buf.Len() < 1<<16
+                       })
+               }
+               if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
+                       buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
+                       postedKeepalives.Add(1)
+               }
+               if buf.Len() == 0 {
+                       cn.writerCond.Wait()
+                       continue
                }
-               cn.outgoingUnbufferedMessagesNotEmpty.Clear()
                cn.mu().Unlock()
-               connectionWriterFlush.Add(1)
-               if buf.Buffered() != 0 {
-                       if buf.Flush() != nil {
-                               return
-                       }
+               // log.Printf("writing %d bytes", buf.Len())
+               n, err := cn.w.Write(buf.Bytes())
+               cn.mu().Lock()
+               if n != 0 {
+                       lastWrite = time.Now()
                        keepAliveTimer.Reset(keepAliveTimeout)
                }
-               select {
-               case <-cn.closed.LockedChan(cn.mu()):
+               if err != nil {
                        return
-               case <-cn.outgoingUnbufferedMessagesNotEmpty.LockedChan(cn.mu()):
-               case <-keepAliveTimer.C:
-                       cn.mu().Lock()
-                       cn.Post(pp.Message{Keepalive: true})
-                       cn.mu().Unlock()
-                       postedKeepalives.Add(1)
                }
+               if n != buf.Len() {
+                       panic("short write")
+               }
+               buf.Reset()
        }
 }
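
Editor's note: the rewritten writer collapses the old outgoing-message list into a single loop: take any eagerly posted bytes, ask fillWriteBuffer for more (the callback refuses more data once 64 KiB is buffered, which is the backpressure that stops fillWriteBuffer mid-plan), emit a keepalive when idle, otherwise sleep on writerCond. Only the socket write itself runs unlocked. A compilable reduction of that shape (names hypothetical):

package connsketch

import (
        "bytes"
        "io"
        "sync"
)

// conn is a hypothetical reduction of connection: eagerly posted
// bytes, a fill hook, and a cond whose Locker is the shared mutex.
type conn struct {
        mu     sync.Mutex
        cond   sync.Cond // cond.L must be &mu
        posted bytes.Buffer
        fill   func(write func([]byte) bool)
        w      io.Writer
}

// writeLoop buffers under the lock and writes to the socket outside it.
func (c *conn) writeLoop() {
        c.mu.Lock()
        defer c.mu.Unlock()
        var buf bytes.Buffer
        for {
                buf.Write(c.posted.Bytes()) // drain eagerly posted bytes
                c.posted.Reset()
                if buf.Len() == 0 && c.fill != nil {
                        c.fill(func(b []byte) bool {
                                buf.Write(b)
                                return buf.Len() < 1<<16 // refuse more past 64 KiB
                        })
                }
                if buf.Len() == 0 {
                        c.cond.Wait() // nothing to send; wait for a tickle
                        continue
                }
                c.mu.Unlock()
                _, err := c.w.Write(buf.Bytes()) // socket I/O, lock released
                c.mu.Lock()
                if err != nil {
                        return
                }
                buf.Reset()
        }
}
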
 
@@ -493,58 +467,47 @@ func nextRequestState(
        requestsLowWater int,
        requestsHighWater int,
 ) (
-       requests map[request]struct{},
+       cancelExisting bool,
+       newRequests []request,
        interested bool,
 ) {
        if !networkingEnabled || nextPieces.IsEmpty() {
-               return nil, false
+               return true, nil, false
        }
        if peerChoking || len(currentRequests) > requestsLowWater {
-               return currentRequests, true
-       }
-       requests = make(map[request]struct{}, requestsHighWater)
-       for r := range currentRequests {
-               requests[r] = struct{}{}
+               return false, nil, !nextPieces.IsEmpty()
        }
        nextPieces.IterTyped(func(piece int) bool {
                return pendingChunks(piece, func(cs chunkSpec) bool {
-                       if len(requests) >= requestsHighWater {
-                               return false
-                       }
                        r := request{pp.Integer(piece), cs}
-                       requests[r] = struct{}{}
-                       return true
+                       if _, ok := currentRequests[r]; !ok {
+                               if newRequests == nil {
+                                       newRequests = make([]request, 0, requestsHighWater-len(currentRequests))
+                               }
+                               newRequests = append(newRequests, r)
+                       }
+                       return len(currentRequests)+len(newRequests) < requestsHighWater
                })
        })
-       return requests, true
+       return false, newRequests, true
 }
 
 func (cn *connection) updateRequests() {
-       rs, i := nextRequestState(
+       cn.tickleWriter()
+}
+
+func (cn *connection) desiredRequestState() (bool, []request, bool) {
+       return nextRequestState(
                cn.t.networkingEnabled,
-               cn.Requests,
+               cn.requests,
                cn.PeerChoked,
                &cn.pieceRequestOrder,
                func(piece int, f func(chunkSpec) bool) bool {
                        return undirtiedChunks(piece, cn.t, f)
                },
                cn.requestsLowWater,
-               cn.nominalMaxRequests())
-       for r := range cn.Requests {
-               if _, ok := rs[r]; !ok {
-                       if !cn.Cancel(r) {
-                               panic("wat")
-                       }
-               }
-       }
-       for r := range rs {
-               if _, ok := cn.Requests[r]; !ok {
-                       if !cn.Request(r) {
-                               panic("how")
-                       }
-               }
-       }
-       cn.SetInterested(i)
+               cn.nominalMaxRequests(),
+       )
 }
 
 func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
@@ -554,23 +517,22 @@ func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
        })
 }
 
-func (cn *connection) stopRequestingPiece(piece int) {
-       cn.pieceRequestOrder.Remove(piece)
-       cn.updateRequests()
+// Callers should call updateRequests if this returns true.
+func (cn *connection) stopRequestingPiece(piece int) bool {
+       return cn.pieceRequestOrder.Remove(piece)
 }
 
 // This is distinct from Torrent piece priority, which is the user's
 // preference. Connection piece priority is specific to a connection and
 // pseudorandomly avoids all connections requesting the same pieces, which
 // would waste effort.
-func (cn *connection) updatePiecePriority(piece int) {
+func (cn *connection) updatePiecePriority(piece int) bool {
        tpp := cn.t.piecePriority(piece)
        if !cn.PeerHasPiece(piece) {
                tpp = PiecePriorityNone
        }
        if tpp == PiecePriorityNone {
-               cn.stopRequestingPiece(piece)
-               return
+               return cn.stopRequestingPiece(piece)
        }
        prio := cn.getPieceInclination()[piece]
        switch tpp {
@@ -583,8 +545,7 @@ func (cn *connection) updatePiecePriority(piece int) {
                panic(tpp)
        }
        prio += piece / 3
-       cn.pieceRequestOrder.Set(piece, prio)
-       cn.updateRequests()
+       return cn.pieceRequestOrder.Set(piece, prio)
 }
 
 func (cn *connection) getPieceInclination() []int {
@@ -602,14 +563,16 @@ func (cn *connection) discardPieceInclination() {
        cn.pieceInclination = nil
 }
 
-func (cn *connection) peerHasPieceChanged(piece int) {
-       cn.updatePiecePriority(piece)
-}
-
 func (cn *connection) peerPiecesChanged() {
        if cn.t.haveInfo() {
+               prioritiesChanged := false
                for i := range iter.N(cn.t.numPieces()) {
-                       cn.peerHasPieceChanged(i)
+                       if cn.updatePiecePriority(i) {
+                               prioritiesChanged = true
+                       }
+               }
+               if prioritiesChanged {
+                       cn.updateRequests()
                }
        }
 }
@@ -629,7 +592,9 @@ func (cn *connection) peerSentHave(piece int) error {
        }
        cn.raisePeerMinPieces(piece + 1)
        cn.peerPieces.Set(piece, true)
-       cn.peerHasPieceChanged(piece)
+       if cn.updatePiecePriority(piece) {
+               cn.updateRequests()
+       }
        return nil
 }
 
@@ -690,6 +655,7 @@ func (c *connection) requestPendingMetadata() {
 }
 
 func (cn *connection) wroteMsg(msg *pp.Message) {
+       messageTypesSent.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
        cn.stats.wroteMsg(msg)
        cn.t.stats.wroteMsg(msg)
 }
@@ -749,10 +715,15 @@ func (c *connection) mainReadLoop() error {
                Pool:      t.chunkPool,
        }
        for {
-               cl.mu.Unlock()
-               var msg pp.Message
-               err := decoder.Decode(&msg)
-               cl.mu.Lock()
+               var (
+                       msg pp.Message
+                       err error
+               )
+               func() {
+                       cl.mu.Unlock()
+                       defer cl.mu.Lock()
+                       err = decoder.Decode(&msg)
+               }()
                if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
                        return nil
                }
@@ -765,19 +736,20 @@ func (c *connection) mainReadLoop() error {
                        receivedKeepalives.Add(1)
                        continue
                }
-               receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
+               messageTypesReceived.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
                switch msg.Type {
                case pp.Choke:
                        c.PeerChoked = true
-                       c.Requests = nil
+                       c.requests = nil
                        // We can then reset our interest.
                        c.updateRequests()
                case pp.Reject:
-                       cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
-                       c.updateRequests()
+                       if c.deleteRequest(newRequest(msg.Index, msg.Begin, msg.Length)) {
+                               c.updateRequests()
+                       }
                case pp.Unchoke:
                        c.PeerChoked = false
-                       cl.peerUnchoked(t, c)
+                       c.tickleWriter()
                case pp.Interested:
                        c.PeerInterested = true
                        c.upload()
@@ -975,8 +947,8 @@ func (c *connection) receiveChunk(msg *pp.Message) {
        req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
 
        // Request has been satisfied.
-       if cl.connDeleteRequest(t, c, req) {
-               defer c.updateRequests()
+       if c.deleteRequest(req) {
+               c.updateRequests()
        } else {
                unexpectedChunksReceived.Add(1)
        }
@@ -1004,9 +976,7 @@ func (c *connection) receiveChunk(msg *pp.Message) {
 
        // Cancel pending requests for this chunk.
        for c := range t.conns {
-               if cl.connCancel(t, c, req) {
-                       c.updateRequests()
-               }
+               c.updateRequests()
        }
 
        cl.mu.Unlock()
@@ -1111,3 +1081,18 @@ func (cn *connection) netGoodPiecesDirtied() int {
 func (c *connection) peerHasWantedPieces() bool {
        return !c.pieceRequestOrder.IsEmpty()
 }
+
+func (c *connection) numLocalRequests() int {
+       return len(c.requests)
+}
+
+func (c *connection) deleteRequest(r request) bool {
+       if _, ok := c.requests[r]; !ok {
+               return false
+       }
+       delete(c.requests, r)
+       return true
+}
+func (c *connection) tickleWriter() {
+       c.writerCond.Broadcast()
+}
index 151fb57c00899fb833b10b3f24b60f43a55127ee..4f1d715b5196082a9e070a1f674dded9df03cdd3 100644 (file)
--- a/connection_test.go
+++ b/connection_test.go
@@ -1,15 +1,11 @@
 package torrent
 
 import (
-       "container/list"
        "io"
-       "io/ioutil"
-       "net"
        "sync"
        "testing"
        "time"
 
-       "github.com/anacrolix/missinggo/bitmap"
        "github.com/anacrolix/missinggo/pubsub"
        "github.com/bradfitz/iter"
        "github.com/stretchr/testify/assert"
@@ -20,49 +16,6 @@ import (
        "github.com/anacrolix/torrent/storage"
 )
 
-func TestCancelRequestOptimized(t *testing.T) {
-       r, w := io.Pipe()
-       c := &connection{
-               PeerMaxRequests: 1,
-               peerPieces: func() bitmap.Bitmap {
-                       var bm bitmap.Bitmap
-                       bm.Set(1, true)
-                       return bm
-               }(),
-               w:    w,
-               conn: new(net.TCPConn),
-               // For the locks
-               t: &Torrent{cl: &Client{}},
-       }
-       assert.Len(t, c.Requests, 0)
-       c.Request(newRequest(1, 2, 3))
-       require.Len(t, c.Requests, 1)
-       // Posting this message should remove the pending Request.
-       require.True(t, c.Cancel(newRequest(1, 2, 3)))
-       assert.Len(t, c.Requests, 0)
-       // Check that write optimization filters out the Request, due to the
-       // Cancel. We should have received an Interested, due to the initial
-       // request, and then keep-alives until we close the connection.
-       go c.writer(0)
-       b := make([]byte, 9)
-       n, err := io.ReadFull(r, b)
-       require.NoError(t, err)
-       require.EqualValues(t, len(b), n)
-       require.EqualValues(t, "\x00\x00\x00\x01\x02"+"\x00\x00\x00\x00", string(b))
-       time.Sleep(time.Millisecond)
-       c.mu().Lock()
-       c.Close()
-       c.mu().Unlock()
-       w.Close()
-       b, err = ioutil.ReadAll(r)
-       require.NoError(t, err)
-       // A single keep-alive may have gone through, as writer would be stuck
-       // trying to flush it, and then promptly close.
-       if s := string(b); s != "\x00\x00\x00\x00" && s != "" {
-               t.Logf("expected zero or one keepalives, got %q", s)
-       }
-}
-
 // Ensure that no race exists between sending a bitfield, and a subsequent
 // Have that would potentially alter it.
 func TestSendBitfieldThenHave(t *testing.T) {
@@ -73,8 +26,8 @@ func TestSendBitfieldThenHave(t *testing.T) {
                },
                r: r,
                w: w,
-               outgoingUnbufferedMessages: list.New(),
        }
+       c.writerCond.L = &c.t.cl.mu
        go c.writer(time.Minute)
        c.mu().Lock()
        c.Bitfield([]bool{false, true, false})
index 0d2411283568081ead4d4e04ecba14f83691c696..fe1901cbbfa94b33e1de5f26359fef688d3fc626 100644 (file)
--- a/global.go
+++ b/global.go
@@ -63,7 +63,6 @@ var (
 
        uploadChunksPosted = expvar.NewInt("uploadChunksPosted")
        unexpectedCancels  = expvar.NewInt("unexpectedCancels")
-       postedCancels      = expvar.NewInt("postedCancels")
 
        pieceHashedCorrect    = expvar.NewInt("pieceHashedCorrect")
        pieceHashedNotCorrect = expvar.NewInt("pieceHashedNotCorrect")
@@ -81,16 +80,22 @@ var (
        connsToSelf = expvar.NewInt("connsToSelf")
        // Number of completed connections to a client we're already connected with.
        duplicateClientConns       = expvar.NewInt("duplicateClientConns")
-       receivedMessageTypes       = expvar.NewMap("receivedMessageTypes")
        receivedKeepalives         = expvar.NewInt("receivedKeepalives")
        supportedExtensionMessages = expvar.NewMap("supportedExtensionMessages")
-       postedMessageTypes         = expvar.NewMap("postedMessageTypes")
        postedKeepalives           = expvar.NewInt("postedKeepalives")
        // Requests received for pieces we don't have.
        requestsReceivedForMissingPieces = expvar.NewInt("requestsReceivedForMissingPieces")
 
+       messageTypesReceived = expvar.NewMap("messageTypesReceived")
+       messageTypesSent     = expvar.NewMap("messageTypesSent")
+       messageTypesPosted   = expvar.NewMap("messageTypesPosted")
+
        // Track the effectiveness of Torrent.connPieceInclinationPool.
        pieceInclinationsReused = expvar.NewInt("pieceInclinationsReused")
        pieceInclinationsNew    = expvar.NewInt("pieceInclinationsNew")
        pieceInclinationsPut    = expvar.NewInt("pieceInclinationsPut")
+
+       fillBufferSentCancels  = expvar.NewInt("fillBufferSentCancels")
+       fillBufferSentRequests = expvar.NewInt("fillBufferSentRequests")
+       numFillBuffers         = expvar.NewInt("numFillBuffers")
 )
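
Editor's note: splitting the counters into messageTypesReceived, messageTypesSent and messageTypesPosted makes each stage of a message's life separately countable, keyed by the stringified wire type. A small sketch of the expvar.Map pattern (map name hypothetical):

package main

import (
        "expvar"
        "fmt"
        "strconv"
)

var messageTypesSent = expvar.NewMap("messageTypesSentSketch")

func main() {
        // Key by the wire value of the message type, as wroteMsg does
        // with strconv.FormatInt(int64(msg.Type), 10).
        for _, msgType := range []int64{6, 6, 8} { // request, request, cancel
                messageTypesSent.Add(strconv.FormatInt(msgType, 10), 1)
        }
        fmt.Println(messageTypesSent.String()) // {"6": 2, "8": 1}
}
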
index f1f392f9a9c414d3ebb5702b38f77db29ff85c62..19db34fb259d49cc4a09b600ab31ef3443aa0096 100644 (file)
--- a/peer_protocol/protocol.go
+++ b/peer_protocol/protocol.go
@@ -68,6 +68,14 @@ type Message struct {
        Port                 uint16
 }
 
+func (msg Message) MustMarshalBinary() []byte {
+       b, err := msg.MarshalBinary()
+       if err != nil {
+               panic(err)
+       }
+       return b
+}
+
 func (msg Message) MarshalBinary() (data []byte, err error) {
        buf := &bytes.Buffer{}
        if !msg.Keepalive {
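
Editor's note: MustMarshalBinary adopts the stdlib Must convention (compare regexp.MustCompile and template.Must): when an error can only indicate a programming bug, panic at the call site instead of threading an error through code that cannot handle it. That is what lets Post and the writer concatenate marshalled bytes without error plumbing. The idiom in general form, with a hypothetical JSON payload standing in:

package main

import (
        "encoding/json"
        "fmt"
)

// mustMarshalJSON mirrors Message.MustMarshalBinary: a marshalling
// failure here is a bug, not a runtime condition, so panic.
func mustMarshalJSON(v interface{}) []byte {
        b, err := json.Marshal(v)
        if err != nil {
                panic(err)
        }
        return b
}

func main() {
        fmt.Printf("%s\n", mustMarshalJSON(map[string]int{"index": 1, "begin": 0, "length": 16384}))
}
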
index a8e3483b14556b0090cb21009e0abe07bef2526e..d7939a3d3f6c08207ebaf2acecbc6c7cbd40dd96 100644 (file)
--- a/ratelimitreader.go
+++ b/ratelimitreader.go
@@ -1,6 +1,7 @@
 package torrent
 
 import (
+       "fmt"
        "io"
        "time"
 
@@ -14,12 +15,18 @@ type rateLimitedReader struct {
 }
 
 func (me rateLimitedReader) Read(b []byte) (n int, err error) {
+       // Wait until we can read at all.
        if err := me.l.WaitN(context.Background(), 1); err != nil {
                panic(err)
        }
+       // Limit the read to within the burst.
+       if me.l.Limit() != rate.Inf && len(b) > me.l.Burst() {
+               b = b[:me.l.Burst()]
+       }
        n, err = me.r.Read(b)
+       // Pay the piper.
        if !me.l.ReserveN(time.Now(), n-1).OK() {
-               panic(n - 1)
+               panic(fmt.Sprintf("burst exceeded?: %d", n-1))
        }
        return
 }
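
Editor's note: the clamp is the substantive fix: an x/time/rate reservation for more than Burst() tokens can never succeed, so a Read into a buffer larger than the burst must be trimmed before paying for the bytes actually read (one token up front, the remaining n-1 afterwards). A usage sketch around a local copy of the patched reader (names hypothetical):

package main

import (
        "context"
        "fmt"
        "io"
        "strings"
        "time"

        "golang.org/x/time/rate"
)

// limitedReader is a local copy of the patched rateLimitedReader.
type limitedReader struct {
        l *rate.Limiter
        r io.Reader
}

func (me limitedReader) Read(b []byte) (n int, err error) {
        // Wait until we can read at all.
        if err := me.l.WaitN(context.Background(), 1); err != nil {
                panic(err)
        }
        // Limit the read to within the burst.
        if me.l.Limit() != rate.Inf && len(b) > me.l.Burst() {
                b = b[:me.l.Burst()]
        }
        n, err = me.r.Read(b)
        // Pay for what was actually read.
        if !me.l.ReserveN(time.Now(), n-1).OK() {
                panic(n - 1)
        }
        return
}

func main() {
        // 8 bytes/second with a burst of 4: a 64-byte Read is clamped to 4.
        r := limitedReader{
                l: rate.NewLimiter(rate.Limit(8), 4),
                r: strings.NewReader("hello world"),
        }
        b := make([]byte, 64)
        n, _ := r.Read(b)
        fmt.Printf("read %q\n", b[:n]) // "hell"
}
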
index eb4f7a7da06b8e7d3fdb602533f01ff1085a4187..7f04e5f3aae1c62568fef25a2cefc1e4124517f8 100644 (file)
--- a/torrent.go
+++ b/torrent.go
@@ -798,7 +798,9 @@ func (t *Torrent) maybeNewConns() {
 
 func (t *Torrent) piecePriorityChanged(piece int) {
        for c := range t.conns {
-               c.updatePiecePriority(piece)
+               if c.updatePiecePriority(piece) {
+                       c.updateRequests()
+               }
        }
        t.maybeNewConns()
        t.publishPieceChange(piece)
@@ -1422,16 +1424,18 @@ func (t *Torrent) pieceHashed(piece int, correct bool) {
        }
 }
 
+func (t *Torrent) cancelRequestsForPiece(piece int) {
+       for cn := range t.conns {
+               cn.tickleWriter()
+       }
+}
+
 func (t *Torrent) onPieceCompleted(piece int) {
        t.pendingPieces.Remove(piece)
        t.pendAllChunkSpecs(piece)
+       t.cancelRequestsForPiece(piece)
        for conn := range t.conns {
                conn.Have(piece)
-               for r := range conn.Requests {
-                       if int(r.Index) == piece {
-                               conn.Cancel(r)
-                       }
-               }
                // Could check here if peer doesn't have piece, but due to caching
                // some peers may have said they have a piece but they don't.
                conn.upload()
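
Editor's note: onPieceCompleted no longer walks each connection's outstanding requests posting Cancels; it merely wakes every writer, and the next fillWriteBuffer pass notices the piece has dropped out of the request order and emits the Cancels itself. Event handlers shrink to 'invalidate and wake', leaving one code path responsible for reconciling the wire state. A toy reduction of that reconcile step (all names hypothetical):

package main

import "fmt"

// connSketch is a hypothetical reduction of the new cancel flow:
// completing a piece just marks it unwanted; the next fill pass diffs
// wanted against outstanding and emits the wire messages.
type connSketch struct {
        requests map[int]bool // outstanding, keyed by piece
        wanted   map[int]bool
        out      []string // wire messages, for illustration
}

func (c *connSketch) fillPass() {
        for piece := range c.requests {
                if !c.wanted[piece] {
                        delete(c.requests, piece)
                        c.out = append(c.out, fmt.Sprintf("cancel piece %d", piece))
                }
        }
        for piece := range c.wanted {
                if !c.requests[piece] {
                        c.requests[piece] = true
                        c.out = append(c.out, fmt.Sprintf("request piece %d", piece))
                }
        }
}

func main() {
        c := &connSketch{
                requests: map[int]bool{7: true},
                wanted:   map[int]bool{7: true, 9: true},
        }
        c.fillPass()        // requests piece 9
        delete(c.wanted, 7) // piece 7 completed: just mark it unwanted...
        c.fillPass()        // ...and the next pass cancels it
        fmt.Println(c.out)  // [request piece 9 cancel piece 7]
}
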