diff --git a/.gitignore b/.gitignore
index e9f6fe9f2..09f4f040b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,7 +3,7 @@
 __pycache__/
 *.py[cod]
 *$py.class
-
+.vennv
 # C extensions
 *.so
 /backend/graph
@@ -170,4 +170,6 @@ google-cloud-cli-469.0.0-linux-x86_64.tar.gz
 /backend/chunks
 google-cloud-cli-linux-x86_64.tar.gz
 .vennv
-newenv
\ No newline at end of file
+newenv
+files
+
diff --git a/backend/src/graphDB_dataAccess.py b/backend/src/graphDB_dataAccess.py
index 738665f88..c77a1e773 100644
--- a/backend/src/graphDB_dataAccess.py
+++ b/backend/src/graphDB_dataAccess.py
@@ -185,7 +185,7 @@ def execute_query(self, query, param=None):
 
     def get_current_status_document_node(self, file_name):
         query = """
-                MATCH(d:Document {fileName : $file_name}) RETURN d.stats AS Status , d.processingTime AS processingTime,
+                MATCH(d:Document {fileName : $file_name}) RETURN d.status AS Status , d.processingTime AS processingTime,
                 d.nodeCount AS nodeCount, d.model as model, d.relationshipCount as relationshipCount,
                 d.total_pages AS total_pages, d.total_chunks AS total_chunks , d.fileSize as fileSize,
                 d.is_cancelled as is_cancelled, d.processed_chunk as processed_chunk, d.fileSource as fileSource
diff --git a/docker-compose.yml b/docker-compose.yml
index d5e545388..cf0f95f5e 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -61,6 +61,7 @@ services:
       - CHUNK_SIZE=${CHUNK_SIZE-5242880}
       - ENV=${ENV-DEV}
       - CHAT_MODES=${CHAT_MODES-""}
+      - BATCH_SIZE=${BATCH_SIZE-2}
     volumes:
       - ./frontend:/app
       - /app/node_modules
diff --git a/frontend/Dockerfile b/frontend/Dockerfile
index 3bf2e9409..ea9870ea4 100644
--- a/frontend/Dockerfile
+++ b/frontend/Dockerfile
@@ -12,10 +12,10 @@ ARG LARGE_FILE_SIZE=5242880
 ARG CHUNK_SIZE=5242880
 ARG CHAT_MODES=""
 ARG ENV="DEV"
+ARG BATCH_SIZE=2
 WORKDIR /app
 COPY package.json yarn.lock ./
-RUN yarn add @neo4j-nvl/base @neo4j-nvl/react
 RUN yarn install
 COPY . ./
 
 RUN BACKEND_API_URL=$BACKEND_API_URL \
@@ -28,6 +28,7 @@ RUN BACKEND_API_URL=$BACKEND_API_URL \
     ENV=$ENV \
     LARGE_FILE_SIZE=${LARGE_FILE_SIZE} \
     CHAT_MODES=$CHAT_MODES \
+    BATCH_SIZE=$BATCH_SIZE \
     yarn run build
 
 # Step 2: Serve the application using Nginx
diff --git a/frontend/example.env b/frontend/example.env
index 0c11aa4ef..d0945d785 100644
--- a/frontend/example.env
+++ b/frontend/example.env
@@ -8,4 +8,5 @@ TIME_PER_PAGE=50
 CHUNK_SIZE=5242880
 LARGE_FILE_SIZE=5242880
 GOOGLE_CLIENT_ID=""
-CHAT_MODES=""
\ No newline at end of file
+CHAT_MODES=""
+BATCH_SIZE=2
\ No newline at end of file
diff --git a/frontend/src/components/Content.tsx b/frontend/src/components/Content.tsx
index bf875ece6..b486007ac 100644
--- a/frontend/src/components/Content.tsx
+++ b/frontend/src/components/Content.tsx
@@ -19,7 +19,7 @@ import { postProcessing } from '../services/PostProcessing';
 import { triggerStatusUpdateAPI } from '../services/ServerSideStatusUpdateAPI';
 import useServerSideEvent from '../hooks/useSse';
 import { useSearchParams } from 'react-router-dom';
-import { buttonCaptions, defaultLLM, largeFileSize, llms, tooltips } from '../utils/Constants';
+import { batchSize, buttonCaptions, defaultLLM, largeFileSize, llms, tooltips } from '../utils/Constants';
 import ButtonWithToolTip from './UI/ButtonWithToolTip';
 import connectAPI from '../services/ConnectAPI';
 import DropdownComponent from './Dropdown';
@@ -33,6 +33,7 @@ import GraphEnhancementDialog from './Popups/GraphEnhancementDialog';
 import { tokens } from '@neo4j-ndl/base';
 const ConnectionModal = lazy(() => import('./Popups/ConnectionModal/ConnectionModal'));
 const ConfirmationDialog = lazy(() => import('./Popups/LargeFilePopUp/ConfirmationDialog'));
+let afterFirstRender = false;
 
 const Content: React.FC<ContentProps> = ({
   isLeftExpanded,
@@ -70,6 +71,9 @@
     setRowSelection,
     setSelectedRels,
     postProcessingTasks,
+    queue,
+    processedCount,
+    setProcessedCount,
   } = useFileContext();
   const [viewPoint, setViewPoint] = useState<'tableView' | 'showGraphView' | 'chatInfoView'>('tableView');
   const [showDeletePopUp, setshowDeletePopUp] = useState<boolean>(false);
@@ -137,13 +141,29 @@
     });
   }, [model]);
 
+  useEffect(() => {
+    if (afterFirstRender) {
+      localStorage.setItem('processedCount', JSON.stringify({ db: userCredentials?.uri, count: processedCount }));
+    }
+    if (processedCount == batchSize) {
+      handleGenerateGraph([]);
+    }
+  }, [processedCount, userCredentials]);
+
+  useEffect(() => {
+    if (afterFirstRender) {
+      localStorage.setItem('waitingQueue', JSON.stringify({ db: userCredentials?.uri, queue: queue.items }));
+    }
+    afterFirstRender = true;
+  }, [queue.items.length, userCredentials]);
+
   const handleDropdownChange = (selectedOption: OptionType | null | void) => {
     if (selectedOption?.value) {
       setModel(selectedOption?.value);
     }
   };
 
-  const extractData = async (uid: string, isselectedRows = false) => {
+  const extractData = async (uid: string, isselectedRows = false, filesTobeProcess: CustomFile[]) => {
     if (!isselectedRows) {
       const fileItem = filesData.find((f) => f.id == uid);
       if (fileItem) {
@@ -151,7 +171,7 @@
         setextractLoading(true);
         await extractHandler(fileItem, uid);
       }
     } else {
-      const fileItem = childRef.current?.getSelectedRows().find((f) => f.id == uid);
+      const fileItem = filesTobeProcess.find((f) => f.id == uid);
       if (fileItem) {
         setextractLoading(true);
         await extractHandler(fileItem, uid);
@@ -189,7 +209,15 @@
         userCredentials?.userName,
         userCredentials?.password,
         userCredentials?.database,
-        updateStatusForLargeFiles
+        updateStatusForLargeFiles,
+        () => {
+          setProcessedCount((prev) => {
+            if (prev == 2) {
+              return 1;
+            }
+            return prev + 1;
+          });
+        }
       );
     }
@@ -258,33 +286,174 @@
     }
   };
 
-  const handleGenerateGraph = (allowLargeFiles: boolean, selectedFilesFromAllfiles: CustomFile[]) => {
+  const triggerBatchProcessing = (
+    batch: CustomFile[],
+    selectedFiles: CustomFile[],
+    isSelectedFiles: boolean,
+    newCheck: boolean
+  ) => {
     const data = [];
-    if (selectedfileslength && allowLargeFiles) {
-      for (let i = 0; i < selectedfileslength; i++) {
-        const row = childRef.current?.getSelectedRows()[i];
-        if (row?.status === 'New') {
-          data.push(extractData(row.id, true));
+    setalertDetails({
+      showAlert: true,
+      alertMessage: `Processing ${batch.length} files at a time.`,
+      alertType: 'info',
+    });
+    for (let i = 0; i < batch.length; i++) {
+      if (newCheck) {
+        if (batch[i]?.status === 'New') {
+          data.push(extractData(batch[i].id, isSelectedFiles, selectedFiles as CustomFile[]));
+        }
+      } else {
+        data.push(extractData(batch[i].id, isSelectedFiles, selectedFiles as CustomFile[]));
+      }
+    }
+    return data;
+  };
+
+  const addFilesToQueue = (remainingFiles: CustomFile[]) => {
+    remainingFiles.forEach((f) => {
+      setFilesData((prev) =>
+        prev.map((pf) => {
+          if (pf.id === f.id) {
+            return {
+              ...pf,
+              status: 'Waiting',
+            };
+          }
+          return pf;
+        })
+      );
+      queue.enqueue(f);
+    });
+  };
+
+  const scheduleBatchWiseProcess = (
+    selectedRows: CustomFile[],
+    selectedNewFiles: CustomFile[],
+    isSelectedFiles: boolean
+  ) => {
+    let data = [];
+    if (queue.size() > batchSize) {
+      const batch = queue.items.slice(0, batchSize);
+      data = triggerBatchProcessing(batch, selectedRows as CustomFile[], isSelectedFiles, false);
+    } else {
+      let mergedfiles = [...queue.items, ...(selectedNewFiles as CustomFile[])];
+      let filesToProcess: CustomFile[] = [];
+      if (mergedfiles.length > batchSize) {
+        filesToProcess = mergedfiles.slice(0, batchSize);
+        const remainingFiles = [...(mergedfiles as CustomFile[])].splice(batchSize);
+        addFilesToQueue(remainingFiles);
+      } else {
+        filesToProcess = mergedfiles;
+      }
+      data = triggerBatchProcessing(filesToProcess, selectedRows as CustomFile[], isSelectedFiles, false);
+    }
+    return data;
+  };
+
+  function getFilesToProcess(
+    processingFilesCount: number,
+    batchFiles: CustomFile[],
+    newFilesFromSelectedFiles: CustomFile[]
+  ) {
+    let filesToProcess: CustomFile[] = [];
+    if (processingFilesCount + batchFiles.length > batchSize) {
+      filesToProcess = batchFiles.slice(0, 1);
+      const remainingFiles = [...(newFilesFromSelectedFiles as CustomFile[])]
+        .splice(batchSize)
+        .concat(batchFiles.splice(1));
+      addFilesToQueue(remainingFiles);
+    } else {
+      filesToProcess = batchFiles;
+      const remainingFiles = [...(newFilesFromSelectedFiles as CustomFile[])].splice(batchSize);
+      addFilesToQueue(remainingFiles);
+    }
+    return filesToProcess;
+  }
+
+  /**
+   * @param selectedFilesFromAllfiles files to process, chosen in two ways: either the files selected in the table, or all new files from the table.
+   * We first check whether the queue is empty; if it is not, we process the queued files.
+   * If the queue is empty, we check whether the selected files count is greater than the batch size; if so, we slice the selection down to the batch size and process it, and the remaining files are pushed to the queue.
+   * If the selected files count is less than the batch size, we check whether the sum of the selected files count and the processing files count is greater than the batch size.
+   * If it is greater than the batch size, we slice the selected files down to the difference between the batch size and the selected files length and process them; the remaining files are pushed to the queue.
+   * If the sum of the selected files count and the processing files count is smaller than the batch size, we process all of them.
+   */
+  const handleGenerateGraph = (selectedFilesFromAllfiles: CustomFile[]) => {
+    let data = [];
+    const processingFilesCount = filesData.filter((f) => f.status === 'Processing').length;
+    const newfiles = childRef.current?.getSelectedRows().filter((f) => f.status === 'New');
+    if (selectedfileslength && processingFilesCount < batchSize) {
+      const selectedRows = childRef.current?.getSelectedRows();
+      const selectedNewFiles = newfiles;
+      if (!queue.isEmpty()) {
+        data = scheduleBatchWiseProcess(selectedRows as CustomFile[], selectedNewFiles as CustomFile[], true);
+      } else if (selectedfileslength > batchSize) {
+        const filesToProcess = selectedNewFiles?.slice(0, batchSize) as CustomFile[];
+        data = triggerBatchProcessing(filesToProcess, selectedRows as CustomFile[], true, false);
+        const remainingFiles = [...(selectedNewFiles as CustomFile[])].splice(batchSize);
+        addFilesToQueue(remainingFiles);
+      } else {
+        let filesTobeProcess = childRef.current?.getSelectedRows() as CustomFile[];
+        if (selectedfileslength + processingFilesCount > batchSize) {
+          filesTobeProcess = childRef.current
+            ?.getSelectedRows()
+            .slice(0, batchSize - selectedfileslength) as CustomFile[];
+          const remainingFiles = [...(childRef.current?.getSelectedRows() as CustomFile[])].splice(1);
+          addFilesToQueue(remainingFiles);
         }
+        data = triggerBatchProcessing(filesTobeProcess, selectedRows as CustomFile[], true, true);
       }
       Promise.allSettled(data).then(async (_) => {
         setextractLoading(false);
         await postProcessing(userCredentials as UserCredentials, postProcessingTasks);
       });
-    } else if (selectedFilesFromAllfiles.length && allowLargeFiles) {
-      // @ts-ignore
-      for (let i = 0; i < selectedFilesFromAllfiles.length; i++) {
-        if (selectedFilesFromAllfiles[i]?.status === 'New') {
-          data.push(extractData(selectedFilesFromAllfiles[i].id as string));
-        }
+    } else if (selectedFilesFromAllfiles.length && processingFilesCount < batchSize) {
+      const newFilesFromSelectedFiles = selectedFilesFromAllfiles.filter((f) => f.status === 'New');
+      if (!queue.isEmpty()) {
+        data = scheduleBatchWiseProcess(selectedFilesFromAllfiles, newFilesFromSelectedFiles, false);
+      } else if (selectedFilesFromAllfiles.length > batchSize) {
+        const batchFiles = newFilesFromSelectedFiles.slice(0, batchSize) as CustomFile[];
+        const filesToProcess = getFilesToProcess(processingFilesCount, batchFiles, newFilesFromSelectedFiles);
+        data = triggerBatchProcessing(filesToProcess, selectedFilesFromAllfiles as CustomFile[], false, false);
+      } else {
+        data = triggerBatchProcessing(
+          selectedFilesFromAllfiles,
+          selectedFilesFromAllfiles as CustomFile[],
+          false,
+          true
+        );
+        Promise.allSettled(data).then(async (_) => {
+          setextractLoading(false);
+          await postProcessing(userCredentials as UserCredentials, postProcessingTasks);
+        });
+      }
+    } else {
+      const selectedNewFiles = newfiles;
+      addFilesToQueue(selectedNewFiles as CustomFile[]);
+    }
+  };
+
+  function processWaitingFilesOnRefresh() {
+    let data = [];
+    const processingFilesCount = filesData.filter((f) => f.status === 'Processing').length;
+
+    if (!queue.isEmpty() && processingFilesCount < batchSize) {
+      if (queue.size() > batchSize) {
+        const batch = queue.items.slice(0, batchSize);
+        data = triggerBatchProcessing(batch, queue.items as CustomFile[], true, false);
+      } else {
+        data = triggerBatchProcessing(queue.items, queue.items as CustomFile[], true, false);
       }
       Promise.allSettled(data).then(async (_) => {
         setextractLoading(false);
         await postProcessing(userCredentials as UserCredentials, postProcessingTasks);
       });
+    } else {
+      const selectedNewFiles = childRef.current?.getSelectedRows().filter((f) => f.status === 'New');
+      addFilesToQueue(selectedNewFiles as CustomFile[]);
     }
-  };
-
+  }
   const handleClose = () => {
     setalertDetails((prev) => ({ ...prev, showAlert: false, alertMessage: '' }));
   };
@@ -315,6 +484,8 @@
   const disconnect = () => {
+    queue.clear();
+    setProcessedCount(0);
     setConnectionStatus(false);
     localStorage.removeItem('password');
     setUserCredentials({ uri: '', password: '', userName: '', database: '' });
@@ -337,7 +508,10 @@
     [childRef.current?.getSelectedRows()]
   );
 
-  const dropdowncheck = useMemo(() => !filesData.some((f) => f.status === 'New'), [filesData]);
+  const dropdowncheck = useMemo(
+    () => !filesData.some((f) => f.status === 'New' || f.status === 'Waiting'),
+    [filesData]
+  );
 
   const disableCheck = useMemo(
     () => (!selectedfileslength ? dropdowncheck : !newFilecheck),
@@ -372,6 +546,8 @@
         childRef.current?.getSelectedRows() as CustomFile[],
         deleteEntities
       );
+      queue.clear();
+      setProcessedCount(0);
       setRowSelection({});
       setdeleteLoading(false);
       if (response.data.status == 'Success') {
@@ -460,17 +636,19 @@
       let selectedLargeFiles: CustomFile[] = [];
       childRef.current?.getSelectedRows().forEach((f) => {
         const parsedData: CustomFile = f;
-        if (parsedData.fileSource === 'local file') {
-          if (typeof parsedData.size === 'number' && parsedData.status === 'New' && parsedData.size > largeFileSize) {
-            selectedLargeFiles.push(parsedData);
-          }
+        if (
+          parsedData.fileSource === 'local file' &&
+          typeof parsedData.size === 'number' &&
+          parsedData.status === 'New' &&
+          parsedData.size > largeFileSize
+        ) {
+          selectedLargeFiles.push(parsedData);
         }
       });
       if (selectedLargeFiles.length) {
         setshowConfirmationModal(true);
-        handleGenerateGraph(false, []);
       } else {
-        handleGenerateGraph(true, filesData);
+        handleGenerateGraph(filesData);
       }
     } else if (filesData.length) {
       const largefiles = filesData.filter((f) => {
@@ -489,9 +667,8 @@
       setRowSelection(stringified);
       if (largefiles.length) {
         setshowConfirmationModal(true);
-        handleGenerateGraph(false, []);
       } else {
-        handleGenerateGraph(true, filesData);
+        handleGenerateGraph(filesData);
       }
     }
   };
@@ -623,6 +800,7 @@ const Content: React.FC<ContentProps> = ({
                 setViewPoint('tableView');
               }}
               ref={childRef}
+              handleGenerateGraph={processWaitingFilesOnRefresh}
             >
diff --git a/frontend/src/components/FileTable.tsx b/frontend/src/components/FileTable.tsx
--- a/frontend/src/components/FileTable.tsx
+++ b/frontend/src/components/FileTable.tsx
@@ ... @@ const FileTable = forwardRef((props, ref) => {
   const { isExpanded, connectionStatus, setConnectionStatus, onInspect } = props;
-  const { filesData, setFilesData, model, rowSelection, setRowSelection, setSelectedRows } = useFileContext();
+  const { filesData, setFilesData, model, rowSelection, setRowSelection, setSelectedRows, setProcessedCount, queue } =
+    useFileContext();
   const { userCredentials } = useCredentials();
   const columnHelper = createColumnHelper<CustomFile>();
   const [columnFilters, setColumnFilters] = useState<ColumnFiltersState>([]);
@@ -116,7 +117,10 @@
             {...{
               checked: row.getIsSelected(),
               disabled:
-                !row.getCanSelect() || row.original.status == 'Uploading' || row.original.status === 'Processing',
+                !row.getCanSelect() ||
+                row.original.status == 'Uploading' ||
+                row.original.status === 'Processing' ||
+                row.original.status === 'Waiting',
               indeterminate: row.getIsSomeSelected(),
               onChange: row.getToggleSelectedHandler(),
             }}
@@ -556,6 +560,9 @@
   }, [filesData.length]);
 
   useEffect(() => {
+    const waitingQueue: CustomFile[] = JSON.parse(
+      localStorage.getItem('waitingQueue') ?? JSON.stringify({ queue: [] })
+    ).queue;
     const fetchFiles = async () => {
       try {
         setIsLoading(true);
@@ -563,11 +570,29 @@ const FileTable = forwardRef((props, ref) => {
         if (!res.data) {
           throw new Error('Please check backend connection');
         }
+        const stringified = waitingQueue.reduce((accu, f) => {
+          const key = f.id;
+          // @ts-ignore
+          accu[key] = true;
+          return accu;
+        }, {});
+        setRowSelection(stringified);
         if (res.data.status !== 'Failed') {
           const prefiles: CustomFile[] = [];
           if (res.data.data.length) {
             res.data.data.forEach((item: SourceNode) => {
               if (item.fileName != undefined && item.fileName.length) {
+                const waitingFile =
+                  waitingQueue.length && waitingQueue.find((f: CustomFile) => f.name === item.fileName);
+                if (waitingFile && item.status === 'Completed') {
+                  setProcessedCount((prev) => {
+                    if (prev === 2) {
+                      return 1;
+                    }
+                    return prev + 1;
+                  });
+                  queue.remove(item.fileName);
+                }
                 prefiles.push({
                   name: item?.fileName,
                   size: item?.fileSize ?? 0,
@@ -577,20 +602,22 @@
                   NodesCount: item?.nodeCount ?? 0,
                   processing: item?.processingTime ?? 'None',
                   relationshipCount: item?.relationshipCount ?? 0,
-                  status:
-                    item?.fileSource === 's3 bucket' && localStorage.getItem('accesskey') === item?.awsAccessKeyId
-                      ? item?.status
-                      : item?.fileSource === 'local file'
-                      ? item?.status
-                      : item?.status === 'Completed' || item.status === 'Failed'
-                      ? item?.status
-                      : item?.fileSource == 'Wikipedia' ||
-                        item?.fileSource == 'youtube' ||
-                        item?.fileSource == 'gcs bucket'
-                      ? item?.status
-                      : 'N/A',
+                  status: waitingFile
+                    ? 'Waiting'
+                    : item?.fileSource === 's3 bucket' && localStorage.getItem('accesskey') === item?.awsAccessKeyId
+                    ? item?.status
+                    : item?.fileSource === 'local file'
+                    ? item?.status
+                    : item?.status === 'Completed' || item.status === 'Failed'
+                    ? item?.status
+                    : item?.fileSource === 'Wikipedia' ||
+                      item?.fileSource === 'youtube' ||
+                      item?.fileSource === 'gcs bucket' ||
+                      item?.fileSource === 'web-url'
+                    ? item?.status
+                    : 'N/A',
                   model: item?.model ?? model,
-                  id: uuidv4(),
+                  id: !waitingFile ? uuidv4() : waitingFile.id,
                   source_url: item?.url != 'None' && item?.url != '' ? item.url : '',
                   fileSource: item?.fileSource ?? 'None',
                   gcsBucket: item?.gcsBucket,
@@ -632,6 +659,12 @@
         ).catch((error: AxiosError) => {
           // @ts-ignore
           const errorfile = decodeURI(error?.config?.url?.split('?')[0].split('/').at(-1));
+          setProcessedCount((prev) => {
+            if (prev == 2) {
+              return 1;
+            }
+            return prev + 1;
+          });
           setFilesData((prevfiles) => {
             return prevfiles.map((curfile) => {
               if (curfile.name == errorfile) {
@@ -651,7 +684,15 @@
             userCredentials.userName,
             userCredentials.password,
             userCredentials.database,
-            updateStatusForLargeFiles
+            updateStatusForLargeFiles,
+            () => {
+              setProcessedCount((prev) => {
+                if (prev == 2) {
+                  return 1;
+                }
+                return prev + 1;
+              });
+            }
           );
         }
       }
@@ -678,6 +719,30 @@
     }
   }, [connectionStatus, userCredentials]);
 
+  useEffect(() => {
+    if (connectionStatus && filesData.length && onlyfortheFirstRender) {
+      const processingFilesCount = filesData.filter((f) => f.status === 'Processing').length;
+      if (processingFilesCount) {
+        if (processingFilesCount === 1) {
+          setProcessedCount(1);
+        }
+        setalertDetails({
+          showAlert: true,
+          alertType: 'info',
+          alertMessage: `Files are in processing, please wait till the previous batch completes.`,
+        });
+      } else {
+        const waitingQueue: CustomFile[] = JSON.parse(
+          localStorage.getItem('waitingQueue') ?? JSON.stringify({ queue: [] })
+        ).queue;
+        if (waitingQueue.length) {
+          props.handleGenerateGraph();
+        }
+      }
+      onlyfortheFirstRender = false;
+    }
+  }, [connectionStatus, filesData.length]);
+
   const cancelHandler = async (fileName: string, id: string, fileSource: string) => {
     setFilesData((prevfiles) =>
       prevfiles.map((curfile) => {
@@ -764,6 +829,13 @@
           return curfile;
         })
       );
+      setProcessedCount((prev) => {
+        if (prev == 2) {
+          return 1;
+        }
+        return prev + 1;
+      });
+      queue.remove(fileName);
     }
   };
diff --git a/frontend/src/components/Popups/LargeFilePopUp/ConfirmationDialog.tsx b/frontend/src/components/Popups/LargeFilePopUp/ConfirmationDialog.tsx
index 9eab949d1..c35ded9f1 100644
--- a/frontend/src/components/Popups/LargeFilePopUp/ConfirmationDialog.tsx
+++ b/frontend/src/components/Popups/LargeFilePopUp/ConfirmationDialog.tsx
@@ -15,7 +15,7 @@ export default function ConfirmationDialog({
   open: boolean;
   onClose: () => void;
   loading: boolean;
-  extractHandler: (allowLargeFiles: boolean, selectedFilesFromAllfiles: CustomFile[]) => void;
+  extractHandler: (selectedFilesFromAllfiles: CustomFile[]) => void;
 }) {
   const { setSelectedRows, filesData, setRowSelection, selectedRows } = useFileContext();
   const [checked, setChecked] = useState([...largeFiles.map((f) => f.id)]);
@@ -73,7 +73,6 @@ export default function ConfirmationDialog({
       onClose={() => {
         setChecked([]);
         onClose();
-        extractHandler(false, []);
       }}
     >
@@ -87,7 +86,7 @@
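The patch leans on two pieces that are never defined in the diff itself: a `queue` helper exposed through `useFileContext` (used via `enqueue`, `remove`, `clear`, `isEmpty`, `size`, and `items`), and a wrap-around `processedCount` updater that is hand-copied at every completion, error, and cancel site. The following is a minimal sketch consistent with the calls the patch makes; the class name, generic bound, and helper name are assumptions for illustration, not the app's actual implementation.

```typescript
// Sketch of the queue contract the diff depends on. This is an assumption,
// not the real FileContext implementation: only the members actually called
// in the patch (enqueue, remove, clear, isEmpty, size, items) are modeled.
interface QueuedFile {
  id: string;
  name: string;
  status: string;
}

class WaitingFilesQueue<T extends QueuedFile> {
  // The diff reads queue.items directly (slice, spread, persist to localStorage).
  items: T[] = [];

  enqueue(file: T): void {
    this.items.push(file);
  }

  // The patch calls queue.remove(fileName), so removal is keyed by file name.
  remove(name: string): void {
    this.items = this.items.filter((f) => f.name !== name);
  }

  clear(): void {
    this.items = [];
  }

  isEmpty(): boolean {
    return this.items.length === 0;
  }

  size(): number {
    return this.items.length;
  }
}

// The processedCount updater that recurs throughout Content.tsx and
// FileTable.tsx: it counts completions within the current batch and wraps
// from 2 back to 1. The diff hard-codes the literal 2 rather than reading
// batchSize, even though BATCH_SIZE is configurable via the env/Dockerfile
// changes above.
const batchSize = 2; // mirrors the BATCH_SIZE=2 default introduced in this patch
const nextProcessedCount = (prev: number): number => (prev === batchSize ? 1 : prev + 1);
```

Folding the repeated `(prev) => { if (prev == 2) { return 1; } return prev + 1; }` blocks into a single helper like the hypothetical `nextProcessedCount` above would remove the half-dozen copies in this patch, and reading the limit from `batchSize` instead of the literal `2` would keep the counter correct if `BATCH_SIZE` is ever set to something other than its default.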