A Step-by-Step Guide to Build an Automated Knowledge Graph Pipeline Using LangGraph and NetworkX
In this tutorial, we demonstrate how to construct an automated Knowledge Graph pipeline using LangGraph and NetworkX. The pipeline simulates a sequence of intelligent agents that collaboratively perform tasks such as data gathering, entity extraction, relation identification, entity resolution, and graph validation. Starting from a user-provided topic, such as “Artificial Intelligence,” the system methodically extracts relevant entities and relationships, resolves duplicates, and integrates the information into a cohesive graphical structure. By visualizing the final knowledge graph, developers and data scientists gain clear insights into complex interrelations among concepts, making this approach highly beneficial for applications in semantic analysis, natural language processing, and knowledge management.
!pip install langgraph langchain_core
We install two essential Python libraries: LangGraph, which is used for creating and orchestrating agent-based computational workflows, and LangChain Core, which provides foundational classes and utilities for building language model-powered applications. These libraries enable seamless integration of agents into intelligent data pipelines.
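If you need reproducible runs, pinning versions is worth considering; the version floors below are illustrative assumptions, not requirements stated in this tutorial.

!pip install "langgraph>=0.2" "langchain_core>=0.3"  # illustrative pins; adjust to your environment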
import re
import networkx as nx
import matplotlib.pyplot as plt
from typing import TypedDict, List, Tuple, Dict, Any
from langchain_core.messages import HumanMessage, AIMessage
from langgraph.graph import StateGraph, END
We import the essential libraries for building an automated knowledge graph pipeline: re for regular-expression-based text processing, NetworkX and matplotlib for creating and visualizing graphs, TypedDict and typing annotations for structured data handling, and LangGraph along with langchain_core for orchestrating the interaction between AI agents within the workflow.
class KGState(TypedDict):
    topic: str
    raw_text: str
    entities: List[str]
    relations: List[Tuple[str, str, str]]
    resolved_relations: List[Tuple[str, str, str]]
    graph: Any
    validation: Dict[str, Any]
    messages: List[Any]
    current_agent: str
We define a structured data type, KGState, using Python’s TypedDict. It outlines the schema for managing state across different steps of the knowledge graph pipeline. It includes details like the chosen topic, gathered text, identified entities and relationships, resolved duplicates, the constructed graph object, validation results, interaction messages, and tracking the currently active agent.
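One point worth noting: a TypedDict adds no runtime enforcement; it is an ordinary dict whose annotations exist for static type checkers. A minimal sketch illustrating this:

# KGState is a plain dict at runtime; annotations only guide tools like mypy.
example_state: KGState = {
    "topic": "Artificial Intelligence",
    "raw_text": "",
    "entities": [],
    "relations": [],
    "resolved_relations": [],
    "graph": None,
    "validation": {},
    "messages": [],
    "current_agent": "data_gatherer",
}
print(type(example_state))  # <class 'dict'> -- no validation happens here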
def data_gatherer(state: KGState) -> KGState:
    topic = state["topic"]
    print(f"Data Gatherer: gathering information about '{topic}'")
    collected_text = f"{topic} is an important concept. It relates to various entities like EntityA, EntityB, and EntityC. EntityA influences EntityB. EntityC is a type of EntityB."
    state["messages"].append(AIMessage(content=f"Collected raw text about {topic}"))
    state["raw_text"] = collected_text
    state["current_agent"] = "entity_extractor"
    return state
This function, data_gatherer, acts as the first step in the pipeline. It simulates gathering raw text data about the provided topic, stores this simulated data in state["raw_text"], adds a message indicating that data collection is complete, and updates the pipeline’s state by setting the next agent (entity_extractor) as active.
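In a real deployment you would swap the simulated text for an actual data source. The sketch below is one hypothetical replacement that pulls a topic summary from Wikipedia's public REST endpoint; the URL pattern and the "extract" response field are assumptions about that external API, so verify them before relying on it.

import requests  # hypothetical dependency for a real gatherer

def fetch_topic_summary(topic: str) -> str:
    # Assumed endpoint: Wikipedia's REST page-summary API.
    url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{topic.replace(' ', '_')}"
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    return resp.json().get("extract", "")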
def entity_extractor(state: KGState) -> KGState:
    print("Entity Extractor: identifying entities in the text")
    text = state["raw_text"]
    entities = re.findall(r"Entity[A-Z]", text)
    entities = [state["topic"]] + entities
    state["entities"] = list(set(entities))
    state["messages"].append(AIMessage(content=f"Extracted entities: {state['entities']}"))
    print(f"Found entities: {state['entities']}")
    state["current_agent"] = "relation_extractor"
    return state
The entity_extractor function identifies entities from the collected raw text using a simple regular expression pattern that matches terms like “EntityA”, “EntityB”, etc. It also includes the main topic as an entity and ensures uniqueness by converting the list to a set. The extracted entities are stored in the state, an AI message logs the result, and the pipeline advances to the relation_extractor agent.
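To sanity-check the pattern in isolation, you can run it against a sample sentence (the sentence below is made up for illustration):

sample = "EntityA influences EntityB. EntityC is a type of EntityB."
print(re.findall(r"Entity[A-Z]", sample))
# ['EntityA', 'EntityB', 'EntityC', 'EntityB'] -- duplicates removed later via set()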
def relation_extractor(state: KGState) -> KGState:
    print("Relation Extractor: identifying relationships between entities")
    text = state["raw_text"]
    entities = state["entities"]
    relations = []
    relation_patterns = [
        (r"{}\s+influences\s+{}", "influences"),
        (r"{}\s+is a type of\s+{}", "is_type_of"),
        (r"{}\s+relates to\s+{}", "relates_to"),
    ]
    for e1 in entities:
        for e2 in entities:
            if e1 != e2:
                for pattern, rel_type in relation_patterns:
                    if re.search(pattern.format(re.escape(e1), re.escape(e2)), text, re.IGNORECASE):
                        relations.append((e1, rel_type, e2))
    state["relations"] = relations
    state["messages"].append(AIMessage(content=f"Extracted relations: {relations}"))
    print(f"Found relations: {relations}")
    state["current_agent"] = "entity_resolver"
    return state
The relation_extractor function detects semantic relationships between entities within the raw text. It uses predefined regex patterns to identify phrases like “influences” or “is a type of” between entity pairs. When a match is found, it adds the corresponding relation as a triple (subject, relation type, object) to the relations list. These extracted relations are stored in the state, a message is logged for agent communication, and control moves to the next agent: entity_resolver.
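A quick standalone check of the matching logic, using one assumed pattern from the list above:

text = "EntityA influences EntityB."
e1, e2 = "EntityA", "EntityB"
pattern = r"{}\s+influences\s+{}".format(re.escape(e1), re.escape(e2))
if re.search(pattern, text, re.IGNORECASE):
    print((e1, "influences", e2))  # the triple appended to relations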
def entity_resolver(state: KGState) -> KGState:
    print("Entity Resolver: resolving duplicate entities")
    entity_map = {}
    for entity in state["entities"]:
        canonical_name = entity.lower().replace(" ", "_")
        entity_map[entity] = canonical_name
    resolved_relations = []
    for s, p, o in state["relations"]:
        s_resolved = entity_map.get(s, s)
        o_resolved = entity_map.get(o, o)
        resolved_relations.append((s_resolved, p, o_resolved))
    state["resolved_relations"] = resolved_relations
    state["messages"].append(AIMessage(content="Resolved duplicate entities"))
    state["current_agent"] = "graph_integrator"
    return state
The entity_resolver function standardizes entity names to avoid duplication and inconsistencies. It creates a mapping (entity_map) by converting each entity to lowercase and replacing spaces with underscores. Then, this mapping is applied to all subjects and objects in the extracted relations to produce resolved relations. These normalized triples are added to the state, a confirmation message is logged, and control is passed to the graph_integrator agent.
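The canonicalization rule is easy to verify on its own:

# Lowercase plus underscore-for-space, exactly as in entity_resolver.
for name in ["Artificial Intelligence", "EntityA"]:
    print(name, "->", name.lower().replace(" ", "_"))
# Artificial Intelligence -> artificial_intelligence
# EntityA -> entitya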
def graph_integrator(state: KGState) -> KGState:
    print("Graph Integrator: building the knowledge graph")
    G = nx.DiGraph()
    for s, p, o in state["resolved_relations"]:
        if not G.has_node(s):
            G.add_node(s)
        if not G.has_node(o):
            G.add_node(o)
        G.add_edge(s, o, relation=p)
    state["graph"] = G
    state["messages"].append(AIMessage(content=f"Built graph with {len(G.nodes)} nodes and {len(G.edges)} edges"))
    state["current_agent"] = "graph_validator"
    return state
The graph_integrator function constructs the actual knowledge graph using networkx.DiGraph, which supports directed relationships. It iterates over the resolved triples, ensures both nodes exist, and then adds a directed edge with the relation as metadata. The resulting graph is saved in the state, a summary message is appended, and the pipeline transitions to the graph_validator agent for final validation.
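Because the relation is stored as edge metadata, it can be read back later; a small hand-built example of the same pattern:

G_demo = nx.DiGraph()
G_demo.add_edge("entitya", "entityb", relation="influences")
for s, o, attrs in G_demo.edges(data=True):
    print(f"{s} -[{attrs['relation']}]-> {o}")  # entitya -[influences]-> entityb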
def graph_validator(state: KGState) -> KGState:
    print("Graph Validator: validating the knowledge graph")
    G = state["graph"]
    validation_report = {
        "num_nodes": len(G.nodes),
        "num_edges": len(G.edges),
        "is_connected": nx.is_weakly_connected(G) if G.nodes else False,
        "has_cycles": not nx.is_directed_acyclic_graph(G) if G.nodes else False
    }
    state["validation"] = validation_report
    state["messages"].append(AIMessage(content=f"Validation report: {validation_report}"))
    print(f"Validation report: {validation_report}")
    state["current_agent"] = END
    return state
The graph_validator function performs a basic health check on the constructed knowledge graph. It compiles a validation report containing the number of nodes and edges, whether the graph is weakly connected, and whether the graph contains cycles. This report is added to the state and logged as an AI message. Once validation is complete, the pipeline is marked as finished by setting the current_agent to END.
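The two NetworkX checks behave as you would expect on a deliberately cyclic toy graph:

# Two nodes pointing at each other: connected, and cyclic.
H = nx.DiGraph([("a", "b"), ("b", "a")])
print(nx.is_weakly_connected(H))            # True
print(not nx.is_directed_acyclic_graph(H))  # True -> the graph has a cycle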
def router(state: KGState) -> str:
    return state["current_agent"]

def visualize_graph(graph):
    plt.figure(figsize=(10, 6))
    pos = nx.spring_layout(graph)
    nx.draw(graph, pos, with_labels=True, node_color="skyblue", node_size=1500, font_size=10)
    edge_labels = nx.get_edge_attributes(graph, "relation")
    nx.draw_networkx_edge_labels(graph, pos, edge_labels=edge_labels)
    plt.title("Knowledge Graph")
    plt.tight_layout()
    plt.show()

The router function directs the pipeline to the next agent based on the current_agent field in the state. Meanwhile, the visualize_graph function uses matplotlib and NetworkX to display the final knowledge graph, showing nodes, edges, and labeled relationships for intuitive visual understanding.
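On headless machines where plt.show() has no display, persisting the figure is a common alternative; this variant is a sketch, with an arbitrary filename:

def save_graph_png(graph, path="knowledge_graph.png"):
    # Same drawing logic as visualize_graph, written to disk instead of shown.
    plt.figure(figsize=(10, 6))
    pos = nx.spring_layout(graph)
    nx.draw(graph, pos, with_labels=True, node_color="skyblue", node_size=1500)
    nx.draw_networkx_edge_labels(graph, pos, edge_labels=nx.get_edge_attributes(graph, "relation"))
    plt.savefig(path, bbox_inches="tight")
    plt.close()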
def build_kg_graph():
    workflow = StateGraph(KGState)
    workflow.add_node("data_gatherer", data_gatherer)
    workflow.add_node("entity_extractor", entity_extractor)
    workflow.add_node("relation_extractor", relation_extractor)
    workflow.add_node("entity_resolver", entity_resolver)
    workflow.add_node("graph_integrator", graph_integrator)
    workflow.add_node("graph_validator", graph_validator)
    workflow.add_conditional_edges("data_gatherer", router, {"entity_extractor": "entity_extractor"})
    workflow.add_conditional_edges("entity_extractor", router, {"relation_extractor": "relation_extractor"})
    workflow.add_conditional_edges("relation_extractor", router, {"entity_resolver": "entity_resolver"})
    workflow.add_conditional_edges("entity_resolver", router, {"graph_integrator": "graph_integrator"})
    workflow.add_conditional_edges("graph_integrator", router, {"graph_validator": "graph_validator"})
    workflow.add_conditional_edges("graph_validator", router, {END: END})
    workflow.set_entry_point("data_gatherer")
    return workflow.compile()

The build_kg_graph function defines the complete knowledge graph workflow using LangGraph. It sequentially adds each agent as a node, from data collection to graph validation, and connects them through conditional transitions based on the current agent. The entry point is set to data_gatherer, and the graph is compiled into an executable workflow that guides the automated pipeline from start to finish.
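Since this particular pipeline is strictly linear, the conditional routing is not strictly necessary; LangGraph's plain add_edge can express the same flow. The alternative below is a sketch, assuming each agent function still sets current_agent as shown (the fixed edges simply ignore it):

def build_kg_graph_linear():
    # Equivalent linear wiring without the router indirection.
    workflow = StateGraph(KGState)
    steps = ["data_gatherer", "entity_extractor", "relation_extractor",
             "entity_resolver", "graph_integrator", "graph_validator"]
    funcs = [data_gatherer, entity_extractor, relation_extractor,
             entity_resolver, graph_integrator, graph_validator]
    for name, fn in zip(steps, funcs):
        workflow.add_node(name, fn)
    workflow.set_entry_point(steps[0])
    for a, b in zip(steps, steps[1:]):
        workflow.add_edge(a, b)
    workflow.add_edge(steps[-1], END)
    return workflow.compile()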
def run_knowledge_graph_pipeline(topic):
    print(f"Starting knowledge graph pipeline for: {topic}")
    initial_state = {
        "topic": topic,
        "raw_text": "",
        "entities": [],
        "relations": [],
        "resolved_relations": [],
        "graph": None,
        "validation": {},
        "messages": [],
        "current_agent": "data_gatherer"
    }
    kg_app = build_kg_graph()
    final_state = kg_app.invoke(initial_state)
    print(f"Knowledge graph construction complete for: {topic}")
    return final_state
The run_knowledge_graph_pipeline function initializes the pipeline by setting up an empty state dictionary with the provided topic. It builds the workflow using build_kg_graph, then runs it by invoking the compiled graph with the initial state. As each agent processes the data, the state evolves, and the final result contains the complete knowledge graph, validated and ready for use.
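Beyond the graph itself, the returned state carries the validation report and the agents' message log, which are handy for debugging; one way to inspect them:

final = run_knowledge_graph_pipeline("Artificial Intelligence")
print(final["validation"])     # node/edge counts, connectivity, cycle check
for msg in final["messages"]:  # AIMessage log, one entry per agent step
    print(msg.content)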
if __name__ == "__main__":
topic = "Artificial Intelligence"
result = run_knowledge_graph_pipelinevisualize_graphFinally, this block serves as the script’s entry point. When executed directly, it triggers the knowledge graph pipeline for the topic “Artificial Intelligence,” runs through all agent stages, and finally visualizes the resulting graph using the visualize_graphfunction. It provides an end-to-end demonstration of automated knowledge graph generation.
Output Generated from Knowledge Graph Execution
In conclusion, we have learned how to seamlessly integrate multiple specialized agents into a cohesive knowledge graph pipeline through this structured approach, leveraging LangGraph and NetworkX. This workflow automates entity and relation extraction processes and visualizes intricate relationships, offering a clear and actionable representation of gathered information. By adjusting and enhancing individual agents, such as employing more sophisticated entity recognition methods or integrating real-time data sources, this foundational framework can be scaled and customized for advanced knowledge graph construction tasks across various domains.
Check out the Colab Notebook for the full runnable code.