style: format codebase with ruff

This commit is contained in:
Jackson Vieira
2025-12-03 13:54:30 -03:00
parent 44539123d9
commit 2421b1cf5b
21 changed files with 458 additions and 441 deletions

View File

@@ -4,202 +4,210 @@ from requests_toolbelt import MultipartEncoder
import mimetypes
import requests
class MessageService:
def __init__(self, client):
self.client = client
def send_text(self, instance_id: str, message: TextMessage, instance_token: str):
# Preparar os dados como JSON
data = {
'number': message.number,
'text': message.text
}
if hasattr(message, 'delay') and message.delay is not None:
data['delay'] = message.delay
data = {"number": message.number, "text": message.text}
if hasattr(message, "delay") and message.delay is not None:
data["delay"] = message.delay
# Usar o método post do cliente que já trata JSON corretamente
return self.client.post(
f'message/sendText/{instance_id}',
data=data,
instance_token=instance_token
f"message/sendText/{instance_id}", data=data, instance_token=instance_token
)
def send_media(self, instance_id: str, message: MediaMessage, instance_token: str, file: Union[BinaryIO, str] = None):
def send_media(
self,
instance_id: str,
message: MediaMessage,
instance_token: str,
file: Union[BinaryIO, str] = None,
):
# Preparar os dados do formulário
fields = {
'number': (None, message.number, 'text/plain'),
'mediatype': (None, message.mediatype, 'text/plain'),
'mimetype': (None, message.mimetype, 'text/plain'),
'caption': (None, message.caption, 'text/plain'),
'fileName': (None, message.fileName, 'text/plain'),
"number": (None, message.number, "text/plain"),
"mediatype": (None, message.mediatype, "text/plain"),
"mimetype": (None, message.mimetype, "text/plain"),
"caption": (None, message.caption, "text/plain"),
"fileName": (None, message.fileName, "text/plain"),
}
# Adicionar delay apenas se existir
if hasattr(message, 'delay') and message.delay is not None:
fields['delay'] = (None, str(message.delay), 'text/plain; type=number')
if hasattr(message, "delay") and message.delay is not None:
fields["delay"] = (None, str(message.delay), "text/plain; type=number")
# Adicionar o arquivo se fornecido
if file:
if isinstance(file, str):
mime_type = mimetypes.guess_type(file)[0] or 'application/octet-stream'
fields['file'] = ('file', open(file, 'rb'), mime_type)
mime_type = mimetypes.guess_type(file)[0] or "application/octet-stream"
fields["file"] = ("file", open(file, "rb"), mime_type)
else:
fields['file'] = ('file', file, 'application/octet-stream')
fields["file"] = ("file", file, "application/octet-stream")
# Criar o multipart encoder
multipart = MultipartEncoder(fields=fields)
# Preparar os headers
headers = self.client._get_headers(instance_token)
headers['Content-Type'] = multipart.content_type
headers["Content-Type"] = multipart.content_type
# Fazer a requisição diretamente
url = f'{self.client.base_url}/message/sendMedia/{instance_id}'
response = requests.post(
url,
headers=headers,
data=multipart
)
url = f"{self.client.base_url}/message/sendMedia/{instance_id}"
response = requests.post(url, headers=headers, data=multipart)
return response.json()
def send_ptv(self, instance_id: str, message: dict, instance_token: str, file: Union[BinaryIO, str] = None):
def send_ptv(
self,
instance_id: str,
message: dict,
instance_token: str,
file: Union[BinaryIO, str] = None,
):
fields = {}
# Adiciona todos os campos do message como text/plain
for key, value in message.items():
if key == 'delay' and value is not None:
fields[key] = (None, str(value), 'text/plain; type=number')
if key == "delay" and value is not None:
fields[key] = (None, str(value), "text/plain; type=number")
else:
fields[key] = (None, str(value), 'text/plain')
fields[key] = (None, str(value), "text/plain")
if file:
if isinstance(file, str):
mime_type = mimetypes.guess_type(file)[0] or 'application/octet-stream'
fields['file'] = ('file', open(file, 'rb'), mime_type)
mime_type = mimetypes.guess_type(file)[0] or "application/octet-stream"
fields["file"] = ("file", open(file, "rb"), mime_type)
else:
fields['file'] = ('file', file, 'application/octet-stream')
fields["file"] = ("file", file, "application/octet-stream")
multipart = MultipartEncoder(fields=fields)
headers = self.client._get_headers(instance_token)
headers['Content-Type'] = multipart.content_type
url = f'{self.client.base_url}/message/sendPtv/{instance_id}'
headers["Content-Type"] = multipart.content_type
url = f"{self.client.base_url}/message/sendPtv/{instance_id}"
response = requests.post(url, headers=headers, data=multipart)
return response.json()
def send_whatsapp_audio(self, instance_id: str, message: dict, instance_token: str, file: Union[BinaryIO, str] = None):
def send_whatsapp_audio(
self,
instance_id: str,
message: dict,
instance_token: str,
file: Union[BinaryIO, str] = None,
):
fields = {}
# Adiciona todos os campos do message como text/plain
for key, value in message.items():
if key == 'delay' and value is not None:
fields[key] = (None, str(value), 'text/plain; type=number')
if key == "delay" and value is not None:
fields[key] = (None, str(value), "text/plain; type=number")
else:
fields[key] = (None, str(value), 'text/plain')
fields[key] = (None, str(value), "text/plain")
if file:
if isinstance(file, str):
mime_type = mimetypes.guess_type(file)[0] or 'application/octet-stream'
fields['file'] = ('file', open(file, 'rb'), mime_type)
mime_type = mimetypes.guess_type(file)[0] or "application/octet-stream"
fields["file"] = ("file", open(file, "rb"), mime_type)
else:
fields['file'] = ('file', file, 'application/octet-stream')
fields["file"] = ("file", file, "application/octet-stream")
multipart = MultipartEncoder(fields=fields)
headers = self.client._get_headers(instance_token)
headers['Content-Type'] = multipart.content_type
url = f'{self.client.base_url}/message/sendWhatsAppAudio/{instance_id}'
headers["Content-Type"] = multipart.content_type
url = f"{self.client.base_url}/message/sendWhatsAppAudio/{instance_id}"
response = requests.post(url, headers=headers, data=multipart)
return response.json()
def send_status(self, instance_id: str, message: StatusMessage, instance_token: str):
return self.client.post(
f'message/sendStatus/{instance_id}',
f"message/sendStatus/{instance_id}",
data=message.__dict__,
instance_token=instance_token
instance_token=instance_token,
)
def send_sticker(self, instance_id: str, message: dict, instance_token: str, file: Union[BinaryIO, str] = None):
def send_sticker(
self,
instance_id: str,
message: dict,
instance_token: str,
file: Union[BinaryIO, str] = None,
):
fields = {}
# Adiciona todos os campos do message como text/plain
for key, value in message.items():
if key == 'delay' and value is not None:
fields[key] = (None, str(value), 'text/plain; type=number')
if key == "delay" and value is not None:
fields[key] = (None, str(value), "text/plain; type=number")
else:
fields[key] = (None, str(value), 'text/plain')
fields[key] = (None, str(value), "text/plain")
if file:
if isinstance(file, str):
mime_type = mimetypes.guess_type(file)[0] or 'application/octet-stream'
fields['file'] = ('file', open(file, 'rb'), mime_type)
mime_type = mimetypes.guess_type(file)[0] or "application/octet-stream"
fields["file"] = ("file", open(file, "rb"), mime_type)
else:
fields['file'] = ('file', file, 'application/octet-stream')
fields["file"] = ("file", file, "application/octet-stream")
multipart = MultipartEncoder(fields=fields)
headers = self.client._get_headers(instance_token)
headers['Content-Type'] = multipart.content_type
url = f'{self.client.base_url}/message/sendSticker/{instance_id}'
headers["Content-Type"] = multipart.content_type
url = f"{self.client.base_url}/message/sendSticker/{instance_id}"
response = requests.post(url, headers=headers, data=multipart)
return response.json()
def send_location(self, instance_id: str, message: LocationMessage, instance_token: str):
data = message.__dict__.copy()
if 'delay' in data and data['delay'] is not None:
data['delay'] = int(data['delay'])
if "delay" in data and data["delay"] is not None:
data["delay"] = int(data["delay"])
return self.client.post(
f'message/sendLocation/{instance_id}',
data=data,
instance_token=instance_token
f"message/sendLocation/{instance_id}", data=data, instance_token=instance_token
)
def send_contact(self, instance_id: str, message: ContactMessage, instance_token: str):
return self.client.post(
f'message/sendContact/{instance_id}',
f"message/sendContact/{instance_id}",
data=message.__dict__,
instance_token=instance_token
instance_token=instance_token,
)
def send_reaction(self, instance_id: str, message: ReactionMessage, instance_token: str):
return self.client.post(
f'message/sendReaction/{instance_id}',
f"message/sendReaction/{instance_id}",
data=message.__dict__,
instance_token=instance_token
instance_token=instance_token,
)
def send_poll(self, instance_id: str, message: PollMessage, instance_token: str):
data = message.__dict__.copy()
if 'delay' in data and data['delay'] is not None:
data['delay'] = int(data['delay'])
if "delay" in data and data["delay"] is not None:
data["delay"] = int(data["delay"])
return self.client.post(
f'message/sendPoll/{instance_id}',
data=data,
instance_token=instance_token
f"message/sendPoll/{instance_id}", data=data, instance_token=instance_token
)
def send_list(self, instance_id: str, message: ListMessage, instance_token: str):
data = message.__dict__.copy()
if 'delay' in data and data['delay'] is not None:
data['delay'] = int(data['delay'])
if "delay" in data and data["delay"] is not None:
data["delay"] = int(data["delay"])
return self.client.post(
f'message/sendList/{instance_id}',
data=data,
instance_token=instance_token
f"message/sendList/{instance_id}", data=data, instance_token=instance_token
)
def send_buttons(self, instance_id: str, message: ButtonMessage, instance_token: str):
data = message.__dict__.copy()
if 'delay' in data and data['delay'] is not None:
data['delay'] = int(data['delay'])
if "delay" in data and data["delay"] is not None:
data["delay"] = int(data["delay"])
return self.client.post(
f'message/sendButtons/{instance_id}',
data=data,
instance_token=instance_token
)
f"message/sendButtons/{instance_id}", data=data, instance_token=instance_token
)