
Commit 3b9457c

prakriti-solanke, aashipandya, and kaustubh-darekar authored
removed ready to reprocess check (#979)
* removed ready to reprocess check
* checkbox-graph-cleanup
* additional instructions and graph cleanup backend
* added additional instructions
* query update
* checkbox changes
* Updated renaming query, refined code
* updated log
* correct log

Co-authored-by: aashipandya <[email protected]>
Co-authored-by: kaustubh-darekar <[email protected]>
1 parent 8961a26 commit 3b9457c

File tree

17 files changed: +381 -275 lines changed


backend/example.env (+1)

@@ -43,4 +43,5 @@ LLM_MODEL_CONFIG_bedrock_claude_3_5_sonnet="model_name,aws_access_key_id,aws_sec
 LLM_MODEL_CONFIG_ollama_llama3="model_name,model_local_url"
 YOUTUBE_TRANSCRIPT_PROXY="https://user:pass@domain:port"
 EFFECTIVE_SEARCH_RATIO=5
+GRAPH_CLEANUP_MODEL="openai_gpt_4o"

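The new GRAPH_CLEANUP_MODEL variable names the LLM that runs the graph-cleanup post-processing step added in this commit. A minimal sketch of reading it, assuming the os.environ-based configuration style used elsewhere in the backend; the fallback value here is illustrative:

    import os

    # GRAPH_CLEANUP_MODEL names the LLM used by the graph-cleanup step;
    # the fallback mirrors the example.env value and is only illustrative.
    graph_cleanup_model = os.environ.get("GRAPH_CLEANUP_MODEL", "openai_gpt_4o")
    print(f"graph cleanup model: {graph_cleanup_model}")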
backend/score.py (+17 -9)

@@ -13,7 +13,7 @@
 from src.graphDB_dataAccess import graphDBdataAccess
 from src.graph_query import get_graph_results,get_chunktext_results
 from src.chunkid_entities import get_entities_from_chunkids
-from src.post_processing import create_vector_fulltext_indexes, create_entity_embedding
+from src.post_processing import create_vector_fulltext_indexes, create_entity_embedding, graph_cleanup
 from sse_starlette.sse import EventSourceResponse
 from src.communities import create_communities
 from src.neighbours import get_neighbour_nodes

@@ -180,7 +180,8 @@ async def extract_knowledge_graph_from_file(
     allowedRelationship=Form(None),
     language=Form(None),
     access_token=Form(None),
-    retry_condition=Form(None)
+    retry_condition=Form(None),
+    additional_instructions=Form(None)
 ):
     """
     Calls 'extract_graph_from_file' in a new thread to create Neo4jGraph from a

@@ -203,22 +204,22 @@ async def extract_knowledge_graph_from_file(
         if source_type == 'local file':
             merged_file_path = os.path.join(MERGED_DIR,file_name)
             logging.info(f'File path:{merged_file_path}')
-            uri_latency, result = await extract_graph_from_file_local_file(uri, userName, password, database, model, merged_file_path, file_name, allowedNodes, allowedRelationship, retry_condition)
+            uri_latency, result = await extract_graph_from_file_local_file(uri, userName, password, database, model, merged_file_path, file_name, allowedNodes, allowedRelationship, retry_condition, additional_instructions)

         elif source_type == 's3 bucket' and source_url:
-            uri_latency, result = await extract_graph_from_file_s3(uri, userName, password, database, model, source_url, aws_access_key_id, aws_secret_access_key, file_name, allowedNodes, allowedRelationship, retry_condition)
+            uri_latency, result = await extract_graph_from_file_s3(uri, userName, password, database, model, source_url, aws_access_key_id, aws_secret_access_key, file_name, allowedNodes, allowedRelationship, retry_condition, additional_instructions)

         elif source_type == 'web-url':
-            uri_latency, result = await extract_graph_from_web_page(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, retry_condition)
+            uri_latency, result = await extract_graph_from_web_page(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, retry_condition, additional_instructions)

         elif source_type == 'youtube' and source_url:
-            uri_latency, result = await extract_graph_from_file_youtube(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, retry_condition)
+            uri_latency, result = await extract_graph_from_file_youtube(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, retry_condition, additional_instructions)

         elif source_type == 'Wikipedia' and wiki_query:
-            uri_latency, result = await extract_graph_from_file_Wikipedia(uri, userName, password, database, model, wiki_query, language, file_name, allowedNodes, allowedRelationship, retry_condition)
+            uri_latency, result = await extract_graph_from_file_Wikipedia(uri, userName, password, database, model, wiki_query, language, file_name, allowedNodes, allowedRelationship, retry_condition, additional_instructions)

         elif source_type == 'gcs bucket' and gcs_bucket_name:
-            uri_latency, result = await extract_graph_from_file_gcs(uri, userName, password, database, model, gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token, file_name, allowedNodes, allowedRelationship, retry_condition)
+            uri_latency, result = await extract_graph_from_file_gcs(uri, userName, password, database, model, gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token, file_name, allowedNodes, allowedRelationship, retry_condition, additional_instructions)
         else:
             return create_api_response('Failed',message='source_type is other than accepted source')
         extract_api_time = time.time() - start_time

@@ -337,10 +338,15 @@ async def post_processing(uri=Form(), userName=Form(), password=Form(), database
             await asyncio.to_thread(create_entity_embedding, graph)
             api_name = 'post_processing/create_entity_embedding'
             logging.info(f'Entity Embeddings created')
+
+        if "graph_cleanup" in tasks :
+            await asyncio.to_thread(graph_cleanup, graph)
+            api_name = 'post_processing/graph_cleanup'
+            logging.info(f'Updated nodes and relationship labels')

         if "enable_communities" in tasks:
             api_name = 'create_communities'
-            await asyncio.to_thread(create_communities, uri, userName, password, database)
+            await asyncio.to_thread(create_communities, uri, userName, password, database)

             logging.info(f'created communities')
             graph = create_graph_database_connection(uri, userName, password, database)

@@ -350,6 +356,8 @@ async def post_processing(uri=Form(), userName=Form(), password=Form(), database
         if count_response:
             count_response = [{"filename": filename, **counts} for filename, counts in count_response.items()]
             logging.info(f'Updated source node with community related counts')
+
+
     end = time.time()
     elapsed_time = end - start
     json_obj = {'api_name': api_name, 'db_url': uri, 'userName':userName, 'database':database, 'tasks':tasks, 'logging_time': formatted_time(datetime.now(timezone.utc)), 'elapsed_api_time':f'{elapsed_time:.2f}'}

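The graph_cleanup task added to post_processing follows the same dispatch pattern as the existing steps: a blocking helper is pushed onto a worker thread with asyncio.to_thread so the async endpoint stays responsive. A self-contained sketch of that pattern, with a stand-in helper body:

    import asyncio

    def graph_cleanup(graph):
        # Stand-in for the real helper in src.post_processing, which
        # updates node labels and relationship types in Neo4j.
        print(f"cleaning up graph: {graph}")

    async def post_processing(tasks, graph):
        if "graph_cleanup" in tasks:
            # Run the blocking helper off the event loop, as score.py does.
            await asyncio.to_thread(graph_cleanup, graph)

    asyncio.run(post_processing(["graph_cleanup"], "demo-graph"))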
backend/src/llm.py (+5 -3)

@@ -13,6 +13,7 @@
 from langchain_community.chat_models import ChatOllama
 import boto3
 import google.auth
+from src.shared.constants import ADDITIONAL_INSTRUCTIONS

 def get_llm(model: str):
     """Retrieve the specified language model based on the model name."""

@@ -160,7 +161,7 @@ def get_chunk_id_as_doc_metadata(chunkId_chunkDoc_list):


 async def get_graph_document_list(
-    llm, combined_chunk_document_list, allowedNodes, allowedRelationship
+    llm, combined_chunk_document_list, allowedNodes, allowedRelationship, additional_instructions=None
 ):
     futures = []
     graph_document_list = []

@@ -180,6 +181,7 @@ async def get_graph_document_list(
             allowed_nodes=allowedNodes,
             allowed_relationships=allowedRelationship,
             ignore_tool_usage=True,
+            additional_instructions=ADDITIONAL_INSTRUCTIONS+ (additional_instructions if additional_instructions else "")
         )

     if isinstance(llm,DiffbotGraphTransformer):

@@ -189,7 +191,7 @@ async def get_graph_document_list(
     return graph_document_list


-async def get_graph_from_llm(model, chunkId_chunkDoc_list, allowedNodes, allowedRelationship):
+async def get_graph_from_llm(model, chunkId_chunkDoc_list, allowedNodes, allowedRelationship, additional_instructions=None):
     try:
         llm, model_name = get_llm(model)
         combined_chunk_document_list = get_combined_chunks(chunkId_chunkDoc_list)

@@ -204,7 +206,7 @@ async def get_graph_from_llm(model, chunkId_chunkDoc_list, allowedNodes, allowed
         allowedRelationship = allowedRelationship.split(',')

         graph_document_list = await get_graph_document_list(
-            llm, combined_chunk_document_list, allowedNodes, allowedRelationship
+            llm, combined_chunk_document_list, allowedNodes, allowedRelationship, additional_instructions
         )
         return graph_document_list
     except Exception as e:

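The combined instruction string built in get_graph_document_list is handed to the transformer that extracts graph documents. A sketch of that construction, assuming langchain_experimental's LLMGraphTransformer and an illustrative value for the ADDITIONAL_INSTRUCTIONS constant (the real one lives in src.shared.constants):

    from langchain_experimental.graph_transformers import LLMGraphTransformer

    # Illustrative stand-in for the constant defined in src.shared.constants.
    ADDITIONAL_INSTRUCTIONS = "Focus on domain entities; skip boilerplate. "

    def make_transformer(llm, allowed_nodes, allowed_relationships,
                         additional_instructions=None):
        # A None value contributes nothing, so the base instructions are
        # used unchanged; user-supplied text is appended after them.
        return LLMGraphTransformer(
            llm=llm,
            allowed_nodes=allowed_nodes,
            allowed_relationships=allowed_relationships,
            ignore_tool_usage=True,
            additional_instructions=ADDITIONAL_INSTRUCTIONS
            + (additional_instructions or ""),
        )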
backend/src/main.py (+23 -23)

@@ -222,7 +222,7 @@ def create_source_node_graph_url_wikipedia(graph, model, wiki_query, source_type
         lst_file_name.append({'fileName':obj_source_node.file_name,'fileSize':obj_source_node.file_size,'url':obj_source_node.url, 'language':obj_source_node.language, 'status':'Success'})
     return lst_file_name,success_count,failed_count

-async def extract_graph_from_file_local_file(uri, userName, password, database, model, merged_file_path, fileName, allowedNodes, allowedRelationship, retry_condition):
+async def extract_graph_from_file_local_file(uri, userName, password, database, model, merged_file_path, fileName, allowedNodes, allowedRelationship, retry_condition, additional_instructions):

     logging.info(f'Process file name :{fileName}')
     if not retry_condition:

@@ -234,11 +234,11 @@ async def extract_graph_from_file_local_file(uri, userName, password, database,
         file_name, pages, file_extension = get_documents_from_file_by_path(merged_file_path,fileName)
         if pages==None or len(pages)==0:
             raise Exception(f'File content is not available for file : {file_name}')
-        return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship, True, merged_file_path)
+        return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship, True, merged_file_path, additional_instructions=additional_instructions)
     else:
-        return await processing_source(uri, userName, password, database, model, fileName, [], allowedNodes, allowedRelationship, True, merged_file_path, retry_condition)
+        return await processing_source(uri, userName, password, database, model, fileName, [], allowedNodes, allowedRelationship, True, merged_file_path, retry_condition, additional_instructions=additional_instructions)

-async def extract_graph_from_file_s3(uri, userName, password, database, model, source_url, aws_access_key_id, aws_secret_access_key, file_name, allowedNodes, allowedRelationship, retry_condition):
+async def extract_graph_from_file_s3(uri, userName, password, database, model, source_url, aws_access_key_id, aws_secret_access_key, file_name, allowedNodes, allowedRelationship, retry_condition, additional_instructions):
     if not retry_condition:
         if(aws_access_key_id==None or aws_secret_access_key==None):
             raise Exception('Please provide AWS access and secret keys')

@@ -248,48 +248,48 @@ async def extract_graph_from_file_s3(uri, userName, password, database, model, s

         if pages==None or len(pages)==0:
             raise Exception(f'File content is not available for file : {file_name}')
-        return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
+        return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship, additional_instructions=additional_instructions)
     else:
-        return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
+        return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition, additional_instructions=additional_instructions)

-async def extract_graph_from_web_page(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, retry_condition):
+async def extract_graph_from_web_page(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, retry_condition, additional_instructions):
     if not retry_condition:
         file_name, pages = get_documents_from_web_page(source_url)
         if pages==None or len(pages)==0:
             raise Exception(f'Content is not available for given URL : {file_name}')
-        return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
+        return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship, additional_instructions=additional_instructions)
     else:
-        return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
+        return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition, additional_instructions=additional_instructions)

-async def extract_graph_from_file_youtube(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, retry_condition):
+async def extract_graph_from_file_youtube(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, retry_condition, additional_instructions):
     if not retry_condition:
         file_name, pages = get_documents_from_youtube(source_url)

         if pages==None or len(pages)==0:
             raise Exception(f'Youtube transcript is not available for file : {file_name}')
-        return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
+        return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship, additional_instructions=additional_instructions)
     else:
-        return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
+        return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition, additional_instructions=additional_instructions)

-async def extract_graph_from_file_Wikipedia(uri, userName, password, database, model, wiki_query, language, file_name, allowedNodes, allowedRelationship, retry_condition):
+async def extract_graph_from_file_Wikipedia(uri, userName, password, database, model, wiki_query, language, file_name, allowedNodes, allowedRelationship, retry_condition, additional_instructions):
     if not retry_condition:
         file_name, pages = get_documents_from_Wikipedia(wiki_query, language)
         if pages==None or len(pages)==0:
             raise Exception(f'Wikipedia page is not available for file : {file_name}')
-        return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
+        return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship, additional_instructions=additional_instructions)
     else:
-        return await processing_source(uri, userName, password, database, model, file_name,[], allowedNodes, allowedRelationship, retry_condition=retry_condition)
+        return await processing_source(uri, userName, password, database, model, file_name,[], allowedNodes, allowedRelationship, retry_condition=retry_condition, additional_instructions=additional_instructions)

-async def extract_graph_from_file_gcs(uri, userName, password, database, model, gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token, file_name, allowedNodes, allowedRelationship, retry_condition):
+async def extract_graph_from_file_gcs(uri, userName, password, database, model, gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token, file_name, allowedNodes, allowedRelationship, retry_condition, additional_instructions):
     if not retry_condition:
         file_name, pages = get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token)
         if pages==None or len(pages)==0:
             raise Exception(f'File content is not available for file : {file_name}')
-        return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
+        return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship, additional_instructions=additional_instructions)
     else:
-        return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
+        return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition, additional_instructions=additional_instructions)

-async def processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship, is_uploaded_from_local=None, merged_file_path=None, retry_condition=None):
+async def processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship, is_uploaded_from_local=None, merged_file_path=None, retry_condition=None, additional_instructions=None):
     """
     Extracts a Neo4jGraph from a PDF file based on the model.

@@ -381,7 +381,7 @@ async def processing_source(uri, userName, password, database, model, file_name,
                 break
             else:
                 processing_chunks_start_time = time.time()
-                node_count,rel_count,latency_processed_chunk = await processing_chunks(selected_chunks,graph,uri, userName, password, database,file_name,model,allowedNodes,allowedRelationship,node_count, rel_count)
+                node_count,rel_count,latency_processed_chunk = await processing_chunks(selected_chunks,graph,uri, userName, password, database,file_name,model,allowedNodes,allowedRelationship,node_count, rel_count, additional_instructions)
                 processing_chunks_end_time = time.time()
                 processing_chunks_elapsed_end_time = processing_chunks_end_time - processing_chunks_start_time
                 logging.info(f"Time taken {update_graph_chunk_processed} chunks processed upto {select_chunks_upto} completed in {processing_chunks_elapsed_end_time:.2f} seconds for file name {file_name}")

@@ -458,7 +458,7 @@ async def processing_source(uri, userName, password, database, model, file_name,
         logging.error(error_message)
         raise Exception(error_message)

-async def processing_chunks(chunkId_chunkDoc_list,graph,uri, userName, password, database,file_name,model,allowedNodes,allowedRelationship, node_count, rel_count):
+async def processing_chunks(chunkId_chunkDoc_list,graph,uri, userName, password, database,file_name,model,allowedNodes,allowedRelationship, node_count, rel_count, additional_instructions=None):
     #create vector index and update chunk node with embedding
     latency_processing_chunk = {}
     if graph is not None:

@@ -476,7 +476,7 @@ async def processing_chunks(chunkId_chunkDoc_list,graph,uri, userName, password,
     logging.info("Get graph document list from models")

     start_entity_extraction = time.time()
-    graph_documents = await get_graph_from_llm(model, chunkId_chunkDoc_list, allowedNodes, allowedRelationship)
+    graph_documents = await get_graph_from_llm(model, chunkId_chunkDoc_list, allowedNodes, allowedRelationship, additional_instructions)
     end_entity_extraction = time.time()
     elapsed_entity_extraction = end_entity_extraction - start_entity_extraction
     logging.info(f'Time taken to extract enitities from LLM Graph Builder: {elapsed_entity_extraction:.2f} seconds')

@@ -678,7 +678,7 @@ def get_labels_and_relationtypes(graph):
         return label order by label limit 100 } as labels,
       collect {
         CALL db.relationshipTypes() yield relationshipType as type
-        WHERE NOT type IN ['PART_OF', 'NEXT_CHUNK', 'HAS_ENTITY', '_Bloom_Perspective_','FIRST_CHUNK']
+        WHERE NOT type IN ['PART_OF', 'NEXT_CHUNK', 'HAS_ENTITY', '_Bloom_Perspective_','FIRST_CHUNK','SIMILAR','IN_COMMUNITY','PARENT_COMMUNITY']
         return type order by type LIMIT 100 } as relationshipTypes
     """
     graphDb_data_Access = graphDBdataAccess(graph)

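The widened exclusion list in get_labels_and_relationtypes now hides the community-related relationship types (SIMILAR, IN_COMMUNITY, PARENT_COMMUNITY) along with the internal ones. A standalone sketch of the same filter through the Neo4j Python driver; the connection URI and credentials are placeholders:

    from neo4j import GraphDatabase

    # Internal and community-related types hidden from the UI, mirroring
    # the updated list in get_labels_and_relationtypes.
    EXCLUDED_TYPES = ['PART_OF', 'NEXT_CHUNK', 'HAS_ENTITY', '_Bloom_Perspective_',
                      'FIRST_CHUNK', 'SIMILAR', 'IN_COMMUNITY', 'PARENT_COMMUNITY']

    QUERY = """
    CALL db.relationshipTypes() YIELD relationshipType AS type
    WHERE NOT type IN $excluded
    RETURN type ORDER BY type LIMIT 100
    """

    driver = GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "password"))
    with driver.session() as session:
        visible = [record["type"] for record in session.run(QUERY, excluded=EXCLUDED_TYPES)]
    print(visible)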