]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Rework Reader reading, and TestCompletedPieceWrongSize
authorMatt Joiner <anacrolix@gmail.com>
Wed, 17 Feb 2016 07:26:10 +0000 (18:26 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 17 Feb 2016 07:26:10 +0000 (18:26 +1100)
client_test.go
reader.go

index 20efe8b0247f499c9224c16c724fe4dfe2dcc13f..9c2fb64630e81519ab30e89bf7f8294454a005f0 100644 (file)
@@ -2,14 +2,17 @@ package torrent
 
 import (
        "encoding/binary"
+       "errors"
        "fmt"
        "io"
        "io/ioutil"
        "log"
+       "math/rand"
        "net"
        "net/http"
        "os"
        "path/filepath"
+       "strings"
        "sync"
        "testing"
        "time"
@@ -434,10 +437,8 @@ func (me badData) WriteAt(b []byte, off int64) (int, error) {
 }
 
 func (me badData) WriteSectionTo(w io.Writer, off, n int64) (int64, error) {
-       written, err := w.Write([]byte("hello"))
-       if err == nil {
-               err = io.ErrUnexpectedEOF
-       }
+       p := []byte(me.randomlyTruncatedDataString())
+       written, err := w.Write(p)
        return int64(written), err
 }
 
@@ -446,20 +447,21 @@ func (me badData) PieceComplete(piece int) bool {
 }
 
 func (me badData) PieceCompleted(piece int) error {
-       return nil
+       return errors.New("psyyyyyyyche")
+}
+
+func (me badData) randomlyTruncatedDataString() string {
+       return "hello, world\n"[:rand.Intn(14)]
 }
 
 func (me badData) ReadAt(b []byte, off int64) (n int, err error) {
-       if off >= 5 {
-               err = io.EOF
-               return
-       }
-       n = copy(b, []byte("hello")[off:])
-       return
+       r := strings.NewReader(me.randomlyTruncatedDataString())
+       return r.ReadAt(b, off)
 }
 
 // We read from a piece which is marked completed, but is missing data.
 func TestCompletedPieceWrongSize(t *testing.T) {
+       t.Parallel()
        cfg := TestingConfig
        cfg.TorrentDataOpener = func(*metainfo.Info) Data {
                return badData{}
@@ -477,20 +479,14 @@ func TestCompletedPieceWrongSize(t *testing.T) {
                        },
                },
        })
-       if err != nil {
-               t.Fatal(err)
-       }
-       if !new {
-               t.Fatal("expected new")
-       }
+       require.NoError(t, err)
+       defer tt.Drop()
+       assert.True(t, new)
        r := tt.NewReader()
        defer r.Close()
-       b := make([]byte, 20)
-       n, err := io.ReadFull(r, b)
-       if n != 5 || err != io.ErrUnexpectedEOF {
-               t.Fatal(n, err)
-       }
-       defer tt.Drop()
+       b, err := ioutil.ReadAll(r)
+       assert.Len(t, b, 13)
+       assert.NoError(t, err)
 }
 
 func BenchmarkAddLargeTorrent(b *testing.B) {
index 9d30781de8ebff7ff5979165fd3cd73faa01c5a7..637c93548c1cd22c9ea588443f5db5c11cf1bbce 100644 (file)
--- a/reader.go
+++ b/reader.go
@@ -3,6 +3,7 @@ package torrent
 import (
        "errors"
        "io"
+       "log"
        "os"
        "sync"
 )
@@ -34,11 +35,7 @@ func (r *Reader) SetReadahead(readahead int64) {
 }
 
 func (r *Reader) readable(off int64) (ret bool) {
-       // log.Println("readable", off)
-       // defer func() {
-       //      log.Println("readable", ret)
-       // }()
-       if r.t.torrent.isClosed() {
+       if r.torrentClosed() {
                return true
        }
        req, ok := r.t.torrent.offsetRequest(off)
@@ -96,49 +93,77 @@ func (r *Reader) Read(b []byte) (n int, err error) {
        return
 }
 
-// Must only return EOF at the end of the torrent.
-func (r *Reader) readAt(b []byte, pos int64) (n int, err error) {
-       // defer func() {
-       //      log.Println(pos, n, err)
-       // }()
-       maxLen := r.t.torrent.Info.TotalLength() - pos
-       if maxLen <= 0 {
-               err = io.EOF
-               return
-       }
-       if int64(len(b)) > maxLen {
-               b = b[:maxLen]
-       }
-again:
+// Safe to call with or without client lock.
+func (r *Reader) torrentClosed() bool {
+       return r.t.torrent.isClosed()
+}
+
+// Wait until some data should be available to read. Tickles the client if it
+// isn't. Returns how much should be readable without blocking.
+func (r *Reader) waitAvailable(pos, wanted int64) (avail int64) {
        r.t.cl.mu.Lock()
+       defer r.t.cl.mu.Unlock()
        for !r.readable(pos) {
                r.waitReadable(pos)
        }
-       avail := r.available(pos, int64(len(b)))
-       // log.Println("available", avail)
-       r.t.cl.mu.Unlock()
-       b1 := b[:avail]
-       pi := int(pos / r.t.Info().PieceLength)
-       tp := &r.t.torrent.Pieces[pi]
-       ip := r.t.Info().Piece(pi)
-       po := pos % ip.Length()
-       if int64(len(b1)) > ip.Length()-po {
-               b1 = b1[:ip.Length()-po]
-       }
-       tp.waitNoPendingWrites()
-       n, err = dataReadAt(r.t.torrent.data, b1, pos)
-       if n != 0 {
-               err = nil
+       return r.available(pos, wanted)
+}
+
+// Performs at most one successful read to torrent storage.
+func (r *Reader) readOnceAt(b []byte, pos int64) (n int, err error) {
+       if pos >= r.t.torrent.length {
+               err = io.EOF
                return
        }
-       if r.t.torrent.isClosed() {
-               if err == nil {
-                       err = errors.New("torrent closed")
+       for {
+               avail := r.waitAvailable(pos, int64(len(b)))
+               if avail == 0 {
+                       if r.torrentClosed() {
+                               err = errors.New("torrent closed")
+                               return
+                       }
                }
-               return
+               b1 := b[:avail]
+               pi := int(pos / r.t.Info().PieceLength)
+               tp := &r.t.torrent.Pieces[pi]
+               ip := r.t.Info().Piece(pi)
+               po := pos % ip.Length()
+               if int64(len(b1)) > ip.Length()-po {
+                       b1 = b1[:ip.Length()-po]
+               }
+               tp.waitNoPendingWrites()
+               n, err = dataReadAt(r.t.torrent.data, b1, pos)
+               if n != 0 {
+                       return
+               }
+               log.Printf("error reading from torrent storage: %s", err)
+               r.t.torrent.updatePieceCompletion(pi)
+               r.t.torrent.updatePiecePriority(pi)
        }
-       if err == io.ErrUnexpectedEOF {
-               goto again
+}
+
+// Must only return EOF at the end of the torrent. Fills b until error or
+// valid EOF. Note that the Reader pos is not updated until the read
+// completes, this may reduce piece priority recalculation, but also the
+// effectiveness of readahead.
+func (r *Reader) readAt(b []byte, pos int64) (n int, err error) {
+       for len(b) != 0 {
+               var n1 int
+               n1, err = r.readOnceAt(b, pos)
+               if n1 == 0 {
+                       if err == nil {
+                               panic("expected error")
+                       }
+                       break
+               }
+               b = b[n1:]
+               n += n1
+               pos += int64(n1)
+       }
+       if pos >= r.t.torrent.length {
+               err = io.EOF
+       } else if err == io.EOF {
+               err = io.ErrUnexpectedEOF
        }
        return
 }