From a7dddd9be60f11ba03fcbd9e44a33afe0b80f619 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 18 Mar 2015 18:28:13 +1100 Subject: [PATCH] Rewrite handshaking and connection management --- client.go | 471 +++++++++++++++++++++++++++++++++--------------- connection.go | 25 +-- torrent.go | 22 ++- torrent_test.go | 2 +- 4 files changed, 352 insertions(+), 168 deletions(-) diff --git a/client.go b/client.go index ff335e40..ee238db9 100644 --- a/client.go +++ b/client.go @@ -75,6 +75,12 @@ var ( acceptedConns = expvar.NewInt("acceptedConns") inboundConnsBlocked = expvar.NewInt("inboundConnsBlocked") peerExtensions = expvar.NewMap("peerExtensions") + // Count of connections to peer with same client ID. + connsToSelf = expvar.NewInt("connsToSelf") + // Number of completed connections to a client we're already connected with. + duplicateClientConns = expvar.NewInt("duplicateClientConns") + receivedMessageTypes = expvar.NewMap("receivedMessageTypes") + supportedExtensionMessages = expvar.NewMap("supportedExtensionMessages") ) const ( @@ -94,7 +100,8 @@ const ( // Limit how long handshake can take. This is to reduce the lingering // impact of a few bad apples. 4s loses 1% of successful handshakes that // are obtained with 60s timeout, and 5% of unsuccessful handshakes. - handshakeTimeout = 45 * time.Second + btHandshakeTimeout = 4 * time.Second + handshakesTimeout = 20 * time.Second pruneInterval = 10 * time.Second ) @@ -143,8 +150,6 @@ type Client struct { event sync.Cond quit chan struct{} - handshaking int - torrents map[InfoHash]*torrent } @@ -218,7 +223,6 @@ func (cl *Client) WriteStatus(_w io.Writer) { fmt.Fprintln(w, "Not listening!") } fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID) - fmt.Fprintf(w, "Handshaking: %d\n", cl.handshaking) if cl.dHT != nil { dhtStats := cl.dHT.Stats() fmt.Fprintf(w, "DHT nodes: %d (%d good)\n", dhtStats.NumNodes, dhtStats.NumGoodNodes) @@ -277,7 +281,7 @@ func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err err } // Sets priorities to download from the given offset. Returns when the piece -// at the given offset can be read. Returns the number of bytes that +// at the given offset can be read. Returns the number of bytes that are // immediately available from the offset. func (cl *Client) prepareRead(t *torrent, off int64) (n int64) { index := int(off / int64(t.usualPieceSize())) @@ -328,6 +332,8 @@ again: panic(fmt.Sprintf("can't read from %T", d)) } +// Calculates the number of pieces to set to Readahead priority, after the +// Now, and Next pieces. func readaheadPieces(readahead, pieceLength int64) int { return int((readahead+pieceLength-1)/pieceLength - 1) } @@ -340,7 +346,7 @@ func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) { return } cl.raisePiecePriority(t, index, piecePriorityNext) - for i := 0; i < readaheadPieces(5*1024*1024, t.Info.PieceLength); i++ { + for range iter.N(readaheadPieces(5*1024*1024, t.Info.PieceLength)) { index++ if index >= t.numPieces() { break @@ -366,6 +372,7 @@ func (t *torrent) connPendPiece(c *connection, piece int) { func (cl *Client) raisePiecePriority(t *torrent, piece int, priority piecePriority) { if t.Pieces[piece].Priority < priority { + cl.event.Broadcast() cl.prioritizePiece(t, piece, priority) } } @@ -624,12 +631,35 @@ func (cl *Client) acceptConnections(l net.Listener, utp bool) { conn.Close() continue } - go func() { - if err := cl.runConnection(conn, nil, peerSourceIncoming, utp); err != nil { - log.Print(err) - } - }() + go cl.incomingConnection(conn, utp) + } +} + +func (cl *Client) incomingConnection(nc net.Conn, utp bool) { + defer nc.Close() + if tc, ok := nc.(*net.TCPConn); ok { + tc.SetLinger(0) + } + c := newConnection() + c.conn = nc + c.rw = nc + c.Discovery = peerSourceIncoming + c.uTP = utp + err := cl.runReceivedConn(c) + if err != nil { + log.Print(err) + } +} + +func (cl *Client) Torrent(ih InfoHash) (T Torrent, ok bool) { + cl.mu.Lock() + defer cl.mu.Unlock() + t, ok := cl.torrents[ih] + if !ok { + return } + T = Torrent{cl, t} + return } func (me *Client) torrent(ih InfoHash) *torrent { @@ -637,12 +667,12 @@ func (me *Client) torrent(ih InfoHash) *torrent { } type dialResult struct { - net.Conn - UTP bool + Conn net.Conn + UTP bool } -func doDial(dial func(addr string) (net.Conn, error), ch chan dialResult, utp bool, addr string) { - conn, err := dial(addr) +func doDial(dial func(addr string, t *torrent) (net.Conn, error), ch chan dialResult, utp bool, addr string, t *torrent) { + conn, err := dial(addr, t) if err != nil { if conn != nil { conn.Close() @@ -693,70 +723,142 @@ func (me *Client) initiateConn(peer Peer, t *torrent) { log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r) return } - dialTimeout := reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, len(t.Peers)) t.HalfOpen[addr] = struct{}{} - go func() { - // Binding to the listen address and dialing via net.Dialer gives - // "address in use" error. It seems it's not possible to dial out from - // this address so that peers associate our local address with our - // listen address. - - // Initiate connections via TCP and UTP simultaneously. Use the first - // one that succeeds. - left := 0 - if !me.disableUTP { - left++ - } - if !me.disableTCP { - left++ - } - resCh := make(chan dialResult, left) - if !me.disableUTP { - go doDial(func(addr string) (net.Conn, error) { - return me.utpSock.DialTimeout(addr, dialTimeout) - }, resCh, true, addr) - } - if !me.disableTCP { - go doDial(func(addr string) (net.Conn, error) { - // time.Sleep(time.Second) // Give uTP a bit of a head start. - return net.DialTimeout("tcp", addr, dialTimeout) - }, resCh, false, addr) - } - var res dialResult - for ; left > 0 && res.Conn == nil; left-- { - res = <-resCh - } - // Whether or not the connection attempt succeeds, the half open - // counter should be decremented, and new connection attempts made. + go me.outgoingConnection(t, addr, peer.Source) +} + +func (me *Client) dialTimeout(t *torrent) time.Duration { + return reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, len(t.Peers)) +} + +func (me *Client) dialTCP(addr string, t *torrent) (c net.Conn, err error) { + c, err = net.DialTimeout("tcp", addr, me.dialTimeout(t)) + if err == nil { + c.(*net.TCPConn).SetLinger(0) + } + return +} + +func (me *Client) dialUTP(addr string, t *torrent) (c net.Conn, err error) { + return me.utpSock.DialTimeout(addr, me.dialTimeout(t)) +} + +// Returns a connection over UTP or TCP. +func (me *Client) dial(addr string, t *torrent) (conn net.Conn, utp bool) { + // Initiate connections via TCP and UTP simultaneously. Use the first one + // that succeeds. + left := 0 + if !me.disableUTP { + left++ + } + if !me.disableTCP { + left++ + } + resCh := make(chan dialResult, left) + if !me.disableUTP { + go doDial(me.dialUTP, resCh, true, addr, t) + } + if !me.disableTCP { + go doDial(me.dialTCP, resCh, false, addr, t) + } + var res dialResult + // Wait for a successful connection. + for ; left > 0 && res.Conn == nil; left-- { + res = <-resCh + } + if left > 0 { + // There are still incompleted dials. go func() { - me.mu.Lock() - defer me.mu.Unlock() - if _, ok := t.HalfOpen[addr]; !ok { - panic("invariant broken") + for ; left > 0; left-- { + conn := (<-resCh).Conn + if conn != nil { + conn.Close() + } } - delete(t.HalfOpen, addr) - me.openNewConns(t) }() - if res.Conn == nil { + } + conn = res.Conn + utp = res.UTP + return +} + +func (me *Client) noLongerHalfOpen(t *torrent, addr string) { + if _, ok := t.HalfOpen[addr]; !ok { + panic("invariant broken") + } + delete(t.HalfOpen, addr) + me.openNewConns(t) +} + +// Returns nil connection and nil error if no connection could be established +// for valid reasons. +func (me *Client) establishOutgoingConn(t *torrent, addr string) (c *connection, err error) { + handshakesConnection := func(nc net.Conn, encrypted, utp bool) (c *connection, err error) { + c = newConnection() + c.conn = nc + c.rw = nc + c.encrypted = encrypted + c.uTP = utp + err = nc.SetDeadline(time.Now().Add(handshakesTimeout)) + if err != nil { return } - if left > 0 { - go func() { - for ; left > 0; left-- { - conn := (<-resCh).Conn - if conn != nil { - conn.Close() - } - } - }() + ok, err := me.initiateHandshakes(c, t) + if !ok { + c = nil } + return + } + nc, utp := me.dial(addr, t) + if nc == nil { + return + } + c, err = handshakesConnection(nc, true, utp) + if err != nil { + nc.Close() + return + } else if c != nil { + return + } + nc.Close() + if utp { + nc, err = me.dialUTP(addr, t) + } else { + nc, err = me.dialTCP(addr, t) + } + if err != nil { + err = fmt.Errorf("error dialing for unencrypted connection: %s", err) + return + } + c, err = handshakesConnection(nc, false, utp) + if err != nil { + nc.Close() + } + return +} - // log.Printf("connected to %s", conn.RemoteAddr()) - err := me.runConnection(res.Conn, t, peer.Source, res.UTP) - if err != nil { - log.Print(err) - } - }() +// Called to dial out and run a connection. The addr we're given is already +// considered half-open. +func (me *Client) outgoingConnection(t *torrent, addr string, ps peerSource) { + c, err := me.establishOutgoingConn(t, addr) + me.mu.Lock() + defer me.mu.Unlock() + // Don't release lock between here and addConnection, unless it's for + // failure. + me.noLongerHalfOpen(t, addr) + if err != nil { + log.Print(err) + return + } + if c == nil { + return + } + defer c.Close() + c.Discovery = ps + err = me.runInitiatedHandshookConn(c, t) + if err != nil { + log.Print(err) + } } // The port number for incoming peer connections. 0 if the client isn't @@ -843,7 +945,7 @@ func handshake(sock io.ReadWriter, ih *InfoHash, peerID [20]byte, extensions pee // Wait until writes complete before returning from handshake. err = <-writeDone if err != nil { - err = fmt.Errorf("error writing during handshake: %s", err) + err = fmt.Errorf("error writing: %s", err) } }() @@ -889,94 +991,173 @@ func handshake(sock io.ReadWriter, ih *InfoHash, peerID [20]byte, extensions pee // Wraps a raw connection and provides the interface we want for using the // connection in the message loop. -type peerConn struct { - net.Conn +type deadlineReader struct { + nc net.Conn + r io.Reader } -func (pc peerConn) Read(b []byte) (n int, err error) { +func (me deadlineReader) Read(b []byte) (n int, err error) { // Keep-alives should be received every 2 mins. Give a bit of gracetime. - err = pc.Conn.SetReadDeadline(time.Now().Add(150 * time.Second)) + err = me.nc.SetReadDeadline(time.Now().Add(150 * time.Second)) if err != nil { err = fmt.Errorf("error setting read deadline: %s", err) } - n, err = pc.Conn.Read(b) + n, err = me.r.Read(b) // Convert common errors into io.EOF. + // if err != nil { + // if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET { + // err = io.EOF + // } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + // if n != 0 { + // panic(n) + // } + // err = io.EOF + // } + // } + return +} + +type readWriter struct { + io.Reader + io.Writer +} + +func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys [][]byte) (ret io.ReadWriter, encrypted bool, err error) { + var protocol [len(pp.Protocol)]byte + _, err = io.ReadFull(rw, protocol[:]) if err != nil { - if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET { - err = io.EOF - } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() { - if n != 0 { - panic(n) - } - err = io.EOF + return + } + ret = readWriter{ + io.MultiReader(bytes.NewReader(protocol[:]), rw), + rw, + } + if string(protocol[:]) == pp.Protocol { + return + } + encrypted = true + ret, err = mse.ReceiveHandshake(ret, skeys) + return +} + +func (cl *Client) receiveSkeys() (ret [][]byte) { + for ih := range cl.torrents { + ret = append(ret, ih[:]) + } + return +} + +func (me *Client) initiateHandshakes(c *connection, t *torrent) (ok bool, err error) { + if c.encrypted { + c.rw, err = mse.InitiateHandshake(c.rw, t.InfoHash[:], nil) + if err != nil { + return } } + ih, ok, err := me.connBTHandshake(c, &t.InfoHash) + if ih != t.InfoHash { + ok = false + } return } -func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource, uTP bool) (err error) { - if tcpConn, ok := sock.(*net.TCPConn); ok { - tcpConn.SetLinger(0) +func (cl *Client) receiveHandshakes(c *connection) (t *torrent, err error) { + cl.mu.Lock() + skeys := cl.receiveSkeys() + cl.mu.Unlock() + // TODO: Filter unmatching skey errors. + c.rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw, skeys) + if err != nil { + if err == mse.ErrNoSecretKeyMatch { + err = nil + } + return } - defer sock.Close() - err = sock.SetDeadline(time.Now().Add(handshakeTimeout)) + ih, ok, err := cl.connBTHandshake(c, nil) if err != nil { - err = fmt.Errorf("couldn't set handshake deadline: %s", err) + fmt.Errorf("error during bt handshake: %s", err) return } - me.mu.Lock() - me.handshaking++ - me.mu.Unlock() - var rw io.ReadWriter = sock - if torrent == nil { - rw, err = mse.ReceiveHandshake(sock, func() (ret [][]byte) { - for ih := range me.torrents { - ret = append(ret, ih[:]) - } - return - }()) - } else { - rw, err = mse.InitiateHandshake(sock, torrent.InfoHash[:]) + if !ok { + return } - if err != nil { - err = fmt.Errorf("error during MSE handshake: %s", err) + cl.mu.Lock() + t = cl.torrents[ih] + cl.mu.Unlock() + return +} + +// Returns !ok if handshake failed for valid reasons. +func (cl *Client) connBTHandshake(c *connection, ih *InfoHash) (ret InfoHash, ok bool, err error) { + res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes) + if err != nil || !ok { return } - hsRes, ok, err := handshake(rw, func() *InfoHash { - if torrent == nil { - return nil - } else { - return &torrent.InfoHash - } - }(), me.peerID, me.extensionBytes) - me.mu.Lock() - defer me.mu.Unlock() - if me.handshaking == 0 { - panic("handshake count invariant is broken") + ret = res.InfoHash + c.PeerExtensionBytes = res.peerExtensionBytes + c.PeerID = res.peerID + c.completedHandshake = time.Now() + return +} + +func (cl *Client) runInitiatedHandshookConn(c *connection, t *torrent) (err error) { + if c.PeerID == cl.peerID { + // Only if we initiated the connection is the remote address a + // listen addr for a doppleganger. + connsToSelf.Add(1) + addr := c.conn.RemoteAddr().String() + cl.dopplegangerAddrs[addr] = struct{}{} + return } - me.handshaking-- + return cl.runHandshookConn(c, t) +} + +func (cl *Client) runReceivedConn(c *connection) (err error) { + err = c.conn.SetDeadline(time.Now().Add(handshakesTimeout)) if err != nil { - err = fmt.Errorf("error during handshake: %s", err) return } - if !ok { + t, err := cl.receiveHandshakes(c) + if err != nil { + logonce.Stderr.Printf("error receiving handshakes: %s", err) + err = nil return } - if hsRes.peerID == me.peerID { + if t == nil { return } - torrent = me.torrent(hsRes.InfoHash) - if torrent == nil { + cl.mu.Lock() + defer cl.mu.Unlock() + if c.PeerID == cl.peerID { return } - sock.SetWriteDeadline(time.Time{}) - sock = peerConn{sock} - conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP, rw) - defer conn.Close() - conn.Discovery = discovery - if !me.addConnection(torrent, conn) { + return cl.runHandshookConn(c, t) +} + +func (cl *Client) runHandshookConn(c *connection, t *torrent) (err error) { + c.conn.SetWriteDeadline(time.Time{}) + c.rw = readWriter{ + deadlineReader{c.conn, c.rw}, + c.rw, + } + if !cl.addConnection(t, c) { return } + defer cl.dropConnection(t, c) + go c.writer() + go c.writeOptimizer(time.Minute) + cl.sendInitialMessages(c, t) + if t.haveInfo() { + t.initRequestOrdering(c) + } + err = cl.connectionLoop(t, c) + if err != nil { + err = fmt.Errorf("error during connection loop: %s", err) + } + return +} + +func (me *Client) sendInitialMessages(conn *connection, torrent *torrent) { if conn.PeerExtensionBytes.SupportsExtended() && me.extensionBytes.SupportsExtended() { conn.Post(pp.Message{ Type: pp.Extended, @@ -1035,24 +1216,19 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS Port: uint16(AddrPort(me.dHT.LocalAddr())), }) } - if torrent.haveInfo() { - torrent.initRequestOrdering(conn) - } - err = me.connectionLoop(torrent, conn) - if err != nil { - err = fmt.Errorf("during Connection loop with peer %q: %s", conn.PeerID, err) - } - me.dropConnection(torrent, conn) - return } +// Randomizes the piece order for this connection. Every connection will be +// given a different ordering. Having it stored per connection saves having to +// randomize during request filling, and constantly recalculate the ordering +// based on piece priorities. func (t *torrent) initRequestOrdering(c *connection) { if c.pieceRequestOrder != nil || c.piecePriorities != nil { panic("double init of request ordering") } c.piecePriorities = mathRand.Perm(t.numPieces()) c.pieceRequestOrder = pieceordering.New() - for i := 0; i < t.numPieces(); i++ { + for i := range iter.N(t.Info.NumPieces()) { if !c.PeerHasPiece(i) { continue } @@ -1527,7 +1703,8 @@ func (t *torrent) needData() bool { // TODO: I'm sure there's something here to do with seeding. func (t *torrent) badConn(c *connection) bool { - if time.Now().Sub(c.completedHandshake) < 30*time.Second { + // A 30 second grace for initial messages to go through. + if time.Since(c.completedHandshake) < 30*time.Second { return false } if !t.haveInfo() { @@ -1546,7 +1723,7 @@ func (t *torrent) numGoodConns() (num int) { } func (me *Client) wantConns(t *torrent) bool { - if !t.needData() && me.noUpload { + if me.noUpload && !t.needData() { return false } if t.numGoodConns() >= socketsPerTorrent { @@ -1596,12 +1773,12 @@ func (me *Client) addPeers(t *torrent, peers []Peer) { me.openNewConns(t) } -func (cl *Client) torrentFileCachePath(ih InfoHash) string { +func (cl *Client) cachedMetaInfoFilename(ih InfoHash) string { return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent") } func (cl *Client) saveTorrentFile(t *torrent) error { - path := cl.torrentFileCachePath(t.InfoHash) + path := cl.cachedMetaInfoFilename(t.InfoHash) os.MkdirAll(filepath.Dir(path), 0777) f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) if err != nil { @@ -1682,7 +1859,7 @@ func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err e // Prepare a Torrent without any attachment to a Client. That means we can // initialize fields all fields that don't require the Client without locking // it. -func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *torrent, err error) { +func newTorrent(ih InfoHash) (t *torrent, err error) { t = &torrent{ InfoHash: ih, Peers: make(map[peersKey]Peer), @@ -1696,7 +1873,6 @@ func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *tor } t.wantPeers.L = &t.stateMu t.GotMetainfo = t.gotMetainfo - t.addTrackers(announceList) return } @@ -1763,7 +1939,9 @@ func (t Torrent) NumPieces() int { } func (t Torrent) Drop() { + t.cl.mu.Lock() t.cl.dropTorrent(t.InfoHash) + t.cl.mu.Unlock() } type File struct { @@ -1959,12 +2137,13 @@ func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) { return me.cl.torrentReadAt(me.torrent, off, p) } -// Returns nil metainfo if it isn't in the cache. +// Returns nil metainfo if it isn't in the cache. Checks that the retrieved +// metainfo has the correct infohash. func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) { if cl.config.DisableMetainfoCache { return } - f, err := os.Open(cl.torrentFileCachePath(ih)) + f, err := os.Open(cl.cachedMetaInfoFilename(ih)) if err != nil { if os.IsNotExist(err) { err = nil diff --git a/connection.go b/connection.go index 75a1dba3..16e9db05 100644 --- a/connection.go +++ b/connection.go @@ -30,6 +30,7 @@ const ( type connection struct { conn net.Conn rw io.ReadWriter // The real slim shady + encrypted bool Discovery peerSource uTP bool closing chan struct{} @@ -37,7 +38,9 @@ type connection struct { post chan pp.Message writeCh chan []byte - piecePriorities []int + // The connections preferred order to download pieces. + piecePriorities []int + // The piece request order based on piece priorities. pieceRequestOrder *pieceordering.Instance UnwantedChunksReceived int @@ -69,26 +72,16 @@ type connection struct { PeerClientName string } -func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool, rw io.ReadWriter) (c *connection) { +func newConnection() (c *connection) { c = &connection{ - conn: sock, - rw: rw, - uTP: uTP, - - Choked: true, - PeerChoked: true, - PeerMaxRequests: 250, - PeerExtensionBytes: peb, - PeerID: peerID, + Choked: true, + PeerChoked: true, + PeerMaxRequests: 250, closing: make(chan struct{}), writeCh: make(chan []byte), post: make(chan pp.Message), - - completedHandshake: time.Now(), } - go c.writer() - go c.writeOptimizer(time.Minute) return } @@ -100,6 +93,8 @@ func (cn *connection) localAddr() net.Addr { return cn.conn.LocalAddr() } +// Adjust piece position in the request order for this connection based on the +// given piece priority. func (cn *connection) pendPiece(piece int, priority piecePriority) { if priority == piecePriorityNone { cn.pieceRequestOrder.DeletePiece(piece) diff --git a/torrent.go b/torrent.go index 9818f131..d9b4b0f5 100644 --- a/torrent.go +++ b/torrent.go @@ -66,14 +66,18 @@ type torrent struct { InfoHash InfoHash Pieces []*piece - length int64 + // Total length of the torrent in bytes. Stored because it's not O(1) to + // get this from the info dict. + length int64 data StatefulData + // The info dict. Nil if we don't have it. Info *metainfo.Info - // Active peer connections. + // Active peer connections, running message stream loops. Conns []*connection - // Set of addrs to which we're attempting to connect. + // Set of addrs to which we're attempting to connect. Connections are + // half-open until all handshakes are completed. HalfOpen map[string]struct{} // Reserve of peers to connect to. A peer can be both here and in the @@ -84,11 +88,16 @@ type torrent struct { // BEP 12 Multitracker Metadata Extension. The tracker.Client instances // mirror their respective URLs from the announce-list metainfo key. - Trackers [][]tracker.Client - DisplayName string - MetaData []byte + Trackers [][]tracker.Client + // Name used if the info name isn't available. + DisplayName string + // The bencoded bytes of the info dict. + MetaData []byte + // Each element corresponds to the 16KiB metadata pieces. If true, we have + // received that piece. metadataHave []bool + // Closed when .Info is set. gotMetainfo chan struct{} GotMetainfo <-chan struct{} @@ -195,6 +204,7 @@ func (t *torrent) numConnsUnchoked() (num int) { return } +// There's a connection to that address already. func (t *torrent) addrActive(addr string) bool { if _, ok := t.HalfOpen[addr]; ok { return true diff --git a/torrent_test.go b/torrent_test.go index cd73b62a..33c40d6b 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -45,7 +45,7 @@ func TestTorrentRequest(t *testing.T) { } func TestTorrentDoubleClose(t *testing.T) { - tt, err := newTorrent(InfoHash{}, nil, 0) + tt, err := newTorrent(InfoHash{}) tt.pruneTimer = time.NewTimer(0) if err != nil { t.Fatal(err) -- 2.48.1