加入收藏 | 设为首页 | 会员中心 | 我要投稿 牡丹江站长网 (https://www.0453zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

Spark Graphx 达成图中极大团挖掘, 伪并行化算法

发布时间:2021-08-22 19:55:14 所属栏目:大数据 来源:互联网
导读:spark graphx并未提供极大团挖掘算法 当下的极大团算法都是串行化的算法,基于BronKerbosch算法 ####思路:#### spark graphx提供了连通图的算法,连通图和极大团都是无向图中的概念,极大团为连通图的子集 利用spark graphx 找出连通图,在从各个连通图中,利用串
spark graphx并未提供极大团挖掘算法
当下的极大团算法都是串行化的算法,基于Bron–Kerbosch算法
####思路:####
spark graphx提供了连通图的算法,连通图和极大团都是无向图中的概念,极大团为连通图的子集
利用spark graphx 找出连通图,在从各个连通图中,利用串行化的极大团算法,找出极大团 (伪并行化)
对于关联性较强的图,找出来的连通图非常大,这时串行化的极大团算法,仍然会耗时很久,这里利用剪枝的思想减少样本数据量,但是对于大图,优化空间有限
期待真正的并行化的极大团算法
####配置文件:####
graph_data_path=hdfs://localhost/graph_data 
out_path=hdfs://localhost/clique 
ck_path=hdfs://localhost/checkpoint 
numIter=50      剪枝次数 
count=3         极大团顶点数大小 
algorithm=2     极大团算法,1:个人实现  2:jgrapht 
percent=90      剪枝后的顶点数,占前一次的百分比,如果剪完后,还剩下90%的数据,那么剪枝效率已然不高 
spark.master=local 
spark.app.name=graph 
spark.serializer=org.apache.spark.serializer.KryoSerializer 
spark.yarn.executor.memoryOverhead=20480 
spark.yarn.driver.memoryOverhead=20480 
spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC 
spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC 
spark.driver.maxResultSize=10g 
spark.default.parallelism=60 
jgrapht
####样本数据:####
{"src":"0","dst":"1"} {"src":"0","dst":"2"} {"src":"0","dst":"3"} {"src":"1","dst":"0"} {"src":"2","dst":"1"} {"src":"3","dst":"5"} {"src":"4","dst":"6"} {"src":"5","dst":"4"} {"src":"6","dst":"5"} {"src":"3","dst":"2"} {"src":"2","dst":"3"} {"src":"6","dst":"4"} {"src":"3","dst":"4"} {"src":"4","dst":"3"} {"src":"2","dst":"6"} {"src":"6","dst":"2"} {"src":"6","dst":"7"} {"src":"7","dst":"6"}
####样本图:####
####输出:####
0,1,2 0,2,3 3,4,5 4,5,6
####代码实现:####
import java.util import java.util.Properties 
import org.apache.spark.broadcast.Broadcast 
import org.apache.spark.graphx.{Edge, Graph} 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.{Row, SQLContext} 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.{SparkConf, SparkContext} 
import org.jgrapht.alg.BronKerboschCliqueFinder 
import org.jgrapht.graph.{DefaultEdge, SimpleGraph} 
 
import scala.collection.JavaConverters._ 
import scala.collection.mutable 
 
object ApplicationTitan { 
    def main(args: Array[String]) { 
        val prop = new Properties() 
        prop.load(getClass.getResourceAsStream("/config.properties")) 
     
        val graph_data_path = prop.getProperty("graph_data_path") 
        val out_path = prop.getProperty("out_path") 
        val ck_path = prop.getProperty("ck_path") 
        val count = Integer.parseInt(prop.getProperty("count")) 
        val numIter = Integer.parseInt(prop.getProperty("numIter")) 
        val algorithm = Integer.parseInt(prop.getProperty("algorithm")) 
        val percent = Integer.parseInt(prop.getProperty("percent")) 
        val conf = new SparkConf() 
        try { 
          Runtime.getRuntime.exec("hdfs dfs -rm -r " + out_path) 
//            Runtime.getRuntime.exec("cmd.exe /C rd /s /q " + out_path) 
        } catch { 
          case ex: Exception => 
            ex.printStackTrace(System.out) 
        } 
     
        prop.stringPropertyNames().asScala.foreach(s => { 
          if (s.startsWith("spark")) { 
            conf.set(s, prop.getProperty(s)) 
          } 
        }) 
        conf.registerKryoClasses(Array(getClass)) 
        val sc = new SparkContext(conf) 
        sc.setLogLevel("ERROR") 
        sc.setCheckpointDir(ck_path) 
        val sqlc = new SQLContext(sc) 
        try { 
          val e_df = sqlc.read 
//                        .json(graph_data_path) 
        .parquet(graph_data_path) 
 
          var e_rdd = e_df 
            .mapPartitions(it => { 
              it.map({ 
                case Row(dst: String, src: String) => 
                  val src_long = src.toLong 
                  val dst_long = dst.toLong 
                  if (src_long < dst_long) (src_long, dst_long) else (dst_long, src_long) 
              }) 
            }).distinct() 
          e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) 
     
          var bc: Broadcast[Set[Long]] = null 
          var iter = 0 
          var bc_size = 0 
         //剪枝 
          while (iter <= numIter) { 
            val temp = e_rdd 
              .flatMap(x => List((x._1, 1), (x._2, 1))) 
              .reduceByKey((x, y) => x + y) 
              .filter(x => x._2 >= count - 1) 
              .mapPartitions(it => it.map(x => x._1)) 
            val bc_value = temp.collect().toSet 
            bc = sc.broadcast(bc_value) 
            e_rdd = e_rdd.filter(x => bc.value.contains(x._1) && bc.value.contains(x._2)) 
            e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) 
            iter += 1 
            if (bc_size != 0 && bc_value.size >= bc_size * percent / 100) { 
              println("total iter : "+ iter) 
              iter = Int.MaxValue 
            } 
            bc_size = bc_value.size 
          } 
     
          // 构造图 
          val edge: RDD[Edge[Long]] = e_rdd.mapPartitions(it => it.map(x => Edge(x._1, x._2))) 
          val graph = Graph.fromEdges(edge, 0, StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER) 
     
          //连通图 
          val cc = graph.connectedComponents().vertices 
          cc.persist(StorageLevel.MEMORY_AND_DISK_SER) 
     
          cc.join(e_rdd) 
            .mapPartitions(it => it.map(x => ((math.random * 10).toInt.toString.concat(x._2._1.toString), (x._1, x._2._2)))) 
            .aggregateByKey(List[(Long, Long)]())((list, v) => list :+ v, (list1, list2) => list1 ::: list2) 
            .mapPartitions(it => it.map(x => (x._1.substring(1), x._2))) 
            .aggregateByKey(List[(Long, Long)]())((list1, list2) => list1 ::: list2, (list3, list4) => list3 ::: list4) 
            .filter(x => x._2.size >= count - 1) 
            .flatMap(x => { 
              if (algorithm == 1) 
                find(x, count) 
              else 
                find2(x, count) 
            }) 
            .mapPartitions(it => { 
              it.map({ 
                case set => 
                  var temp = "" 
                  set.asScala.foreach(x => temp += x + ",") 
                  temp.substring(0, temp.length - 1) 
                case _ => 
              }) 
            }) 
    //                .coalesce(1) 
    .saveAsTextFile(out_path) 
 
    catch { 
  case ex: Exception => 
    ex.printStackTrace(System.out) 
    } 
    sc.stop() 
//自己实现的极大团算法 
 def find(x: (String, List[(Long, Long)]), count: Int): mutable.Set[util.Set[String]] = { 
    println(x._1 + "|s|" + x._2.size) 
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis()) 
    val neighbors = new util.HashMap[String, util.Set[String]] 
    val finder = new CliqueFinder(neighbors, count) 
    x._2.foreach(r => { 
      val v1 = r._1.toString 
      val v2 = r._2.toString 
      if (neighbors.containsKey(v1)) { 
        neighbors.get(v1).add(v2) 
      } else { 
        val temp = new util.HashSet[String]() 
        temp.add(v2) 
        neighbors.put(v1, temp) 
      } 
      if (neighbors.containsKey(v2)) { 
        neighbors.get(v2).add(v1) 
      } else { 
        val temp = new util.HashSet[String]() 
        temp.add(v1) 
        neighbors.put(v2, temp) 
      } 
    }) 
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis()) 
    finder.findMaxCliques().asScala 
//jgrapht 中的极大团算法 
 def find2(x: (String, List[(Long, Long)]), count: Int): Set[util.Set[String]] = { 
    println(x._1 + "|s|" + x._2.size) 
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis()) 
    val to_clique = new SimpleGraph[String, DefaultEdge](classOf[DefaultEdge]) 
    x._2.foreach(r => { 
      val v1 = r._1.toString 
      val v2 = r._2.toString 
      to_clique.addVertex(v1) 
      to_clique.addVertex(v2) 
      to_clique.addEdge(v1, v2) 
    }) 
    val finder = new BronKerboschCliqueFinder(to_clique) 
    val list = finder.getAllMaximalCliques.asScala 
    var result = Set[util.Set[String]]() 
    list.foreach(x => { 
      if (x.size() >= count) 
        result = result + x 
    }) 
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis()) 
    result 
//自己实现的极大团算法
import java.util.*; 
 
/** 
 * [@author](https://my.oschina.net/arthor) mopspecial@gmail.com 
 * [@date](https://my.oschina.net/u/2504391) 2017/7/31 
 */ 
public class CliqueFinder { 
    private Map<String, Set<String>> neighbors; 
    private Set<String> nodes; 
    private Set<Set<String>> maxCliques = new HashSet<>(); 
    private Integer minSize; 
 
    public CliqueFinder(Map<String, Set<String>> neighbors, Integer minSize) { 
        this.neighbors = neighbors; 
        this.nodes = neighbors.keySet(); 
        this.minSize = minSize; 
    } 
 
    private void bk3(Set<String> clique, List<String> candidates, List<String> excluded) { 
        if (candidates.isEmpty() && excluded.isEmpty()) { 
            if (!clique.isEmpty() && clique.size() >= minSize) { 
                maxCliques.add(clique); 
            } 
            return; 
        } 
 
        for (String s : degeneracy_order(candidates)) { 
            List<String> new_candidates = new ArrayList<>(candidates); 
            new_candidates.retainAll(neighbors.get(s)); 
 
            List<String> new_excluded = new ArrayList<>(excluded); 
            new_excluded.retainAll(neighbors.get(s)); 
            Set<String> nextClique = new HashSet<>(clique); 
            nextClique.add(s); 
            bk2(nextClique, new_candidates, new_excluded); 
            candidates.remove(s); 
            excluded.add(s); 
        } 
    } 
 
    private void bk2(Set<String> clique, List<String> candidates, List<String> excluded) { 
        if (candidates.isEmpty() && excluded.isEmpty()) { 
            if (!clique.isEmpty() && clique.size() >= minSize) { 
                maxCliques.add(clique); 
            } 
            return; 
        } 
        String pivot = pick_random(candidates); 
        if (pivot == null) { 
            pivot = pick_random(excluded); 
        } 
        List<String> tempc = new ArrayList<>(candidates); 
        tempc.removeAll(neighbors.get(pivot)); 
 
        for (String s : tempc) { 
            List<String> new_candidates = new ArrayList<>(candidates); 
            new_candidates.retainAll(neighbors.get(s)); 
 
            List<String> new_excluded = new ArrayList<>(excluded); 
            new_excluded.retainAll(neighbors.get(s)); 
            Set<String> nextClique = new HashSet<>(clique); 
            nextClique.add(s); 
            bk2(nextClique, new_candidates, new_excluded); 
            candidates.remove(s); 
            excluded.add(s); 
        } 
    } 
 
    private List<String> degeneracy_order(List<String> innerNodes) { 
        List<String> result = new ArrayList<>(); 
        Map<String, Integer> deg = new HashMap<>(); 
        for (String node : innerNodes) { 
            deg.put(node, neighbors.get(node).size()); 
        } 
        while (!deg.isEmpty()) { 
            Integer min = Collections.min(deg.values()); 
            String minKey = null; 
            for (String key : deg.keySet()) { 
                if (deg.get(key).equals(min)) { 
                    minKey = key; 
                    break; 
                } 
            } 
            result.add(minKey); 
            deg.remove(minKey); 
            for (String k : neighbors.get(minKey)) { 
                if (deg.containsKey(k)) { 
                    deg.put(k, deg.get(k) - 1); 
                } 
            } 
 
        } 
        return result; 
    } 
 
 
    private String pick_random(List<String> random) { 
        if (random != null && !random.isEmpty()) { 
            return random.get(0); 
        } else { 
            return null; 
        } 
    } 
 
    public Set<Set<String>> findMaxCliques() { 
        this.bk3(new HashSet<>(), new ArrayList<>(nodes), new ArrayList<>()); 
        return maxCliques; 
    } 
 
    public static void main(String[] args) { 
        Map<String, Set<String>> neighbors = new HashMap<>(); 
        neighbors.put("0", new HashSet<>(Arrays.asList("1", "2", "3"))); 
        neighbors.put("1", new HashSet<>(Arrays.asList("0", "2"))); 
        neighbors.put("2", new HashSet<>(Arrays.asList("0", "1", "3", "6"))); 
        neighbors.put("3", new HashSet<>(Arrays.asList("0", "2", "4", "5"))); 
        neighbors.put("4", new HashSet<>(Arrays.asList("3", "5", "6"))); 
        neighbors.put("5", new HashSet<>(Arrays.asList("3", "4", "6"))); 
        neighbors.put("6", new HashSet<>(Arrays.asList("2", "4", "5"))); 
        neighbors.put("7", new HashSet<>(Arrays.asList("6"))); 
        CliqueFinder finder = new CliqueFinder(neighbors, 3); 
        finder.bk3(new HashSet<>(), new ArrayList<>(neighbors.keySet()), new ArrayList<>()); 
        System.out.println(finder.maxCliques); 
    } 

(编辑:牡丹江站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!