From d039436f55244c308e80c999c80089530b88edec Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 21 Oct 2013 01:07:01 +1100 Subject: [PATCH] Significant progress and improvements Piece state is broken up into several dimensions. Implement keep-alive in connection writer. Lazily hash pieces, only as requested. Replace client actor mechanism with a mutex. Fix runConnection/connectionLoop/handshake misnomers. Fix broken reading from partially complete pieces. --- client.go | 512 ++++++++++++++++++------------------- cmd/torrent-verify/main.go | 4 +- cmd/torrentfs/main.go | 35 ++- mmap_span.go | 9 +- 4 files changed, 278 insertions(+), 282 deletions(-) diff --git a/client.go b/client.go index f6d07a7b..69535861 100644 --- a/client.go +++ b/client.go @@ -17,11 +17,13 @@ import ( "os" "path/filepath" "sort" + "sync" + "time" ) const ( PieceHash = crypto.SHA1 - maxRequests = 10 + maxRequests = 250 chunkSize = 0x4000 // 16KiB BEP20 = "-GT0000-" ) @@ -45,21 +47,31 @@ func BytesInfoHash(b []byte) (ih InfoHash) { type pieceState uint8 -const ( - pieceStateUnknown = iota - pieceStateComplete - pieceStateIncomplete -) - type piece struct { - State pieceState Hash pieceSum PendingChunkSpecs map[ChunkSpec]struct{} + Hashing bool + EverHashed bool +} + +func (p *piece) Complete() bool { + return len(p.PendingChunkSpecs) == 0 && !p.Hashing && p.EverHashed +} + +func lastChunkSpec(pieceLength peer_protocol.Integer) (cs ChunkSpec) { + cs.Begin = (pieceLength - 1) / chunkSize * chunkSize + cs.Length = pieceLength - cs.Begin + return } -func (p piece) NumPendingBytes() (count peer_protocol.Integer) { - for cs, _ := range p.PendingChunkSpecs { - count += cs.Length +func (t *Torrent) PieceNumPendingBytes(index peer_protocol.Integer) (count peer_protocol.Integer) { + pendingChunks := t.Pieces[index].PendingChunkSpecs + count = peer_protocol.Integer(len(pendingChunks)) * chunkSize + _lastChunkSpec := lastChunkSpec(t.PieceLength(index)) + if _lastChunkSpec.Length != chunkSize { + if _, ok := pendingChunks[_lastChunkSpec]; ok { + count += _lastChunkSpec.Length - chunkSize + } } return } @@ -145,11 +157,22 @@ func (c *connection) SetInterested(interested bool) { c.Interested = interested } +var ( + keepAliveBytes [4]byte +) + func (conn *connection) writer() { for { - b := <-conn.write - if b == nil { - break + timer := time.NewTimer(time.Minute) + var b []byte + select { + case <-timer.C: + b = keepAliveBytes[:] + case b = <-conn.write: + timer.Stop() + if b == nil { + return + } } n, err := conn.Socket.Write(b) if err != nil { @@ -192,7 +215,7 @@ func (conn *connection) writeOptimizer() { type Torrent struct { InfoHash InfoHash - Pieces []piece + Pieces []*piece Data MMapSpan MetaInfo *metainfo.MetaInfo Conns []*connection @@ -200,9 +223,16 @@ type Torrent struct { Priorities *list.List } +func (t *Torrent) Close() (err error) { + t.Data.Close() + for _, conn := range t.Conns { + conn.Close() + } + return +} + type pieceByBytesPendingSlice struct { - Torrent *Torrent - Indices []peer_protocol.Integer + Pending, Indices []peer_protocol.Integer } func (pcs pieceByBytesPendingSlice) Len() int { @@ -210,7 +240,7 @@ func (pcs pieceByBytesPendingSlice) Len() int { } func (me pieceByBytesPendingSlice) Less(i, j int) bool { - return me.Torrent.Pieces[i].NumPendingBytes() < me.Torrent.Pieces[j].NumPendingBytes() + return me.Pending[me.Indices[i]] < me.Pending[me.Indices[j]] } func (me pieceByBytesPendingSlice) Swap(i, j int) { @@ -218,41 +248,42 @@ func (me pieceByBytesPendingSlice) Swap(i, j int) { } func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) { - slice := pieceByBytesPendingSlice{ - Torrent: t, - Indices: make([]peer_protocol.Integer, 0, len(t.Pieces)), - } + slice := pieceByBytesPendingSlice{} for i := range t.Pieces { + slice.Pending = append(slice.Pending, t.PieceNumPendingBytes(peer_protocol.Integer(i))) slice.Indices = append(slice.Indices, peer_protocol.Integer(i)) } sort.Sort(sort.Reverse(slice)) return slice.Indices } -func (t *Torrent) PrioritizeDataRegion(off, len_ int64) { +func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) { + piece := t.Pieces[pieceIndex] + if piece.Hashing { + return + } + piece.Hashing = true + go cl.verifyPiece(t, pieceIndex) +} + +func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) { + cl.mu.Lock() + defer cl.mu.Unlock() + t := cl.torrent(ih) newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize) for len_ > 0 { index := peer_protocol.Integer(off / int64(t.PieceLength(0))) pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0))) piece := t.Pieces[index] - if piece.State == pieceStateComplete { - adv := int64(t.PieceLength(index) - pieceOff) - off += adv - len_ -= adv - continue + if !piece.EverHashed { + cl.queuePieceCheck(t, index) } chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize} adv := int64(chunkSize - pieceOff%chunkSize) off += adv len_ -= adv - switch piece.State { - case pieceStateIncomplete: - if _, ok := piece.PendingChunkSpecs[chunk]; !ok { - continue - } - case pieceStateUnknown: - default: - panic("unexpected piece state") + if _, ok := piece.PendingChunkSpecs[chunk]; !ok && !piece.Hashing { + continue } newPriorities = append(newPriorities, Request{index, chunk}) } @@ -267,6 +298,9 @@ func (t *Torrent) PrioritizeDataRegion(off, len_ int64) { for _, req := range newPriorities[1:] { t.Priorities.PushBack(req) } + for _, cn := range t.Conns { + cl.replenishConnRequests(t, cn) + } } func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) { @@ -276,7 +310,7 @@ func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) { func (t *Torrent) bitfield() (bf []bool) { for _, p := range t.Pieces { - bf = append(bf, p.State == pieceStateComplete) + bf = append(bf, p.EverHashed && !p.Hashing && len(p.PendingChunkSpecs) == 0) } return } @@ -347,13 +381,11 @@ type Client struct { PeerId [20]byte DataReady chan DataSpec + mu sync.Mutex + event sync.Cond + halfOpen int torrents map[InfoHash]*Torrent - - noTorrents chan struct{} - addTorrent chan *Torrent - torrentFinished chan InfoHash - actorTask chan func() } var ( @@ -361,54 +393,47 @@ var ( ) func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) { - done := make(chan struct{}) - cl.withContext(func() { - defer func() { - close(done) - }() - t := cl.torrent(ih) - if t == nil { - err = errors.New("unknown torrent") - return + cl.mu.Lock() + defer cl.mu.Unlock() + t := cl.torrent(ih) + if t == nil { + err = errors.New("unknown torrent") + return + } + index := peer_protocol.Integer(off / int64(t.PieceLength(0))) + piece := t.Pieces[index] + if !piece.EverHashed { + cl.queuePieceCheck(t, index) + } + if piece.Hashing { + err = ErrDataNotReady + return + } + pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0))) + high := int(t.PieceLength(index) - pieceOff) + if high < len(p) { + p = p[:high] + } + for cs, _ := range piece.PendingChunkSpecs { + chunkOff := int64(pieceOff) - int64(cs.Begin) + if chunkOff >= int64(t.PieceLength(index)) { + panic(chunkOff) } - index := peer_protocol.Integer(off / int64(t.PieceLength(0))) - piece := t.Pieces[index] - pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0))) - switch piece.State { - case pieceStateComplete: - high := int(t.PieceLength(index) - peer_protocol.Integer(off%int64(t.PieceLength(0)))) - if high < len(p) { - p = p[:high] - } - case pieceStateIncomplete: - for cs, _ := range piece.PendingChunkSpecs { - chunkOff := int64(pieceOff - cs.Begin) - if 0 <= chunkOff && chunkOff < int64(cs.Length) { - // read begins in a pending chunk - err = ErrDataNotReady - return - } - // pending chunk caps available data - if chunkOff < 0 && int64(len(p)) > -chunkOff { - p = p[:-chunkOff] - } - } - default: + if 0 <= chunkOff && chunkOff < int64(cs.Length) { + // read begins in a pending chunk err = ErrDataNotReady return } - n, err = t.Data.ReadAt(p, off) - }) - <-done - return + // pending chunk caps available data + if chunkOff < 0 && int64(len(p)) > -chunkOff { + p = p[:-chunkOff] + } + } + return t.Data.ReadAt(p, off) } func (c *Client) Start() { c.torrents = make(map[InfoHash]*Torrent) - c.noTorrents = make(chan struct{}) - c.addTorrent = make(chan *Torrent) - c.torrentFinished = make(chan InfoHash) - c.actorTask = make(chan func()) if c.HalfOpenLimit == 0 { c.HalfOpenLimit = 10 } @@ -417,7 +442,6 @@ func (c *Client) Start() { if err != nil { panic("error generating peer id") } - go c.run() } func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) { @@ -487,16 +511,18 @@ func (me *Client) initiateConn(peer Peer, torrent *Torrent) { IP: peer.IP, Port: peer.Port, }) - me.withContext(func() { - me.halfOpen-- - me.openNewConns() - }) + + me.mu.Lock() + me.halfOpen-- + me.openNewConns() + me.mu.Unlock() + if err != nil { log.Printf("error connecting to peer: %s", err) return } log.Printf("connected to %s", conn.RemoteAddr()) - err = me.handshake(conn, torrent, peer.Id) + err = me.runConnection(conn, torrent, peer.Id) if err != nil { log.Print(err) } @@ -505,14 +531,14 @@ func (me *Client) initiateConn(peer Peer, torrent *Torrent) { func (me *Torrent) haveAnyPieces() bool { for _, piece := range me.Pieces { - if piece.State == pieceStateComplete { + if piece.Complete() { return true } } return false } -func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) (err error) { +func (me *Client) runConnection(sock net.Conn, torrent *Torrent, peerId [20]byte) (err error) { conn := &connection{ Socket: sock, Choked: true, @@ -531,11 +557,7 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) (e } var b [28]byte _, err = io.ReadFull(conn.Socket, b[:]) - switch err { - case nil: - case io.EOF: - return - default: + if err != nil { err = fmt.Errorf("when reading protocol and extensions: %s", err) return } @@ -550,11 +572,11 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) (e var infoHash [20]byte _, err = io.ReadFull(conn.Socket, infoHash[:]) if err != nil { - return + return fmt.Errorf("reading peer info hash: %s", err) } _, err = io.ReadFull(conn.Socket, conn.PeerId[:]) if err != nil { - return + return fmt.Errorf("reading peer id: %s", err) } if torrent == nil { torrent = me.torrent(infoHash) @@ -564,22 +586,20 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) (e conn.post <- peer_protocol.Bytes(torrent.InfoHash[:]) conn.post <- peer_protocol.Bytes(me.PeerId[:]) } - done := make(chan struct{}) - me.withContext(func() { - me.addConnection(torrent, conn) - if torrent.haveAnyPieces() { - conn.Post(peer_protocol.Message{ - Type: peer_protocol.Bitfield, - Bitfield: torrent.bitfield(), - }) - } - close(done) - }) - <-done - defer me.withContext(func() { - me.dropConnection(torrent, conn) - }) - err = me.runConnection(torrent, conn) + me.mu.Lock() + me.addConnection(torrent, conn) + if torrent.haveAnyPieces() { + conn.Post(peer_protocol.Message{ + Type: peer_protocol.Bitfield, + Bitfield: torrent.bitfield(), + }) + } + err = me.connectionLoop(torrent, conn) + if err != nil { + err = fmt.Errorf("during connection loop: %s", err) + } + me.dropConnection(torrent, conn) + me.mu.Unlock() return } @@ -594,81 +614,78 @@ func (me *Client) peerGotPiece(torrent *Torrent, conn *connection, piece int) { } func (t *Torrent) wantPiece(index int) bool { - return t.Pieces[index].State == pieceStateIncomplete + return !t.Pieces[index].Complete() } func (me *Client) peerUnchoked(torrent *Torrent, conn *connection) { me.replenishConnRequests(torrent, conn) } -func (me *Client) runConnection(torrent *Torrent, conn *connection) error { +func (me *Client) connectionLoop(torrent *Torrent, conn *connection) error { decoder := peer_protocol.Decoder{ R: bufio.NewReader(conn.Socket), MaxLength: 256 * 1024, } for { + me.mu.Unlock() msg := new(peer_protocol.Message) err := decoder.Decode(msg) + me.mu.Lock() if err != nil { return err } + log.Print(msg.Type) if msg.Keepalive { continue } - go me.withContext(func() { - // log.Print(msg) - var err error - switch msg.Type { - case peer_protocol.Choke: - conn.PeerChoked = true - conn.Requests = nil - case peer_protocol.Unchoke: - conn.PeerChoked = false - me.peerUnchoked(torrent, conn) - case peer_protocol.Interested: - conn.PeerInterested = true - case peer_protocol.NotInterested: - conn.PeerInterested = false - case peer_protocol.Have: - me.peerGotPiece(torrent, conn, int(msg.Index)) - case peer_protocol.Request: - conn.PeerRequests[Request{ - Index: msg.Index, - ChunkSpec: ChunkSpec{msg.Begin, msg.Length}, - }] = struct{}{} - case peer_protocol.Bitfield: - if len(msg.Bitfield) < len(torrent.Pieces) { - err = errors.New("received invalid bitfield") - break - } - if conn.PeerPieces != nil { - err = errors.New("received unexpected bitfield") - break - } - conn.PeerPieces = msg.Bitfield[:len(torrent.Pieces)] - for index, has := range conn.PeerPieces { - if has { - me.peerGotPiece(torrent, conn, index) - } - } - case peer_protocol.Piece: - request_ := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}} - if _, ok := conn.Requests[request_]; !ok { - err = errors.New("unexpected piece") - break + switch msg.Type { + case peer_protocol.Choke: + conn.PeerChoked = true + conn.Requests = nil + case peer_protocol.Unchoke: + conn.PeerChoked = false + me.peerUnchoked(torrent, conn) + case peer_protocol.Interested: + conn.PeerInterested = true + case peer_protocol.NotInterested: + conn.PeerInterested = false + case peer_protocol.Have: + me.peerGotPiece(torrent, conn, int(msg.Index)) + case peer_protocol.Request: + conn.PeerRequests[Request{ + Index: msg.Index, + ChunkSpec: ChunkSpec{msg.Begin, msg.Length}, + }] = struct{}{} + case peer_protocol.Bitfield: + if len(msg.Bitfield) < len(torrent.Pieces) { + err = errors.New("received invalid bitfield") + break + } + if conn.PeerPieces != nil { + err = errors.New("received unexpected bitfield") + break + } + conn.PeerPieces = msg.Bitfield[:len(torrent.Pieces)] + for index, has := range conn.PeerPieces { + if has { + me.peerGotPiece(torrent, conn, index) } - delete(conn.Requests, request_) - err = me.downloadedChunk(torrent, msg) - default: - log.Printf("received unknown message type: %#v", msg.Type) } - if err != nil { - log.Print(err) - me.dropConnection(torrent, conn) - return + case peer_protocol.Piece: + request_ := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}} + if _, ok := conn.Requests[request_]; !ok { + err = errors.New("unexpected piece") + break } - me.replenishConnRequests(torrent, conn) - }) + delete(conn.Requests, request_) + err = me.downloadedChunk(torrent, msg) + default: + log.Printf("received unknown message type: %#v", msg.Type) + } + if err != nil { + return err + } + me.replenishConnRequests(torrent, conn) } } @@ -711,17 +728,16 @@ func (me *Client) openNewConns() { } } -func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) (err error) { - me.withContext(func() { - t := me.torrent(infoHash) - if t == nil { - err = errors.New("no such torrent") - return - } - t.Peers = append(t.Peers, peers...) - me.openNewConns() - }) - return +func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error { + me.mu.Lock() + t := me.torrent(infoHash) + if t == nil { + return errors.New("no such torrent") + } + t.Peers = append(t.Peers, peers...) + me.openNewConns() + me.mu.Unlock() + return nil } func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error { @@ -734,7 +750,7 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error { if len(hash) != PieceHash.Size() { return errors.New("bad piece hash in metainfo") } - piece := piece{} + piece := &piece{} copyHashSum(piece.Hash[:], hash) torrent.Pieces = append(torrent.Pieces, piece) } @@ -743,28 +759,34 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error { if err != nil { return err } - me.addTorrent <- torrent + me.mu.Lock() + defer me.mu.Unlock() + if _, ok := me.torrents[torrent.InfoHash]; ok { + return torrent.Close() + } + me.torrents[torrent.InfoHash] = torrent return nil } func (me *Client) WaitAll() { - <-me.noTorrents + me.mu.Lock() + for len(me.torrents) != 0 { + me.event.Wait() + } + me.mu.Unlock() } func (me *Client) Stop() { } -func (me *Client) withContext(f func()) { - me.actorTask <- f -} - func (me *Client) replenishConnRequests(torrent *Torrent, conn *connection) { requestHeatMap := torrent.requestHeat() addRequest := func(req Request) (again bool) { - if !conn.PeerPieces[req.Index] { + piece := torrent.Pieces[req.Index] + if piece.Hashing { return true } - if torrent.Pieces[req.Index].State != pieceStateIncomplete { + if piece.Complete() { return true } if requestHeatMap[req] > 0 { @@ -780,7 +802,7 @@ func (me *Client) replenishConnRequests(torrent *Torrent, conn *connection) { } } for _, index := range torrent.piecesByPendingBytesDesc() { - if torrent.Pieces[index].NumPendingBytes() == torrent.PieceLength(index) { + if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) { continue } for chunkSpec := range torrent.Pieces[index].PendingChunkSpecs { @@ -804,8 +826,7 @@ func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) } delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.ChunkSpec) if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 { - torrent.Pieces[request.Index].State = pieceStateUnknown - go me.verifyPiece(torrent, request.Index) + me.queuePieceCheck(torrent, request.Index) return } var next *list.Element @@ -828,43 +849,37 @@ func (cl *Client) dataReady(ds DataSpec) { }() } -func (me *Client) pieceHashed(ih InfoHash, piece peer_protocol.Integer, correct bool) { - torrent := me.torrents[ih] - newState := func() pieceState { - if correct { - return pieceStateComplete - } else { - return pieceStateIncomplete - } - }() - // oldState := torrent.Pieces[piece].State - // if newState == oldState { - // return - // } - torrent.Pieces[piece].State = newState - switch newState { - case pieceStateIncomplete: - torrent.Pieces[int(piece)].PendingChunkSpecs = torrent.pieceChunkSpecs(piece) - case pieceStateComplete: +func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct bool) { + p := t.Pieces[piece] + if !p.Hashing { + panic("invalid state") + } + p.Hashing = false + p.EverHashed = true + if correct { + p.PendingChunkSpecs = nil var next *list.Element - if torrent.Priorities != nil { - for e := torrent.Priorities.Front(); e != nil; e = next { + if t.Priorities != nil { + for e := t.Priorities.Front(); e != nil; e = next { next = e.Next() if e.Value.(Request).Index == piece { - torrent.Priorities.Remove(e) + t.Priorities.Remove(e) } } } me.dataReady(DataSpec{ - torrent.InfoHash, + t.InfoHash, Request{ peer_protocol.Integer(piece), - ChunkSpec{0, peer_protocol.Integer(torrent.PieceLength(piece))}, + ChunkSpec{0, peer_protocol.Integer(t.PieceLength(piece))}, }, }) - torrent.Pieces[piece].PendingChunkSpecs = nil + } else { + if len(p.PendingChunkSpecs) == 0 { + p.PendingChunkSpecs = t.pieceChunkSpecs(piece) + } } - for _, conn := range torrent.Conns { + for _, conn := range t.Conns { if correct { conn.Post(peer_protocol.Message{ Type: peer_protocol.Have, @@ -872,69 +887,26 @@ func (me *Client) pieceHashed(ih InfoHash, piece peer_protocol.Integer, correct }) } else { if conn.PeerHasPiece(piece) { - me.replenishConnRequests(torrent, conn) + me.replenishConnRequests(t, conn) } } } } -func (me *Client) verifyPiece(torrent *Torrent, index peer_protocol.Integer) { - sum := torrent.HashPiece(index) - done := make(chan struct{}) - me.withContext(func() { - me.pieceHashed(torrent.InfoHash, index, sum == torrent.Pieces[index].Hash) - close(done) - }) - <-done -} - -func (me *Client) run() { - for { - noTorrents := me.noTorrents - if len(me.torrents) != 0 { - noTorrents = nil - } - select { - case noTorrents <- struct{}{}: - case torrent := <-me.addTorrent: - if _, ok := me.torrents[torrent.InfoHash]; ok { - break - } - me.torrents[torrent.InfoHash] = torrent - go func() { - for index := range torrent.Pieces { - me.verifyPiece(torrent, peer_protocol.Integer(index)) - } - }() - case infoHash := <-me.torrentFinished: - delete(me.torrents, infoHash) - case task := <-me.actorTask: - task() - } - } +func (cl *Client) verifyPiece(t *Torrent, index peer_protocol.Integer) { + sum := t.HashPiece(index) + cl.mu.Lock() + piece := t.Pieces[index] + cl.pieceHashed(t, index, sum == piece.Hash) + piece.Hashing = false + cl.mu.Unlock() } func (me *Client) Torrents() (ret []*Torrent) { - done := make(chan struct{}) - me.withContext(func() { - for _, t := range me.torrents { - ret = append(ret, t) - } - close(done) - }) - <-done + me.mu.Lock() + for _, t := range me.torrents { + ret = append(ret, t) + } + me.mu.Unlock() return } - -func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) { - done := make(chan struct{}) - cl.withContext(func() { - t := cl.torrent(ih) - t.PrioritizeDataRegion(off, len_) - for _, cn := range t.Conns { - cl.replenishConnRequests(t, cn) - } - close(done) - }) - <-done -} diff --git a/cmd/torrent-verify/main.go b/cmd/torrent-verify/main.go index 993dfedb..7becbc80 100644 --- a/cmd/torrent-verify/main.go +++ b/cmd/torrent-verify/main.go @@ -6,7 +6,7 @@ import ( "crypto/sha1" "flag" "fmt" - "github.com/davecheney/profile" + // "github.com/davecheney/profile" metainfo "github.com/nsf/libtorgo/torrent" "launchpad.net/gommap" "log" @@ -24,7 +24,7 @@ func init() { } func main() { - defer profile.Start(profile.CPUProfile).Stop() + // defer profile.Start(profile.CPUProfile).Stop() metaInfo, err := metainfo.LoadFromFile(*filePath) if err != nil { log.Fatal(err) diff --git a/cmd/torrentfs/main.go b/cmd/torrentfs/main.go index 9ea5ab5b..edfd1d17 100644 --- a/cmd/torrentfs/main.go +++ b/cmd/torrentfs/main.go @@ -5,7 +5,6 @@ import ( fusefs "bazil.org/fuse/fs" "bitbucket.org/anacrolix/go.torrent" "flag" - "github.com/davecheney/profile" metainfo "github.com/nsf/libtorgo/torrent" "log" "net" @@ -15,6 +14,7 @@ import ( "os/user" "path/filepath" "sync" + "time" ) var ( @@ -273,15 +273,17 @@ func (tfs *TorrentFS) Root() (fusefs.Node, fuse.Error) { func main() { pprofAddr := flag.String("pprofAddr", "", "pprof HTTP server bind address") + testPeer := flag.String("testPeer", "", "the address for a test peer") flag.Parse() log.SetFlags(log.LstdFlags | log.Lshortfile) if *pprofAddr != "" { go http.ListenAndServe(*pprofAddr, nil) } - defer profile.Start(profile.CPUProfile).Stop() + // defer profile.Start(profile.CPUProfile).Stop() client := &torrent.Client{ - DataDir: downloadDir, - DataReady: make(chan torrent.DataSpec), + DataDir: downloadDir, + DataReady: make(chan torrent.DataSpec), + HalfOpenLimit: 2, } client.Start() torrentDir, err := os.Open(torrentPath) @@ -293,6 +295,13 @@ func main() { if err != nil { log.Fatal(err) } + var testAddr *net.TCPAddr + if *testPeer != "" { + testAddr, err = net.ResolveTCPAddr("tcp4", *testPeer) + if err != nil { + log.Fatal(err) + } + } for _, name := range names { metaInfo, err := metainfo.LoadFromFile(filepath.Join(torrentPath, name)) if err != nil { @@ -302,10 +311,6 @@ func main() { if err != nil { log.Print(err) } - client.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), []torrent.Peer{{ - IP: net.IPv4(127, 0, 0, 1), - Port: 3000, - }}) } conn, err := fuse.Mount(mountDir) if err != nil { @@ -316,5 +321,19 @@ func main() { DataSubs: make(map[chan torrent.DataSpec]struct{}), } go fs.publishData() + go func() { + for { + for _, t := range client.Torrents() { + if testAddr != nil { + client.AddPeers(t.InfoHash, []torrent.Peer{{ + IP: testAddr.IP, + Port: testAddr.Port, + }}) + } + } + time.Sleep(10 * time.Second) + break + } + }() fusefs.Serve(conn, fs) } diff --git a/mmap_span.go b/mmap_span.go index 37f97720..3fe53654 100644 --- a/mmap_span.go +++ b/mmap_span.go @@ -68,12 +68,17 @@ func (me MMapSpan) WriteSectionTo(w io.Writer, off, n int64) (written int64, err func (me MMapSpan) WriteAt(p []byte, off int64) (n int, err error) { me.span().ApplyTo(off, func(iOff int64, i sizer) (stop bool) { - _n := copy(i.(MMap).MMap[iOff:], p) + mMap := i.(MMap) + _n := copy(mMap.MMap[iOff:], p) + // err = mMap.Sync(gommap.MS_ASYNC) + // if err != nil { + // return true + // } p = p[_n:] n += _n return len(p) == 0 }) - if len(p) != 0 { + if err != nil && len(p) != 0 { err = io.ErrShortWrite } return -- 2.48.1