From b9037b9d7c751f3314fb049e0fdad559aee4a8f0 Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Fri, 22 Apr 2016 18:15:07 -0700 Subject: [PATCH] converted stream reader to io.ReadCloser --- stream/reader.go | 16 +++++++++++++--- stream/stream.go | 4 ++-- stream/stream_impl.go | 2 +- stream/writer.go | 4 ++-- web/stream2.go | 2 +- 5 files changed, 19 insertions(+), 9 deletions(-) diff --git a/stream/reader.go b/stream/reader.go index dade6d7e9..935f0f93c 100644 --- a/stream/reader.go +++ b/stream/reader.go @@ -3,11 +3,13 @@ package stream import ( "bytes" "io" + "sync/atomic" ) type reader struct { - w *writer - off int + w *writer + off int + closed uint32 } // Read reads from the Buffer @@ -31,6 +33,10 @@ func (r *reader) Read(p []byte) (n int, err error) { err = io.EOF break } + if r.Closed() { + err = io.EOF + break + } r.w.Wait() } @@ -39,6 +45,10 @@ func (r *reader) Read(p []byte) (n int, err error) { } func (r *reader) Close() error { - // TODO close should remove reader from the parent! + atomic.StoreUint32(&r.closed, 1) return nil } + +func (r *reader) Closed() bool { + return atomic.LoadUint32(&r.closed) != 0 +} diff --git a/stream/stream.go b/stream/stream.go index 83cd3bc4d..2619b53cb 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -12,7 +12,7 @@ import ( type Stream interface { Create(string) error Delete(string) error - Reader(string) (io.Reader, error) + Reader(string) (io.ReadCloser, error) Writer(string) (io.WriteCloser, error) } @@ -22,7 +22,7 @@ func Create(c context.Context, key string) error { } // Reader opens the stream for reading. -func Reader(c context.Context, key string) (io.Reader, error) { +func Reader(c context.Context, key string) (io.ReadCloser, error) { return FromContext(c).Reader(key) } diff --git a/stream/stream_impl.go b/stream/stream_impl.go index 9be945c9a..8e21aaf9b 100644 --- a/stream/stream_impl.go +++ b/stream/stream_impl.go @@ -19,7 +19,7 @@ func New() Stream { } // Reader returns an io.Reader for reading from to the stream. -func (s *stream) Reader(name string) (io.Reader, error) { +func (s *stream) Reader(name string) (io.ReadCloser, error) { s.Lock() defer s.Unlock() diff --git a/stream/writer.go b/stream/writer.go index 0cb827a12..15a873604 100644 --- a/stream/writer.go +++ b/stream/writer.go @@ -31,7 +31,7 @@ func (w *writer) Write(p []byte) (n int, err error) { return w.buffer.Write(p) } -func (w *writer) Reader() (io.Reader, error) { +func (w *writer) Reader() (io.ReadCloser, error) { return &reader{w: w}, nil } @@ -48,5 +48,5 @@ func (w *writer) Close() error { } func (w *writer) Closed() bool { - return atomic.LoadUint32(&w.closed) == 1 + return atomic.LoadUint32(&w.closed) != 0 } diff --git a/web/stream2.go b/web/stream2.go index c5fff2ad1..91067dd83 100644 --- a/web/stream2.go +++ b/web/stream2.go @@ -99,7 +99,7 @@ func GetStream2(c *gin.Context) { go func() { <-c.Writer.CloseNotify() - // rc.Close() + rc.Close() }() var line int