def search(subject_fragment, body_fragment=None):
    """
    Search the inbox by subject and optionally body; return the newest match.

    Use "*" as ``subject_fragment`` to match any subject. At least one real
    criterion is still required: "*" (or an empty subject) without a
    ``body_fragment`` logs a warning and returns None.

    Connection settings are read from the environment variables
    MAIL_SENDER, MAIL_PASSWORD, IMAP_SERVER and IMAP_PORT.

    Args:
        subject_fragment: Text that must appear in the subject, or "*".
        body_fragment: Optional text that must appear in the body.

    Returns:
        A tuple ``(email_content, attachments)`` for the last message id
        returned by the server, where ``email_content`` is the decoded text of
        the last inline text/plain or text/html part and ``attachments`` is a
        list of ``(file_name, file_data)`` tuples. Returns None when no
        criteria were given, nothing matched, or the message has no text body.

    Raises:
        KeyError: If one of the required environment variables is missing.
        Exception: Any error raised while talking to the server is logged
            through ``gw.error`` and re-raised unchanged.
    """
    EMAIL_SENDER = os.environ["MAIL_SENDER"]
    EMAIL_PASSWORD = os.environ["MAIL_PASSWORD"]
    IMAP_SERVER = os.environ["IMAP_SERVER"]
    # Passed through exactly as read from the environment (i.e. as a string).
    IMAP_PORT = os.environ["IMAP_PORT"]
    mail = None
    try:
        mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT)
        mail.login(EMAIL_SENDER, EMAIL_PASSWORD)
        mail.select('inbox')

        search_criteria = []
        if subject_fragment and subject_fragment != "*":
            search_criteria.append(f'(SUBJECT {_imap_quote(subject_fragment)})')
        if body_fragment:
            search_criteria.append(f'(BODY {_imap_quote(body_fragment)})')
        if not search_criteria:
            gw.warning("No search criteria provided.")
            return None

        status, data = mail.search(None, ' '.join(search_criteria))
        if status != 'OK':
            # A "NO" reply is returned rather than raised by imaplib; without
            # this check its error text would be parsed as message ids.
            raise mail.error(f"IMAP search failed: {status} {data!r}")
        mail_ids = data[0].split() if data and data[0] else []
        if not mail_ids:
            gw.warning("No emails found with the specified criteria.")
            return None

        latest_mail_id = mail_ids[-1]
        gw.info(f"Fetching email with ID {latest_mail_id}")
        status, data = mail.fetch(latest_mail_id, '(RFC822)')
        # A successful fetch yields a (header, literal) tuple as first item.
        if status != 'OK' or not data or not isinstance(data[0], tuple):
            raise mail.error(
                f"IMAP fetch failed for message {latest_mail_id!r}: {status}"
            )
        email_msg = message_from_bytes(data[0][1])

        email_content, attachments = _extract_content(email_msg)
        if not email_content:
            gw.warning("Matching email found, but unsupported content type.")
            return None
        return email_content, attachments
    except Exception as e:
        gw.error(f"Error searching email: {str(e)}")
        raise
    finally:
        if mail is not None:
            # close() is only legal while a mailbox is selected. If login or
            # select failed it raises, which used to mask the real error and
            # skip logout(); cleanup is therefore best-effort.
            try:
                mail.close()
            except (imaplib.IMAP4.error, OSError):
                pass
            try:
                mail.logout()
            except (imaplib.IMAP4.error, OSError) as logout_error:
                gw.warning(f"Error closing IMAP connection: {logout_error}")


def _imap_quote(text):
    """Return *text* as an IMAP quoted string, escaping backslashes and quotes."""
    escaped = str(text).replace('\\', '\\\\').replace('"', '\\"')
    return f'"{escaped}"'


def _decode_text_part(part):
    """Decode a text MIME part with its declared charset, falling back to UTF-8."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return None
    charset = part.get_content_charset() or 'utf-8'
    try:
        return payload.decode(charset, errors='replace')
    except LookupError:
        # The message declares a charset name Python does not know.
        return payload.decode('utf-8', errors='replace')


def _extract_content(email_msg):
    """Return (text body, [(file_name, file_data), ...]) for a parsed message."""
    text_types = ("text/plain", "text/html")
    email_content = None
    attachments = []
    if email_msg.is_multipart():
        for part in email_msg.walk():
            is_text = part.get_content_type() in text_types
            # A text part explicitly marked as an attachment is a file, not
            # the message body, so it must not overwrite the body text.
            if is_text and part.get_content_disposition() != 'attachment':
                text = _decode_text_part(part)
                if text is not None:
                    # Later inline text parts replace earlier ones.
                    email_content = text
            elif part.get('Content-Disposition') is not None:
                attachments.append(
                    (part.get_filename(), part.get_payload(decode=True))
                )
    elif email_msg.get_content_type() in text_types:
        email_content = _decode_text_part(email_msg)
    return email_content, attachments