- run: go test -v -race $PROJECT_GO_PACKAGE/...
- run: go test -bench . $PROJECT_GO_PACKAGE/...
- run: CGO_ENABLED=0 go get -t -d -v $PROJECT_GO_PACKAGE/...
- - run: CGO_ENABLED=0 go test -v $PROJECT_GO_PACKAGE/... || true
+ - run: set +e; CGO_ENABLED=0 go test -v $PROJECT_GO_PACKAGE/...; true
- run: go get golang.org/x/mobile/cmd/gomobile
- run: gomobile init
- run: gomobile build -target=android $PROJECT_GO_PACKAGE
"runtime"
"strconv"
"strings"
+ "unsafe"
)
type Decoder struct {
if b >= '0' && b <= '9' {
start := d.buf.Len() - 1
d.readUntil(':')
- length, err := strconv.ParseInt(d.buf.String()[start:], 10, 64)
+ s := reflect.StringHeader{
+ uintptr(unsafe.Pointer(&d.buf.Bytes()[start])),
+ d.buf.Len() - start,
+ }
+ length, err := strconv.ParseInt(*(*string)(unsafe.Pointer(&s)), 10, 64)
checkForIntParseError(err, d.Offset-1)
d.buf.WriteString(":")
"strings"
"time"
+ "github.com/anacrolix/missinggo/perf"
+
"github.com/anacrolix/dht"
"github.com/anacrolix/dht/krpc"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/missinggo/slices"
"github.com/anacrolix/sync"
+ "github.com/davecgh/go-spew/spew"
"github.com/dustin/go-humanize"
"github.com/google/btree"
- "golang.org/x/time/rate"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/iplist"
event sync.Cond
closed missinggo.Event
- config Config
+ config *ClientConfig
logger *log.Logger
halfOpenLimit int
ipBlockList iplist.Ranger
// Our BitTorrent protocol extension bytes, sent in our BT handshakes.
extensionBytes peerExtensionBytes
- uploadLimit *rate.Limiter
- downloadLimit *rate.Limiter
// Set of addresses that have our client ID. This intentionally will
// include ourselves if we end up trying to connect to our own address
dopplegangerAddrs map[string]struct{}
badPeerIPs map[string]struct{}
torrents map[metainfo.Hash]*Torrent
+ // An aggregate of stats over all connections.
+ stats ConnStats
+
+ acceptLimiter map[ipStr]int
}
+type ipStr string
+
func (cl *Client) BadPeerIPs() []string {
cl.mu.RLock()
defer cl.mu.RUnlock()
// Writes out a human readable status of the client, such as for writing to a
// HTTP status page.
func (cl *Client) WriteStatus(_w io.Writer) {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.mu.RLock()
+ defer cl.mu.RUnlock()
w := bufio.NewWriter(_w)
defer w.Flush()
fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
writeDhtServerStatus(w, s)
})
+ spew.Fdump(w, cl.stats)
fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
fmt.Fprintln(w)
for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
}
-func NewClient(cfg *Config) (cl *Client, err error) {
+func NewClient(cfg *ClientConfig) (cl *Client, err error) {
if cfg == nil {
- cfg = &Config{}
+ cfg = NewDefaultClientConfig()
}
- cfg.setDefaults()
-
defer func() {
if err != nil {
cl = nil
}()
cl = &Client{
halfOpenLimit: cfg.HalfOpenConnsPerTorrent,
- config: *cfg,
+ config: cfg,
dopplegangerAddrs: make(map[string]struct{}),
torrents: make(map[metainfo.Hash]*Torrent),
}
+ go cl.acceptLimitClearer()
cl.initLogger()
defer func() {
if err == nil {
}
cl.Close()
}()
- if cfg.UploadRateLimiter == nil {
- cl.uploadLimit = rate.NewLimiter(rate.Inf, 0)
- } else {
- cl.uploadLimit = cfg.UploadRateLimiter
- }
- if cfg.DownloadRateLimiter == nil {
- cl.downloadLimit = rate.NewLimiter(rate.Inf, 0)
- } else {
- cl.downloadLimit = cfg.DownloadRateLimiter
- }
cl.extensionBytes = defaultPeerExtensionBytes()
cl.event.L = &cl.mu
storageImpl := cfg.DefaultStorage
if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
return true
}
+ if cl.rateLimitAccept(rip) {
+ return true
+ }
return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
}
func (cl *Client) acceptConnections(l net.Listener) {
- cl.mu.Lock()
- defer cl.mu.Unlock()
for {
- cl.waitAccept()
- cl.mu.Unlock()
conn, err := l.Accept()
conn = pproffd.WrapNetConn(conn)
- cl.mu.Lock()
- if cl.closed.IsSet() {
+ cl.mu.RLock()
+ closed := cl.closed.IsSet()
+ reject := false
+ if conn != nil {
+ reject = cl.rejectAccepted(conn)
+ }
+ cl.mu.RUnlock()
+ if closed {
if conn != nil {
conn.Close()
}
// routine just fucked off.
return
}
- log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
- go torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
- go torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
- go torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
- if cl.rejectAccepted(conn) {
- go torrent.Add("rejected accepted connections", 1)
- conn.Close()
- } else {
- go cl.incomingConnection(conn)
- }
+ go func() {
+ if reject {
+ torrent.Add("rejected accepted connections", 1)
+ conn.Close()
+ } else {
+ go cl.incomingConnection(conn)
+ }
+ log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
+ torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
+ torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
+ torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
+ }()
}
}
if tc, ok := nc.(*net.TCPConn); ok {
tc.SetLinger(0)
}
- c := cl.newConnection(nc)
+ c := cl.newConnection(nc, false)
c.Discovery = peerSourceIncoming
cl.runReceivedConn(c)
}
func countDialResult(err error) {
if err == nil {
- successfulDials.Add(1)
+ torrent.Add("successful dials", 1)
} else {
- unsuccessfulDials.Add(1)
+ torrent.Add("unsuccessful dials", 1)
}
}
return ok
}
-func (cl *Client) dialTCP(ctx context.Context, addr string) (c net.Conn, err error) {
- d := net.Dialer{
- // Can't bind to the listen address, even though we intend to create an
- // endpoint pair that is distinct. Oh well.
-
- // LocalAddr: cl.tcpListener.Addr(),
- }
- c, err = d.DialContext(ctx, "tcp"+ipNetworkSuffix(!cl.config.DisableIPv4 && !cl.config.DisableIPv4Peers, !cl.config.DisableIPv6), addr)
- countDialResult(err)
- if err == nil {
- c.(*net.TCPConn).SetLinger(0)
- }
- c = pproffd.WrapNetConn(c)
- return
-}
-
func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
switch {
case allowIpv4 && allowIpv6:
var allPeerNetworks = []string{"tcp4", "tcp6", "udp4", "udp6"}
-func peerNetworkEnabled(network string, cfg Config) bool {
+func peerNetworkEnabled(network string, cfg *ClientConfig) bool {
c := func(s string) bool {
return strings.Contains(network, s)
}
left++
go func() {
c, err := f(ctx, addr)
+ // This is a bit optimistic, but it looks non-trivial to thread
+ // this through the proxy code. Set it now in case we close the
+ // connection forthwith.
+ if tc, ok := c.(*net.TCPConn); ok {
+ tc.SetLinger(0)
+ }
countDialResult(err)
resCh <- dialResult{c}
}()
}()
var res dialResult
// Wait for a successful connection.
- for ; left > 0 && res.Conn == nil; left-- {
- res = <-resCh
- }
+ func() {
+ defer perf.ScopeTimer()()
+ for ; left > 0 && res.Conn == nil; left-- {
+ res = <-resCh
+ }
+ }()
// There are still incompleted dials.
go func() {
for ; left > 0; left-- {
// Performs initiator handshakes and returns a connection. Returns nil
// *connection if no connection for valid reasons.
func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
- c = cl.newConnection(nc)
+ c = cl.newConnection(nc, true)
c.headerEncrypted = encryptHeader
ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
defer cancel()
return
}
if c != nil {
- go torrent.Add("initiated conn with preferred header obfuscation", 1)
+ torrent.Add("initiated conn with preferred header obfuscation", 1)
return
}
if cl.config.ForceEncryption {
// Try again with encryption if we didn't earlier, or without if we did.
c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
if c != nil {
- go torrent.Add("initiated conn with fallback header obfuscation", 1)
+ torrent.Add("initiated conn with fallback header obfuscation", 1)
}
return
}
}
defer c.Close()
c.Discovery = ps
- cl.runHandshookConn(c, t, true)
+ cl.runHandshookConn(c, t)
}
// The port number for incoming peer connections. 0 if the client isn't
// Do encryption and bittorrent handshakes as receiver.
func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
+ defer perf.ScopeTimerErr(&err)()
var rw io.ReadWriter
rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
c.setRW(rw)
}
t, err := cl.receiveHandshakes(c)
if err != nil {
- if cl.config.Debug {
- log.Printf("error receiving handshakes: %s", err)
- }
+ log.Fmsg(
+ "error receiving handshakes: %s", err,
+ ).AddValue(
+ debugLogValue,
+ ).Add(
+ "network", c.remoteAddr().Network(),
+ ).Log(cl.logger)
+ torrent.Add("error receiving handshake", 1)
+ cl.mu.Lock()
+ cl.onBadAccept(c.remoteAddr())
+ cl.mu.Unlock()
return
}
if t == nil {
+ torrent.Add("received handshake for unloaded torrent", 1)
+ cl.mu.Lock()
+ cl.onBadAccept(c.remoteAddr())
+ cl.mu.Unlock()
return
}
+ torrent.Add("received handshake for loaded torrent", 1)
cl.mu.Lock()
defer cl.mu.Unlock()
- cl.runHandshookConn(c, t, false)
+ cl.runHandshookConn(c, t)
}
-func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
- t.reconcileHandshakeStats(c)
+func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
+ c.setTorrent(t)
if c.PeerID == cl.peerID {
- if outgoing {
+ if c.outgoing {
connsToSelf.Add(1)
addr := c.conn.RemoteAddr().String()
cl.dopplegangerAddrs[addr] = struct{}{}
if connIsIpv6(c.conn) {
torrent.Add("completed handshake over ipv6", 1)
}
- if !t.addConnection(c, outgoing) {
+ if err := t.addConnection(c); err != nil {
+ log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
return
}
defer t.dropConnection(c)
return fmt.Errorf("data has bad offset in payload: %d", begin)
}
t.saveMetadataPiece(piece, payload[begin:])
- c.stats.ChunksReadUseful++
- c.t.stats.ChunksReadUseful++
+ c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
c.lastUsefulChunkReceived = time.Now()
return t.maybeCompleteMetadata()
case pp.RequestMetadataExtensionMsgType:
return
}
new = true
+
t = cl.newTorrent(infoHash, specStorage)
cl.eachDhtServer(func(s *dht.Server) {
go t.dhtAnnouncer(s)
})
cl.torrents[infoHash] = t
+ cl.clearAcceptLimits()
t.updateWantPeersEvent()
// Tickle Client.waitAccept, new torrent may want conns.
cl.event.Broadcast()
cl.badPeerIPs[ip.String()] = struct{}{}
}
-func (cl *Client) newConnection(nc net.Conn) (c *connection) {
+func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
c = &connection{
conn: nc,
+ outgoing: outgoing,
Choked: true,
PeerChoked: true,
PeerMaxRequests: 250,
writeBuffer: new(bytes.Buffer),
}
c.writerCond.L = &cl.mu
- c.setRW(connStatsReadWriter{nc, &cl.mu, c})
+ c.setRW(connStatsReadWriter{nc, c})
c.r = &rateLimitedReader{
- l: cl.downloadLimit,
+ l: cl.config.DownloadRateLimiter,
r: c.r,
}
return
}
func (cl *Client) ListenAddrs() (ret []net.Addr) {
+ cl.mu.Lock()
+ defer cl.mu.Unlock()
cl.eachListener(func(l socket) bool {
ret = append(ret, l.Addr())
return true
})
return
}
+
+func (cl *Client) onBadAccept(addr net.Addr) {
+ ip := maskIpForAcceptLimiting(missinggo.AddrIP(addr))
+ if cl.acceptLimiter == nil {
+ cl.acceptLimiter = make(map[ipStr]int)
+ }
+ cl.acceptLimiter[ipStr(ip.String())]++
+}
+
+func maskIpForAcceptLimiting(ip net.IP) net.IP {
+ if ip4 := ip.To4(); ip4 != nil {
+ return ip4.Mask(net.CIDRMask(24, 32))
+ }
+ return ip
+}
+
+func (cl *Client) clearAcceptLimits() {
+ cl.acceptLimiter = nil
+}
+
+func (cl *Client) acceptLimitClearer() {
+ for {
+ select {
+ case <-cl.closed.LockedChan(&cl.mu):
+ return
+ case <-time.After(15 * time.Minute):
+ cl.mu.Lock()
+ cl.clearAcceptLimits()
+ cl.mu.Unlock()
+ }
+ }
+}
+
+func (cl *Client) rateLimitAccept(ip net.IP) bool {
+ if cl.config.DisableAcceptRateLimiting {
+ return false
+ }
+ return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
+}
package torrent
import (
- "context"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
- "net"
"os"
"path/filepath"
+ "reflect"
"sync"
"testing"
"time"
"github.com/anacrolix/torrent/storage"
)
-func TestingConfig() *Config {
- return &Config{
- ListenHost: LoopbackListenHost,
- NoDHT: true,
- DataDir: tempDir(),
- DisableTrackers: true,
- NoDefaultPortForwarding: true,
- // Debug: true,
- }
+func TestingConfig() *ClientConfig {
+ cfg := NewDefaultClientConfig()
+ cfg.ListenHost = LoopbackListenHost
+ cfg.NoDHT = true
+ cfg.DataDir = tempDir()
+ cfg.DisableTrackers = true
+ cfg.NoDefaultPortForwarding = true
+ cfg.DisableAcceptRateLimiting = true
+ return cfg
}
func TestClientDefault(t *testing.T) {
func TestTorrentInitialState(t *testing.T) {
dir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(dir)
- cl := &Client{}
+ cl := &Client{
+ config: &ClientConfig{},
+ }
cl.initLogger()
tor := cl.newTorrent(
mi.HashInfoBytes(),
}
func TestReducedDialTimeout(t *testing.T) {
- cfg := &Config{}
- cfg.setDefaults()
+ cfg := NewDefaultClientConfig()
for _, _case := range []struct {
Max time.Duration
HalfOpenLimit int
}
}
-func TestUTPRawConn(t *testing.T) {
- l, err := NewUtpSocket("udp", "")
- require.NoError(t, err)
- defer l.Close()
- go func() {
- for {
- _, err := l.Accept()
- if err != nil {
- break
- }
- }
- }()
- // Connect a UTP peer to see if the RawConn will still work.
- s, err := NewUtpSocket("udp", "")
- require.NoError(t, err)
- defer s.Close()
- utpPeer, err := s.DialContext(context.Background(), "", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
- require.NoError(t, err)
- defer utpPeer.Close()
- peer, err := net.ListenPacket("udp", ":0")
- require.NoError(t, err)
- defer peer.Close()
-
- msgsReceived := 0
- // How many messages to send. I've set this to double the channel buffer
- // size in the raw packetConn.
- const N = 200
- readerStopped := make(chan struct{})
- // The reader goroutine.
- go func() {
- defer close(readerStopped)
- b := make([]byte, 500)
- for i := 0; i < N; i++ {
- n, _, err := l.ReadFrom(b)
- require.NoError(t, err)
- msgsReceived++
- var d int
- fmt.Sscan(string(b[:n]), &d)
- assert.Equal(t, i, d)
- }
- }()
- udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
- require.NoError(t, err)
- for i := 0; i < N; i++ {
- _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
- require.NoError(t, err)
- time.Sleep(time.Millisecond)
- }
- select {
- case <-readerStopped:
- case <-time.After(time.Second):
- t.Fatal("reader timed out")
- }
- if msgsReceived != N {
- t.Fatalf("messages received: %d", msgsReceived)
- }
-}
-
func TestAddDropManyTorrents(t *testing.T) {
cl, err := NewClient(TestingConfig())
require.NoError(t, err)
// Create seeder and a Torrent.
cfg := TestingConfig()
cfg.Seed = true
- cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
+ if ps.SeederUploadRateLimiter != nil {
+ cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
+ }
// cfg.ListenAddr = "localhost:4000"
if ps.SeederStorage != nil {
cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
leecherDataDir, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(leecherDataDir)
+ cfg = TestingConfig()
if ps.LeecherStorage == nil {
cfg.DataDir = leecherDataDir
} else {
cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
}
- cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
+ if ps.LeecherDownloadRateLimiter != nil {
+ cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
+ }
cfg.Seed = false
leecher, err := NewClient(cfg)
require.NoError(t, err)
r.SetReadahead(ps.Readahead)
}
assertReadAllGreeting(t, r)
- assert.True(t, 13 <= seederTorrent.Stats().BytesWrittenData)
- assert.True(t, 8 <= seederTorrent.Stats().ChunksWritten)
- assert.True(t, 13 <= leecherTorrent.Stats().BytesReadData)
- assert.True(t, 8 <= leecherTorrent.Stats().ChunksRead)
+
+ seederStats := seederTorrent.Stats()
+ assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
+ assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
+
+ leecherStats := leecherTorrent.Stats()
+ assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
+ assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
+
// Try reading through again for the cases where the torrent data size
// exceeds the size of the cache.
assertReadAllGreeting(t, r)
func TestSeedAfterDownloading(t *testing.T) {
greetingTempDir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(greetingTempDir)
+
cfg := TestingConfig()
cfg.Seed = true
cfg.DataDir = greetingTempDir
require.NoError(t, err)
assert.True(t, ok)
seederTorrent.VerifyData()
+
+ cfg = TestingConfig()
+ cfg.Seed = true
cfg.DataDir, err = ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(cfg.DataDir)
require.NoError(t, err)
defer leecher.Close()
testutil.ExportStatusWriter(leecher, "l")
+
+ cfg = TestingConfig()
cfg.Seed = false
cfg.DataDir, err = ioutil.TempDir("", "")
require.NoError(t, err)
}()
done := make(chan struct{})
defer close(done)
- go func() {
- for {
- go leecherGreeting.AddClientPeer(seeder)
- go leecherGreeting.AddClientPeer(leecherLeecher)
- select {
- case <-done:
- return
- case <-time.After(time.Second):
- }
- }
- }()
+ go leecherGreeting.AddClientPeer(seeder)
+ go leecherGreeting.AddClientPeer(leecherLeecher)
wg.Add(1)
go func() {
defer wg.Done()
}
type testDownloadCancelParams struct {
- ExportClientStatus bool
SetLeecherStorageCapacity bool
LeecherStorageCapacity int64
Cancel bool
seeder, err := NewClient(cfg)
require.NoError(t, err)
defer seeder.Close()
- if ps.ExportClientStatus {
- testutil.ExportStatusWriter(seeder, "s")
- }
+ testutil.ExportStatusWriter(seeder, "s")
seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
seederTorrent.VerifyData()
leecherDataDir, err := ioutil.TempDir("", "")
cfg.DataDir = leecherDataDir
leecher, _ := NewClient(cfg)
defer leecher.Close()
- if ps.ExportClientStatus {
- testutil.ExportStatusWriter(leecher, "l")
- }
+ testutil.ExportStatusWriter(leecher, "l")
leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
ret = TorrentSpecFromMetaInfo(mi)
ret.ChunkSize = 2
leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
}
leecherGreeting.cl.mu.Unlock()
-
- leecherGreeting.AddClientPeer(seeder)
+ done := make(chan struct{})
+ defer close(done)
+ go leecherGreeting.AddClientPeer(seeder)
completes := make(map[int]bool, 3)
-values:
- for {
- // started := time.Now()
+ expected := func() map[int]bool {
+ if ps.Cancel {
+ return map[int]bool{0: false, 1: false, 2: false}
+ } else {
+ return map[int]bool{0: true, 1: true, 2: true}
+ }
+ }()
+ for !reflect.DeepEqual(completes, expected) {
select {
case _v := <-psc.Values:
- // log.Print(time.Since(started))
v := _v.(PieceStateChange)
completes[v.Index] = v.Complete
- case <-time.After(100 * time.Millisecond):
- break values
}
}
- if ps.Cancel {
- assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
- } else {
- assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
- }
-
}
func TestTorrentDownloadAll(t *testing.T) {
defer ss.Close()
var tts []*Torrent
ih := testutil.GreetingMetaInfo().HashInfoBytes()
+ cfg := TestingConfig()
+ cfg.DisableAcceptRateLimiting = true
+ cfg.dropDuplicatePeerIds = true
for i := range iter.N(3) {
- cl, err := NewClient(TestingConfig())
+ cl, err := NewClient(cfg)
require.NoError(t, err)
defer cl.Close()
tt, _ := cl.AddTorrentInfoHash(ih)
package main
import (
+ "bufio"
"encoding/hex"
"encoding/json"
"fmt"
"log"
"os"
+ "github.com/anacrolix/envpprof"
"github.com/anacrolix/tagflag"
"github.com/bradfitz/iter"
}
func main() {
+ defer envpprof.Stop()
tagflag.Parse(&flags)
- err := processReader(os.Stdin)
+ err := processReader(bufio.NewReader(os.Stdin))
if err != nil {
log.Fatal(err)
}
func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
- var rootGroup struct {
- Client torrent.Config `group:"Client Options"`
- TestPeers []string `long:"test-peer" description:"address of peer to inject to every torrent"`
- Pick string `long:"pick" description:"filename to pick"`
+ var rootGroup = struct {
+ Client *torrent.ClientConfig `group:"Client Options"`
+ TestPeers []string `long:"test-peer" description:"address of peer to inject to every torrent"`
+ Pick string `long:"pick" description:"filename to pick"`
+ }{
+ Client: torrent.NewDefaultClientConfig(),
}
// Don't pass flags.PrintError because it's inconsistent with printing.
// https://github.com/jessevdk/go-flags/issues/132
rootGroup.Client.DataDir = tmpdir
- client, err := torrent.NewClient(&rootGroup.Client)
+ client, err := torrent.NewClient(rootGroup.Client)
if err != nil {
log.Fatalf("error creating client: %s", err)
}
log.SetFlags(log.LstdFlags | log.Lshortfile)
tagflag.Parse(&flags)
defer envpprof.Stop()
- clientConfig := torrent.Config{
- Debug: flags.Debug,
- Seed: flags.Seed,
- }
+ clientConfig := torrent.NewDefaultClientConfig()
+ clientConfig.Debug = flags.Debug
+ clientConfig.Seed = flags.Seed
if flags.PackedBlocklist != "" {
blocklist, err := iplist.MMapPackedFile(flags.PackedBlocklist)
if err != nil {
clientConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(flags.DownloadRate), 1<<20)
}
- client, err := torrent.NewClient(&clientConfig)
+ client, err := torrent.NewClient(clientConfig)
if err != nil {
log.Fatalf("error creating client: %s", err)
}
defer fuse.Unmount(args.MountDir)
// TODO: Think about the ramifications of exiting not due to a signal.
defer conn.Close()
- cfg := torrent.Config{
- DataDir: args.DownloadDir,
- DisableTrackers: args.DisableTrackers,
- NoUpload: true, // Ensure that downloads are responsive.
- }
+ cfg := torrent.NewDefaultClientConfig()
+ cfg.DataDir = args.DownloadDir
+ cfg.DisableTrackers = args.DisableTrackers
+ cfg.NoUpload = true // Ensure that downloads are responsive.
cfg.SetListenAddr(args.ListenAddr.String())
- client, err := torrent.NewClient(&cfg)
+ client, err := torrent.NewClient(cfg)
if err != nil {
log.Print(err)
return 1
"github.com/anacrolix/torrent/storage"
)
-var DefaultHTTPClient = &http.Client{
- Timeout: time.Second * 15,
- Transport: &http.Transport{
- Dial: (&net.Dialer{
- Timeout: 15 * time.Second,
- }).Dial,
- TLSHandshakeTimeout: 15 * time.Second,
- TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
- },
-}
var DefaultHTTPUserAgent = "Go-Torrent/1.0"
-// Override Client defaults.
-type Config struct {
+// Probably not safe to modify this after it's given to a Client.
+type ClientConfig struct {
// Store torrent file data in this directory unless .DefaultStorage is
// specified.
DataDir string `long:"data-dir" description:"directory to store downloaded torrent data"`
PublicIp4 net.IP
PublicIp6 net.IP
+
+ DisableAcceptRateLimiting bool
+ dropDuplicatePeerIds bool
}
-func (cfg *Config) SetListenAddr(addr string) *Config {
+func (cfg *ClientConfig) SetListenAddr(addr string) *ClientConfig {
host, port, err := missinggo.ParseHostPort(addr)
expect.Nil(err)
cfg.ListenHost = func(string) string { return host }
return cfg
}
-func (cfg *Config) setDefaults() {
- if cfg.HTTP == nil {
- cfg.HTTP = DefaultHTTPClient
- if cfg.ProxyURL != "" {
- cfg.setProxyURL()
- }
- }
- if cfg.HTTPUserAgent == "" {
- cfg.HTTPUserAgent = DefaultHTTPUserAgent
- }
- if cfg.ExtendedHandshakeClientVersion == "" {
- cfg.ExtendedHandshakeClientVersion = "go.torrent dev 20150624"
- }
- if cfg.Bep20 == "" {
- cfg.Bep20 = "-GT0001-"
- }
- if cfg.NominalDialTimeout == 0 {
- cfg.NominalDialTimeout = 30 * time.Second
- }
- if cfg.MinDialTimeout == 0 {
- cfg.MinDialTimeout = 5 * time.Second
- }
- if cfg.EstablishedConnsPerTorrent == 0 {
- cfg.EstablishedConnsPerTorrent = 50
- }
- if cfg.HalfOpenConnsPerTorrent == 0 {
- cfg.HalfOpenConnsPerTorrent = (cfg.EstablishedConnsPerTorrent + 1) / 2
- }
- if cfg.TorrentPeersHighWater == 0 {
- // Memory and freshness are the concern here.
- cfg.TorrentPeersHighWater = 500
- }
- if cfg.TorrentPeersLowWater == 0 {
- cfg.TorrentPeersLowWater = 2 * cfg.HalfOpenConnsPerTorrent
- }
- if cfg.HandshakesTimeout == 0 {
- cfg.HandshakesTimeout = 20 * time.Second
- }
- if cfg.DhtStartingNodes == nil {
- cfg.DhtStartingNodes = dht.GlobalBootstrapAddrs
- }
- if cfg.ListenHost == nil {
- cfg.ListenHost = func(string) string { return "" }
+func NewDefaultClientConfig() *ClientConfig {
+ return &ClientConfig{
+ HTTP: &http.Client{
+ Timeout: time.Second * 15,
+ Transport: &http.Transport{
+ Dial: (&net.Dialer{
+ Timeout: 15 * time.Second,
+ }).Dial,
+ TLSHandshakeTimeout: 15 * time.Second,
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+ }},
+ HTTPUserAgent: DefaultHTTPUserAgent,
+ ExtendedHandshakeClientVersion: "go.torrent dev 20150624",
+ Bep20: "-GT0001-",
+ NominalDialTimeout: 30 * time.Second,
+ MinDialTimeout: 5 * time.Second,
+ EstablishedConnsPerTorrent: 50,
+ HalfOpenConnsPerTorrent: 25,
+ TorrentPeersHighWater: 500,
+ TorrentPeersLowWater: 50,
+ HandshakesTimeout: 20 * time.Second,
+ DhtStartingNodes: dht.GlobalBootstrapAddrs,
+ ListenHost: func(string) string { return "" },
+ UploadRateLimiter: unlimited,
+ DownloadRateLimiter: unlimited,
}
}
-func (cfg *Config) setProxyURL() {
+func (cfg *ClientConfig) setProxyURL() {
fixedURL, err := url.Parse(cfg.ProxyURL)
if err != nil {
return
package torrent
import (
+ "fmt"
"io"
- "sync"
+ "reflect"
+ "sync/atomic"
pp "github.com/anacrolix/torrent/peer_protocol"
)
// is things sent to the peer, and Read is stuff received from them.
type ConnStats struct {
// Total bytes on the wire. Includes handshakes and encryption.
- BytesWritten int64
- BytesWrittenData int64
+ BytesWritten Count
+ BytesWrittenData Count
- BytesRead int64
- BytesReadData int64
- BytesReadUsefulData int64
+ BytesRead Count
+ BytesReadData Count
+ BytesReadUsefulData Count
- ChunksWritten int64
+ ChunksWritten Count
- ChunksRead int64
- ChunksReadUseful int64
- ChunksReadUnwanted int64
+ ChunksRead Count
+ ChunksReadUseful Count
+ ChunksReadUnwanted Count
// Number of pieces data was written to, that subsequently passed verification.
- PiecesDirtiedGood int64
+ PiecesDirtiedGood Count
// Number of pieces data was written to, that subsequently failed
// verification. Note that a connection may not have been the sole dirtier
// of a piece.
- PiecesDirtiedBad int64
+ PiecesDirtiedBad Count
+}
+
+func (me *ConnStats) Copy() (ret ConnStats) {
+ for i := 0; i < reflect.TypeOf(ConnStats{}).NumField(); i++ {
+ n := reflect.ValueOf(me).Elem().Field(i).Addr().Interface().(*Count).Int64()
+ reflect.ValueOf(&ret).Elem().Field(i).Addr().Interface().(*Count).Add(n)
+ }
+ return
+}
+
+type Count struct {
+ n int64
+}
+
+var _ fmt.Stringer = (*Count)(nil)
+
+func (me *Count) Add(n int64) {
+ atomic.AddInt64(&me.n, n)
+}
+
+func (me *Count) Int64() int64 {
+ return atomic.LoadInt64(&me.n)
+}
+
+func (me *Count) String() string {
+ return fmt.Sprintf("%v", me.Int64())
}
func (cs *ConnStats) wroteMsg(msg *pp.Message) {
// TODO: Track messages and not just chunks.
switch msg.Type {
case pp.Piece:
- cs.ChunksWritten++
- cs.BytesWrittenData += int64(len(msg.Piece))
+ cs.ChunksWritten.Add(1)
+ cs.BytesWrittenData.Add(int64(len(msg.Piece)))
}
}
func (cs *ConnStats) readMsg(msg *pp.Message) {
switch msg.Type {
case pp.Piece:
- cs.ChunksRead++
- cs.BytesReadData += int64(len(msg.Piece))
+ cs.ChunksRead.Add(1)
+ cs.BytesReadData.Add(int64(len(msg.Piece)))
}
}
-func (cs *ConnStats) wroteBytes(n int64) {
- cs.BytesWritten += n
+func (cs *ConnStats) incrementPiecesDirtiedGood() {
+ cs.PiecesDirtiedGood.Add(1)
+}
+
+func (cs *ConnStats) incrementPiecesDirtiedBad() {
+ cs.PiecesDirtiedBad.Add(1)
}
-func (cs *ConnStats) readBytes(n int64) {
- cs.BytesRead += n
+func add(n int64, f func(*ConnStats) *Count) func(*ConnStats) {
+ return func(cs *ConnStats) {
+ p := f(cs)
+ p.Add(n)
+ }
}
type connStatsReadWriter struct {
rw io.ReadWriter
- l sync.Locker
c *connection
}
func (me connStatsReadWriter) Write(b []byte) (n int, err error) {
n, err = me.rw.Write(b)
- me.l.Lock()
me.c.wroteBytes(int64(n))
- me.l.Unlock()
return
}
func (me connStatsReadWriter) Read(b []byte) (n int, err error) {
n, err = me.rw.Read(b)
- me.l.Lock()
me.c.readBytes(int64(n))
- me.l.Unlock()
return
}
type connection struct {
t *Torrent
// The actual Conn, used for closing, and setting socket options.
- conn net.Conn
+ conn net.Conn
+ outgoing bool
// The Reader and Writer for this Conn, with hooks installed for stats,
// limiting, deadlines etc.
w io.Writer
cryptoMethod mse.CryptoMethod
Discovery peerSource
closed missinggo.Event
+ // Set true after we've added our ConnStats generated during handshake to
+ // other ConnStat instances as determined when the *Torrent became known.
+ reconciledHandshakeStats bool
stats ConnStats
writerCond sync.Cond
}
+// Returns true if the connection is over IPv6.
+func (cn *connection) ipv6() bool {
+ ip := missinggo.AddrIP(cn.remoteAddr())
+ if ip.To4() != nil {
+ return false
+ }
+ return len(ip) == net.IPv6len
+}
+
+// Returns true the dialer has the lower client peer ID. TODO: Find the
+// specification for this.
+func (cn *connection) isPreferredDirection() bool {
+ return bytes.Compare(cn.t.cl.peerID[:], cn.PeerID[:]) < 0 == cn.outgoing
+}
+
+// Returns whether the left connection should be preferred over the right one,
+// considering only their networking properties. If ok is false, we can't
+// decide.
+func (l *connection) hasPreferredNetworkOver(r *connection) (left, ok bool) {
+ var ml multiLess
+ ml.NextBool(l.isPreferredDirection(), r.isPreferredDirection())
+ ml.NextBool(!l.utp(), !r.utp())
+ ml.NextBool(l.ipv6(), r.ipv6())
+ return ml.FinalOk()
+}
+
func (cn *connection) cumInterest() time.Duration {
ret := cn.priorInterest
if cn.Interested {
return strings.Contains(cn.remoteAddr().Network(), "utp")
}
-// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
+// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
func (cn *connection) statusFlags() (ret string) {
c := func(b byte) {
ret += string([]byte{b})
// }
func (cn *connection) downloadRate() float64 {
- return float64(cn.stats.BytesReadUsefulData) / cn.cumInterest().Seconds()
+ return float64(cn.stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds()
}
func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
cn.cumInterest(),
)
fmt.Fprintf(w,
- " %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
+ " %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
cn.completedString(),
len(cn.peerTouchedPieces),
- cn.stats.ChunksReadUseful,
- cn.stats.ChunksRead,
- cn.stats.ChunksWritten,
+ &cn.stats.ChunksReadUseful,
+ &cn.stats.ChunksRead,
+ &cn.stats.ChunksWritten,
cn.requestsLowWater,
cn.numLocalRequests(),
cn.nominalMaxRequests(),
// The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) {
- return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful-(cn.stats.ChunksRead-cn.stats.ChunksReadUseful))))
+ return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64()))))
}
func (cn *connection) onPeerSentCancel(r request) {
func (cn *connection) wroteMsg(msg *pp.Message) {
messageTypesSent.Add(msg.Type.String(), 1)
- cn.stats.wroteMsg(msg)
- cn.t.stats.wroteMsg(msg)
+ cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
}
func (cn *connection) readMsg(msg *pp.Message) {
- cn.stats.readMsg(msg)
- cn.t.stats.readMsg(msg)
+ cn.allStats(func(cs *ConnStats) { cs.readMsg(msg) })
}
-func (cn *connection) wroteBytes(n int64) {
- cn.stats.wroteBytes(n)
- if cn.t != nil {
- cn.t.stats.wroteBytes(n)
+// After handshake, we know what Torrent and Client stats to include for a
+// connection.
+func (cn *connection) postHandshakeStats(f func(*ConnStats)) {
+ t := cn.t
+ f(&t.stats)
+ f(&t.cl.stats)
+}
+
+// All ConnStats that include this connection. Some objects are not known
+// until the handshake is complete, after which it's expected to reconcile the
+// differences.
+func (cn *connection) allStats(f func(*ConnStats)) {
+ f(&cn.stats)
+ if cn.reconciledHandshakeStats {
+ cn.postHandshakeStats(f)
}
}
+func (cn *connection) wroteBytes(n int64) {
+ cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten }))
+}
+
func (cn *connection) readBytes(n int64) {
- cn.stats.readBytes(n)
- if cn.t != nil {
- cn.t.stats.readBytes(n)
- }
+ cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
}
// Returns whether the connection could be useful to us. We're seeding and
func (c *connection) receiveChunk(msg *pp.Message) {
t := c.t
cl := t.cl
- chunksReceived.Add(1)
+ torrent.Add("chunks received", 1)
req := newRequestFromMessage(msg)
if c.deleteRequest(req) {
c.updateRequests()
} else {
- unexpectedChunksReceived.Add(1)
+ torrent.Add("chunks received unexpected", 1)
}
if c.PeerChoked {
// Do we actually want this chunk?
if !t.wantPiece(req) {
- unwantedChunksReceived.Add(1)
- c.stats.ChunksReadUnwanted++
- c.t.stats.ChunksReadUnwanted++
+ torrent.Add("chunks received unwanted", 1)
+ c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUnwanted }))
return
}
index := int(req.Index)
piece := &t.pieces[index]
- c.stats.ChunksReadUseful++
- c.t.stats.ChunksReadUseful++
- c.stats.BytesReadUsefulData += int64(len(msg.Piece))
- c.t.stats.BytesReadUsefulData += int64(len(msg.Piece))
+ c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
+ c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
c.lastUsefulChunkReceived = time.Now()
// if t.fastestConn != c {
// log.Printf("setting fastest connection %p", c)
return false
}
// Don't upload more than 100 KiB more than we download.
- if c.stats.BytesWrittenData >= c.stats.BytesReadData+100<<10 {
+ if c.stats.BytesWrittenData.Int64() >= c.stats.BytesReadData.Int64()+100<<10 {
return false
}
return true
return false
}
for r := range c.PeerRequests {
- res := c.t.cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
+ res := c.t.cl.config.UploadRateLimiter.ReserveN(time.Now(), int(r.Length))
if !res.OK() {
panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
}
}
func (cn *connection) netGoodPiecesDirtied() int64 {
- return cn.stats.PiecesDirtiedGood - cn.stats.PiecesDirtiedBad
+ return cn.stats.PiecesDirtiedGood.Int64() - cn.stats.PiecesDirtiedBad.Int64()
}
func (c *connection) peerHasWantedPieces() bool {
return false
}
delete(c.requests, r)
- c.t.pendingRequests[r]--
+ pr := c.t.pendingRequests
+ pr[r]--
+ n := pr[r]
+ if n == 0 {
+ delete(pr, r)
+ }
+ if n < 0 {
+ panic(n)
+ }
c.updateRequests()
return true
}
panic("connection already associated with a torrent")
}
c.t = t
- t.conns[c] = struct{}{}
+ t.reconcileHandshakeStats(c)
}
import (
"io"
+ "net"
"sync"
"testing"
"time"
// Have that would potentially alter it.
func TestSendBitfieldThenHave(t *testing.T) {
r, w := io.Pipe()
- var cl Client
+ cl := Client{
+ config: &ClientConfig{DownloadRateLimiter: unlimited},
+ }
cl.initLogger()
- c := cl.newConnection(nil)
+ c := cl.newConnection(nil, false)
c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
c.t.setInfo(&metainfo.Info{
Pieces: make([]byte, metainfo.HashSize*3),
}
func BenchmarkConnectionMainReadLoop(b *testing.B) {
- cl := &Client{}
+ cl := &Client{
+ config: &ClientConfig{
+ DownloadRateLimiter: unlimited,
+ },
+ }
ts := &torrentStorage{}
t := &Torrent{
cl: cl,
}))
t.setChunkSize(defaultChunkSize)
t.pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority())
- r, w := io.Pipe()
- cn := &connection{
- t: t,
- r: r,
- }
+ r, w := net.Pipe()
+ cn := cl.newConnection(r, true)
+ cn.setTorrent(t)
mrlErr := make(chan error)
cl.mu.Lock()
go func() {
}
w.Close()
require.NoError(b, <-mrlErr)
- require.EqualValues(b, b.N, cn.stats.ChunksReadUseful)
+ require.EqualValues(b, b.N, cn.stats.ChunksReadUseful.Int64())
}
func TestConnectionReceiveBadChunkIndex(t *testing.T) {
t.Log(err)
}
}()
- client, err := torrent.NewClient(&torrent.Config{
- DataDir: filepath.Join(layout.BaseDir, "incomplete"),
- DisableTrackers: true,
- NoDHT: true,
- DisableTCP: true,
- DisableUTP: true,
- })
+ cfg := torrent.NewDefaultClientConfig()
+ cfg.DataDir = filepath.Join(layout.BaseDir, "incomplete")
+ cfg.DisableTrackers = true
+ cfg.NoDHT = true
+ cfg.DisableTCP = true
+ cfg.DisableUTP = true
+ client, err := torrent.NewClient(cfg)
require.NoError(t, err)
defer client.Close()
tt, err := client.AddTorrent(layout.Metainfo)
layout, err := newGreetingLayout()
require.NoError(t, err)
defer layout.Destroy()
- cfg := torrent.Config{
- DataDir: layout.Completed,
- DisableTrackers: true,
- NoDHT: true,
- Seed: true,
- ListenHost: torrent.LoopbackListenHost,
- }
- seeder, err := torrent.NewClient(&cfg)
+ cfg := torrent.NewDefaultClientConfig()
+ cfg.DataDir = layout.Completed
+ cfg.DisableTrackers = true
+ cfg.NoDHT = true
+ cfg.Seed = true
+ cfg.ListenHost = torrent.LoopbackListenHost
+ seeder, err := torrent.NewClient(cfg)
require.NoError(t, err)
defer seeder.Close()
testutil.ExportStatusWriter(seeder, "s")
<-seederTorrent.GotInfo()
seederTorrent.VerifyData()
}()
- leecher, err := torrent.NewClient(&torrent.Config{
- DisableTrackers: true,
- NoDHT: true,
- DisableTCP: true,
- DefaultStorage: storage.NewMMap(filepath.Join(layout.BaseDir, "download")),
- ListenHost: torrent.LoopbackListenHost,
- })
+ cfg = torrent.NewDefaultClientConfig()
+ cfg.DisableTrackers = true
+ cfg.NoDHT = true
+ cfg.DisableTCP = true
+ cfg.DefaultStorage = storage.NewMMap(filepath.Join(layout.BaseDir, "download"))
+ cfg.ListenHost = torrent.LoopbackListenHost
+ leecher, err := torrent.NewClient(cfg)
require.NoError(t, err)
testutil.ExportStatusWriter(leecher, "l")
defer leecher.Close()
// I could move a lot of these counters to their own file, but I suspect they
// may be attached to a Client someday.
var (
- unwantedChunksReceived = expvar.NewInt("chunksReceivedUnwanted")
- unexpectedChunksReceived = expvar.NewInt("chunksReceivedUnexpected")
- chunksReceived = expvar.NewInt("chunksReceived")
-
torrent = expvar.NewMap("torrent")
peersAddedBySource = expvar.NewMap("peersAddedBySource")
pieceHashedCorrect = expvar.NewInt("pieceHashedCorrect")
pieceHashedNotCorrect = expvar.NewInt("pieceHashedNotCorrect")
- unsuccessfulDials = expvar.NewInt("dialSuccessful")
- successfulDials = expvar.NewInt("dialUnsuccessful")
-
peerExtensions = expvar.NewMap("peerExtensions")
completedHandshakeConnectionFlags = expvar.NewMap("completedHandshakeConnectionFlags")
// Count of connections to peer with same client ID.
- connsToSelf = expvar.NewInt("connsToSelf")
- // Number of completed connections to a client we're already connected with.
- duplicateClientConns = expvar.NewInt("duplicateClientConns")
+ connsToSelf = expvar.NewInt("connsToSelf")
receivedKeepalives = expvar.NewInt("receivedKeepalives")
supportedExtensionMessages = expvar.NewMap("supportedExtensionMessages")
postedKeepalives = expvar.NewInt("postedKeepalives")
"github.com/anacrolix/missinggo"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
+ "golang.org/x/time/rate"
)
type chunkSpec struct {
}
return ret
}
+
+var unlimited = rate.NewLimiter(rate.Inf, 0)
package torrent
import (
+ "reflect"
+ "strings"
"testing"
"github.com/anacrolix/missinggo/bitmap"
"github.com/anacrolix/missinggo/iter"
+ "github.com/davecgh/go-spew/spew"
"github.com/stretchr/testify/assert"
)
assert.Equal(t, []interface{}{0, 3, 2}, iter.ToSlice(iterBitmapsDistinct(&skipCopy, first, second)))
assert.Equal(t, []int{1}, skip.ToSortedSlice())
}
+
+func TestSpewConnStats(t *testing.T) {
+ s := spew.Sdump(ConnStats{})
+ t.Logf("\n%s", s)
+ lines := strings.Count(s, "\n")
+ assert.EqualValues(t, 2+reflect.ValueOf(ConnStats{}).NumField(), lines)
+}
"strconv"
"sync"
+ "github.com/anacrolix/missinggo/perf"
+
"github.com/bradfitz/iter"
)
ia: initialPayload,
cryptoProvides: cryptoProvides,
}
+ defer perf.ScopeTimerErr(&err)()
return h.Do()
}
--- /dev/null
+package torrent
+
+func strictCmp(same, less bool) cmper {
+ return func() (bool, bool) { return same, less }
+}
+
+type (
+ cmper func() (same, less bool)
+ multiLess struct {
+ ok bool
+ less bool
+ }
+)
+
+func (me *multiLess) Final() bool {
+ if !me.ok {
+ panic("undetermined")
+ }
+ return me.less
+}
+
+func (me *multiLess) FinalOk() (left, ok bool) {
+ return me.less, me.ok
+}
+
+func (me *multiLess) Next(f cmper) {
+ me.StrictNext(f())
+}
+
+func (me *multiLess) StrictNext(same, less bool) {
+ if me.ok {
+ return
+ }
+ if same {
+ return
+ }
+ me.ok, me.less = true, less
+}
+
+func (me *multiLess) NextBool(l, r bool) {
+ me.StrictNext(l == r, l)
+}
--- /dev/null
+package torrent
+
+import (
+ "net"
+ "testing"
+
+ "github.com/anacrolix/missinggo"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func testListenerNetwork(
+ t *testing.T,
+ listenFunc func(net, addr string) (net.Listener, error),
+ expectedNet, givenNet, addr string, validIp4 bool,
+) {
+ l, err := listenFunc(givenNet, addr)
+ require.NoError(t, err)
+ defer l.Close()
+ assert.EqualValues(t, expectedNet, l.Addr().Network())
+ ip := missinggo.AddrIP(l.Addr())
+ assert.Equal(t, validIp4, ip.To4() != nil, ip)
+}
+
+func listenUtpListener(net, addr string) (l net.Listener, err error) {
+ l, err = NewUtpSocket(net, addr)
+ return
+}
+
+func testAcceptedConnAddr(
+ t *testing.T,
+ network string, valid4 bool,
+ dial func(addr string) (net.Conn, error),
+ listen func() (net.Listener, error),
+) {
+ l, err := listen()
+ require.NoError(t, err)
+ defer l.Close()
+ done := make(chan struct{})
+ defer close(done)
+ go func() {
+ c, err := dial(l.Addr().String())
+ require.NoError(t, err)
+ <-done
+ c.Close()
+ }()
+ c, err := l.Accept()
+ require.NoError(t, err)
+ defer c.Close()
+ assert.EqualValues(t, network, c.RemoteAddr().Network())
+ assert.Equal(t, valid4, missinggo.AddrIP(c.RemoteAddr()).To4() != nil)
+}
+
+func listenClosure(rawListenFunc func(string, string) (net.Listener, error), network, addr string) func() (net.Listener, error) {
+ return func() (net.Listener, error) {
+ return rawListenFunc(network, addr)
+ }
+}
+
+func dialClosure(f func(net, addr string) (net.Conn, error), network string) func(addr string) (net.Conn, error) {
+ return func(addr string) (net.Conn, error) {
+ return f(network, addr)
+ }
+}
+
+func TestListenLocalhostNetwork(t *testing.T) {
+ testListenerNetwork(t, net.Listen, "tcp", "tcp", "0.0.0.0:0", false)
+ testListenerNetwork(t, net.Listen, "tcp", "tcp", "[::1]:0", false)
+ testListenerNetwork(t, listenUtpListener, "udp", "udp6", "[::1]:0", false)
+ testListenerNetwork(t, listenUtpListener, "udp", "udp6", "[::]:0", false)
+ testListenerNetwork(t, listenUtpListener, "udp", "udp4", "localhost:0", true)
+
+ testAcceptedConnAddr(t, "tcp", false, dialClosure(net.Dial, "tcp"), listenClosure(net.Listen, "tcp6", ":0"))
+}
"golang.org/x/net/proxy"
"github.com/anacrolix/missinggo"
+ "github.com/anacrolix/missinggo/perf"
)
type dialer interface {
if err != nil {
return
}
+ defer func() {
+ if err != nil {
+ l.Close()
+ }
+ }()
// If we don't need the proxy - then we should return default net.Dialer,
// otherwise, let's try to parse the proxyURL and return proxy.Dialer
if len(proxyURL) != 0 {
+ // TODO: The error should be propagated, as proxy may be in use for
+ // security or privacy reasons. Also just pass proxy.Dialer in from
+ // the Config.
if dialer, err := getProxyDialer(proxyURL); err == nil {
- return tcpSocket{l, dialer}, nil
+ return tcpSocket{l, func(ctx context.Context, addr string) (conn net.Conn, err error) {
+ defer perf.ScopeTimerErr(&err)()
+ return dialer.Dial(network, addr)
+ }}, nil
}
}
-
- return tcpSocket{l, nil}, nil
+ dialer := net.Dialer{}
+ return tcpSocket{l, func(ctx context.Context, addr string) (conn net.Conn, err error) {
+ defer perf.ScopeTimerErr(&err)()
+ return dialer.DialContext(ctx, network, addr)
+ }}, nil
}
type tcpSocket struct {
net.Listener
- d proxy.Dialer
+ d func(ctx context.Context, addr string) (net.Conn, error)
}
func (me tcpSocket) dial(ctx context.Context, addr string) (net.Conn, error) {
- if me.d != nil {
- return me.d.Dial(me.Addr().Network(), addr)
- }
-
- return net.Dial(me.Addr().Network(), addr)
+ return me.d(ctx, addr)
}
func setPort(addr string, port int) string {
d proxy.Dialer
}
-func (me utpSocketSocket) dial(ctx context.Context, addr string) (net.Conn, error) {
+func (me utpSocketSocket) dial(ctx context.Context, addr string) (conn net.Conn, err error) {
+ defer perf.ScopeTimerErr(&err)()
if me.d != nil {
return me.d.Dial(me.network, addr)
}
// different pieces.
connPieceInclinationPool sync.Pool
// Torrent-level statistics.
- stats TorrentStats
+ stats ConnStats
// Count of each request across active connections.
pendingRequests map[request]int
func (t *Torrent) addPeer(p Peer) {
cl := t.cl
peersAddedBySource.Add(string(p.Source), 1)
+ if t.closed.IsSet() {
+ return
+ }
if cl.badPeerIPPort(p.IP, p.Port) {
torrent.Add("peers not added because of bad addr", 1)
return
}
func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
- tr := perf.NewTimer()
-
+ defer perf.ScopeTimerErr(&err)()
n, err := t.pieces[piece].Storage().WriteAt(data, begin)
if err == nil && n != len(data) {
err = io.ErrShortWrite
}
- if err == nil {
- tr.Mark("write chunk")
- }
return
}
heap.Init(&wcs)
for wcs.Len() != 0 {
c := heap.Pop(&wcs).(*connection)
- if c.stats.ChunksReadUnwanted >= 6 && c.stats.ChunksReadUnwanted > c.stats.ChunksReadUseful {
+ if c.stats.ChunksReadUnwanted.Int64() >= 6 && c.stats.ChunksReadUnwanted.Int64() > c.stats.ChunksReadUseful.Int64() {
return c
}
// If the connection is in the worst half of the established
}
_, ret = t.conns[c]
delete(t.conns, c)
+ torrent.Add("deleted connections", 1)
c.deleteAllRequests()
if len(t.conns) == 0 {
t.assertNoPendingRequests()
}
func (t *Torrent) assertNoPendingRequests() {
- for _, num := range t.pendingRequests {
- if num != 0 {
- panic(num)
- }
+ if len(t.pendingRequests) != 0 {
+ panic(t.pendingRequests)
}
}
if _url == "" {
return
}
- u, _ := url.Parse(_url)
+ u, err := url.Parse(_url)
+ if err != nil {
+ log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger)
+ }
if u.Scheme == "udp" {
u.Scheme = "udp4"
t.startScrapingTracker(u.String())
// The following are vaguely described in BEP 3.
Left: t.bytesLeftAnnounce(),
- Uploaded: t.stats.BytesWrittenData,
+ Uploaded: t.stats.BytesWrittenData.Int64(),
// There's no mention of wasted or unwanted download in the BEP.
- Downloaded: t.stats.BytesReadUsefulData,
+ Downloaded: t.stats.BytesReadUsefulData.Int64(),
}
}
}
func (t *Torrent) Stats() TorrentStats {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
+ t.cl.mu.RLock()
+ defer t.cl.mu.RUnlock()
return t.statsLocked()
}
-func (t *Torrent) statsLocked() TorrentStats {
- t.stats.ActivePeers = len(t.conns)
- t.stats.HalfOpenPeers = len(t.halfOpen)
- t.stats.PendingPeers = t.peers.Len()
- t.stats.TotalPeers = t.numTotalPeers()
- t.stats.ConnectedSeeders = 0
+func (t *Torrent) statsLocked() (ret TorrentStats) {
+ ret.ActivePeers = len(t.conns)
+ ret.HalfOpenPeers = len(t.halfOpen)
+ ret.PendingPeers = t.peers.Len()
+ ret.TotalPeers = t.numTotalPeers()
+ ret.ConnectedSeeders = 0
for c := range t.conns {
if all, ok := c.peerHasAllPieces(); all && ok {
- t.stats.ConnectedSeeders++
+ ret.ConnectedSeeders++
}
}
- return t.stats
+ ret.ConnStats = t.stats.Copy()
+ return
}
// The total number of peers in the torrent.
// Reconcile bytes transferred before connection was associated with a
// torrent.
func (t *Torrent) reconcileHandshakeStats(c *connection) {
- t.stats.wroteBytes(c.stats.BytesWritten)
- t.stats.readBytes(c.stats.BytesRead)
+ if c.stats != (ConnStats{
+ // Handshakes should only increment these fields:
+ BytesWritten: c.stats.BytesWritten,
+ BytesRead: c.stats.BytesRead,
+ }) {
+ panic("bad stats")
+ }
+ c.postHandshakeStats(func(cs *ConnStats) {
+ cs.BytesRead.Add(c.stats.BytesRead.Int64())
+ cs.BytesWritten.Add(c.stats.BytesWritten.Int64())
+ })
+ c.reconciledHandshakeStats = true
}
// Returns true if the connection is added.
-func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
+func (t *Torrent) addConnection(c *connection) (err error) {
+ defer func() {
+ if err == nil {
+ torrent.Add("added connections", 1)
+ }
+ }()
if t.closed.IsSet() {
- return false
- }
- if !t.wantConns() {
- return false
+ return errors.New("torrent closed")
}
for c0 := range t.conns {
- if c.PeerID == c0.PeerID {
- // Already connected to a client with that ID.
- duplicateClientConns.Add(1)
- lower := string(t.cl.peerID[:]) < string(c.PeerID[:])
- // Retain the connection from initiated from lower peer ID to
- // higher.
- if outgoing == lower {
- // Close the other one.
- c0.Close()
- // TODO: Is it safe to delete from the map while we're
- // iterating over it?
- t.deleteConnection(c0)
- } else {
- // Abandon this one.
- return false
- }
+ if c.PeerID != c0.PeerID {
+ continue
+ }
+ if !t.cl.config.dropDuplicatePeerIds {
+ continue
+ }
+ if left, ok := c.hasPreferredNetworkOver(c0); ok && left {
+ c0.Close()
+ t.deleteConnection(c0)
+ } else {
+ return errors.New("existing connection preferred")
}
}
+ if !t.wantConns() {
+ return errors.New("don't want conns")
+ }
if len(t.conns) >= t.maxEstablishedConns {
c := t.worstBadConn()
- if c == nil {
- return false
- }
if t.cl.config.Debug && missinggo.CryHeard() {
log.Printf("%s: dropping connection to make room for new one:\n %v", t, c)
}
if len(t.conns) >= t.maxEstablishedConns {
panic(len(t.conns))
}
- c.setTorrent(t)
- return true
+ t.conns[c] = struct{}{}
+ return nil
}
func (t *Torrent) wantConns() bool {
}
if correct {
if len(touchers) != 0 {
- t.stats.PiecesDirtiedGood++
+ // Don't increment stats above connection-level for every involved
+ // connection.
+ t.allStats((*ConnStats).incrementPiecesDirtiedGood)
}
for _, c := range touchers {
- c.stats.PiecesDirtiedGood++
+ c.stats.incrementPiecesDirtiedGood()
}
err := p.Storage().MarkComplete()
if err != nil {
}
} else {
if len(touchers) != 0 {
- t.stats.PiecesDirtiedBad++
+ // Don't increment stats above connection-level for every involved
+ // connection.
+ t.allStats((*ConnStats).incrementPiecesDirtiedBad)
for _, c := range touchers {
// Y u do dis peer?!
- c.stats.PiecesDirtiedBad++
+ c.stats.incrementPiecesDirtiedBad()
}
slices.Sort(touchers, connLessTrusted)
if t.cl.config.Debug {
return
}())
}
+
+// All stats that include this Torrent. Useful when we want to increment
+// ConnStats but not for every connection.
+func (t *Torrent) allStats(f func(*ConnStats)) {
+ f(&t.stats)
+ f(&t.cl.stats)
+}
numPieces = 13410
pieceLength = 256 << 10
)
- cl := &Client{}
+ cl := &Client{config: &ClientConfig{}}
cl.initLogger()
t := cl.newTorrent(metainfo.Hash{}, nil)
require.NoError(b, t.setInfo(&metainfo.Info{
// Check that a torrent containing zero-length file(s) will start, and that
// they're created in the filesystem. The client storage is assumed to be
// file-based on the native filesystem based.
-func testEmptyFilesAndZeroPieceLength(t *testing.T, cfg *Config) {
+func testEmptyFilesAndZeroPieceLength(t *testing.T, cfg *ClientConfig) {
cl, err := NewClient(cfg)
require.NoError(t, err)
defer cl.Close()
func TestPieceHashFailed(t *testing.T) {
mi := testutil.GreetingMetaInfo()
cl := new(Client)
+ cl.config = &ClientConfig{}
cl.initLogger()
tt := cl.newTorrent(mi.HashInfoBytes(), badStorage{})
tt.setChunkSize(2)
import "container/heap"
func worseConn(l, r *connection) bool {
- if l.useful() != r.useful() {
- return r.useful()
- }
- if !l.lastHelpful().Equal(r.lastHelpful()) {
- return l.lastHelpful().Before(r.lastHelpful())
- }
- return l.completedHandshake.Before(r.completedHandshake)
+ var ml multiLess
+ ml.NextBool(!l.useful(), !r.useful())
+ ml.StrictNext(
+ l.lastHelpful().Equal(r.lastHelpful()),
+ l.lastHelpful().Before(r.lastHelpful()))
+ ml.StrictNext(
+ l.completedHandshake.Equal(r.completedHandshake),
+ l.completedHandshake.Before(r.completedHandshake))
+ return ml.Final()
}
type worseConnSlice struct {