mirror of
https://github.com/EvolutionAPI/evolution-client-python.git
synced 2025-12-12 19:39:33 -06:00
231 lines
8.1 KiB
Python
231 lines
8.1 KiB
Python
import mimetypes
|
|
from typing import BinaryIO, Union
|
|
|
|
import requests
|
|
from requests_toolbelt import MultipartEncoder
|
|
|
|
from ..models.message import (
|
|
Button,
|
|
ButtonMessage,
|
|
Contact,
|
|
ContactMessage,
|
|
ListMessage,
|
|
ListRow,
|
|
ListSection,
|
|
LocationMessage,
|
|
MediaMessage,
|
|
PollMessage,
|
|
QuotedMessage,
|
|
ReactionMessage,
|
|
StatusMessage,
|
|
TextMessage,
|
|
)
|
|
|
|
|
|
class MessageService:
|
|
def __init__(self, client):
|
|
self.client = client
|
|
|
|
def send_text(self, instance_id: str, message: TextMessage, instance_token: str):
|
|
# Preparar os dados como JSON
|
|
data = {"number": message.number, "text": message.text}
|
|
|
|
if hasattr(message, "delay") and message.delay is not None:
|
|
data["delay"] = message.delay
|
|
|
|
# Usar o método post do cliente que já trata JSON corretamente
|
|
return self.client.post(
|
|
f"message/sendText/{instance_id}", data=data, instance_token=instance_token
|
|
)
|
|
|
|
def send_media(
|
|
self,
|
|
instance_id: str,
|
|
message: MediaMessage,
|
|
instance_token: str,
|
|
file: Union[BinaryIO, str] = None,
|
|
):
|
|
# Preparar os dados do formulário
|
|
fields = {
|
|
"number": (None, message.number, "text/plain"),
|
|
"mediatype": (None, message.mediatype, "text/plain"),
|
|
"mimetype": (None, message.mimetype, "text/plain"),
|
|
"caption": (None, message.caption, "text/plain"),
|
|
"fileName": (None, message.fileName, "text/plain"),
|
|
}
|
|
|
|
# Adicionar delay apenas se existir
|
|
if hasattr(message, "delay") and message.delay is not None:
|
|
fields["delay"] = (None, str(message.delay), "text/plain; type=number")
|
|
|
|
# Adicionar o arquivo se fornecido
|
|
if file:
|
|
if isinstance(file, str):
|
|
mime_type = mimetypes.guess_type(file)[0] or "application/octet-stream"
|
|
fields["file"] = ("file", open(file, "rb"), mime_type)
|
|
else:
|
|
fields["file"] = ("file", file, "application/octet-stream")
|
|
|
|
# Criar o multipart encoder
|
|
multipart = MultipartEncoder(fields=fields)
|
|
|
|
# Preparar os headers
|
|
headers = self.client._get_headers(instance_token)
|
|
headers["Content-Type"] = multipart.content_type
|
|
|
|
# Fazer a requisição diretamente
|
|
url = f"{self.client.base_url}/message/sendMedia/{instance_id}"
|
|
response = requests.post(url, headers=headers, data=multipart)
|
|
|
|
return response.json()
|
|
|
|
def send_ptv(
|
|
self,
|
|
instance_id: str,
|
|
message: dict,
|
|
instance_token: str,
|
|
file: Union[BinaryIO, str] = None,
|
|
):
|
|
fields = {}
|
|
|
|
# Adiciona todos os campos do message como text/plain
|
|
for key, value in message.items():
|
|
if key == "delay" and value is not None:
|
|
fields[key] = (None, str(value), "text/plain; type=number")
|
|
else:
|
|
fields[key] = (None, str(value), "text/plain")
|
|
|
|
if file:
|
|
if isinstance(file, str):
|
|
mime_type = mimetypes.guess_type(file)[0] or "application/octet-stream"
|
|
fields["file"] = ("file", open(file, "rb"), mime_type)
|
|
else:
|
|
fields["file"] = ("file", file, "application/octet-stream")
|
|
|
|
multipart = MultipartEncoder(fields=fields)
|
|
headers = self.client._get_headers(instance_token)
|
|
headers["Content-Type"] = multipart.content_type
|
|
|
|
url = f"{self.client.base_url}/message/sendPtv/{instance_id}"
|
|
response = requests.post(url, headers=headers, data=multipart)
|
|
return response.json()
|
|
|
|
def send_whatsapp_audio(
|
|
self,
|
|
instance_id: str,
|
|
message: dict,
|
|
instance_token: str,
|
|
file: Union[BinaryIO, str] = None,
|
|
):
|
|
fields = {}
|
|
|
|
# Adiciona todos os campos do message como text/plain
|
|
for key, value in message.items():
|
|
if key == "delay" and value is not None:
|
|
fields[key] = (None, str(value), "text/plain; type=number")
|
|
else:
|
|
fields[key] = (None, str(value), "text/plain")
|
|
|
|
if file:
|
|
if isinstance(file, str):
|
|
mime_type = mimetypes.guess_type(file)[0] or "application/octet-stream"
|
|
fields["file"] = ("file", open(file, "rb"), mime_type)
|
|
else:
|
|
fields["file"] = ("file", file, "application/octet-stream")
|
|
|
|
multipart = MultipartEncoder(fields=fields)
|
|
headers = self.client._get_headers(instance_token)
|
|
headers["Content-Type"] = multipart.content_type
|
|
|
|
url = f"{self.client.base_url}/message/sendWhatsAppAudio/{instance_id}"
|
|
response = requests.post(url, headers=headers, data=multipart)
|
|
return response.json()
|
|
|
|
def send_status(self, instance_id: str, message: StatusMessage, instance_token: str):
|
|
return self.client.post(
|
|
f"message/sendStatus/{instance_id}",
|
|
data=message.__dict__,
|
|
instance_token=instance_token,
|
|
)
|
|
|
|
def send_sticker(
|
|
self,
|
|
instance_id: str,
|
|
message: dict,
|
|
instance_token: str,
|
|
file: Union[BinaryIO, str] = None,
|
|
):
|
|
fields = {}
|
|
|
|
# Adiciona todos os campos do message como text/plain
|
|
for key, value in message.items():
|
|
if key == "delay" and value is not None:
|
|
fields[key] = (None, str(value), "text/plain; type=number")
|
|
else:
|
|
fields[key] = (None, str(value), "text/plain")
|
|
|
|
if file:
|
|
if isinstance(file, str):
|
|
mime_type = mimetypes.guess_type(file)[0] or "application/octet-stream"
|
|
fields["file"] = ("file", open(file, "rb"), mime_type)
|
|
else:
|
|
fields["file"] = ("file", file, "application/octet-stream")
|
|
|
|
multipart = MultipartEncoder(fields=fields)
|
|
headers = self.client._get_headers(instance_token)
|
|
headers["Content-Type"] = multipart.content_type
|
|
|
|
url = f"{self.client.base_url}/message/sendSticker/{instance_id}"
|
|
response = requests.post(url, headers=headers, data=multipart)
|
|
return response.json()
|
|
|
|
def send_location(self, instance_id: str, message: LocationMessage, instance_token: str):
|
|
data = message.__dict__.copy()
|
|
if "delay" in data and data["delay"] is not None:
|
|
data["delay"] = int(data["delay"])
|
|
|
|
return self.client.post(
|
|
f"message/sendLocation/{instance_id}", data=data, instance_token=instance_token
|
|
)
|
|
|
|
def send_contact(self, instance_id: str, message: ContactMessage, instance_token: str):
|
|
return self.client.post(
|
|
f"message/sendContact/{instance_id}",
|
|
data=message.__dict__,
|
|
instance_token=instance_token,
|
|
)
|
|
|
|
def send_reaction(self, instance_id: str, message: ReactionMessage, instance_token: str):
|
|
return self.client.post(
|
|
f"message/sendReaction/{instance_id}",
|
|
data=message.__dict__,
|
|
instance_token=instance_token,
|
|
)
|
|
|
|
def send_poll(self, instance_id: str, message: PollMessage, instance_token: str):
|
|
data = message.__dict__.copy()
|
|
if "delay" in data and data["delay"] is not None:
|
|
data["delay"] = int(data["delay"])
|
|
|
|
return self.client.post(
|
|
f"message/sendPoll/{instance_id}", data=data, instance_token=instance_token
|
|
)
|
|
|
|
def send_list(self, instance_id: str, message: ListMessage, instance_token: str):
|
|
data = message.__dict__.copy()
|
|
if "delay" in data and data["delay"] is not None:
|
|
data["delay"] = int(data["delay"])
|
|
|
|
return self.client.post(
|
|
f"message/sendList/{instance_id}", data=data, instance_token=instance_token
|
|
)
|
|
|
|
def send_buttons(self, instance_id: str, message: ButtonMessage, instance_token: str):
|
|
data = message.__dict__.copy()
|
|
if "delay" in data and data["delay"] is not None:
|
|
data["delay"] = int(data["delay"])
|
|
|
|
return self.client.post(
|
|
f"message/sendButtons/{instance_id}", data=data, instance_token=instance_token
|
|
)
|