Skip to content

Commit 2d39b8b

Browse files
authored
Async way to create entities from multiple chunks (#788)
* LLMs with latest langchain dev libraries
* conflict resolved
* all llm models with latest library changes
* async way to get graph documents
* indentation correction
1 parent 1546ddd commit 2d39b8b

File tree

3 files changed

+44
-50
lines changed

3 files changed

+44
-50
lines changed

backend/score.py

+6-12
Original file line numberDiff line numberDiff line change
@@ -178,28 +178,22 @@ async def extract_knowledge_graph_from_file(
178178
if source_type == 'local file':
179179
merged_file_path = os.path.join(MERGED_DIR,file_name)
180180
logging.info(f'File path:{merged_file_path}')
181-
result = await asyncio.to_thread(
182-
extract_graph_from_file_local_file, uri, userName, password, database, model, merged_file_path, file_name, allowedNodes, allowedRelationship, retry_condition)
181+
result = await extract_graph_from_file_local_file(uri, userName, password, database, model, merged_file_path, file_name, allowedNodes, allowedRelationship, retry_condition)
183182

184183
elif source_type == 's3 bucket' and source_url:
185-
result = await asyncio.to_thread(
186-
extract_graph_from_file_s3, uri, userName, password, database, model, source_url, aws_access_key_id, aws_secret_access_key, file_name, allowedNodes, allowedRelationship, retry_condition)
184+
result = await extract_graph_from_file_s3(uri, userName, password, database, model, source_url, aws_access_key_id, aws_secret_access_key, file_name, allowedNodes, allowedRelationship, retry_condition)
187185

188186
elif source_type == 'web-url':
189-
result = await asyncio.to_thread(
190-
extract_graph_from_web_page, uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, retry_condition)
187+
result = await extract_graph_from_web_page(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, retry_condition)
191188

192189
elif source_type == 'youtube' and source_url:
193-
result = await asyncio.to_thread(
194-
extract_graph_from_file_youtube, uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, retry_condition)
190+
result = await extract_graph_from_file_youtube(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, retry_condition)
195191

196192
elif source_type == 'Wikipedia' and wiki_query:
197-
result = await asyncio.to_thread(
198-
extract_graph_from_file_Wikipedia, uri, userName, password, database, model, wiki_query, language, file_name, allowedNodes, allowedRelationship, retry_condition)
193+
result = await extract_graph_from_file_Wikipedia(uri, userName, password, database, model, wiki_query, language, file_name, allowedNodes, allowedRelationship, retry_condition)
199194

200195
elif source_type == 'gcs bucket' and gcs_bucket_name:
201-
result = await asyncio.to_thread(
202-
extract_graph_from_file_gcs, uri, userName, password, database, model, gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token, file_name, allowedNodes, allowedRelationship, retry_condition)
196+
result = await extract_graph_from_file_gcs(uri, userName, password, database, model, gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token, file_name, allowedNodes, allowedRelationship, retry_condition)
203197
else:
204198
return create_api_response('Failed',message='source_type is other than accepted source')
205199
extract_api_time = time.time() - start_time

backend/src/llm.py

+16-16
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def get_combined_chunks(chunkId_chunkDoc_list):
141141
return combined_chunk_document_list
142142

143143

144-
def get_graph_document_list(
144+
async def get_graph_document_list(
145145
llm, combined_chunk_document_list, allowedNodes, allowedRelationship
146146
):
147147
futures = []
@@ -165,23 +165,23 @@ def get_graph_document_list(
165165
ignore_tool_usage=True,
166166
#prompt = ChatPromptTemplate.from_messages(["system",PROMPT_TO_ALL_LLMs])
167167
)
168-
with ThreadPoolExecutor(max_workers=10) as executor:
169-
for chunk in combined_chunk_document_list:
170-
chunk_doc = Document(
171-
page_content=chunk.page_content.encode("utf-8"), metadata=chunk.metadata
172-
)
173-
futures.append(
174-
executor.submit(llm_transformer.convert_to_graph_documents, [chunk_doc])
175-
)
176-
177-
for i, future in enumerate(concurrent.futures.as_completed(futures)):
178-
graph_document = future.result()
179-
graph_document_list.append(graph_document[0])
180-
168+
# with ThreadPoolExecutor(max_workers=10) as executor:
169+
# for chunk in combined_chunk_document_list:
170+
# chunk_doc = Document(
171+
# page_content=chunk.page_content.encode("utf-8"), metadata=chunk.metadata
172+
# )
173+
# futures.append(
174+
# executor.submit(llm_transformer.convert_to_graph_documents, [chunk_doc])
175+
# )
176+
177+
# for i, future in enumerate(concurrent.futures.as_completed(futures)):
178+
# graph_document = future.result()
179+
# graph_document_list.append(graph_document[0])
180+
graph_document_list = await llm_transformer.aconvert_to_graph_documents(combined_chunk_document_list)
181181
return graph_document_list
182182

183183

184-
def get_graph_from_llm(model, chunkId_chunkDoc_list, allowedNodes, allowedRelationship):
184+
async def get_graph_from_llm(model, chunkId_chunkDoc_list, allowedNodes, allowedRelationship):
185185

186186
llm, model_name = get_llm(model)
187187
combined_chunk_document_list = get_combined_chunks(chunkId_chunkDoc_list)
@@ -195,7 +195,7 @@ def get_graph_from_llm(model, chunkId_chunkDoc_list, allowedNodes, allowedRelati
195195
else:
196196
allowedRelationship = allowedRelationship.split(',')
197197

198-
graph_document_list = get_graph_document_list(
198+
graph_document_list = await get_graph_document_list(
199199
llm, combined_chunk_document_list, allowedNodes, allowedRelationship
200200
)
201201
return graph_document_list

backend/src/main.py

+22-22
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ def create_source_node_graph_url_wikipedia(graph, model, wiki_query, source_type
208208
lst_file_name.append({'fileName':obj_source_node.file_name,'fileSize':obj_source_node.file_size,'url':obj_source_node.url, 'language':obj_source_node.language, 'status':'Success'})
209209
return lst_file_name,success_count,failed_count
210210

211-
def extract_graph_from_file_local_file(uri, userName, password, database, model, merged_file_path, fileName, allowedNodes, allowedRelationship, retry_condition):
211+
async def extract_graph_from_file_local_file(uri, userName, password, database, model, merged_file_path, fileName, allowedNodes, allowedRelationship, retry_condition):
212212

213213
logging.info(f'Process file name :{fileName}')
214214
if retry_condition is None:
@@ -220,11 +220,11 @@ def extract_graph_from_file_local_file(uri, userName, password, database, model,
220220
file_name, pages, file_extension = get_documents_from_file_by_path(merged_file_path,fileName)
221221
if pages==None or len(pages)==0:
222222
raise Exception(f'File content is not available for file : {file_name}')
223-
return processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship, True, merged_file_path)
223+
return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship, True, merged_file_path)
224224
else:
225-
return processing_source(uri, userName, password, database, model, fileName, [], allowedNodes, allowedRelationship, True, merged_file_path, retry_condition)
225+
return await processing_source(uri, userName, password, database, model, fileName, [], allowedNodes, allowedRelationship, True, merged_file_path, retry_condition)
226226

227-
def extract_graph_from_file_s3(uri, userName, password, database, model, source_url, aws_access_key_id, aws_secret_access_key, file_name, allowedNodes, allowedRelationship, retry_condition):
227+
async def extract_graph_from_file_s3(uri, userName, password, database, model, source_url, aws_access_key_id, aws_secret_access_key, file_name, allowedNodes, allowedRelationship, retry_condition):
228228
if retry_condition is None:
229229
if(aws_access_key_id==None or aws_secret_access_key==None):
230230
raise Exception('Please provide AWS access and secret keys')
@@ -234,49 +234,49 @@ def extract_graph_from_file_s3(uri, userName, password, database, model, source_
234234

235235
if pages==None or len(pages)==0:
236236
raise Exception(f'File content is not available for file : {file_name}')
237-
return processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
237+
return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
238238
else:
239-
return processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
239+
return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
240240

241-
def extract_graph_from_web_page(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, retry_condition):
241+
async def extract_graph_from_web_page(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, retry_condition):
242242
if retry_condition is None:
243243
file_name, pages = get_documents_from_web_page(source_url)
244244

245245
if pages==None or len(pages)==0:
246246
raise Exception(f'Content is not available for given URL : {file_name}')
247-
return processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
247+
return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
248248
else:
249-
return processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
249+
return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
250250

251-
def extract_graph_from_file_youtube(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, retry_condition):
251+
async def extract_graph_from_file_youtube(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, retry_condition):
252252
if retry_condition is None:
253253
file_name, pages = get_documents_from_youtube(source_url)
254254

255255
if pages==None or len(pages)==0:
256256
raise Exception(f'Youtube transcript is not available for file : {file_name}')
257-
return processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
257+
return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
258258
else:
259-
return processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
259+
return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
260260

261-
def extract_graph_from_file_Wikipedia(uri, userName, password, database, model, wiki_query, language, file_name, allowedNodes, allowedRelationship, retry_condition):
261+
async def extract_graph_from_file_Wikipedia(uri, userName, password, database, model, wiki_query, language, file_name, allowedNodes, allowedRelationship, retry_condition):
262262
if retry_condition is None:
263263
file_name, pages = get_documents_from_Wikipedia(wiki_query, language)
264264
if pages==None or len(pages)==0:
265265
raise Exception(f'Wikipedia page is not available for file : {file_name}')
266-
return processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
266+
return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
267267
else:
268-
return processing_source(uri, userName, password, database, model, file_name,[], allowedNodes, allowedRelationship, retry_condition=retry_condition)
268+
return await processing_source(uri, userName, password, database, model, file_name,[], allowedNodes, allowedRelationship, retry_condition=retry_condition)
269269

270-
def extract_graph_from_file_gcs(uri, userName, password, database, model, gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token, file_name, allowedNodes, allowedRelationship, retry_condition):
270+
async def extract_graph_from_file_gcs(uri, userName, password, database, model, gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token, file_name, allowedNodes, allowedRelationship, retry_condition):
271271
if retry_condition is None:
272272
file_name, pages = get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token)
273273
if pages==None or len(pages)==0:
274274
raise Exception(f'File content is not available for file : {file_name}')
275-
return processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
275+
return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
276276
else:
277-
return processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
277+
return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
278278

279-
def processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship, is_uploaded_from_local=None, merged_file_path=None, retry_condition=None):
279+
async def processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship, is_uploaded_from_local=None, merged_file_path=None, retry_condition=None):
280280
"""
281281
Extracts a Neo4jGraph from a PDF file based on the model.
282282
@@ -366,7 +366,7 @@ def processing_source(uri, userName, password, database, model, file_name, pages
366366
break
367367
else:
368368
processing_chunks_start_time = time.time()
369-
node_count,rel_count,latency_processed_chunk = processing_chunks(selected_chunks,graph,uri, userName, password, database,file_name,model,allowedNodes,allowedRelationship,node_count, rel_count)
369+
node_count,rel_count,latency_processed_chunk = await processing_chunks(selected_chunks,graph,uri, userName, password, database,file_name,model,allowedNodes,allowedRelationship,node_count, rel_count)
370370
processing_chunks_end_time = time.time()
371371
processing_chunks_elapsed_end_time = processing_chunks_end_time - processing_chunks_start_time
372372
logging.info(f"Time taken {update_graph_chunk_processed} chunks processed upto {select_chunks_upto} completed in {processing_chunks_elapsed_end_time:.2f} seconds for file name {file_name}")
@@ -439,7 +439,7 @@ def processing_source(uri, userName, password, database, model, file_name, pages
439439
logging.error(error_message)
440440
raise Exception(error_message)
441441

442-
def processing_chunks(chunkId_chunkDoc_list,graph,uri, userName, password, database,file_name,model,allowedNodes,allowedRelationship, node_count, rel_count):
442+
async def processing_chunks(chunkId_chunkDoc_list,graph,uri, userName, password, database,file_name,model,allowedNodes,allowedRelationship, node_count, rel_count):
443443
#create vector index and update chunk node with embedding
444444
if graph is not None:
445445
if graph._driver._closed:
@@ -456,7 +456,7 @@ def processing_chunks(chunkId_chunkDoc_list,graph,uri, userName, password, datab
456456
logging.info("Get graph document list from models")
457457

458458
start_entity_extraction = time.time()
459-
graph_documents = get_graph_from_llm(model, chunkId_chunkDoc_list, allowedNodes, allowedRelationship)
459+
graph_documents = await get_graph_from_llm(model, chunkId_chunkDoc_list, allowedNodes, allowedRelationship)
460460
end_entity_extraction = time.time()
461461
elapsed_entity_extraction = end_entity_extraction - start_entity_extraction
462462
logging.info(f'Time taken to extract enitities from LLM Graph Builder: {elapsed_entity_extraction:.2f} seconds')

0 commit comments

Comments
 (0)