]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Merge branch 'dev'
authorMatt Joiner <anacrolix@gmail.com>
Fri, 22 Jun 2018 07:43:51 +0000 (17:43 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 22 Jun 2018 07:43:51 +0000 (17:43 +1000)
23 files changed:
.circleci/config.yml
bencode/decode.go
client.go
client_test.go
cmd/torrent-metainfo-pprint/main.go
cmd/torrent-pick/main.go
cmd/torrent/main.go
cmd/torrentfs/main.go
config.go
conn_stats.go
connection.go
connection_test.go
fs/torrentfs_test.go
global.go
misc.go
misc_test.go
mse/mse.go
multiless.go [new file with mode: 0644]
network_test.go [new file with mode: 0644]
socket.go
torrent.go
torrent_test.go
worst_conns.go

index f3ac8c0c9fbea5d09c017ef7d78176a67157b52f..d01aafdc68763cb556e933f691ae86442e7c1d6e 100644 (file)
@@ -25,7 +25,7 @@ jobs:
       - run: go test -v -race $PROJECT_GO_PACKAGE/...
       - run: go test -bench . $PROJECT_GO_PACKAGE/...
       - run: CGO_ENABLED=0 go get -t -d -v $PROJECT_GO_PACKAGE/...
-      - run: CGO_ENABLED=0 go test -v $PROJECT_GO_PACKAGE/... || true
+      - run: set +e; CGO_ENABLED=0 go test -v $PROJECT_GO_PACKAGE/...; true
       - run: go get golang.org/x/mobile/cmd/gomobile
       - run: gomobile init
       - run: gomobile build -target=android $PROJECT_GO_PACKAGE
index 43f3b0e3a580adb6bf640f7596e148f104b5fdab..657cebb0a08fa0c5039aa15558fc7c616a4d823b 100644 (file)
@@ -10,6 +10,7 @@ import (
        "runtime"
        "strconv"
        "strings"
+       "unsafe"
 )
 
 type Decoder struct {
@@ -404,7 +405,11 @@ func (d *Decoder) readOneValue() bool {
                if b >= '0' && b <= '9' {
                        start := d.buf.Len() - 1
                        d.readUntil(':')
-                       length, err := strconv.ParseInt(d.buf.String()[start:], 10, 64)
+                       s := reflect.StringHeader{
+                               uintptr(unsafe.Pointer(&d.buf.Bytes()[start])),
+                               d.buf.Len() - start,
+                       }
+                       length, err := strconv.ParseInt(*(*string)(unsafe.Pointer(&s)), 10, 64)
                        checkForIntParseError(err, d.Offset-1)
 
                        d.buf.WriteString(":")
index f5a82300de051abc625a38b3864b77efb6f99147..705354ba7fa0153f5d257731ea9531f506556acc 100644 (file)
--- a/client.go
+++ b/client.go
@@ -14,6 +14,8 @@ import (
        "strings"
        "time"
 
+       "github.com/anacrolix/missinggo/perf"
+
        "github.com/anacrolix/dht"
        "github.com/anacrolix/dht/krpc"
        "github.com/anacrolix/log"
@@ -22,9 +24,9 @@ import (
        "github.com/anacrolix/missinggo/pubsub"
        "github.com/anacrolix/missinggo/slices"
        "github.com/anacrolix/sync"
+       "github.com/davecgh/go-spew/spew"
        "github.com/dustin/go-humanize"
        "github.com/google/btree"
-       "golang.org/x/time/rate"
 
        "github.com/anacrolix/torrent/bencode"
        "github.com/anacrolix/torrent/iplist"
@@ -41,7 +43,7 @@ type Client struct {
        event  sync.Cond
        closed missinggo.Event
 
-       config Config
+       config *ClientConfig
        logger *log.Logger
 
        halfOpenLimit  int
@@ -53,8 +55,6 @@ type Client struct {
        ipBlockList    iplist.Ranger
        // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
        extensionBytes peerExtensionBytes
-       uploadLimit    *rate.Limiter
-       downloadLimit  *rate.Limiter
 
        // Set of addresses that have our client ID. This intentionally will
        // include ourselves if we end up trying to connect to our own address
@@ -62,8 +62,14 @@ type Client struct {
        dopplegangerAddrs map[string]struct{}
        badPeerIPs        map[string]struct{}
        torrents          map[metainfo.Hash]*Torrent
+       // An aggregate of stats over all connections.
+       stats ConnStats
+
+       acceptLimiter map[ipStr]int
 }
 
+type ipStr string
+
 func (cl *Client) BadPeerIPs() []string {
        cl.mu.RLock()
        defer cl.mu.RUnlock()
@@ -111,8 +117,8 @@ func writeDhtServerStatus(w io.Writer, s *dht.Server) {
 // Writes out a human readable status of the client, such as for writing to a
 // HTTP status page.
 func (cl *Client) WriteStatus(_w io.Writer) {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.mu.RLock()
+       defer cl.mu.RUnlock()
        w := bufio.NewWriter(_w)
        defer w.Flush()
        fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
@@ -123,6 +129,7 @@ func (cl *Client) WriteStatus(_w io.Writer) {
                fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
                writeDhtServerStatus(w, s)
        })
+       spew.Fdump(w, cl.stats)
        fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
        fmt.Fprintln(w)
        for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
@@ -163,12 +170,10 @@ func (cl *Client) announceKey() int32 {
        return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
 }
 
-func NewClient(cfg *Config) (cl *Client, err error) {
+func NewClient(cfg *ClientConfig) (cl *Client, err error) {
        if cfg == nil {
-               cfg = &Config{}
+               cfg = NewDefaultClientConfig()
        }
-       cfg.setDefaults()
-
        defer func() {
                if err != nil {
                        cl = nil
@@ -176,10 +181,11 @@ func NewClient(cfg *Config) (cl *Client, err error) {
        }()
        cl = &Client{
                halfOpenLimit:     cfg.HalfOpenConnsPerTorrent,
-               config:            *cfg,
+               config:            cfg,
                dopplegangerAddrs: make(map[string]struct{}),
                torrents:          make(map[metainfo.Hash]*Torrent),
        }
+       go cl.acceptLimitClearer()
        cl.initLogger()
        defer func() {
                if err == nil {
@@ -187,16 +193,6 @@ func NewClient(cfg *Config) (cl *Client, err error) {
                }
                cl.Close()
        }()
-       if cfg.UploadRateLimiter == nil {
-               cl.uploadLimit = rate.NewLimiter(rate.Inf, 0)
-       } else {
-               cl.uploadLimit = cfg.UploadRateLimiter
-       }
-       if cfg.DownloadRateLimiter == nil {
-               cl.downloadLimit = rate.NewLimiter(rate.Inf, 0)
-       } else {
-               cl.downloadLimit = cfg.DownloadRateLimiter
-       }
        cl.extensionBytes = defaultPeerExtensionBytes()
        cl.event.L = &cl.mu
        storageImpl := cfg.DefaultStorage
@@ -370,19 +366,24 @@ func (cl *Client) rejectAccepted(conn net.Conn) bool {
        if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
                return true
        }
+       if cl.rateLimitAccept(rip) {
+               return true
+       }
        return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
 }
 
 func (cl *Client) acceptConnections(l net.Listener) {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
        for {
-               cl.waitAccept()
-               cl.mu.Unlock()
                conn, err := l.Accept()
                conn = pproffd.WrapNetConn(conn)
-               cl.mu.Lock()
-               if cl.closed.IsSet() {
+               cl.mu.RLock()
+               closed := cl.closed.IsSet()
+               reject := false
+               if conn != nil {
+                       reject = cl.rejectAccepted(conn)
+               }
+               cl.mu.RUnlock()
+               if closed {
                        if conn != nil {
                                conn.Close()
                        }
@@ -394,16 +395,18 @@ func (cl *Client) acceptConnections(l net.Listener) {
                        // routine just fucked off.
                        return
                }
-               log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
-               go torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
-               go torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
-               go torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
-               if cl.rejectAccepted(conn) {
-                       go torrent.Add("rejected accepted connections", 1)
-                       conn.Close()
-               } else {
-                       go cl.incomingConnection(conn)
-               }
+               go func() {
+                       if reject {
+                               torrent.Add("rejected accepted connections", 1)
+                               conn.Close()
+                       } else {
+                               go cl.incomingConnection(conn)
+                       }
+                       log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
+                       torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
+                       torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
+                       torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
+               }()
        }
 }
 
@@ -412,7 +415,7 @@ func (cl *Client) incomingConnection(nc net.Conn) {
        if tc, ok := nc.(*net.TCPConn); ok {
                tc.SetLinger(0)
        }
-       c := cl.newConnection(nc)
+       c := cl.newConnection(nc, false)
        c.Discovery = peerSourceIncoming
        cl.runReceivedConn(c)
 }
@@ -435,9 +438,9 @@ type dialResult struct {
 
 func countDialResult(err error) {
        if err == nil {
-               successfulDials.Add(1)
+               torrent.Add("successful dials", 1)
        } else {
-               unsuccessfulDials.Add(1)
+               torrent.Add("unsuccessful dials", 1)
        }
 }
 
@@ -455,22 +458,6 @@ func (cl *Client) dopplegangerAddr(addr string) bool {
        return ok
 }
 
-func (cl *Client) dialTCP(ctx context.Context, addr string) (c net.Conn, err error) {
-       d := net.Dialer{
-               // Can't bind to the listen address, even though we intend to create an
-               // endpoint pair that is distinct. Oh well.
-
-               // LocalAddr: cl.tcpListener.Addr(),
-       }
-       c, err = d.DialContext(ctx, "tcp"+ipNetworkSuffix(!cl.config.DisableIPv4 && !cl.config.DisableIPv4Peers, !cl.config.DisableIPv6), addr)
-       countDialResult(err)
-       if err == nil {
-               c.(*net.TCPConn).SetLinger(0)
-       }
-       c = pproffd.WrapNetConn(c)
-       return
-}
-
 func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
        switch {
        case allowIpv4 && allowIpv6:
@@ -490,7 +477,7 @@ func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err
 
 var allPeerNetworks = []string{"tcp4", "tcp6", "udp4", "udp6"}
 
-func peerNetworkEnabled(network string, cfg Config) bool {
+func peerNetworkEnabled(network string, cfg *ClientConfig) bool {
        c := func(s string) bool {
                return strings.Contains(network, s)
        }
@@ -519,6 +506,12 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
                left++
                go func() {
                        c, err := f(ctx, addr)
+                       // This is a bit optimistic, but it looks non-trivial to thread
+                       // this through the proxy code. Set it now in case we close the
+                       // connection forthwith.
+                       if tc, ok := c.(*net.TCPConn); ok {
+                               tc.SetLinger(0)
+                       }
                        countDialResult(err)
                        resCh <- dialResult{c}
                }()
@@ -535,9 +528,12 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
        }()
        var res dialResult
        // Wait for a successful connection.
-       for ; left > 0 && res.Conn == nil; left-- {
-               res = <-resCh
-       }
+       func() {
+               defer perf.ScopeTimer()()
+               for ; left > 0 && res.Conn == nil; left-- {
+                       res = <-resCh
+               }
+       }()
        // There are still incompleted dials.
        go func() {
                for ; left > 0; left-- {
@@ -564,7 +560,7 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
 // Performs initiator handshakes and returns a connection. Returns nil
 // *connection if no connection for valid reasons.
 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
-       c = cl.newConnection(nc)
+       c = cl.newConnection(nc, true)
        c.headerEncrypted = encryptHeader
        ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
        defer cancel()
@@ -609,7 +605,7 @@ func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection,
                return
        }
        if c != nil {
-               go torrent.Add("initiated conn with preferred header obfuscation", 1)
+               torrent.Add("initiated conn with preferred header obfuscation", 1)
                return
        }
        if cl.config.ForceEncryption {
@@ -623,7 +619,7 @@ func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection,
        // Try again with encryption if we didn't earlier, or without if we did.
        c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
        if c != nil {
-               go torrent.Add("initiated conn with fallback header obfuscation", 1)
+               torrent.Add("initiated conn with fallback header obfuscation", 1)
        }
        return
 }
@@ -648,7 +644,7 @@ func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
        }
        defer c.Close()
        c.Discovery = ps
-       cl.runHandshookConn(c, t, true)
+       cl.runHandshookConn(c, t)
 }
 
 // The port number for incoming peer connections. 0 if the client isn't
@@ -703,6 +699,7 @@ func (cl *Client) forSkeys(f func([]byte) bool) {
 
 // Do encryption and bittorrent handshakes as receiver.
 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
+       defer perf.ScopeTimerErr(&err)()
        var rw io.ReadWriter
        rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
        c.setRW(rw)
@@ -750,23 +747,36 @@ func (cl *Client) runReceivedConn(c *connection) {
        }
        t, err := cl.receiveHandshakes(c)
        if err != nil {
-               if cl.config.Debug {
-                       log.Printf("error receiving handshakes: %s", err)
-               }
+               log.Fmsg(
+                       "error receiving handshakes: %s", err,
+               ).AddValue(
+                       debugLogValue,
+               ).Add(
+                       "network", c.remoteAddr().Network(),
+               ).Log(cl.logger)
+               torrent.Add("error receiving handshake", 1)
+               cl.mu.Lock()
+               cl.onBadAccept(c.remoteAddr())
+               cl.mu.Unlock()
                return
        }
        if t == nil {
+               torrent.Add("received handshake for unloaded torrent", 1)
+               cl.mu.Lock()
+               cl.onBadAccept(c.remoteAddr())
+               cl.mu.Unlock()
                return
        }
+       torrent.Add("received handshake for loaded torrent", 1)
        cl.mu.Lock()
        defer cl.mu.Unlock()
-       cl.runHandshookConn(c, t, false)
+       cl.runHandshookConn(c, t)
 }
 
-func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
-       t.reconcileHandshakeStats(c)
+func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
+       c.setTorrent(t)
        if c.PeerID == cl.peerID {
-               if outgoing {
+               if c.outgoing {
                        connsToSelf.Add(1)
                        addr := c.conn.RemoteAddr().String()
                        cl.dopplegangerAddrs[addr] = struct{}{}
@@ -784,7 +794,8 @@ func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
        if connIsIpv6(c.conn) {
                torrent.Add("completed handshake over ipv6", 1)
        }
-       if !t.addConnection(c, outgoing) {
+       if err := t.addConnection(c); err != nil {
+               log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
                return
        }
        defer t.dropConnection(c)
@@ -899,8 +910,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connect
                        return fmt.Errorf("data has bad offset in payload: %d", begin)
                }
                t.saveMetadataPiece(piece, payload[begin:])
-               c.stats.ChunksReadUseful++
-               c.t.stats.ChunksReadUseful++
+               c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
                c.lastUsefulChunkReceived = time.Now()
                return t.maybeCompleteMetadata()
        case pp.RequestMetadataExtensionMsgType:
@@ -993,11 +1003,13 @@ func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStor
                return
        }
        new = true
+
        t = cl.newTorrent(infoHash, specStorage)
        cl.eachDhtServer(func(s *dht.Server) {
                go t.dhtAnnouncer(s)
        })
        cl.torrents[infoHash] = t
+       cl.clearAcceptLimits()
        t.updateWantPeersEvent()
        // Tickle Client.waitAccept, new torrent may want conns.
        cl.event.Broadcast()
@@ -1141,18 +1153,19 @@ func (cl *Client) banPeerIP(ip net.IP) {
        cl.badPeerIPs[ip.String()] = struct{}{}
 }
 
-func (cl *Client) newConnection(nc net.Conn) (c *connection) {
+func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
        c = &connection{
                conn:            nc,
+               outgoing:        outgoing,
                Choked:          true,
                PeerChoked:      true,
                PeerMaxRequests: 250,
                writeBuffer:     new(bytes.Buffer),
        }
        c.writerCond.L = &cl.mu
-       c.setRW(connStatsReadWriter{nc, &cl.mu, c})
+       c.setRW(connStatsReadWriter{nc, c})
        c.r = &rateLimitedReader{
-               l: cl.downloadLimit,
+               l: cl.config.DownloadRateLimiter,
                r: c.r,
        }
        return
@@ -1224,9 +1237,50 @@ func (cl *Client) publicAddr(peer net.IP) ipPort {
 }
 
 func (cl *Client) ListenAddrs() (ret []net.Addr) {
+       cl.mu.Lock()
+       defer cl.mu.Unlock()
        cl.eachListener(func(l socket) bool {
                ret = append(ret, l.Addr())
                return true
        })
        return
 }
+
+func (cl *Client) onBadAccept(addr net.Addr) {
+       ip := maskIpForAcceptLimiting(missinggo.AddrIP(addr))
+       if cl.acceptLimiter == nil {
+               cl.acceptLimiter = make(map[ipStr]int)
+       }
+       cl.acceptLimiter[ipStr(ip.String())]++
+}
+
+func maskIpForAcceptLimiting(ip net.IP) net.IP {
+       if ip4 := ip.To4(); ip4 != nil {
+               return ip4.Mask(net.CIDRMask(24, 32))
+       }
+       return ip
+}
+
+func (cl *Client) clearAcceptLimits() {
+       cl.acceptLimiter = nil
+}
+
+func (cl *Client) acceptLimitClearer() {
+       for {
+               select {
+               case <-cl.closed.LockedChan(&cl.mu):
+                       return
+               case <-time.After(15 * time.Minute):
+                       cl.mu.Lock()
+                       cl.clearAcceptLimits()
+                       cl.mu.Unlock()
+               }
+       }
+}
+
+func (cl *Client) rateLimitAccept(ip net.IP) bool {
+       if cl.config.DisableAcceptRateLimiting {
+               return false
+       }
+       return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
+}
index b465279b7e7936cb3f4c94945fa9753f54869d21..7921f90cd6500aebe020547e6539c6745e4eef59 100644 (file)
@@ -1,14 +1,13 @@
 package torrent
 
 import (
-       "context"
        "encoding/binary"
        "fmt"
        "io"
        "io/ioutil"
-       "net"
        "os"
        "path/filepath"
+       "reflect"
        "sync"
        "testing"
        "time"
@@ -29,15 +28,15 @@ import (
        "github.com/anacrolix/torrent/storage"
 )
 
-func TestingConfig() *Config {
-       return &Config{
-               ListenHost:              LoopbackListenHost,
-               NoDHT:                   true,
-               DataDir:                 tempDir(),
-               DisableTrackers:         true,
-               NoDefaultPortForwarding: true,
-               // Debug:           true,
-       }
+func TestingConfig() *ClientConfig {
+       cfg := NewDefaultClientConfig()
+       cfg.ListenHost = LoopbackListenHost
+       cfg.NoDHT = true
+       cfg.DataDir = tempDir()
+       cfg.DisableTrackers = true
+       cfg.NoDefaultPortForwarding = true
+       cfg.DisableAcceptRateLimiting = true
+       return cfg
 }
 
 func TestClientDefault(t *testing.T) {
@@ -104,7 +103,9 @@ func TestPieceHashSize(t *testing.T) {
 func TestTorrentInitialState(t *testing.T) {
        dir, mi := testutil.GreetingTestTorrent()
        defer os.RemoveAll(dir)
-       cl := &Client{}
+       cl := &Client{
+               config: &ClientConfig{},
+       }
        cl.initLogger()
        tor := cl.newTorrent(
                mi.HashInfoBytes(),
@@ -137,8 +138,7 @@ func TestUnmarshalPEXMsg(t *testing.T) {
 }
 
 func TestReducedDialTimeout(t *testing.T) {
-       cfg := &Config{}
-       cfg.setDefaults()
+       cfg := NewDefaultClientConfig()
        for _, _case := range []struct {
                Max             time.Duration
                HalfOpenLimit   int
@@ -163,64 +163,6 @@ func TestReducedDialTimeout(t *testing.T) {
        }
 }
 
-func TestUTPRawConn(t *testing.T) {
-       l, err := NewUtpSocket("udp", "")
-       require.NoError(t, err)
-       defer l.Close()
-       go func() {
-               for {
-                       _, err := l.Accept()
-                       if err != nil {
-                               break
-                       }
-               }
-       }()
-       // Connect a UTP peer to see if the RawConn will still work.
-       s, err := NewUtpSocket("udp", "")
-       require.NoError(t, err)
-       defer s.Close()
-       utpPeer, err := s.DialContext(context.Background(), "", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
-       require.NoError(t, err)
-       defer utpPeer.Close()
-       peer, err := net.ListenPacket("udp", ":0")
-       require.NoError(t, err)
-       defer peer.Close()
-
-       msgsReceived := 0
-       // How many messages to send. I've set this to double the channel buffer
-       // size in the raw packetConn.
-       const N = 200
-       readerStopped := make(chan struct{})
-       // The reader goroutine.
-       go func() {
-               defer close(readerStopped)
-               b := make([]byte, 500)
-               for i := 0; i < N; i++ {
-                       n, _, err := l.ReadFrom(b)
-                       require.NoError(t, err)
-                       msgsReceived++
-                       var d int
-                       fmt.Sscan(string(b[:n]), &d)
-                       assert.Equal(t, i, d)
-               }
-       }()
-       udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
-       require.NoError(t, err)
-       for i := 0; i < N; i++ {
-               _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
-               require.NoError(t, err)
-               time.Sleep(time.Millisecond)
-       }
-       select {
-       case <-readerStopped:
-       case <-time.After(time.Second):
-               t.Fatal("reader timed out")
-       }
-       if msgsReceived != N {
-               t.Fatalf("messages received: %d", msgsReceived)
-       }
-}
-
 func TestAddDropManyTorrents(t *testing.T) {
        cl, err := NewClient(TestingConfig())
        require.NoError(t, err)
@@ -356,7 +298,9 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
        // Create seeder and a Torrent.
        cfg := TestingConfig()
        cfg.Seed = true
-       cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
+       if ps.SeederUploadRateLimiter != nil {
+               cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
+       }
        // cfg.ListenAddr = "localhost:4000"
        if ps.SeederStorage != nil {
                cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
@@ -379,12 +323,15 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
        leecherDataDir, err := ioutil.TempDir("", "")
        require.NoError(t, err)
        defer os.RemoveAll(leecherDataDir)
+       cfg = TestingConfig()
        if ps.LeecherStorage == nil {
                cfg.DataDir = leecherDataDir
        } else {
                cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
        }
-       cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
+       if ps.LeecherDownloadRateLimiter != nil {
+               cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
+       }
        cfg.Seed = false
        leecher, err := NewClient(cfg)
        require.NoError(t, err)
@@ -414,10 +361,15 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
                r.SetReadahead(ps.Readahead)
        }
        assertReadAllGreeting(t, r)
-       assert.True(t, 13 <= seederTorrent.Stats().BytesWrittenData)
-       assert.True(t, 8 <= seederTorrent.Stats().ChunksWritten)
-       assert.True(t, 13 <= leecherTorrent.Stats().BytesReadData)
-       assert.True(t, 8 <= leecherTorrent.Stats().ChunksRead)
+
+       seederStats := seederTorrent.Stats()
+       assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
+       assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
+
+       leecherStats := leecherTorrent.Stats()
+       assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
+       assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
+
        // Try reading through again for the cases where the torrent data size
        // exceeds the size of the cache.
        assertReadAllGreeting(t, r)
@@ -437,6 +389,7 @@ func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
 func TestSeedAfterDownloading(t *testing.T) {
        greetingTempDir, mi := testutil.GreetingTestTorrent()
        defer os.RemoveAll(greetingTempDir)
+
        cfg := TestingConfig()
        cfg.Seed = true
        cfg.DataDir = greetingTempDir
@@ -448,6 +401,9 @@ func TestSeedAfterDownloading(t *testing.T) {
        require.NoError(t, err)
        assert.True(t, ok)
        seederTorrent.VerifyData()
+
+       cfg = TestingConfig()
+       cfg.Seed = true
        cfg.DataDir, err = ioutil.TempDir("", "")
        require.NoError(t, err)
        defer os.RemoveAll(cfg.DataDir)
@@ -455,6 +411,8 @@ func TestSeedAfterDownloading(t *testing.T) {
        require.NoError(t, err)
        defer leecher.Close()
        testutil.ExportStatusWriter(leecher, "l")
+
+       cfg = TestingConfig()
        cfg.Seed = false
        cfg.DataDir, err = ioutil.TempDir("", "")
        require.NoError(t, err)
@@ -492,17 +450,8 @@ func TestSeedAfterDownloading(t *testing.T) {
        }()
        done := make(chan struct{})
        defer close(done)
-       go func() {
-               for {
-                       go leecherGreeting.AddClientPeer(seeder)
-                       go leecherGreeting.AddClientPeer(leecherLeecher)
-                       select {
-                       case <-done:
-                               return
-                       case <-time.After(time.Second):
-                       }
-               }
-       }()
+       go leecherGreeting.AddClientPeer(seeder)
+       go leecherGreeting.AddClientPeer(leecherLeecher)
        wg.Add(1)
        go func() {
                defer wg.Done()
@@ -806,7 +755,6 @@ func TestAddMetainfoWithNodes(t *testing.T) {
 }
 
 type testDownloadCancelParams struct {
-       ExportClientStatus        bool
        SetLeecherStorageCapacity bool
        LeecherStorageCapacity    int64
        Cancel                    bool
@@ -821,9 +769,7 @@ func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
        seeder, err := NewClient(cfg)
        require.NoError(t, err)
        defer seeder.Close()
-       if ps.ExportClientStatus {
-               testutil.ExportStatusWriter(seeder, "s")
-       }
+       testutil.ExportStatusWriter(seeder, "s")
        seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
        seederTorrent.VerifyData()
        leecherDataDir, err := ioutil.TempDir("", "")
@@ -838,9 +784,7 @@ func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
        cfg.DataDir = leecherDataDir
        leecher, _ := NewClient(cfg)
        defer leecher.Close()
-       if ps.ExportClientStatus {
-               testutil.ExportStatusWriter(leecher, "l")
-       }
+       testutil.ExportStatusWriter(leecher, "l")
        leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
                ret = TorrentSpecFromMetaInfo(mi)
                ret.ChunkSize = 2
@@ -857,27 +801,24 @@ func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
                leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
        }
        leecherGreeting.cl.mu.Unlock()
-
-       leecherGreeting.AddClientPeer(seeder)
+       done := make(chan struct{})
+       defer close(done)
+       go leecherGreeting.AddClientPeer(seeder)
        completes := make(map[int]bool, 3)
-values:
-       for {
-               // started := time.Now()
+       expected := func() map[int]bool {
+               if ps.Cancel {
+                       return map[int]bool{0: false, 1: false, 2: false}
+               } else {
+                       return map[int]bool{0: true, 1: true, 2: true}
+               }
+       }()
+       for !reflect.DeepEqual(completes, expected) {
                select {
                case _v := <-psc.Values:
-                       // log.Print(time.Since(started))
                        v := _v.(PieceStateChange)
                        completes[v.Index] = v.Complete
-               case <-time.After(100 * time.Millisecond):
-                       break values
                }
        }
-       if ps.Cancel {
-               assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
-       } else {
-               assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
-       }
-
 }
 
 func TestTorrentDownloadAll(t *testing.T) {
@@ -993,8 +934,11 @@ func TestSetMaxEstablishedConn(t *testing.T) {
        defer ss.Close()
        var tts []*Torrent
        ih := testutil.GreetingMetaInfo().HashInfoBytes()
+       cfg := TestingConfig()
+       cfg.DisableAcceptRateLimiting = true
+       cfg.dropDuplicatePeerIds = true
        for i := range iter.N(3) {
-               cl, err := NewClient(TestingConfig())
+               cl, err := NewClient(cfg)
                require.NoError(t, err)
                defer cl.Close()
                tt, _ := cl.AddTorrentInfoHash(ih)
index 7320315aa4f3cf01505cfba9e9ca93bec7c5a560..52c6c1069ec323002d974dd07d21eabe797c698a 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "bufio"
        "encoding/hex"
        "encoding/json"
        "fmt"
@@ -8,6 +9,7 @@ import (
        "log"
        "os"
 
+       "github.com/anacrolix/envpprof"
        "github.com/anacrolix/tagflag"
        "github.com/bradfitz/iter"
 
@@ -62,8 +64,9 @@ func processReader(r io.Reader) error {
 }
 
 func main() {
+       defer envpprof.Stop()
        tagflag.Parse(&flags)
-       err := processReader(os.Stdin)
+       err := processReader(bufio.NewReader(os.Stdin))
        if err != nil {
                log.Fatal(err)
        }
index 0fb6971d7de0738551054f97665835a45fc58f22..eada56876454a24bbe8db65da74623037eaa933c 100644 (file)
@@ -79,10 +79,12 @@ func dstFileName(picked string) string {
 
 func main() {
        log.SetFlags(log.LstdFlags | log.Lshortfile)
-       var rootGroup struct {
-               Client    torrent.Config `group:"Client Options"`
-               TestPeers []string       `long:"test-peer" description:"address of peer to inject to every torrent"`
-               Pick      string         `long:"pick" description:"filename to pick"`
+       var rootGroup = struct {
+               Client    *torrent.ClientConfig `group:"Client Options"`
+               TestPeers []string              `long:"test-peer" description:"address of peer to inject to every torrent"`
+               Pick      string                `long:"pick" description:"filename to pick"`
+       }{
+               Client: torrent.NewDefaultClientConfig(),
        }
        // Don't pass flags.PrintError because it's inconsistent with printing.
        // https://github.com/jessevdk/go-flags/issues/132
@@ -115,7 +117,7 @@ func main() {
 
        rootGroup.Client.DataDir = tmpdir
 
-       client, err := torrent.NewClient(&rootGroup.Client)
+       client, err := torrent.NewClient(rootGroup.Client)
        if err != nil {
                log.Fatalf("error creating client: %s", err)
        }
index 1d9b4944d568b0e642767fa346e29e9a939fb44c..e97ce9d732225773dfae2f45365201c58ce9bc28 100644 (file)
@@ -155,10 +155,9 @@ func main() {
        log.SetFlags(log.LstdFlags | log.Lshortfile)
        tagflag.Parse(&flags)
        defer envpprof.Stop()
-       clientConfig := torrent.Config{
-               Debug: flags.Debug,
-               Seed:  flags.Seed,
-       }
+       clientConfig := torrent.NewDefaultClientConfig()
+       clientConfig.Debug = flags.Debug
+       clientConfig.Seed = flags.Seed
        if flags.PackedBlocklist != "" {
                blocklist, err := iplist.MMapPackedFile(flags.PackedBlocklist)
                if err != nil {
@@ -180,7 +179,7 @@ func main() {
                clientConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(flags.DownloadRate), 1<<20)
        }
 
-       client, err := torrent.NewClient(&clientConfig)
+       client, err := torrent.NewClient(clientConfig)
        if err != nil {
                log.Fatalf("error creating client: %s", err)
        }
index 3cc778124e11bf7575ede99c00071adf2f6ece3e..b2d34efe53b3ed45a365655d8a02744fb5fe1b29 100644 (file)
@@ -86,13 +86,12 @@ func mainExitCode() int {
        defer fuse.Unmount(args.MountDir)
        // TODO: Think about the ramifications of exiting not due to a signal.
        defer conn.Close()
-       cfg := torrent.Config{
-               DataDir:         args.DownloadDir,
-               DisableTrackers: args.DisableTrackers,
-               NoUpload:        true, // Ensure that downloads are responsive.
-       }
+       cfg := torrent.NewDefaultClientConfig()
+       cfg.DataDir = args.DownloadDir
+       cfg.DisableTrackers = args.DisableTrackers
+       cfg.NoUpload = true // Ensure that downloads are responsive.
        cfg.SetListenAddr(args.ListenAddr.String())
-       client, err := torrent.NewClient(&cfg)
+       client, err := torrent.NewClient(cfg)
        if err != nil {
                log.Print(err)
                return 1
index f45a1e028912bab75b65dec6d4c755e195e048bc..899a2f062f5deb55df1ccc95c76f9bec894bf9db 100644 (file)
--- a/config.go
+++ b/config.go
@@ -16,20 +16,10 @@ import (
        "github.com/anacrolix/torrent/storage"
 )
 
-var DefaultHTTPClient = &http.Client{
-       Timeout: time.Second * 15,
-       Transport: &http.Transport{
-               Dial: (&net.Dialer{
-                       Timeout: 15 * time.Second,
-               }).Dial,
-               TLSHandshakeTimeout: 15 * time.Second,
-               TLSClientConfig:     &tls.Config{InsecureSkipVerify: true},
-       },
-}
 var DefaultHTTPUserAgent = "Go-Torrent/1.0"
 
-// Override Client defaults.
-type Config struct {
+// Probably not safe to modify this after it's given to a Client.
+type ClientConfig struct {
        // Store torrent file data in this directory unless .DefaultStorage is
        // specified.
        DataDir string `long:"data-dir" description:"directory to store downloaded torrent data"`
@@ -118,9 +108,12 @@ type Config struct {
 
        PublicIp4 net.IP
        PublicIp6 net.IP
+
+       DisableAcceptRateLimiting bool
+       dropDuplicatePeerIds      bool
 }
 
-func (cfg *Config) SetListenAddr(addr string) *Config {
+func (cfg *ClientConfig) SetListenAddr(addr string) *ClientConfig {
        host, port, err := missinggo.ParseHostPort(addr)
        expect.Nil(err)
        cfg.ListenHost = func(string) string { return host }
@@ -128,53 +121,35 @@ func (cfg *Config) SetListenAddr(addr string) *Config {
        return cfg
 }
 
-func (cfg *Config) setDefaults() {
-       if cfg.HTTP == nil {
-               cfg.HTTP = DefaultHTTPClient
-               if cfg.ProxyURL != "" {
-                       cfg.setProxyURL()
-               }
-       }
-       if cfg.HTTPUserAgent == "" {
-               cfg.HTTPUserAgent = DefaultHTTPUserAgent
-       }
-       if cfg.ExtendedHandshakeClientVersion == "" {
-               cfg.ExtendedHandshakeClientVersion = "go.torrent dev 20150624"
-       }
-       if cfg.Bep20 == "" {
-               cfg.Bep20 = "-GT0001-"
-       }
-       if cfg.NominalDialTimeout == 0 {
-               cfg.NominalDialTimeout = 30 * time.Second
-       }
-       if cfg.MinDialTimeout == 0 {
-               cfg.MinDialTimeout = 5 * time.Second
-       }
-       if cfg.EstablishedConnsPerTorrent == 0 {
-               cfg.EstablishedConnsPerTorrent = 50
-       }
-       if cfg.HalfOpenConnsPerTorrent == 0 {
-               cfg.HalfOpenConnsPerTorrent = (cfg.EstablishedConnsPerTorrent + 1) / 2
-       }
-       if cfg.TorrentPeersHighWater == 0 {
-               // Memory and freshness are the concern here.
-               cfg.TorrentPeersHighWater = 500
-       }
-       if cfg.TorrentPeersLowWater == 0 {
-               cfg.TorrentPeersLowWater = 2 * cfg.HalfOpenConnsPerTorrent
-       }
-       if cfg.HandshakesTimeout == 0 {
-               cfg.HandshakesTimeout = 20 * time.Second
-       }
-       if cfg.DhtStartingNodes == nil {
-               cfg.DhtStartingNodes = dht.GlobalBootstrapAddrs
-       }
-       if cfg.ListenHost == nil {
-               cfg.ListenHost = func(string) string { return "" }
+func NewDefaultClientConfig() *ClientConfig {
+       return &ClientConfig{
+               HTTP: &http.Client{
+                       Timeout: time.Second * 15,
+                       Transport: &http.Transport{
+                               Dial: (&net.Dialer{
+                                       Timeout: 15 * time.Second,
+                               }).Dial,
+                               TLSHandshakeTimeout: 15 * time.Second,
+                               TLSClientConfig:     &tls.Config{InsecureSkipVerify: true},
+                       }},
+               HTTPUserAgent:                  DefaultHTTPUserAgent,
+               ExtendedHandshakeClientVersion: "go.torrent dev 20150624",
+               Bep20:                      "-GT0001-",
+               NominalDialTimeout:         30 * time.Second,
+               MinDialTimeout:             5 * time.Second,
+               EstablishedConnsPerTorrent: 50,
+               HalfOpenConnsPerTorrent:    25,
+               TorrentPeersHighWater:      500,
+               TorrentPeersLowWater:       50,
+               HandshakesTimeout:          20 * time.Second,
+               DhtStartingNodes:           dht.GlobalBootstrapAddrs,
+               ListenHost:                 func(string) string { return "" },
+               UploadRateLimiter:          unlimited,
+               DownloadRateLimiter:        unlimited,
        }
 }
 
-func (cfg *Config) setProxyURL() {
+func (cfg *ClientConfig) setProxyURL() {
        fixedURL, err := url.Parse(cfg.ProxyURL)
        if err != nil {
                return
index 144ab0cb682e0c960df4f6f40a506dc49c3540bc..67188acb0e1fd10b6c441d0b80b3be8cd411278f 100644 (file)
@@ -1,8 +1,10 @@
 package torrent
 
 import (
+       "fmt"
        "io"
-       "sync"
+       "reflect"
+       "sync/atomic"
 
        pp "github.com/anacrolix/torrent/peer_protocol"
 )
@@ -14,70 +16,98 @@ import (
 // is things sent to the peer, and Read is stuff received from them.
 type ConnStats struct {
        // Total bytes on the wire. Includes handshakes and encryption.
-       BytesWritten     int64
-       BytesWrittenData int64
+       BytesWritten     Count
+       BytesWrittenData Count
 
-       BytesRead           int64
-       BytesReadData       int64
-       BytesReadUsefulData int64
+       BytesRead           Count
+       BytesReadData       Count
+       BytesReadUsefulData Count
 
-       ChunksWritten int64
+       ChunksWritten Count
 
-       ChunksRead         int64
-       ChunksReadUseful   int64
-       ChunksReadUnwanted int64
+       ChunksRead         Count
+       ChunksReadUseful   Count
+       ChunksReadUnwanted Count
 
        // Number of pieces data was written to, that subsequently passed verification.
-       PiecesDirtiedGood int64
+       PiecesDirtiedGood Count
        // Number of pieces data was written to, that subsequently failed
        // verification. Note that a connection may not have been the sole dirtier
        // of a piece.
-       PiecesDirtiedBad int64
+       PiecesDirtiedBad Count
+}
+
+func (me *ConnStats) Copy() (ret ConnStats) {
+       for i := 0; i < reflect.TypeOf(ConnStats{}).NumField(); i++ {
+               n := reflect.ValueOf(me).Elem().Field(i).Addr().Interface().(*Count).Int64()
+               reflect.ValueOf(&ret).Elem().Field(i).Addr().Interface().(*Count).Add(n)
+       }
+       return
+}
+
+type Count struct {
+       n int64
+}
+
+var _ fmt.Stringer = (*Count)(nil)
+
+func (me *Count) Add(n int64) {
+       atomic.AddInt64(&me.n, n)
+}
+
+func (me *Count) Int64() int64 {
+       return atomic.LoadInt64(&me.n)
+}
+
+func (me *Count) String() string {
+       return fmt.Sprintf("%v", me.Int64())
 }
 
 func (cs *ConnStats) wroteMsg(msg *pp.Message) {
        // TODO: Track messages and not just chunks.
        switch msg.Type {
        case pp.Piece:
-               cs.ChunksWritten++
-               cs.BytesWrittenData += int64(len(msg.Piece))
+               cs.ChunksWritten.Add(1)
+               cs.BytesWrittenData.Add(int64(len(msg.Piece)))
        }
 }
 
 func (cs *ConnStats) readMsg(msg *pp.Message) {
        switch msg.Type {
        case pp.Piece:
-               cs.ChunksRead++
-               cs.BytesReadData += int64(len(msg.Piece))
+               cs.ChunksRead.Add(1)
+               cs.BytesReadData.Add(int64(len(msg.Piece)))
        }
 }
 
-func (cs *ConnStats) wroteBytes(n int64) {
-       cs.BytesWritten += n
+func (cs *ConnStats) incrementPiecesDirtiedGood() {
+       cs.PiecesDirtiedGood.Add(1)
+}
+
+func (cs *ConnStats) incrementPiecesDirtiedBad() {
+       cs.PiecesDirtiedBad.Add(1)
 }
 
-func (cs *ConnStats) readBytes(n int64) {
-       cs.BytesRead += n
+func add(n int64, f func(*ConnStats) *Count) func(*ConnStats) {
+       return func(cs *ConnStats) {
+               p := f(cs)
+               p.Add(n)
+       }
 }
 
 type connStatsReadWriter struct {
        rw io.ReadWriter
-       l  sync.Locker
        c  *connection
 }
 
 func (me connStatsReadWriter) Write(b []byte) (n int, err error) {
        n, err = me.rw.Write(b)
-       me.l.Lock()
        me.c.wroteBytes(int64(n))
-       me.l.Unlock()
        return
 }
 
 func (me connStatsReadWriter) Read(b []byte) (n int, err error) {
        n, err = me.rw.Read(b)
-       me.l.Lock()
        me.c.readBytes(int64(n))
-       me.l.Unlock()
        return
 }
index e07d2f6a20682daf085a54ce993cc4803ad513ac..f58ab28835474b9a26a6aea7d086d51ed98872f4 100644 (file)
@@ -40,7 +40,8 @@ const (
 type connection struct {
        t *Torrent
        // The actual Conn, used for closing, and setting socket options.
-       conn net.Conn
+       conn     net.Conn
+       outgoing bool
        // The Reader and Writer for this Conn, with hooks installed for stats,
        // limiting, deadlines etc.
        w io.Writer
@@ -50,6 +51,9 @@ type connection struct {
        cryptoMethod    mse.CryptoMethod
        Discovery       peerSource
        closed          missinggo.Event
+       // Set true after we've added our ConnStats generated during handshake to
+       // other ConnStat instances as determined when the *Torrent became known.
+       reconciledHandshakeStats bool
 
        stats ConnStats
 
@@ -102,6 +106,32 @@ type connection struct {
        writerCond  sync.Cond
 }
 
+// Returns true if the connection is over IPv6.
+func (cn *connection) ipv6() bool {
+       ip := missinggo.AddrIP(cn.remoteAddr())
+       if ip.To4() != nil {
+               return false
+       }
+       return len(ip) == net.IPv6len
+}
+
+// Returns true the dialer has the lower client peer ID. TODO: Find the
+// specification for this.
+func (cn *connection) isPreferredDirection() bool {
+       return bytes.Compare(cn.t.cl.peerID[:], cn.PeerID[:]) < 0 == cn.outgoing
+}
+
+// Returns whether the left connection should be preferred over the right one,
+// considering only their networking properties. If ok is false, we can't
+// decide.
+func (l *connection) hasPreferredNetworkOver(r *connection) (left, ok bool) {
+       var ml multiLess
+       ml.NextBool(l.isPreferredDirection(), r.isPreferredDirection())
+       ml.NextBool(!l.utp(), !r.utp())
+       ml.NextBool(l.ipv6(), r.ipv6())
+       return ml.FinalOk()
+}
+
 func (cn *connection) cumInterest() time.Duration {
        ret := cn.priorInterest
        if cn.Interested {
@@ -189,7 +219,7 @@ func (cn *connection) utp() bool {
        return strings.Contains(cn.remoteAddr().Network(), "utp")
 }
 
-// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
+// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
 func (cn *connection) statusFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
@@ -219,7 +249,7 @@ func (cn *connection) statusFlags() (ret string) {
 // }
 
 func (cn *connection) downloadRate() float64 {
-       return float64(cn.stats.BytesReadUsefulData) / cn.cumInterest().Seconds()
+       return float64(cn.stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds()
 }
 
 func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
@@ -232,12 +262,12 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
                cn.cumInterest(),
        )
        fmt.Fprintf(w,
-               "    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
+               "    %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
                cn.completedString(),
                len(cn.peerTouchedPieces),
-               cn.stats.ChunksReadUseful,
-               cn.stats.ChunksRead,
-               cn.stats.ChunksWritten,
+               &cn.stats.ChunksReadUseful,
+               &cn.stats.ChunksRead,
+               &cn.stats.ChunksWritten,
                cn.requestsLowWater,
                cn.numLocalRequests(),
                cn.nominalMaxRequests(),
@@ -320,7 +350,7 @@ func (cn *connection) requestedMetadataPiece(index int) bool {
 
 // The actual value to use as the maximum outbound requests.
 func (cn *connection) nominalMaxRequests() (ret int) {
-       return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful-(cn.stats.ChunksRead-cn.stats.ChunksReadUseful))))
+       return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64()))))
 }
 
 func (cn *connection) onPeerSentCancel(r request) {
@@ -839,27 +869,37 @@ func (c *connection) requestPendingMetadata() {
 
 func (cn *connection) wroteMsg(msg *pp.Message) {
        messageTypesSent.Add(msg.Type.String(), 1)
-       cn.stats.wroteMsg(msg)
-       cn.t.stats.wroteMsg(msg)
+       cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
 }
 
 func (cn *connection) readMsg(msg *pp.Message) {
-       cn.stats.readMsg(msg)
-       cn.t.stats.readMsg(msg)
+       cn.allStats(func(cs *ConnStats) { cs.readMsg(msg) })
 }
 
-func (cn *connection) wroteBytes(n int64) {
-       cn.stats.wroteBytes(n)
-       if cn.t != nil {
-               cn.t.stats.wroteBytes(n)
+// After handshake, we know what Torrent and Client stats to include for a
+// connection.
+func (cn *connection) postHandshakeStats(f func(*ConnStats)) {
+       t := cn.t
+       f(&t.stats)
+       f(&t.cl.stats)
+}
+
+// All ConnStats that include this connection. Some objects are not known
+// until the handshake is complete, after which it's expected to reconcile the
+// differences.
+func (cn *connection) allStats(f func(*ConnStats)) {
+       f(&cn.stats)
+       if cn.reconciledHandshakeStats {
+               cn.postHandshakeStats(f)
        }
 }
 
+func (cn *connection) wroteBytes(n int64) {
+       cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten }))
+}
+
 func (cn *connection) readBytes(n int64) {
-       cn.stats.readBytes(n)
-       if cn.t != nil {
-               cn.t.stats.readBytes(n)
-       }
+       cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
 }
 
 // Returns whether the connection could be useful to us. We're seeding and
@@ -1164,7 +1204,7 @@ func (cn *connection) rw() io.ReadWriter {
 func (c *connection) receiveChunk(msg *pp.Message) {
        t := c.t
        cl := t.cl
-       chunksReceived.Add(1)
+       torrent.Add("chunks received", 1)
 
        req := newRequestFromMessage(msg)
 
@@ -1172,7 +1212,7 @@ func (c *connection) receiveChunk(msg *pp.Message) {
        if c.deleteRequest(req) {
                c.updateRequests()
        } else {
-               unexpectedChunksReceived.Add(1)
+               torrent.Add("chunks received unexpected", 1)
        }
 
        if c.PeerChoked {
@@ -1184,19 +1224,16 @@ func (c *connection) receiveChunk(msg *pp.Message) {
 
        // Do we actually want this chunk?
        if !t.wantPiece(req) {
-               unwantedChunksReceived.Add(1)
-               c.stats.ChunksReadUnwanted++
-               c.t.stats.ChunksReadUnwanted++
+               torrent.Add("chunks received unwanted", 1)
+               c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUnwanted }))
                return
        }
 
        index := int(req.Index)
        piece := &t.pieces[index]
 
-       c.stats.ChunksReadUseful++
-       c.t.stats.ChunksReadUseful++
-       c.stats.BytesReadUsefulData += int64(len(msg.Piece))
-       c.t.stats.BytesReadUsefulData += int64(len(msg.Piece))
+       c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
+       c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
        c.lastUsefulChunkReceived = time.Now()
        // if t.fastestConn != c {
        // log.Printf("setting fastest connection %p", c)
@@ -1272,7 +1309,7 @@ func (c *connection) uploadAllowed() bool {
                return false
        }
        // Don't upload more than 100 KiB more than we download.
-       if c.stats.BytesWrittenData >= c.stats.BytesReadData+100<<10 {
+       if c.stats.BytesWrittenData.Int64() >= c.stats.BytesReadData.Int64()+100<<10 {
                return false
        }
        return true
@@ -1297,7 +1334,7 @@ another:
                        return false
                }
                for r := range c.PeerRequests {
-                       res := c.t.cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
+                       res := c.t.cl.config.UploadRateLimiter.ReserveN(time.Now(), int(r.Length))
                        if !res.OK() {
                                panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
                        }
@@ -1342,7 +1379,7 @@ func (cn *connection) Drop() {
 }
 
 func (cn *connection) netGoodPiecesDirtied() int64 {
-       return cn.stats.PiecesDirtiedGood - cn.stats.PiecesDirtiedBad
+       return cn.stats.PiecesDirtiedGood.Int64() - cn.stats.PiecesDirtiedBad.Int64()
 }
 
 func (c *connection) peerHasWantedPieces() bool {
@@ -1358,7 +1395,15 @@ func (c *connection) deleteRequest(r request) bool {
                return false
        }
        delete(c.requests, r)
-       c.t.pendingRequests[r]--
+       pr := c.t.pendingRequests
+       pr[r]--
+       n := pr[r]
+       if n == 0 {
+               delete(pr, r)
+       }
+       if n < 0 {
+               panic(n)
+       }
        c.updateRequests()
        return true
 }
@@ -1415,5 +1460,5 @@ func (c *connection) setTorrent(t *Torrent) {
                panic("connection already associated with a torrent")
        }
        c.t = t
-       t.conns[c] = struct{}{}
+       t.reconcileHandshakeStats(c)
 }
index 6b4ccd6b349df5e968d00821f3a7dd190ca58a2d..58a92a61a4da9c2c20388dbf9e328f6b9c4e03e6 100644 (file)
@@ -2,6 +2,7 @@ package torrent
 
 import (
        "io"
+       "net"
        "sync"
        "testing"
        "time"
@@ -20,9 +21,11 @@ import (
 // Have that would potentially alter it.
 func TestSendBitfieldThenHave(t *testing.T) {
        r, w := io.Pipe()
-       var cl Client
+       cl := Client{
+               config: &ClientConfig{DownloadRateLimiter: unlimited},
+       }
        cl.initLogger()
-       c := cl.newConnection(nil)
+       c := cl.newConnection(nil, false)
        c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
        c.t.setInfo(&metainfo.Info{
                Pieces: make([]byte, metainfo.HashSize*3),
@@ -85,7 +88,11 @@ func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
 }
 
 func BenchmarkConnectionMainReadLoop(b *testing.B) {
-       cl := &Client{}
+       cl := &Client{
+               config: &ClientConfig{
+                       DownloadRateLimiter: unlimited,
+               },
+       }
        ts := &torrentStorage{}
        t := &Torrent{
                cl:                cl,
@@ -99,11 +106,9 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
        }))
        t.setChunkSize(defaultChunkSize)
        t.pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority())
-       r, w := io.Pipe()
-       cn := &connection{
-               t: t,
-               r: r,
-       }
+       r, w := net.Pipe()
+       cn := cl.newConnection(r, true)
+       cn.setTorrent(t)
        mrlErr := make(chan error)
        cl.mu.Lock()
        go func() {
@@ -132,7 +137,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
        }
        w.Close()
        require.NoError(b, <-mrlErr)
-       require.EqualValues(b, b.N, cn.stats.ChunksReadUseful)
+       require.EqualValues(b, b.N, cn.stats.ChunksReadUseful.Int64())
 }
 
 func TestConnectionReceiveBadChunkIndex(t *testing.T) {
index c8cacc5d37b105d378a6907cb7145cc272e92053..8ac69bb70731e7285a6fa7a85c7f995e8ca9e106 100644 (file)
@@ -88,13 +88,13 @@ func TestUnmountWedged(t *testing.T) {
                        t.Log(err)
                }
        }()
-       client, err := torrent.NewClient(&torrent.Config{
-               DataDir:         filepath.Join(layout.BaseDir, "incomplete"),
-               DisableTrackers: true,
-               NoDHT:           true,
-               DisableTCP:      true,
-               DisableUTP:      true,
-       })
+       cfg := torrent.NewDefaultClientConfig()
+       cfg.DataDir = filepath.Join(layout.BaseDir, "incomplete")
+       cfg.DisableTrackers = true
+       cfg.NoDHT = true
+       cfg.DisableTCP = true
+       cfg.DisableUTP = true
+       client, err := torrent.NewClient(cfg)
        require.NoError(t, err)
        defer client.Close()
        tt, err := client.AddTorrent(layout.Metainfo)
@@ -165,14 +165,13 @@ func TestDownloadOnDemand(t *testing.T) {
        layout, err := newGreetingLayout()
        require.NoError(t, err)
        defer layout.Destroy()
-       cfg := torrent.Config{
-               DataDir:         layout.Completed,
-               DisableTrackers: true,
-               NoDHT:           true,
-               Seed:            true,
-               ListenHost:      torrent.LoopbackListenHost,
-       }
-       seeder, err := torrent.NewClient(&cfg)
+       cfg := torrent.NewDefaultClientConfig()
+       cfg.DataDir = layout.Completed
+       cfg.DisableTrackers = true
+       cfg.NoDHT = true
+       cfg.Seed = true
+       cfg.ListenHost = torrent.LoopbackListenHost
+       seeder, err := torrent.NewClient(cfg)
        require.NoError(t, err)
        defer seeder.Close()
        testutil.ExportStatusWriter(seeder, "s")
@@ -185,13 +184,13 @@ func TestDownloadOnDemand(t *testing.T) {
                <-seederTorrent.GotInfo()
                seederTorrent.VerifyData()
        }()
-       leecher, err := torrent.NewClient(&torrent.Config{
-               DisableTrackers: true,
-               NoDHT:           true,
-               DisableTCP:      true,
-               DefaultStorage:  storage.NewMMap(filepath.Join(layout.BaseDir, "download")),
-               ListenHost:      torrent.LoopbackListenHost,
-       })
+       cfg = torrent.NewDefaultClientConfig()
+       cfg.DisableTrackers = true
+       cfg.NoDHT = true
+       cfg.DisableTCP = true
+       cfg.DefaultStorage = storage.NewMMap(filepath.Join(layout.BaseDir, "download"))
+       cfg.ListenHost = torrent.LoopbackListenHost
+       leecher, err := torrent.NewClient(cfg)
        require.NoError(t, err)
        testutil.ExportStatusWriter(leecher, "l")
        defer leecher.Close()
index cdd0351386673f7c6790c7df038c5760ebcd8485..43b84ee192c2b00678bbdfedec2b9e5b05acada4 100644 (file)
--- a/global.go
+++ b/global.go
@@ -23,10 +23,6 @@ func defaultPeerExtensionBytes() peerExtensionBytes {
 // I could move a lot of these counters to their own file, but I suspect they
 // may be attached to a Client someday.
 var (
-       unwantedChunksReceived   = expvar.NewInt("chunksReceivedUnwanted")
-       unexpectedChunksReceived = expvar.NewInt("chunksReceivedUnexpected")
-       chunksReceived           = expvar.NewInt("chunksReceived")
-
        torrent = expvar.NewMap("torrent")
 
        peersAddedBySource = expvar.NewMap("peersAddedBySource")
@@ -34,15 +30,10 @@ var (
        pieceHashedCorrect    = expvar.NewInt("pieceHashedCorrect")
        pieceHashedNotCorrect = expvar.NewInt("pieceHashedNotCorrect")
 
-       unsuccessfulDials = expvar.NewInt("dialSuccessful")
-       successfulDials   = expvar.NewInt("dialUnsuccessful")
-
        peerExtensions                    = expvar.NewMap("peerExtensions")
        completedHandshakeConnectionFlags = expvar.NewMap("completedHandshakeConnectionFlags")
        // Count of connections to peer with same client ID.
-       connsToSelf = expvar.NewInt("connsToSelf")
-       // Number of completed connections to a client we're already connected with.
-       duplicateClientConns       = expvar.NewInt("duplicateClientConns")
+       connsToSelf                = expvar.NewInt("connsToSelf")
        receivedKeepalives         = expvar.NewInt("receivedKeepalives")
        supportedExtensionMessages = expvar.NewMap("supportedExtensionMessages")
        postedKeepalives           = expvar.NewInt("postedKeepalives")
diff --git a/misc.go b/misc.go
index 6fdb77c0d34d5c3d42ac0d2571ff3fcd2d5453a4..10df12447466360a6715cffd0c775bdab0970122 100644 (file)
--- a/misc.go
+++ b/misc.go
@@ -7,6 +7,7 @@ import (
        "github.com/anacrolix/missinggo"
        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
+       "golang.org/x/time/rate"
 )
 
 type chunkSpec struct {
@@ -165,3 +166,5 @@ func min(as ...int64) int64 {
        }
        return ret
 }
+
+var unlimited = rate.NewLimiter(rate.Inf, 0)
index 32e60fa956319a9b1b414a9f411e8ff63e0758d0..b7272151c606de170748d35d4b718ed0ac10e82f 100644 (file)
@@ -1,10 +1,13 @@
 package torrent
 
 import (
+       "reflect"
+       "strings"
        "testing"
 
        "github.com/anacrolix/missinggo/bitmap"
        "github.com/anacrolix/missinggo/iter"
+       "github.com/davecgh/go-spew/spew"
        "github.com/stretchr/testify/assert"
 )
 
@@ -29,3 +32,10 @@ func TestIterBitmapsDistinct(t *testing.T) {
        assert.Equal(t, []interface{}{0, 3, 2}, iter.ToSlice(iterBitmapsDistinct(&skipCopy, first, second)))
        assert.Equal(t, []int{1}, skip.ToSortedSlice())
 }
+
+func TestSpewConnStats(t *testing.T) {
+       s := spew.Sdump(ConnStats{})
+       t.Logf("\n%s", s)
+       lines := strings.Count(s, "\n")
+       assert.EqualValues(t, 2+reflect.ValueOf(ConnStats{}).NumField(), lines)
+}
index 83454b82e510cb22a03245330393eb94f2c9ed09..b10090c2bb5be2692e7a436b18af2dc54d5830ce 100644 (file)
@@ -18,6 +18,8 @@ import (
        "strconv"
        "sync"
 
+       "github.com/anacrolix/missinggo/perf"
+
        "github.com/bradfitz/iter"
 )
 
@@ -537,6 +539,7 @@ func InitiateHandshake(rw io.ReadWriter, skey []byte, initialPayload []byte, cry
                ia:             initialPayload,
                cryptoProvides: cryptoProvides,
        }
+       defer perf.ScopeTimerErr(&err)()
        return h.Do()
 }
 
diff --git a/multiless.go b/multiless.go
new file mode 100644 (file)
index 0000000..4a48393
--- /dev/null
@@ -0,0 +1,42 @@
+package torrent
+
+func strictCmp(same, less bool) cmper {
+       return func() (bool, bool) { return same, less }
+}
+
+type (
+       cmper     func() (same, less bool)
+       multiLess struct {
+               ok   bool
+               less bool
+       }
+)
+
+func (me *multiLess) Final() bool {
+       if !me.ok {
+               panic("undetermined")
+       }
+       return me.less
+}
+
+func (me *multiLess) FinalOk() (left, ok bool) {
+       return me.less, me.ok
+}
+
+func (me *multiLess) Next(f cmper) {
+       me.StrictNext(f())
+}
+
+func (me *multiLess) StrictNext(same, less bool) {
+       if me.ok {
+               return
+       }
+       if same {
+               return
+       }
+       me.ok, me.less = true, less
+}
+
+func (me *multiLess) NextBool(l, r bool) {
+       me.StrictNext(l == r, l)
+}
diff --git a/network_test.go b/network_test.go
new file mode 100644 (file)
index 0000000..b7dd169
--- /dev/null
@@ -0,0 +1,74 @@
+package torrent
+
+import (
+       "net"
+       "testing"
+
+       "github.com/anacrolix/missinggo"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func testListenerNetwork(
+       t *testing.T,
+       listenFunc func(net, addr string) (net.Listener, error),
+       expectedNet, givenNet, addr string, validIp4 bool,
+) {
+       l, err := listenFunc(givenNet, addr)
+       require.NoError(t, err)
+       defer l.Close()
+       assert.EqualValues(t, expectedNet, l.Addr().Network())
+       ip := missinggo.AddrIP(l.Addr())
+       assert.Equal(t, validIp4, ip.To4() != nil, ip)
+}
+
+func listenUtpListener(net, addr string) (l net.Listener, err error) {
+       l, err = NewUtpSocket(net, addr)
+       return
+}
+
+func testAcceptedConnAddr(
+       t *testing.T,
+       network string, valid4 bool,
+       dial func(addr string) (net.Conn, error),
+       listen func() (net.Listener, error),
+) {
+       l, err := listen()
+       require.NoError(t, err)
+       defer l.Close()
+       done := make(chan struct{})
+       defer close(done)
+       go func() {
+               c, err := dial(l.Addr().String())
+               require.NoError(t, err)
+               <-done
+               c.Close()
+       }()
+       c, err := l.Accept()
+       require.NoError(t, err)
+       defer c.Close()
+       assert.EqualValues(t, network, c.RemoteAddr().Network())
+       assert.Equal(t, valid4, missinggo.AddrIP(c.RemoteAddr()).To4() != nil)
+}
+
+func listenClosure(rawListenFunc func(string, string) (net.Listener, error), network, addr string) func() (net.Listener, error) {
+       return func() (net.Listener, error) {
+               return rawListenFunc(network, addr)
+       }
+}
+
+func dialClosure(f func(net, addr string) (net.Conn, error), network string) func(addr string) (net.Conn, error) {
+       return func(addr string) (net.Conn, error) {
+               return f(network, addr)
+       }
+}
+
+func TestListenLocalhostNetwork(t *testing.T) {
+       testListenerNetwork(t, net.Listen, "tcp", "tcp", "0.0.0.0:0", false)
+       testListenerNetwork(t, net.Listen, "tcp", "tcp", "[::1]:0", false)
+       testListenerNetwork(t, listenUtpListener, "udp", "udp6", "[::1]:0", false)
+       testListenerNetwork(t, listenUtpListener, "udp", "udp6", "[::]:0", false)
+       testListenerNetwork(t, listenUtpListener, "udp", "udp4", "localhost:0", true)
+
+       testAcceptedConnAddr(t, "tcp", false, dialClosure(net.Dial, "tcp"), listenClosure(net.Listen, "tcp6", ":0"))
+}
index e7d54fb0e5cc5532939031f2df286f3d882e63b3..716b0c622b9f2dd62992c34d7b11f98d81c9e955 100644 (file)
--- a/socket.go
+++ b/socket.go
@@ -11,6 +11,7 @@ import (
        "golang.org/x/net/proxy"
 
        "github.com/anacrolix/missinggo"
+       "github.com/anacrolix/missinggo/perf"
 )
 
 type dialer interface {
@@ -54,29 +55,39 @@ func listenTcp(network, address, proxyURL string) (s socket, err error) {
        if err != nil {
                return
        }
+       defer func() {
+               if err != nil {
+                       l.Close()
+               }
+       }()
 
        // If we don't need the proxy - then we should return default net.Dialer,
        // otherwise, let's try to parse the proxyURL and return proxy.Dialer
        if len(proxyURL) != 0 {
+               // TODO: The error should be propagated, as proxy may be in use for
+               // security or privacy reasons. Also just pass proxy.Dialer in from
+               // the Config.
                if dialer, err := getProxyDialer(proxyURL); err == nil {
-                       return tcpSocket{l, dialer}, nil
+                       return tcpSocket{l, func(ctx context.Context, addr string) (conn net.Conn, err error) {
+                               defer perf.ScopeTimerErr(&err)()
+                               return dialer.Dial(network, addr)
+                       }}, nil
                }
        }
-
-       return tcpSocket{l, nil}, nil
+       dialer := net.Dialer{}
+       return tcpSocket{l, func(ctx context.Context, addr string) (conn net.Conn, err error) {
+               defer perf.ScopeTimerErr(&err)()
+               return dialer.DialContext(ctx, network, addr)
+       }}, nil
 }
 
 type tcpSocket struct {
        net.Listener
-       d proxy.Dialer
+       d func(ctx context.Context, addr string) (net.Conn, error)
 }
 
 func (me tcpSocket) dial(ctx context.Context, addr string) (net.Conn, error) {
-       if me.d != nil {
-               return me.d.Dial(me.Addr().Network(), addr)
-       }
-
-       return net.Dial(me.Addr().Network(), addr)
+       return me.d(ctx, addr)
 }
 
 func setPort(addr string, port int) string {
@@ -159,7 +170,8 @@ type utpSocketSocket struct {
        d       proxy.Dialer
 }
 
-func (me utpSocketSocket) dial(ctx context.Context, addr string) (net.Conn, error) {
+func (me utpSocketSocket) dial(ctx context.Context, addr string) (conn net.Conn, err error) {
+       defer perf.ScopeTimerErr(&err)()
        if me.d != nil {
                return me.d.Dial(me.network, addr)
        }
index b75c44c5294c7b4b82112904d310a659079d6232..481d4c34ad74fada34d64ee793c89606dff37550 100644 (file)
@@ -135,7 +135,7 @@ type Torrent struct {
        // different pieces.
        connPieceInclinationPool sync.Pool
        // Torrent-level statistics.
-       stats TorrentStats
+       stats ConnStats
 
        // Count of each request across active connections.
        pendingRequests map[request]int
@@ -249,6 +249,9 @@ func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
 func (t *Torrent) addPeer(p Peer) {
        cl := t.cl
        peersAddedBySource.Add(string(p.Source), 1)
+       if t.closed.IsSet() {
+               return
+       }
        if cl.badPeerIPPort(p.IP, p.Port) {
                torrent.Add("peers not added because of bad addr", 1)
                return
@@ -708,15 +711,11 @@ func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
 }
 
 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
-       tr := perf.NewTimer()
-
+       defer perf.ScopeTimerErr(&err)()
        n, err := t.pieces[piece].Storage().WriteAt(data, begin)
        if err == nil && n != len(data) {
                err = io.ErrShortWrite
        }
-       if err == nil {
-               tr.Mark("write chunk")
-       }
        return
 }
 
@@ -848,7 +847,7 @@ func (t *Torrent) worstBadConn() *connection {
        heap.Init(&wcs)
        for wcs.Len() != 0 {
                c := heap.Pop(&wcs).(*connection)
-               if c.stats.ChunksReadUnwanted >= 6 && c.stats.ChunksReadUnwanted > c.stats.ChunksReadUseful {
+               if c.stats.ChunksReadUnwanted.Int64() >= 6 && c.stats.ChunksReadUnwanted.Int64() > c.stats.ChunksReadUseful.Int64() {
                        return c
                }
                // If the connection is in the worst half of the established
@@ -1219,6 +1218,7 @@ func (t *Torrent) deleteConnection(c *connection) (ret bool) {
        }
        _, ret = t.conns[c]
        delete(t.conns, c)
+       torrent.Add("deleted connections", 1)
        c.deleteAllRequests()
        if len(t.conns) == 0 {
                t.assertNoPendingRequests()
@@ -1227,10 +1227,8 @@ func (t *Torrent) deleteConnection(c *connection) (ret bool) {
 }
 
 func (t *Torrent) assertNoPendingRequests() {
-       for _, num := range t.pendingRequests {
-               if num != 0 {
-                       panic(num)
-               }
+       if len(t.pendingRequests) != 0 {
+               panic(t.pendingRequests)
        }
 }
 
@@ -1282,7 +1280,10 @@ func (t *Torrent) startScrapingTracker(_url string) {
        if _url == "" {
                return
        }
-       u, _ := url.Parse(_url)
+       u, err := url.Parse(_url)
+       if err != nil {
+               log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger)
+       }
        if u.Scheme == "udp" {
                u.Scheme = "udp4"
                t.startScrapingTracker(u.String())
@@ -1340,9 +1341,9 @@ func (t *Torrent) announceRequest() tracker.AnnounceRequest {
                // The following are vaguely described in BEP 3.
 
                Left:     t.bytesLeftAnnounce(),
-               Uploaded: t.stats.BytesWrittenData,
+               Uploaded: t.stats.BytesWrittenData.Int64(),
                // There's no mention of wasted or unwanted download in the BEP.
-               Downloaded: t.stats.BytesReadUsefulData,
+               Downloaded: t.stats.BytesReadUsefulData.Int64(),
        }
 }
 
@@ -1432,23 +1433,24 @@ func (t *Torrent) addPeers(peers []Peer) {
 }
 
 func (t *Torrent) Stats() TorrentStats {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
+       t.cl.mu.RLock()
+       defer t.cl.mu.RUnlock()
        return t.statsLocked()
 }
 
-func (t *Torrent) statsLocked() TorrentStats {
-       t.stats.ActivePeers = len(t.conns)
-       t.stats.HalfOpenPeers = len(t.halfOpen)
-       t.stats.PendingPeers = t.peers.Len()
-       t.stats.TotalPeers = t.numTotalPeers()
-       t.stats.ConnectedSeeders = 0
+func (t *Torrent) statsLocked() (ret TorrentStats) {
+       ret.ActivePeers = len(t.conns)
+       ret.HalfOpenPeers = len(t.halfOpen)
+       ret.PendingPeers = t.peers.Len()
+       ret.TotalPeers = t.numTotalPeers()
+       ret.ConnectedSeeders = 0
        for c := range t.conns {
                if all, ok := c.peerHasAllPieces(); all && ok {
-                       t.stats.ConnectedSeeders++
+                       ret.ConnectedSeeders++
                }
        }
-       return t.stats
+       ret.ConnStats = t.stats.Copy()
+       return
 }
 
 // The total number of peers in the torrent.
@@ -1474,42 +1476,49 @@ func (t *Torrent) numTotalPeers() int {
 // Reconcile bytes transferred before connection was associated with a
 // torrent.
 func (t *Torrent) reconcileHandshakeStats(c *connection) {
-       t.stats.wroteBytes(c.stats.BytesWritten)
-       t.stats.readBytes(c.stats.BytesRead)
+       if c.stats != (ConnStats{
+               // Handshakes should only increment these fields:
+               BytesWritten: c.stats.BytesWritten,
+               BytesRead:    c.stats.BytesRead,
+       }) {
+               panic("bad stats")
+       }
+       c.postHandshakeStats(func(cs *ConnStats) {
+               cs.BytesRead.Add(c.stats.BytesRead.Int64())
+               cs.BytesWritten.Add(c.stats.BytesWritten.Int64())
+       })
+       c.reconciledHandshakeStats = true
 }
 
 // Returns true if the connection is added.
-func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
+func (t *Torrent) addConnection(c *connection) (err error) {
+       defer func() {
+               if err == nil {
+                       torrent.Add("added connections", 1)
+               }
+       }()
        if t.closed.IsSet() {
-               return false
-       }
-       if !t.wantConns() {
-               return false
+               return errors.New("torrent closed")
        }
        for c0 := range t.conns {
-               if c.PeerID == c0.PeerID {
-                       // Already connected to a client with that ID.
-                       duplicateClientConns.Add(1)
-                       lower := string(t.cl.peerID[:]) < string(c.PeerID[:])
-                       // Retain the connection from initiated from lower peer ID to
-                       // higher.
-                       if outgoing == lower {
-                               // Close the other one.
-                               c0.Close()
-                               // TODO: Is it safe to delete from the map while we're
-                               // iterating over it?
-                               t.deleteConnection(c0)
-                       } else {
-                               // Abandon this one.
-                               return false
-                       }
+               if c.PeerID != c0.PeerID {
+                       continue
+               }
+               if !t.cl.config.dropDuplicatePeerIds {
+                       continue
+               }
+               if left, ok := c.hasPreferredNetworkOver(c0); ok && left {
+                       c0.Close()
+                       t.deleteConnection(c0)
+               } else {
+                       return errors.New("existing connection preferred")
                }
        }
+       if !t.wantConns() {
+               return errors.New("don't want conns")
+       }
        if len(t.conns) >= t.maxEstablishedConns {
                c := t.worstBadConn()
-               if c == nil {
-                       return false
-               }
                if t.cl.config.Debug && missinggo.CryHeard() {
                        log.Printf("%s: dropping connection to make room for new one:\n    %v", t, c)
                }
@@ -1519,8 +1528,8 @@ func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
        if len(t.conns) >= t.maxEstablishedConns {
                panic(len(t.conns))
        }
-       c.setTorrent(t)
-       return true
+       t.conns[c] = struct{}{}
+       return nil
 }
 
 func (t *Torrent) wantConns() bool {
@@ -1575,10 +1584,12 @@ func (t *Torrent) pieceHashed(piece int, correct bool) {
        }
        if correct {
                if len(touchers) != 0 {
-                       t.stats.PiecesDirtiedGood++
+                       // Don't increment stats above connection-level for every involved
+                       // connection.
+                       t.allStats((*ConnStats).incrementPiecesDirtiedGood)
                }
                for _, c := range touchers {
-                       c.stats.PiecesDirtiedGood++
+                       c.stats.incrementPiecesDirtiedGood()
                }
                err := p.Storage().MarkComplete()
                if err != nil {
@@ -1586,10 +1597,12 @@ func (t *Torrent) pieceHashed(piece int, correct bool) {
                }
        } else {
                if len(touchers) != 0 {
-                       t.stats.PiecesDirtiedBad++
+                       // Don't increment stats above connection-level for every involved
+                       // connection.
+                       t.allStats((*ConnStats).incrementPiecesDirtiedBad)
                        for _, c := range touchers {
                                // Y u do dis peer?!
-                               c.stats.PiecesDirtiedBad++
+                               c.stats.incrementPiecesDirtiedBad()
                        }
                        slices.Sort(touchers, connLessTrusted)
                        if t.cl.config.Debug {
@@ -1746,3 +1759,10 @@ func (t *Torrent) AddClientPeer(cl *Client) {
                return
        }())
 }
+
+// All stats that include this Torrent. Useful when we want to increment
+// ConnStats but not for every connection.
+func (t *Torrent) allStats(f func(*ConnStats)) {
+       f(&t.stats)
+       f(&t.cl.stats)
+}
index c9ea5e42841f8834d1a7882b422f4ed3b95301d3..33eb49035cfc725c704722572cba0a3fc80f5baa 100644 (file)
@@ -80,7 +80,7 @@ func BenchmarkUpdatePiecePriorities(b *testing.B) {
                numPieces   = 13410
                pieceLength = 256 << 10
        )
-       cl := &Client{}
+       cl := &Client{config: &ClientConfig{}}
        cl.initLogger()
        t := cl.newTorrent(metainfo.Hash{}, nil)
        require.NoError(b, t.setInfo(&metainfo.Info{
@@ -107,7 +107,7 @@ func BenchmarkUpdatePiecePriorities(b *testing.B) {
 // Check that a torrent containing zero-length file(s) will start, and that
 // they're created in the filesystem. The client storage is assumed to be
 // file-based on the native filesystem based.
-func testEmptyFilesAndZeroPieceLength(t *testing.T, cfg *Config) {
+func testEmptyFilesAndZeroPieceLength(t *testing.T, cfg *ClientConfig) {
        cl, err := NewClient(cfg)
        require.NoError(t, err)
        defer cl.Close()
@@ -149,6 +149,7 @@ func TestEmptyFilesAndZeroPieceLengthWithMMapStorage(t *testing.T) {
 func TestPieceHashFailed(t *testing.T) {
        mi := testutil.GreetingMetaInfo()
        cl := new(Client)
+       cl.config = &ClientConfig{}
        cl.initLogger()
        tt := cl.newTorrent(mi.HashInfoBytes(), badStorage{})
        tt.setChunkSize(2)
index dbbc3c279cb5006c8d4479966a16d24a580fac22..51a23f12b125c32e66964c348d655673aa2ad209 100644 (file)
@@ -3,13 +3,15 @@ package torrent
 import "container/heap"
 
 func worseConn(l, r *connection) bool {
-       if l.useful() != r.useful() {
-               return r.useful()
-       }
-       if !l.lastHelpful().Equal(r.lastHelpful()) {
-               return l.lastHelpful().Before(r.lastHelpful())
-       }
-       return l.completedHandshake.Before(r.completedHandshake)
+       var ml multiLess
+       ml.NextBool(!l.useful(), !r.useful())
+       ml.StrictNext(
+               l.lastHelpful().Equal(r.lastHelpful()),
+               l.lastHelpful().Before(r.lastHelpful()))
+       ml.StrictNext(
+               l.completedHandshake.Equal(r.completedHandshake),
+               l.completedHandshake.Before(r.completedHandshake))
+       return ml.Final()
 }
 
 type worseConnSlice struct {