import mimetypes from typing import BinaryIO, Union import requests from requests_toolbelt import MultipartEncoder from ..models.message import * class MessageService: def __init__(self, client): self.client = client def send_text(self, instance_id: str, message: TextMessage, instance_token: str): # Preparar os dados como JSON data = {"number": message.number, "text": message.text} if hasattr(message, "delay") and message.delay is not None: data["delay"] = message.delay # Usar o método post do cliente que já trata JSON corretamente return self.client.post( f"message/sendText/{instance_id}", data=data, instance_token=instance_token ) def send_media( self, instance_id: str, message: MediaMessage, instance_token: str, file: Union[BinaryIO, str] = None, ): # Preparar os dados do formulário fields = { "number": (None, message.number, "text/plain"), "mediatype": (None, message.mediatype, "text/plain"), "mimetype": (None, message.mimetype, "text/plain"), "caption": (None, message.caption, "text/plain"), "fileName": (None, message.fileName, "text/plain"), } # Adicionar delay apenas se existir if hasattr(message, "delay") and message.delay is not None: fields["delay"] = (None, str(message.delay), "text/plain; type=number") # Adicionar o arquivo se fornecido if file: if isinstance(file, str): mime_type = mimetypes.guess_type(file)[0] or "application/octet-stream" fields["file"] = ("file", open(file, "rb"), mime_type) else: fields["file"] = ("file", file, "application/octet-stream") # Criar o multipart encoder multipart = MultipartEncoder(fields=fields) # Preparar os headers headers = self.client._get_headers(instance_token) headers["Content-Type"] = multipart.content_type # Fazer a requisição diretamente url = f"{self.client.base_url}/message/sendMedia/{instance_id}" response = requests.post(url, headers=headers, data=multipart) return response.json() def send_ptv( self, instance_id: str, message: dict, instance_token: str, file: Union[BinaryIO, str] = None, ): fields = {} # Adiciona todos os campos do message como text/plain for key, value in message.items(): if key == "delay" and value is not None: fields[key] = (None, str(value), "text/plain; type=number") else: fields[key] = (None, str(value), "text/plain") if file: if isinstance(file, str): mime_type = mimetypes.guess_type(file)[0] or "application/octet-stream" fields["file"] = ("file", open(file, "rb"), mime_type) else: fields["file"] = ("file", file, "application/octet-stream") multipart = MultipartEncoder(fields=fields) headers = self.client._get_headers(instance_token) headers["Content-Type"] = multipart.content_type url = f"{self.client.base_url}/message/sendPtv/{instance_id}" response = requests.post(url, headers=headers, data=multipart) return response.json() def send_whatsapp_audio( self, instance_id: str, message: dict, instance_token: str, file: Union[BinaryIO, str] = None, ): fields = {} # Adiciona todos os campos do message como text/plain for key, value in message.items(): if key == "delay" and value is not None: fields[key] = (None, str(value), "text/plain; type=number") else: fields[key] = (None, str(value), "text/plain") if file: if isinstance(file, str): mime_type = mimetypes.guess_type(file)[0] or "application/octet-stream" fields["file"] = ("file", open(file, "rb"), mime_type) else: fields["file"] = ("file", file, "application/octet-stream") multipart = MultipartEncoder(fields=fields) headers = self.client._get_headers(instance_token) headers["Content-Type"] = multipart.content_type url = f"{self.client.base_url}/message/sendWhatsAppAudio/{instance_id}" response = requests.post(url, headers=headers, data=multipart) return response.json() def send_status(self, instance_id: str, message: StatusMessage, instance_token: str): return self.client.post( f"message/sendStatus/{instance_id}", data=message.__dict__, instance_token=instance_token, ) def send_sticker( self, instance_id: str, message: dict, instance_token: str, file: Union[BinaryIO, str] = None, ): fields = {} # Adiciona todos os campos do message como text/plain for key, value in message.items(): if key == "delay" and value is not None: fields[key] = (None, str(value), "text/plain; type=number") else: fields[key] = (None, str(value), "text/plain") if file: if isinstance(file, str): mime_type = mimetypes.guess_type(file)[0] or "application/octet-stream" fields["file"] = ("file", open(file, "rb"), mime_type) else: fields["file"] = ("file", file, "application/octet-stream") multipart = MultipartEncoder(fields=fields) headers = self.client._get_headers(instance_token) headers["Content-Type"] = multipart.content_type url = f"{self.client.base_url}/message/sendSticker/{instance_id}" response = requests.post(url, headers=headers, data=multipart) return response.json() def send_location(self, instance_id: str, message: LocationMessage, instance_token: str): data = message.__dict__.copy() if "delay" in data and data["delay"] is not None: data["delay"] = int(data["delay"]) return self.client.post( f"message/sendLocation/{instance_id}", data=data, instance_token=instance_token ) def send_contact(self, instance_id: str, message: ContactMessage, instance_token: str): return self.client.post( f"message/sendContact/{instance_id}", data=message.__dict__, instance_token=instance_token, ) def send_reaction(self, instance_id: str, message: ReactionMessage, instance_token: str): return self.client.post( f"message/sendReaction/{instance_id}", data=message.__dict__, instance_token=instance_token, ) def send_poll(self, instance_id: str, message: PollMessage, instance_token: str): data = message.__dict__.copy() if "delay" in data and data["delay"] is not None: data["delay"] = int(data["delay"]) return self.client.post( f"message/sendPoll/{instance_id}", data=data, instance_token=instance_token ) def send_list(self, instance_id: str, message: ListMessage, instance_token: str): data = message.__dict__.copy() if "delay" in data and data["delay"] is not None: data["delay"] = int(data["delay"]) return self.client.post( f"message/sendList/{instance_id}", data=data, instance_token=instance_token ) def send_buttons(self, instance_id: str, message: ButtonMessage, instance_token: str): data = message.__dict__.copy() if "delay" in data and data["delay"] is not None: data["delay"] = int(data["delay"]) return self.client.post( f"message/sendButtons/{instance_id}", data=data, instance_token=instance_token )