1brc's Introduction

1๏ธโƒฃ๐Ÿ๐ŸŽ๏ธ The One Billion Row Challenge

The challenge: compute simple floating-point math over 1 billion rows. As fast as possible, without dependencies.

Implemented in standard C11 with POSIX threads (however, no SIMD). analyze.c contains the fastest implementation, while {1..7}.c contain slower versions of the same program.

I wrote up some implementation details on my blog here: https://www.dannyvankooten.com/blog/2024/1brc/

Running the challenge

First, compile the two programs using any capable C compiler.

make

To compile in debug mode:

DEBUG=1 make

By default, Make will attempt to find the number of threads to use from nproc or sysctl. To compile while specifying the number of threads to use explicitly:

NTHREADS=8 make

Create the measurements file with 1B rows

bin/create-sample 1000000000

This will create a 12 GB file with 1B rows named measurements.txt in your current working directory. The program to create this sample file will take a minute or two, but you only need to run it once.
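Each row of measurements.txt has the form <station>;<temperature>, where the temperature always carries exactly one decimal digit. For example (station names and values are illustrative):

Amsterdam;12.3
Lisbon;-5.4
Amsterdam;-3.1

For that input the program would print the per-station min/mean/max, sorted alphabetically, on a single line: {Amsterdam=-3.1/4.6/12.3, Lisbon=-5.4/-5.4/-5.4}.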

Run the challenge:

time bin/analyze measurements.txt >/dev/null

real	0m1.392s
user	0m0.000s
sys	0m0.010s

Note: the performance difference between a cold and a hot pagecache is quite extreme. Run echo 3 > /proc/sys/vm/drop_caches (as root) to drop your pagecache, then run the program twice in a row. It's not uncommon for the second run to be well over twice as fast.
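For example, a cold-versus-hot comparison looks like this (run the first command as root; exact timings vary by machine):

sync && echo 3 > /proc/sys/vm/drop_caches
time bin/analyze measurements.txt >/dev/null   # cold pagecache
time bin/analyze measurements.txt >/dev/null   # hot pagecache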

Benchmarks

Since I don't have access to a Hetzner CCX33 box, here are the reference times for the currently leading Java implementations from the official challenge when I run them on my machine.

#    Result (m:s.ms)   Implementation   Language/JDK     Submitter
?    00:01.590         link             C                Danny van Kooten
1.   00:06.131         link             21.0.1-graalce   Sam Pullara
2.   00:06.421         link             21.0.1-graalce   Roy van Rijn

Progressions

You can find the average runtime (across 5 consecutive runs) for the various states of the program below, from baseline to the final and fully optimized version. Because I have no patience, this was run on a measurements file with only 100M rows.

1.c runtime=[ 55.86 59.09 64.28 63.63 56.08 ] average=59.79s   linear-search by city name (baseline)
2.c runtime=[ 9.14 9.31 9.35 9.05 9.30 ] average=9.23s hashmap with linear probing
3.c runtime=[ 4.27 4.51 4.47 4.28 4.25 ] average=4.36s custom temperature float parser instead of strtod
4.c runtime=[ 2.38 2.41 2.46 2.40 2.39 ] average=2.41s fread with 64MB chunks instead of line-by-line
5.c runtime=[ 2.13 1.99 1.99 2.00 2.05 ] average=2.03s unroll parsing of city name and generating hash
6.c runtime=[ 0.49 0.49 0.49 0.50 0.50 ] average=0.49s parallelize across 16 threads
7.c runtime=[ 0.30 0.25 0.23 0.24 0.24 ] average=0.25s mmap entire file instead of fread in chunks

You can run the benchmark script for all progressions by executing ./run-progressions.sh (needs bash, make, time and awk).

1brc's People

Contributors

dannyvankooten


1brc's Issues

OSX: warning: unknown warning option '-Wtrampolines'

lvmc@Luizs-MacBook-Pro 1brc % make      
mkdir -p bin/
cc -O2 -march=native -Wall -Wextra -Wpedantic -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough create-sample.c -lm -o bin/create-sample
warning: unknown warning option '-Wtrampolines' [-Wunknown-warning-option]
1 warning generated.
cc -O2 -march=native -Wall -Wextra -Wpedantic -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough analyze.c -o bin/analyze
warning: unknown warning option '-Wtrampolines' [-Wunknown-warning-option]
1 warning generated.
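One possible workaround (a sketch only, not necessarily how this repository's Makefile handles it) is to probe whether the compiler accepts the flag before adding it, so clang simply skips it:

# hypothetical GNU make snippet: add -Wtrampolines only if the compiler accepts it
WTRAMPOLINES := $(shell $(CC) -Werror -Wtrampolines -E -x c /dev/null >/dev/null 2>&1 && echo -Wtrampolines)
CFLAGS += $(WTRAMPOLINES)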

I ran your code on a dual EPYC 9354 machine (128 threads)

// https://github.com/gunnarmorling/1brc/discussions/46
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <time.h>

// Capacity of our hashmap
// Since we use linear probing this needs to be at least twice as big
// as the # of distinct strings in our dataset
// Also must be power of 2 so we can use bit-and instead of modulo
#define HCAP (4096 * 4)
#define MAX_DISTINCT_GROUPS 16384
#define MAX_GROUPBY_KEY_LENGTH 100
#define NTHREADS 128

// branchless min/max (on some machines at least)
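// (b < a) is 0 or 1, so -(b < a) is 0 or all ones; the and/xor mask then selects one operand without a branch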
#define min(a, b) (a ^ ((b ^ a) & -(b < a)))
#define max(a, b) (a ^ ((a ^ b) & -(a < b)))

// parses a floating point number as an integer
// this is only possible because we know our data file has only a single decimal
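// e.g. "12.3" parses to 123 and "-5.4" to -54 (temperatures are handled in tenths)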
static inline const char *parse_number(int *dest, const char *s) {

  // parse sign
  int mod;
  if (*s == '-') {
    mod = -1;
    s++;
  } else {
    mod = 1;
  }

  if (s[1] == '.') {
    *dest = ((s[0] * 10) + s[2] - ('0' * 11)) * mod;
    return s + 4;
  }

  *dest = (s[0] * 100 + s[1] * 10 + s[3] - '0' * 111) * mod;
  return s + 5;
}

// hash returns a simple (but fast) hash for the first n bytes of data
static unsigned int hash(const unsigned char *data, int n) {
  unsigned int hash = 0;

  for (int i = 0; i < n; i++) {
    hash = (hash * 31) + data[i];
  }

  return hash;
}

struct Group {
  unsigned int count;
  long sum;
  int min;
  int max;
  char *label;
};

struct Result {
  int map[HCAP];
  int n;
  char labels[MAX_DISTINCT_GROUPS][MAX_GROUPBY_KEY_LENGTH];
  struct Group groups[MAX_DISTINCT_GROUPS];
};

struct Chunk {
  size_t start;
  size_t end;
  const char *data;
};

// qsort callback
static int cmp(const void *ptr_a, const void *ptr_b) {
  return strcmp(((struct Group *)ptr_a)->label, ((struct Group *)ptr_b)->label);
}

static inline unsigned int
hash_probe(int map[HCAP],
           char groups[MAX_DISTINCT_GROUPS][MAX_GROUPBY_KEY_LENGTH],
           const char *start, int len) {
  // probe map until free spot or match
  unsigned int h = hash((unsigned char *)start, len) & (HCAP - 1);
  while (map[h] >= 0 && memcmp(groups[map[h]], start, (size_t)len) != 0) {
    h = (h + 1) & (HCAP - 1);
  }

  return h;
}

static void *process_chunk(void *ptr) {
  struct Chunk *ch = (struct Chunk *)ptr;

  // unless this chunk starts at the beginning of the file, advance start past the
  // partial line at the boundary (the previous chunk will process that line)
  if (ch->start > 0) {
    while (ch->data[ch->start - 1] != '\n') {
      ch->start++;
    }
  }

  // likewise, extend end so the chunk finishes just after a newline (or at the end of the data)
  while (ch->data[ch->end] != 0x0 && ch->data[ch->end - 1] != '\n') {
    ch->end++;
  }

  struct Result *result = malloc(sizeof(*result));
  if (!result) {
    perror("malloc error");
    exit(EXIT_FAILURE);
  }
  result->n = 0;
  memset(result->labels, 0,
         MAX_DISTINCT_GROUPS * MAX_GROUPBY_KEY_LENGTH * sizeof(char));
  memset(result->map, -1, HCAP * sizeof(int));

  const char *s = &ch->data[ch->start];
  const char *end = &ch->data[ch->end];
  const char *linestart;
  unsigned int h;
  int temperature;
  int len;
  int c;

  while (s != end) {
    linestart = s;

    // hash everything up to ';'
    // assumption: key is at least 1 char
    len = 1;
    h = (unsigned char)s[0];
    while (s[len] != ';') {
      h = (h * 31) + (unsigned char)s[len++];
    }

    // parse decimal number as int
    s = parse_number(&temperature, s + len + 1);

    // probe map until free spot or match
    h = h & (HCAP - 1);
    while (result->map[h] >= 0 && memcmp(result->labels[result->map[h]],
                                         linestart, (size_t)len) != 0) {
      h = (h + 1) & (HCAP - 1);
    }
    c = result->map[h];

    if (c < 0) {
      memcpy(result->labels[result->n], linestart, (size_t)len);
      result->labels[result->n][len] = 0x0;
      result->groups[result->n].label = result->labels[result->n];
      result->groups[result->n].count = 1;
      result->groups[result->n].sum = temperature;
      result->groups[result->n].min = temperature;
      result->groups[result->n].max = temperature;
      result->map[h] = result->n++;
    } else {
      result->groups[c].count += 1;
      result->groups[c].sum += temperature;
      result->groups[c].min = min(result->groups[c].min, temperature);
      result->groups[c].max = max(result->groups[c].max, temperature);
    }
  }

  return (void *)result;
}

void result_to_str(char *dest, const struct Result *result) {
  char buf[128];
  *dest++ = '{';
  for (int i = 0; i < result->n; i++) {
    size_t n = (size_t)sprintf(
        buf, "%s=%.1f/%.1f/%.1f", result->groups[i].label,
        (float)result->groups[i].min / 10.0,
        ((float)result->groups[i].sum / (float)result->groups[i].count) / 10.0,
        (float)result->groups[i].max / 10.0);

    memcpy(dest, buf, n);
    if (i < result->n - 1) {
      memcpy(dest + n, ", ", 2);
      n += 2;
    }

    dest += n;
  }
  *dest++ = '}';
  *dest = 0x0;
}

int main(int argc, char **argv) {
  struct timespec start_time, end_time;
  clock_gettime(CLOCK_MONOTONIC, &start_time);

  char *file = "measurements.txt";
  if (argc > 1) {
    file = argv[1];
  }

  int fd = open(file, O_RDONLY);
  if (fd == -1) { // open() returns -1 on failure; 0 is a valid descriptor
    perror("error opening file");
    exit(EXIT_FAILURE);
  }

  struct stat sb;
  if (fstat(fd, &sb) == -1) {
    perror("error getting file size");
    exit(EXIT_FAILURE);
  }

  // mmap entire file into memory
  size_t sz = (size_t)sb.st_size;
  const char *data = mmap(NULL, sz, PROT_READ, MAP_PRIVATE, fd, 0);
  if (data == MAP_FAILED) {
    perror("error mmapping file");
    exit(EXIT_FAILURE);
  }

  // distribute work among N worker threads
  pthread_t workers[NTHREADS];
  struct Chunk chunks[NTHREADS];
  size_t chunk_size = sz / (size_t)NTHREADS;
  for (int i = 0; i < NTHREADS; i++) {
    chunks[i].data = data;
    chunks[i].start = chunk_size * (size_t)i;
    chunks[i].end = chunk_size * ((size_t)i + 1);
    pthread_create(&workers[i], NULL, process_chunk, &chunks[i]);
  }

  // wait for all threads to finish
  struct Result *results[NTHREADS];
  for (int i = 0; i < NTHREADS; i++) {
    pthread_join(workers[i], (void *)&results[i]);
  }

  // merge results
  char *label;
  struct Group *b;
  unsigned int h;
  int c;
  struct Result *result = results[0];
  for (int i = 1; i < NTHREADS; i++) {
    for (int j = 0; j < results[i]->n; j++) {
      b = &results[i]->groups[j];
      label = results[i]->labels[j];
      h = hash_probe(result->map, result->labels, label, (int)strlen(label));

      // TODO: Refactor lines below, we can share some logic with process_chunk
      c = result->map[h];
      if (c >= 0) {
        result->groups[c].count += b->count;
        result->groups[c].sum += b->sum;
        result->groups[c].min = min(result->groups[c].min, b->min);
        result->groups[c].max = max(result->groups[c].max, b->max);
      } else {
        // memcpy(&result->groups[result->n], b, sizeof(*b));
        strcpy(result->labels[result->n], label);
        result->groups[result->n].count = b->count;
        result->groups[result->n].sum = b->sum;
        result->groups[result->n].min = b->min;
        result->groups[result->n].max = b->max;
        result->groups[result->n].label = result->labels[result->n];
        result->map[h] = result->n++;
      }
    }
  }

  // sort results alphabetically
  qsort(result->groups, (size_t)result->n, sizeof(struct Group), cmp);

  // prepare output string
  char buf[(1 << 10) * 16];
  result_to_str(buf, result);
  puts(buf);

  clock_gettime(CLOCK_MONOTONIC, &end_time);
  double elapsed_time = (end_time.tv_sec - start_time.tv_sec) * 1000.0 +
                          (end_time.tv_nsec - start_time.tv_nsec) / 1000000.0;

  printf("Runtime inside main = %fms\n", elapsed_time);

  // clean-up

  clock_gettime(CLOCK_MONOTONIC, &start_time);
  munmap(data, sz);
  clock_gettime(CLOCK_MONOTONIC, &end_time);
  elapsed_time = (end_time.tv_sec - start_time.tv_sec) * 1000.0 +
                          (end_time.tv_nsec - start_time.tv_nsec) / 1000000.0;                          
  printf("munmap cost = %fms\n", elapsed_time);

  clock_gettime(CLOCK_MONOTONIC, &start_time);
  close(fd);
  for (int i = 0; i < NTHREADS; i++) {
    free(results[i]);
  }
  clock_gettime(CLOCK_MONOTONIC, &end_time);

  elapsed_time = (end_time.tv_sec - start_time.tv_sec) * 1000.0 +
                          (end_time.tv_nsec - start_time.tv_nsec) / 1000000.0;                          
  printf("free memory cost = %fms\n", elapsed_time);
  
  // exit(EXIT_SUCCESS);
}

// Dual EPYC 9354 128 threads
// ```
// Runtime inside main = 343.277755ms
// munmap cost = 216.992869ms
// free memory cost = 19.699456ms
// real    0m0.582s
// user    0m29.590s
// sys     0m2.204s
// ```

// AMD 2950X, 32 threads
// ```
// Runtime inside main = 979.854644ms
// munmap cost = 152.057272ms
// free memory cost = 3.055444ms
// real    0m1.137s
// user    0m28.855s
// sys     0m0.734s
// ```

// AMD 2950X, 1 thread
// ```
// Runtime inside main = 18154.117726ms
// munmap cost = 156.046306ms
// free memory cost = 0.126220ms
// real    0m18.312s
// user    0m17.956s
// sys     0m0.348s
// ```

Why not SIMD?

Perhaps it's a silly question, but why not SIMD? I think it would actually make the challenge much more interesting, combined with the no-external-dependencies rule.

Besides, unless you are using something like musl libc, you are already using SIMD indirectly through glibc, but I didn't see this spelled out as a requirement.

bug: read beyond the bounds of the chunk

// this assumes the file ends in a newline...
    const char *end = (char *)memchr(&data[chunk_end], '\n', chunk_size) + 1;

data[chunk_end] is past the end of the chunk; the last valid byte of the chunk is data[chunk_end - 1].

When the last worker thread accesses data[chunk_end], it can trigger a SIGSEGV. I tried nthreads of 2, 4 and 8; the problem occurs in each case.
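A minimal sketch of one way to avoid reading past the mapping (assuming data points at the mmap'd file of sz bytes and chunk_start/chunk_size are as in the snippet above; the names are illustrative):

// sketch: clamp the newline search to the bytes that remain in the mapping
size_t chunk_end = chunk_start + chunk_size;
const char *end;
if (chunk_end >= sz) {
  end = &data[sz];                          // last chunk simply ends at the end of the file
} else {
  const char *nl = memchr(&data[chunk_end], '\n', sz - chunk_end);
  end = nl != NULL ? nl + 1 : &data[sz];    // include the newline, or run to end of file
}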

What's the memory bandwidth of your machine?

Curious about the memory bandwidth of your machine. If you can, I'd be interested in the results of the C++ program below. Compile with full optimization of course.

#include <random>
#include <chrono>
#include <iostream>
#include <future>
#include <vector>   // std::vector is used for the futures below

using uint = unsigned int;
using ulong = unsigned long;

// Size of data buffer used to test.
const std::size_t N = 1'000'000'000;
uint data[N];

// Worker function.  Sums a chunk of memory.
unsigned int
sum(const uint *const begin, const uint *const end) {

    uint sum = 0;
    for (const uint *p = begin; p < end; p++) {
        sum += *p;
    }

    return sum;
}

// Wrapper function that spawns threads and times them.
void
time(int n_threads) {

    std::vector<std::future<uint>> futures;

    // Make it a double because it might not divide evenly.
    double chunk_size = double(N)/n_threads;

    auto start = std::chrono::high_resolution_clock::now();

    for (int i = 0; i < n_threads; i++) {
        // force a real thread per chunk; the default launch policy may defer execution
        futures.push_back(std::async(std::launch::async, sum,
                                     data + ulong(i*chunk_size), data + ulong((i + 1)*chunk_size)));
    }

    // Add up all the individual sums.
    uint sum = 0;
    for (auto &f : futures) {
        sum += f.get();
    }

    auto stop = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> secs = stop - start;
    // sizeof(data) bytes were read, so this prints bytes per second
    std::cerr << "    " << sizeof(data)/secs.count() << std::endl;

    std::cerr << "    To prevent optimizing out all ops: " << sum << std::endl;
}

int
main() {

    /*
     * Fill with some random numbers.  PRNGs are slow, though,
     * so mostly just use the index.
     */

    std::default_random_engine eng;
    std::uniform_int_distribution<uint> dist(0, 0xffffffffU);

    for (std::size_t i = 0; i < 1'000'000'000; i++) {
        // Only set every 1000 numbers to a random number.
        if (i%1000 == 0) {
            data[i] = dist(eng);
        } else {
            data[i] = i;
        }
    }

    /*
     * Now do the timing.
     */

    for (int i = 1; i < 10; i++) {
        std::cerr << i << " thread(s):" << std::endl;
        time(i);
    }
}
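For reference, one way to build and run it (the file name is illustrative; on x86-64 the roughly 4 GB static array may additionally require -mcmodel=medium):

g++ -O3 -march=native -pthread membw.cpp -o membw
./membw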

duplicate string identifiers in stdout?

Running bin/analyze measurements.txt, I noticed that an identifier was included twice, with differing values for min/mean/max. From running bin/analyze measurements.txt > out.txt and then running the Python script below a few times, it seems that an identifier is consistently included twice, though the identifier that is duplicated is not the same on each run.

with open("out.txt", "r") as f:
    data = f.read().split(',')

# the output is sorted alphabetically, so a duplicated identifier shows up in adjacent entries
for i in range(1, len(data)):
    if data[i-1].split("=")[0] == data[i].split("=")[0]:
        print(data[i-1], data[i])
