From: Matt Joiner Date: Wed, 17 Feb 2016 07:26:10 +0000 (+1100) Subject: Rework Reader reading, and TestCompletedPieceWrongSize X-Git-Tag: v1.0.0~881 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=dcce06115256651014d72d2b2cced6fdfba7a5bd;p=btrtrc.git Rework Reader reading, and TestCompletedPieceWrongSize --- diff --git a/client_test.go b/client_test.go index 20efe8b0..9c2fb646 100644 --- a/client_test.go +++ b/client_test.go @@ -2,14 +2,17 @@ package torrent import ( "encoding/binary" + "errors" "fmt" "io" "io/ioutil" "log" + "math/rand" "net" "net/http" "os" "path/filepath" + "strings" "sync" "testing" "time" @@ -434,10 +437,8 @@ func (me badData) WriteAt(b []byte, off int64) (int, error) { } func (me badData) WriteSectionTo(w io.Writer, off, n int64) (int64, error) { - written, err := w.Write([]byte("hello")) - if err == nil { - err = io.ErrUnexpectedEOF - } + p := []byte(me.randomlyTruncatedDataString()) + written, err := w.Write(p) return int64(written), err } @@ -446,20 +447,21 @@ func (me badData) PieceComplete(piece int) bool { } func (me badData) PieceCompleted(piece int) error { - return nil + return errors.New("psyyyyyyyche") +} + +func (me badData) randomlyTruncatedDataString() string { + return "hello, world\n"[:rand.Intn(14)] } func (me badData) ReadAt(b []byte, off int64) (n int, err error) { - if off >= 5 { - err = io.EOF - return - } - n = copy(b, []byte("hello")[off:]) - return + r := strings.NewReader(me.randomlyTruncatedDataString()) + return r.ReadAt(b, off) } // We read from a piece which is marked completed, but is missing data. func TestCompletedPieceWrongSize(t *testing.T) { + t.Parallel() cfg := TestingConfig cfg.TorrentDataOpener = func(*metainfo.Info) Data { return badData{} @@ -477,20 +479,14 @@ func TestCompletedPieceWrongSize(t *testing.T) { }, }, }) - if err != nil { - t.Fatal(err) - } - if !new { - t.Fatal("expected new") - } + require.NoError(t, err) + defer tt.Drop() + assert.True(t, new) r := tt.NewReader() defer r.Close() - b := make([]byte, 20) - n, err := io.ReadFull(r, b) - if n != 5 || err != io.ErrUnexpectedEOF { - t.Fatal(n, err) - } - defer tt.Drop() + b, err := ioutil.ReadAll(r) + assert.Len(t, b, 13) + assert.NoError(t, err) } func BenchmarkAddLargeTorrent(b *testing.B) { diff --git a/reader.go b/reader.go index 9d30781d..637c9354 100644 --- a/reader.go +++ b/reader.go @@ -3,6 +3,7 @@ package torrent import ( "errors" "io" + "log" "os" "sync" ) @@ -34,11 +35,7 @@ func (r *Reader) SetReadahead(readahead int64) { } func (r *Reader) readable(off int64) (ret bool) { - // log.Println("readable", off) - // defer func() { - // log.Println("readable", ret) - // }() - if r.t.torrent.isClosed() { + if r.torrentClosed() { return true } req, ok := r.t.torrent.offsetRequest(off) @@ -96,49 +93,77 @@ func (r *Reader) Read(b []byte) (n int, err error) { return } -// Must only return EOF at the end of the torrent. -func (r *Reader) readAt(b []byte, pos int64) (n int, err error) { - // defer func() { - // log.Println(pos, n, err) - // }() - maxLen := r.t.torrent.Info.TotalLength() - pos - if maxLen <= 0 { - err = io.EOF - return - } - if int64(len(b)) > maxLen { - b = b[:maxLen] - } -again: +// Safe to call with or without client lock. +func (r *Reader) torrentClosed() bool { + return r.t.torrent.isClosed() +} + +// Wait until some data should be available to read. Tickles the client if it +// isn't. Returns how much should be readable without blocking. +func (r *Reader) waitAvailable(pos, wanted int64) (avail int64) { r.t.cl.mu.Lock() + defer r.t.cl.mu.Unlock() for !r.readable(pos) { r.waitReadable(pos) } - avail := r.available(pos, int64(len(b))) - // log.Println("available", avail) - r.t.cl.mu.Unlock() - b1 := b[:avail] - pi := int(pos / r.t.Info().PieceLength) - tp := &r.t.torrent.Pieces[pi] - ip := r.t.Info().Piece(pi) - po := pos % ip.Length() - if int64(len(b1)) > ip.Length()-po { - b1 = b1[:ip.Length()-po] - } - tp.waitNoPendingWrites() - n, err = dataReadAt(r.t.torrent.data, b1, pos) - if n != 0 { - err = nil + return r.available(pos, wanted) +} + +// Performs at most one successful read to torrent storage. +func (r *Reader) readOnceAt(b []byte, pos int64) (n int, err error) { + if pos >= r.t.torrent.length { + err = io.EOF return } - if r.t.torrent.isClosed() { - if err == nil { - err = errors.New("torrent closed") + for { + avail := r.waitAvailable(pos, int64(len(b))) + if avail == 0 { + if r.torrentClosed() { + err = errors.New("torrent closed") + return + } } - return + b1 := b[:avail] + pi := int(pos / r.t.Info().PieceLength) + tp := &r.t.torrent.Pieces[pi] + ip := r.t.Info().Piece(pi) + po := pos % ip.Length() + if int64(len(b1)) > ip.Length()-po { + b1 = b1[:ip.Length()-po] + } + tp.waitNoPendingWrites() + n, err = dataReadAt(r.t.torrent.data, b1, pos) + if n != 0 { + return + } + log.Printf("error reading from torrent storage: %s", err) + r.t.torrent.updatePieceCompletion(pi) + r.t.torrent.updatePiecePriority(pi) } - if err == io.ErrUnexpectedEOF { - goto again +} + +// Must only return EOF at the end of the torrent. Fills b until error or +// valid EOF. Note that the Reader pos is not updated until the read +// completes, this may reduce piece priority recalculation, but also the +// effectiveness of readahead. +func (r *Reader) readAt(b []byte, pos int64) (n int, err error) { + for len(b) != 0 { + var n1 int + n1, err = r.readOnceAt(b, pos) + if n1 == 0 { + if err == nil { + panic("expected error") + } + break + } + b = b[n1:] + n += n1 + pos += int64(n1) + } + if pos >= r.t.torrent.length { + err = io.EOF + } else if err == io.EOF { + err = io.ErrUnexpectedEOF } return }