]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Store chunk data without holding client lock
authorMatt Joiner <anacrolix@gmail.com>
Wed, 15 Jul 2015 06:00:59 +0000 (16:00 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 15 Jul 2015 06:00:59 +0000 (16:00 +1000)
client.go
piece.go
reader.go
torrent.go

index 63d9e0caa9c55e00aede9ebb6084e57b2c75f792..2c4d4d42c8689d59233e80cc97bc765824d5830b 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1432,6 +1432,7 @@ another:
 
 func (me *Client) sendChunk(t *torrent, c *connection, r request) error {
        b := make([]byte, r.Length)
+       t.Pieces[r.Index].pendingWrites.Wait()
        p := t.Info.Piece(int(r.Index))
        n, err := dataReadAt(t.data, b, p.Offset()+int64(r.Begin))
        if err != nil {
@@ -2523,12 +2524,18 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
 
        me.upload(t, c)
 
-       // Write the chunk out.
-       err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
-       if err != nil {
-               log.Printf("error writing chunk: %s", err)
-               return nil
-       }
+       piece.pendingWrites.Add(1)
+       go func() {
+               defer piece.pendingWrites.Done()
+               // Write the chunk out.
+               tr := perf.NewTimer()
+               err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
+               if err != nil {
+                       log.Printf("error writing chunk: %s", err)
+                       return
+               }
+               tr.Stop("write chunk")
+       }()
 
        // log.Println("got chunk", req)
        piece.Event.Broadcast()
index e326c2b4e526adf3bee30888305968c5fb05c7a2..9a3a172917e55a2943958ecf0d67ce8e69c85352 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -30,6 +30,7 @@ type piece struct {
        EverHashed        bool
        Event             sync.Cond
        Priority          piecePriority
+       pendingWrites     sync.WaitGroup
 }
 
 func (p *piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool {
index 02dee9698c9d8323fdd1975f80e2bb6a534280aa..0ba8d5117016971db7bb4a68fa8f8b9295a9360a 100644 (file)
--- a/reader.go
+++ b/reader.go
@@ -121,6 +121,14 @@ again:
        avail := r.available(pos, int64(len(b)))
        // log.Println("available", avail)
        b1 := b[:avail]
+       pi := int(pos / r.t.Info().PieceLength)
+       tp := r.t.torrent.Pieces[pi]
+       ip := r.t.Info().Piece(pi)
+       po := pos % ip.Length()
+       if int64(len(b1)) > ip.Length()-po {
+               b1 = b1[:ip.Length()-po]
+       }
+       tp.pendingWrites.Wait()
        n, err = dataReadAt(r.t.data, b1, pos)
        if n != 0 {
                err = nil
index c4bb59905fa9fdcb91986371795c3092ce628cf5..43d45d5c8c99f16aa53cab968b83dd60605ca484 100644 (file)
@@ -638,6 +638,8 @@ func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
 
 func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) {
        hash := pieceHash.New()
+       p := t.Pieces[piece]
+       p.pendingWrites.Wait()
        t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
        util.CopyExact(ps[:], hash.Sum(nil))
        return