djherbis / bufit
A moving buffer which supports multiple concurrent readers #golang
License: MIT License
Saw this earlier:
panic: runtime error: index out of range
goroutine 102 [running]:
github.com/tmm1/bufit.(*readerHeap).Swap(0xc4205d4568, 0x1, 0x0)
<autogenerated>:8 +0x10c
container/heap.Remove(0x4a90580, 0xc4205d4568, 0x1, 0x405f335, 0xc420973310)
/usr/local/Cellar/go/1.8/libexec/src/container/heap/heap.go:74 +0x9c
github.com/tmm1/bufit.(*Buffer).drop(0xc4205d4540, 0xc420973340)
go/src/github.com/tmm1/bufit/bufit.go:100 +0x85
github.com/tmm1/bufit.(*reader).Close(0xc420973340, 0xc420887600, 0x47a0570)
go/src/github.com/tmm1/bufit/heap.go:66 +0x5c
Possibly fixed already when we added locking to Close() earlier tonight?
We've run into situations where having Write() block when the capped buffer is full is not desirable.
It would be nice to have alternative strategies for handling this scenario. The ones that have come to mind are:
error
response to Read()
) as long as there is more than one reader
I investigated what it would take to handle this, and something along the lines of the following felt like the right direction. I would appreciate feedback on whether this is a good avenue to pursue and whether it is something you would be interested in:
diff --git a/bufit.go b/bufit.go
index ec3a955..2c68f0f 100644
--- a/bufit.go
+++ b/bufit.go
@@ -50,6 +50,14 @@ type Writer interface {
io.Writer
}
+type BufferFullBehavior int
+
+const (
+ BufferFullBehaviorWait BufferFullBehavior = iota
+ BufferFullBehaviorDropExtraReader
+ BufferFullBehaviorDropReader
+)
+
// Buffer is used to provide multiple readers with access to a shared buffer.
// Readers may join/leave at any time, however a joining reader will only
// see whats currently in the buffer onwards. Data is evicted from the buffer
@@ -63,6 +71,7 @@ type Buffer struct {
buf Writer
cap int
keep int
+ bfb BufferFullBehavior
life
callback atomic.Value
}
@@ -225,7 +234,25 @@ func (b *Buffer) Write(p []byte) (int, error) {
for len(p[n:]) > 0 && err == nil { // bytes left to write
for b.cap > 0 && b.buf.Len() == b.cap && b.alive() { // wait for space
- b.wwait.Wait()
+ switch b.bfb {
+ case BufferFullBehaviorWait:
+ b.wwait.Wait()
+ case BufferFullBehaviorDropExtraReader:
+ if len(b.rh) > 1 {
+ r := b.rh.Peek()
+ heap.Remove(&b.rh, r.i)
+ b.shift() // shift to next peek
+
+ continue
+ }
+ case BufferFullBehaviorDropReader:
+ r := b.rh.Peek()
+ heap.Remove(&b.rh, r.i)
+ b.shift() // shift to next peek
+
+ continue
+ }
+ }
}
if !b.alive() {
I noticed this project hasn't received any commits in a while. Is that because it's stable and works, or because it's not being actively used anywhere?
It seems like it would work really well for my use-case, so I'm trying to get a sense of what to expect before I try to replace what I'm currently using.
The only feature that I didn't see built-in is the ability to bound the size of the memory buffer. But it seems like that would be pretty straightforward to implement in a wrapper, by looking at the Len() of the buffer after every write, and calling Discard() to drop the oldest bytes once the length grows beyond the limit.
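A rough sketch of the wrapper idea described above. The `lenDiscardWriter` interface, the `memBuf` stand-in, and the `bounded` type are all hypothetical names introduced for illustration; the sketch only assumes that the wrapped buffer exposes Write, Len() and Discard(n) roughly as the post describes, and it is not safe for concurrent use as written.

```go
package main

import (
	"bytes"
	"fmt"
)

// lenDiscardWriter is an assumed interface: something you can write to,
// ask for its length, and drop the oldest n bytes from.
type lenDiscardWriter interface {
	Write(p []byte) (int, error)
	Len() int
	Discard(n int) (int, error)
}

// memBuf is a toy in-memory stand-in used only to make the sketch runnable.
type memBuf struct{ b bytes.Buffer }

func (m *memBuf) Write(p []byte) (int, error) { return m.b.Write(p) }
func (m *memBuf) Len() int                    { return m.b.Len() }
func (m *memBuf) Discard(n int) (int, error)  { return len(m.b.Next(n)), nil }

// bounded caps the wrapped buffer at max bytes by discarding the oldest data.
type bounded struct {
	w   lenDiscardWriter
	max int
}

func (b *bounded) Write(p []byte) (int, error) {
	n, err := b.w.Write(p)
	// After every write, trim whatever exceeds the limit.
	if over := b.w.Len() - b.max; over > 0 {
		if _, derr := b.w.Discard(over); err == nil {
			err = derr
		}
	}
	return n, err
}

func main() {
	mb := &memBuf{}
	bw := &bounded{w: mb, max: 8}
	bw.Write([]byte("hello "))
	bw.Write([]byte("world!!"))
	fmt.Printf("%d %q\n", mb.Len(), mb.b.String())
}
```

One consequence of this approach is that a slow reader which has not yet consumed the discarded bytes would silently lose them, so whether it fits depends on how the readers are expected to behave.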
bufit is useful when multiple consumers need to work off a live stream of data. In this use-case, a Buffer is created on-demand when a consumer tunes in, and can be re-used if other consumers are interested in concurrent access.
When the last reader disconnects, it would be useful to be able to tell the Buffer to automatically close itself.
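As an illustration of the request, here is a minimal reference-counting sketch. The `stream` type, its `join` method, and the `onLast` hook are hypothetical and are not part of bufit's API as described in this post; the sketch only shows the general pattern of running a callback when the reader count drops to zero.

```go
package main

import (
	"fmt"
	"sync"
)

// stream is a hypothetical stand-in for a shared buffer with readers.
type stream struct {
	mu      sync.Mutex
	readers int
	closed  bool
	onLast  func() // assumed hook, run when the last reader leaves
}

// join registers a reader and returns a function that unregisters it.
// Calling the returned function more than once has no further effect.
func (s *stream) join() (leave func()) {
	s.mu.Lock()
	s.readers++
	s.mu.Unlock()

	var once sync.Once
	return func() {
		once.Do(func() {
			s.mu.Lock()
			s.readers--
			last := s.readers == 0
			cb := s.onLast
			s.mu.Unlock()
			// Run the hook outside the lock so it may call Close.
			if last && cb != nil {
				cb()
			}
		})
	}
}

func (s *stream) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
	return nil
}

func main() {
	s := &stream{}
	s.onLast = func() { s.Close() }

	leaveA := s.join()
	leaveB := s.join()
	leaveA()
	fmt.Println(s.closed) // one reader is still attached
	leaveB()
	fmt.Println(s.closed) // the last reader left, so the hook closed it
}
```

Note that in this sketch a new reader could join between the count reaching zero and the hook running, so a real implementation would need to decide how to handle that window.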
WDYT about adding Peek() to the Reader interface? Here's my use-case.
I'm using the new NextReaderFromNow() on top of an mpegts video stream. mpegts packets are 188 bytes long, and each starts with the byte G. Since a new reader does not necessarily start at a packet boundary, I'd like to be able to discard all bytes until the first G.
One way to do this without Peek would be to read 188 bytes, find the G, and then calculate how many more bytes to read and discard. This would mean discarding one or more packets, whereas with Peek I could discard only the bytes required.