diff --git a/external_file_location/README.rst b/external_file_location/README.rst index 94313350..d245fa00 100644 --- a/external_file_location/README.rst +++ b/external_file_location/README.rst @@ -11,7 +11,7 @@ Installation To install this module, you need to: -* FTPUtil python module +* fs python module * Paramiko python module Usage @@ -30,7 +30,6 @@ For further information, please visit: Known issues / Roadmap ====================== -* add FILESTORE protocol Credits ======= diff --git a/external_file_location/location.py b/external_file_location/location.py index 6d082c99..99eebb39 100644 --- a/external_file_location/location.py +++ b/external_file_location/location.py @@ -43,6 +43,8 @@ class Location(models.Model): if not cls._synchronize_type: cls_info = (cls._key, cls._name) res.append(cls_info) + elif not cls._synchronize_type and cls._key and cls._name: + pass return res @api.onchange('protocol') diff --git a/external_file_location/task.py b/external_file_location/task.py index ffadedb5..69a1294f 100644 --- a/external_file_location/task.py +++ b/external_file_location/task.py @@ -50,10 +50,11 @@ class Task(models.Model): return [('move', 'Move'), ('delete', 'Delete')] def _get_method(self): + print 'get method' res = [] for cls in itersubclasses(AbstractTask): - if cls._synchronize_type \ - and cls._key == self._context.get('protocol'): + if cls._synchronize_type: #\ + # and cls._key == self._context.get('protocol'): cls_info = (cls._key + '_' + cls._synchronize_type, cls._name + ' ' + cls._synchronize_type) res.append(cls_info) diff --git a/external_file_location/tasks/__init__.py b/external_file_location/tasks/__init__.py index 97071ed0..61548c1a 100644 --- a/external_file_location/tasks/__init__.py +++ b/external_file_location/tasks/__init__.py @@ -20,5 +20,7 @@ # ############################################################################### +from . import abstract_fs from . import ftp from . import sftp +from . import filestore diff --git a/external_file_location/tasks/abstract_fs.py b/external_file_location/tasks/abstract_fs.py new file mode 100644 index 00000000..d3e1a1d2 --- /dev/null +++ b/external_file_location/tasks/abstract_fs.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 -*- +############################################################################## +# +# Copyright (C) 2014 initOS GmbH & Co. KG (). +# @author Valentin CHEMIERE +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +############################################################################## + +from ..abstract_task import AbstractTask +from base64 import b64decode +import logging +import os +_logger = logging.getLogger(__name__) + + +class AbstractFSTask(AbstractTask): + + _name = None + _key = None + _synchronize_type = None + + def __init__(self, env, config): + self.env = env + self.host = config.get('host', '') + self.user = config.get('user', '') + self.pwd = config.get('pwd', '') + self.port = config.get('port', '') + self.allow_dir_creation = config.get('allow_dir_creation', '') + self.file_name = config.get('file_name', '') + self.path = config.get('path') or '.' + self.move_path = config.get('move_path', '') + self.after_import = config.get('after_import', False) + self.attachment_ids = config.get('attachment_ids', False) + self.task = config.get('task', False) + self.ext_hash = False + self.md5_check = config.get('md5_check', False) + + def _handle_new_source(self, fs_conn, download_directory, file_name, + move_directory): + """open and read given file into create_file method, + move file if move_directory is given""" + with fs_conn.open(self._source_name(download_directory, file_name), + "rb") as fileobj: + data = fileobj.read() + return self.create_file(file_name, data) + + def _source_name(self, download_directory, file_name): + """helper to get the full name""" + return os.path.join(download_directory, file_name) + + def _move_file(self, fs_conn, source, target): + """Moves a file on the FTP server""" + _logger.info('Moving file %s %s' % (source, target)) + fs_conn.rename(source, target) + if self.md5_check: + fs_conn.rename(source + '.md5', target + '.md5') + + def _delete_file(self, fs_conn, source): + """Deletes a file from the FTP server""" + _logger.info('Deleting file %s' % source) + fs_conn.remove(source) + if self.md5_check: + fs_conn.remove(source + '.md5') + + def _get_hash(self, file_name, fs_conn): + hash_file_name = file_name + '.md5' + with fs_conn.open(hash_file_name, 'rb') as f: + return f.read().rstrip('\r\n') + + def _get_files(self, conn, path): + process_files = [] + files_list = conn.listdir(path) + for file in files_list: + if file == self.file_name: + source_name = self._source_name(self.path, self.file_name) + process_files.append((file, source_name)) + return process_files + + def _process_file(self, conn, file_to_process): + if self.md5_check: + self.ext_hash = self._get_hash(file_to_process[1], conn) + self._handle_new_source( + conn, + self.path, + self.file_name, + self.move_path) + + # Move/delete files only after all files have been processed. + if self.after_import == 'delete': + self._delete_file(conn, file_to_process[1]) + elif self.after_import == 'move': + if not conn.path.exists(self.move_path): + conn.mkdir(self.move_path) + self._move_file( + conn, + file_to_process[1], + self._source_name(self.move_path, file_to_process[0])) + + def _handle_existing_target(self, fs_conn, target_name, filedata): + raise Exception("%s already exists" % target_name) + + def _handle_new_target(self, fs_conn, target_name, filedata): + try: + with fs_conn.open(target_name, mode='wb') as fileobj: + fileobj.write(filedata) + _logger.info('wrote %s, size %d', target_name, len(filedata)) + self.attachment_id.state = 'done' + self.attachment_id.state_message = '' + except IOError: + self.attachment_id.state = 'failed' + self.attachment_id.state_message = ( + 'The directory doesn\'t exist or had insufficient rights') + + def _target_name(self, fs_conn, upload_directory, filename): + return os.path.join(upload_directory, filename) + + def _upload_file(self, conn, host, port, user, pwd, path, filename, filedata): + upload_directory = path or '.' + target_name = self._target_name(conn, + upload_directory, + filename) + if conn.isfile(target_name): + self._handle_existing_target(conn, target_name, filedata) + else: + self._handle_new_target(conn, target_name, filedata) diff --git a/external_file_location/tasks/filestore.py b/external_file_location/tasks/filestore.py new file mode 100644 index 00000000..8dce3d52 --- /dev/null +++ b/external_file_location/tasks/filestore.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- +############################################################################## +# +# Copyright (C) 2014 initOS GmbH & Co. KG (). +# @author Valentin CHEMIERE +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +############################################################################## + +from .abstract_fs import AbstractFSTask +from base64 import b64decode +import fs +from fs.osfs import OSFS +from ftputil.error import FTPIOError +import logging +import os +_logger = logging.getLogger(__name__) + + +class FileStoreTask(AbstractFSTask): + + _key = 'filestore' + _name = 'File Store' + _synchronize_type = None + _default_port = None + + +class FileStoreImportTask(FileStoreTask): + """FTP Configuration options: + - host, user, password, port + - download_directory: directory on the FTP server where files are + downloaded from + - move_directory: If present, files will be moved to this directory + on the FTP server after download. + - delete_files: If true, files will be deleted on the FTP server + after download. + """ + + _synchronize_type = 'import' + + def run(self): + with OSFS(self.host) as fs_conn: + files_to_process = self._get_files(fs_conn, self.path) + for file_to_process in files_to_process: + self._process_file(fs_conn, file_to_process) + + +class FileStoreExportTask(FileStoreTask): + """FTP Configuration options: + - host, user, password, port + - upload_directory: directory on the FTP server where files are + uploaded to + """ + + _synchronize_type = 'export' + + def run(self, async=True): + for attachment in self.attachment_ids: + if attachment.state in ('pending', 'failed'): + self.attachment_id = attachment + with OSFS(host) as fs_conn: + self._upload_file(fs_conn, + self.host, + self.port, + self.user, + self.pwd, + self.path, + attachment.datas_fname, + b64decode(attachment.datas)) diff --git a/external_file_location/tasks/ftp.py b/external_file_location/tasks/ftp.py index 26566fe8..2e58493a 100644 --- a/external_file_location/tasks/ftp.py +++ b/external_file_location/tasks/ftp.py @@ -19,39 +19,21 @@ # ############################################################################## -from ..abstract_task import AbstractTask +from .abstract_fs import AbstractFSTask from base64 import b64decode -import ftputil -import ftputil.session -from ftputil.error import FTPIOError +from fs.ftpfs import FTPFS import logging import os _logger = logging.getLogger(__name__) -class FtpTask(AbstractTask): +class FtpTask(AbstractFSTask): _key = 'ftp' _name = 'FTP' _synchronize_type = None _default_port = 21 - def __init__(self, env, config): - self.env = env - self.host = config.get('host', '') - self.user = config.get('user', '') - self.pwd = config.get('pwd', '') - self.port = config.get('port', '') - self.allow_dir_creation = config.get('allow_dir_creation', '') - self.file_name = config.get('file_name', '') - self.path = config.get('path') or '.' - self.move_path = config.get('move_path', '') - self.after_import = config.get('after_import', False) - self.attachment_ids = config.get('attachment_ids', False) - self.task = config.get('task', False) - self.ext_hash = False - self.md5_check = config.get('md5_check', False) - class FtpImportTask(FtpTask): """FTP Configuration options: @@ -66,76 +48,9 @@ class FtpImportTask(FtpTask): _synchronize_type = 'import' - def _handle_new_source(self, ftp_conn, download_directory, file_name, - move_directory): - """open and read given file into create_file method, - move file if move_directory is given""" - with ftp_conn.open(self._source_name(download_directory, file_name), - "rb") as fileobj: - data = fileobj.read() - return self.create_file(file_name, data) - - def _source_name(self, download_directory, file_name): - """helper to get the full name""" - return os.path.join(download_directory, file_name) - - def _move_file(self, ftp_conn, source, target): - """Moves a file on the FTP server""" - _logger.info('Moving file %s %s' % (source, target)) - ftp_conn.rename(source, target) - if self.md5_check: - ftp_conn.rename(source + '.md5', target + '.md5') - - def _delete_file(self, ftp_conn, source): - """Deletes a file from the FTP server""" - _logger.info('Deleting file %s' % source) - ftp_conn.remove(source) - if self.md5_check: - ftp_conn.remove(source + '.md5') - - def _get_hash(self, file_name, ftp_conn): - hash_file_name = file_name + '.md5' - with ftp_conn.open(hash_file_name, 'rb') as f: - return f.read().rstrip('\r\n') - - def _get_files(self, conn, path): - process_files = [] - files_list = conn.listdir(path) - for file in files_list: - if file == self.file_name: - source_name = self._source_name(self.path, self.file_name) - process_files.append((file, source_name)) - return process_files - - def _process_file(self, conn, file_to_process): - if self.md5_check: - self.ext_hash = self._get_hash(file_to_process[1], conn) - self._handle_new_source( - conn, - self.path, - self.file_name, - self.move_path) - - # Move/delete files only after all files have been processed. - if self.after_import == 'delete': - self._delete_file(conn, file_to_process[1]) - elif self.after_import == 'move': - if not conn.path.exists(self.move_path): - conn.mkdir(self.move_path) - self._move_file( - conn, - file_to_process[1], - self._source_name(self.move_path, file_to_process[0])) - def run(self): - port_session_factory = ftputil.session.session_factory( - port=self.port) - with ftputil.FTPHost(self.host, self.user, - self.pwd, - session_factory=port_session_factory) as ftp_conn: - - path = self.path or '.' - files_to_process = self._get_files(ftp_conn, path) + with FTPFS(self.host, self.user, self.pwd, port=self.port) as ftp_conn: + files_to_process = self._get_files(ftp_conn, self.path) for file_to_process in files_to_process: self._process_file(ftp_conn, file_to_process) @@ -149,42 +64,15 @@ class FtpExportTask(FtpTask): _synchronize_type = 'export' - def _handle_existing_target(self, ftp_conn, target_name, filedata): - raise Exception("%s already exists" % target_name) - - def _handle_new_target(self, ftp_conn, target_name, filedata): - try: - with ftp_conn.open(target_name, mode='wb') as fileobj: - fileobj.write(filedata) - _logger.info('wrote %s, size %d', target_name, len(filedata)) - self.attachment_id.state = 'done' - self.attachment_id.state_message = '' - except FTPIOError: - self.attachment_id.state = 'failed' - self.attachment_id.state_message = ( - 'The directory doesn\'t exist or had insufficient rights') - - def _target_name(self, ftp_conn, upload_directory, filename): - return upload_directory + '/' + filename - - def _upload_file(self, host, port, user, pwd, path, filename, filedata): - upload_directory = path or '.' - port_session_factory = ftputil.session.session_factory(port=port) - with ftputil.FTPHost(host, user, pwd, - session_factory=port_session_factory) as ftp_conn: - target_name = self._target_name(ftp_conn, - upload_directory, - filename) - if ftp_conn.path.isfile(target_name): - self._handle_existing_target(ftp_conn, target_name, filedata) - else: - self._handle_new_target(ftp_conn, target_name, filedata) - def run(self, async=True): for attachment in self.attachment_ids: if attachment.state in ('pending', 'failed'): self.attachment_id = attachment - self._upload_file(self.host, self.port, self.user, self.pwd, - self.path, - attachment.datas_fname, - b64decode(attachment.datas)) + with FTPFS(self.host, self.user, self.pwd, + port=self.port) as ftp_conn: + self._upload_file(ftp_conn, self.host, self.port, + self.user, + self.pwd, + self.path, + attachment.datas_fname, + b64decode(attachment.datas)) diff --git a/external_file_location/tasks/sftp.py b/external_file_location/tasks/sftp.py index 3e02564f..1f62472f 100644 --- a/external_file_location/tasks/sftp.py +++ b/external_file_location/tasks/sftp.py @@ -19,37 +19,21 @@ # ############################################################################## -from ..abstract_task import AbstractTask +from .abstract_fs import AbstractFSTask from base64 import b64decode -import paramiko +from fs.sftpfs import SFTPFS import logging import os _logger = logging.getLogger(__name__) -class SftpTask(AbstractTask): +class SftpTask(AbstractFSTask): _key = 'sftp' _name = 'SFTP' _synchronize_type = None _default_port = 22 - def __init__(self, env, config): - self.env = env - self.host = config.get('host', '') - self.user = config.get('user', '') - self.pwd = config.get('pwd', '') - self.port = config.get('port', '') - self.allow_dir_creation = config.get('allow_dir_creation', '') - self.file_name = config.get('file_name', '') - self.path = config.get('path') or '.' - self.move_path = config.get('move_path', '') - self.after_import = config.get('after_import', False) - self.attachment_ids = config.get('attachment_ids', False) - self.task = config.get('task', False) - self.ext_hash = False - self.md5_check = config.get('md5_check', False) - class SftpImportTask(SftpTask): """FTP Configuration options: @@ -64,77 +48,14 @@ class SftpImportTask(SftpTask): _synchronize_type = 'import' - def _handle_new_source(self, ftp_conn, download_directory, file_name, - move_directory): - """open and read given file into create_file method, - move file if move_directory is given""" - with ftp_conn.open(self._source_name(download_directory, file_name), - "rb") as fileobj: - data = fileobj.read() - return self.create_file(file_name, data) - - def _source_name(self, download_directory, file_name): - """helper to get the full name""" - return os.path.join(download_directory, file_name) - - def _move_file(self, ftp_conn, source, target): - """Moves a file on the FTP server""" - _logger.info('Moving file %s %s' % (source, target)) - ftp_conn.rename(source, target) - if self.md5_check: - ftp_conn.rename(source + '.md5', target + '.md5') - - def _delete_file(self, ftp_conn, source): - """Deletes a file from the FTP server""" - _logger.info('Deleting file %s' % source) - ftp_conn.remove(source) - if self.md5_check: - ftp_conn.remove(source + '.md5') - - def _get_hash(self, file_name, ftp_conn): - hash_file_name = file_name + '.md5' - with ftp_conn.open(hash_file_name, 'rb') as f: - return f.read().rstrip('\r\n') - - def _get_files(self, conn, path): - process_files = [] - files_list = conn.listdir(path) - for file in files_list: - if file == self.file_name: - source_name = self._source_name(self.path, self.file_name) - process_files.append((file, source_name)) - return process_files - - def _process_file(self, conn, file_to_process): - if self.md5_check: - self.ext_hash = self._get_hash(file_to_process[1], conn) - self._handle_new_source( - conn, - self.path, - self.file_name, - self.move_path) - - # Move/delete files only after all files have been processed. - if self.after_import == 'delete': - self._delete_file(conn, file_to_process[1]) - elif self.after_import == 'move': - if not conn.path.exists(self.move_path): - conn.mkdir(self.move_path) - self._move_file( - conn, - file_to_process[1], - self._source_name(self.move_path, file_to_process[0])) - def run(self): - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(self.host, self.port, - username=self.user, password=self.pwd) - ftp_conn = ssh.open_sftp() - path = self.path or '.' - files_to_process = self._get_files(ftp_conn, path) - for file_to_process in files_to_process: - self._process_file(ftp_conn, file_to_process) + connection_string = "{}:{}".format(self.host, self.port) + root = "/home/{}".format(self.user) + with SFTPFS(connection=connection_string, root_path=root, + username=self.user, password=self.pwd) as sftp_conn: + files_to_process = self._get_files(sftp_conn, self.path) + for file_to_process in files_to_process: + self._process_file(sftp_conn, file_to_process) class SftpExportTask(SftpTask): @@ -146,44 +67,18 @@ class SftpExportTask(SftpTask): _synchronize_type = 'export' - def _handle_existing_target(self, ftp_conn, target_name, filedata): - raise Exception("%s already exists" % target_name) - - def _handle_new_target(self, ftp_conn, target_name, filedata): - try: - with ftp_conn.open(target_name, mode='wb') as fileobj: - fileobj.write(filedata) - _logger.info('wrote %s, size %d', target_name, len(filedata)) - self.attachment_id.state = 'done' - self.attachment_id.state_message = '' - except IOError: - self.attachment_id.state = 'failed' - self.attachment_id.state_message = ( - 'The directory doesn\'t exist or had insufficient rights') - - def _target_name(self, ftp_conn, upload_directory, filename): - return os.path.join(upload_directory, filename) - - def _upload_file(self, host, port, user, pwd, path, filename, filedata): - upload_directory = path or '.' - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(self.host, self.port, - username=self.user, password=self.pwd) - ftp_conn = ssh.open_sftp() - target_name = self._target_name(ftp_conn, - upload_directory, - filename) - if filename in ftp_conn.listdir(upload_directory): - self._handle_existing_target(ftp_conn, target_name, filedata) - else: - self._handle_new_target(ftp_conn, target_name, filedata) - def run(self, async=True): for attachment in self.attachment_ids: if attachment.state in ('pending', 'failed'): self.attachment_id = attachment - self._upload_file(self.host, self.port, self.user, self.pwd, - self.path, - attachment.datas_fname, - b64decode(attachment.datas)) + connection_string = "{}:{}".format(self.host, self.port) + root = "/home/{}".format(self.user) + with SFTPFS(connection=connection_string, root_path=root, + username=self.user, + password=self.pwd) as sftp_conn: + self._upload_file(sftp_conn, self.host, self.port, + self.user, + self.pwd, + self.path, + attachment.datas_fname, + b64decode(attachment.datas))