46
46
47
47
opensearch_secret_id = os .environ ['OPENSEARCH_SECRET_ID' ]
48
48
bucket_name = os .environ ['OUTPUT_BUCKET' ]
49
- # TODO: add input_bucket for csv|images
50
49
opensearch_index = os .environ ['OPENSEARCH_INDEX' ]
51
50
opensearch_domain = os .environ ['OPENSEARCH_DOMAIN_ENDPOINT' ]
52
51
opensearch_api_name = os .environ ['OPENSEARCH_API_NAME' ]
@@ -112,9 +111,14 @@ def process_documents_in_es(index_exists, shards, http_auth,model_id):
112
111
def process_documents_in_aoss (index_exists , shards , http_auth ,model_id ):
113
112
# Reference: https://python.langchain.com/docs/integrations/vectorstores/opensearch#using-aoss-amazon-opensearch-service-serverless
114
113
bedrock_client = boto3 .client ('bedrock-runtime' )
114
+ # if(model_id=='amazon.titan-embed-image-v1'):
115
+ # print(f'image embeddings shards[0] {shards}')
116
+ # embeddings = image_loader.BedrockEmbeddings_image(docs=shards[0], model_id=model_id,)
117
+ # else:
118
+ # embeddings = BedrockEmbeddings(client=bedrock_client,model_id=model_id)
115
119
embeddings = BedrockEmbeddings (client = bedrock_client ,model_id = model_id )
116
-
117
- print (f' Bedrock embeddings model id :: { embeddings . model_id } ' )
120
+
121
+ print (f' check index with :: { shards [ 0 ] } ' )
118
122
119
123
shard_start_index = 0
120
124
if index_exists is False :
@@ -132,13 +136,14 @@ def process_documents_in_aoss(index_exists, shards, http_auth,model_id):
132
136
)
133
137
# we now need to start the loop below from the second shard
134
138
shard_start_index = 1
135
- print (f'statrt processing shard' )
136
139
for shard in shards [shard_start_index :]:
140
+ print (f'processing shard index { shard_start_index } ' )
137
141
results = process_shard (shard = shard ,
138
142
os_index_name = opensearch_index ,
139
143
os_domain_ep = opensearch_domain ,
140
144
os_http_auth = http_auth ,
141
145
model_id = model_id )
146
+
142
147
143
148
@logger .inject_lambda_context (log_event = True )
144
149
@tracer .capture_lambda_handler
@@ -183,46 +188,42 @@ def handler(event, context: LambdaContext) -> dict:
183
188
# Images are stored in S3 with a presigned URL; embeddings are not required.
184
189
185
190
for transformed_file in event :
186
- print (f" staus :: { transformed_file ['s3_transformer_result' ]['Payload' ]['status' ]} " )
187
191
if transformed_file ['s3_transformer_result' ]['Payload' ]['status' ] == 'File transformed' :
188
192
filename = transformed_file ['s3_transformer_result' ]['Payload' ]['name' ]
189
- name , extension = os .path .splitext (filename )
190
- print (f" the name { name } and extension { extension } " )
191
- # TODO: check file format , if pdf then read raw text from output bucket and update docs[]
192
- # if csv|image then read file from input bucket using langchain document loader and update docs[]
193
+ original_filename = transformed_file ['name' ]
194
+ name , extension = os .path .splitext (original_filename )
195
+ print (f" the original_filename { name } and extension { extension } " )
193
196
if (extension == '.pdf' ):
194
197
loader = S3TxtFileLoaderInMemory (bucket_name , filename )
195
198
sub_docs = loader .load ()
196
199
for doc in sub_docs :
197
- doc .metadata ['source' ] = filename
200
+ doc .metadata ['source' ] = original_filename
198
201
docs .extend (sub_docs )
202
+ process_text_embeddings (docs ,modelid ,http_auth ,files ,job_id )
199
203
if (extension == '.jpg' or extension == '.jpeg' or extension == '.png' or extension == '.svg' ):
200
- # Try adding text to document
201
- #image_detal_file is created by aws rekognition
202
- img_load = image_loader (bucket_name , filename ,f"{ name } .txt" )
203
- sub_docs = img_load .load ()
204
- for doc in sub_docs :
205
- doc .metadata ['source' ] = filename
206
- docs .extend (sub_docs )
207
- url = img_load .get_presigned_url ()
208
- print (f" source :: { filename } " )
209
- os_document = img_load .prepare_document_for_direct_load ()
210
-
204
+ img_load = image_loader (bucket_name , filename ,f"{ name } .txt" ,modelid )
205
+ docs = img_load .load ()
206
+ url = img_load .get_presigned_url ()
207
+ for doc in docs :
208
+ doc .metadata ['image_path' ] = url
209
+ process_image_embeddings (docs ,modelid ,http_auth ,files ,job_id ,url )
211
210
212
211
if not docs :
213
212
return {
214
213
'status' :'nothing to ingest'
215
214
}
216
215
216
+
217
+
218
+ def process_text_embeddings (docs ,modelid ,http_auth ,files ,job_id ):
219
+ logger .info ("process image embeddings with chunks" )
217
220
text_splitter = RecursiveCharacterTextSplitter (
218
221
# Set a really small chunk size, just to show.
219
222
chunk_size = CHUNCK_SIZE_DOC_SPLIT ,
220
223
chunk_overlap = OVERLAP_FOR_DOC_SPLIT ,
221
224
length_function = len ,
222
225
)
223
226
224
- print ('Documents loaded locally' )
225
-
226
227
# add a custom metadata field, such as timestamp
227
228
# we could probably augment the data here (e.g. is PII present? ...)
228
229
for doc in docs :
@@ -233,14 +234,11 @@ def handler(event, context: LambdaContext) -> dict:
233
234
234
235
db_shards = (len (chunks ) // MAX_OS_DOCS_PER_PUT ) + 1
235
236
shards = np .array_split (chunks , db_shards )
236
-
237
237
# first check if index exists, if it does then call the add_documents function
238
238
# otherwise call the from_documents function which would first create the index
239
239
# and then do a bulk add. Both add_documents and from_documents do a bulk add
240
240
# but it is important to call from_documents first so that the index is created
241
241
# correctly for K-NN
242
-
243
- print (f'check if index exists shards' )
244
242
try :
245
243
index_exists = check_if_index_exists (opensearch_index ,
246
244
aws_region ,
@@ -254,19 +252,51 @@ def handler(event, context: LambdaContext) -> dict:
254
252
return {
255
253
'status' :'failed'
256
254
}
257
-
258
- print (f'job_id :: { job_id } ' )
259
- if (job_id == "101" ):
260
- print (f'running for job_id 101, use os directly' )
261
- create_index_for_image (os_document )
262
- else :
263
- print (f'Loading chunks into vector store ... using { db_shards } shards' )
264
- if opensearch_api_name == "es" :
255
+
256
+ if opensearch_api_name == "es" :
265
257
process_documents_in_es (index_exists , shards , http_auth ,modelid )
266
- elif opensearch_api_name == "aoss" :
258
+ elif opensearch_api_name == "aoss" :
267
259
process_documents_in_aoss (index_exists , shards , http_auth ,modelid )
268
260
261
+ for file in files :
262
+ if file ['status' ] == 'File transformed' :
263
+ file ['status' ] = 'Ingested'
264
+ else :
265
+ file ['status' ] = 'Error_' + file ['status' ]
266
+ updateIngestionJobStatus ({'jobid' : job_id , 'files' : files })
267
+
268
+ return {
269
+ 'status' :'succeed'
270
+ }
271
+
272
+ def process_image_embeddings (docs ,modelid ,http_auth ,files ,job_id ,url ):
273
+ logger .info ("process image embeddings" )
274
+ print (f' docs :: { docs } ' )
275
+
276
+ for doc in docs :
277
+ doc .metadata ['timestamp' ] = time .time ()
278
+ doc .metadata ['embeddings_model' ] = modelid
279
+
280
+ shards = np .array_split (docs ,1 )
281
+
282
+ try :
283
+ index_exists = check_if_index_exists (opensearch_index ,
284
+ aws_region ,
285
+ opensearch_domain ,
286
+ http_auth )
287
+ except Exception as e :
288
+ logger .exception (f'Failed to verify the existence of the os index : { e } ' )
289
+ for file in files :
290
+ file ['status' ] = 'Error - internal os error cannot connect'
291
+ updateIngestionJobStatus ({'jobid' : job_id , 'files' : files })
292
+ return {
293
+ 'status' :'failed'
294
+ }
269
295
296
+ if opensearch_api_name == "es" :
297
+ process_documents_in_es (index_exists , shards , http_auth ,modelid )
298
+ elif opensearch_api_name == "aoss" :
299
+ process_documents_in_aoss (index_exists , shards , http_auth ,modelid )
270
300
271
301
for file in files :
272
302
if file ['status' ] == 'File transformed' :
0 commit comments