
Comments (16)

jedbrown commented on June 12, 2024

In your example, the receive buffer remote_op needs to actually have the correct size. In this case, that looks like vec![0; 12] instead of Vec::new(). Also, you can't use *ready_send unless you can guarantee that the receiver has already posted a matching receive. That's a race condition here, so the present code is noncompliant. I think it's okay once you fix those two things.
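
For reference, the buffer fix in isolation looks like the single line below; 12 bytes is the size suggested here, and it should match whatever bincode::serialize actually produces for the Dot (the final version further down the thread uses 24):

    // Give the receive buffer a real length up front; receiving into the
    // zero-length buffer from Vec::new() matches the MPI_ERR_TRUNCATE in the
    // log further down the thread.
    let mut remote_op: Vec<u8> = vec![0u8; 12];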

jedbrown commented on June 12, 2024

See also #182 -- I'm inclined to make ready send unsafe in the next release.

jedbrown commented on June 12, 2024

Yeah, that's what I figured so I added this comment. (I'll merge this with some more housekeeping bits, which will probably include making ready-mode unsafe.)
1e5eeab#diff-08ccf39fba6d59419fdacc6fa9fb0003071407cc2792c0a8811369aea0fc57d1R34-R36

jedbrown commented on June 12, 2024

Just replace immediate_ready_send with immediate_send.
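
Concretely, inside the request scope of the failing example further down the thread (world, next_rank, scope, and send_op as defined there), the suggested change is:

    // Ready-mode send is only valid when the matching receive is guaranteed
    // to be posted already, which is not the case here:
    // let _sreq = WaitGuard::from(
    //     world.process_at_rank(next_rank).immediate_ready_send(scope, &send_op),
    // );
    // A standard nonblocking send needs no such guarantee:
    let _sreq = WaitGuard::from(
        world.process_at_rank(next_rank).immediate_send(scope, &send_op),
    );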

jedbrown commented on June 12, 2024

Can you look at immediate_multiple_requests.rs to see if it answers your question about scope? The issue is that your messages exceed the MPI implementation's "eager threshold" (usually an environment-tunable parameter, but your code shouldn't rely on it) and thus sends can't complete until receives are posted, but as you've written it, you can't get around to posting receives until your rank has waited on its sends.
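
For illustration, a minimal self-contained sketch of the problematic ordering being described (the buffer size is a hypothetical value above a typical eager threshold; the point is only the send-wait-receive order):

    use mpi::traits::*;

    fn main() {
        let universe = mpi::initialize().unwrap();
        let world = universe.world();
        let size = world.size();
        let rank = world.rank();
        let next_rank = (rank + 1) % size;
        let prev_rank = (rank + size - 1) % size;

        // A message comfortably above a typical eager threshold.
        let large_msg = vec![1u8; 1 << 20];
        let mut recv_buf = vec![0u8; 1 << 20];

        mpi::request::scope(|scope| {
            let sreq = world
                .process_at_rank(next_rank)
                .immediate_send(scope, &large_msg[..]);
            // Waiting here is the problem: above the eager threshold the send
            // cannot complete until the matching receive is posted, and every
            // rank is stuck at this wait before it ever posts its receive.
            sreq.wait();
            let rreq = world
                .process_at_rank(prev_rank)
                .immediate_receive_into(scope, &mut recv_buf[..]);
            rreq.wait();
        });
    }

Posting all the sends and receives first and only then waiting on the whole set, as immediate_multiple_requests.rs and the multiple_scope snippet below do, avoids this.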

skailasa commented on June 12, 2024

I've tried the following, and it appears to work:

    mpi::request::multiple_scope(nreqs as usize, |scope, coll| {
        // Post all sends first...
        for (i, packet) in packets.iter().enumerate() {
            let sreq = world
                .process_at_rank(packet_destinations[i])
                .immediate_send(scope, &packet[..]);
            coll.add(sreq);
        }

        // ...then all receives...
        for (i, buffer) in buffers.iter_mut().enumerate() {
            let rreq = world
                .process_at_rank(received_packet_sources[i])
                .immediate_receive_into(scope, &mut buffer[..]);
            coll.add(rreq);
        }

        // ...and only then wait on the whole collection.
        let mut out = vec![];
        coll.wait_all(&mut out);
        assert_eq!(out.len(), nreqs as usize);
    });

Thank you for your suggestion. There's an unrelated issue, though, which I think is coming from my MPI implementation (Open MPI 4.1.4). When I run this code with the maximum number of processors on my machine, I get a strange system call error, but only some of the time, and it may be related to this code.

  System call: unlink(2) /var/folders/6g/xsc42sxx5lvdkl0z4_tr8_kh0000gn/T//ompi.eduroam-int-dhcp-97-1-127.501/pid.78254/1/vader_segment.eduroam-int-dhcp-97-1-127.501.77210001.3

Occasionally, the code also fails with the previous setup, giving the error:

[eduroam-int-dhcp-97-1-127:78465] *** An error occurred in MPI_Wait
[eduroam-int-dhcp-97-1-127:78465] *** reported by process [1947140097,0]
[eduroam-int-dhcp-97-1-127:78465] *** on communicator MPI COMMUNICATOR 4 DUP FROM 3

Do you have any idea what may be causing this?

jedbrown commented on June 12, 2024

I don't see anything obviously wrong in the code. Could you share a test case (maybe a gist of the repo you're testing)? I like to try with both Open MPI and MPICH (with error checking configured) because they do different input validation, and sometimes the difference in behavior helps narrow down code issues. (I'm not sure there is a code issue, but if your MPI isn't broken with other apps, then something is going wrong.)

skailasa commented on June 12, 2024

The second error seems to have something to do with sending packets of size 0 and attempting to receive into buffers initialized with zero length. Actually, it's still appearing, so I'm not sure now.

I'm still not sure about the first error, though; it seems to happen only a minority of the time. The code is here.

The code I'm testing is here, and it is run using this example.

skailasa commented on June 12, 2024

Playing with this example more, it seems to occur only on my Arm M1 Mac (running Open MPI 4.0; I will try MPICH now). The code runs fine on Ubuntu 20.04 with the same MPI version, with no warnings or errors at all.

hppritcha commented on June 12, 2024

You may want to take a look at the workarounds discussed at open-mpi/ompi#8531

skailasa commented on June 12, 2024

You may want to take a look at the workarounds discussed at open-mpi/ompi#8531

I tried the tmpdir workaround but I'm getting the same error.

jedbrown commented on June 12, 2024

Is this issue believed to be an rsmpi issue or can it be closed?

DaveLanday commented on June 12, 2024

I am seeing something similar, but it could just be that I am an MPI novice. The messages I am trying to send are small (24 bytes), I am using rustv1.7.3, and I am trying to execute on an Intel-based MBP. I have Open MPI version 5.0.2 (could this be my issue?). I also set the temp dir to ~/tmp. I put more details in this discussion: #181 (reply in thread), but here is the code that is failing:

use bincode;
#[allow(unused_imports)]
use crdts::{CmRDT, Dot, GCounter};
use mpi::request::WaitGuard;
use mpi::traits::*;
use rand::{thread_rng, Rng};
use std::{thread, time};

fn main() {
    #[allow(unused_variables)]
    // number of fake jobs to process (i.e: count up to this number)
    const NJOBS: u16 = 2000;

    // Represents the worst time job can take to finish
    let dur = time::Duration::from_millis(25);

    // Represents the probability of performing a large job:
    const P: f64 = 0.015; // 1.5% chance of a long running job (i.e: 25 milliseconds)
    let mut rng = thread_rng();

    // Init MPI
    let universe = mpi::initialize().unwrap();
    let world = universe.world();

    // Get the rank and size of the communicator
    let size = world.size();
    let rank = world.rank();

    // Define the processor that sends to this rank and  that receives from this rank:
    let prev_rank = (rank + size - 1) % size;
    let next_rank = (rank + 1) % size;

    // Init local replica of the count
    let mut local_counter = GCounter::new();

    println!(
        "Prev rank: {}, This rank: {}, Next rank: {}",
        prev_rank, rank, next_rank
    );
    for _n in 0..NJOBS {
        // simulate a long or short process between incrementing the count:
        if rng.gen_bool(P) {
            thread::sleep(dur);
            // println!("Running a long job on processor {}", rank);
        }

        // Declare intent to increment by creating an `Op`. Use rank as the uid of the replica
        let inc_op = local_counter.inc(rank);

        // Apply the operation to our local counter first and then send the `Op` off to others
        local_counter.apply(inc_op.clone()); // Apply the increment:

        // Serialize the local operation for sending:
        let send_op: Vec<u8> = bincode::serialize(&inc_op).unwrap();

        // For getting the most up-to-date count of the remote Dot
        let mut remote_op: Vec<u8> = Vec::new();

        // Simple point-to-point messaging in a ring configuration:
        mpi::request::scope(|scope| {
            // Receive the operation to increment the counter from the previous rank:
            let _rreq = WaitGuard::from(
                world
                    .process_at_rank(prev_rank)
                    .immediate_receive_into(scope, &mut remote_op),
            );

            // Send the change that was applied by the local counter:
            let _sreq = WaitGuard::from(
                world
                    .process_at_rank(next_rank)
                    .immediate_ready_send(scope, &send_op),
            );
        });

        // Deserialize the remote_op:
        let recv_op: Dot<mpi::Rank> = bincode::deserialize(&remote_op).unwrap();

        // Apply the change to the local counter:
        local_counter.apply(recv_op);
    }
    println!(
        "Processor {} says the count = {:?}",
        rank,
        local_counter.read()
    );
}

The error is:

❯ mpirun -np 4 --tmpdir ~/tmp  target/debug/MPI_Example
Prev rank: 1, This rank: 2, Next rank: 3
Prev rank: 3, This rank: 0, Next rank: 1
Prev rank: 0, This rank: 1, Next rank: 2
Prev rank: 2, This rank: 3, Next rank: 0
[MacBook-Pro:00000] *** An error occurred in MPI_Wait
[MacBook-Pro:00000] *** reported by process [783941633,2]
[MacBook-Pro:00000] *** on communicator MPI_COMM_WORLD
[MacBook-Pro:00000] *** MPI_ERR_TRUNCATE: message truncated
[MacBook-Pro:00000] *** MPI_ERRORS_ARE_FATAL (processes in this communicator will now abort,
[MacBook-Pro:00000] ***    and MPI will try to terminate your MPI job as well)

DaveLanday commented on June 12, 2024

In your example, the receive buffer remote_op needs to actually have the correct size. In this case, that looks like vec![0; 12] instead of Vec::new(). Also, you can't use *ready_send unless you can guarantee that the receiver has already posted a matching receive. That's a race condition here, so the present code is noncompliant. I think it's okay once you fix those two things.

Thank you for the feedback, @jedbrown. I am still getting familiar with MPI in general, so I appreciate it. I was trying to follow the code example provided in examples/immediate.rs.

DaveLanday commented on June 12, 2024

@jedbrown, I am a bit confused by some of the examples. Is there a pattern you would recommend? In my example I am simply trying to receive from the previous rank and store what is received into remote_op, and send &send_op to next_rank. Best, Dave

DaveLanday commented on June 12, 2024

Oh, I am so sorry. I had done that and was trying to run a previous build by accident. Totally my fault; the code runs now!
Thank you @jedbrown, I am going to build off of this!

Here is the final example that compiles and runs:

use bincode;
#[allow(unused_imports)]
use crdts::{CmRDT, Dot, GCounter};
use mpi::request::WaitGuard;
use mpi::traits::*;
use rand::{thread_rng, Rng};
use std::{thread, time};

fn main() {
    #[allow(unused_variables)]
    // number of fake jobs to process (i.e: count up to this number)
    const NJOBS: u16 = 2000;

    // Represents the worst time job can take to finish
    let dur = time::Duration::from_millis(25);

    // Represents the probability of performing a large job:
    const P: f64 = 0.015; // 1.5% chance of a long running job (i.e: 25 milliseconds)
    let mut rng = thread_rng();

    // Init MPI
    let universe = mpi::initialize().unwrap();
    let world = universe.world();

    // Get the rank and size of the communicator
    let size = world.size();
    let rank = world.rank();

    // Define the processor that sends to this rank and  that receives from this rank:
    let prev_rank = (rank + size - 1) % size;
    let next_rank = (rank + 1) % size;

    // Init local replica of the count
    let mut local_counter = GCounter::new();

    println!(
        "Prev rank: {}, This rank: {}, Next rank: {}",
        prev_rank, rank, next_rank
    );
    for _n in 0..NJOBS {
        // simulate a long or short process between incrementing the count:
        if rng.gen_bool(P) {
            thread::sleep(dur);
            // println!("Running a long job on processor {}", rank);
        }

        // Declare intent to increment by creating an `Op`. Use rank as the uid of the replica
        let inc_op = local_counter.inc(rank);

        // Apply the operation to our local counter first and then send the `Op` off to others
        local_counter.apply(inc_op.clone()); // Apply the increment:

        // Serialize the local operation for sending:
        let send_op: Vec<u8> = bincode::serialize(&inc_op).unwrap();

        // For getting the most up-to-date count of the remote Dot
        let mut remote_op = vec![0u8; 24]; // the buffer must provide a size

        // Simple point-to-point messaging in a ring configuration:
        mpi::request::scope(|scope| {
            let _rreq = WaitGuard::from(
                world
                    .process_at_rank(prev_rank)
                    .immediate_receive_into(scope, &mut remote_op),
            );
            let _sreq = WaitGuard::from(
                world
                    .process_at_rank(next_rank)
                    .immediate_send(scope, &send_op),
            );
        });

        // Deserialize the remote_op:
        let recv_op: Dot<mpi::Rank> = bincode::deserialize(&remote_op).unwrap();

        // Apply the change to the local counter:
        local_counter.apply(recv_op);
    }
    println!(
        "Processor {} says the count = {:?}",
        rank,
        local_counter.read()
    );
}
