]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Significant progress and improvements
authorMatt Joiner <anacrolix@gmail.com>
Sun, 20 Oct 2013 14:07:01 +0000 (01:07 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 20 Oct 2013 14:07:01 +0000 (01:07 +1100)
Piece state is broken up into several dimensions.
Implement keep-alive in connection writer.
Lazily hash pieces, only as requested.
Replace client actor mechanism with a mutex.
Fix runConnection/connectionLoop/handshake misnomers.
Fix broken reading from partially complete pieces.

client.go
cmd/torrent-verify/main.go
cmd/torrentfs/main.go
mmap_span.go

index f6d07a7b58c40b86a3afa439e05917a00fe2e783..695358616e5584559621ba73340bde458e5f8bf8 100644 (file)
--- a/client.go
+++ b/client.go
@@ -17,11 +17,13 @@ import (
        "os"
        "path/filepath"
        "sort"
+       "sync"
+       "time"
 )
 
 const (
        PieceHash   = crypto.SHA1
-       maxRequests = 10
+       maxRequests = 250
        chunkSize   = 0x4000 // 16KiB
        BEP20       = "-GT0000-"
 )
@@ -45,21 +47,31 @@ func BytesInfoHash(b []byte) (ih InfoHash) {
 
 type pieceState uint8
 
-const (
-       pieceStateUnknown = iota
-       pieceStateComplete
-       pieceStateIncomplete
-)
-
 type piece struct {
-       State             pieceState
        Hash              pieceSum
        PendingChunkSpecs map[ChunkSpec]struct{}
+       Hashing           bool
+       EverHashed        bool
+}
+
+func (p *piece) Complete() bool {
+       return len(p.PendingChunkSpecs) == 0 && !p.Hashing && p.EverHashed
+}
+
+func lastChunkSpec(pieceLength peer_protocol.Integer) (cs ChunkSpec) {
+       cs.Begin = (pieceLength - 1) / chunkSize * chunkSize
+       cs.Length = pieceLength - cs.Begin
+       return
 }
 
-func (p piece) NumPendingBytes() (count peer_protocol.Integer) {
-       for cs, _ := range p.PendingChunkSpecs {
-               count += cs.Length
+func (t *Torrent) PieceNumPendingBytes(index peer_protocol.Integer) (count peer_protocol.Integer) {
+       pendingChunks := t.Pieces[index].PendingChunkSpecs
+       count = peer_protocol.Integer(len(pendingChunks)) * chunkSize
+       _lastChunkSpec := lastChunkSpec(t.PieceLength(index))
+       if _lastChunkSpec.Length != chunkSize {
+               if _, ok := pendingChunks[_lastChunkSpec]; ok {
+                       count += _lastChunkSpec.Length - chunkSize
+               }
        }
        return
 }
@@ -145,11 +157,22 @@ func (c *connection) SetInterested(interested bool) {
        c.Interested = interested
 }
 
+var (
+       keepAliveBytes [4]byte
+)
+
 func (conn *connection) writer() {
        for {
-               b := <-conn.write
-               if b == nil {
-                       break
+               timer := time.NewTimer(time.Minute)
+               var b []byte
+               select {
+               case <-timer.C:
+                       b = keepAliveBytes[:]
+               case b = <-conn.write:
+                       timer.Stop()
+                       if b == nil {
+                               return
+                       }
                }
                n, err := conn.Socket.Write(b)
                if err != nil {
@@ -192,7 +215,7 @@ func (conn *connection) writeOptimizer() {
 
 type Torrent struct {
        InfoHash   InfoHash
-       Pieces     []piece
+       Pieces     []*piece
        Data       MMapSpan
        MetaInfo   *metainfo.MetaInfo
        Conns      []*connection
@@ -200,9 +223,16 @@ type Torrent struct {
        Priorities *list.List
 }
 
+func (t *Torrent) Close() (err error) {
+       t.Data.Close()
+       for _, conn := range t.Conns {
+               conn.Close()
+       }
+       return
+}
+
 type pieceByBytesPendingSlice struct {
-       Torrent *Torrent
-       Indices []peer_protocol.Integer
+       Pending, Indices []peer_protocol.Integer
 }
 
 func (pcs pieceByBytesPendingSlice) Len() int {
@@ -210,7 +240,7 @@ func (pcs pieceByBytesPendingSlice) Len() int {
 }
 
 func (me pieceByBytesPendingSlice) Less(i, j int) bool {
-       return me.Torrent.Pieces[i].NumPendingBytes() < me.Torrent.Pieces[j].NumPendingBytes()
+       return me.Pending[me.Indices[i]] < me.Pending[me.Indices[j]]
 }
 
 func (me pieceByBytesPendingSlice) Swap(i, j int) {
@@ -218,41 +248,42 @@ func (me pieceByBytesPendingSlice) Swap(i, j int) {
 }
 
 func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) {
-       slice := pieceByBytesPendingSlice{
-               Torrent: t,
-               Indices: make([]peer_protocol.Integer, 0, len(t.Pieces)),
-       }
+       slice := pieceByBytesPendingSlice{}
        for i := range t.Pieces {
+               slice.Pending = append(slice.Pending, t.PieceNumPendingBytes(peer_protocol.Integer(i)))
                slice.Indices = append(slice.Indices, peer_protocol.Integer(i))
        }
        sort.Sort(sort.Reverse(slice))
        return slice.Indices
 }
 
-func (t *Torrent) PrioritizeDataRegion(off, len_ int64) {
+func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) {
+       piece := t.Pieces[pieceIndex]
+       if piece.Hashing {
+               return
+       }
+       piece.Hashing = true
+       go cl.verifyPiece(t, pieceIndex)
+}
+
+func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
+       cl.mu.Lock()
+       defer cl.mu.Unlock()
+       t := cl.torrent(ih)
        newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
        for len_ > 0 {
                index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
                pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
                piece := t.Pieces[index]
-               if piece.State == pieceStateComplete {
-                       adv := int64(t.PieceLength(index) - pieceOff)
-                       off += adv
-                       len_ -= adv
-                       continue
+               if !piece.EverHashed {
+                       cl.queuePieceCheck(t, index)
                }
                chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize}
                adv := int64(chunkSize - pieceOff%chunkSize)
                off += adv
                len_ -= adv
-               switch piece.State {
-               case pieceStateIncomplete:
-                       if _, ok := piece.PendingChunkSpecs[chunk]; !ok {
-                               continue
-                       }
-               case pieceStateUnknown:
-               default:
-                       panic("unexpected piece state")
+               if _, ok := piece.PendingChunkSpecs[chunk]; !ok && !piece.Hashing {
+                       continue
                }
                newPriorities = append(newPriorities, Request{index, chunk})
        }
@@ -267,6 +298,9 @@ func (t *Torrent) PrioritizeDataRegion(off, len_ int64) {
        for _, req := range newPriorities[1:] {
                t.Priorities.PushBack(req)
        }
+       for _, cn := range t.Conns {
+               cl.replenishConnRequests(t, cn)
+       }
 }
 
 func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
@@ -276,7 +310,7 @@ func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
 
 func (t *Torrent) bitfield() (bf []bool) {
        for _, p := range t.Pieces {
-               bf = append(bf, p.State == pieceStateComplete)
+               bf = append(bf, p.EverHashed && !p.Hashing && len(p.PendingChunkSpecs) == 0)
        }
        return
 }
@@ -347,13 +381,11 @@ type Client struct {
        PeerId        [20]byte
        DataReady     chan DataSpec
 
+       mu    sync.Mutex
+       event sync.Cond
+
        halfOpen int
        torrents map[InfoHash]*Torrent
-
-       noTorrents      chan struct{}
-       addTorrent      chan *Torrent
-       torrentFinished chan InfoHash
-       actorTask       chan func()
 }
 
 var (
@@ -361,54 +393,47 @@ var (
 )
 
 func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
-       done := make(chan struct{})
-       cl.withContext(func() {
-               defer func() {
-                       close(done)
-               }()
-               t := cl.torrent(ih)
-               if t == nil {
-                       err = errors.New("unknown torrent")
-                       return
+       cl.mu.Lock()
+       defer cl.mu.Unlock()
+       t := cl.torrent(ih)
+       if t == nil {
+               err = errors.New("unknown torrent")
+               return
+       }
+       index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
+       piece := t.Pieces[index]
+       if !piece.EverHashed {
+               cl.queuePieceCheck(t, index)
+       }
+       if piece.Hashing {
+               err = ErrDataNotReady
+               return
+       }
+       pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
+       high := int(t.PieceLength(index) - pieceOff)
+       if high < len(p) {
+               p = p[:high]
+       }
+       for cs, _ := range piece.PendingChunkSpecs {
+               chunkOff := int64(pieceOff) - int64(cs.Begin)
+               if chunkOff >= int64(t.PieceLength(index)) {
+                       panic(chunkOff)
                }
-               index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
-               piece := t.Pieces[index]
-               pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
-               switch piece.State {
-               case pieceStateComplete:
-                       high := int(t.PieceLength(index) - peer_protocol.Integer(off%int64(t.PieceLength(0))))
-                       if high < len(p) {
-                               p = p[:high]
-                       }
-               case pieceStateIncomplete:
-                       for cs, _ := range piece.PendingChunkSpecs {
-                               chunkOff := int64(pieceOff - cs.Begin)
-                               if 0 <= chunkOff && chunkOff < int64(cs.Length) {
-                                       // read begins in a pending chunk
-                                       err = ErrDataNotReady
-                                       return
-                               }
-                               // pending chunk caps available data
-                               if chunkOff < 0 && int64(len(p)) > -chunkOff {
-                                       p = p[:-chunkOff]
-                               }
-                       }
-               default:
+               if 0 <= chunkOff && chunkOff < int64(cs.Length) {
+                       // read begins in a pending chunk
                        err = ErrDataNotReady
                        return
                }
-               n, err = t.Data.ReadAt(p, off)
-       })
-       <-done
-       return
+               // pending chunk caps available data
+               if chunkOff < 0 && int64(len(p)) > -chunkOff {
+                       p = p[:-chunkOff]
+               }
+       }
+       return t.Data.ReadAt(p, off)
 }
 
 func (c *Client) Start() {
        c.torrents = make(map[InfoHash]*Torrent)
-       c.noTorrents = make(chan struct{})
-       c.addTorrent = make(chan *Torrent)
-       c.torrentFinished = make(chan InfoHash)
-       c.actorTask = make(chan func())
        if c.HalfOpenLimit == 0 {
                c.HalfOpenLimit = 10
        }
@@ -417,7 +442,6 @@ func (c *Client) Start() {
        if err != nil {
                panic("error generating peer id")
        }
-       go c.run()
 }
 
 func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) {
@@ -487,16 +511,18 @@ func (me *Client) initiateConn(peer Peer, torrent *Torrent) {
                        IP:   peer.IP,
                        Port: peer.Port,
                })
-               me.withContext(func() {
-                       me.halfOpen--
-                       me.openNewConns()
-               })
+
+               me.mu.Lock()
+               me.halfOpen--
+               me.openNewConns()
+               me.mu.Unlock()
+
                if err != nil {
                        log.Printf("error connecting to peer: %s", err)
                        return
                }
                log.Printf("connected to %s", conn.RemoteAddr())
-               err = me.handshake(conn, torrent, peer.Id)
+               err = me.runConnection(conn, torrent, peer.Id)
                if err != nil {
                        log.Print(err)
                }
@@ -505,14 +531,14 @@ func (me *Client) initiateConn(peer Peer, torrent *Torrent) {
 
 func (me *Torrent) haveAnyPieces() bool {
        for _, piece := range me.Pieces {
-               if piece.State == pieceStateComplete {
+               if piece.Complete() {
                        return true
                }
        }
        return false
 }
 
-func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) (err error) {
+func (me *Client) runConnection(sock net.Conn, torrent *Torrent, peerId [20]byte) (err error) {
        conn := &connection{
                Socket:     sock,
                Choked:     true,
@@ -531,11 +557,7 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) (e
        }
        var b [28]byte
        _, err = io.ReadFull(conn.Socket, b[:])
-       switch err {
-       case nil:
-       case io.EOF:
-               return
-       default:
+       if err != nil {
                err = fmt.Errorf("when reading protocol and extensions: %s", err)
                return
        }
@@ -550,11 +572,11 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) (e
        var infoHash [20]byte
        _, err = io.ReadFull(conn.Socket, infoHash[:])
        if err != nil {
-               return
+               return fmt.Errorf("reading peer info hash: %s", err)
        }
        _, err = io.ReadFull(conn.Socket, conn.PeerId[:])
        if err != nil {
-               return
+               return fmt.Errorf("reading peer id: %s", err)
        }
        if torrent == nil {
                torrent = me.torrent(infoHash)
@@ -564,22 +586,20 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) (e
                conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
                conn.post <- peer_protocol.Bytes(me.PeerId[:])
        }
-       done := make(chan struct{})
-       me.withContext(func() {
-               me.addConnection(torrent, conn)
-               if torrent.haveAnyPieces() {
-                       conn.Post(peer_protocol.Message{
-                               Type:     peer_protocol.Bitfield,
-                               Bitfield: torrent.bitfield(),
-                       })
-               }
-               close(done)
-       })
-       <-done
-       defer me.withContext(func() {
-               me.dropConnection(torrent, conn)
-       })
-       err = me.runConnection(torrent, conn)
+       me.mu.Lock()
+       me.addConnection(torrent, conn)
+       if torrent.haveAnyPieces() {
+               conn.Post(peer_protocol.Message{
+                       Type:     peer_protocol.Bitfield,
+                       Bitfield: torrent.bitfield(),
+               })
+       }
+       err = me.connectionLoop(torrent, conn)
+       if err != nil {
+               err = fmt.Errorf("during connection loop: %s", err)
+       }
+       me.dropConnection(torrent, conn)
+       me.mu.Unlock()
        return
 }
 
@@ -594,81 +614,78 @@ func (me *Client) peerGotPiece(torrent *Torrent, conn *connection, piece int) {
 }
 
 func (t *Torrent) wantPiece(index int) bool {
-       return t.Pieces[index].State == pieceStateIncomplete
+       return !t.Pieces[index].Complete()
 }
 
 func (me *Client) peerUnchoked(torrent *Torrent, conn *connection) {
        me.replenishConnRequests(torrent, conn)
 }
 
-func (me *Client) runConnection(torrent *Torrent, conn *connection) error {
+func (me *Client) connectionLoop(torrent *Torrent, conn *connection) error {
        decoder := peer_protocol.Decoder{
                R:         bufio.NewReader(conn.Socket),
                MaxLength: 256 * 1024,
        }
        for {
+               me.mu.Unlock()
                msg := new(peer_protocol.Message)
                err := decoder.Decode(msg)
+               me.mu.Lock()
                if err != nil {
                        return err
                }
+               log.Print(msg.Type)
                if msg.Keepalive {
                        continue
                }
-               go me.withContext(func() {
-                       // log.Print(msg)
-                       var err error
-                       switch msg.Type {
-                       case peer_protocol.Choke:
-                               conn.PeerChoked = true
-                               conn.Requests = nil
-                       case peer_protocol.Unchoke:
-                               conn.PeerChoked = false
-                               me.peerUnchoked(torrent, conn)
-                       case peer_protocol.Interested:
-                               conn.PeerInterested = true
-                       case peer_protocol.NotInterested:
-                               conn.PeerInterested = false
-                       case peer_protocol.Have:
-                               me.peerGotPiece(torrent, conn, int(msg.Index))
-                       case peer_protocol.Request:
-                               conn.PeerRequests[Request{
-                                       Index:     msg.Index,
-                                       ChunkSpec: ChunkSpec{msg.Begin, msg.Length},
-                               }] = struct{}{}
-                       case peer_protocol.Bitfield:
-                               if len(msg.Bitfield) < len(torrent.Pieces) {
-                                       err = errors.New("received invalid bitfield")
-                                       break
-                               }
-                               if conn.PeerPieces != nil {
-                                       err = errors.New("received unexpected bitfield")
-                                       break
-                               }
-                               conn.PeerPieces = msg.Bitfield[:len(torrent.Pieces)]
-                               for index, has := range conn.PeerPieces {
-                                       if has {
-                                               me.peerGotPiece(torrent, conn, index)
-                                       }
-                               }
-                       case peer_protocol.Piece:
-                               request_ := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
-                               if _, ok := conn.Requests[request_]; !ok {
-                                       err = errors.New("unexpected piece")
-                                       break
+               switch msg.Type {
+               case peer_protocol.Choke:
+                       conn.PeerChoked = true
+                       conn.Requests = nil
+               case peer_protocol.Unchoke:
+                       conn.PeerChoked = false
+                       me.peerUnchoked(torrent, conn)
+               case peer_protocol.Interested:
+                       conn.PeerInterested = true
+               case peer_protocol.NotInterested:
+                       conn.PeerInterested = false
+               case peer_protocol.Have:
+                       me.peerGotPiece(torrent, conn, int(msg.Index))
+               case peer_protocol.Request:
+                       conn.PeerRequests[Request{
+                               Index:     msg.Index,
+                               ChunkSpec: ChunkSpec{msg.Begin, msg.Length},
+                       }] = struct{}{}
+               case peer_protocol.Bitfield:
+                       if len(msg.Bitfield) < len(torrent.Pieces) {
+                               err = errors.New("received invalid bitfield")
+                               break
+                       }
+                       if conn.PeerPieces != nil {
+                               err = errors.New("received unexpected bitfield")
+                               break
+                       }
+                       conn.PeerPieces = msg.Bitfield[:len(torrent.Pieces)]
+                       for index, has := range conn.PeerPieces {
+                               if has {
+                                       me.peerGotPiece(torrent, conn, index)
                                }
-                               delete(conn.Requests, request_)
-                               err = me.downloadedChunk(torrent, msg)
-                       default:
-                               log.Printf("received unknown message type: %#v", msg.Type)
                        }
-                       if err != nil {
-                               log.Print(err)
-                               me.dropConnection(torrent, conn)
-                               return
+               case peer_protocol.Piece:
+                       request_ := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
+                       if _, ok := conn.Requests[request_]; !ok {
+                               err = errors.New("unexpected piece")
+                               break
                        }
-                       me.replenishConnRequests(torrent, conn)
-               })
+                       delete(conn.Requests, request_)
+                       err = me.downloadedChunk(torrent, msg)
+               default:
+                       log.Printf("received unknown message type: %#v", msg.Type)
+               }
+               if err != nil {
+                       return err
+               }
+               me.replenishConnRequests(torrent, conn)
        }
 }
 
@@ -711,17 +728,16 @@ func (me *Client) openNewConns() {
        }
 }
 
-func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) (err error) {
-       me.withContext(func() {
-               t := me.torrent(infoHash)
-               if t == nil {
-                       err = errors.New("no such torrent")
-                       return
-               }
-               t.Peers = append(t.Peers, peers...)
-               me.openNewConns()
-       })
-       return
+func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
+       me.mu.Lock()
+       t := me.torrent(infoHash)
+       if t == nil {
+               return errors.New("no such torrent")
+       }
+       t.Peers = append(t.Peers, peers...)
+       me.openNewConns()
+       me.mu.Unlock()
+       return nil
 }
 
 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
@@ -734,7 +750,7 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
                if len(hash) != PieceHash.Size() {
                        return errors.New("bad piece hash in metainfo")
                }
-               piece := piece{}
+               piece := &piece{}
                copyHashSum(piece.Hash[:], hash)
                torrent.Pieces = append(torrent.Pieces, piece)
        }
@@ -743,28 +759,34 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
        if err != nil {
                return err
        }
-       me.addTorrent <- torrent
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       if _, ok := me.torrents[torrent.InfoHash]; ok {
+               return torrent.Close()
+       }
+       me.torrents[torrent.InfoHash] = torrent
        return nil
 }
 
 func (me *Client) WaitAll() {
-       <-me.noTorrents
+       me.mu.Lock()
+       for len(me.torrents) != 0 {
+               me.event.Wait()
+       }
+       me.mu.Unlock()
 }
 
 func (me *Client) Stop() {
 }
 
-func (me *Client) withContext(f func()) {
-       me.actorTask <- f
-}
-
 func (me *Client) replenishConnRequests(torrent *Torrent, conn *connection) {
        requestHeatMap := torrent.requestHeat()
        addRequest := func(req Request) (again bool) {
-               if !conn.PeerPieces[req.Index] {
+               piece := torrent.Pieces[req.Index]
+               if piece.Hashing {
                        return true
                }
-               if torrent.Pieces[req.Index].State != pieceStateIncomplete {
+               if piece.Complete() {
                        return true
                }
                if requestHeatMap[req] > 0 {
@@ -780,7 +802,7 @@ func (me *Client) replenishConnRequests(torrent *Torrent, conn *connection) {
                }
        }
        for _, index := range torrent.piecesByPendingBytesDesc() {
-               if torrent.Pieces[index].NumPendingBytes() == torrent.PieceLength(index) {
+               if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) {
                        continue
                }
                for chunkSpec := range torrent.Pieces[index].PendingChunkSpecs {
@@ -804,8 +826,7 @@ func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message)
        }
        delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.ChunkSpec)
        if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 {
-               torrent.Pieces[request.Index].State = pieceStateUnknown
-               go me.verifyPiece(torrent, request.Index)
+               me.queuePieceCheck(torrent, request.Index)
                return
        }
        var next *list.Element
@@ -828,43 +849,37 @@ func (cl *Client) dataReady(ds DataSpec) {
        }()
 }
 
-func (me *Client) pieceHashed(ih InfoHash, piece peer_protocol.Integer, correct bool) {
-       torrent := me.torrents[ih]
-       newState := func() pieceState {
-               if correct {
-                       return pieceStateComplete
-               } else {
-                       return pieceStateIncomplete
-               }
-       }()
-       // oldState := torrent.Pieces[piece].State
-       // if newState == oldState {
-       //      return
-       // }
-       torrent.Pieces[piece].State = newState
-       switch newState {
-       case pieceStateIncomplete:
-               torrent.Pieces[int(piece)].PendingChunkSpecs = torrent.pieceChunkSpecs(piece)
-       case pieceStateComplete:
+func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct bool) {
+       p := t.Pieces[piece]
+       if !p.Hashing {
+               panic("invalid state")
+       }
+       p.Hashing = false
+       p.EverHashed = true
+       if correct {
+               p.PendingChunkSpecs = nil
                var next *list.Element
-               if torrent.Priorities != nil {
-                       for e := torrent.Priorities.Front(); e != nil; e = next {
+               if t.Priorities != nil {
+                       for e := t.Priorities.Front(); e != nil; e = next {
                                next = e.Next()
                                if e.Value.(Request).Index == piece {
-                                       torrent.Priorities.Remove(e)
+                                       t.Priorities.Remove(e)
                                }
                        }
                }
                me.dataReady(DataSpec{
-                       torrent.InfoHash,
+                       t.InfoHash,
                        Request{
                                peer_protocol.Integer(piece),
-                               ChunkSpec{0, peer_protocol.Integer(torrent.PieceLength(piece))},
+                               ChunkSpec{0, peer_protocol.Integer(t.PieceLength(piece))},
                        },
                })
-               torrent.Pieces[piece].PendingChunkSpecs = nil
+       } else {
+               if len(p.PendingChunkSpecs) == 0 {
+                       p.PendingChunkSpecs = t.pieceChunkSpecs(piece)
+               }
        }
-       for _, conn := range torrent.Conns {
+       for _, conn := range t.Conns {
                if correct {
                        conn.Post(peer_protocol.Message{
                                Type:  peer_protocol.Have,
@@ -872,69 +887,26 @@ func (me *Client) pieceHashed(ih InfoHash, piece peer_protocol.Integer, correct
                        })
                } else {
                        if conn.PeerHasPiece(piece) {
-                               me.replenishConnRequests(torrent, conn)
+                               me.replenishConnRequests(t, conn)
                        }
                }
        }
 }
 
-func (me *Client) verifyPiece(torrent *Torrent, index peer_protocol.Integer) {
-       sum := torrent.HashPiece(index)
-       done := make(chan struct{})
-       me.withContext(func() {
-               me.pieceHashed(torrent.InfoHash, index, sum == torrent.Pieces[index].Hash)
-               close(done)
-       })
-       <-done
-}
-
-func (me *Client) run() {
-       for {
-               noTorrents := me.noTorrents
-               if len(me.torrents) != 0 {
-                       noTorrents = nil
-               }
-               select {
-               case noTorrents <- struct{}{}:
-               case torrent := <-me.addTorrent:
-                       if _, ok := me.torrents[torrent.InfoHash]; ok {
-                               break
-                       }
-                       me.torrents[torrent.InfoHash] = torrent
-                       go func() {
-                               for index := range torrent.Pieces {
-                                       me.verifyPiece(torrent, peer_protocol.Integer(index))
-                               }
-                       }()
-               case infoHash := <-me.torrentFinished:
-                       delete(me.torrents, infoHash)
-               case task := <-me.actorTask:
-                       task()
-               }
-       }
+func (cl *Client) verifyPiece(t *Torrent, index peer_protocol.Integer) {
+       sum := t.HashPiece(index)
+       cl.mu.Lock()
+       piece := t.Pieces[index]
+       cl.pieceHashed(t, index, sum == piece.Hash)
+       piece.Hashing = false
+       cl.mu.Unlock()
 }
 
 func (me *Client) Torrents() (ret []*Torrent) {
-       done := make(chan struct{})
-       me.withContext(func() {
-               for _, t := range me.torrents {
-                       ret = append(ret, t)
-               }
-               close(done)
-       })
-       <-done
+       me.mu.Lock()
+       for _, t := range me.torrents {
+               ret = append(ret, t)
+       }
+       me.mu.Unlock()
        return
 }
-
-func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
-       done := make(chan struct{})
-       cl.withContext(func() {
-               t := cl.torrent(ih)
-               t.PrioritizeDataRegion(off, len_)
-               for _, cn := range t.Conns {
-                       cl.replenishConnRequests(t, cn)
-               }
-               close(done)
-       })
-       <-done
-}
index 993dfedbb6e6057e7726c1e12f004242555b113c..7becbc80c39a1737d3590a98b01a6a556ec3b688 100644 (file)
@@ -6,7 +6,7 @@ import (
        "crypto/sha1"
        "flag"
        "fmt"
-       "github.com/davecheney/profile"
+       // "github.com/davecheney/profile"
        metainfo "github.com/nsf/libtorgo/torrent"
        "launchpad.net/gommap"
        "log"
@@ -24,7 +24,7 @@ func init() {
 }
 
 func main() {
-       defer profile.Start(profile.CPUProfile).Stop()
+       // defer profile.Start(profile.CPUProfile).Stop()
        metaInfo, err := metainfo.LoadFromFile(*filePath)
        if err != nil {
                log.Fatal(err)
index 9ea5ab5bcf18e491fccc58cb461dd2b9a71f156f..edfd1d17690e790cc6f8680477b200f136252461 100644 (file)
@@ -5,7 +5,6 @@ import (
        fusefs "bazil.org/fuse/fs"
        "bitbucket.org/anacrolix/go.torrent"
        "flag"
-       "github.com/davecheney/profile"
        metainfo "github.com/nsf/libtorgo/torrent"
        "log"
        "net"
@@ -15,6 +14,7 @@ import (
        "os/user"
        "path/filepath"
        "sync"
+       "time"
 )
 
 var (
@@ -273,15 +273,17 @@ func (tfs *TorrentFS) Root() (fusefs.Node, fuse.Error) {
 
 func main() {
        pprofAddr := flag.String("pprofAddr", "", "pprof HTTP server bind address")
+       testPeer := flag.String("testPeer", "", "the address for a test peer")
        flag.Parse()
        log.SetFlags(log.LstdFlags | log.Lshortfile)
        if *pprofAddr != "" {
                go http.ListenAndServe(*pprofAddr, nil)
        }
-       defer profile.Start(profile.CPUProfile).Stop()
+       // defer profile.Start(profile.CPUProfile).Stop()
        client := &torrent.Client{
-               DataDir:   downloadDir,
-               DataReady: make(chan torrent.DataSpec),
+               DataDir:       downloadDir,
+               DataReady:     make(chan torrent.DataSpec),
+               HalfOpenLimit: 2,
        }
        client.Start()
        torrentDir, err := os.Open(torrentPath)
@@ -293,6 +295,13 @@ func main() {
        if err != nil {
                log.Fatal(err)
        }
+       var testAddr *net.TCPAddr
+       if *testPeer != "" {
+               testAddr, err = net.ResolveTCPAddr("tcp4", *testPeer)
+               if err != nil {
+                       log.Fatal(err)
+               }
+       }
        for _, name := range names {
                metaInfo, err := metainfo.LoadFromFile(filepath.Join(torrentPath, name))
                if err != nil {
@@ -302,10 +311,6 @@ func main() {
                if err != nil {
                        log.Print(err)
                }
-               client.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), []torrent.Peer{{
-                       IP:   net.IPv4(127, 0, 0, 1),
-                       Port: 3000,
-               }})
        }
        conn, err := fuse.Mount(mountDir)
        if err != nil {
@@ -316,5 +321,19 @@ func main() {
                DataSubs: make(map[chan torrent.DataSpec]struct{}),
        }
        go fs.publishData()
+       go func() {
+               for {
+                       for _, t := range client.Torrents() {
+                               if testAddr != nil {
+                                       client.AddPeers(t.InfoHash, []torrent.Peer{{
+                                               IP:   testAddr.IP,
+                                               Port: testAddr.Port,
+                                       }})
+                               }
+                       }
+                       time.Sleep(10 * time.Second)
+                       break
+               }
+       }()
        fusefs.Serve(conn, fs)
 }
index 37f97720e915341910d3bf4455bfa8cc8dfb39e7..3fe5365436582b441d1d1e9a4a3bb86f261eedf2 100644 (file)
@@ -68,12 +68,17 @@ func (me MMapSpan) WriteSectionTo(w io.Writer, off, n int64) (written int64, err
 
 func (me MMapSpan) WriteAt(p []byte, off int64) (n int, err error) {
        me.span().ApplyTo(off, func(iOff int64, i sizer) (stop bool) {
-               _n := copy(i.(MMap).MMap[iOff:], p)
+               mMap := i.(MMap)
+               _n := copy(mMap.MMap[iOff:], p)
+               // err = mMap.Sync(gommap.MS_ASYNC)
+               // if err != nil {
+               //      return true
+               // }
                p = p[_n:]
                n += _n
                return len(p) == 0
        })
-       if len(p) != 0 {
+       if err != nil && len(p) != 0 {
                err = io.ErrShortWrite
        }
        return