`bpresume()` frequently fails, reporting empty error messages, when used with `BatchJobsParam()`; the same workflow works fine with `SerialParam()`. See the examples below.
fun <- function(x) { if (x >= 0) x else y }
bplapply(-5:5, fun, BPPARAM = BatchJobsParam())
SubmitJobs |+++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)
Waiting [S:0 R:0 D:11 E:0] |++++++++++++++++++++++++++++++++++| 100% (00:00:00)
Error: Errors occurred; first error message:
Error in FUN(...): object 'y' not found
For more information, use bplasterror(). To resume calculation, re-call
the function and set the argument 'BPRESUME' to TRUE or wrap the
previous call in bpresume().
First traceback:
41: bplapply(-5:5, fun, BPPARAM = BatchJobsParam())
40: bplapply(-5:5, fun, BPPARAM = BatchJobsParam())
39: bpmapply(FUN, X, MoreArgs = list(...), SIMPLIFY = FALSE, USE.NAMES = FALSE,
BPRESUME = BPRESUME, BPPARAM = BPPARAM)
38: bpmapply(FUN, X, MoreArgs = list(...), SIMPLIFY = FALSE, USE.NAMES = FALSE,
BPRESUME = BPRESUME, BPPARAM = BPPARAM)
37: suppressMessages(do.call(submitJobs, pars))
36: withCallingHandlers(expr, message = function(c) invokeRestart("muffleMessage"))
35: do.call(submitJobs, pars)
34: (function (reg, ids, resources = list(), wait, max.retries = 10L,
job.delay = FALSE)
{
chunks.as.arrayjobs = FALSE
getDelays = function(cf, job.delay, n) {
if (is.logical(job.delay)) {
if (job.delay && n > 100L && cf$name %nin% c("Interactive",
"Multicore", "SSH")) {
return(runif(n, n * 0.1, n * 0.2))
}
return(delays = rep.int(0, n))
}
vapply(seq_along(ids), job.delay, numeric(1L), n = n)
}
checkArg(reg, cl = "Registry")
syncRegistry(reg)
if (missing(ids)) {
ids = dbFindSubmitted(reg, negate = TRUE)
if (length(ids) == 0L) {
message("All jobs submitted, nothing to do!")
return(invisible(NULL))
}
}
else {
if (is.list(ids)) {
ids = lapply(ids, checkIds, reg = reg, check.present = FALSE)
dbCheckJobIds(reg, unlist(ids))
}
else if (is.numeric(ids)) {
ids = checkIds(reg, ids)
}
else {
stop("Parameter 'ids' must be a integer vector of job ids or a list of chunked job ids (list of integer vectors)!")
}
}
conf = getBatchJobsConf()
cf = getClusterFunctions(conf)
limit.concurrent.jobs = is.finite(conf$max.concurrent.jobs)
n = length(ids)
checkArg(resources, "list")
resources = resrc(resources)
if (missing(wait))
wait = function(retries) 10 * 2^retries
else checkArg(wait, formals = "retries")
if (is.logical(job.delay)) {
checkArg(job.delay, "logical", len = 1L, na.ok = FALSE)
}
else {
checkArg(job.delay, formals = c("n", "i"))
}
if (is.finite(max.retries)) {
max.retries = convertInteger(max.retries)
checkArg(max.retries, "integer", len = 1L, na.ok = FALSE)
}
checkArg(chunks.as.arrayjobs, "logical", na.ok = FALSE)
if (chunks.as.arrayjobs && is.na(cf$getArrayEnvirName())) {
warningf("Cluster functions '%s' do not support array jobs, falling back on chunks",
cf$name)
chunks.as.arrayjobs = FALSE
}
if (!is.null(cf$listJobs)) {
ids.intersect = intersect(unlist(ids), dbFindOnSystem(reg,
unlist(ids)))
if (length(ids.intersect) > 0L) {
stopf("Some of the jobs you submitted are already present on the batch system! E.g. id=%i.",
ids.intersect[1L])
}
}
if (limit.concurrent.jobs && (cf$name %in% c("Interactive",
"Local", "Multicore", "SSH") || is.null(cf$listJobs))) {
warning("Option 'max.concurrent.jobs' is enabled, but your cluster functions implementation does not support the listing of system jobs.\n",
"Option disabled, sleeping 5 seconds for safety reasons.")
limit.concurrent.jobs = FALSE
Sys.sleep(5)
}
if (n > 5000L) {
warningf(collapse(c("You are about to submit '%i' jobs.",
"Consider chunking them to avoid heavy load on the scheduler.",
"Sleeping 5 seconds for safety reasons."), sep = "\n"),
n)
Sys.sleep(5)
}
saveConf(reg)
is.chunked = is.list(ids)
messagef("Submitting %i chunks / %i jobs.", n, if (is.chunked)
sum(vapply(ids, length, integer(1L)))
else n)
messagef("Cluster functions: %s.", cf$name)
messagef("Auto-mailer settings: start=%s, done=%s, error=%s.",
conf$mail.start, conf$mail.done, conf$mail.error)
interrupted = FALSE
submit.msgs = buffer("list", 1000L, dbSendMessages, reg = reg,
max.retries = 10000L, sleep = function(r) 5, staged = useStagedQueries())
logger = makeSimpleFileLogger(file.path(reg$file.dir, "submit.log"),
touch = FALSE, keep = 1L)
on.exit({
if (interrupted && exists("batch.result", inherits = FALSE)) {
submit.msgs$push(dbMakeMessageSubmitted(reg, id,
time = submit.time, batch.job.id = batch.result$batch.job.id,
first.job.in.chunk.id = if (is.chunked) id1 else NULL,
resources.timestamp = resources.timestamp))
}
messagef("Sending %i submit messages...\nMight take some time, do not interrupt this!",
submit.msgs$pos())
submit.msgs$clear()
if (logger$getSize()) messagef("%i temporary submit errors logged to file '%s'.\nFirst message: %s",
logger$getSize(), logger$getLogfile(), logger$getMessages(1L))
})
messagef("Writing %i R scripts...", n)
resources.timestamp = saveResources(reg, resources)
writeRscripts(reg, cf, ids, chunks.as.arrayjobs, resources.timestamp,
disable.mail = FALSE, delays = getDelays(cf, job.delay,
n), interactive.test = !is.null(conf$interactive))
dbSendMessage(reg, dbMakeMessageKilled(reg, unlist(ids)),
staged = FALSE)
bar = makeProgressBar(max = n, label = "SubmitJobs")
bar$set()
tryCatch({
for (id in ids) {
id1 = id[1L]
retries = 0L
repeat {
if (limit.concurrent.jobs && length(cf$listJobs(conf,
reg)) >= conf$max.concurrent.jobs) {
batch.result = makeSubmitJobResult(status = 10L,
batch.job.id = NA_character_, "Max concurrent jobs exhausted")
}
else {
interrupted = TRUE
submit.time = now()
batch.result = cf$submitJob(conf = conf, reg = reg,
job.name = sprintf("%s-%i", reg$id, id1),
rscript = getRScriptFilePath(reg, id1), log.file = getLogFilePath(reg,
id1), job.dir = getJobDirs(reg, id1), resources = resources,
arrayjobs = if (chunks.as.arrayjobs)
length(id)
else 1L)
}
if (batch.result$status == 0L) {
submit.msgs$push(dbMakeMessageSubmitted(reg,
id, time = submit.time, batch.job.id = batch.result$batch.job.id,
first.job.in.chunk.id = if (is.chunked)
id1
else NULL, resources.timestamp = resources.timestamp))
interrupted = FALSE
bar$inc(1L)
break
}
interrupted = FALSE
if (batch.result$statu
[... remainder of the traceback was truncated in the original paste ...]
bplasterror()
6/11 partial results stored. First 5 error messages:
[1]: Error: Error in FUN(...): object 'y' not found
[2]: Error: Error in FUN(...): object 'y' not found
[3]: Error: Error in FUN(...): object 'y' not found
[4]: Error: Error in FUN(...): object 'y' not found
[5]: Error: Error in FUN(...): object 'y' not found
bpresume(bplapply(abs(-5:5), fun, BPPARAM = BatchJobsParam()))
Resuming previous calculation...
SubmitJobs |+++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)
Syncing registry ...
Waiting [S:0 R:0 D:0 E:0] |++++++++++++++++++++++++++++++++++| 100% (00:00:00)
Error in LastError$store(results = results, is.error = !ok, throw.error = TRUE) :
Errors occurred; first error message:
For more information, use bplasterror(). To resume calculation, re-call
the function and set the argument 'BPRESUME' to TRUE or wrap the
previous call in bpresume().
Error in LastError$store(results = replace(results, is.error, LastError$results), :
Errors occurred; first error message:
Error:
For more information, use bplasterror(). To resume calculation, re-call
the function and set the argument 'BPRESUME' to TRUE or wrap the
previous call in bpresume().
bplasterror()
6/11 partial results stored. First 5 error messages:
[1]: Error: Error:
[2]: Error: Error:
[3]: Error: Error:
[4]: Error: Error:
[5]: Error: Error:
bpresume(bplapply(abs(-5:5), fun, BPPARAM = BatchJobsParam()))
Resuming previous calculation...
SubmitJobs |+++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)
Syncing registry ...
Waiting [S:0 R:0 D:5 E:0] |++++++++++++++++++++++++++++++++++| 100% (00:00:00)
[[1]]
[1] 5
[[2]]
[1] 4
[[3]]
[1] 3
[[4]]
[1] 2
[[5]]
[1] 1
[[6]]
[1] 0
[[7]]
[1] 1
[[8]]
[1] 2
[[9]]
[1] 3
[[10]]
[1] 4
[[11]]
[1] 5
sessionInfo()
R Under development (unstable) (2013-12-03 r64376)
Platform: x86_64-unknown-linux-gnu (64-bit)
locale:
[1] LC_CTYPE=en_US.UTF-8 LC_NUMERIC=C
[3] LC_TIME=en_US.UTF-8 LC_COLLATE=en_US.UTF-8
[5] LC_MONETARY=en_US.UTF-8 LC_MESSAGES=en_US.UTF-8
[7] LC_PAPER=en_US.UTF-8 LC_NAME=C
[9] LC_ADDRESS=C LC_TELEPHONE=C
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C
attached base packages:
[1] stats graphics grDevices utils datasets methods base
other attached packages:
[1] BatchJobs_1.1-1135 BBmisc_1.4 BiocParallel_0.5.5
loaded via a namespace (and not attached):
[1] brew_1.0-6 codetools_0.2-8 DBI_0.2-7 digest_0.6.4
[5] fail_1.2 foreach_1.4.1 iterators_1.0.6 parallel_3.1.0
[9] plyr_1.8 RSQLite_0.11.4 sendmailR_1.1-2 tools_3.1.0