delta-rho / datadr
Divide and Recombine
License: Other
The convert function needs an overwrite parameter. If I pass the function an already populated connection, it gives this warning message:
* Loading connection attributes
Warning message:
In addData.localDiskConn(to, b) :
Element 1 of input data has data already on disk of the same key. Set overwrite to TRUE or change key of input data.
An overwrite parameter should be added that is passed along to the call to addData.
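A sketch of what this might look like from the user's side, assuming convert() would forward an overwrite argument to addData() (the overwrite argument is hypothetical; it does not exist yet):
irisDdf <- ddf(iris)
conn <- localDiskConn(file.path(tempdir(), "iris"), autoYes = TRUE)
convert(irisDdf, conn)                    # first conversion succeeds
convert(irisDdf, conn, overwrite = TRUE)  # proposed: replace data under existing keys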
It would be nice to have the package automatically deal with printing when packages are loaded...
The error below occurs:
Error in if (needs["totObjectSize"]) collect("totObjectSize", objSize) :
missing value where TRUE/FALSE needed
Autokill is true and terminating job_201402250850_1195
When printing a ddo/ddf, make it easier to access information about the transformation functions, in a way that doesn't get too verbose.
Replace drFilter, drSample, drJoin, and drXtabs with filter, sample, join, and xtabs, in a way that is friendly to these functions in base R. Also, lapply for ddo and ddf objects should be able to take arguments such as control and verbose that get passed to MapReduce jobs.
Enhancement request: a function that would force a transformation function to be calculated and persisted immediately, rather than waiting until recombine or one of the other functions is called.
Note: if/when you do this, remove the comment I added to the addTransform documentation that shows the workaround of calling recombine(dat, combDdo, output = …).
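A sketch of how such a function might be used (the name drPersist is hypothetical; addTransform and combDdo are existing datadr functions):
bySpecies <- divide(ddf(iris), by = "Species")
withMean <- addTransform(bySpecies, function(x) mean(x$Sepal.Length))
persisted <- recombine(withMean, combDdo)  # current workaround noted in the docs
# persisted <- drPersist(withMean)         # proposed: compute and persist now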
This may break some old analysis code, but after a lot of thought I think the current approach to applying transformation functions is a bit unnatural. To review, the current approach is: a transformation function with one argument receives just the value, while one that also needs the key receives the key and value and must return a complete key-value pair.
The reason this logic exists is that usually the user wants to operate on just the value, and we want to make that as simple as possible. But we also want to be able to get information from the key if needed. What I have found is that in almost every transformation I apply that needs the key, I still only want to return a value (I'm not doing anything to the key - just getting information from it), so having to return a key-value pair is cumbersome. Therefore, I'm proposing the following change to the logic:
Transformation functions would always return just a value, with the key passed through unchanged; only in the rare case where the key itself must be modified would the function need to explicitly return a key-value pair with kvPair(). Since this will occur so rarely, I think this is a good choice. This will be simple to do and simple to adapt new code to, but may affect some old code of others.
@amwhite, I think this might affect you most. What do you think?
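A sketch of the proposed semantics (illustrative only; this is the proposal, not current datadr behavior):
# reads the key but returns only a value; the key is carried through unchanged
dat2 <- addTransform(dat, function(k, v) {
  v$keyInfo <- k
  v
})
# only when the key itself must change is an explicit kvPair() required
dat3 <- addTransform(dat, function(k, v) kvPair(paste0("new_", k), v))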
Hi,
Is there any way to say "mrExec on a little of the data and then stop" using datadr?
One of the techniques I used to debug/develop RHIPE jobs was to set the input to a single part file, e.g. part-r-00001, and then let the MapReduce run.
I am trying to live in the datadr system, but I am having trouble seeing how to do this. For example, I loaded up some data with drRead.csv, then did a MapReduce (effectively as a ddo) using a "map" type hdfsConn as an output. Then I wanted to do a more complicated job, went to specify an hdfsConn as a single part file, and realized I couldn't.
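One possible workaround (a sketch; the drSample parameter names are assumed): sample a small fraction of the data into a new connection and develop against that.
smallDdo <- drSample(bigDdo, fraction = 0.01,
  output = hdfsConn("/tmp/smallSample", autoYes = TRUE))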
This has to do with format() being applied to a matrix. Easy fix, just need to do it.
For grouping by multiple variables.
Hi,
I am not quite sure how to use drRead.csv with a csv file on HDFS. The help for drRead.csv seems to indicate that it is possible. I tried the hdfsConn paradigm and ran into the error below; I was intending to use that connection as an input argument. Any ideas?
> rhput(file.path(getwd(),flu_data_dir, "ili_2000_2013.csv"), "ili_2000_2013_csv")
> rhls("ili_2000_2013_csv")
permission owner group size modtime file
1 -rw-r--r-- roun308 roun308 6.282 gb 2015-09-15 11:32 /user/roun308/chiron/ili_2000_2013_csv
> rhchmod("ili_2000_2013_csv", 777)
> in1 = hdfsConn("/user/roun308/chiron/ili_2000_2013_csv", type="text")
* Saving connection attributes
Error in .jcall("RJavaTools", "Ljava/lang/Object;", "invokeMethod", cl, :
org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a directory: /user/roun308/chiron/ili_2000_2013_csv ili_2000_2013_csv
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsRecursively(FSNamesystem.java:4146)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4098)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4057)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4030)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:787)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:297)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:594)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.cal
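A likely explanation (an assumption based on the mkdirs calls in the trace): hdfsConn() expects a directory of files rather than a path to a single file, so it tries to create a directory at the given path. Putting the csv inside its own HDFS directory and pointing hdfsConn at that directory may work:
rhmkdir("/user/roun308/chiron/ili_csv")
rhput(file.path(getwd(), flu_data_dir, "ili_2000_2013.csv"),
  "/user/roun308/chiron/ili_csv/ili_2000_2013.csv")
in1 <- hdfsConn("/user/roun308/chiron/ili_csv", type = "text")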
This code:
bySpecies <- divide(iris, by = "Species")
bySpeciesFit <- addTransform(bySpecies, function(x) lm(Sepal.Length ~ Sepal.Width, data = x))
recombine(bySpeciesFit, combine = combMeanCoef)
gives this:
numeric(0)
Could you create a prompt to overwrite the local disk when generating a new ddo?
I'm not sure how difficult this one is, but it would be nice to have it.
The drFilter function returns data in a slightly different format when applied to data on local disk than on HDFS or in memory: for local disk data, the value portion has an extra level of list nesting. Example:
irisDdf <- ddf(iris) ## in-memory data
irisHdfs <- convert(irisDdf, hdfsConn("iris/iris", autoYes = TRUE)) ## HDFS data
irisLocal <- convert(irisDdf, localDiskConn("iris/iris", autoYes = TRUE)) ## local disk data
irisHDFS.by.species <- divide(irisHdfs, by = "Species",
  out = hdfsConn("iris/bySpecies", autoYes = TRUE),
  overwrite = TRUE)
irisLocal.by.species <- divide(irisLocal, by = "Species",
  out = localDiskConn("iris/bySpecies", autoYes = TRUE),
  overwrite = TRUE)
irisMemory.by.species <- divide(irisDdf, by = "Species")
irisHdfs.filtered <- drFilter(irisHDFS.by.species,
  function(v) mean(v$Sepal.Length) < 6.0,
  out = hdfsConn("iris/filtered", autoYes = TRUE),
  overwrite = TRUE)
irisLocal.filtered <- drFilter(irisLocal.by.species,
  function(v) mean(v$Sepal.Length) < 6.0,
  out = localDiskConn("iris/filtered", autoYes = TRUE),
  overwrite = TRUE)
irisMemory.filtered <- drFilter(irisMemory.by.species,
  function(v) mean(v$Sepal.Length) < 6.0)
Output looks like:
> irisHdfs.filtered[[1]]
Read 1 objects(1.84 KB) in 0.05 seconds
$key
[1] "Species=setosa"
$value
Sepal.Length Sepal.Width Petal.Length Petal.Width
1 5.1 3.5 1.4 0.2
2 4.9 3.0 1.4 0.2
3 4.7 3.2 1.3 0.2
4 4.6 3.1 1.5 0.2
5 5.0 3.6 1.4 0.2
...
Note the extra [[1]] after $value here:
> irisLocal.filtered[[1]]
$key
[1] "Species=setosa"
$value
[[1]]
Sepal.Length Sepal.Width Petal.Length Petal.Width
1 5.1 3.5 1.4 0.2
2 4.9 3.0 1.4 0.2
3 4.7 3.2 1.3 0.2
4 4.6 3.1 1.5 0.2
5 5.0 3.6 1.4 0.2
...
Memory output looks just like HDFS output:
> irisMemory.filtered[[1]]
$key
[1] "Species=setosa"
$value
Sepal.Length Sepal.Width Petal.Length Petal.Width
1 5.1 3.5 1.4 0.2
2 4.9 3.0 1.4 0.2
3 4.7 3.2 1.3 0.2
4 4.6 3.1 1.5 0.2
5 5.0 3.6 1.4 0.2
...
rhmkdir and rhchmod appear to be missing from Rhipe 0.73.1, producing these R CMD check NOTEs:
hdfsConn: no visible global function definition for 'rhmkdir'
hdfsConn: no visible global function definition for 'rhchmod'
saveAttrs.hdfsConn: no visible global function definition for 'rhchmod'
I was inspecting ddo internals. Just out of curiosity, does one of the function calls inspect the environment for control? I was trying to track where control gets used by following arguments, and it doesn't appear, but I am pretty sure it gets used.
ddo <- function(conn, update = FALSE, reset = FALSE, control = NULL, verbose = TRUE) {
  # ddoInit should attach the conn attribute and add the ddo class to the object
  res <- ddoInit(conn)
  class(res) <- c("ddo", class(res))
  conn <- ddoInitConn(conn)
  attrs <- loadAttrs(conn, type = "ddo")
  if(length(attrs) == 0 || reset) {
    # if(verbose)
    #   message("* Getting basic 'ddo' attributes...")
    attrs <- getBasicDdoAttrs(res, conn)
    attrs <- initAttrs(res, attrs, type = "ddo")
  } else {
    if(verbose)
      message("* Reading in existing 'ddo' attributes")
  }
  res <- setAttributes(res, attrs)
  # update attributes based on conn
  if(update)
    res <- updateAttributes(res)
  res
}
The current error message is hard to interpret. I think the error should be much simpler, e.g. "divide variable not found in ddf object".
It would be best if, any time there is an error in a MapReduce job, the key were printed along with the error message, so that the user knows which data the error occurred with.
Not available for R 3.+
:-(
Hi,
I call it an "issue". I have a data set with a 2 character code in a vector, and I noticed this scenario:
V1 = c("NA", "DO", "CO", "SH", "VA", as.character(NA))
V1
V2 = runif(length(V1))
data = data.frame(V1=V1, V2=V2)
from = divide(data, by="V1")
tmp = tempfile()
to = localDiskConn(tmp, autoYes=TRUE)
convert(from, to)
Error in getFileLocs(conn, keys) :
There are duplicate keys - not currently supported
The solution is either to represent missing values with an unused character string or to change the "NA" character code to something else. The nature of this error goes all the way down to datadr's choice of keys: both the string "NA" and the missing value NA produce the key "V1=NA".
To be clear, this issue is not pressing for me. I have obvious work arounds. Just thought I would note it.
To scale to very large numbers of subsets, it is unrealistic to expect to be able to retrieve all the keys in updateAttributes. When the number of subsets is very large, it is most likely that Hadoop is being used, in which case it is not as important to have all the keys at once.
When making a new connection to an HDFS object, it might be nice to have an option to report that there is already an _rh_meta file and then ask whether we want to delete or use it.
After several discussions and a lot of thought, I am proposing and working on the following major change:
Suppose I have a ddf of the iris data:
irisDdf <- ddf(iris)
I want a new division of this data, split by slices of petal length, and a new variable, plCut, that represents the slicing. If I were to add this variable to a regular data frame, I would do something like this:
iris$plCut <- cut(iris$Petal.Length, seq(0, 7, length = 3))
To add the variable to a ddf, the current approach is to use preTransFn in divide(), so that the transformation function is applied to each subset prior to the divide operation:
byPL <- divide(irisDdf, by = "plCut", preTransFn = function(x) {
  x$plCut <- cut(x$Petal.Length, seq(0, 7, length = 3))
  x
})
My proposed approach is to provide a function addTransform(), which allows you to add a transformation function to a distributed data object, creating a new object that is then passed to divide() or any other method.
irisDdfPL <- addTransform(irisDdf, function(x) {
  x$plCut <- cut(x$Petal.Length, seq(0, 7, length = 3))
  x
})
byPL <- divide(irisDdfPL, by = "plCut")
This seems like a much more natural approach. When you call divide(), you know the object you are passing contains the plCut variable, instead of having to think about supplying a preTransFn in the same step.
From a design point of view, calling addTransform() does not result in any computation - the computation is deferred until any of the dr-prefixed functions (drJoin(), drLapply(), etc.), divide(), or recombine() is called. irisDdfPL still points to the same data as irisDdf, but behaves as if it were already transformed. For example, irisDdfPL[[1]] will return the first key-value pair including the new column.
This approach is even more natural when thinking about how recombine() is currently implemented. Currently, you specify both the apply and the combine in recombine(), which is not completely intuitive. With the proposed approach, the apply would be adding a transform to any ddo/ddf object, and the recombine would just be applying a combine method to the transformed object.
This change would make the preTransFn argument in all of the dr-prefixed functions, as well as in divide() and recombine(), redundant and unnecessary. It would be cleanest to completely remove preTransFn, but that would be a breaking change. We could accommodate both approaches to remain compatible, but with the user base being relatively small and the required code changes negligible, I'd rather go with the clean approach; those who are using this in a project-critical way can stick with version 0.72 until they are ready to update their code.
Any thoughts on whether to remove preTransFn? Also, is the name addTransform clear enough?
There should be a function like drWrite.csv to write a large ddf stored on HDFS back out to HDFS as csv, which can later be ingested by other systems such as Hive or Impala.
Reference: [Issue in RHIPE]
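A sketch of how the proposed function might be called (the name drWrite.csv and its signature are hypothetical):
# write each subset of the ddf back out as csv text on HDFS
drWrite.csv(bigDdf, output = hdfsConn("/user/me/bigDdf_csv", autoYes = TRUE))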
It would be cleanest for transformation functions to be executed in a fresh environment that has all related data added to it.
I'm trying to overwrite an old ddo using drRead.csv. With overwrite=TRUE inside the read function, it gives me a warning for each chunk:
21: In addData.localDiskConn(output, list(list(i, postTransFn(data)))) :
Element 1 of input data has data already on disk of the same key.
Set overwrite to TRUE or change key of input data.
If I divide by A and B, then want to re-group by just A, the columns aren't found because they are conditioning variables. This is very confusing to me.
example:
mt <- mtcars
mt$gear <- paste("Gear_", mt$gear, sep = "")
mt$carb <- paste("Carb_", mt$carb, sep = "")
byGearCarb <- divide(
mt,
by = c("gear", "carb")
)
byGear <- divide(
byGearCarb,
by = c("gear")
)
# * Verifying parameters...
# Error in validateDivSpec.condDiv(by, data, ex) :
# 'by' variables for conditioning division are not matched in data. Look at a subset of the data, e.g. 'data[[1]]' to see what to expect.
byGearCarb
#
# Distributed data frame backed by 'kvMemory' connection
#
# attribute | value
# ----------------+---------------------------------------------------------------------------------------------------------------
# names | mpg(num), cyl(num), disp(num), hp(num), drat(num), wt(num), qsec(num), vs(num), am(num)
# nrow | 32
# size (stored) | 35.45 KB
# size (object) | 35.45 KB
# # subsets | 11
#
# * Other attributes: getKeys()
# * Missing attributes: splitSizeDistn, splitRowDistn, summary
# * Conditioning variables: gear, carb
byGearCarb[[1]]
# $key
# [1] "gear=Gear_3|carb=Carb_1"
#
# $value
# mpg cyl disp hp drat wt qsec vs am
#1 21.4 6 258.0 110 3.08 3.215 19.44 1 0
#2 18.1 6 225.0 105 2.76 3.460 20.22 1 0
#3 21.5 4 120.1 97 3.70 2.465 20.01 1 0
Can this be fixed by a preTransFn? Should divide() default to adding the conditioning variables back?
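One possible workaround (a sketch; it assumes getSplitVars() can recover the conditioning variables inside a preTransFn, which I have not verified):
byGear <- divide(
  byGearCarb,
  by = "gear",
  preTransFn = function(x) {
    # put the conditioning variables from the key back into the data
    data.frame(x, getSplitVars(x), stringsAsFactors = FALSE)
  }
)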
devtools::install_github("tesseradata/datadr")
Downloading GitHub repo tesseradata/datadr@master
Error in loadNamespace(i, c(lib.loc, .libPaths()), versionCheck = vI[[i]]) :
there is no package called ‘stringi’
When I tried installing stringi before it said that it was the wrong one.
parLapply hangs on R CMD CHECK (see the commented-out lines of inst/tests/test-kvLocalDisk in the section "local disk parallel check"). It would be good to figure out why, so that the parallel capability can be part of the unit tests.
I'm thinking about adding a Redis connection for storing ddf and ddo data. However, I'm having a little bit of trouble understanding. Is there a list of functions that need to be implemented to do this? Will I only need to implement a new connection? Is implementing a constructor, addData, removeData, print, loadAttrs and saveAttrs sufficient?
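To make the question concrete, here is a minimal sketch of what a connection constructor might look like (everything here is assumed; the actual backend contract may require more, such as ddoInit/ddoInitConn methods and a MapReduce engine):
redisConn <- function(host = "localhost", port = 6379) {
  conn <- list(host = host, port = port)
  # mirror the class pattern of localDiskConn / hdfsConn objects
  class(conn) <- c("redisConn", "kvConnection")
  conn
}
print.redisConn <- function(x, ...) {
  cat(sprintf("redis connection: %s:%d\n", x$host, x$port))
}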
Found the following calls to attach():
File 'datadr/R/mapreduce_kvLocalDisk.R':
attach(.tmp, warn.conflicts = FALSE, name = "custom .tmp with detach two lines later")
See section 'Good practice' in '?attach'.
Good practice:
‘attach’ has the side effect of altering the search path and this
can easily lead to the wrong object of a particular name being
found. People do often forget to ‘detach’ databases.
In interactive use, ‘with’ is usually preferable to the use of
‘attach’/‘detach’.
In programming, functions should not change the search path unless
that is their purpose. Often ‘with’ can be used within a
function. If not, good practice is to
• Always use a distinctive ‘name’ argument, and
• To immediately follow the ‘attach’ call by an ‘on.exit’ call
to ‘detach’ using the distinctive name.
This ensures that the search path is left unchanged even if the
function is interrupted or if code after the ‘attach’ call changes
the search path.
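In code, the good-practice pattern reads like this (a minimal sketch, not datadr's actual internals):
f <- function(.tmp) {
  attach(.tmp, warn.conflicts = FALSE, name = "custom .tmp with detach two lines later")
  on.exit(detach("custom .tmp with detach two lines later"), add = TRUE)
  # ... code that relies on objects in .tmp ...
}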
Can you try it with on.exit(detach("custom .tmp with detach two lines later"))? (Or, on your branch, on.exit(detach(".tmp")).)
Thank you,
Barret
The Java error does not communicate anything to an R user. When attaching to an empty hdfsConn, the ddo and ddf functions could warn that no data exists in the folder.
It would be nice to call str() on a distributed dataframe and get back the number of key-value pairs and the standard str() information for column names/classes/lengths.
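A sketch of what such a method might print (str.ddf does not exist in datadr; the method and its internals here are hypothetical):
str.ddf <- function(object, ...) {
  cat("Distributed data frame with", length(getKeys(object)), "key-value pairs\n")
  # standard str() information for the columns, taken from the first subset
  str(object[[1]]$value, ...)
}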
as.list.key <- function(key) {
  keys <- strsplit(key, "|", fixed = TRUE)
  # the need for the unlist is a mystery to me
  keys <- unlist(lapply(keys, function(k) strsplit(unlist(k), "=", fixed = TRUE)),
    recursive = FALSE)
  fid <- lapply(keys, function(k) k[[1]])
  fv <- lapply(keys, function(k) k[[2]])
  names(fv) <- unlist(fid)
  return(fv)
}
I find myself using the above a lot on datadr keys. Anything like this in datadr? (basically invert the paste code).
It is used like:
> k = ddo_local[[1]]$key
> k
[1] "DMISID=0001|year=2010"
> as.list.key(k)
$DMISID
[1] "0001"
$year
[1] "2010"
On a side note I was sitting around last night and amusing myself with a smirky thought. You ever noticed that the "|" in the key is the logical "or" symbol in R? It reads DMISID=0001 or year = 2010. Shouldn't it be "&" =)
Currently, first-class R data frame/RDBMS-style joins are not accessible from the datadr API unless one divides by a unique key (creating singleton chunks of data) and then combines on keys. Considering that data from one cohesive set may be stored in numerous files but with shared unique keys, having joins for such data is necessary before the full set can be used.
Searching 55 files for "require(digest)"
/Users/barret/Dropbox/school/research/git/datadr/datadr/R/dataops_read.R:
122
123 setup <- expression({
124: suppressWarnings(suppressMessages(require(digest)))
125 })
126
/Users/barret/Dropbox/school/research/git/datadr/datadr/R/mapreduce_kvLocalDisk.R:
14 setup <- appendExpression(setup,
15 expression({
16: suppressWarnings(suppressMessages(require(digest)))
17 })
18 )
2 matches across 2 files
Searching 55 files for "require(data.table)"
/Users/barret/Dropbox/school/research/git/datadr/datadr/R/agnostic_summary.R:
32 #' @rdname mrSummaryStats
33 tabulateReduce <- function(result, reduce.values, maxUnique = NULL) {
34: suppressWarnings(suppressMessages(require(data.table)))
35 tmp <- data.frame(rbindlist(reduce.values))
36 tmp <- rbind(result, tmp)
/Users/barret/Dropbox/school/research/git/datadr/datadr/R/ddo_ddf_updateAttrs.R:
226
227 setup <- expression({
228: suppressWarnings(suppressMessages(require(data.table)))
229 })
230 } else {
/Users/barret/Dropbox/school/research/git/datadr/datadr/R/recombine_combine.R:
63 reduce = expression(
64 pre = {
65: suppressWarnings(suppressMessages(require(data.table)))
66 res <- list()
67 n <- as.numeric(0)
3 matches across 3 files
I assume these are all inside expressions that are to be evaluated not in the current R session. Sooooooo, it's almost as if they are Depends and also Imports. :-(
ideas?
Add BSV table functionality. When a ddo/ddf object has between-subset-variables attached to it, make this an attribute that gets updated with updateAttributes, such that all the BSVs are pulled together into a bsv data frame, with a column including the key (or digested key for more complex keys). We can then use this table to interact with the data (pull out subsets that have interesting BSV values).
Currently drSubset only allows the result to be returned in memory. It would be helpful if it had the option to save the result using an hdfsConn or localDiskConn (similar to the output parameter in functions such as drLapply, drSample, drFilter, etc.).
Investigate storing ddo/ddf object attributes simply as list elements as opposed to attributes. This might be a cleaner way to go. I'm still set on S3 for these objects.
I have tested the code below with a local connection; it works perfectly with gz files, just as read.csv or read.table does. However, when I tried to replicate the same thing on HDFS, it did not work.
This works:
# local disk
sampleDiskStore <- localDiskConn(file.path(tempdir(), "sample_submission"), autoYes = TRUE)
sampleDiskData <- drRead.csv("Data/sample_submission.csv.gz", rowsPerBlock = 20000, output = sampleDiskStore)
updateAttributes(sampleDiskData)
But when I uploaded the same data to HDFS and pointed to the folder that contains the gz file, it did not work. I think I somehow need to specify that the file is a gz file, but I could not find a way to do that.
rhdel("/user/ruser/DataDR/Test/Sample/RB")
sampleStore_RB<- hdfsConn(paste0(hdfs_root_path,"/Sample/RB"), autoYes = T)
# data in "/user/ruser/DataDR/Test/Raw"
# This will definitely not work:
# sampleData_RB <- drRead.csv(gzfile(hdfsConn(loc = "/user/ruser/DataDR/Test/RawZip", type = "text", autoYes = T)), rowsPerBlock = 20000, output = sampleStore_RB)
sampleData_RB <- drRead.csv(hdfsConn(loc = "/user/ruser/DataDR/Test/RawZip",
type = "text", autoYes = T),
rowsPerBlock = 20000,
output = sampleStore_RB)
sampleData_RB <- updateAttributes(sampleData_RB) # this is failing
[Note: in the above, the only file present in /user/ruser/DataDR/Test/RawZip is sample_submission.csv.gz, and all paths are on HDFS.]
Kindly tell me what the solution would be.
Track down all warnings so that we pass R CMD CHECK with no warnings
It is important for categorical variables in a distributed data frame to be able to be treated as factors. We have the problem of not knowing all the possible factor levels without looking at the entire data. But when we call updateAttributes, we get frequency tables for all categorical variables. We can extract the factor levels from this - we just need to make this part of how ddfs are handled.
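A sketch of the idea (the summary()/freqTable layout assumed here may differ from datadr's actual attribute structure):
irisDdf <- updateAttributes(ddf(iris))
freq <- summary(irisDdf)$Species$freqTable  # frequency table for a categorical variable
speciesLevels <- as.character(freq$value)   # candidate factor levels for Species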
* checking R code for possible problems ... NOTE
LDcollect: no visible binding for '<<-' assignment to 'taskCounter'
LDcollect: no visible binding for '<<-' assignment to 'taskRes'
LDflushKV: no visible binding for '<<-' assignment to 'taskRes'
LDflushKV: no visible binding for '<<-' assignment to 'taskCounter'
I don't know how to remove these. :-(
Is there a way we can use assign? To my knowledge, assign only goes inward in scope, while <<- goes upward in scope. Could we do assign with env set to parent.env()?
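A sketch of one way this could look (the LDcollect body is hypothetical; only the variable name comes from the NOTE): keep the state in an explicit environment and use assign()/get() against it, which avoids the no-visible-binding NOTE that <<- triggers.
taskEnv <- new.env(parent = emptyenv())
assign("taskCounter", 0, envir = taskEnv)
LDcollect <- function() {
  # equivalent of taskCounter <<- taskCounter + 1, with an explicit environment
  assign("taskCounter", get("taskCounter", envir = taskEnv) + 1, envir = taskEnv)
}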
There are lines all over that state something like:
setup <- expression({
suppressWarnings(suppressMessages(require(data.table)))
})
Why? Aren't these already 'require'd when the environment is made? (I haven't run the code with/without these items to figure out why.)
Modify drFindGlobals to find functions and also search functions for their possible global dependencies.
When we divide by a categorical variable, each subset is defined by a unique value for that variable. To save space, that variable is omitted from the data frame before it is stored (for example, if I divide the iris data by Species, Species is removed from the resulting data frames of the subsets). The question is how to handle this when we are doing operations on the data. Should we add it in each time? For example, if I want to do a subsequent division on the data that will partition it across Species, I need the Species variable put back in. It should be like the variable is always there. Perhaps we should not remove it to begin with.