]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Disable data downloading on storage write errors
authorMatt Joiner <anacrolix@gmail.com>
Sat, 22 Feb 2020 08:40:50 +0000 (19:40 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 22 Feb 2020 08:40:50 +0000 (19:40 +1100)
Also add a hook and Torrent methods to modify this.

peerconn.go
test/issue377_test.go
torrent.go

index 9ff1a693c231d4a6ecef0a6f0ebe7889b2dc420a..ee5cdfd986216ee2509098a4d7ea6c496c969cd3 100644 (file)
@@ -509,7 +509,7 @@ func (cn *PeerConn) request(r request, mw messageWriter) bool {
 }
 
 func (cn *PeerConn) fillWriteBuffer(msg func(pp.Message) bool) {
-       if !cn.t.networkingEnabled {
+       if !cn.t.networkingEnabled || cn.t.dataDownloadDisallowed {
                if !cn.setInterested(false, msg) {
                        return
                }
@@ -1246,9 +1246,10 @@ func (c *PeerConn) receiveChunk(msg *pp.Message) error {
        piece.decrementPendingWrites()
 
        if err != nil {
-               panic(fmt.Sprintf("error writing chunk: %v", err))
+               c.logger.Printf("error writing received chunk %v: %v", req, err)
                t.pendRequest(req)
-               t.updatePieceCompletion(pieceIndex(msg.Index))
+               //t.updatePieceCompletion(pieceIndex(msg.Index))
+               t.onWriteChunkErr(err)
                return nil
        }
 
index 0c2e3837448d33ee24273b6697ec507ae4077318..40ed8916e0ddb72348156c3a94642d5dbd487ec9 100644 (file)
@@ -2,7 +2,9 @@ package test
 
 import (
        "errors"
+       "log"
        "os"
+       "sync"
        "testing"
 
        "github.com/stretchr/testify/assert"
@@ -14,11 +16,17 @@ import (
        "github.com/anacrolix/torrent/storage"
 )
 
+func justOneNetwork(cc *torrent.ClientConfig) {
+       cc.DisableTCP = true
+       cc.DisableIPv4 = true
+}
+
 func TestReceiveChunkStorageFailure(t *testing.T) {
        seederDataDir, metainfo := testutil.GreetingTestTorrent()
        defer os.RemoveAll(seederDataDir)
        seederClientConfig := torrent.TestingConfig()
        seederClientConfig.Debug = true
+       justOneNetwork(seederClientConfig)
        seederClientStorage := storage.NewMMap(seederDataDir)
        defer seederClientStorage.Close()
        seederClientConfig.DefaultStorage = seederClientStorage
@@ -29,57 +37,110 @@ func TestReceiveChunkStorageFailure(t *testing.T) {
        defer testutil.ExportStatusWriter(seederClient, "s")()
        leecherClientConfig := torrent.TestingConfig()
        leecherClientConfig.Debug = true
+       justOneNetwork(leecherClientConfig)
        leecherClient, err := torrent.NewClient(leecherClientConfig)
        require.NoError(t, err)
        defer testutil.ExportStatusWriter(leecherClient, "l")()
+       info, err := metainfo.UnmarshalInfo()
+       require.NoError(t, err)
+       leecherStorage := diskFullStorage{
+               pieces: make([]pieceState, info.NumPieces()),
+               data:   make([]byte, info.TotalLength()),
+       }
+       defer leecherStorage.Close()
        leecherTorrent, new, err := leecherClient.AddTorrentSpec(&torrent.TorrentSpec{
                InfoHash: metainfo.HashInfoBytes(),
-               Storage:  diskFullStorage{},
+               Storage:  &leecherStorage,
        })
+       leecherStorage.t = leecherTorrent
        require.NoError(t, err)
        assert.True(t, new)
        seederTorrent, err := seederClient.AddTorrent(metainfo)
        require.NoError(t, err)
        // Tell the seeder to find the leecher. Is it guaranteed seeders will always try to do this?
        seederTorrent.AddClientPeer(leecherClient)
-       //leecherTorrent.AddClientPeer(seederClient)
        <-leecherTorrent.GotInfo()
        assertReadAllGreeting(t, leecherTorrent.NewReader())
 }
 
-type diskFullStorage struct{}
+type pieceState struct {
+       complete bool
+}
 
-func (me diskFullStorage) ReadAt(p []byte, off int64) (n int, err error) {
-       panic("implement me")
+type diskFullStorage struct {
+       pieces                        []pieceState
+       t                             *torrent.Torrent
+       defaultHandledWriteChunkError bool
+       data                          []byte
+
+       mu          sync.Mutex
+       diskNotFull bool
 }
 
-func (me diskFullStorage) WriteAt(p []byte, off int64) (n int, err error) {
-       return 1, errors.New("disk full")
+func (me *diskFullStorage) Piece(p metainfo.Piece) storage.PieceImpl {
+       return pieceImpl{
+               mip:             p,
+               diskFullStorage: me,
+       }
 }
 
-func (me diskFullStorage) MarkComplete() error {
-       panic("implement me")
+func (me diskFullStorage) Close() error {
+       return nil
 }
 
-func (me diskFullStorage) MarkNotComplete() error {
-       panic("implement me")
+func (d diskFullStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
+       return &d, nil
 }
 
-func (me diskFullStorage) Completion() storage.Completion {
-       return storage.Completion{
-               Complete: false,
-               Ok:       true,
+type pieceImpl struct {
+       mip metainfo.Piece
+       *diskFullStorage
+}
+
+func (me pieceImpl) state() *pieceState {
+       return &me.diskFullStorage.pieces[me.mip.Index()]
+}
+
+func (me pieceImpl) ReadAt(p []byte, off int64) (n int, err error) {
+       off += me.mip.Offset()
+       return copy(p, me.data[off:]), nil
+}
+
+func (me pieceImpl) WriteAt(p []byte, off int64) (int, error) {
+       off += me.mip.Offset()
+       if !me.defaultHandledWriteChunkError {
+               go func() {
+                       me.t.SetOnWriteChunkError(func(err error) {
+                               log.Printf("got write chunk error to custom handler: %v", err)
+                               me.mu.Lock()
+                               me.diskNotFull = true
+                               me.mu.Unlock()
+                               me.t.AllowDataDownload()
+                       })
+                       me.t.AllowDataDownload()
+               }()
+               me.defaultHandledWriteChunkError = true
        }
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       if me.diskNotFull {
+               return copy(me.data[off:], p), nil
+       }
+       return copy(me.data[off:], p[:1]), errors.New("disk full")
 }
 
-func (me diskFullStorage) Piece(metainfo.Piece) storage.PieceImpl {
-       return me
+func (me pieceImpl) MarkComplete() error {
+       me.state().complete = true
+       return nil
 }
 
-func (me diskFullStorage) Close() error {
+func (me pieceImpl) MarkNotComplete() error {
        panic("implement me")
 }
 
-func (d diskFullStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
-       return d, nil
+func (me pieceImpl) Completion() storage.Completion {
+       return storage.Completion{
+               Complete: me.state().complete,
+               Ok:       true,
+       }
 }
index 74688c0b2614b947206bacc5d309e2eabcf46587..cd136523f0d50c72a6163d2d93626569c0250f66 100644 (file)
@@ -40,7 +40,9 @@ type Torrent struct {
        cl     *Client
        logger log.Logger
 
-       networkingEnabled bool
+       networkingEnabled      bool
+       dataDownloadDisallowed bool
+       userOnWriteChunkErr    func(error)
 
        // Determines what chunks to request from peers.
        requestStrategy requestStrategy
@@ -1810,3 +1812,42 @@ func (cb torrentRequestStrategyCallbacks) requestTimedOut(r request) {
 func (t *Torrent) requestStrategyCallbacks() requestStrategyCallbacks {
        return torrentRequestStrategyCallbacks{t}
 }
+
+func (t *Torrent) onWriteChunkErr(err error) {
+       if t.userOnWriteChunkErr != nil {
+               go t.userOnWriteChunkErr(err)
+               return
+       }
+       t.disallowDataDownloadLocked()
+}
+
+func (t *Torrent) DisallowDataDownload() {
+       t.cl.lock()
+       defer t.cl.unlock()
+       t.disallowDataDownloadLocked()
+}
+
+func (t *Torrent) disallowDataDownloadLocked() {
+       log.Printf("disallowing data download")
+       t.dataDownloadDisallowed = true
+       for c := range t.conns {
+               c.updateRequests()
+       }
+}
+
+func (t *Torrent) AllowDataDownload() {
+       t.cl.lock()
+       defer t.cl.unlock()
+       log.Printf("AllowDataDownload")
+       t.dataDownloadDisallowed = false
+       for c := range t.conns {
+               c.updateRequests()
+       }
+
+}
+
+func (t *Torrent) SetOnWriteChunkError(f func(error)) {
+       t.cl.lock()
+       defer t.cl.unlock()
+       t.userOnWriteChunkErr = f
+}