_json.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. # Extracted from https://github.com/pfmoore/pkg_metadata
  2. from email.header import Header, decode_header, make_header
  3. from email.message import Message
  4. from typing import Any, Dict, List, Union
  5. METADATA_FIELDS = [
  6. # Name, Multiple-Use
  7. ("Metadata-Version", False),
  8. ("Name", False),
  9. ("Version", False),
  10. ("Dynamic", True),
  11. ("Platform", True),
  12. ("Supported-Platform", True),
  13. ("Summary", False),
  14. ("Description", False),
  15. ("Description-Content-Type", False),
  16. ("Keywords", False),
  17. ("Home-page", False),
  18. ("Download-URL", False),
  19. ("Author", False),
  20. ("Author-email", False),
  21. ("Maintainer", False),
  22. ("Maintainer-email", False),
  23. ("License", False),
  24. ("Classifier", True),
  25. ("Requires-Dist", True),
  26. ("Requires-Python", False),
  27. ("Requires-External", True),
  28. ("Project-URL", True),
  29. ("Provides-Extra", True),
  30. ("Provides-Dist", True),
  31. ("Obsoletes-Dist", True),
  32. ]
  33. def json_name(field: str) -> str:
  34. return field.lower().replace("-", "_")
  35. def msg_to_json(msg: Message) -> Dict[str, Any]:
  36. """Convert a Message object into a JSON-compatible dictionary."""
  37. def sanitise_header(h: Union[Header, str]) -> str:
  38. if isinstance(h, Header):
  39. chunks = []
  40. for bytes, encoding in decode_header(h):
  41. if encoding == "unknown-8bit":
  42. try:
  43. # See if UTF-8 works
  44. bytes.decode("utf-8")
  45. encoding = "utf-8"
  46. except UnicodeDecodeError:
  47. # If not, latin1 at least won't fail
  48. encoding = "latin1"
  49. chunks.append((bytes, encoding))
  50. return str(make_header(chunks))
  51. return str(h)
  52. result = {}
  53. for field, multi in METADATA_FIELDS:
  54. if field not in msg:
  55. continue
  56. key = json_name(field)
  57. if multi:
  58. value: Union[str, List[str]] = [
  59. sanitise_header(v) for v in msg.get_all(field)
  60. ]
  61. else:
  62. value = sanitise_header(msg.get(field))
  63. if key == "keywords":
  64. # Accept both comma-separated and space-separated
  65. # forms, for better compatibility with old data.
  66. if "," in value:
  67. value = [v.strip() for v in value.split(",")]
  68. else:
  69. value = value.split()
  70. result[key] = value
  71. payload = msg.get_payload()
  72. if payload:
  73. result["description"] = payload
  74. return result