Socket: sock,
Choked: true,
PeerChoked: true,
- write: make(chan []byte),
+ writeCh: make(chan []byte),
PeerMaxRequests: 250, // Default in libtorrent is 250.
}
+ go conn.writer()
defer func() {
// There's a lock and deferred unlock later in this function. The
// client will not be locked when this deferred is invoked.
defer me.mu.Unlock()
conn.Close()
}()
- go conn.writer()
// go conn.writeOptimizer()
- conn.write <- pp.Bytes(pp.Protocol)
- conn.write <- pp.Bytes("\x00\x00\x00\x00\x00\x10\x00\x00")
+ conn.write(pp.Bytes(pp.Protocol))
+ conn.write(pp.Bytes("\x00\x00\x00\x00\x00\x10\x00\x00"))
if torrent != nil {
- conn.write <- pp.Bytes(torrent.InfoHash[:])
- conn.write <- pp.Bytes(me.PeerId[:])
+ conn.write(pp.Bytes(torrent.InfoHash[:]))
+ conn.write(pp.Bytes(me.PeerId[:]))
}
var b [28]byte
_, err = io.ReadFull(conn.Socket, b[:])
if torrent == nil {
return
}
- conn.write <- pp.Bytes(torrent.InfoHash[:])
- conn.write <- pp.Bytes(me.PeerId[:])
+ conn.write(pp.Bytes(torrent.InfoHash[:]))
+ conn.write(pp.Bytes(me.PeerId[:]))
}
me.mu.Lock()
defer me.mu.Unlock()
closed bool
mu sync.Mutex // Only for closing.
post chan peer_protocol.Message
- write chan []byte
+ writeCh chan []byte
// Stuff controlled by the local peer.
Interested bool
PeerClientName string
}
+func (cn *connection) write(b []byte) {
+ cn.mu.Lock()
+ cn.writeCh <- b
+ cn.mu.Unlock()
+}
+
func (cn *connection) completedString() string {
if cn.PeerPieces == nil {
return "?"
if c.post == nil {
// writeOptimizer isn't running, so we need to signal the writer to
// stop.
- close(c.write)
+ close(c.writeCh)
} else {
// This will kill the writeOptimizer, and it kills the writer.
close(c.post)
// Writes buffers to the socket from the write channel.
func (conn *connection) writer() {
- for b := range conn.write {
+ for b := range conn.writeCh {
_, err := conn.Socket.Write(b)
// log.Printf("wrote %q to %s", b, conn.Socket.RemoteAddr())
if err != nil {
break
}
}
+ conn.Close()
}
func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
- defer close(conn.write) // Responsible for notifying downstream routines.
- pending := list.New() // Message queue.
- var nextWrite []byte // Set to nil if we need to need to marshal the next message.
+ defer close(conn.writeCh) // Responsible for notifying downstream routines.
+ pending := list.New() // Message queue.
+ var nextWrite []byte // Set to nil if we need to need to marshal the next message.
timer := time.NewTimer(keepAliveDelay)
defer timer.Stop()
lastWrite := time.Now()
for {
- write := conn.write // Set to nil if there's nothing to write.
+ write := conn.writeCh // Set to nil if there's nothing to write.
if pending.Len() == 0 {
write = nil
} else if nextWrite == nil {
PeerMaxRequests: 1,
PeerPieces: []bool{false, true},
post: make(chan peer_protocol.Message),
- write: make(chan []byte),
+ writeCh: make(chan []byte),
}
if len(c.Requests) != 0 {
t.FailNow()
// Let a keep-alive through to verify there were no pending messages.
"\x00\x00\x00\x00",
} {
- bb := string(<-c.write)
+ bb := string(<-c.writeCh)
if b != bb {
t.Fatalf("received message %q is not expected: %q", bb, b)
}
}
close(c.post)
// Drain the write channel until it closes.
- for b := range c.write {
+ for b := range c.writeCh {
bs := string(b)
if bs != "\x00\x00\x00\x00" {
t.Fatal("got unexpected non-keepalive")