
mpmcqueue.net's People

Contributors

alexandrnikitin, lust4life

mpmcqueue.net's Issues

TryDequeue returns true but gets a null result

using System;
using System.Threading.Tasks;
using BenchmarkDotNet.Running;

namespace MPMCQueue.NET.Benchmarks
{
  class Program
  {
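    // capacity 2: slots are reused constantly, so any ordering bug shows up quickly
    static MPMCQueue queue { get; } = new MPMCQueue(2);

    // producer: keeps enqueuing a non-null item
    static void Enqueue()
    {
      while (true) { queue.TryEnqueue(1); }
    }

    // consumer: stops as soon as a successful dequeue yields null
    static void Dequeue()
    {
      while (true)
      {
        if (queue.TryDequeue(out object t) && t == null)
        {
          System.Console.WriteLine("Dequeue null");
          break;
        }
      }
    }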

    static void Main(string[] args)
    {
      var t1 = Task.Run(() => Enqueue());
      var t2 = Task.Run(() => Dequeue());
      Task.WaitAll(t1, t2);
    }
  }
}

output:

λ  .\benchmarks\MPMCQueue.NET.Benchmarks\bin\Release\MPMCQueue.NET.Benchmarks.exe
Dequeue null
if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
{
    result = Volatile.Read(ref cell.Element);
    buffer[index] = new Cell(pos + bufferMask + 1, null);  // maybe this is the problem
//    buffer[index].Element = null;
//    buffer[index].Sequence = pos + bufferMask + 1;
    return true;
}
00007ffe`162b0b6a e891325f5f      call    clr!JIT_CheckedWriteBarrier (00007ffe`758a3e00)
00007ffe`162b0b6f 8d443701        lea     eax,[rdi+rsi+1]
00007ffe`162b0b73 33d2            xor     edx,edx
00007ffe`162b0b75 8903            mov     dword ptr [rbx],eax
00007ffe`162b0b77 48895308        mov     qword ptr [rbx+8],rdx
00007ffe`162b0b7b b801000000      mov     eax,1
00007ffe`162b0b80 4883c420        add     rsp,20h

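There are two separate mov instructions here: when the struct constructor is used, the order of the two stores does not seem to be guaranteed, and in this code Sequence ([rbx]) is written before Element ([rbx+8]). Sequence should be modified only after cell.Element has been cleared. Otherwise a producer can see the new Sequence, store its item, and then have that item overwritten by the late Element = null store, so the next TryDequeue returns true with a null result.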
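A minimal sketch of the ordering proposed above, assuming a Vyukov-style bounded queue: in TryDequeue, Element is cleared first and only then is the new Sequence published with Volatile.Write (a release store), so a producer cannot observe the slot as free while the null store is still pending. FixedOrderQueue and its members are hypothetical names, not the library's API; the sign-of-difference checks follow the 1024cores formulation so that positions may wrap around.

```csharp
using System;
using System.Threading;

// Hypothetical sketch (not the library's MPMCQueue): a bounded Vyukov-style
// MPMC queue in which TryDequeue clears Element BEFORE publishing Sequence.
public sealed class FixedOrderQueue
{
    private struct Cell
    {
        public int Sequence;   // accessed only through Volatile.Read/Write
        public object Element; // plain field, ordered by the Sequence accesses
    }

    private readonly Cell[] _buffer;
    private int _enqueuePos;
    private int _dequeuePos;

    public FixedOrderQueue(int size)
    {
        if (size < 2 || (size & (size - 1)) != 0)
            throw new ArgumentException("size must be a power of 2 and >= 2", nameof(size));
        _buffer = new Cell[size];
        for (int i = 0; i < size; i++) _buffer[i].Sequence = i;
    }

    public bool TryEnqueue(object item)
    {
        unchecked // positions and sequence numbers are allowed to wrap around
        {
            while (true)
            {
                int pos = Volatile.Read(ref _enqueuePos);
                ref Cell cell = ref _buffer[pos & (_buffer.Length - 1)];
                int dif = Volatile.Read(ref cell.Sequence) - pos;
                if (dif == 0 && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
                {
                    cell.Element = item;                        // 1. store the item
                    Volatile.Write(ref cell.Sequence, pos + 1); // 2. then publish it (release)
                    return true;
                }
                if (dif < 0) return false;                      // queue is full
                // dif > 0 or the CAS was lost: another producer took pos, retry
            }
        }
    }

    public bool TryDequeue(out object result)
    {
        unchecked
        {
            while (true)
            {
                int pos = Volatile.Read(ref _dequeuePos);
                ref Cell cell = ref _buffer[pos & (_buffer.Length - 1)];
                int dif = Volatile.Read(ref cell.Sequence) - (pos + 1);
                if (dif == 0 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
                {
                    result = cell.Element;
                    cell.Element = null;                                     // 1. clear the slot first
                    Volatile.Write(ref cell.Sequence, pos + _buffer.Length); // 2. then hand it to producers
                    return true;
                }
                if (dif < 0) { result = null; return false; }               // queue is empty
            }
        }
    }
}
```

Two stores are still emitted per dequeue, but their order is now fixed: the release store to Sequence cannot become visible before the Element = null store.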

wondering about the usage of volatile in MPMCQueue

https://github.com/alexandrnikitin/MPMCQueue.NET/blob/master/src/MPMCQueue.NET/MPMCQueue.cs#L47
https://github.com/alexandrnikitin/MPMCQueue.NET/blob/master/src/MPMCQueue.NET/MPMCQueue.cs#L70

if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
{
    Volatile.Write(ref buffer[index].Element, item);
    buffer[index].Sequence = pos + 1;
    return true;
}

....

if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
{
    result = Volatile.Read(ref cell.Element);
    buffer[index] = new Cell(pos + bufferMask + 1, null);
    return true;
}

@alexandrnikitin I'd like to know why we need Volatile.Read/Volatile.Write on cell.Element here. Can you give me a clue?

If one producer calls TryEnqueue and one consumer calls TryDequeue at the same time, could we get the following result?
  • TryEnqueue returns true.
  • TryDequeue returns true, but the out result is null.

Would an explicit Thread.MemoryBarrier before reading cell.Element and after writing cell.Element help here? Or do the Volatile.Read/Write calls already do the same thing (are they there to prevent exactly this, or for some other purpose)?
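As far as we can tell, what protects Element is its ordering relative to Sequence, not the volatility of the Element access itself: the writer needs a release store to Sequence after writing Element, and the reader needs an acquire load of Sequence before reading Element. In .NET, Volatile.Write and Volatile.Read provide exactly those one-way guarantees; a full Thread.MemoryBarrier() at the same points would also work but is stronger than needed. The sketch below shows the pattern on a single cell; Publication, Publish, TryRead and RunOnce are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of the publish/consume pattern a queue cell relies on:
// the payload is a plain field, and only the sequence flag uses Volatile.
public sealed class Publication
{
    private object _element; // plain field, like Cell.Element
    private int _sequence;   // flag, like Cell.Sequence

    // Producer: write the payload, then release-store the flag.
    // Volatile.Write keeps the earlier payload store from moving after it.
    public void Publish(object item, int sequence)
    {
        _element = item;
        Volatile.Write(ref _sequence, sequence);
    }

    // Consumer: acquire-load the flag, then read the payload.
    // Volatile.Read keeps the later payload load from moving before it.
    public bool TryRead(int expected, out object item)
    {
        if (Volatile.Read(ref _sequence) == expected)
        {
            item = _element;
            return true;
        }
        item = null;
        return false;
    }

    // One producer and one consumer; true if the consumer saw the published object.
    public static bool RunOnce()
    {
        var cell = new Publication();
        var payload = new object();
        object seen = null;
        Task consumer = Task.Run(() =>
        {
            var spinner = new SpinWait();
            while (!cell.TryRead(1, out seen)) spinner.SpinOnce();
        });
        cell.Publish(payload, 1);
        consumer.Wait();
        return ReferenceEquals(seen, payload);
    }
}
```

Put differently, a Volatile.Read on Element alone does not stop that read from being satisfied before an earlier plain read of Sequence; the acquire has to be on the Sequence load.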

Infinite loop at int.MaxValue

With the checks if (cell.Sequence < pos) and if (cell.Sequence < pos + 1), there will be an infinite loop when pos rolls over to int.MinValue and the queue is either empty or full.

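As in the original (1024cores) implementation, enqueue should compute:
int dif = cell._sequence - pos;
if (dif == 0)       // slot is free: try to claim pos
else if (dif < 0)   // queue is full
and dequeue:
int dif = cell._sequence - (pos + 1);   // dif == 0: item ready, dif < 0: queue is empty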

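An example:
cell.Sequence = 2147483633
pos = -2147483648 (int.MinValue)
dif = -15
However, cell.Sequence > pos, so the direct comparison cell.Sequence < pos fails even though the queue is full, and the loop never exits.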
Much higher performance with these changes

FYI

  • Spin with SpinWait, especially given that this queue is not lock-free according to the author's description.
  • Do not use _bufferMask; use & (_buffer.Length - 1) instead. This probably lets the JIT remove bounds checks (already, or likely in the future).
  • Use two fields for the buffer, both pointing to the same array. Enqueue and Dequeue each load their own field, which also prefetches their position value. Most importantly, this avoids false sharing when accessing the buffer reference.
  • Use ref var for Cell.
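The masking suggestion relies on the buffer length being a power of two. A small sketch with the hypothetical helpers RingIndex.Index and RingIndex.Slot shows that pos & (buffer.Length - 1) yields a valid index for every int, including the negative positions that appear after pos wraps past int.MaxValue, where pos % buffer.Length would be negative. Whether the JIT actually drops the bounds check for this pattern depends on the runtime version and is not demonstrated here.

```csharp
using System;

// Hypothetical helpers illustrating index = pos & (buffer.Length - 1).
public static class RingIndex
{
    // With a power-of-two length the mask equals pos mod length (always
    // non-negative) for every int, so consecutive positions stay consecutive
    // slots even across the int.MaxValue -> int.MinValue wraparound.
    public static int Index(int pos, int length) => pos & (length - 1);

    // Returns a reference to the slot for pos, in the spirit of "ref var cell = ref buffer[index]".
    public static ref T Slot<T>(T[] buffer, int pos)
    {
        if (buffer.Length == 0 || (buffer.Length & (buffer.Length - 1)) != 0)
            throw new ArgumentException("buffer length must be a power of 2", nameof(buffer));
        return ref buffer[pos & (buffer.Length - 1)];
    }
}
```

For example, RingIndex.Index(int.MaxValue, 16) is 15 and RingIndex.Index(int.MinValue, 16) is 0, so the slot sequence continues without a gap after the overflow.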

With these changes, performance with 12 threads (2x6 HT) is almost the same as with 1 thread, at around 39 million dequeue+enqueue operations per second. Before the changes it was much worse.

More details here.

Benchmarking code is here. Not sure how that compares to the numbers in the README here, which are given in nanoseconds.

BTW, #6 has a negative impact.

This code could be copy-pasted into a .NET Standard 2.0 project.

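using System;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;

    [StructLayout(LayoutKind.Explicit, Size = 384)]
    public class MPMCQueue1
    {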
        /// <summary>
        /// 128 bytes cache line already exists in some CPUs.
        /// </summary>
        /// <remarks>
        /// Also "the spatial prefetcher strives to keep pairs of cache lines in the L2 cache."
        /// https://stackoverflow.com/questions/29199779/false-sharing-and-128-byte-alignment-padding
        /// </remarks>
        internal const int SAFE_CACHE_LINE = 128;

        [FieldOffset(SAFE_CACHE_LINE)]
        private readonly Cell[] _enqueueBuffer;

        [FieldOffset(SAFE_CACHE_LINE + 8)]
        private volatile int _enqueuePos;

        // Separate access to buffers from enqueue and dequeue.
        // This removes false sharing and accessing a buffer 
        // reference also prefetches the following Pos with [(64 - (8 + 4 + 4)) = 48]/64 probability.

        [FieldOffset(SAFE_CACHE_LINE * 2)]
        private readonly Cell[] _dequeueBuffer;

        [FieldOffset(SAFE_CACHE_LINE * 2 + 8)]
        private volatile int _dequeuePos;

        public MPMCQueue1(int bufferSize)
        {
            if (bufferSize < 2) throw new ArgumentException($"{nameof(bufferSize)} should be greater than or equal to 2");
            if ((bufferSize & (bufferSize - 1)) != 0) throw new ArgumentException($"{nameof(bufferSize)} should be a power of 2");

            _enqueueBuffer = new Cell[bufferSize];

            for (var i = 0; i < bufferSize; i++)
            {
                _enqueueBuffer[i] = new Cell(i, null);
            }

            _dequeueBuffer = _enqueueBuffer;
            _enqueuePos = 0;
            _dequeuePos = 0;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public bool TryEnqueue(object item)
        {
            var spinner = new SpinWait();
            do
            {
                var buffer = _enqueueBuffer;
                var pos = _enqueuePos;
                var index = pos & (buffer.Length - 1);
                ref var cell = ref buffer[index];
                if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
                {
                    cell.Element = item;
                    cell.Sequence = pos + 1;
                    return true;
                }

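                // full: compare via the wrapped difference so the check still works after pos overflows
                if (unchecked(cell.Sequence - pos) < 0)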
                {
                    return false;
                }

                spinner.SpinOnce();
            } while (true);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public bool TryDequeue(out object result)
        {
            result = null;
            var spinner = new SpinWait();
            do
            {
                var buffer = _dequeueBuffer;
                var pos = _dequeuePos;
                var index = pos & (buffer.Length - 1);
                ref var cell = ref buffer[index];
                if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
                {
                    result = cell.Element;
                    cell.Element = null;
                    cell.Sequence = pos + buffer.Length;
                    break;
                }

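                // empty: compare via the wrapped difference so the check still works after pos overflows
                if (unchecked(cell.Sequence - (pos + 1)) < 0)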
                {
                    break;
                }

                spinner.SpinOnce();
            } while (true);

            // note: a dequeued null item is indistinguishable from an empty queue here
            return result != null;
        }

        [StructLayout(LayoutKind.Explicit, Size = 16)]
        private struct Cell
        {
            [FieldOffset(0)]
            public volatile int Sequence;

            [FieldOffset(8)]
            public object Element;

            public Cell(int sequence, object element)
            {
                Sequence = sequence;
                Element = element;
            }
        }
    }
