alexandrnikitin / mpmcqueue.net Goto Github PK
View Code? Open in Web Editor NEWBounded multiple producers multiple consumers queue for .NET
License: MIT License
Bounded multiple producers multiple consumers queue for .NET
License: MIT License
using System;
using System.Threading.Tasks;
using BenchmarkDotNet.Running;
namespace MPMCQueue.NET.Benchmarks
{
    /// <summary>
    /// Stress repro: one task enqueues the value 1 in a tight loop while another task
    /// dequeues, reporting the first time a successful dequeue hands back <c>null</c>.
    /// </summary>
    class Program
    {
        // Capacity 2 keeps the producer and consumer colliding on the same cells.
        private static readonly MPMCQueue s_queue = new MPMCQueue(2);

        // Raised by the consumer once the anomaly has been reported. Without it the
        // producer loops forever and Task.WaitAll in Main never returns.
        private static volatile bool s_stop;

        /// <summary>Enqueues 1 repeatedly until the consumer signals it is done.</summary>
        static void Enqueue()
        {
            while (!s_stop)
            {
                s_queue.TryEnqueue(1);
            }
        }

        /// <summary>
        /// Dequeues repeatedly; prints "Dequeue null" and stops the run the first time
        /// TryDequeue succeeds but yields a null element.
        /// </summary>
        static void Dequeue()
        {
            while (true)
            {
                if (!s_queue.TryDequeue(out object item))
                {
                    continue;
                }

                if (item is null)
                {
                    System.Console.WriteLine("Dequeue null");
                    s_stop = true;
                    return;
                }
            }
        }

        /// <summary>Runs the producer and the consumer concurrently and waits for both.</summary>
        static void Main(string[] args)
        {
            var producer = Task.Run(() => Enqueue());
            var consumer = Task.Run(() => Dequeue());
            Task.WaitAll(producer, consumer);
        }
    }
}
output:
λ .\benchmarks\MPMCQueue.NET.Benchmarks\bin\Release\MPMCQueue.NET.Benchmarks.exe
Dequeue null
if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
{
result = Volatile.Read(ref cell.Element);
buffer[index] = new Cell(pos + bufferMask + 1, null); // maybe here is the problem,
// buffer[index].Element = null;
// buffer[index].Sequence = pos + bufferMask + 1;
return true;
}
00007ffe`162b0b6a e891325f5f call clr!JIT_CheckedWriteBarrier (00007ffe`758a3e00)
00007ffe`162b0b6f 8d443701 lea eax,[rdi+rsi+1]
00007ffe`162b0b73 33d2 xor edx,edx
00007ffe`162b0b75 8903 mov dword ptr [rbx],eax
00007ffe`162b0b77 48895308 mov qword ptr [rbx+8],rdx
00007ffe`162b0b7b b801000000 mov eax,1
00007ffe`162b0b80 4883c420 add rsp,20h
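Note the two `mov` instructions above: the store to Sequence (`[rbx]`) is emitted before the store to Element (`[rbx+8]`).
When the struct constructor is used, the order of the two writes does not seem to be guaranteed. Sequence
should be modified only after cell.Element has been written.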
https://github.com/alexandrnikitin/MPMCQueue.NET/blob/master/src/MPMCQueue.NET/MPMCQueue.cs#L47
https://github.com/alexandrnikitin/MPMCQueue.NET/blob/master/src/MPMCQueue.NET/MPMCQueue.cs#L70
if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
{
Volatile.Write(ref buffer[index].Element, item);
buffer[index].Sequence = pos + 1;
return true;
}
....
if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
{
result = Volatile.Read(ref cell.Element);
buffer[index] = new Cell(pos + bufferMask + 1, null);
return true;
}
@alexandrnikitin I would like to know why we need Volatile.Read/Volatile.Write
on cell.Element here. Can you give me a clue?
If one producer calls TryEnqueue and one consumer calls TryDequeue at the same time, could we get the result below?
- TryEnqueue returns true
- TryDequeue returns true, but the out result is null.
Would an explicit Thread.MemoryBarrier
before reading cell.Element and after writing cell.Element help here? Or do the Volatile.Read/Volatile.Write calls already do the same thing (are they there to prevent outcomes like the one above, or for some other purpose)?
With the checks `if (cell.Sequence < pos)` and `if (cell.Sequence < pos + 1)`,
there will be an infinite loop when pos rolls over to int.MinValue and the queue is either empty or full.
As per the original algorithm (1024cores), enqueue should be:
int dif = cell._sequence- pos;
if(dif == 0)
if(dif < 0)
and dequeue
int dif = cell._sequence - (pos+1);
An example:
cell.Sequence = 2147483633
pos = -2147483648
dif = -15
however cell.Sequence > pos
FYI, two changes:
- Do not store `_bufferMask`, but use `& (_buffer.Length - 1)`. This probably removes bounds checks (already, or likely in the future).
- Use `ref var` for Cell.

With these changes, performance with 12 threads (2x6 HT) is almost the same as with 1 thread, at around 39 million dequeue+enqueue operations per second. Before the changes it was much worse.
More details here.
Benchmarking code is here. Not sure how that compares to the numbers from readme here in nanos.
BTW, #6 has negative impact.
This code could be copy-pasted to .NET Standard 2.0.
/// <summary>
/// Bounded queue for multiple producers and multiple consumers, backed by a
/// power-of-two sized array of sequence-stamped cells.
/// </summary>
/// <remarks>
/// Each cell carries a sequence number. A cell is free for the producer at position
/// <c>p</c> when its sequence equals <c>p</c>, and ready for the consumer at position
/// <c>p</c> when its sequence equals <c>p + 1</c>. Positions are 32-bit and are allowed
/// to wrap, so every comparison is done on the wrapping difference, never with a plain
/// <c>&lt;</c> on the raw values.
/// </remarks>
[StructLayout(LayoutKind.Explicit, Size = 384)]
public class MPMCQueue1
{
    /// <summary>
    /// 128 bytes cache line already exists in some CPUs.
    /// </summary>
    /// <remarks>
    /// Also "the spatial prefetcher strives to keep pairs of cache lines in the L2 cache."
    /// https://stackoverflow.com/questions/29199779/false-sharing-and-128-byte-alignment-padding
    /// </remarks>
    internal const int SAFE_CACHE_LINE = 128;

    [FieldOffset(SAFE_CACHE_LINE)]
    private readonly Cell[] _enqueueBuffer;

    [FieldOffset(SAFE_CACHE_LINE + 8)]
    private volatile int _enqueuePos;

    // Separate access to buffers from enqueue and dequeue.
    // This removes false sharing and accessing a buffer
    // reference also prefetches the following Pos with [(64 - (8 + 4 + 4)) = 52]/64 probability.
    [FieldOffset(SAFE_CACHE_LINE * 2)]
    private readonly Cell[] _dequeueBuffer;

    [FieldOffset(SAFE_CACHE_LINE * 2 + 8)]
    private volatile int _dequeuePos;

    /// <summary>Creates a queue that can hold <paramref name="bufferSize"/> items.</summary>
    /// <param name="bufferSize">Capacity; must be a power of two and at least 2.</param>
    /// <exception cref="ArgumentException">
    /// <paramref name="bufferSize"/> is less than 2 or is not a power of two.
    /// </exception>
    public MPMCQueue1(int bufferSize)
    {
        if (bufferSize < 2) throw new ArgumentException($"{nameof(bufferSize)} should be greater than or equal to 2");
        if ((bufferSize & (bufferSize - 1)) != 0) throw new ArgumentException($"{nameof(bufferSize)} should be a power of 2");

        _enqueueBuffer = new Cell[bufferSize];

        // Cell i starts out free for the producer at position i.
        for (var i = 0; i < bufferSize; i++)
        {
            _enqueueBuffer[i] = new Cell(i, null);
        }

        // Both sides share one array; the second reference only exists for layout reasons.
        _dequeueBuffer = _enqueueBuffer;
        _enqueuePos = 0;
        _dequeuePos = 0;
    }

    /// <summary>Attempts to append <paramref name="item"/> to the queue.</summary>
    /// <param name="item">The item to store; <c>null</c> is stored like any other value.</param>
    /// <returns><c>true</c> if the item was stored; <c>false</c> if the queue was full.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool TryEnqueue(object item)
    {
        var spinner = new SpinWait();
        do
        {
            var buffer = _enqueueBuffer;
            var pos = _enqueuePos;
            var index = pos & (buffer.Length - 1);
            ref var cell = ref buffer[index];

            // One volatile read of the sequence, compared by wrapping difference so the
            // result stays correct after pos rolls over from int.MaxValue to int.MinValue.
            var dif = unchecked(cell.Sequence - pos);

            if (dif == 0 && Interlocked.CompareExchange(ref _enqueuePos, unchecked(pos + 1), pos) == pos)
            {
                // Element must be written before Sequence: the volatile write to Sequence
                // is what publishes the cell to consumers.
                cell.Element = item;
                cell.Sequence = unchecked(pos + 1);
                return true;
            }

            if (dif < 0)
            {
                // The cell still holds an item from the previous lap: the queue is full.
                return false;
            }

            // Either another producer won the slot or the position moved on; retry.
            spinner.SpinOnce();
        } while (true);
    }

    /// <summary>Attempts to remove the item at the head of the queue.</summary>
    /// <param name="result">
    /// The dequeued item on success (which is <c>null</c> if <c>null</c> was enqueued);
    /// <c>null</c> on failure.
    /// </param>
    /// <returns><c>true</c> if an item was removed; <c>false</c> if the queue was empty.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool TryDequeue(out object result)
    {
        result = null;
        var spinner = new SpinWait();
        do
        {
            var buffer = _dequeueBuffer;
            var pos = _dequeuePos;
            var index = pos & (buffer.Length - 1);
            ref var cell = ref buffer[index];

            // Same wrap-safe comparison as in TryEnqueue, against the "ready" stamp pos + 1.
            var dif = unchecked(cell.Sequence - (pos + 1));

            if (dif == 0 && Interlocked.CompareExchange(ref _dequeuePos, unchecked(pos + 1), pos) == pos)
            {
                result = cell.Element;
                cell.Element = null;

                // Hand the cell back to the producer that will reach it one lap later.
                cell.Sequence = unchecked(pos + buffer.Length);

                // Success is reported by the claim itself, not by result != null, so a
                // stored null is not mistaken for an empty queue.
                return true;
            }

            if (dif < 0)
            {
                // The producer for this position has not published yet: the queue is empty.
                return false;
            }

            // Either another consumer won the slot or the position moved on; retry.
            spinner.SpinOnce();
        } while (true);
    }

    /// <summary>One ring slot: a sequence stamp plus the stored element.</summary>
    [StructLayout(LayoutKind.Explicit, Size = 16)]
    private struct Cell
    {
        [FieldOffset(0)]
        public volatile int Sequence;

        [FieldOffset(8)]
        public object Element;

        public Cell(int sequence, object element)
        {
            Sequence = sequence;
            Element = element;
        }
    }
}
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.