Giter VIP home page Giter VIP logo

Comments (13)

riomus avatar riomus commented on June 8, 2024

Betweenness centrality is quite complex measure to compute efficiently in distributed environment, that is why it is not present for now. You can look at NetworkX code. Betweenness requires APSP (All Pair Shortest Paths) and you must know how many SP are coming thru each vertex.

from sparkling-graph.

geoHeil avatar geoHeil commented on June 8, 2024

@riomus do you think there is a simpler method to calculate the metric I want to achieve (maybe using kernels)?

from sparkling-graph.

riomus avatar riomus commented on June 8, 2024

@geoHeil there is k-betweenness

from sparkling-graph.

geoHeil avatar geoHeil commented on June 8, 2024

@riomus thanks for that link. Do you think that betweenness centrality in general, is sort of what I would want to compute? Or that a different metric would serve better as a starting point of what I want to achieve?

from sparkling-graph.

riomus avatar riomus commented on June 8, 2024

@geoHeil it is not betweenness centrality at all, it is not even close to it. That measure can be easily implemented using Pregel operator from GraphX, check out how Eigenvector centrality is computed in sparkling and read about BSP for graphs.

from sparkling-graph.

geoHeil avatar geoHeil commented on June 8, 2024

@riomus thanks for this hint. Indeed, I read some more about BSP and graphX. Still it is not really clear how to make the following snipped achieve what I want:

  • Why do I need to swap messages
  • Why is this only one level into the graph? How to specify how many levels to track?
  • How to compute the percentage? i.e. filtered / total?
val AM = AggregateMessages
val fraudNeighbourWeight = 1.0
val msgToSrc: Column = when(AM.src("fraud") === 1, lit(fraudNeighbourWeight) * lit(1))// + AM.dst("fraud")))
    .otherwise(lit(0))
 val msgToDst: Column = when(AM.dst("fraud") === 1, lit(fraudNeighbourWeight) * lit(2))// + AM.src("fraud")))
    .otherwise(lit(0))

// todo: why do I need to swap messages, why is this only one level into the graph? how to specify how many levels to track?
val agg = g.aggregateMessages
    .sendToSrc(msgToDst) // send destination user's fraud to source
    .sendToDst(msgToSrc) // send source user's fraud to destination
    .agg(sum(AM.msg).as("summedFraud")) // sum up fraud, stored in AM.msg column
  agg.show()

g.aggregateMessages
    .sendToSrc(msgToDst)
    .sendToDst(msgToSrc)
    .agg(sum(AM.msg) / AM) // TODO this does not work. How to compute the percentage? i.e. filtered / total
    .show

from sparkling-graph.

riomus avatar riomus commented on June 8, 2024
  • Why do I need to swap messages

What do you mean by swap?

  • Why is this only one level into the graph? How to specify how many levels to track?

That is because Spark is distributed and graph is hold in memory using Edge Triplet representation. Sou y have only access to nearest neighbors. In order to do computations on further levels, use pregel operator and pass data from neighbours to neighbours and so on.

  • How to compute the percentage? i.e. filtered / total?

Do that at the end of computation, just sum values and then map.

from sparkling-graph.

geoHeil avatar geoHeil commented on June 8, 2024

@riomus msgToSrcand msgToDst need to be swapped as outlined in

.sendToSrc(msgToDst)
.sendToDst(msgToSrc)

to produce useful output.

To my understanding graph frames aggregate messages is a pregel operator

And as shown in the documentation of graphX or here https://github.com/geoHeil/graphFrameStarter/blob/master/src/main/scala/myOrg/ExampleSQL.scala

val avgAgeOfOlderFollowers: VertexRDD[Double] =
    olderFollowers.mapValues((id, value) =>
      value match {
        case (count, totalAge) => totalAge / count
      })

easily would provide the right aggregation to calculate a percentage. But so far I did not manage to perform a similar aggregation in graph frames via .agg(sum(AM.msg) / AM)

from sparkling-graph.

riomus avatar riomus commented on June 8, 2024

Message swap must by done probably because as GraphX, graphframes represent directed graphs while you want to compute on undirected. The aggregate message is just one iteration of pregel, you need to execute it multiple times.

from sparkling-graph.

geoHeil avatar geoHeil commented on June 8, 2024

Thanks for clarifying this. How could I run this iteratively? In https://github.com/sparkling-graph/sparkling-graph/blob/master/operators/src/main/scala/ml/sparkling/graph/operators/measures/vertex/eigenvector/EigenvectorCentrality.scala I see you use

while(continuePredicate(iteration,oldValue,newValue)||iteration==0){
      val iterationRDD=computationGraph.aggregateMessages[Double](
      sendMsg = context=>{
        context.sendToDst(num.toDouble(context.attr)*context.srcAttr)
        context.sendToSrc(0d)
        if(vertexMeasureConfiguration.treatAsUndirected){
          context.sendToSrc(num.toDouble(context.attr)*context.dstAttr)
          context.sendToDst(0d)
        }
      },
      mergeMsg = (a,b)=>a+b)

What is unclear for me is how context is switching from vertex to vertex. And why sometimes only a literal i.e. float value is passed (context.sendToDst(0d)) whereas sometimes the whole attribute filed context.sendToDst(num.toDouble(context.attr)*context.srcAttr) is passed.

Do I assume correctly, that:

val msgToSrc: Column = when(AM.src("fraud") === 1, lit(1)).otherwise(lit(0))

is not enough and similar to passing the whole attribute I rather should rather use

val msgToSrc: Column = when(AM.src("fraud") === 1, AM.dst("fraud")).otherwise(lit(0))

to iterate further into the graph?

from sparkling-graph.

riomus avatar riomus commented on June 8, 2024

I am passing 0d in order to guarantee that vertex will receive data, otherwise, it can happen that there are no messages for vertex what cause problems with eigenvalue update. I am not sure what is lit in your code, but yes, if you want vertices to exchange data you should attributes, not exact values. At the beginning, you can map vertices values to correct data and then exchange it.

To make it clear, it seems that it is not an issue with SparklingGraph, but you try to implement some measure using GraphFrames. I think we can close the issue and move the discussion to a different channel like gitter. If you agree please close.

from sparkling-graph.

geoHeil avatar geoHeil commented on June 8, 2024

from sparkling-graph.

riomus avatar riomus commented on June 8, 2024

Discussion moved to gitter

from sparkling-graph.

Related Issues (19)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.