stream.go | 40 ++++++++++++++++++++++++++++++++++++---- stream_test.go | 7 ++----- streams_map.go | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++++ diff --git a/stream.go b/stream.go index af6cff4acca772bcdca36621ad1b5bc1437f4361..2903d95bb9b874bd807b130e69d2dd50658f5fd3 100644 --- a/stream.go +++ b/stream.go @@ -13,19 +13,29 @@ #include // Uses the same signature as Go, no need for proxy int go_readcallback(void *p, unsigned char *buf, int nbytes); + */ import "C" type Stream struct { + id uintptr oggfile *C.OggOpusFile read io.Reader // Preallocated buffer to pass to the reader buf []byte } +var streams = newStreamsMap() + //export go_readcallback func go_readcallback(p unsafe.Pointer, cbuf *C.uchar, cmaxbytes C.int) C.int { - stream := (*Stream)(p) + streamId := uintptr(p) + stream := streams.Get(streamId) + if stream == nil { + // This is bad + return -1 + } + maxbytes := int(cmaxbytes) if maxbytes > cap(stream.buf) { maxbytes = cap(stream.buf) @@ -33,8 +43,15 @@ } // Don't bother cleaning up old data because that's not required by the // io.Reader API. n, err := stream.read.Read(stream.buf[:maxbytes]) - if (err != nil && err != io.EOF) || n == 0 { - return 0 + // Go allows returning non-nil error (like EOF) and n>0, libopusfile doesn't + // expect that. So return n first to indicate the valid bytes, let the + // subsequent call (which will be n=0, same-error) handle the actual error. + if n == 0 && err != nil { + if err == io.EOF { + return 0 + } else { + return -1 + } } C.memcpy(unsafe.Pointer(cbuf), unsafe.Pointer(&stream.buf[0]), C.size_t(n)) return C.int(n) @@ -67,11 +84,22 @@ } if read == nil { return fmt.Errorf("Reader must be non-nil") } + s.read = read s.buf = make([]byte, maxEncodedFrameSize) + s.id = streams.NextId() var errno C.int + + // Immediately delete the stream after .Init to avoid leaking if the + // caller forgets to (/ doesn't want to) call .Delete(). No need for that, + // since the callback is only ever called during a .Read operation; just + // Save and Delete from the map around that every time a reader function is + // called. + streams.Save(s) + defer streams.Del(s) oggfile := C.op_open_callbacks( - unsafe.Pointer(s), + // "C code may not keep a copy of a Go pointer after the call returns." + unsafe.Pointer(s.id), &callbacks, nil, 0, @@ -100,6 +128,8 @@ } if len(pcm) == 0 { return 0, nil } + streams.Save(s) + defer streams.Del(s) n := C.op_read( s.oggfile, (*C.opus_int16)(&pcm[0]), @@ -121,6 +151,8 @@ } if len(pcm) == 0 { return 0, nil } + streams.Save(s) + defer streams.Del(s) n := C.op_read_float( s.oggfile, (*C.float)(&pcm[0]), diff --git a/stream_test.go b/stream_test.go index a2c02038f6b5ba60659e986ad2cd5f6755f2c386..20eb4a26ba4d6d388a097c600cfcb7eedec90727 100644 --- a/stream_test.go +++ b/stream_test.go @@ -64,10 +64,7 @@ } func opus2pcm(t *testing.T, fname string, buffersize int) []int16 { reader := mustOpenFile(t, fname) - stream, err := NewStream(reader) - if err != nil { - t.Fatalf("Error while creating opus stream: %v", err) - } + stream := mustOpenStream(t, reader) return readStreamPcm(t, stream, buffersize) } @@ -115,7 +112,7 @@ t.Fatalf("Unexpected length of decoded opus file: %d (.wav: %d)", len(opuspcm), len(wavpcm)) } d := maxDiff(opuspcm, wavpcm) // No science behind this number - const epsilon = 12 + const epsilon = 18 if d > epsilon { t.Errorf("Maximum difference between decoded streams too high: %d", d) } diff --git a/streams_map.go b/streams_map.go new file mode 100644 index 0000000000000000000000000000000000000000..ef666c0d6603500779c2181960f83d92dabc3f19 --- /dev/null +++ b/streams_map.go @@ -0,0 +1,58 @@ +package opus + +import ( + "sync" + "sync/atomic" +) + +// A map of simple integers to the actual pointers to stream structs. Avoids +// passing pointers into the Go heap to C. +// +// As per the CGo pointers design doc for go 1.6: +// +// A particular unsafe area is C code that wants to hold on to Go func and +// pointer values for future callbacks from C to Go. This works today but is not +// permitted by the invariant. It is hard to detect. One safe approach is: Go +// code that wants to preserve funcs/pointers stores them into a map indexed by +// an int. Go code calls the C code, passing the int, which the C code may store +// freely. When the C code wants to call into Go, it passes the int to a Go +// function that looks in the map and makes the call. An explicit call is +// required to release the value from the map if it is no longer needed, but +// that was already true before. +// +// - https://github.com/golang/proposal/blob/master/design/12416-cgo-pointers.md +type streamsMap struct { + sync.RWMutex + m map[uintptr]*Stream + counter uintptr +} + +func (sm *streamsMap) Get(id uintptr) *Stream { + sm.RLock() + defer sm.RUnlock() + return sm.m[id] +} + +func (sm *streamsMap) Del(s *Stream) { + sm.Lock() + defer sm.Unlock() + delete(sm.m, s.id) +} + +// NextId returns a unique ID for each call. +func (sm *streamsMap) NextId() uintptr { + return atomic.AddUintptr(&sm.counter, 1) +} + +func (sm *streamsMap) Save(s *Stream) { + sm.Lock() + defer sm.Unlock() + sm.m[s.id] = s +} + +func newStreamsMap() *streamsMap { + return &streamsMap{ + counter: 0, + m: map[uintptr]*Stream{}, + } +}