From 72e54fb353128ef85f00ec7c2c94b8648f1eb889 Mon Sep 17 00:00:00 2001
From: Matt Joiner <anacrolix@gmail.com>
Date: Thu, 9 Jan 2020 22:58:23 +1100
Subject: [PATCH] Resource per piece storage: Store incomplete chunks
 separately

---
 client_test.go            |   2 +-
 storage/issue95_test.go   |   2 +-
 storage/piece_resource.go | 123 +++++++++++++++++++++++++++++++-------
 3 files changed, 102 insertions(+), 25 deletions(-)

diff --git a/client_test.go b/client_test.go
index 056d6740..372f4bcd 100644
--- a/client_test.go
+++ b/client_test.go
@@ -21,7 +21,7 @@ import (
 	"github.com/anacrolix/dht/v2"
 	_ "github.com/anacrolix/envpprof"
 	"github.com/anacrolix/missinggo"
-	"github.com/anacrolix/missinggo/filecache"
+	"github.com/anacrolix/missinggo/v2/filecache"
 
 	"github.com/anacrolix/torrent/bencode"
 	"github.com/anacrolix/torrent/internal/testutil"
diff --git a/storage/issue95_test.go b/storage/issue95_test.go
index d47410da..4762d80e 100644
--- a/storage/issue95_test.go
+++ b/storage/issue95_test.go
@@ -5,7 +5,7 @@ import (
 	"os"
 	"testing"
 
-	"github.com/anacrolix/missinggo/resource"
+	"github.com/anacrolix/missinggo/v2/resource"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
diff --git a/storage/piece_resource.go b/storage/piece_resource.go
index 1064fae5..e57e8a89 100644
--- a/storage/piece_resource.go
+++ b/storage/piece_resource.go
@@ -1,9 +1,13 @@
 package storage
 
 import (
+	"bytes"
+	"io"
 	"path"
+	"sort"
+	"strconv"
 
-	"github.com/anacrolix/missinggo/resource"
+	"github.com/anacrolix/missinggo/v2/resource"
 
 	"github.com/anacrolix/torrent/metainfo"
 )
@@ -27,51 +31,124 @@ func (s *piecePerResource) Close() error {
 }
 
 func (s *piecePerResource) Piece(p metainfo.Piece) PieceImpl {
-	completed, err := s.p.NewInstance(path.Join("completed", p.Hash().HexString()))
-	if err != nil {
-		panic(err)
-	}
-	incomplete, err := s.p.NewInstance(path.Join("incomplete", p.Hash().HexString()))
-	if err != nil {
-		panic(err)
-	}
 	return piecePerResourcePiece{
-		p: p,
-		c: completed,
-		i: incomplete,
+		mp: p,
+		rp: s.p,
 	}
 }
 
 type piecePerResourcePiece struct {
-	p metainfo.Piece
-	c resource.Instance
-	i resource.Instance
+	mp metainfo.Piece
+	rp resource.Provider
 }
 
 func (s piecePerResourcePiece) Completion() Completion {
-	fi, err := s.c.Stat()
+	fi, err := s.completed().Stat()
 	return Completion{
-		Complete: err == nil && fi.Size() == s.p.Length(),
+		Complete: err == nil && fi.Size() == s.mp.Length(),
 		Ok:       true,
 	}
 }
 
 func (s piecePerResourcePiece) MarkComplete() error {
-	return resource.Move(s.i, s.c)
+	incompleteChunks := s.getChunks()
+	err := s.completed().Put(io.NewSectionReader(incompleteChunks, 0, s.mp.Length()))
+	if err == nil {
+		for _, c := range incompleteChunks {
+			c.instance.Delete()
+		}
+	}
+	return err
 }
 
 func (s piecePerResourcePiece) MarkNotComplete() error {
-	return s.c.Delete()
+	return s.completed().Delete()
 }
 
 func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
 	if s.Completion().Complete {
-		return s.c.ReadAt(b, off)
-	} else {
-		return s.i.ReadAt(b, off)
+		return s.completed().ReadAt(b, off)
 	}
+	return s.getChunks().ReadAt(b, off)
 }
 
 func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
-	return s.i.WriteAt(b, off)
+	i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), strconv.FormatInt(off, 10)))
+	if err != nil {
+		panic(err)
+	}
+	r := bytes.NewReader(b)
+	err = i.Put(r)
+	n = len(b) - r.Len()
+	return
+}
+
+type chunk struct {
+	offset   int64
+	instance resource.Instance
+}
+
+type chunks []chunk
+
+func (me chunks) ReadAt(b []byte, off int64) (int, error) {
+	for {
+		if len(me) == 0 {
+			return 0, io.EOF
+		}
+		if me[0].offset <= off {
+			break
+		}
+		me = me[1:]
+	}
+	n, err := me[0].instance.ReadAt(b, off-me[0].offset)
+	if n == len(b) {
+		return n, nil
+	}
+	if err == nil || err == io.EOF {
+		n_, err := me[1:].ReadAt(b[n:], off+int64(n))
+		return n + n_, err
+	}
+	return n, err
+}
+
+func (s piecePerResourcePiece) getChunks() (chunks chunks) {
+	names, err := s.incompleteDir().Readdirnames()
+	if err != nil {
+		return
+	}
+	for _, n := range names {
+		offset, err := strconv.ParseInt(n, 10, 64)
+		if err != nil {
+			continue
+		}
+		i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), n))
+		if err != nil {
+			panic(err)
+		}
+		chunks = append(chunks, chunk{offset, i})
+	}
+	sort.Slice(chunks, func(i, j int) bool {
+		return chunks[i].offset < chunks[j].offset
+	})
+	return
+}
+
+func (s piecePerResourcePiece) completed() resource.Instance {
+	i, err := s.rp.NewInstance(path.Join("completed", s.mp.Hash().HexString()))
+	if err != nil {
+		panic(err)
+	}
+	return i
+}
+
+func (s piecePerResourcePiece) incompleteDirPath() string {
+	return path.Join("incompleted", s.mp.Hash().HexString())
+}
+
+func (s piecePerResourcePiece) incompleteDir() resource.DirInstance {
+	i, err := s.rp.NewInstance(s.incompleteDirPath())
+	if err != nil {
+		panic(err)
+	}
+	return i.(resource.DirInstance)
 }
-- 
2.51.0