23 "github.com/anacrolix/missinggo"
24 "github.com/anacrolix/missinggo/pproffd"
25 "github.com/anacrolix/missinggo/pubsub"
26 "github.com/anacrolix/sync"
27 "github.com/anacrolix/utp"
29 "github.com/anacrolix/torrent/bencode"
30 "github.com/anacrolix/torrent/dht"
31 "github.com/anacrolix/torrent/iplist"
32 "github.com/anacrolix/torrent/metainfo"
33 "github.com/anacrolix/torrent/mse"
34 pp "github.com/anacrolix/torrent/peer_protocol"
35 "github.com/anacrolix/torrent/storage"
36 "github.com/anacrolix/torrent/tracker"
39 // Currently doesn't really queue, but should in the future.
40 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex int) {
41 piece := &t.pieces[pieceIndex]
42 if piece.QueuedForHash {
45 piece.QueuedForHash = true
46 t.publishPieceChange(pieceIndex)
47 go cl.verifyPiece(t, pieceIndex)
50 // Queue a piece check if one isn't already queued, and the piece has never
51 // been checked before.
52 func (cl *Client) queueFirstHash(t *Torrent, piece int) {
54 if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) {
57 cl.queuePieceCheck(t, piece)
60 // Clients contain zero or more Torrents. A Client manages a blocklist, the
61 // TCP/UDP protocol ports, and DHT as desired.
65 listeners []net.Listener
68 ipBlockList iplist.Ranger
69 bannedTorrents map[metainfo.Hash]struct{}
71 pruneTimer *time.Timer
72 extensionBytes peerExtensionBytes
73 // Set of addresses that have our client ID. This intentionally will
74 // include ourselves if we end up trying to connect to our own address
75 // through legitimate channels.
76 dopplegangerAddrs map[string]struct{}
78 defaultStorage storage.I
82 closed missinggo.Event
84 torrents map[metainfo.Hash]*Torrent
87 func (cl *Client) IPBlockList() iplist.Ranger {
93 func (cl *Client) SetIPBlockList(list iplist.Ranger) {
98 cl.dHT.SetIPBlockList(list)
102 func (cl *Client) PeerID() string {
103 return string(cl.peerID[:])
106 func (cl *Client) ListenAddr() (addr net.Addr) {
107 for _, l := range cl.listeners {
114 type hashSorter struct {
115 Hashes []metainfo.Hash
118 func (hs hashSorter) Len() int {
119 return len(hs.Hashes)
122 func (hs hashSorter) Less(a, b int) bool {
123 return (&big.Int{}).SetBytes(hs.Hashes[a][:]).Cmp((&big.Int{}).SetBytes(hs.Hashes[b][:])) < 0
126 func (hs hashSorter) Swap(a, b int) {
127 hs.Hashes[a], hs.Hashes[b] = hs.Hashes[b], hs.Hashes[a]
130 func (cl *Client) sortedTorrents() (ret []*Torrent) {
132 for ih := range cl.torrents {
133 hs.Hashes = append(hs.Hashes, ih)
136 for _, ih := range hs.Hashes {
137 ret = append(ret, cl.torrent(ih))
142 // Writes out a human readable status of the client, such as for writing to a
144 func (cl *Client) WriteStatus(_w io.Writer) {
146 defer cl.mu.RUnlock()
147 w := bufio.NewWriter(_w)
149 if addr := cl.ListenAddr(); addr != nil {
150 fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
152 fmt.Fprintln(w, "Not listening!")
154 fmt.Fprintf(w, "Peer ID: %+q\n", cl.peerID)
156 dhtStats := cl.dHT.Stats()
157 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
158 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.ID())
159 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(cl.dHT.Addr()))
160 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
161 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
163 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrents))
165 for _, t := range cl.sortedTorrents() {
167 fmt.Fprint(w, "<unknown name>")
169 fmt.Fprint(w, t.name())
173 fmt.Fprintf(w, "%f%% of %d bytes", 100*(1-float64(t.bytesLeft())/float64(t.length)), t.length)
175 w.WriteString("<missing metainfo>")
183 func (cl *Client) configDir() string {
184 if cl.config.ConfigDir == "" {
185 return filepath.Join(os.Getenv("HOME"), ".config/torrent")
187 return cl.config.ConfigDir
190 // The directory where the Client expects to find and store configuration
191 // data. Defaults to $HOME/.config/torrent.
192 func (cl *Client) ConfigDir() string {
193 return cl.configDir()
196 // Creates a new client.
197 func NewClient(cfg *Config) (cl *Client, err error) {
208 halfOpenLimit: socketsPerTorrent,
210 defaultStorage: cfg.DefaultStorage,
211 dopplegangerAddrs: make(map[string]struct{}),
212 torrents: make(map[metainfo.Hash]*Torrent),
214 missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
216 if cl.defaultStorage == nil {
217 cl.defaultStorage = storage.NewFile(cfg.DataDir)
219 if cfg.IPBlocklist != nil {
220 cl.ipBlockList = cfg.IPBlocklist
223 if cfg.PeerID != "" {
224 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
226 o := copy(cl.peerID[:], bep20)
227 _, err = rand.Read(cl.peerID[o:])
229 panic("error generating peer id")
233 // Returns the laddr string to listen on for the next Listen call.
234 listenAddr := func() string {
235 if addr := cl.ListenAddr(); addr != nil {
238 if cfg.ListenAddr == "" {
241 return cfg.ListenAddr
243 if !cl.config.DisableTCP {
245 l, err = net.Listen(func() string {
246 if cl.config.DisableIPv6 {
255 cl.listeners = append(cl.listeners, l)
256 go cl.acceptConnections(l, false)
258 if !cl.config.DisableUTP {
259 cl.utpSock, err = utp.NewSocket(func() string {
260 if cl.config.DisableIPv6 {
269 cl.listeners = append(cl.listeners, cl.utpSock)
270 go cl.acceptConnections(cl.utpSock, true)
273 dhtCfg := cfg.DHTConfig
274 if dhtCfg.IPBlocklist == nil {
275 dhtCfg.IPBlocklist = cl.ipBlockList
277 if dhtCfg.Addr == "" {
278 dhtCfg.Addr = listenAddr()
280 if dhtCfg.Conn == nil && cl.utpSock != nil {
281 dhtCfg.Conn = cl.utpSock
283 cl.dHT, err = dht.NewServer(&dhtCfg)
292 // Stops the client. All connections to peers are closed and all activity will
294 func (cl *Client) Close() {
301 for _, l := range cl.listeners {
304 for _, t := range cl.torrents {
310 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
312 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
313 if cl.ipBlockList == nil {
317 // If blocklists are enabled, then block non-IPv4 addresses, because
318 // blocklists do not yet support IPv6.
320 if missinggo.CryHeard() {
321 log.Printf("blocking non-IPv4 address: %s", ip)
327 return cl.ipBlockList.Lookup(ip4)
330 func (cl *Client) waitAccept() {
334 for _, t := range cl.torrents {
339 if cl.closed.IsSet() {
346 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
349 conn, err := l.Accept()
350 conn = pproffd.WrapNetConn(conn)
351 if cl.closed.IsSet() {
359 // I think something harsher should happen here? Our accept
360 // routine just fucked off.
369 doppleganger := cl.dopplegangerAddr(conn.RemoteAddr().String())
370 _, blocked := cl.ipBlockRange(missinggo.AddrIP(conn.RemoteAddr()))
372 if blocked || doppleganger {
374 // log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
378 go cl.incomingConnection(conn, utp)
382 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
384 if tc, ok := nc.(*net.TCPConn); ok {
390 c.Discovery = peerSourceIncoming
392 err := cl.runReceivedConn(c)
398 // Returns a handle to the given torrent, if it's present in the client.
399 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
402 t, ok = cl.torrents[ih]
406 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
407 return cl.torrents[ih]
410 type dialResult struct {
415 func doDial(dial func(addr string, t *Torrent) (net.Conn, error), ch chan dialResult, utp bool, addr string, t *Torrent) {
416 conn, err := dial(addr, t)
421 conn = nil // Pedantic
423 ch <- dialResult{conn, utp}
425 successfulDials.Add(1)
428 unsuccessfulDials.Add(1)
431 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
432 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
433 if ret < minDialTimeout {
439 // Returns whether an address is known to connect to a client with our own ID.
440 func (cl *Client) dopplegangerAddr(addr string) bool {
441 _, ok := cl.dopplegangerAddrs[addr]
445 // Start the process of connecting to the given peer for the given torrent if
447 func (cl *Client) initiateConn(peer Peer, t *Torrent) {
448 if peer.Id == cl.peerID {
451 addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
452 if cl.dopplegangerAddr(addr) || t.addrActive(addr) {
453 duplicateConnsAvoided.Add(1)
456 if r, ok := cl.ipBlockRange(peer.IP); ok {
457 log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
460 t.halfOpen[addr] = struct{}{}
461 go cl.outgoingConnection(t, addr, peer.Source)
464 func (cl *Client) dialTimeout(t *Torrent) time.Duration {
466 pendingPeers := len(t.peers)
468 return reducedDialTimeout(nominalDialTimeout, cl.halfOpenLimit, pendingPeers)
471 func (cl *Client) dialTCP(addr string, t *Torrent) (c net.Conn, err error) {
472 c, err = net.DialTimeout("tcp", addr, cl.dialTimeout(t))
474 c.(*net.TCPConn).SetLinger(0)
479 func (cl *Client) dialUTP(addr string, t *Torrent) (c net.Conn, err error) {
480 return cl.utpSock.DialTimeout(addr, cl.dialTimeout(t))
483 // Returns a connection over UTP or TCP, whichever is first to connect.
484 func (cl *Client) dialFirst(addr string, t *Torrent) (conn net.Conn, utp bool) {
485 // Initiate connections via TCP and UTP simultaneously. Use the first one
488 if !cl.config.DisableUTP {
491 if !cl.config.DisableTCP {
494 resCh := make(chan dialResult, left)
495 if !cl.config.DisableUTP {
496 go doDial(cl.dialUTP, resCh, true, addr, t)
498 if !cl.config.DisableTCP {
499 go doDial(cl.dialTCP, resCh, false, addr, t)
502 // Wait for a successful connection.
503 for ; left > 0 && res.Conn == nil; left-- {
507 // There are still incompleted dials.
509 for ; left > 0; left-- {
510 conn := (<-resCh).Conn
522 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
523 if _, ok := t.halfOpen[addr]; !ok {
524 panic("invariant broken")
526 delete(t.halfOpen, addr)
530 // Performs initiator handshakes and returns a connection. Returns nil
531 // *connection if no connection for valid reasons.
532 func (cl *Client) handshakesConnection(nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) {
536 c.encrypted = encrypted
538 err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
542 ok, err := cl.initiateHandshakes(c, t)
549 // Returns nil connection and nil error if no connection could be established
550 // for valid reasons.
551 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
552 nc, utp := cl.dialFirst(addr, t)
556 c, err = cl.handshakesConnection(nc, t, !cl.config.DisableEncryption, utp)
564 if cl.config.DisableEncryption {
565 // We already tried without encryption.
568 // Try again without encryption, using whichever protocol type worked last
571 nc, err = cl.dialUTP(addr, t)
573 nc, err = cl.dialTCP(addr, t)
576 err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
579 c, err = cl.handshakesConnection(nc, t, false, utp)
580 if err != nil || c == nil {
586 // Called to dial out and run a connection. The addr we're given is already
587 // considered half-open.
588 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
589 c, err := cl.establishOutgoingConn(t, addr)
592 // Don't release lock between here and addConnection, unless it's for
594 cl.noLongerHalfOpen(t, addr)
597 log.Printf("error establishing outgoing connection: %s", err)
606 err = cl.runInitiatedHandshookConn(c, t)
609 log.Printf("error in established outgoing connection: %s", err)
614 // The port number for incoming peer connections. 0 if the client isn't
616 func (cl *Client) incomingPeerPort() int {
617 listenAddr := cl.ListenAddr()
618 if listenAddr == nil {
621 return missinggo.AddrPort(listenAddr)
// addrCompactIP converts a net.Addr to its compact IP representation: the 4
// raw bytes of an IPv4 address (including IPv4-mapped IPv6 addresses), or the
// 16 raw bytes of any other IP address, per the "yourip" field of
// http://www.bittorrent.org/beps/bep_0010.html.
//
// addr.String() must be of the form host:port. An error is returned if it
// cannot be split that way, or if the host part is not a literal IP address.
// Without the latter check an unparseable host would silently produce an
// empty string and a nil error.
func addrCompactIP(addr net.Addr) (string, error) {
	host, _, err := net.SplitHostPort(addr.String())
	if err != nil {
		return "", err
	}
	ip := net.ParseIP(host)
	if ip == nil {
		return "", fmt.Errorf("host %q is not an IP address", host)
	}
	// To4 yields exactly 4 bytes for anything representable as IPv4, and nil
	// otherwise.
	if v4 := ip.To4(); v4 != nil {
		return string(v4), nil
	}
	return string(ip.To16()), nil
}
641 func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
653 peerExtensionBytes [8]byte
657 func (pex *peerExtensionBytes) SupportsExtended() bool {
658 return pex[5]&0x10 != 0
661 func (pex *peerExtensionBytes) SupportsDHT() bool {
662 return pex[7]&0x01 != 0
665 func (pex *peerExtensionBytes) SupportsFast() bool {
666 return pex[7]&0x04 != 0
669 type handshakeResult struct {
675 // ih is nil if we expect the peer to declare the InfoHash, such as when the
676 // peer initiated the connection. Returns ok if the handshake was successful,
677 // and err if there was an unexpected condition other than the peer simply
678 // abandoning the handshake.
679 func handshake(sock io.ReadWriter, ih *metainfo.Hash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
680 // Bytes to be sent to the peer. Should never block the sender.
681 postCh := make(chan []byte, 4)
682 // A single error value sent when the writer completes.
683 writeDone := make(chan error, 1)
684 // Performs writes to the socket and ensures posts don't block.
685 go handshakeWriter(sock, postCh, writeDone)
688 close(postCh) // Done writing.
695 // Wait until writes complete before returning from handshake.
698 err = fmt.Errorf("error writing: %s", err)
702 post := func(bb []byte) {
706 panic("mustn't block while posting")
710 post([]byte(pp.Protocol))
712 if ih != nil { // We already know what we want.
717 _, err = io.ReadFull(sock, b[:68])
722 if string(b[:20]) != pp.Protocol {
725 missinggo.CopyExact(&res.peerExtensionBytes, b[20:28])
726 missinggo.CopyExact(&res.Hash, b[28:48])
727 missinggo.CopyExact(&res.peerID, b[48:68])
728 peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)
730 // TODO: Maybe we can just drop peers here if we're not interested. This
731 // could prevent them trying to reconnect, falsely believing there was
733 if ih == nil { // We were waiting for the peer to tell us what they wanted.
742 // Wraps a raw connection and provides the interface we want for using the
743 // connection in the message loop.
744 type deadlineReader struct {
749 func (r deadlineReader) Read(b []byte) (n int, err error) {
750 // Keep-alives should be received every 2 mins. Give a bit of gracetime.
751 err = r.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
753 err = fmt.Errorf("error setting read deadline: %s", err)
756 // Convert common errors into io.EOF.
758 // if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
760 // } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
770 type readWriter struct {
775 func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys [][]byte) (ret io.ReadWriter, encrypted bool, err error) {
776 var protocol [len(pp.Protocol)]byte
777 _, err = io.ReadFull(rw, protocol[:])
782 io.MultiReader(bytes.NewReader(protocol[:]), rw),
785 if string(protocol[:]) == pp.Protocol {
789 ret, err = mse.ReceiveHandshake(ret, skeys)
793 func (cl *Client) receiveSkeys() (ret [][]byte) {
794 for ih := range cl.torrents {
795 ret = append(ret, ih[:])
800 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
802 c.rw, err = mse.InitiateHandshake(c.rw, t.infoHash[:], nil)
807 ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
808 if ih != t.infoHash {
814 // Do encryption and bittorrent handshakes as receiver.
815 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
817 skeys := cl.receiveSkeys()
819 if !cl.config.DisableEncryption {
820 c.rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw, skeys)
822 if err == mse.ErrNoSecretKeyMatch {
828 ih, ok, err := cl.connBTHandshake(c, nil)
830 err = fmt.Errorf("error during bt handshake: %s", err)
842 // Returns !ok if handshake failed for valid reasons.
843 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
844 res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes)
845 if err != nil || !ok {
849 c.PeerExtensionBytes = res.peerExtensionBytes
850 c.PeerID = res.peerID
851 c.completedHandshake = time.Now()
855 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) (err error) {
856 if c.PeerID == cl.peerID {
857 // Only if we initiated the connection is the remote address a
858 // listen addr for a doppleganger.
860 addr := c.conn.RemoteAddr().String()
861 cl.dopplegangerAddrs[addr] = struct{}{}
864 return cl.runHandshookConn(c, t)
867 func (cl *Client) runReceivedConn(c *connection) (err error) {
868 err = c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
872 t, err := cl.receiveHandshakes(c)
874 err = fmt.Errorf("error receiving handshakes: %s", err)
882 if c.PeerID == cl.peerID {
885 return cl.runHandshookConn(c, t)
888 func (cl *Client) runHandshookConn(c *connection, t *Torrent) (err error) {
889 c.conn.SetWriteDeadline(time.Time{})
891 deadlineReader{c.conn, c.rw},
894 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
895 if !cl.addConnection(t, c) {
898 defer cl.dropConnection(t, c)
900 go c.writeOptimizer(time.Minute)
901 cl.sendInitialMessages(c, t)
902 err = cl.connectionLoop(t, c)
904 err = fmt.Errorf("error during connection loop: %s", err)
909 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
910 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
911 conn.Post(pp.Message{
913 ExtendedID: pp.HandshakeExtendedID,
914 ExtendedPayload: func() []byte {
915 d := map[string]interface{}{
916 "m": func() (ret map[string]int) {
917 ret = make(map[string]int, 2)
918 ret["ut_metadata"] = metadataExtendedId
919 if !cl.config.DisablePEX {
920 ret["ut_pex"] = pexExtendedId
924 "v": extendedHandshakeClientVersion,
925 // No upload queue is implemented yet.
928 if !cl.config.DisableEncryption {
931 if torrent.metadataSizeKnown() {
932 d["metadata_size"] = torrent.metadataSize()
934 if p := cl.incomingPeerPort(); p != 0 {
937 yourip, err := addrCompactIP(conn.remoteAddr())
939 log.Printf("error calculating yourip field value in extension handshake: %s", err)
943 // log.Printf("sending %v", d)
944 b, err := bencode.Marshal(d)
952 if torrent.haveAnyPieces() {
953 conn.Bitfield(torrent.bitfield())
954 } else if cl.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
955 conn.Post(pp.Message{
959 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
960 conn.Post(pp.Message{
962 Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
967 func (cl *Client) peerUnchoked(torrent *Torrent, conn *connection) {
968 conn.updateRequests()
971 func (cl *Client) connCancel(t *Torrent, cn *connection, r request) (ok bool) {
979 func (cl *Client) connDeleteRequest(t *Torrent, cn *connection, r request) bool {
980 if !cn.RequestPending(r) {
983 delete(cn.Requests, r)
987 func (cl *Client) requestPendingMetadata(t *Torrent, c *connection) {
991 if c.PeerExtensionIDs["ut_metadata"] == 0 {
992 // Peer doesn't support this.
995 // Request metadata pieces that we don't have in a random order.
997 for index := 0; index < t.metadataPieceCount(); index++ {
998 if !t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
999 pending = append(pending, index)
1002 for _, i := range mathRand.Perm(len(pending)) {
1003 c.requestMetadataPiece(pending[i])
1007 // Process incoming ut_metadata message.
1008 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) (err error) {
1009 var d map[string]int
1010 err = bencode.Unmarshal(payload, &d)
1012 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
1015 msgType, ok := d["msg_type"]
1017 err = errors.New("missing msg_type field")
1022 case pp.DataMetadataExtensionMsgType:
1023 if !c.requestedMetadataPiece(piece) {
1024 err = fmt.Errorf("got unexpected piece %d", piece)
1027 c.metadataRequests[piece] = false
1028 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1029 if begin < 0 || begin >= len(payload) {
1030 err = fmt.Errorf("data has bad offset in payload: %d", begin)
1033 t.saveMetadataPiece(piece, payload[begin:])
1034 c.UsefulChunksReceived++
1035 c.lastUsefulChunkReceived = time.Now()
1036 t.maybeMetadataCompleted()
1037 case pp.RequestMetadataExtensionMsgType:
1038 if !t.haveMetadataPiece(piece) {
1039 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1042 start := (1 << 14) * piece
1043 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1044 case pp.RejectMetadataExtensionMsgType:
1046 err = errors.New("unknown msg_type value")
1051 func (cl *Client) upload(t *Torrent, c *connection) {
1052 if cl.config.NoUpload {
1055 if !c.PeerInterested {
1058 seeding := cl.seeding(t)
1059 if !seeding && !t.connHasWantedPieces(c) {
1063 for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
1065 for r := range c.PeerRequests {
1066 err := cl.sendChunk(t, c, r)
1068 if t.pieceComplete(int(r.Index)) && err == io.ErrUnexpectedEOF {
1069 // We had the piece, but not anymore.
1071 log.Printf("error sending chunk %+v to peer: %s", r, err)
1073 // If we failed to send a chunk, choke the peer to ensure they
1074 // flush all their requests. We've probably dropped a piece,
1075 // but there's no way to communicate this to the peer. If they
1076 // ask for it again, we'll kick them to allow us to send them
1077 // an updated bitfield.
1080 delete(c.PeerRequests, r)
1088 func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
1089 // Count the chunk being sent, even if it isn't.
1090 b := make([]byte, r.Length)
1091 p := t.info.Piece(int(r.Index))
1092 n, err := t.readAt(b, p.Offset()+int64(r.Begin))
1095 panic("expected error")
1106 uploadChunksPosted.Add(1)
1107 c.lastChunkSent = time.Now()
1111 // Processes incoming bittorrent messages. The client lock is held upon entry
1113 func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
1114 decoder := pp.Decoder{
1115 R: bufio.NewReader(c.rw),
1116 MaxLength: 256 * 1024,
1121 err := decoder.Decode(&msg)
1123 if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
1129 c.lastMessageReceived = time.Now()
1131 receivedKeepalives.Add(1)
1134 receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
1139 // We can then reset our interest.
1142 cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
1145 c.PeerChoked = false
1146 cl.peerUnchoked(t, c)
1148 c.PeerInterested = true
1150 case pp.NotInterested:
1151 c.PeerInterested = false
1154 err = c.peerSentHave(int(msg.Index))
1159 if !c.PeerInterested {
1160 err = errors.New("peer sent request but isn't interested")
1163 if !t.havePiece(msg.Index.Int()) {
1164 // This isn't necessarily them screwing up. We can drop pieces
1165 // from our storage, and can't communicate this to peers
1166 // except by reconnecting.
1167 requestsReceivedForMissingPieces.Add(1)
1168 err = errors.New("peer requested piece we don't have")
1171 if c.PeerRequests == nil {
1172 c.PeerRequests = make(map[request]struct{}, maxRequests)
1174 c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
1177 req := newRequest(msg.Index, msg.Begin, msg.Length)
1178 if !c.PeerCancel(req) {
1179 unexpectedCancels.Add(1)
1182 err = c.peerSentBitfield(msg.Bitfield)
1184 err = c.peerSentHaveAll()
1186 err = c.peerSentHaveNone()
1188 cl.downloadedChunk(t, c, &msg)
1190 switch msg.ExtendedID {
1191 case pp.HandshakeExtendedID:
1192 // TODO: Create a bencode struct for this.
1193 var d map[string]interface{}
1194 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
1196 err = fmt.Errorf("error decoding extended message payload: %s", err)
1199 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
1200 if reqq, ok := d["reqq"]; ok {
1201 if i, ok := reqq.(int64); ok {
1202 c.PeerMaxRequests = int(i)
1205 if v, ok := d["v"]; ok {
1206 c.PeerClientName = v.(string)
1210 err = errors.New("handshake missing m item")
1213 mTyped, ok := m.(map[string]interface{})
1215 err = errors.New("handshake m value is not dict")
1218 if c.PeerExtensionIDs == nil {
1219 c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
1221 for name, v := range mTyped {
1224 log.Printf("bad handshake m item extension ID type: %T", v)
1228 delete(c.PeerExtensionIDs, name)
1230 if c.PeerExtensionIDs[name] == 0 {
1231 supportedExtensionMessages.Add(name, 1)
1233 c.PeerExtensionIDs[name] = byte(id)
1236 metadata_sizeUntyped, ok := d["metadata_size"]
1238 metadata_size, ok := metadata_sizeUntyped.(int64)
1240 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1242 t.setMetadataSize(metadata_size, cl)
1245 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1246 cl.requestPendingMetadata(t, c)
1248 case metadataExtendedId:
1249 err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
1251 err = fmt.Errorf("error handling metadata extension message: %s", err)
1254 if cl.config.DisablePEX {
1257 var pexMsg peerExchangeMessage
1258 err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
1260 err = fmt.Errorf("error unmarshalling PEX message: %s", err)
1265 cl.addPeers(t, func() (ret []Peer) {
1266 for i, cp := range pexMsg.Added {
1268 IP: make([]byte, 4),
1270 Source: peerSourcePEX,
1272 if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
1273 p.SupportsEncryption = true
1275 missinggo.CopyExact(p.IP, cp.IP[:])
1276 ret = append(ret, p)
1283 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
1286 // That client uses its own extension IDs for outgoing message
1287 // types, which is incorrect.
1288 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
1289 strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1297 pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
1302 pingAddr.Port = int(msg.Port)
1304 cl.dHT.Ping(pingAddr)
1306 err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1314 // Returns true if connection is removed from torrent.Conns.
1315 func (cl *Client) deleteConnection(t *Torrent, c *connection) bool {
1316 for i0, _c := range t.conns {
1320 i1 := len(t.conns) - 1
1322 t.conns[i0] = t.conns[i1]
1324 t.conns = t.conns[:i1]
1330 func (cl *Client) dropConnection(t *Torrent, c *connection) {
1331 cl.event.Broadcast()
1333 if cl.deleteConnection(t, c) {
1338 // Returns true if the connection is added.
1339 func (cl *Client) addConnection(t *Torrent, c *connection) bool {
1340 if cl.closed.IsSet() {
1344 case <-t.ceasingNetworking:
1348 if !cl.wantConns(t) {
1351 for _, c0 := range t.conns {
1352 if c.PeerID == c0.PeerID {
1353 // Already connected to a client with that ID.
1354 duplicateClientConns.Add(1)
1358 if len(t.conns) >= socketsPerTorrent {
1359 c := t.worstBadConn(cl)
1363 if cl.config.Debug && missinggo.CryHeard() {
1364 log.Printf("%s: dropping connection to make room for new one:\n %s", t, c)
1367 cl.deleteConnection(t, c)
1369 if len(t.conns) >= socketsPerTorrent {
1372 t.conns = append(t.conns, c)
1377 func (cl *Client) usefulConn(t *Torrent, c *connection) bool {
1378 if c.closed.IsSet() {
1382 return c.supportsExtension("ut_metadata")
1385 return c.PeerInterested
1387 return t.connHasWantedPieces(c)
1390 func (cl *Client) wantConns(t *Torrent) bool {
1391 if !cl.seeding(t) && !t.needData() {
1394 if len(t.conns) < socketsPerTorrent {
1397 return t.worstBadConn(cl) != nil
1400 func (cl *Client) openNewConns(t *Torrent) {
1402 case <-t.ceasingNetworking:
1406 for len(t.peers) != 0 {
1407 if !cl.wantConns(t) {
1410 if len(t.halfOpen) >= cl.halfOpenLimit {
1417 for k, p = range t.peers {
1421 cl.initiateConn(p, t)
1423 t.wantPeers.Broadcast()
1426 func (cl *Client) addPeers(t *Torrent, peers []Peer) {
1427 for _, p := range peers {
1428 if cl.dopplegangerAddr(net.JoinHostPort(
1430 strconv.FormatInt(int64(p.Port), 10),
1434 if _, ok := cl.ipBlockRange(p.IP); ok {
1438 // The spec says to scrub these yourselves. Fine.
1445 func (cl *Client) cachedMetaInfoFilename(ih metainfo.Hash) string {
1446 return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent")
1449 func (cl *Client) saveTorrentFile(t *Torrent) error {
1450 path := cl.cachedMetaInfoFilename(t.infoHash)
1451 os.MkdirAll(filepath.Dir(path), 0777)
1452 f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
1454 return fmt.Errorf("error opening file: %s", err)
1457 e := bencode.NewEncoder(f)
1458 err = e.Encode(t.metainfo())
1460 return fmt.Errorf("error marshalling metainfo: %s", err)
1462 mi, err := cl.torrentCacheMetaInfo(t.infoHash)
1464 // For example, a script kiddy makes us load too many files, and we're
1465 // able to save the torrent, but not load it again to check it.
1468 if !bytes.Equal(mi.Info.Hash.Bytes(), t.infoHash[:]) {
1469 log.Fatalf("%x != %x", mi.Info.Hash, t.infoHash[:])
1474 func (cl *Client) setMetaData(t *Torrent, md *metainfo.Info, bytes []byte) (err error) {
1475 err = t.setMetadata(md, bytes)
1479 if !cl.config.DisableMetainfoCache {
1480 if err := cl.saveTorrentFile(t); err != nil {
1481 log.Printf("error saving torrent file for %s: %s", t, err)
1484 cl.event.Broadcast()
1485 close(t.gotMetainfo)
1489 // Prepare a Torrent without any attachment to a Client. That means we can
1490 // initialize fields all fields that don't require the Client without locking
1492 func newTorrent(ih metainfo.Hash) (t *Torrent) {
1495 chunkSize: defaultChunkSize,
1496 peers: make(map[peersKey]Peer),
1498 closing: make(chan struct{}),
1499 ceasingNetworking: make(chan struct{}),
1501 gotMetainfo: make(chan struct{}),
1503 halfOpen: make(map[string]struct{}),
1504 pieceStateChanges: pubsub.NewPubSub(),
1510 // For shuffling the tracker tiers.
1511 mathRand.Seed(time.Now().Unix())
// trackerTier is one tier of tracker URLs.
type trackerTier []string

// shuffleTier randomly reorders the URLs of tier in place. The trackers
// within each tier must be shuffled before use; see
// http://stackoverflow.com/a/12267471/149482 and
// http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
func shuffleTier(tier trackerTier) {
	for cur := 0; cur < len(tier); cur++ {
		// Swap the current element with a randomly chosen one at or before it.
		pick := mathRand.Intn(cur + 1)
		held := tier[cur]
		tier[cur] = tier[pick]
		tier[pick] = held
	}
}
1526 func copyTrackers(base []trackerTier) (copy []trackerTier) {
1527 for _, tier := range base {
1528 copy = append(copy, append(trackerTier(nil), tier...))
1533 func mergeTier(tier trackerTier, newURLs []string) trackerTier {
1535 for _, url := range newURLs {
1536 for _, trURL := range tier {
1541 tier = append(tier, url)
1546 // A file-like handle to some torrent data resource.
1547 type Handle interface {
1554 // Returns nil metainfo if it isn't in the cache. Checks that the retrieved
1555 // metainfo has the correct infohash.
1556 func (cl *Client) torrentCacheMetaInfo(ih metainfo.Hash) (mi *metainfo.MetaInfo, err error) {
1557 if cl.config.DisableMetainfoCache {
1560 f, err := os.Open(cl.cachedMetaInfoFilename(ih))
1562 if os.IsNotExist(err) {
1568 dec := bencode.NewDecoder(f)
1569 err = dec.Decode(&mi)
1573 if !bytes.Equal(mi.Info.Hash.Bytes(), ih[:]) {
1574 err = fmt.Errorf("cached torrent has wrong infohash: %x != %x", mi.Info.Hash, ih[:])
1580 // Specifies a new torrent for adding to a client. There are helpers for
1581 // magnet URIs and torrent metainfo files.
1582 type TorrentSpec struct {
1583 // The tiered tracker URIs.
1585 InfoHash metainfo.Hash
1586 Info *metainfo.InfoEx
1587 // The name to use if the Name field from the Info isn't available.
1589 // The chunk size to use for outbound requests. Defaults to 16KiB if not
1595 func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {
1596 m, err := metainfo.ParseMagnetURI(uri)
1600 spec = &TorrentSpec{
1601 Trackers: [][]string{m.Trackers},
1602 DisplayName: m.DisplayName,
1603 InfoHash: m.InfoHash,
// TorrentSpecFromMetaInfo builds a TorrentSpec from a parsed metainfo. The
// announce-list supplies the tracker tiers, Info.Name the display name and
// Info.Hash the info hash. When the announce-list is empty, mi.Announce
// becomes the only tracker in a single tier.
// NOTE(review): at least one field initialiser and the else keyword are
// missing from this listing; presumably the append below is the non-empty
// branch — confirm.
1608 func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) {
1609 spec = &TorrentSpec{
1610 Trackers: mi.AnnounceList,
1612 DisplayName: mi.Info.Name,
1615 if len(spec.Trackers) == 0 {
1616 spec.Trackers = [][]string{[]string{mi.Announce}}
// NOTE(review): spec.Trackers is the same slice as mi.AnnounceList here, so
// this assignment also rewrites mi.AnnounceList[0]. mi.Announce is appended
// without checking whether it is empty or already in the tier. Confirm that
// mutating the caller's metainfo and allowing duplicates is intended.
1618 spec.Trackers[0] = append(spec.Trackers[0], mi.Announce)
1621 missinggo.CopyExact(&spec.InfoHash, mi.Info.Hash)
1625 // Add or merge a torrent spec. If the torrent is already present, the
1626 // trackers will be merged with the existing ones. If the Info isn't yet
1627 // known, it will be set. The display name is replaced if the new spec
1628 // provides one. Returns new if the torrent wasn't already in the client.
//
// NOTE(review): this listing omits short lines (branch headers, returns, the
// Lock call), so the exact nesting of the statements below is not visible.
// The comments added here describe only what each visible statement does.
1629 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
// cl.mu is released when the function returns; the matching Lock is not shown.
1631 defer cl.mu.Unlock()
1633 t, ok := cl.torrents[spec.InfoHash]
1637 // TODO: This doesn't belong in the core client, it's more of a
// NOTE(review): the rest of the TODO above is missing from the listing.
// Refuse info hashes listed in cl.bannedTorrents.
1639 if _, ok := cl.bannedTorrents[spec.InfoHash]; ok {
1640 err = errors.New("banned torrent")
1643 // TODO: Tidy this up?
1644 t = newTorrent(spec.InfoHash)
// The torrent's wantPeers uses the client mutex as its locker.
1646 t.wantPeers.L = &cl.mu
// A zero ChunkSize leaves the chunk size chosen by newTorrent untouched.
1647 if spec.ChunkSize != 0 {
1648 t.chunkSize = pp.Integer(spec.ChunkSize)
// Prefer the storage named in the spec, falling back to the client default.
1650 t.storageOpener = spec.Storage
1651 if t.storageOpener == nil {
1652 t.storageOpener = cl.defaultStorage
// An empty DisplayName in the spec never overwrites an existing name.
1655 if spec.DisplayName != "" {
1656 t.setDisplayName(spec.DisplayName)
1658 // Try to merge in info we have on the torrent. Any err left will
1659 // terminate the function.
1661 if spec.Info != nil {
1662 err = cl.setMetaData(t, &spec.Info.Info, spec.Info.Bytes)
// Presumably the branch taken when the spec carries no Info: consult the
// metainfo cache instead.
1664 var mi *metainfo.MetaInfo
1665 mi, err = cl.torrentCacheMetaInfo(spec.InfoHash)
// NOTE(review): whether err is cleared after being logged is not visible here.
1667 log.Printf("error getting cached metainfo: %s", err)
1669 } else if mi != nil {
// A cache hit contributes both its trackers and its info.
1670 t.addTrackers(mi.AnnounceList)
1671 err = cl.setMetaData(t, &mi.Info.Info, mi.Info.Bytes)
1678 t.addTrackers(spec.Trackers)
1680 cl.torrents[spec.InfoHash] = t
1683 // From this point onwards, we can consider the torrent a part of the
// NOTE(review): the rest of the comment above is missing from the listing.
// The announcers run in their own goroutines for the life of the torrent.
1686 if !cl.config.DisableTrackers {
1687 go cl.announceTorrentTrackers(t)
// NOTE(review): any guard around the DHT announcer (e.g. DHT disabled) is not
// visible in this listing.
1690 go cl.announceTorrentDHT(t, true)
// dropTorrent looks up the torrent stored under infoHash and removes its
// entry from cl.torrents.
// NOTE(review): the lines between the lookup and the delete are missing from
// this listing. Presumably the "no such torrent" error is returned when the
// lookup fails, and t is closed before the entry is deleted — confirm.
1696 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1697 t, ok := cl.torrents[infoHash]
1699 err = fmt.Errorf("no such torrent")
1706 delete(cl.torrents, infoHash)
1710 // Returns true when peers are required, or false if the torrent is closing.
//
// The visible checks are: t.ceasingNetworking firing, the number of known
// peers exceeding torrentPeersLowWater, and whether the torrent still needs
// data or is being seeded.
// NOTE(review): the Lock call, the loop/select structure, the returns and the
// blocking wait are missing from this listing, so the outcome of each check
// is not shown here.
1711 func (cl *Client) waitWantPeers(t *Torrent) bool {
// cl.mu is released when the function returns; the matching Lock is not shown.
1713 defer cl.mu.Unlock()
1716 case <-t.ceasingNetworking:
1720 if len(t.peers) > torrentPeersLowWater {
1723 if t.needData() || cl.seeding(t) {
1731 // Returns whether the client should make effort to seed the torrent.
//
// The decision consults the client configuration: cl.config.NoUpload and
// cl.config.Seed.
// NOTE(review): the return statements and any further conditions are missing
// from this listing. Presumably NoUpload set, or Seed unset, yields false —
// confirm.
1732 func (cl *Client) seeding(t *Torrent) bool {
1733 if cl.config.NoUpload {
1736 if !cl.config.Seed {
// announceTorrentDHT announces t to the DHT in a loop for as long as
// cl.waitWantPeers(t) returns true. Peers received on ps.Peers are converted
// to Peer values with Source set to peerSourceDHT and passed to cl.addPeers.
// impliedPort is forwarded unchanged to cl.dHT.Announce.
// NOTE(review): the select/loop headers, error branches, locking and the
// closing of the announce are missing from this listing; the comments below
// describe only the visible statements.
1745 func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
1746 for cl.waitWantPeers(t) {
1747 // log.Printf("getting peers for %q from DHT", t)
// The DHT is keyed by the raw info hash bytes, passed as a string.
1748 ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
1750 log.Printf("error getting peers from dht: %s", err)
1753 // Count all the unique addresses we got during this announce.
1754 allAddrs := make(map[string]struct{})
1758 case v, ok := <-ps.Peers:
// Capacity is pre-sized to the batch; some entries may be skipped below.
1762 addPeers := make([]Peer, 0, len(v.Peers))
1763 for _, cp := range v.Peers {
1765 // Can't do anything with this.
1768 addPeers = append(addPeers, Peer{
1771 Source: peerSourceDHT,
1773 key := (&net.UDPAddr{
1777 allAddrs[key] = struct{}{}
1780 cl.addPeers(t, addPeers)
// NOTE(review): t.peers is read here; the locking around it is not visible.
// Presumably reaching torrentPeersHighWater ends the current announce.
1781 numPeers := len(t.peers)
1783 if numPeers >= torrentPeersHighWater {
1786 case <-t.ceasingNetworking:
1792 // log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
// trackerBlockedUnlocked reports whether the tracker at trRawURL resolves to
// an IP address covered by the client's blocklist. It parses the URL, splits
// the host from the port, resolves the host with net.ResolveIPAddr and looks
// the resulting IP up with cl.ipBlockRange.
// NOTE(review): the error handling after each step is missing from this
// listing, including what happens when the URL host carries no port (where
// net.SplitHostPort returns an error). The "Unlocked" suffix suggests the
// caller must not hold cl.mu, which would fit the potentially slow name
// resolution; the locking around ipBlockRange is not visible — confirm.
1796 func (cl *Client) trackerBlockedUnlocked(trRawURL string) (blocked bool, err error) {
1797 url_, err := url.Parse(trRawURL)
1801 host, _, err := net.SplitHostPort(url_.Host)
1805 addr, err := net.ResolveIPAddr("ip", host)
1810 _, blocked = cl.ipBlockRange(addr.IP)
// announceTorrentSingleTracker announces req to the single tracker URL tr and
// hands the peers from the response to cl.addPeers. It has three error
// returns: the blocklist check failing, the tracker being blocked, and the
// announce itself failing.
// NOTE(review): the conditions guarding each return, the declaration of
// peers and the final return are missing from this listing.
1815 func (cl *Client) announceTorrentSingleTracker(tr string, req *tracker.AnnounceRequest, t *Torrent) error {
1816 blocked, err := cl.trackerBlockedUnlocked(tr)
1818 return fmt.Errorf("error determining if tracker blocked: %s", err)
1821 return fmt.Errorf("tracker blocked: %s", tr)
1823 resp, err := tracker.Announce(tr, req)
1825 return fmt.Errorf("error announcing: %s", err)
1828 for _, peer := range resp.Peers {
1829 peers = append(peers, Peer{
1835 cl.addPeers(t, peers)
1838 // log.Printf("%s: %d new peers from %s", t, len(peers), tr)
// Sleep for the interval from the tracker response (resp.Interval seconds).
// NOTE(review): this is a plain time.Sleep, so it is not cut short when the
// torrent stops networking — confirm that is acceptable.
1840 time.Sleep(time.Second * time.Duration(resp.Interval))
// announceTorrentTrackersFastStart announces to every tracker of every tier
// concurrently: one goroutine per tracker URL, each calling
// cl.announceTorrentSingleTracker with the same req pointer.
// NOTE(review): the bookkeeping of outstanding, the sends and receives on
// oks, and the computation of atLeastOne are missing from this listing.
// Presumably atLeastOne reports whether any announce succeeded — confirm.
1844 func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier, t *Torrent) (atLeastOne bool) {
// Unbuffered: a goroutine's send blocks until the loop below receives it.
1845 oks := make(chan bool)
1847 for _, tier := range trackers {
1848 for _, tr := range tier {
// tr is passed as an argument so each goroutine gets its own copy.
1850 go func(tr string) {
1851 err := cl.announceTorrentSingleTracker(tr, req, t)
1856 for outstanding > 0 {
1866 // Announce torrent to its trackers.
//
// The request starts with Event set to tracker.Started. After waiting until
// peers are wanted, it first tries announceTorrentTrackersFastStart, then
// loops while cl.waitWantPeers(t) holds, walking the tiers in order and
// announcing to one tracker at a time. Event becomes tracker.None after a
// successful announce.
// NOTE(review): the locking, the newAnnounce label, the error checks and the
// returns are missing from this listing; the comments below describe only
// the visible statements.
1867 func (cl *Client) announceTorrentTrackers(t *Torrent) {
1868 req := tracker.AnnounceRequest{
1869 Event: tracker.Started,
1871 Port: uint16(cl.incomingPeerPort()),
1873 InfoHash: t.infoHash,
1875 if !cl.waitWantPeers(t) {
1879 req.Left = t.bytesLeftAnnounce()
1880 trackers := t.trackers
1882 if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
1883 req.Event = tracker.None
1886 for cl.waitWantPeers(t) {
// Refresh the remaining byte count and the tracker list on every pass.
1888 req.Left = t.bytesLeftAnnounce()
1889 trackers = t.trackers
1891 numTrackersTried := 0
1892 for _, tier := range trackers {
1893 for trIndex, tr := range tier {
1895 err := cl.announceTorrentSingleTracker(tr, &req, t)
1899 // Float the successful announce to the top of the tier. If
1900 // the trackers list has been changed, we'll be modifying an
1901 // old copy so it won't matter.
1903 tier[0], tier[trIndex] = tier[trIndex], tier[0]
1906 req.Event = tracker.None
1907 continue newAnnounce
// Presumably reached when no tracker in any tier accepted the announce.
1910 if numTrackersTried != 0 {
1911 log.Printf("%s: all trackers failed", t)
1913 // TODO: Wait until trackers are added if there are none.
// Fixed back-off before the next pass; not interruptible by torrent shutdown.
1914 time.Sleep(10 * time.Second)
// allTorrentsCompleted walks cl.torrents and compares each torrent's
// completed piece count with its total piece count.
// NOTE(review): the returns and any check between the loop header and the
// comparison are missing from this listing. Presumably it returns false as
// soon as one torrent is incomplete and true otherwise — confirm.
1918 func (cl *Client) allTorrentsCompleted() bool {
1919 for _, t := range cl.torrents {
1923 if t.numPiecesCompleted() != t.numPieces() {
1930 // Returns true when all torrents are completely downloaded and false if the
1931 // client is stopped before that.
//
// NOTE(review): the Lock call, the returns and the blocking wait inside the
// loop are missing from this listing.
1932 func (cl *Client) WaitAll() bool {
// cl.mu is released when the function returns; the matching Lock is not shown.
1934 defer cl.mu.Unlock()
// Re-check completion on every pass, giving up once the client is closed.
1935 for !cl.allTorrentsCompleted() {
1936 if cl.closed.IsSet() {
1944 // Handle a received chunk from a peer.
//
// msg is the piece message received on connection c for torrent t. The
// visible steps are: bump the receive counters, settle the matching
// outstanding request on c, mark the chunk as no longer pending, cancel the
// same request on the torrent's connections, write the data with
// t.writeChunk, and queue a hash check once t.pieceAllDirty reports the whole
// piece dirty.
// NOTE(review): branch closers, else keywords and early returns are missing
// from this listing, so which statements sit on which branch is not shown.
1945 func (cl *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
1946 chunksReceived.Add(1)
// Rebuild the request this chunk answers from the message fields.
1948 req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
1950 // Request has been satisfied.
1951 if cl.connDeleteRequest(t, c, req) {
// Deferred, so requests are refreshed only when this function returns.
1952 defer c.updateRequests()
// Presumably the else branch: the chunk matched no outstanding request.
1954 unexpectedChunksReceived.Add(1)
1957 index := int(req.Index)
1958 piece := &t.pieces[index]
1960 // Do we actually want this chunk?
1961 if !t.wantPiece(req) {
1962 unwantedChunksReceived.Add(1)
1963 c.UnwantedChunksReceived++
1967 c.UsefulChunksReceived++
1968 c.lastUsefulChunkReceived = time.Now()
1972 // Need to record that it hasn't been written yet, before we attempt to do
1973 // anything with it.
1974 piece.incrementPendingWrites()
1975 // Record that we have the chunk.
1976 piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
1978 // Cancel pending requests for this chunk.
// NOTE(review): the loop variable c shadows the parameter c inside this loop.
1979 for _, c := range t.conns {
1980 if cl.connCancel(t, c, req) {
1986 // Write the chunk out.
1987 err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1990 piece.decrementPendingWrites()
// NOTE(review): the error check guarding the next two statements is missing
// from the listing; presumably they form the write-failure path.
1993 log.Printf("%s: error writing chunk %v: %s", t, req, err)
1995 t.updatePieceCompletion(int(msg.Index))
1999 // It's important that the piece is potentially queued before we check if
2000 // the piece is still wanted, because if it is queued, it won't be wanted.
2001 if t.pieceAllDirty(index) {
2002 cl.queuePieceCheck(t, int(req.Index))
// Record that this connection contributed data to the piece;
// reapPieceTouches reads this set.
2005 if c.peerTouchedPieces == nil {
2006 c.peerTouchedPieces = make(map[int]struct{})
2008 c.peerTouchedPieces[index] = struct{}{}
2010 cl.event.Broadcast()
2011 t.publishPieceChange(int(req.Index))
2015 // Return the connections that touched a piece, and clear the entry while
// NOTE(review): the end of the sentence above is missing from this listing.
//
// Every connection in t.conns whose peerTouchedPieces set contains piece is
// appended to ret, and piece is deleted from that connection's set. ret stays
// nil when no connection touched the piece.
2017 func (cl *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
2018 for _, c := range t.conns {
// Reading a nil map is safe, so connections with no recorded touches are fine.
2019 if _, ok := c.peerTouchedPieces[piece]; ok {
2020 ret = append(ret, c)
2021 delete(c.peerTouchedPieces, piece)
// pieceHashed records the result of hashing a piece. The visible steps are:
// update the pieceHashedCorrect or pieceHashedNotCorrect counter, collect the
// connections that touched the piece, then either mark the piece complete in
// storage or, on the else-branch when there are touchers, drop those
// connections. It ends by calling cl.pieceChanged.
// NOTE(review): the conditions selecting each branch are missing from this
// listing; presumably they test the correct argument — confirm.
2027 func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) {
2028 p := &t.pieces[piece]
2030 // Don't score the first time a piece is hashed, it could be an
// NOTE(review): the rest of the comment above is missing from the listing.
2033 pieceHashedCorrect.Add(1)
2035 log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
2036 pieceHashedNotCorrect.Add(1)
// Collecting the touchers also clears them from each connection.
2040 touchers := cl.reapPieceTouches(t, piece)
2042 err := p.Storage().MarkComplete()
// %T prints the concrete storage type to show which backend failed.
2044 log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
2046 t.updatePieceCompletion(piece)
2047 } else if len(touchers) != 0 {
2048 log.Printf("dropping %d conns that touched piece", len(touchers))
2049 for _, c := range touchers {
2050 cl.dropConnection(t, c)
2053 cl.pieceChanged(t, piece)
// onCompletedPiece updates torrent and connection state for a piece that is
// now complete: the piece is removed from t.pendingPieces,
// t.pendAllChunkSpecs is called for it, and each connection's outstanding
// Requests are scanned for entries that target the piece.
// NOTE(review): what is done with a matching request, and anything else done
// per connection, is missing from this listing; presumably such requests are
// cancelled — confirm.
2056 func (cl *Client) onCompletedPiece(t *Torrent, piece int) {
2057 t.pendingPieces.Remove(piece)
2058 t.pendAllChunkSpecs(piece)
2059 for _, conn := range t.conns {
2061 for r := range conn.Requests {
2062 if int(r.Index) == piece {
2066 // Could check here if peer doesn't have piece, but due to caching
2067 // some peers may have said they have a piece but they don't.
// onFailedPiece updates state for a piece that is not complete. When every
// chunk of the piece is dirty, t.pendAllChunkSpecs is called for it. The
// connections whose peer has the piece then get their requests refreshed.
// NOTE(review): the body of the wantPieceIndex check is missing from this
// listing; presumably the function returns early when the piece is not
// wanted — confirm.
2072 func (cl *Client) onFailedPiece(t *Torrent, piece int) {
2073 if t.pieceAllDirty(piece) {
2074 t.pendAllChunkSpecs(piece)
2076 if !t.wantPieceIndex(piece) {
2080 for _, conn := range t.conns {
2081 if conn.PeerHasPiece(piece) {
2082 conn.updateRequests()
// pieceChanged reacts to a change in a piece's state. It reads the piece's
// completion, runs onCompletedPiece or onFailedPiece, re-evaluates the
// piece's priority and publishes the change.
// NOTE(review): the if/else choosing between the two handlers is missing
// from this listing; presumably it tests correct — confirm.
2087 func (cl *Client) pieceChanged(t *Torrent, piece int) {
2088 correct := t.pieceComplete(piece)
// Deferred so waiters on cl.event are woken after all updates below.
2089 defer cl.event.Broadcast()
2091 cl.onCompletedPiece(t, piece)
2093 cl.onFailedPiece(t, piece)
// piecePriorityChanged runs only when updatePiecePriority returns true.
2095 if t.updatePiecePriority(piece) {
2096 t.piecePriorityChanged(piece)
2098 t.publishPieceChange(piece)
// verifyPiece hashes a piece and reports the result to cl.pieceHashed. It
// first loops while the piece is already being hashed or the torrent has no
// storage, then clears QueuedForHash. If the torrent is closed or the piece
// is already complete it only refreshes the priority and publishes the
// change. Otherwise it computes the hash with t.hashPiece and passes
// sum == p.Hash on as the correctness flag.
// NOTE(review): the Lock call, the wait inside the loop, the early return,
// the Hashing flag updates and any unlocking around the hash computation are
// missing from this listing.
2101 func (cl *Client) verifyPiece(t *Torrent, piece int) {
// cl.mu is released when the function returns; the matching Lock is not shown.
2103 defer cl.mu.Unlock()
2104 p := &t.pieces[piece]
2105 for p.Hashing || t.storage == nil {
2108 p.QueuedForHash = false
2109 if t.isClosed() || t.pieceComplete(piece) {
2110 t.updatePiecePriority(piece)
2111 t.publishPieceChange(piece)
2115 t.publishPieceChange(piece)
2117 sum := t.hashPiece(piece)
2125 cl.pieceHashed(t, piece, sum == p.Hash)
2128 // Returns handles to all the torrents loaded in the Client.
//
// The result is built by ranging over the cl.torrents map, so its order is
// unspecified and may differ between calls. ret stays nil when the client
// has no torrents.
// NOTE(review): any locking around the loop is missing from this listing.
2129 func (cl *Client) Torrents() (ret []*Torrent) {
2131 for _, t := range cl.torrents {
2132 ret = append(ret, t)
// AddMagnet adds the torrent described by a magnet URI: the URI is converted
// with TorrentSpecFromMagnetURI and the spec is passed to cl.AddTorrentSpec.
// The "new" result of AddTorrentSpec is discarded.
// NOTE(review): the error branch after parsing is missing from this listing;
// presumably a parse error is returned without adding anything — confirm.
2138 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
2139 spec, err := TorrentSpecFromMagnetURI(uri)
2143 T, _, err = cl.AddTorrentSpec(spec)
// AddTorrent adds the torrent described by mi: the metainfo is converted
// with TorrentSpecFromMetaInfo and the spec is passed to cl.AddTorrentSpec.
// The "new" result of AddTorrentSpec is discarded.
// NOTE(review): the declaration of ss and the use made of it are missing
// from this listing; presumably the metainfo's Nodes are handed to the DHT —
// confirm.
2147 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
2148 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
2150 missinggo.CastSlice(&ss, mi.Nodes)
// AddTorrentFromFile loads the metainfo file at filename with
// metainfo.LoadFromFile and adds it through cl.AddTorrent.
// NOTE(review): the error branch after loading is missing from this listing;
// presumably a load error is returned without adding anything — confirm.
2155 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
2156 mi, err := metainfo.LoadFromFile(filename)
2160 return cl.AddTorrent(mi)
// DHT returns the client's DHT server.
// NOTE(review): the body is missing from this listing; presumably it returns
// the cl.dHT field used elsewhere in this file — confirm.
2163 func (cl *Client) DHT() *dht.Server {
// AddDHTNodes adds each entry of nodes to the DHT. An entry is split into a
// host and an optional port, the host is parsed as an IP address, and the
// resulting node is added with cl.DHT().AddNode.
// NOTE(review): the nil check on ip, the construction of ni and the end of
// the function are missing from this listing. Presumably entries whose host
// is not a literal IP are logged and skipped — confirm.
2167 func (cl *Client) AddDHTNodes(nodes []string) {
2168 for _, n := range nodes {
2169 hmp := missinggo.SplitHostMaybePort(n)
// net.ParseIP accepts only literal IPs and returns nil otherwise.
2170 ip := net.ParseIP(hmp.Host)
2172 log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
2176 Addr: dht.NewAddr(&net.UDPAddr{
2181 cl.DHT().AddNode(ni)