21 "github.com/anacrolix/missinggo"
22 "github.com/anacrolix/missinggo/pproffd"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/sync"
25 "github.com/anacrolix/utp"
27 "github.com/anacrolix/torrent/bencode"
28 "github.com/anacrolix/torrent/dht"
29 "github.com/anacrolix/torrent/iplist"
30 "github.com/anacrolix/torrent/metainfo"
31 "github.com/anacrolix/torrent/mse"
32 pp "github.com/anacrolix/torrent/peer_protocol"
33 "github.com/anacrolix/torrent/storage"
34 "github.com/anacrolix/torrent/tracker"
37 // Currently doesn't really queue, but should in the future.
38 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex int) {
39 piece := &t.pieces[pieceIndex]
40 if piece.QueuedForHash {
43 piece.QueuedForHash = true
44 t.publishPieceChange(pieceIndex)
45 go cl.verifyPiece(t, pieceIndex)
48 // Queue a piece check if one isn't already queued, and the piece has never
49 // been checked before.
50 func (cl *Client) queueFirstHash(t *Torrent, piece int) {
52 if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) {
55 cl.queuePieceCheck(t, piece)
58 // Clients contain zero or more Torrents. A Client manages a blocklist, the
59 // TCP/UDP protocol ports, and DHT as desired.
63 listeners []net.Listener
66 ipBlockList iplist.Ranger
67 bannedTorrents map[metainfo.Hash]struct{}
69 pruneTimer *time.Timer
70 extensionBytes peerExtensionBytes
71 // Set of addresses that have our client ID. This intentionally will
72 // include ourselves if we end up trying to connect to our own address
73 // through legitimate channels.
74 dopplegangerAddrs map[string]struct{}
76 defaultStorage storage.I
80 closed missinggo.Event
82 torrents map[metainfo.Hash]*Torrent
85 func (cl *Client) IPBlockList() iplist.Ranger {
91 func (cl *Client) SetIPBlockList(list iplist.Ranger) {
96 cl.dHT.SetIPBlockList(list)
100 func (cl *Client) PeerID() string {
101 return string(cl.peerID[:])
104 func (cl *Client) ListenAddr() (addr net.Addr) {
105 for _, l := range cl.listeners {
112 type hashSorter struct {
113 Hashes []metainfo.Hash
116 func (hs hashSorter) Len() int {
117 return len(hs.Hashes)
120 func (hs hashSorter) Less(a, b int) bool {
121 return (&big.Int{}).SetBytes(hs.Hashes[a][:]).Cmp((&big.Int{}).SetBytes(hs.Hashes[b][:])) < 0
124 func (hs hashSorter) Swap(a, b int) {
125 hs.Hashes[a], hs.Hashes[b] = hs.Hashes[b], hs.Hashes[a]
128 func (cl *Client) sortedTorrents() (ret []*Torrent) {
130 for ih := range cl.torrents {
131 hs.Hashes = append(hs.Hashes, ih)
134 for _, ih := range hs.Hashes {
135 ret = append(ret, cl.torrent(ih))
140 // Writes out a human readable status of the client, such as for writing to a
142 func (cl *Client) WriteStatus(_w io.Writer) {
144 defer cl.mu.RUnlock()
145 w := bufio.NewWriter(_w)
147 if addr := cl.ListenAddr(); addr != nil {
148 fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
150 fmt.Fprintln(w, "Not listening!")
152 fmt.Fprintf(w, "Peer ID: %+q\n", cl.peerID)
154 dhtStats := cl.dHT.Stats()
155 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
156 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.ID())
157 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(cl.dHT.Addr()))
158 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
159 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
161 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrents))
163 for _, t := range cl.sortedTorrents() {
165 fmt.Fprint(w, "<unknown name>")
167 fmt.Fprint(w, t.name())
171 fmt.Fprintf(w, "%f%% of %d bytes", 100*(1-float64(t.bytesLeft())/float64(t.length)), t.length)
173 w.WriteString("<missing metainfo>")
181 // Creates a new client.
182 func NewClient(cfg *Config) (cl *Client, err error) {
193 halfOpenLimit: socketsPerTorrent,
195 defaultStorage: cfg.DefaultStorage,
196 dopplegangerAddrs: make(map[string]struct{}),
197 torrents: make(map[metainfo.Hash]*Torrent),
199 missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
201 if cl.defaultStorage == nil {
202 cl.defaultStorage = storage.NewFile(cfg.DataDir)
204 if cfg.IPBlocklist != nil {
205 cl.ipBlockList = cfg.IPBlocklist
208 if cfg.PeerID != "" {
209 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
211 o := copy(cl.peerID[:], bep20)
212 _, err = rand.Read(cl.peerID[o:])
214 panic("error generating peer id")
218 // Returns the laddr string to listen on for the next Listen call.
219 listenAddr := func() string {
220 if addr := cl.ListenAddr(); addr != nil {
223 if cfg.ListenAddr == "" {
226 return cfg.ListenAddr
228 if !cl.config.DisableTCP {
230 l, err = net.Listen(func() string {
231 if cl.config.DisableIPv6 {
240 cl.listeners = append(cl.listeners, l)
241 go cl.acceptConnections(l, false)
243 if !cl.config.DisableUTP {
244 cl.utpSock, err = utp.NewSocket(func() string {
245 if cl.config.DisableIPv6 {
254 cl.listeners = append(cl.listeners, cl.utpSock)
255 go cl.acceptConnections(cl.utpSock, true)
258 dhtCfg := cfg.DHTConfig
259 if dhtCfg.IPBlocklist == nil {
260 dhtCfg.IPBlocklist = cl.ipBlockList
262 if dhtCfg.Addr == "" {
263 dhtCfg.Addr = listenAddr()
265 if dhtCfg.Conn == nil && cl.utpSock != nil {
266 dhtCfg.Conn = cl.utpSock
268 cl.dHT, err = dht.NewServer(&dhtCfg)
277 // Stops the client. All connections to peers are closed and all activity will
279 func (cl *Client) Close() {
286 for _, l := range cl.listeners {
289 for _, t := range cl.torrents {
295 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
297 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
298 if cl.ipBlockList == nil {
302 // If blocklists are enabled, then block non-IPv4 addresses, because
303 // blocklists do not yet support IPv6.
305 if missinggo.CryHeard() {
306 log.Printf("blocking non-IPv4 address: %s", ip)
312 return cl.ipBlockList.Lookup(ip4)
315 func (cl *Client) waitAccept() {
319 for _, t := range cl.torrents {
324 if cl.closed.IsSet() {
331 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
334 conn, err := l.Accept()
335 conn = pproffd.WrapNetConn(conn)
336 if cl.closed.IsSet() {
344 // I think something harsher should happen here? Our accept
345 // routine just fucked off.
354 doppleganger := cl.dopplegangerAddr(conn.RemoteAddr().String())
355 _, blocked := cl.ipBlockRange(missinggo.AddrIP(conn.RemoteAddr()))
357 if blocked || doppleganger {
359 // log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
363 go cl.incomingConnection(conn, utp)
367 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
369 if tc, ok := nc.(*net.TCPConn); ok {
375 c.Discovery = peerSourceIncoming
377 err := cl.runReceivedConn(c)
383 // Returns a handle to the given torrent, if it's present in the client.
384 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
387 t, ok = cl.torrents[ih]
391 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
392 return cl.torrents[ih]
395 type dialResult struct {
400 func doDial(dial func(addr string, t *Torrent) (net.Conn, error), ch chan dialResult, utp bool, addr string, t *Torrent) {
401 conn, err := dial(addr, t)
406 conn = nil // Pedantic
408 ch <- dialResult{conn, utp}
410 successfulDials.Add(1)
413 unsuccessfulDials.Add(1)
416 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
417 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
418 if ret < minDialTimeout {
424 // Returns whether an address is known to connect to a client with our own ID.
425 func (cl *Client) dopplegangerAddr(addr string) bool {
426 _, ok := cl.dopplegangerAddrs[addr]
430 // Start the process of connecting to the given peer for the given torrent if
432 func (cl *Client) initiateConn(peer Peer, t *Torrent) {
433 if peer.Id == cl.peerID {
436 addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
437 if cl.dopplegangerAddr(addr) || t.addrActive(addr) {
438 duplicateConnsAvoided.Add(1)
441 if r, ok := cl.ipBlockRange(peer.IP); ok {
442 log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
445 t.halfOpen[addr] = struct{}{}
446 go cl.outgoingConnection(t, addr, peer.Source)
449 func (cl *Client) dialTimeout(t *Torrent) time.Duration {
451 pendingPeers := len(t.peers)
453 return reducedDialTimeout(nominalDialTimeout, cl.halfOpenLimit, pendingPeers)
456 func (cl *Client) dialTCP(addr string, t *Torrent) (c net.Conn, err error) {
457 c, err = net.DialTimeout("tcp", addr, cl.dialTimeout(t))
459 c.(*net.TCPConn).SetLinger(0)
464 func (cl *Client) dialUTP(addr string, t *Torrent) (c net.Conn, err error) {
465 return cl.utpSock.DialTimeout(addr, cl.dialTimeout(t))
468 // Returns a connection over UTP or TCP, whichever is first to connect.
469 func (cl *Client) dialFirst(addr string, t *Torrent) (conn net.Conn, utp bool) {
470 // Initiate connections via TCP and UTP simultaneously. Use the first one
473 if !cl.config.DisableUTP {
476 if !cl.config.DisableTCP {
479 resCh := make(chan dialResult, left)
480 if !cl.config.DisableUTP {
481 go doDial(cl.dialUTP, resCh, true, addr, t)
483 if !cl.config.DisableTCP {
484 go doDial(cl.dialTCP, resCh, false, addr, t)
487 // Wait for a successful connection.
488 for ; left > 0 && res.Conn == nil; left-- {
492 // There are still incompleted dials.
494 for ; left > 0; left-- {
495 conn := (<-resCh).Conn
507 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
508 if _, ok := t.halfOpen[addr]; !ok {
509 panic("invariant broken")
511 delete(t.halfOpen, addr)
515 // Performs initiator handshakes and returns a connection. Returns nil
516 // *connection if no connection for valid reasons.
517 func (cl *Client) handshakesConnection(nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) {
521 c.encrypted = encrypted
523 err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
527 ok, err := cl.initiateHandshakes(c, t)
534 // Returns nil connection and nil error if no connection could be established
535 // for valid reasons.
536 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
537 nc, utp := cl.dialFirst(addr, t)
541 c, err = cl.handshakesConnection(nc, t, !cl.config.DisableEncryption, utp)
549 if cl.config.DisableEncryption {
550 // We already tried without encryption.
553 // Try again without encryption, using whichever protocol type worked last
556 nc, err = cl.dialUTP(addr, t)
558 nc, err = cl.dialTCP(addr, t)
561 err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
564 c, err = cl.handshakesConnection(nc, t, false, utp)
565 if err != nil || c == nil {
571 // Called to dial out and run a connection. The addr we're given is already
572 // considered half-open.
573 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
574 c, err := cl.establishOutgoingConn(t, addr)
577 // Don't release lock between here and addConnection, unless it's for
579 cl.noLongerHalfOpen(t, addr)
582 log.Printf("error establishing outgoing connection: %s", err)
591 err = cl.runInitiatedHandshookConn(c, t)
594 log.Printf("error in established outgoing connection: %s", err)
599 // The port number for incoming peer connections. 0 if the client isn't
601 func (cl *Client) incomingPeerPort() int {
602 listenAddr := cl.ListenAddr()
603 if listenAddr == nil {
606 return missinggo.AddrPort(listenAddr)
// addrCompactIP converts a net.Addr to its compact IP representation: 4 raw
// bytes for an IPv4 address and 16 for IPv6, per the "yourip" field of
// http://www.bittorrent.org/beps/bep_0010.html.
//
// addr.String() must have the form host:port. An error is returned when it
// cannot be split, or when the host part is not an IP literal. An IPv6 zone
// suffix (such as "%eth0") is dropped before parsing, since it is only
// meaningful on the local machine.
func addrCompactIP(addr net.Addr) (string, error) {
	host, _, err := net.SplitHostPort(addr.String())
	if err != nil {
		return "", err
	}
	// net.ParseIP rejects zoned literals, so strip any zone first.
	if i := strings.IndexByte(host, '%'); i >= 0 {
		host = host[:i]
	}
	ip := net.ParseIP(host)
	if ip == nil {
		// Without this check a non-IP host would silently produce an empty
		// string together with a nil error.
		return "", fmt.Errorf("host %q is not an IP address", host)
	}
	if v4 := ip.To4(); v4 != nil {
		return string(v4), nil
	}
	return string(ip.To16()), nil
}
626 func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
638 peerExtensionBytes [8]byte
642 func (pex *peerExtensionBytes) SupportsExtended() bool {
643 return pex[5]&0x10 != 0
646 func (pex *peerExtensionBytes) SupportsDHT() bool {
647 return pex[7]&0x01 != 0
650 func (pex *peerExtensionBytes) SupportsFast() bool {
651 return pex[7]&0x04 != 0
654 type handshakeResult struct {
660 // ih is nil if we expect the peer to declare the InfoHash, such as when the
661 // peer initiated the connection. Returns ok if the handshake was successful,
662 // and err if there was an unexpected condition other than the peer simply
663 // abandoning the handshake.
664 func handshake(sock io.ReadWriter, ih *metainfo.Hash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
665 // Bytes to be sent to the peer. Should never block the sender.
666 postCh := make(chan []byte, 4)
667 // A single error value sent when the writer completes.
668 writeDone := make(chan error, 1)
669 // Performs writes to the socket and ensures posts don't block.
670 go handshakeWriter(sock, postCh, writeDone)
673 close(postCh) // Done writing.
680 // Wait until writes complete before returning from handshake.
683 err = fmt.Errorf("error writing: %s", err)
687 post := func(bb []byte) {
691 panic("mustn't block while posting")
695 post([]byte(pp.Protocol))
697 if ih != nil { // We already know what we want.
702 _, err = io.ReadFull(sock, b[:68])
707 if string(b[:20]) != pp.Protocol {
710 missinggo.CopyExact(&res.peerExtensionBytes, b[20:28])
711 missinggo.CopyExact(&res.Hash, b[28:48])
712 missinggo.CopyExact(&res.peerID, b[48:68])
713 peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)
715 // TODO: Maybe we can just drop peers here if we're not interested. This
716 // could prevent them trying to reconnect, falsely believing there was
718 if ih == nil { // We were waiting for the peer to tell us what they wanted.
727 // Wraps a raw connection and provides the interface we want for using the
728 // connection in the message loop.
729 type deadlineReader struct {
734 func (r deadlineReader) Read(b []byte) (n int, err error) {
735 // Keep-alives should be received every 2 mins. Give a bit of gracetime.
736 err = r.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
738 err = fmt.Errorf("error setting read deadline: %s", err)
741 // Convert common errors into io.EOF.
743 // if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
745 // } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
755 type readWriter struct {
760 func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys [][]byte) (ret io.ReadWriter, encrypted bool, err error) {
761 var protocol [len(pp.Protocol)]byte
762 _, err = io.ReadFull(rw, protocol[:])
767 io.MultiReader(bytes.NewReader(protocol[:]), rw),
770 if string(protocol[:]) == pp.Protocol {
774 ret, err = mse.ReceiveHandshake(ret, skeys)
778 func (cl *Client) receiveSkeys() (ret [][]byte) {
779 for ih := range cl.torrents {
780 ret = append(ret, ih[:])
785 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
787 c.rw, err = mse.InitiateHandshake(c.rw, t.infoHash[:], nil)
792 ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
793 if ih != t.infoHash {
799 // Do encryption and bittorrent handshakes as receiver.
800 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
802 skeys := cl.receiveSkeys()
804 if !cl.config.DisableEncryption {
805 c.rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw, skeys)
807 if err == mse.ErrNoSecretKeyMatch {
813 ih, ok, err := cl.connBTHandshake(c, nil)
815 err = fmt.Errorf("error during bt handshake: %s", err)
827 // Returns !ok if handshake failed for valid reasons.
828 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
829 res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes)
830 if err != nil || !ok {
834 c.PeerExtensionBytes = res.peerExtensionBytes
835 c.PeerID = res.peerID
836 c.completedHandshake = time.Now()
840 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) (err error) {
841 if c.PeerID == cl.peerID {
842 // Only if we initiated the connection is the remote address a
843 // listen addr for a doppleganger.
845 addr := c.conn.RemoteAddr().String()
846 cl.dopplegangerAddrs[addr] = struct{}{}
849 return cl.runHandshookConn(c, t)
852 func (cl *Client) runReceivedConn(c *connection) (err error) {
853 err = c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
857 t, err := cl.receiveHandshakes(c)
859 err = fmt.Errorf("error receiving handshakes: %s", err)
867 if c.PeerID == cl.peerID {
870 return cl.runHandshookConn(c, t)
873 func (cl *Client) runHandshookConn(c *connection, t *Torrent) (err error) {
874 c.conn.SetWriteDeadline(time.Time{})
876 deadlineReader{c.conn, c.rw},
879 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
880 if !cl.addConnection(t, c) {
883 defer cl.dropConnection(t, c)
885 go c.writeOptimizer(time.Minute)
886 cl.sendInitialMessages(c, t)
887 err = cl.connectionLoop(t, c)
889 err = fmt.Errorf("error during connection loop: %s", err)
894 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
895 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
896 conn.Post(pp.Message{
898 ExtendedID: pp.HandshakeExtendedID,
899 ExtendedPayload: func() []byte {
900 d := map[string]interface{}{
901 "m": func() (ret map[string]int) {
902 ret = make(map[string]int, 2)
903 ret["ut_metadata"] = metadataExtendedId
904 if !cl.config.DisablePEX {
905 ret["ut_pex"] = pexExtendedId
909 "v": extendedHandshakeClientVersion,
910 // No upload queue is implemented yet.
913 if !cl.config.DisableEncryption {
916 if torrent.metadataSizeKnown() {
917 d["metadata_size"] = torrent.metadataSize()
919 if p := cl.incomingPeerPort(); p != 0 {
922 yourip, err := addrCompactIP(conn.remoteAddr())
924 log.Printf("error calculating yourip field value in extension handshake: %s", err)
928 // log.Printf("sending %v", d)
929 b, err := bencode.Marshal(d)
937 if torrent.haveAnyPieces() {
938 conn.Bitfield(torrent.bitfield())
939 } else if cl.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
940 conn.Post(pp.Message{
944 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
945 conn.Post(pp.Message{
947 Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
952 func (cl *Client) peerUnchoked(torrent *Torrent, conn *connection) {
953 conn.updateRequests()
956 func (cl *Client) connCancel(t *Torrent, cn *connection, r request) (ok bool) {
964 func (cl *Client) connDeleteRequest(t *Torrent, cn *connection, r request) bool {
965 if !cn.RequestPending(r) {
968 delete(cn.Requests, r)
972 func (cl *Client) requestPendingMetadata(t *Torrent, c *connection) {
976 if c.PeerExtensionIDs["ut_metadata"] == 0 {
977 // Peer doesn't support this.
980 // Request metadata pieces that we don't have in a random order.
982 for index := 0; index < t.metadataPieceCount(); index++ {
983 if !t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
984 pending = append(pending, index)
987 for _, i := range mathRand.Perm(len(pending)) {
988 c.requestMetadataPiece(pending[i])
992 // Process incoming ut_metadata message.
993 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) (err error) {
995 err = bencode.Unmarshal(payload, &d)
997 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
1000 msgType, ok := d["msg_type"]
1002 err = errors.New("missing msg_type field")
1007 case pp.DataMetadataExtensionMsgType:
1008 if !c.requestedMetadataPiece(piece) {
1009 err = fmt.Errorf("got unexpected piece %d", piece)
1012 c.metadataRequests[piece] = false
1013 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1014 if begin < 0 || begin >= len(payload) {
1015 err = fmt.Errorf("data has bad offset in payload: %d", begin)
1018 t.saveMetadataPiece(piece, payload[begin:])
1019 c.UsefulChunksReceived++
1020 c.lastUsefulChunkReceived = time.Now()
1021 t.maybeMetadataCompleted()
1022 case pp.RequestMetadataExtensionMsgType:
1023 if !t.haveMetadataPiece(piece) {
1024 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1027 start := (1 << 14) * piece
1028 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1029 case pp.RejectMetadataExtensionMsgType:
1031 err = errors.New("unknown msg_type value")
1036 func (cl *Client) upload(t *Torrent, c *connection) {
1037 if cl.config.NoUpload {
1040 if !c.PeerInterested {
1043 seeding := cl.seeding(t)
1044 if !seeding && !t.connHasWantedPieces(c) {
1048 for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
1050 for r := range c.PeerRequests {
1051 err := cl.sendChunk(t, c, r)
1053 if t.pieceComplete(int(r.Index)) && err == io.ErrUnexpectedEOF {
1054 // We had the piece, but not anymore.
1056 log.Printf("error sending chunk %+v to peer: %s", r, err)
1058 // If we failed to send a chunk, choke the peer to ensure they
1059 // flush all their requests. We've probably dropped a piece,
1060 // but there's no way to communicate this to the peer. If they
1061 // ask for it again, we'll kick them to allow us to send them
1062 // an updated bitfield.
1065 delete(c.PeerRequests, r)
1073 func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
1074 // Count the chunk being sent, even if it isn't.
1075 b := make([]byte, r.Length)
1076 p := t.info.Piece(int(r.Index))
1077 n, err := t.readAt(b, p.Offset()+int64(r.Begin))
1080 panic("expected error")
1091 uploadChunksPosted.Add(1)
1092 c.lastChunkSent = time.Now()
1096 // Processes incoming bittorrent messages. The client lock is held upon entry
1098 func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
1099 decoder := pp.Decoder{
1100 R: bufio.NewReader(c.rw),
1101 MaxLength: 256 * 1024,
1106 err := decoder.Decode(&msg)
1108 if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
1114 c.lastMessageReceived = time.Now()
1116 receivedKeepalives.Add(1)
1119 receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
1124 // We can then reset our interest.
1127 cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
1130 c.PeerChoked = false
1131 cl.peerUnchoked(t, c)
1133 c.PeerInterested = true
1135 case pp.NotInterested:
1136 c.PeerInterested = false
1139 err = c.peerSentHave(int(msg.Index))
1144 if !c.PeerInterested {
1145 err = errors.New("peer sent request but isn't interested")
1148 if !t.havePiece(msg.Index.Int()) {
1149 // This isn't necessarily them screwing up. We can drop pieces
1150 // from our storage, and can't communicate this to peers
1151 // except by reconnecting.
1152 requestsReceivedForMissingPieces.Add(1)
1153 err = errors.New("peer requested piece we don't have")
1156 if c.PeerRequests == nil {
1157 c.PeerRequests = make(map[request]struct{}, maxRequests)
1159 c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
1162 req := newRequest(msg.Index, msg.Begin, msg.Length)
1163 if !c.PeerCancel(req) {
1164 unexpectedCancels.Add(1)
1167 err = c.peerSentBitfield(msg.Bitfield)
1169 err = c.peerSentHaveAll()
1171 err = c.peerSentHaveNone()
1173 cl.downloadedChunk(t, c, &msg)
1175 switch msg.ExtendedID {
1176 case pp.HandshakeExtendedID:
1177 // TODO: Create a bencode struct for this.
1178 var d map[string]interface{}
1179 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
1181 err = fmt.Errorf("error decoding extended message payload: %s", err)
1184 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
1185 if reqq, ok := d["reqq"]; ok {
1186 if i, ok := reqq.(int64); ok {
1187 c.PeerMaxRequests = int(i)
1190 if v, ok := d["v"]; ok {
1191 c.PeerClientName = v.(string)
1195 err = errors.New("handshake missing m item")
1198 mTyped, ok := m.(map[string]interface{})
1200 err = errors.New("handshake m value is not dict")
1203 if c.PeerExtensionIDs == nil {
1204 c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
1206 for name, v := range mTyped {
1209 log.Printf("bad handshake m item extension ID type: %T", v)
1213 delete(c.PeerExtensionIDs, name)
1215 if c.PeerExtensionIDs[name] == 0 {
1216 supportedExtensionMessages.Add(name, 1)
1218 c.PeerExtensionIDs[name] = byte(id)
1221 metadata_sizeUntyped, ok := d["metadata_size"]
1223 metadata_size, ok := metadata_sizeUntyped.(int64)
1225 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1227 t.setMetadataSize(metadata_size, cl)
1230 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1231 cl.requestPendingMetadata(t, c)
1233 case metadataExtendedId:
1234 err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
1236 err = fmt.Errorf("error handling metadata extension message: %s", err)
1239 if cl.config.DisablePEX {
1242 var pexMsg peerExchangeMessage
1243 err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
1245 err = fmt.Errorf("error unmarshalling PEX message: %s", err)
1250 cl.addPeers(t, func() (ret []Peer) {
1251 for i, cp := range pexMsg.Added {
1253 IP: make([]byte, 4),
1255 Source: peerSourcePEX,
1257 if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
1258 p.SupportsEncryption = true
1260 missinggo.CopyExact(p.IP, cp.IP[:])
1261 ret = append(ret, p)
1268 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
1271 // That client uses its own extension IDs for outgoing message
1272 // types, which is incorrect.
1273 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
1274 strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1282 pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
1287 pingAddr.Port = int(msg.Port)
1289 cl.dHT.Ping(pingAddr)
1291 err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1299 // Returns true if connection is removed from torrent.Conns.
1300 func (cl *Client) deleteConnection(t *Torrent, c *connection) bool {
1301 for i0, _c := range t.conns {
1305 i1 := len(t.conns) - 1
1307 t.conns[i0] = t.conns[i1]
1309 t.conns = t.conns[:i1]
1315 func (cl *Client) dropConnection(t *Torrent, c *connection) {
1316 cl.event.Broadcast()
1318 if cl.deleteConnection(t, c) {
1323 // Returns true if the connection is added.
1324 func (cl *Client) addConnection(t *Torrent, c *connection) bool {
1325 if cl.closed.IsSet() {
1329 case <-t.ceasingNetworking:
1333 if !cl.wantConns(t) {
1336 for _, c0 := range t.conns {
1337 if c.PeerID == c0.PeerID {
1338 // Already connected to a client with that ID.
1339 duplicateClientConns.Add(1)
1343 if len(t.conns) >= socketsPerTorrent {
1344 c := t.worstBadConn(cl)
1348 if cl.config.Debug && missinggo.CryHeard() {
1349 log.Printf("%s: dropping connection to make room for new one:\n %s", t, c)
1352 cl.deleteConnection(t, c)
1354 if len(t.conns) >= socketsPerTorrent {
1357 t.conns = append(t.conns, c)
1362 func (cl *Client) usefulConn(t *Torrent, c *connection) bool {
1363 if c.closed.IsSet() {
1367 return c.supportsExtension("ut_metadata")
1370 return c.PeerInterested
1372 return t.connHasWantedPieces(c)
1375 func (cl *Client) wantConns(t *Torrent) bool {
1376 if !cl.seeding(t) && !t.needData() {
1379 if len(t.conns) < socketsPerTorrent {
1382 return t.worstBadConn(cl) != nil
1385 func (cl *Client) openNewConns(t *Torrent) {
1387 case <-t.ceasingNetworking:
1391 for len(t.peers) != 0 {
1392 if !cl.wantConns(t) {
1395 if len(t.halfOpen) >= cl.halfOpenLimit {
1402 for k, p = range t.peers {
1406 cl.initiateConn(p, t)
1408 t.wantPeers.Broadcast()
1411 func (cl *Client) addPeers(t *Torrent, peers []Peer) {
1412 for _, p := range peers {
1413 if cl.dopplegangerAddr(net.JoinHostPort(
1415 strconv.FormatInt(int64(p.Port), 10),
1419 if _, ok := cl.ipBlockRange(p.IP); ok {
1423 // The spec says to scrub these yourselves. Fine.
1430 func (cl *Client) setMetaData(t *Torrent, md *metainfo.Info, bytes []byte) (err error) {
1431 err = t.setMetadata(md, bytes)
1435 cl.event.Broadcast()
1436 close(t.gotMetainfo)
1440 // Prepare a Torrent without any attachment to a Client. That means we can
// initialize all fields that don't require the Client without locking
1443 func newTorrent(ih metainfo.Hash) (t *Torrent) {
1446 chunkSize: defaultChunkSize,
1447 peers: make(map[peersKey]Peer),
1449 closing: make(chan struct{}),
1450 ceasingNetworking: make(chan struct{}),
1452 gotMetainfo: make(chan struct{}),
1454 halfOpen: make(map[string]struct{}),
1455 pieceStateChanges: pubsub.NewPubSub(),
1461 // For shuffling the tracker tiers.
1462 mathRand.Seed(time.Now().Unix())
// trackerTier is one tier of tracker URLs.
type trackerTier []string

// shuffleTier randomly reorders tier in place, drawing from the package-level
// math/rand source. The trackers within each tier must be shuffled before
// use; see
// http://stackoverflow.com/a/12267471/149482 and
// http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
func shuffleTier(tier trackerTier) {
	for i := 0; i < len(tier); i++ {
		pick := mathRand.Intn(i + 1)
		tier[pick], tier[i] = tier[i], tier[pick]
	}
}
1477 func copyTrackers(base []trackerTier) (copy []trackerTier) {
1478 for _, tier := range base {
1479 copy = append(copy, append(trackerTier(nil), tier...))
1484 func mergeTier(tier trackerTier, newURLs []string) trackerTier {
1486 for _, url := range newURLs {
1487 for _, trURL := range tier {
1492 tier = append(tier, url)
1497 // A file-like handle to some torrent data resource.
1498 type Handle interface {
1505 // Specifies a new torrent for adding to a client. There are helpers for
1506 // magnet URIs and torrent metainfo files.
1507 type TorrentSpec struct {
1508 // The tiered tracker URIs.
1510 InfoHash metainfo.Hash
1511 Info *metainfo.InfoEx
1512 // The name to use if the Name field from the Info isn't available.
1514 // The chunk size to use for outbound requests. Defaults to 16KiB if not
1520 func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {
1521 m, err := metainfo.ParseMagnetURI(uri)
1525 spec = &TorrentSpec{
1526 Trackers: [][]string{m.Trackers},
1527 DisplayName: m.DisplayName,
1528 InfoHash: m.InfoHash,
1533 func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) {
1534 spec = &TorrentSpec{
1535 Trackers: mi.AnnounceList,
1537 DisplayName: mi.Info.Name,
1538 InfoHash: mi.Info.Hash(),
1540 if len(spec.Trackers) == 0 {
1541 spec.Trackers = [][]string{[]string{mi.Announce}}
1543 spec.Trackers[0] = append(spec.Trackers[0], mi.Announce)
1548 // Add or merge a torrent spec. If the torrent is already present, the
1549 // trackers will be merged with the existing ones. If the Info isn't yet
1550 // known, it will be set. The display name is replaced if the new spec
1551 // provides one. Returns new if the torrent wasn't already in the client.
1552 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1554 defer cl.mu.Unlock()
1556 t, ok := cl.torrents[spec.InfoHash]
1560 // TODO: This doesn't belong in the core client, it's more of a
1562 if _, ok := cl.bannedTorrents[spec.InfoHash]; ok {
1563 err = errors.New("banned torrent")
1566 // TODO: Tidy this up?
1567 t = newTorrent(spec.InfoHash)
1569 t.wantPeers.L = &cl.mu
1570 if spec.ChunkSize != 0 {
1571 t.chunkSize = pp.Integer(spec.ChunkSize)
1573 t.storageOpener = spec.Storage
1574 if t.storageOpener == nil {
1575 t.storageOpener = cl.defaultStorage
1578 if spec.DisplayName != "" {
1579 t.setDisplayName(spec.DisplayName)
1581 // Try to merge in info we have on the torrent. Any err left will
1582 // terminate the function.
1583 if t.info == nil && spec.Info != nil {
1584 err = cl.setMetaData(t, &spec.Info.Info, spec.Info.Bytes)
1589 t.addTrackers(spec.Trackers)
1591 cl.torrents[spec.InfoHash] = t
1594 // From this point onwards, we can consider the torrent a part of the
1597 if !cl.config.DisableTrackers {
1598 go cl.announceTorrentTrackers(t)
1601 go cl.announceTorrentDHT(t, true)
1607 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1608 t, ok := cl.torrents[infoHash]
1610 err = fmt.Errorf("no such torrent")
1617 delete(cl.torrents, infoHash)
1621 // waitWantPeers returns true when peers are required, or false if the
// torrent is closing. The decision looks at three things: whether
// t.ceasingNetworking has fired, whether the torrent already holds more than
// torrentPeersLowWater peers, and whether the torrent still needs data or
// the client is seeding it. The client mutex is released on return.
1622 func (cl *Client) waitWantPeers(t *Torrent) bool {
1624 defer cl.mu.Unlock()
// The torrent is shutting down its networking.
1627 case <-t.ceasingNetworking:
// Compare the current peer count with the low-water mark.
1631 if len(t.peers) > torrentPeersLowWater {
// Peers are useful both for downloading and for seeding.
1634 if t.needData() || cl.seeding(t) {
1642 // seeding returns whether the client should make effort to seed the torrent.
// The visible checks consult two client config flags, NoUpload and Seed.
1643 func (cl *Client) seeding(t *Torrent) bool {
// Uploading is disabled client-wide.
1644 if cl.config.NoUpload {
// Seeding has not been enabled in the config.
1647 if !cl.config.Seed {
// announceTorrentDHT announces t on the DHT for as long as waitWantPeers
// reports that peers are wanted. Each round calls cl.dHT.Announce with the
// torrent's info hash, the client's incoming peer port and impliedPort, then
// consumes batches from ps.Peers. Every batch is converted to Peer values
// tagged peerSourceDHT and passed to cl.addPeers. The peer count is compared
// against torrentPeersHighWater after each batch, and t.ceasingNetworking is
// watched alongside the peer channel.
1656 func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
1657 for cl.waitWantPeers(t) {
1658 // log.Printf("getting peers for %q from DHT", t)
1659 ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
1661 log.Printf("error getting peers from dht: %s", err)
1664 // Count all the unique addresses we got during this announce.
1665 allAddrs := make(map[string]struct{})
// A batch of peers arrived from the DHT announce.
1669 case v, ok := <-ps.Peers:
// Sized for the whole batch up front.
1673 addPeers := make([]Peer, 0, len(v.Peers))
1674 for _, cp := range v.Peers {
1676 // Can't do anything with this.
1679 addPeers = append(addPeers, Peer{
1682 Source: peerSourceDHT,
// Key built from a net.UDPAddr, used to de-duplicate addresses in allAddrs.
1684 key := (&net.UDPAddr{
1688 allAddrs[key] = struct{}{}
1691 cl.addPeers(t, addPeers)
1692 numPeers := len(t.peers)
// Check whether the torrent has reached the high-water mark.
1694 if numPeers >= torrentPeersHighWater {
// The torrent is shutting down its networking.
1697 case <-t.ceasingNetworking:
1703 // log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
// trackerBlockedUnlocked reports whether the tracker at trRawURL resolves to
// an address covered by the client's IP block list. It parses the URL,
// splits the port off the host, resolves the host to an IP and looks that IP
// up with cl.ipBlockRange.
//
// NOTE(review): net.SplitHostPort returns an error when url_.Host carries no
// explicit port (e.g. "http://tracker.example/announce"); how that error is
// handled is not visible here — confirm such URLs are not rejected.
1707 func (cl *Client) trackerBlockedUnlocked(trRawURL string) (blocked bool, err error) {
1708 url_, err := url.Parse(trRawURL)
1712 host, _, err := net.SplitHostPort(url_.Host)
// "ip" lets the resolver return either an IPv4 or an IPv6 address.
1716 addr, err := net.ResolveIPAddr("ip", host)
1721 _, blocked = cl.ipBlockRange(addr.IP)
// announceTorrentSingleTracker announces req to the tracker URL tr and adds
// the peers from the response to t via cl.addPeers. It then sleeps for the
// interval the tracker returned (resp.Interval, in seconds), so a call that
// reaches that point blocks for the whole announce interval.
//
// Three error returns are visible: the block-list check itself failing, the
// tracker being blocked, and the announce failing.
1726 func (cl *Client) announceTorrentSingleTracker(tr string, req *tracker.AnnounceRequest, t *Torrent) error {
1727 blocked, err := cl.trackerBlockedUnlocked(tr)
1729 return fmt.Errorf("error determining if tracker blocked: %s", err)
1732 return fmt.Errorf("tracker blocked: %s", tr)
1734 resp, err := tracker.Announce(tr, req)
1736 return fmt.Errorf("error announcing: %s", err)
// Convert the tracker's peer entries into Peer values.
1739 for _, peer := range resp.Peers {
1740 peers = append(peers, Peer{
1746 cl.addPeers(t, peers)
1749 // log.Printf("%s: %d new peers from %s", t, len(peers), tr)
// Wait out the tracker-supplied announce interval.
1751 time.Sleep(time.Second * time.Duration(resp.Interval))
// announceTorrentTrackersFastStart starts a goroutine for every tracker in
// every tier, each calling announceTorrentSingleTracker with the shared req.
// It then loops while announces are still outstanding; the oks channel is
// presumably how results come back. atLeastOne presumably reports whether
// any announce succeeded — confirm.
1755 func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier, t *Torrent) (atLeastOne bool) {
// Unbuffered channel of per-tracker results.
1756 oks := make(chan bool)
1758 for _, tier := range trackers {
1759 for _, tr := range tier {
// tr is passed as an argument so each goroutine gets its own copy of the
// loop variable.
1761 go func(tr string) {
1762 err := cl.announceTorrentSingleTracker(tr, req, t)
// Keep going until every started announce has been accounted for.
1767 for outstanding > 0 {
1777 // announceTorrentTrackers announces the torrent to its trackers.
//
// The request starts with Event set to tracker.Started. After waitWantPeers
// allows it, a fast-start announce goes to all trackers at once; if that
// reports success the event becomes tracker.None. The main loop then walks
// the tiers in order for as long as peers are wanted, calling
// announceTorrentSingleTracker on each tracker. When an announce succeeds
// the tracker is swapped to the front of its tier and the loop restarts via
// "continue newAnnounce". If trackers were tried and none succeeded a
// message is logged. The loop sleeps 10 seconds before the next round.
1778 func (cl *Client) announceTorrentTrackers(t *Torrent) {
1779 req := tracker.AnnounceRequest{
1780 Event: tracker.Started,
1782 Port: uint16(cl.incomingPeerPort()),
1784 InfoHash: t.infoHash,
1786 if !cl.waitWantPeers(t) {
// Take the remaining byte count and the tracker list from the torrent.
1790 req.Left = t.bytesLeftAnnounce()
1791 trackers := t.trackers
1793 if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
// The Started event has been delivered; later announces are regular ones.
1794 req.Event = tracker.None
1797 for cl.waitWantPeers(t) {
// Refresh both values each round; they may have changed while waiting.
1799 req.Left = t.bytesLeftAnnounce()
1800 trackers = t.trackers
1802 numTrackersTried := 0
1803 for _, tier := range trackers {
1804 for trIndex, tr := range tier {
1806 err := cl.announceTorrentSingleTracker(tr, &req, t)
1810 // Float the successful announce to the top of the tier. If
1811 // the trackers list has been changed, we'll be modifying an
1812 // old copy so it won't matter.
1814 tier[0], tier[trIndex] = tier[trIndex], tier[0]
1817 req.Event = tracker.None
1818 continue newAnnounce
// Only report failure when there was at least one tracker to try.
1821 if numTrackersTried != 0 {
1822 log.Printf("%s: all trackers failed", t)
1824 // TODO: Wait until trackers are added if there are none.
1825 time.Sleep(10 * time.Second)
// allTorrentsCompleted walks every torrent in cl.torrents and compares its
// completed-piece count with its total piece count. A mismatch presumably
// makes the result false — confirm.
1829 func (cl *Client) allTorrentsCompleted() bool {
1830 for _, t := range cl.torrents {
// A torrent is incomplete while any of its pieces is not completed.
1834 if t.numPiecesCompleted() != t.numPieces() {
1841 // WaitAll returns true when all torrents are completely downloaded and false
1842 // if the client is stopped before that.
//
// It loops until allTorrentsCompleted reports true, checking cl.closed on
// each pass. The client mutex is released on return.
1843 func (cl *Client) WaitAll() bool {
1845 defer cl.mu.Unlock()
1846 for !cl.allTorrentsCompleted() {
// The client has been closed.
1847 if cl.closed.IsSet() {
1855 // downloadedChunk handles a received chunk from a peer.
//
// t is the torrent the chunk belongs to, c is the connection it arrived on,
// and msg carries the piece index, the offset within the piece and the
// payload. The visible steps, in order: rebuild the request the chunk
// answers, remove it from the connection's outstanding requests, update the
// wanted/unwanted counters, mark the chunk as received but not yet written,
// cancel the same request on the torrent's connections, write the payload to
// storage, queue a hash check once every chunk of the piece is dirty, record
// that c touched the piece, and publish the change.
1856 func (cl *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
1857 chunksReceived.Add(1)
// Rebuild the request this chunk corresponds to from the message fields.
1859 req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
1861 // Request has been satisfied.
1862 if cl.connDeleteRequest(t, c, req) {
// Refresh this connection's requests when the function returns.
1863 defer c.updateRequests()
1865 unexpectedChunksReceived.Add(1)
1868 index := int(req.Index)
1869 piece := &t.pieces[index]
1871 // Do we actually want this chunk?
1872 if !t.wantPiece(req) {
1873 unwantedChunksReceived.Add(1)
1874 c.UnwantedChunksReceived++
// The chunk is useful: update the per-connection statistics.
1878 c.UsefulChunksReceived++
1879 c.lastUsefulChunkReceived = time.Now()
1883 // Need to record that it hasn't been written yet, before we attempt to do
1884 // anything with it.
1885 piece.incrementPendingWrites()
1886 // Record that we have the chunk.
1887 piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
1889 // Cancel pending requests for this chunk.
// The loop variable c shadows the parameter c inside this loop.
1890 for _, c := range t.conns {
1891 if cl.connCancel(t, c, req) {
1897 // Write the chunk out.
1898 err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
// Pairs with the incrementPendingWrites call above.
1901 piece.decrementPendingWrites()
1904 log.Printf("%s: error writing chunk %v: %s", t, req, err)
1906 t.updatePieceCompletion(int(msg.Index))
1910 // It's important that the piece is potentially queued before we check if
1911 // the piece is still wanted, because if it is queued, it won't be wanted.
1912 if t.pieceAllDirty(index) {
1913 cl.queuePieceCheck(t, int(req.Index))
// Record that this connection contributed data to the piece; the map is
// allocated on first use.
1916 if c.peerTouchedPieces == nil {
1917 c.peerTouchedPieces = make(map[int]struct{})
1919 c.peerTouchedPieces[index] = struct{}{}
// Wake anything waiting on the client event and publish the piece change.
1921 cl.event.Broadcast()
1922 t.publishPieceChange(int(req.Index))
1926 // reapPieceTouches returns the connections that touched a piece, and clears
// each connection's entry for that piece while doing so.
1928 func (cl *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
1929 for _, c := range t.conns {
// Reading from a nil peerTouchedPieces map is safe and simply reports !ok.
1930 if _, ok := c.peerTouchedPieces[piece]; ok {
1931 ret = append(ret, c)
1932 delete(c.peerTouchedPieces, piece)
// pieceHashed records the outcome of hashing a piece; correct says whether
// the computed hash matched the expected one. It updates the
// pieceHashedCorrect and pieceHashedNotCorrect counters and collects the
// connections that touched the piece. One branch marks the piece complete in
// storage and refreshes its completion state. The else branch drops every
// connection that touched the piece when there are any. It ends by calling
// cl.pieceChanged.
1938 func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) {
1939 p := &t.pieces[piece]
1941 // Don't score the first time a piece is hashed, it could be an
1944 pieceHashedCorrect.Add(1)
1946 log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
1947 pieceHashedNotCorrect.Add(1)
// Collect the contributing connections and clear their touch records.
1951 touchers := cl.reapPieceTouches(t, piece)
1953 err := p.Storage().MarkComplete()
1955 log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
1957 t.updatePieceCompletion(piece)
1958 } else if len(touchers) != 0 {
1959 log.Printf("dropping %d conns that touched piece", len(touchers))
1960 for _, c := range touchers {
1961 cl.dropConnection(t, c)
1964 cl.pieceChanged(t, piece)
// onCompletedPiece does the bookkeeping for a piece that is now complete. It
// removes the piece from t.pendingPieces, calls t.pendAllChunkSpecs for it,
// and scans every connection's outstanding requests for ones that target
// this piece.
1967 func (cl *Client) onCompletedPiece(t *Torrent, piece int) {
1968 t.pendingPieces.Remove(piece)
1969 t.pendAllChunkSpecs(piece)
1970 for _, conn := range t.conns {
// Find this connection's outstanding requests that refer to the piece.
1972 for r := range conn.Requests {
1973 if int(r.Index) == piece {
1977 // Could check here if peer doesn't have piece, but due to caching
1978 // some peers may have said they have a piece but they don't.
// onFailedPiece does the bookkeeping for a piece that is not complete. If
// every chunk of the piece is dirty, all of its chunk specs are pended
// again. It then checks whether the piece is still wanted, and calls
// updateRequests on each connection whose peer has the piece.
1983 func (cl *Client) onFailedPiece(t *Torrent, piece int) {
1984 if t.pieceAllDirty(piece) {
1985 t.pendAllChunkSpecs(piece)
// Check whether the piece is still wanted.
1987 if !t.wantPieceIndex(piece) {
// Only connections whose peer has the piece are refreshed.
1991 for _, conn := range t.conns {
1992 if conn.PeerHasPiece(piece) {
1993 conn.updateRequests()
// pieceChanged reacts to a change in a piece's state. It reads the piece's
// completion from t.pieceComplete and dispatches to onCompletedPiece or
// onFailedPiece (presumably selected by that result — confirm). It then
// recomputes the piece priority, signalling piecePriorityChanged when
// updatePiecePriority returns true, and publishes the piece change.
// cl.event is broadcast when the function returns.
1998 func (cl *Client) pieceChanged(t *Torrent, piece int) {
1999 correct := t.pieceComplete(piece)
2000 defer cl.event.Broadcast()
2002 cl.onCompletedPiece(t, piece)
2004 cl.onFailedPiece(t, piece)
2006 if t.updatePiecePriority(piece) {
2007 t.piecePriorityChanged(piece)
2009 t.publishPieceChange(piece)
// verifyPiece hashes a piece and reports the result through cl.pieceHashed.
// It loops while the piece is already being hashed or the torrent has no
// storage yet, then clears QueuedForHash. If the torrent is closed or the
// piece is already complete it only refreshes the priority and publishes the
// change. Otherwise it hashes the piece with t.hashPiece and compares the
// sum with the expected p.Hash. The client mutex is released on return.
2012 func (cl *Client) verifyPiece(t *Torrent, piece int) {
2014 defer cl.mu.Unlock()
2015 p := &t.pieces[piece]
// Hold off while another hash of this piece is running or storage is unset.
2016 for p.Hashing || t.storage == nil {
2019 p.QueuedForHash = false
// Closed torrent or already-complete piece: refresh state instead of hashing.
2020 if t.isClosed() || t.pieceComplete(piece) {
2021 t.updatePiecePriority(piece)
2022 t.publishPieceChange(piece)
2026 t.publishPieceChange(piece)
2028 sum := t.hashPiece(piece)
// The piece is correct when the computed sum equals the expected hash.
2036 cl.pieceHashed(t, piece, sum == p.Hash)
2039 // Torrents returns handles to all the torrents loaded in the Client.
// The result is built by ranging over the cl.torrents map, so its order is
// unspecified.
2040 func (cl *Client) Torrents() (ret []*Torrent) {
2042 for _, t := range cl.torrents {
2043 ret = append(ret, t)
// AddMagnet parses a magnet URI into a TorrentSpec with
// TorrentSpecFromMagnetURI and adds it to the client through AddTorrentSpec.
// The "new" result of AddTorrentSpec is discarded.
2049 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
2050 spec, err := TorrentSpecFromMagnetURI(uri)
2054 T, _, err = cl.AddTorrentSpec(spec)
// AddTorrent adds the torrent described by mi to the client. It converts mi
// with TorrentSpecFromMetaInfo and passes the spec to AddTorrentSpec,
// discarding the "new" result. It also casts mi.Nodes into ss with
// missinggo.CastSlice; ss is presumably a slice of node address strings
// handed to the DHT — confirm.
2058 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
2059 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
2061 missinggo.CastSlice(&ss, mi.Nodes)
// AddTorrentFromFile loads a metainfo file from filename with
// metainfo.LoadFromFile and adds the result to the client through
// AddTorrent.
2066 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
2067 mi, err := metainfo.LoadFromFile(filename)
2071 return cl.AddTorrent(mi)
// DHT returns the client's DHT server (presumably the cl.dHT field —
// confirm).
2074 func (cl *Client) DHT() *dht.Server {
// AddDHTNodes adds the given node addresses to the client's DHT. Each entry
// is split into host and optional port with missinggo.SplitHostMaybePort,
// and the host is parsed as an IP. A host that is not a valid IP is logged
// with the message "won't add DHT node with bad IP". Other entries are
// wrapped in a node value built around a net.UDPAddr and passed to
// cl.DHT().AddNode.
2078 func (cl *Client) AddDHTNodes(nodes []string) {
2079 for _, n := range nodes {
2080 hmp := missinggo.SplitHostMaybePort(n)
// net.ParseIP yields nil for anything that is not a literal IP address;
// hostnames are not resolved here.
2081 ip := net.ParseIP(hmp.Host)
2083 log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
2087 Addr: dht.NewAddr(&net.UDPAddr{
2092 cl.DHT().AddNode(ni)