import os import asyncio import time import logging import requests from dotenv import load_dotenv from nio import AsyncClient, LoginResponse, RoomMessageText import markdown2 import re import mimetypes import io from urllib.parse import urlparse import functools import coloredlogs load_dotenv() HOMESERVER = os.getenv("MATRIX_HOMESERVER") USER = os.getenv("MATRIX_USER") PASSWORD = os.getenv("MATRIX_PASSWORD") ACCESS_TOKEN = os.getenv("MATRIX_ACCESS_TOKEN") ROOM_ID = os.getenv("MATRIX_ROOM_ID") STORE_PATH = os.getenv("MATRIX_STORE_PATH", ".matrixstore") FLASK_INCOMING = os.getenv("FLASK_ENDPOINT_INCOMING", "http://localhost:5000/api/matrix/incoming") FLASK_SHARED_SECRET = os.getenv("FLASK_SHARED_SECRET", "secret_flask_matrix") logging.basicConfig(level=logging.INFO) log = logging.getLogger("matrix-bot") # install coloredlogs for nicer colored output in terminals try: coloredlogs.install(level=logging.INFO, logger=log, fmt='%(asctime)s %(levelname)s %(message)s') except Exception: # coloredlogs optional; fallback silently pass class MatrixBot: def __init__(self): self.client = AsyncClient(HOMESERVER, USER, store_path=STORE_PATH) if ACCESS_TOKEN: self.client.access_token = ACCESS_TOKEN self.client.user_id = USER self.client.add_event_callback(self.message_callback, RoomMessageText) # timestamp (ms) when the bot considers "connected"; events older than this are ignored self.start_ts = 0 async def login(self): if not ACCESS_TOKEN: resp = await self.client.login(PASSWORD, device_name="LiaBot") if isinstance(resp, LoginResponse): log.info("Connecté en tant que %s", resp.user_id) # mark connection time to discard past history (epoch ms) self.start_ts = int(time.time() * 1000) else: raise RuntimeError(f"Login failed: {resp}") else: # if using ACCESS_TOKEN we still consider now as the connection time self.start_ts = int(time.time() * 1000) async def message_callback(self, room, event): # ignore historical events that arrived before the bot connected try: evt_ts = None for attr in ("server_timestamp", "server_ts", "origin_server_ts", "server_time", "ts"): if hasattr(event, attr): evt_ts = getattr(event, attr) break # Some nio events expose integer ms, some expose nested dicts; try to coerce if evt_ts is None and hasattr(event, "__dict__"): evt_ts = getattr(event, "__dict__", {}).get("server_timestamp") if evt_ts and self.start_ts and int(evt_ts) < int(self.start_ts): log.debug("Ignoring historical event (ts %s < start %s)", evt_ts, self.start_ts) return except Exception: pass log.error("received") if event.sender != "@magali:conduit.blackdrop.fr": return body = (event.body or "").strip() # Commande admin reset if body.startswith("/reset"): parts = body.split() if len(parts) == 2: target = parts[1] else: target = event.sender try: resp = requests.post( os.getenv("FLASK_ENDPOINT_RESET_PLAYER", "http://localhost:5000/api/admin/reset_player"), json={ "secret": FLASK_SHARED_SECRET, "matrix_id": target }, timeout=15 ) if resp.status_code == 200: try: body = resp.json() reply = body.get("reply") if reply: await self.send_message(room.room_id, reply) else: await self.send_message(room.room_id, f"Reset de {target} effectué. Bon nouveau départ !") except Exception: await self.send_message(room.room_id, f"Reset de {target} effectué. Bon nouveau départ ! (Exception)") else: await self.send_message(room.room_id, f"Reset impossible ({resp.status_code}).") except Exception as e: await self.send_message(room.room_id, f"Erreur reset: {e}") return # Commande admin: regenere le texte de success pour debug if body.startswith("/gen_success"): parts = body.split() target = parts[1] if len(parts) == 2 else event.sender try: resp = requests.post( os.getenv("FLASK_ENDPOINT_GEN_SUCCESS", "http://localhost:5000/api/admin/gen_success"), json={"secret": FLASK_SHARED_SECRET, "matrix_id": target}, timeout=15, ) if resp.status_code == 200: try: reply = resp.json().get("reply") or resp.text except Exception: reply = resp.text else: reply = f"gen_success failed ({resp.status_code})" except Exception as e: reply = f"Erreur gen_success: {e}" await self.send_message(room.room_id, reply) return # Commande admin: regenere la question pour debug if body.startswith("/gen_question"): parts = body.split() target = parts[1] if len(parts) == 2 else event.sender try: resp = requests.post( os.getenv("FLASK_ENDPOINT_GEN_QUESTION", "http://localhost:5000/api/admin/gen_question"), json={"secret": FLASK_SHARED_SECRET, "matrix_id": target}, timeout=15, ) if resp.status_code == 200: try: reply = resp.json().get("reply") or resp.text except Exception: reply = resp.text else: reply = f"gen_question failed ({resp.status_code})" except Exception as e: reply = f"Erreur gen_question: {e}" await self.send_message(room.room_id, reply) return # Commande admin: regenere l'intro pour debug if body.startswith("/gen_intro"): parts = body.split() target = parts[1] if len(parts) == 2 else event.sender try: resp = requests.post( os.getenv("FLASK_ENDPOINT_GEN_INTRO", "http://localhost:5000/api/admin/gen_intro"), json={"secret": FLASK_SHARED_SECRET, "matrix_id": target}, timeout=15, ) if resp.status_code == 200: try: reply = resp.json().get("reply") or resp.text except Exception: reply = resp.text else: reply = f"gen_intro failed ({resp.status_code})" except Exception as e: reply = f"Erreur gen_intro: {e}" await self.send_message(room.room_id, reply) return # Commande admin: regenere un hint pour debug if body.startswith("/gen_hint"): parts = body.split() target = parts[1] if len(parts) == 2 else event.sender try: resp = requests.post( os.getenv("FLASK_ENDPOINT_GEN_HINT", "http://localhost:5000/api/admin/gen_hint"), json={"secret": FLASK_SHARED_SECRET, "matrix_id": target}, timeout=15, ) if resp.status_code == 200: try: reply = resp.json().get("reply") or resp.text except Exception: reply = resp.text else: reply = f"gen_hint failed ({resp.status_code})" except Exception as e: reply = f"Erreur gen_hint: {e}" await self.send_message(room.room_id, reply) return # Commande admin: regenere le texte de success pour debug if body.startswith("/gen_success"): parts = body.split() if len(parts) == 2: target = parts[1] else: target = event.sender try: resp = requests.post( os.getenv("FLASK_ENDPOINT_GEN_SUCCESS", "http://localhost:5000/api/admin/gen_success"), json={"secret": FLASK_SHARED_SECRET, "matrix_id": target}, timeout=15, ) if resp.status_code == 200: try: body_json = resp.json() reply = body_json.get("reply") or body_json.get("success") or "(Pas de réponse)" except Exception: reply = resp.text or "(Réponse invalide du serveur)" else: reply = f"gen_success impossible ({resp.status_code})." except Exception as e: reply = f"Erreur gen_success: {e}" await self.send_message(room.room_id, reply) return log.info(f"Message de {event.sender}: {event.body}") payload = { "sender": event.sender, "body": event.body, "room_id": room.room_id, "secret": FLASK_SHARED_SECRET } try: resp = requests.post(FLASK_INCOMING, json=payload, timeout=15) resp.raise_for_status() body = resp.json() reply = body.get("reply", "(Pas de réponse)") followup = body.get("followup") except Exception as e: log.error(f"Erreur appel Flask: {e}") reply = "(Erreur Lia)" await self.send_message(room.room_id, reply) if followup: # small pause then send followup as a separate message await asyncio.sleep(1.0) await self.send_message(room.room_id, followup) async def send_message(self, room_id, text): # Extract bracketed URLs like [https://...] to send as media later matches = re.findall(r"\[(https?://[^\]\s]+)\]", text) # Remove bracketed URLs from the chat text cleaned_text = re.sub(r"\[(https?://[^\]\s]+)\]", "", text).strip() if cleaned_text == "": # If nothing left, keep original text as a fallback (so user still sees context) cleaned_text = text # Convert cleaned markdown to HTML try: text_html = markdown2.markdown(cleaned_text) except Exception: text_html = (cleaned_text.replace("&", "&").replace("<", "<").replace(">", ">").replace("\n", "
")) await self.client.room_send( room_id, message_type="m.room.message", content={"msgtype": "m.text", "body": cleaned_text, "format": "org.matrix.custom.html", "formatted_body": text_html} ) # Send each extracted URL as a separate image/media event for url in matches: if not url: continue # small pause to avoid hammering the homeserver await asyncio.sleep(0.3) await self.send_media(room_id, url) async def send_media(self, room_id, url): """Download the media at `url`, upload it to the Matrix media repo and send an m.image event. Supports Google Drive share links by converting them to a direct download URL. If download or upload fails, falls back to sending an m.room.message with the external URL. """ try: # Convert Google Drive share URL to direct download URL if applicable drive_match = re.search(r"/d/([a-zA-Z0-9_-]+)", url) if drive_match: file_id = drive_match.group(1) download_url = f"https://drive.google.com/uc?export=download&id={file_id}" dl_url = download_url filename = f"{file_id}" else: dl_url = url parsed = urlparse(url) filename = os.path.basename(parsed.path) or parsed.netloc # Download content in a thread to avoid blocking the event loop get_fn = functools.partial(requests.get, dl_url, timeout=30, allow_redirects=True) resp = await asyncio.to_thread(get_fn) if resp.status_code != 200: raise RuntimeError(f"Download failed: {resp.status_code}") data = resp.content mimetype = resp.headers.get("content-type") or mimetypes.guess_type(filename)[0] or "application/octet-stream" # Try upload with small retry in case of transient error upload_resp = None mxc = None for attempt in range(2): try: upload_resp = await self.client.upload(io.BytesIO(data), content_type=mimetype, filename=filename) except Exception as e: log.warning(f"upload attempt {attempt+1} failed: {e}") upload_resp = e # Try various ways to extract content URI try: if hasattr(upload_resp, "content_uri"): mxc = upload_resp.content_uri elif isinstance(upload_resp, dict): mxc = upload_resp.get("content_uri") or upload_resp.get("content_uri") elif hasattr(upload_resp, "response") and isinstance(upload_resp.response, dict): mxc = upload_resp.response.get("content_uri") or upload_resp.response.get("content_uri") else: # fallback: try to stringify and look for mxc:// pattern s = repr(upload_resp) m = re.search(r"(mxc://[A-Za-z0-9/._=-]+)", s) if m: mxc = m.group(1) except Exception: mxc = None if mxc: break # small backoff await asyncio.sleep(0.2) # If still no mxc, log debug info and raise if not mxc: log.warning("upload_resp (for debugging): %r", upload_resp) raise RuntimeError("No mxc returned from upload") info = {"mimetype": mimetype, "size": len(data)} content = { "msgtype": "m.image", "body": filename, "url": mxc, "info": info, } await self.client.room_send(room_id, message_type="m.room.message", content=content) return except Exception as e: log.warning(f"Upload/send media failed for {url}: {e}; falling back to external link") # Fallback: send external link as a message so clients can still access it try: content = {"msgtype": "m.text", "body": url} await self.client.room_send(room_id, message_type="m.room.message", content=content) except Exception as e2: log.error(f"Failed fallback send for {url}: {e2}") async def run(self): await self.login() await self.client.sync_forever(timeout=30000) if __name__ == "__main__": bot = MatrixBot() asyncio.run(bot.run())