|
| 1 | +# Extracted from https://github.com/pfmoore/pkg_metadata |
| 2 | + |
| 3 | +from email.header import Header, decode_header, make_header |
| 4 | +from email.message import Message |
| 5 | +from typing import Any, Dict, List, Union |
| 6 | + |
| 7 | +METADATA_FIELDS = [ |
| 8 | + # Name, Multiple-Use |
| 9 | + ("Metadata-Version", False), |
| 10 | + ("Name", False), |
| 11 | + ("Version", False), |
| 12 | + ("Dynamic", True), |
| 13 | + ("Platform", True), |
| 14 | + ("Supported-Platform", True), |
| 15 | + ("Summary", False), |
| 16 | + ("Description", False), |
| 17 | + ("Description-Content-Type", False), |
| 18 | + ("Keywords", False), |
| 19 | + ("Home-page", False), |
| 20 | + ("Download-URL", False), |
| 21 | + ("Author", False), |
| 22 | + ("Author-email", False), |
| 23 | + ("Maintainer", False), |
| 24 | + ("Maintainer-email", False), |
| 25 | + ("License", False), |
| 26 | + ("Classifier", True), |
| 27 | + ("Requires-Dist", True), |
| 28 | + ("Requires-Python", False), |
| 29 | + ("Requires-External", True), |
| 30 | + ("Project-URL", True), |
| 31 | + ("Provides-Extra", True), |
| 32 | + ("Provides-Dist", True), |
| 33 | + ("Obsoletes-Dist", True), |
| 34 | +] |
| 35 | + |
| 36 | + |
| 37 | +def json_name(field: str) -> str: |
| 38 | + return field.lower().replace("-", "_") |
| 39 | + |
| 40 | + |
| 41 | +def msg_to_json(msg: Message) -> Dict[str, Any]: |
| 42 | + """Convert a Message object into a JSON-compatible dictionary.""" |
| 43 | + |
| 44 | + def sanitise_header(h: Union[Header, str]) -> str: |
| 45 | + if isinstance(h, Header): |
| 46 | + chunks = [] |
| 47 | + for bytes, encoding in decode_header(h): |
| 48 | + if encoding == "unknown-8bit": |
| 49 | + try: |
| 50 | + # See if UTF-8 works |
| 51 | + bytes.decode("utf-8") |
| 52 | + encoding = "utf-8" |
| 53 | + except UnicodeDecodeError: |
| 54 | + # If not, latin1 at least won't fail |
| 55 | + encoding = "latin1" |
| 56 | + chunks.append((bytes, encoding)) |
| 57 | + return str(make_header(chunks)) |
| 58 | + return str(h) |
| 59 | + |
| 60 | + result = {} |
| 61 | + for field, multi in METADATA_FIELDS: |
| 62 | + if field not in msg: |
| 63 | + continue |
| 64 | + key = json_name(field) |
| 65 | + if multi: |
| 66 | + value: Union[str, List[str]] = [ |
| 67 | + sanitise_header(v) for v in msg.get_all(field) |
| 68 | + ] |
| 69 | + else: |
| 70 | + value = sanitise_header(msg.get(field)) |
| 71 | + if key == "keywords": |
| 72 | + # Accept both comma-separated and space-separated |
| 73 | + # forms, for better compatibility with old data. |
| 74 | + if "," in value: |
| 75 | + value = [v.strip() for v in value.split(",")] |
| 76 | + else: |
| 77 | + value = value.split() |
| 78 | + result[key] = value |
| 79 | + |
| 80 | + payload = msg.get_payload() |
| 81 | + if payload: |
| 82 | + result["description"] = payload |
| 83 | + |
| 84 | + return result |
0 commit comments