|
16 | 16 | # [START documentai_batch_process_document]
|
17 | 17 | import re
|
18 | 18 |
|
| 19 | +from google.api_core.client_options import ClientOptions |
19 | 20 | from google.cloud import documentai_v1 as documentai
|
20 | 21 | from google.cloud import storage
|
21 | 22 |
|
22 | 23 | # TODO(developer): Uncomment these variables before running the sample.
|
23 |
| -# project_id= 'YOUR_PROJECT_ID' |
24 |
| -# location = 'YOUR_PROJECT_LOCATION' # Format is 'us' or 'eu' |
| 24 | +# project_id = 'YOUR_PROJECT_ID' |
| 25 | +# location = 'YOUR_PROCESSOR_LOCATION' # Format is 'us' or 'eu' |
25 | 26 | # processor_id = 'YOUR_PROCESSOR_ID' # Create processor in Cloud Console
|
26 |
| -# gcs_input_uri = "YOUR_INPUT_URI" |
27 |
| -# gcs_output_uri = "YOUR_OUTPUT_BUCKET_URI" |
28 |
| -# gcs_output_uri_prefix = "YOUR_OUTPUT_URI_PREFIX" |
| 27 | +# gcs_input_uri = "YOUR_INPUT_URI" # Format: gs://bucket/directory/file.pdf |
| 28 | +# input_mime_type = "application/pdf" |
| 29 | +# gcs_output_bucket = "YOUR_OUTPUT_BUCKET_NAME" # Format: gs://bucket |
| 30 | +# gcs_output_uri_prefix = "YOUR_OUTPUT_URI_PREFIX" # Format: directory/subdirectory/ |
29 | 31 |
|
30 | 32 |
|
31 | 33 | def batch_process_documents(
|
32 |
| - project_id, |
33 |
| - location, |
34 |
| - processor_id, |
35 |
| - gcs_input_uri, |
36 |
| - gcs_output_uri, |
37 |
| - gcs_output_uri_prefix, |
| 34 | + project_id: str, |
| 35 | + location: str, |
| 36 | + processor_id: str, |
| 37 | + gcs_input_uri: str, |
| 38 | + input_mime_type: str, |
| 39 | + gcs_output_bucket: str, |
| 40 | + gcs_output_uri_prefix: str, |
38 | 41 | timeout: int = 300,
|
39 | 42 | ):
|
40 | 43 |
|
41 | 44 | # You must set the api_endpoint if you use a location other than 'us', e.g.:
|
42 |
| - opts = {} |
43 |
| - if location == "eu": |
44 |
| - opts = {"api_endpoint": "eu-documentai.googleapis.com"} |
| 45 | + opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com") |
45 | 46 |
|
46 | 47 | client = documentai.DocumentProcessorServiceClient(client_options=opts)
|
47 | 48 |
|
48 |
| - destination_uri = f"{gcs_output_uri}/{gcs_output_uri_prefix}/" |
49 |
| - |
50 |
| - gcs_documents = documentai.GcsDocuments( |
51 |
| - documents=[{"gcs_uri": gcs_input_uri, "mime_type": "application/pdf"}] |
| 49 | + gcs_document = documentai.GcsDocument( |
| 50 | + gcs_uri=gcs_input_uri, mime_type=input_mime_type |
52 | 51 | )
|
53 | 52 |
|
54 |
| - # 'mime_type' can be 'application/pdf', 'image/tiff', |
55 |
| - # and 'image/gif', or 'application/json' |
| 53 | + # Load GCS Input URI into a List of document files |
| 54 | + gcs_documents = documentai.GcsDocuments(documents=[gcs_document]) |
56 | 55 | input_config = documentai.BatchDocumentsInputConfig(gcs_documents=gcs_documents)
|
57 | 56 |
|
58 |
| - # Where to write results |
59 |
| - output_config = documentai.DocumentOutputConfig( |
60 |
| - gcs_output_config={"gcs_uri": destination_uri} |
| 57 | + # NOTE: Alternatively, specify a GCS URI Prefix to process an entire directory |
| 58 | + # |
| 59 | + # gcs_input_uri = "gs://bucket/directory/" |
| 60 | + # gcs_prefix = documentai.GcsPrefix(gcs_uri_prefix=gcs_input_uri) |
| 61 | + # input_config = documentai.BatchDocumentsInputConfig(gcs_prefix=gcs_prefix) |
| 62 | + # |
| 63 | + |
| 64 | + # Cloud Storage URI for the Output Directory |
| 65 | + destination_uri = f"{gcs_output_bucket}/{gcs_output_uri_prefix}/" |
| 66 | + |
| 67 | + gcs_output_config = documentai.DocumentOutputConfig.GcsOutputConfig( |
| 68 | + gcs_uri=destination_uri |
61 | 69 | )
|
62 | 70 |
|
63 |
| - # Location can be 'us' or 'eu' |
64 |
| - name = f"projects/{project_id}/locations/{location}/processors/{processor_id}" |
65 |
| - request = documentai.types.document_processor_service.BatchProcessRequest( |
| 71 | + # Where to write results |
| 72 | + output_config = documentai.DocumentOutputConfig(gcs_output_config=gcs_output_config) |
| 73 | + |
| 74 | + # The full resource name of the processor, e.g.: |
| 75 | + # projects/project_id/locations/location/processor/processor_id |
| 76 | + # You must create new processors in the Cloud Console first |
| 77 | + name = client.processor_path(project_id, location, processor_id) |
| 78 | + |
| 79 | + request = documentai.BatchProcessRequest( |
66 | 80 | name=name,
|
67 | 81 | input_documents=input_config,
|
68 | 82 | document_output_config=output_config,
|
69 | 83 | )
|
70 | 84 |
|
| 85 | + # BatchProcess returns a Long Running Operation (LRO) |
71 | 86 | operation = client.batch_process_documents(request)
|
72 | 87 |
|
73 |
| - # Wait for the operation to finish |
| 88 | + # Continually polls the operation until it is complete. |
| 89 | + # This could take some time for larger files |
| 90 | + # Format: projects/PROJECT_NUMBER/locations/LOCATION/operations/OPERATION_ID |
| 91 | + print(f"Waiting for operation {operation.operation.name} to complete...") |
74 | 92 | operation.result(timeout=timeout)
|
75 | 93 |
|
76 |
| - # Results are written to GCS. Use a regex to find |
77 |
| - # output files |
78 |
| - match = re.match(r"gs://([^/]+)/(.+)", destination_uri) |
79 |
| - output_bucket = match.group(1) |
80 |
| - prefix = match.group(2) |
| 94 | + # NOTE: Can also use callbacks for asynchronous processing |
| 95 | + # |
| 96 | + # def my_callback(future): |
| 97 | + # result = future.result() |
| 98 | + # |
| 99 | + # operation.add_done_callback(my_callback) |
81 | 100 |
|
82 |
| - storage_client = storage.Client() |
83 |
| - bucket = storage_client.get_bucket(output_bucket) |
84 |
| - blob_list = list(bucket.list_blobs(prefix=prefix)) |
85 |
| - print("Output files:") |
| 101 | + # Once the operation is complete, |
| 102 | + # get output document information from operation metadata |
| 103 | + metadata = documentai.BatchProcessMetadata(operation.metadata) |
86 | 104 |
|
87 |
| - for i, blob in enumerate(blob_list): |
88 |
| - # If JSON file, download the contents of this blob as a bytes object. |
89 |
| - if ".json" in blob.name: |
90 |
| - blob_as_bytes = blob.download_as_bytes() |
| 105 | + if metadata.state != documentai.BatchProcessMetadata.State.SUCCEEDED: |
| 106 | + raise ValueError(f"Batch Process Failed: {metadata.state_message}") |
91 | 107 |
|
92 |
| - document = documentai.types.Document.from_json(blob_as_bytes) |
93 |
| - print(f"Fetched file {i + 1}") |
| 108 | + storage_client = storage.Client() |
| 109 | + |
| 110 | + print("Output files:") |
| 111 | + # One process per Input Document |
| 112 | + for process in metadata.individual_process_statuses: |
| 113 | + # output_gcs_destination format: gs://BUCKET/PREFIX/OPERATION_NUMBER/INPUT_FILE_NUMBER/ |
| 114 | + # The Cloud Storage API requires the bucket name and URI prefix separately |
| 115 | + matches = re.match(r"gs://(.*?)/(.*)", process.output_gcs_destination) |
| 116 | + if not matches: |
| 117 | + print( |
| 118 | + "Could not parse output GCS destination:", |
| 119 | + process.output_gcs_destination, |
| 120 | + ) |
| 121 | + continue |
| 122 | + |
| 123 | + output_bucket, output_prefix = matches.groups() |
| 124 | + |
| 125 | + # Get List of Document Objects from the Output Bucket |
| 126 | + output_blobs = storage_client.list_blobs(output_bucket, prefix=output_prefix) |
| 127 | + |
| 128 | + # Document AI may output multiple JSON files per source file |
| 129 | + for blob in output_blobs: |
| 130 | + # Document AI should only output JSON files to GCS |
| 131 | + if ".json" not in blob.name: |
| 132 | + print( |
| 133 | + f"Skipping non-supported file: {blob.name} - Mimetype: {blob.content_type}" |
| 134 | + ) |
| 135 | + continue |
| 136 | + |
| 137 | + # Download JSON File as bytes object and convert to Document Object |
| 138 | + print(f"Fetching {blob.name}") |
| 139 | + document = documentai.Document.from_json( |
| 140 | + blob.download_as_bytes(), ignore_unknown_fields=True |
| 141 | + ) |
94 | 142 |
|
95 | 143 | # For a full list of Document object attributes, please reference this page:
|
96 |
| - # https://cloud.google.com/document-ai/docs/reference/rpc/google.cloud.documentai.v1beta3#document |
| 144 | + # https://cloud.google.com/python/docs/reference/documentai/latest/google.cloud.documentai_v1.types.Document |
97 | 145 |
|
98 | 146 | # Read the text recognition output from the processor
|
99 |
| - for page in document.pages: |
100 |
| - for form_field in page.form_fields: |
101 |
| - field_name = get_text(form_field.field_name, document) |
102 |
| - field_value = get_text(form_field.field_value, document) |
103 |
| - print("Extracted key value pair:") |
104 |
| - print(f"\t{field_name}, {field_value}") |
105 |
| - for paragraph in page.paragraphs: |
106 |
| - paragraph_text = get_text(paragraph.layout, document) |
107 |
| - print(f"Paragraph text:\n{paragraph_text}") |
108 |
| - else: |
109 |
| - print(f"Skipping non-supported file type {blob.name}") |
110 |
| - |
111 |
| - |
112 |
| -# Extract shards from the text field |
113 |
| -def get_text(doc_element: dict, document: dict): |
114 |
| - """ |
115 |
| - Document AI identifies form fields by their offsets |
116 |
| - in document text. This function converts offsets |
117 |
| - to text snippets. |
118 |
| - """ |
119 |
| - response = "" |
120 |
| - # If a text segment spans several lines, it will |
121 |
| - # be stored in different text segments. |
122 |
| - for segment in doc_element.text_anchor.text_segments: |
123 |
| - start_index = ( |
124 |
| - int(segment.start_index) |
125 |
| - if segment in doc_element.text_anchor.text_segments |
126 |
| - else 0 |
127 |
| - ) |
128 |
| - end_index = int(segment.end_index) |
129 |
| - response += document.text[start_index:end_index] |
130 |
| - return response |
| 147 | + print("The document contains the following text:") |
| 148 | + print(document.text) |
131 | 149 |
|
132 | 150 |
|
133 | 151 | # [END documentai_batch_process_document]
|
0 commit comments