mirror of
https://github.com/OCA/knowledge.git
synced 2025-07-26 18:38:41 -06:00
Add test with mock server
This commit is contained in:
parent
39901ada9e
commit
ce4edf9b26
@ -24,3 +24,4 @@ from . import attachment
|
|||||||
from . import location
|
from . import location
|
||||||
from . import task
|
from . import task
|
||||||
from . import tasks
|
from . import tasks
|
||||||
|
from . import tests
|
||||||
|
@ -19,8 +19,8 @@ class AbstractTask(object):
|
|||||||
'name': filename,
|
'name': filename,
|
||||||
'datas': b64encode(data),
|
'datas': b64encode(data),
|
||||||
'datas_fname': filename,
|
'datas_fname': filename,
|
||||||
'task_id': self.task.id,
|
'task_id': self.task and self.task.id or False,
|
||||||
'location_id': self.task.location_id.id,
|
'location_id': self.task and self.task.location_id.id or False,
|
||||||
'external_hash': self.ext_hash
|
'external_hash': self.ext_hash
|
||||||
})
|
})
|
||||||
return ir_attachment_id
|
return ir_attachment_id
|
||||||
|
@ -21,7 +21,7 @@
|
|||||||
|
|
||||||
from .abstract_fs import AbstractFSTask
|
from .abstract_fs import AbstractFSTask
|
||||||
from base64 import b64decode
|
from base64 import b64decode
|
||||||
from fs.osfs import OSFS
|
from fs import osfs
|
||||||
import logging
|
import logging
|
||||||
_logger = logging.getLogger(__name__)
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -42,7 +42,7 @@ class FileStoreImportTask(FileStoreTask):
|
|||||||
_synchronize_type = 'import'
|
_synchronize_type = 'import'
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
with OSFS(self.host) as fs_conn:
|
with osfs.OSFS(self.host) as fs_conn:
|
||||||
files_to_process = self._get_files(fs_conn, self.path)
|
files_to_process = self._get_files(fs_conn, self.path)
|
||||||
for file_to_process in files_to_process:
|
for file_to_process in files_to_process:
|
||||||
self._process_file(fs_conn, file_to_process)
|
self._process_file(fs_conn, file_to_process)
|
||||||
@ -56,7 +56,7 @@ class FileStoreExportTask(FileStoreTask):
|
|||||||
for attachment in self.attachment_ids:
|
for attachment in self.attachment_ids:
|
||||||
if attachment.state in ('pending', 'failed'):
|
if attachment.state in ('pending', 'failed'):
|
||||||
self.attachment_id = attachment
|
self.attachment_id = attachment
|
||||||
with OSFS(self.host) as fs_conn:
|
with osfs.OSFS(self.host) as fs_conn:
|
||||||
self._upload_file(fs_conn,
|
self._upload_file(fs_conn,
|
||||||
self.host,
|
self.host,
|
||||||
self.port,
|
self.port,
|
||||||
|
@ -21,7 +21,7 @@
|
|||||||
|
|
||||||
from .abstract_fs import AbstractFSTask
|
from .abstract_fs import AbstractFSTask
|
||||||
from base64 import b64decode
|
from base64 import b64decode
|
||||||
from fs.ftpfs import FTPFS
|
from fs import ftpfs
|
||||||
import logging
|
import logging
|
||||||
_logger = logging.getLogger(__name__)
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -42,7 +42,7 @@ class FtpImportTask(FtpTask):
|
|||||||
_synchronize_type = 'import'
|
_synchronize_type = 'import'
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
with FTPFS(self.host, self.user, self.pwd, port=self.port) as ftp_conn:
|
with ftpfs.FTPFS(self.host, self.user, self.pwd, port=self.port) as ftp_conn:
|
||||||
files_to_process = self._get_files(ftp_conn, self.path)
|
files_to_process = self._get_files(ftp_conn, self.path)
|
||||||
for file_to_process in files_to_process:
|
for file_to_process in files_to_process:
|
||||||
self._process_file(ftp_conn, file_to_process)
|
self._process_file(ftp_conn, file_to_process)
|
||||||
@ -56,7 +56,7 @@ class FtpExportTask(FtpTask):
|
|||||||
for attachment in self.attachment_ids:
|
for attachment in self.attachment_ids:
|
||||||
if attachment.state in ('pending', 'failed'):
|
if attachment.state in ('pending', 'failed'):
|
||||||
self.attachment_id = attachment
|
self.attachment_id = attachment
|
||||||
with FTPFS(self.host, self.user, self.pwd,
|
with ftpfs.FTPFS(self.host, self.user, self.pwd,
|
||||||
port=self.port) as ftp_conn:
|
port=self.port) as ftp_conn:
|
||||||
self._upload_file(ftp_conn, self.host, self.port,
|
self._upload_file(ftp_conn, self.host, self.port,
|
||||||
self.user,
|
self.user,
|
||||||
|
@ -21,7 +21,7 @@
|
|||||||
|
|
||||||
from .abstract_fs import AbstractFSTask
|
from .abstract_fs import AbstractFSTask
|
||||||
from base64 import b64decode
|
from base64 import b64decode
|
||||||
from fs.sftpfs import SFTPFS
|
from fs import sftpfs
|
||||||
import logging
|
import logging
|
||||||
_logger = logging.getLogger(__name__)
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -44,7 +44,7 @@ class SftpImportTask(SftpTask):
|
|||||||
def run(self):
|
def run(self):
|
||||||
connection_string = "{}:{}".format(self.host, self.port)
|
connection_string = "{}:{}".format(self.host, self.port)
|
||||||
root = "/home/{}".format(self.user)
|
root = "/home/{}".format(self.user)
|
||||||
with SFTPFS(connection=connection_string, root_path=root,
|
with sftpfs.SFTPFS(connection=connection_string, root_path=root,
|
||||||
username=self.user, password=self.pwd) as sftp_conn:
|
username=self.user, password=self.pwd) as sftp_conn:
|
||||||
files_to_process = self._get_files(sftp_conn, self.path)
|
files_to_process = self._get_files(sftp_conn, self.path)
|
||||||
for file_to_process in files_to_process:
|
for file_to_process in files_to_process:
|
||||||
@ -60,12 +60,13 @@ class SftpExportTask(SftpTask):
|
|||||||
if attachment.state in ('pending', 'failed'):
|
if attachment.state in ('pending', 'failed'):
|
||||||
self.attachment_id = attachment
|
self.attachment_id = attachment
|
||||||
connection_string = "{}:{}".format(self.host, self.port)
|
connection_string = "{}:{}".format(self.host, self.port)
|
||||||
with SFTPFS(connection=connection_string,
|
with sftpfs.SFTPFS(connection=connection_string,
|
||||||
username=self.user,
|
username=self.user,
|
||||||
password=self.pwd) as sftp_conn:
|
password=self.pwd) as sftp_conn:
|
||||||
|
datas = b64decode(attachment.datas)
|
||||||
self._upload_file(sftp_conn, self.host, self.port,
|
self._upload_file(sftp_conn, self.host, self.port,
|
||||||
self.user,
|
self.user,
|
||||||
self.pwd,
|
self.pwd,
|
||||||
self.path,
|
self.path,
|
||||||
attachment.datas_fname,
|
attachment.datas_fname,
|
||||||
b64decode(attachment.datas))
|
datas)
|
||||||
|
85
external_file_location/tests/.ropeproject/config.py
Normal file
85
external_file_location/tests/.ropeproject/config.py
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
# The default ``config.py``
|
||||||
|
|
||||||
|
|
||||||
|
def set_prefs(prefs):
|
||||||
|
"""This function is called before opening the project"""
|
||||||
|
|
||||||
|
# Specify which files and folders to ignore in the project.
|
||||||
|
# Changes to ignored resources are not added to the history and
|
||||||
|
# VCSs. Also they are not returned in `Project.get_files()`.
|
||||||
|
# Note that ``?`` and ``*`` match all characters but slashes.
|
||||||
|
# '*.pyc': matches 'test.pyc' and 'pkg/test.pyc'
|
||||||
|
# 'mod*.pyc': matches 'test/mod1.pyc' but not 'mod/1.pyc'
|
||||||
|
# '.svn': matches 'pkg/.svn' and all of its children
|
||||||
|
# 'build/*.o': matches 'build/lib.o' but not 'build/sub/lib.o'
|
||||||
|
# 'build//*.o': matches 'build/lib.o' and 'build/sub/lib.o'
|
||||||
|
prefs['ignored_resources'] = ['*.pyc', '*~', '.ropeproject',
|
||||||
|
'.hg', '.svn', '_svn', '.git']
|
||||||
|
|
||||||
|
# Specifies which files should be considered python files. It is
|
||||||
|
# useful when you have scripts inside your project. Only files
|
||||||
|
# ending with ``.py`` are considered to be python files by
|
||||||
|
# default.
|
||||||
|
#prefs['python_files'] = ['*.py']
|
||||||
|
|
||||||
|
# Custom source folders: By default rope searches the project
|
||||||
|
# for finding source folders (folders that should be searched
|
||||||
|
# for finding modules). You can add paths to that list. Note
|
||||||
|
# that rope guesses project source folders correctly most of the
|
||||||
|
# time; use this if you have any problems.
|
||||||
|
# The folders should be relative to project root and use '/' for
|
||||||
|
# separating folders regardless of the platform rope is running on.
|
||||||
|
# 'src/my_source_folder' for instance.
|
||||||
|
#prefs.add('source_folders', 'src')
|
||||||
|
|
||||||
|
# You can extend python path for looking up modules
|
||||||
|
#prefs.add('python_path', '~/python/')
|
||||||
|
|
||||||
|
# Should rope save object information or not.
|
||||||
|
prefs['save_objectdb'] = True
|
||||||
|
prefs['compress_objectdb'] = False
|
||||||
|
|
||||||
|
# If `True`, rope analyzes each module when it is being saved.
|
||||||
|
prefs['automatic_soa'] = True
|
||||||
|
# The depth of calls to follow in static object analysis
|
||||||
|
prefs['soa_followed_calls'] = 0
|
||||||
|
|
||||||
|
# If `False` when running modules or unit tests "dynamic object
|
||||||
|
# analysis" is turned off. This makes them much faster.
|
||||||
|
prefs['perform_doa'] = True
|
||||||
|
|
||||||
|
# Rope can check the validity of its object DB when running.
|
||||||
|
prefs['validate_objectdb'] = True
|
||||||
|
|
||||||
|
# How many undos to hold?
|
||||||
|
prefs['max_history_items'] = 32
|
||||||
|
|
||||||
|
# Shows whether to save history across sessions.
|
||||||
|
prefs['save_history'] = True
|
||||||
|
prefs['compress_history'] = False
|
||||||
|
|
||||||
|
# Set the number spaces used for indenting. According to
|
||||||
|
# :PEP:`8`, it is best to use 4 spaces. Since most of rope's
|
||||||
|
# unit-tests use 4 spaces it is more reliable, too.
|
||||||
|
prefs['indent_size'] = 4
|
||||||
|
|
||||||
|
# Builtin and c-extension modules that are allowed to be imported
|
||||||
|
# and inspected by rope.
|
||||||
|
prefs['extension_modules'] = []
|
||||||
|
|
||||||
|
# Add all standard c-extensions to extension_modules list.
|
||||||
|
prefs['import_dynload_stdmods'] = True
|
||||||
|
|
||||||
|
# If `True` modules with syntax errors are considered to be empty.
|
||||||
|
# The default value is `False`; When `False` syntax errors raise
|
||||||
|
# `rope.base.exceptions.ModuleSyntaxError` exception.
|
||||||
|
prefs['ignore_syntax_errors'] = False
|
||||||
|
|
||||||
|
# If `True`, rope ignores unresolvable imports. Otherwise, they
|
||||||
|
# appear in the importing namespace.
|
||||||
|
prefs['ignore_bad_imports'] = False
|
||||||
|
|
||||||
|
|
||||||
|
def project_opened(project):
|
||||||
|
"""This function is called after opening the project"""
|
||||||
|
# Do whatever you like here!
|
BIN
external_file_location/tests/.ropeproject/globalnames
Normal file
BIN
external_file_location/tests/.ropeproject/globalnames
Normal file
Binary file not shown.
1
external_file_location/tests/.ropeproject/history
Normal file
1
external_file_location/tests/.ropeproject/history
Normal file
@ -0,0 +1 @@
|
|||||||
|
€]q(]q]qe.
|
1
external_file_location/tests/.ropeproject/objectdb
Normal file
1
external_file_location/tests/.ropeproject/objectdb
Normal file
@ -0,0 +1 @@
|
|||||||
|
<EFBFBD>}q.
|
25
external_file_location/tests/__init__.py
Normal file
25
external_file_location/tests/__init__.py
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
###############################################################################
|
||||||
|
#
|
||||||
|
# Module for OpenERP
|
||||||
|
# Copyright (C) 2015 Akretion (http://www.akretion.com).
|
||||||
|
# @author Valentin CHEMIERE <valentin.chemiere@akretion.com>
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as
|
||||||
|
# published by the Free Software Foundation, either version 3 of the
|
||||||
|
# License, or (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
###############################################################################
|
||||||
|
|
||||||
|
import mock_server
|
||||||
|
import test_sftp
|
||||||
|
|
75
external_file_location/tests/mock_server.py
Normal file
75
external_file_location/tests/mock_server.py
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
###############################################################################
|
||||||
|
#
|
||||||
|
# Copyright (C) 2015 Akretion (http://www.akretion.com).
|
||||||
|
# @author Valentin CHEMIERE <valentin.chemiere@akretion.com>
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as
|
||||||
|
# published by the Free Software Foundation, either version 3 of the
|
||||||
|
# License, or (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
###############################################################################
|
||||||
|
|
||||||
|
import mock
|
||||||
|
from contextlib import contextmanager
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
|
|
||||||
|
class MultiResponse(dict):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ConnMock(object):
|
||||||
|
|
||||||
|
def __init__(self, response):
|
||||||
|
self.response = response
|
||||||
|
self._calls = []
|
||||||
|
self.call_count = defaultdict(int)
|
||||||
|
|
||||||
|
def __getattribute__(self, method):
|
||||||
|
if method not in ('_calls', 'response', 'call_count'):
|
||||||
|
def callable(*args, **kwargs):
|
||||||
|
self._calls.append({
|
||||||
|
'method': method,
|
||||||
|
'args': args,
|
||||||
|
'kwargs': kwargs,
|
||||||
|
})
|
||||||
|
call = self.response[method]
|
||||||
|
if isinstance(call, MultiResponse):
|
||||||
|
call = call[self.call_count[method]]
|
||||||
|
self.call_count[method] += 1
|
||||||
|
return call
|
||||||
|
|
||||||
|
return callable
|
||||||
|
else:
|
||||||
|
return super(ConnMock, self).__getattribute__(method)
|
||||||
|
|
||||||
|
def __call__(self, *args, **kwargs):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __enter__(self, *args, **kwargs):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, *args, **kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __repr__(self, *args, **kwargs):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __getitem__(self, key):
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def server_mock(response):
|
||||||
|
with mock.patch('fs.sftpfs.SFTPFS', ConnMock(response)) as SFTPFS:
|
||||||
|
yield SFTPFS._calls
|
140
external_file_location/tests/test_sftp.py
Normal file
140
external_file_location/tests/test_sftp.py
Normal file
@ -0,0 +1,140 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
###############################################################################
|
||||||
|
#
|
||||||
|
# Module for OpenERP
|
||||||
|
# Copyright (C) 2015 Akretion (http://www.akretion.com).
|
||||||
|
# @author Valentin CHEMIERE <valentin.chemiere@akretion.com>
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as
|
||||||
|
# published by the Free Software Foundation, either version 3 of the
|
||||||
|
# License, or (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
###############################################################################
|
||||||
|
|
||||||
|
import openerp.tests.common as common
|
||||||
|
from ..tasks.sftp import SftpImportTask
|
||||||
|
from ..tasks.sftp import SftpExportTask
|
||||||
|
from .mock_server import (server_mock)
|
||||||
|
from .mock_server import MultiResponse
|
||||||
|
from StringIO import StringIO
|
||||||
|
from base64 import b64decode
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
|
||||||
|
class ContextualStringIO(StringIO):
|
||||||
|
"""
|
||||||
|
snippet from http://bit.ly/1HfH6uW (stackoverflow)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, *args):
|
||||||
|
self.close()
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class TestNewSource(common.TransactionCase):
|
||||||
|
def setUp(self):
|
||||||
|
super(TestNewSource, self).setUp()
|
||||||
|
self.test_file = ContextualStringIO()
|
||||||
|
self.test_file.write('import')
|
||||||
|
self.test_file.seek(0)
|
||||||
|
self.config = \
|
||||||
|
{'file_name': 'testfile',
|
||||||
|
'user': 'test',
|
||||||
|
'password': 'test',
|
||||||
|
'host': 'test',
|
||||||
|
'port': 22,
|
||||||
|
'attachment_ids': self.env['ir.attachment.metadata'].search([])
|
||||||
|
}
|
||||||
|
|
||||||
|
def test_00_sftp_import(self):
|
||||||
|
with server_mock(
|
||||||
|
{'exists': True,
|
||||||
|
'makedir': True,
|
||||||
|
'open': self.test_file,
|
||||||
|
'listdir': ['testfile']
|
||||||
|
}):
|
||||||
|
task = SftpImportTask(self.env, self.config)
|
||||||
|
task.run()
|
||||||
|
search_file = self.env['ir.attachment.metadata'].search(
|
||||||
|
(('name', '=', 'testfile'),))
|
||||||
|
self.assertEqual(len(search_file), 1)
|
||||||
|
self.assertEqual(b64decode(search_file[0].datas), 'import')
|
||||||
|
|
||||||
|
def test_01_sftp_export(self):
|
||||||
|
with server_mock(
|
||||||
|
{'isfile': False,
|
||||||
|
'open': self.test_file,
|
||||||
|
}) as FakeSFTP:
|
||||||
|
task = SftpExportTask(self.env, self.config)
|
||||||
|
task.run()
|
||||||
|
self.assertEqual('open', FakeSFTP[-1]['method'])
|
||||||
|
|
||||||
|
def test_02_sftp_import_delete(self):
|
||||||
|
with server_mock(
|
||||||
|
{'exists': True,
|
||||||
|
'makedir': True,
|
||||||
|
'open': self.test_file,
|
||||||
|
'listdir': ['testfile'],
|
||||||
|
'remove': True
|
||||||
|
}) as FakeSFTP:
|
||||||
|
self.config.update({'after_import': 'delete'})
|
||||||
|
task = SftpImportTask(self.env, self.config)
|
||||||
|
task.run()
|
||||||
|
search_file = self.env['ir.attachment.metadata'].search(
|
||||||
|
(('name', '=', 'testfile'),))
|
||||||
|
self.assertEqual(len(search_file), 1)
|
||||||
|
self.assertEqual(b64decode(search_file[0].datas), 'import')
|
||||||
|
self.assertEqual('remove', FakeSFTP[-1]['method'])
|
||||||
|
|
||||||
|
def test_03_sftp_import_move(self):
|
||||||
|
with server_mock(
|
||||||
|
{'exists': True,
|
||||||
|
'makedir': True,
|
||||||
|
'open': self.test_file,
|
||||||
|
'listdir': ['testfile'],
|
||||||
|
'rename': True
|
||||||
|
}) as FakeSFTP:
|
||||||
|
self.config.update({'after_import': 'move', 'move_path': '/home'})
|
||||||
|
task = SftpImportTask(self.env, self.config)
|
||||||
|
task.run()
|
||||||
|
search_file = self.env['ir.attachment.metadata'].search(
|
||||||
|
(('name', '=', 'testfile'),))
|
||||||
|
self.assertEqual(len(search_file), 1)
|
||||||
|
self.assertEqual(b64decode(search_file[0].datas), 'import')
|
||||||
|
self.assertEqual('rename', FakeSFTP[-1]['method'])
|
||||||
|
|
||||||
|
def test_04_sftp_import_md5(self):
|
||||||
|
md5_file = ContextualStringIO()
|
||||||
|
md5_file.write(hashlib.md5('import').hexdigest())
|
||||||
|
md5_file.seek(0)
|
||||||
|
with server_mock(
|
||||||
|
{'exists': True,
|
||||||
|
'makedir': True,
|
||||||
|
'open': MultiResponse({
|
||||||
|
1: self.test_file,
|
||||||
|
0: md5_file
|
||||||
|
}),
|
||||||
|
'listdir': ['testfile', 'testfile.md5'],
|
||||||
|
}) as FakeSFTP:
|
||||||
|
self.config.update({'md5_check': True})
|
||||||
|
task = SftpImportTask(self.env, self.config)
|
||||||
|
task.run()
|
||||||
|
search_file = self.env['ir.attachment.metadata'].search(
|
||||||
|
(('name', '=', 'testfile'),))
|
||||||
|
self.assertEqual(len(search_file), 1)
|
||||||
|
self.assertEqual(b64decode(search_file[0].datas), 'import')
|
||||||
|
self.assertEqual('open', FakeSFTP[-1]['method'])
|
||||||
|
self.assertEqual('open', FakeSFTP[1]['method'])
|
||||||
|
self.assertEqual(('./testfile.md5', 'rb'), FakeSFTP[1]['args'])
|
Loading…
Reference in New Issue
Block a user