본문 바로가기
데이터과학/데이터 분석 실습

[pyspark] GraphFrames 다루기

by Augmentia 2022. 6. 7.

GraphFrames 생성하기

Vertex와 Edge DataFrames을 이용하여 GraphFrames를 만들 수 있습니다.

  • Vertex DataFrame은 그래프의 각 Vertex에 대해 고유한 ID를 지정하는 "id"라는 특수 열이 포함되어야 합니다.
  • Edge DataFrame은 "src"(source vertex ID of edge) 및 "dst"(destination vertex ID of edge)라는 두 개의 특수 열이 포함되어야 합니다.

두 DataFrame에는 임의의 다른 column을 포함할 수 있으며, 이러한 항목들은 edge 및 vertex의 속성을 나타낼 수 있습니다. GraphFrame은 edge 정보만을 포함하는 DataFrame을 통해서도 구성할 수도 있습니다. 이렇게 구성하는 경우, edge의 source와 target으로부터 자동으로 vertex id를 부여하게 됩니다.

# Vertex DataFrame
v = sqlContext.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
], ["id", "name", "age"])
# Edge DataFrame
e = sqlContext.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
], ["src", "dst", "relationship"])
# Create a GraphFrame
g = GraphFrame(v, e)

Basic graph and DataFrame queries

 

GraphFrame은 node degree 등과 같은 몇 가지 간단한 그래프 쿼리를 제공합니다. 또한, GraphFrames는 그래프를 vertex와 edge DataFrame의 쌍으로 나타내기 때문에 vertex와 edge DataFrame로부터 유용한 쿼리를 쉽게 만들 수 있습니다. 이러한 DataFrame은 GraphFrame에서 vertex 및 edge 필드로 사용할 수 있습니다.

from graphframes.examples import Graphs
g = Graphs(sqlContext).friends()  # Get example graph

# Display the vertex and edge DataFrames
g.vertices.show()
# +--+-------+---+
# |id|   name|age|
# +--+-------+---+
# | a|  Alice| 34|
# | b|    Bob| 36|
# | c|Charlie| 30|
# | d|  David| 29|
# | e| Esther| 32|
# | f|  Fanny| 36|
# | g|  Gabby| 60|
# +--+-------+---+

g.edges.show()
# +---+---+------------+
# |src|dst|relationship|
# +---+---+------------+
# |  a|  b|      friend|
# |  b|  c|      follow|
# |  c|  b|      follow|
# |  f|  c|      follow|
# |  e|  f|      follow|
# |  e|  d|      friend|
# |  d|  a|      friend|
# |  a|  e|      friend|
# +---+---+------------+

# Get a DataFrame with columns "id" and "inDegree" (in-degree)
vertexInDegrees = g.inDegrees

# Find the youngest user's age in the graph.
# This queries the vertex DataFrame.
g.vertices.groupBy().min("age").show()

# Count the number of "follows" in the graph.
# This queries the edge DataFrame.
numFollows = g.edges.filter("relationship = 'follow'").count()

Motif finding

그래프에서 모티프를 찾는다는 것은 구조적인 패턴을 찾는 것을 의미합니다. GraphFrame 모티브 찾기는 간단한 Domain-Specific Language (DSL)을 사용합니다. 예를 들어, graph.find("(a)-[e]->(b); (b)-[e2]->(a)") 는 양방향 모서리로 연결된 정점 a,b 쌍을 검색합니다. 모티프의 각 명명된 요소(vertex or edge)에 대한 열과 함께 그래프에서 이러한 모든 구조의 DataFrame을 반환합니다. 이 경우 반환되는 열은 "a, b, e, e2"가 됩니다.

구조적 패턴을 표현하기 위한 DSL:

  • 패턴의 기본 단위는 edge 입니다. 예를 들어, "(a)-[e]->(b)"는 vertex a에서 vertex b까지의 edge e를 표현합니다. vertex는 괄호 (a)로 표시되고 edge는 대괄호 [e]로 표시됩니다.
  • 패턴은 edge의 합집합으로 표현됩니다. edge 패턴은 세미콜론으로 결합할 수 있습니다. 모티브 "(a)-[e]->(b); (b)-[e2]->(c)"는 b에서 c까지의 두개의 edge를 지정합니다.
  • 패턴 내에서 vertex와 edge에 이름을 지정할 수 있습니다. 예를 들어 "(a)-[e]->(b)"에서 vertices a,b 및 edge e의 세 가지 명명된 요소가 있습니다. 이러한 이름은 두 가지 용도로 사용됩니다.
    - 이름은 모서리 사이의 공통 요소를 식별할 수 있습니다. 예를 들어, "(a)-[e]->(b); (b)-[e2]->(c)"는 동일한 vertex b가 edge e의 target이고, edge e2의 source임을 지정합니다.
    - 이름은 결과로 생성되는 DataFrame에서 column 이름으로 사용됩니다. 모티프에 명명된 vertex a가 포함된 경우 결과 DataFrame에는 GraphFrame.vertices의 스키마(열)와 동일한 하위 필드가 있는 StructType인 열 "a"가 포함됩니다. 유사하게, 모티프의 edge e는 GraphFrame.edges의 스키마(열)와 동일한 하위 필드가 있는 결과 DataFrame에 열 "e"를 생성합니다.
    - 이름은 별개의 요소를 식별하지 않는다는 점에 유의하십시오. 이름이 다른 두 요소는 동일한 그래프 요소를 참조할 수 있습니다. 예를 들어, 모티브 "(a)-[e]->(b); (b)-[e2]->(c)"에서 이름 a와 c는 동일한 꼭짓점을 참조할 수 있습니다. 명명된 요소를 고유한 vertex 또는 가장자리로 제한하려면 resultDataframe.filter("a.id != c.id")와 같은 사후 필터를 사용합니다.
  • 필요하지 않은 경우 모티프의 vertex나 edge에 대한 이름을 생략하는 것이 허용됩니다. 예를 들어, "(a)-[]->(b)"는 vertices a,b 사이의 edge를 표현하지만 edge에 이름을 지정하지 않습니다. 결과적으로 DataFrame에 익명 가장자리에 대한 열이 없습니다. 유사하게, "(a)-[e]->()"는 정점 a의 바깥쪽 가장자리를 나타내지만 대상 정점의 이름은 지정하지 않습니다. 이를 익명 정점 및 모서리라고 합니다.
  • edge를 부정하여 그래프에 해당 edge가 없어야 함을 나타낼 수도 있습니다. 예를 들어, "(a)-[]->(b); !(b)-[]->(a)"는 b에서 a연결되는 edge는 없으면서, a에서 b로 가는 edge가 있는 그래프를 찾습니다.
from graphframes.examples import Graphs
g = Graphs(sqlContext).friends()  # Get example graph

# Search for pairs of vertices with edges in both directions between them.
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
motifs.show()

# More complex queries can be expressed by applying filters.
motifs.filter("b.age > 30").show()

많은 모티프 쿼리는 위의 예에서와 같이 표현하기 쉽습니다. 다음 예는 모티프의 경로를 따라 상태를 전달하는 더 복잡한 쿼리입니다. 이러한 쿼리는 결과에 대한 필터와 GraphFrame 모티브 찾기를 결합하여 표현할 수 있습니다. 여기서 필터는 시퀀스 작업을 사용하여 일련의 DataFrame 열을 구성합니다.

예를 들어, 일련의 함수에 의해 정의된 속성이 있는 4개의 vertices 체인을 식별하려고 한다고 가정합니다. 즉, 4개의 vertices a->b->c->d의 체인 중에서 이 복잡한 필터와 일치하는 체인의 하위 집합을 식별합니다

  • 경로에서 상태를 초기화합니다.
  • vertex a를 기반으로 상태 업데이트.
  • vertex b를 기반으로 상태 업데이트.
  • 기타 c 및 d.
  • 최종 상태가 일부 조건과 일치하면 필터에서 체인을 수락합니다.

아래 코드 스니펫은 3개의 edges 중 2개 이상이 "친구" 관계가 되도록 4개의 vertices 체인을 식별하는 쿼리입니다. 이 예에서 상태는 "친구" edge의 현재 개수입니다. 일반적으로 모든 DataFrame 열이 될 수 있습니다.

from pyspark.sql.functions import col, lit, when
from pyspark.sql.types import IntegerType
from graphframes.examples import Graphs
g = Graphs(sqlContext).friends()  # Get example graph

chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

# Query on sequence, with state (cnt)
#  (a) Define method for updating state given the next element of the motif.
sumFriends =\
  lambda cnt,relationship: when(relationship == "friend", cnt+1).otherwise(cnt)
#  (b) Use sequence operation to apply method to sequence of elements in motif.
#      In this case, the elements are the 3 edges.
condition =\
  reduce(lambda cnt,e: sumFriends(cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0))
#  (c) Apply filter to DataFrame.
chainWith2Friends2 = chain4.where(condition >= 2)
chainWith2Friends2.show()

Connected components

각 vertex에 연결된 구성 요소를 계산하고 각 vertex에 구성 요소 ID가 할당된 그래프를 반환합니다. 참고: GraphFrames 0.3.0 이상 릴리스에서는 기본 연결된 구성 요소 알고리즘을 사용하려면 Spark 체크포인트 디렉터리를 설정해야 합니다. 사용자는 connectedComponents.setAlgorithm("graphx")를 사용하여 이전 알고리즘으로 되돌릴 수 있습니다.

from graphframes.examples import Graphs
g = Graphs(sqlContext).friends()  # Get example graph

# Weakly connected component
result = g.connectedComponents()
result.select("id", "component").orderBy("component").show()

# Strongly connected component
result = g.stronglyConnectedComponents(maxIter=10)
result.select("id", "component").orderBy("component").show()

Label Propagation Algorithm (LPA)

네트워크에서 Label Propagation Algorithm (LDA)을 이용하여 커뮤니티를 측정할 수 있습니다. 네트워크의 각 노드는 처음에 자체 커뮤니티에 할당됩니다. 모든 슈퍼 스텝에서 노드는 커뮤니티 가입을 모든 이웃에게 보내고 수신 메시지의 커뮤니티 가입 모드로 상태를 업데이트합니다.

LPA는 그래프에 대한 표준 커뮤니티 감지 알고리즘입니다. (1) 수렴이 보장되지 않고 (2) 사소한 솔루션으로 끝날 수 있지만(모든 노드가 단일 커뮤니티로 식별됨) 계산 비용이 매우 저렴합니다.

from graphframes.examples import Graphs
g = Graphs(sqlContext).friends()  # Get example graph

result = g.labelPropagation(maxIter=5)
result.select("id", "label").show()

PageRank

from graphframes.examples import Graphs
g = Graphs(sqlContext).friends()  # Get example graph

# Run PageRank until convergence to tolerance "tol".
results = g.pageRank(resetProbability=0.15, tol=0.01)
# Display resulting pageranks and final edge weights
# Note that the displayed pagerank may be truncated, e.g., missing the E notation.
# In Spark 1.5+, you can use show(truncate=False) to avoid truncation.
results.vertices.select("id", "pagerank").show()
results.edges.select("src", "dst", "weight").show()

# Run PageRank for a fixed number of iterations.
results2 = g.pageRank(resetProbability=0.15, maxIter=10)

# Run PageRank personalized for vertex "a"
results3 = g.pageRank(resetProbability=0.15, maxIter=10, sourceId="a")

# Run PageRank personalized for vertex ["a", "b", "c", "d"] in parallel
results4 = g.parallelPersonalizedPageRank(resetProbability=0.15, sourceIds=["a", "b", "c", "d"], maxIter=10)

Shortest Paths

각 vertex에서 지정된 랜드마크 vertices 집합까지의 최단 경로를 계산합니다. 여기서 랜드마크는 vertex ID로 지정됩니다. 최단거리 계산에서는 edge가 연결된 방향이 고려됨을 유의해주세요.

from graphframes.examples import Graphs
g = Graphs(sqlContext).friends()  # Get example graph

results = g.shortestPaths(landmarks=["a", "d"])
results.select("id", "distances").show()

Triangle count

from graphframes.examples import Graphs
g = Graphs(sqlContext).friends()  # Get example graph

results = g.triangleCount()
results.select("id", "count").show()