Refactoring, migration to fs lib and add local storage protocol

This commit is contained in:
Valentin Chemiere 2015-03-17 19:00:15 +01:00
parent ff3c2735e6
commit 5f9e5b1f44
8 changed files with 261 additions and 255 deletions

View File

@ -11,7 +11,7 @@ Installation
To install this module, you need to: To install this module, you need to:
* FTPUtil python module * fs python module
* Paramiko python module * Paramiko python module
Usage Usage
@ -30,7 +30,6 @@ For further information, please visit:
Known issues / Roadmap Known issues / Roadmap
====================== ======================
* add FILESTORE protocol
Credits Credits
======= =======

View File

@ -43,6 +43,8 @@ class Location(models.Model):
if not cls._synchronize_type: if not cls._synchronize_type:
cls_info = (cls._key, cls._name) cls_info = (cls._key, cls._name)
res.append(cls_info) res.append(cls_info)
elif not cls._synchronize_type and cls._key and cls._name:
pass
return res return res
@api.onchange('protocol') @api.onchange('protocol')

View File

@ -50,10 +50,11 @@ class Task(models.Model):
return [('move', 'Move'), ('delete', 'Delete')] return [('move', 'Move'), ('delete', 'Delete')]
def _get_method(self): def _get_method(self):
print 'get method'
res = [] res = []
for cls in itersubclasses(AbstractTask): for cls in itersubclasses(AbstractTask):
if cls._synchronize_type \ if cls._synchronize_type: #\
and cls._key == self._context.get('protocol'): # and cls._key == self._context.get('protocol'):
cls_info = (cls._key + '_' + cls._synchronize_type, cls_info = (cls._key + '_' + cls._synchronize_type,
cls._name + ' ' + cls._synchronize_type) cls._name + ' ' + cls._synchronize_type)
res.append(cls_info) res.append(cls_info)

View File

@ -20,5 +20,7 @@
# #
############################################################################### ###############################################################################
from . import abstract_fs
from . import ftp from . import ftp
from . import sftp from . import sftp
from . import filestore

View File

@ -0,0 +1,138 @@
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (C) 2014 initOS GmbH & Co. KG (<http://www.initos.com>).
# @author Valentin CHEMIERE <valentin.chemiere@akretion.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
from ..abstract_task import AbstractTask
from base64 import b64decode
import logging
import os
_logger = logging.getLogger(__name__)
class AbstractFSTask(AbstractTask):
_name = None
_key = None
_synchronize_type = None
def __init__(self, env, config):
self.env = env
self.host = config.get('host', '')
self.user = config.get('user', '')
self.pwd = config.get('pwd', '')
self.port = config.get('port', '')
self.allow_dir_creation = config.get('allow_dir_creation', '')
self.file_name = config.get('file_name', '')
self.path = config.get('path') or '.'
self.move_path = config.get('move_path', '')
self.after_import = config.get('after_import', False)
self.attachment_ids = config.get('attachment_ids', False)
self.task = config.get('task', False)
self.ext_hash = False
self.md5_check = config.get('md5_check', False)
def _handle_new_source(self, fs_conn, download_directory, file_name,
move_directory):
"""open and read given file into create_file method,
move file if move_directory is given"""
with fs_conn.open(self._source_name(download_directory, file_name),
"rb") as fileobj:
data = fileobj.read()
return self.create_file(file_name, data)
def _source_name(self, download_directory, file_name):
"""helper to get the full name"""
return os.path.join(download_directory, file_name)
def _move_file(self, fs_conn, source, target):
"""Moves a file on the FTP server"""
_logger.info('Moving file %s %s' % (source, target))
fs_conn.rename(source, target)
if self.md5_check:
fs_conn.rename(source + '.md5', target + '.md5')
def _delete_file(self, fs_conn, source):
"""Deletes a file from the FTP server"""
_logger.info('Deleting file %s' % source)
fs_conn.remove(source)
if self.md5_check:
fs_conn.remove(source + '.md5')
def _get_hash(self, file_name, fs_conn):
hash_file_name = file_name + '.md5'
with fs_conn.open(hash_file_name, 'rb') as f:
return f.read().rstrip('\r\n')
def _get_files(self, conn, path):
process_files = []
files_list = conn.listdir(path)
for file in files_list:
if file == self.file_name:
source_name = self._source_name(self.path, self.file_name)
process_files.append((file, source_name))
return process_files
def _process_file(self, conn, file_to_process):
if self.md5_check:
self.ext_hash = self._get_hash(file_to_process[1], conn)
self._handle_new_source(
conn,
self.path,
self.file_name,
self.move_path)
# Move/delete files only after all files have been processed.
if self.after_import == 'delete':
self._delete_file(conn, file_to_process[1])
elif self.after_import == 'move':
if not conn.path.exists(self.move_path):
conn.mkdir(self.move_path)
self._move_file(
conn,
file_to_process[1],
self._source_name(self.move_path, file_to_process[0]))
def _handle_existing_target(self, fs_conn, target_name, filedata):
raise Exception("%s already exists" % target_name)
def _handle_new_target(self, fs_conn, target_name, filedata):
try:
with fs_conn.open(target_name, mode='wb') as fileobj:
fileobj.write(filedata)
_logger.info('wrote %s, size %d', target_name, len(filedata))
self.attachment_id.state = 'done'
self.attachment_id.state_message = ''
except IOError:
self.attachment_id.state = 'failed'
self.attachment_id.state_message = (
'The directory doesn\'t exist or had insufficient rights')
def _target_name(self, fs_conn, upload_directory, filename):
return os.path.join(upload_directory, filename)
def _upload_file(self, conn, host, port, user, pwd, path, filename, filedata):
upload_directory = path or '.'
target_name = self._target_name(conn,
upload_directory,
filename)
if conn.isfile(target_name):
self._handle_existing_target(conn, target_name, filedata)
else:
self._handle_new_target(conn, target_name, filedata)

View File

@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (C) 2014 initOS GmbH & Co. KG (<http://www.initos.com>).
# @author Valentin CHEMIERE <valentin.chemiere@akretion.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
from .abstract_fs import AbstractFSTask
from base64 import b64decode
import fs
from fs.osfs import OSFS
from ftputil.error import FTPIOError
import logging
import os
_logger = logging.getLogger(__name__)
class FileStoreTask(AbstractFSTask):
_key = 'filestore'
_name = 'File Store'
_synchronize_type = None
_default_port = None
class FileStoreImportTask(FileStoreTask):
"""FTP Configuration options:
- host, user, password, port
- download_directory: directory on the FTP server where files are
downloaded from
- move_directory: If present, files will be moved to this directory
on the FTP server after download.
- delete_files: If true, files will be deleted on the FTP server
after download.
"""
_synchronize_type = 'import'
def run(self):
with OSFS(self.host) as fs_conn:
files_to_process = self._get_files(fs_conn, self.path)
for file_to_process in files_to_process:
self._process_file(fs_conn, file_to_process)
class FileStoreExportTask(FileStoreTask):
"""FTP Configuration options:
- host, user, password, port
- upload_directory: directory on the FTP server where files are
uploaded to
"""
_synchronize_type = 'export'
def run(self, async=True):
for attachment in self.attachment_ids:
if attachment.state in ('pending', 'failed'):
self.attachment_id = attachment
with OSFS(host) as fs_conn:
self._upload_file(fs_conn,
self.host,
self.port,
self.user,
self.pwd,
self.path,
attachment.datas_fname,
b64decode(attachment.datas))

View File

@ -19,39 +19,21 @@
# #
############################################################################## ##############################################################################
from ..abstract_task import AbstractTask from .abstract_fs import AbstractFSTask
from base64 import b64decode from base64 import b64decode
import ftputil from fs.ftpfs import FTPFS
import ftputil.session
from ftputil.error import FTPIOError
import logging import logging
import os import os
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
class FtpTask(AbstractTask): class FtpTask(AbstractFSTask):
_key = 'ftp' _key = 'ftp'
_name = 'FTP' _name = 'FTP'
_synchronize_type = None _synchronize_type = None
_default_port = 21 _default_port = 21
def __init__(self, env, config):
self.env = env
self.host = config.get('host', '')
self.user = config.get('user', '')
self.pwd = config.get('pwd', '')
self.port = config.get('port', '')
self.allow_dir_creation = config.get('allow_dir_creation', '')
self.file_name = config.get('file_name', '')
self.path = config.get('path') or '.'
self.move_path = config.get('move_path', '')
self.after_import = config.get('after_import', False)
self.attachment_ids = config.get('attachment_ids', False)
self.task = config.get('task', False)
self.ext_hash = False
self.md5_check = config.get('md5_check', False)
class FtpImportTask(FtpTask): class FtpImportTask(FtpTask):
"""FTP Configuration options: """FTP Configuration options:
@ -66,76 +48,9 @@ class FtpImportTask(FtpTask):
_synchronize_type = 'import' _synchronize_type = 'import'
def _handle_new_source(self, ftp_conn, download_directory, file_name,
move_directory):
"""open and read given file into create_file method,
move file if move_directory is given"""
with ftp_conn.open(self._source_name(download_directory, file_name),
"rb") as fileobj:
data = fileobj.read()
return self.create_file(file_name, data)
def _source_name(self, download_directory, file_name):
"""helper to get the full name"""
return os.path.join(download_directory, file_name)
def _move_file(self, ftp_conn, source, target):
"""Moves a file on the FTP server"""
_logger.info('Moving file %s %s' % (source, target))
ftp_conn.rename(source, target)
if self.md5_check:
ftp_conn.rename(source + '.md5', target + '.md5')
def _delete_file(self, ftp_conn, source):
"""Deletes a file from the FTP server"""
_logger.info('Deleting file %s' % source)
ftp_conn.remove(source)
if self.md5_check:
ftp_conn.remove(source + '.md5')
def _get_hash(self, file_name, ftp_conn):
hash_file_name = file_name + '.md5'
with ftp_conn.open(hash_file_name, 'rb') as f:
return f.read().rstrip('\r\n')
def _get_files(self, conn, path):
process_files = []
files_list = conn.listdir(path)
for file in files_list:
if file == self.file_name:
source_name = self._source_name(self.path, self.file_name)
process_files.append((file, source_name))
return process_files
def _process_file(self, conn, file_to_process):
if self.md5_check:
self.ext_hash = self._get_hash(file_to_process[1], conn)
self._handle_new_source(
conn,
self.path,
self.file_name,
self.move_path)
# Move/delete files only after all files have been processed.
if self.after_import == 'delete':
self._delete_file(conn, file_to_process[1])
elif self.after_import == 'move':
if not conn.path.exists(self.move_path):
conn.mkdir(self.move_path)
self._move_file(
conn,
file_to_process[1],
self._source_name(self.move_path, file_to_process[0]))
def run(self): def run(self):
port_session_factory = ftputil.session.session_factory( with FTPFS(self.host, self.user, self.pwd, port=self.port) as ftp_conn:
port=self.port) files_to_process = self._get_files(ftp_conn, self.path)
with ftputil.FTPHost(self.host, self.user,
self.pwd,
session_factory=port_session_factory) as ftp_conn:
path = self.path or '.'
files_to_process = self._get_files(ftp_conn, path)
for file_to_process in files_to_process: for file_to_process in files_to_process:
self._process_file(ftp_conn, file_to_process) self._process_file(ftp_conn, file_to_process)
@ -149,42 +64,15 @@ class FtpExportTask(FtpTask):
_synchronize_type = 'export' _synchronize_type = 'export'
def _handle_existing_target(self, ftp_conn, target_name, filedata):
raise Exception("%s already exists" % target_name)
def _handle_new_target(self, ftp_conn, target_name, filedata):
try:
with ftp_conn.open(target_name, mode='wb') as fileobj:
fileobj.write(filedata)
_logger.info('wrote %s, size %d', target_name, len(filedata))
self.attachment_id.state = 'done'
self.attachment_id.state_message = ''
except FTPIOError:
self.attachment_id.state = 'failed'
self.attachment_id.state_message = (
'The directory doesn\'t exist or had insufficient rights')
def _target_name(self, ftp_conn, upload_directory, filename):
return upload_directory + '/' + filename
def _upload_file(self, host, port, user, pwd, path, filename, filedata):
upload_directory = path or '.'
port_session_factory = ftputil.session.session_factory(port=port)
with ftputil.FTPHost(host, user, pwd,
session_factory=port_session_factory) as ftp_conn:
target_name = self._target_name(ftp_conn,
upload_directory,
filename)
if ftp_conn.path.isfile(target_name):
self._handle_existing_target(ftp_conn, target_name, filedata)
else:
self._handle_new_target(ftp_conn, target_name, filedata)
def run(self, async=True): def run(self, async=True):
for attachment in self.attachment_ids: for attachment in self.attachment_ids:
if attachment.state in ('pending', 'failed'): if attachment.state in ('pending', 'failed'):
self.attachment_id = attachment self.attachment_id = attachment
self._upload_file(self.host, self.port, self.user, self.pwd, with FTPFS(self.host, self.user, self.pwd,
port=self.port) as ftp_conn:
self._upload_file(ftp_conn, self.host, self.port,
self.user,
self.pwd,
self.path, self.path,
attachment.datas_fname, attachment.datas_fname,
b64decode(attachment.datas)) b64decode(attachment.datas))

View File

@ -19,37 +19,21 @@
# #
############################################################################## ##############################################################################
from ..abstract_task import AbstractTask from .abstract_fs import AbstractFSTask
from base64 import b64decode from base64 import b64decode
import paramiko from fs.sftpfs import SFTPFS
import logging import logging
import os import os
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
class SftpTask(AbstractTask): class SftpTask(AbstractFSTask):
_key = 'sftp' _key = 'sftp'
_name = 'SFTP' _name = 'SFTP'
_synchronize_type = None _synchronize_type = None
_default_port = 22 _default_port = 22
def __init__(self, env, config):
self.env = env
self.host = config.get('host', '')
self.user = config.get('user', '')
self.pwd = config.get('pwd', '')
self.port = config.get('port', '')
self.allow_dir_creation = config.get('allow_dir_creation', '')
self.file_name = config.get('file_name', '')
self.path = config.get('path') or '.'
self.move_path = config.get('move_path', '')
self.after_import = config.get('after_import', False)
self.attachment_ids = config.get('attachment_ids', False)
self.task = config.get('task', False)
self.ext_hash = False
self.md5_check = config.get('md5_check', False)
class SftpImportTask(SftpTask): class SftpImportTask(SftpTask):
"""FTP Configuration options: """FTP Configuration options:
@ -64,77 +48,14 @@ class SftpImportTask(SftpTask):
_synchronize_type = 'import' _synchronize_type = 'import'
def _handle_new_source(self, ftp_conn, download_directory, file_name,
move_directory):
"""open and read given file into create_file method,
move file if move_directory is given"""
with ftp_conn.open(self._source_name(download_directory, file_name),
"rb") as fileobj:
data = fileobj.read()
return self.create_file(file_name, data)
def _source_name(self, download_directory, file_name):
"""helper to get the full name"""
return os.path.join(download_directory, file_name)
def _move_file(self, ftp_conn, source, target):
"""Moves a file on the FTP server"""
_logger.info('Moving file %s %s' % (source, target))
ftp_conn.rename(source, target)
if self.md5_check:
ftp_conn.rename(source + '.md5', target + '.md5')
def _delete_file(self, ftp_conn, source):
"""Deletes a file from the FTP server"""
_logger.info('Deleting file %s' % source)
ftp_conn.remove(source)
if self.md5_check:
ftp_conn.remove(source + '.md5')
def _get_hash(self, file_name, ftp_conn):
hash_file_name = file_name + '.md5'
with ftp_conn.open(hash_file_name, 'rb') as f:
return f.read().rstrip('\r\n')
def _get_files(self, conn, path):
process_files = []
files_list = conn.listdir(path)
for file in files_list:
if file == self.file_name:
source_name = self._source_name(self.path, self.file_name)
process_files.append((file, source_name))
return process_files
def _process_file(self, conn, file_to_process):
if self.md5_check:
self.ext_hash = self._get_hash(file_to_process[1], conn)
self._handle_new_source(
conn,
self.path,
self.file_name,
self.move_path)
# Move/delete files only after all files have been processed.
if self.after_import == 'delete':
self._delete_file(conn, file_to_process[1])
elif self.after_import == 'move':
if not conn.path.exists(self.move_path):
conn.mkdir(self.move_path)
self._move_file(
conn,
file_to_process[1],
self._source_name(self.move_path, file_to_process[0]))
def run(self): def run(self):
ssh = paramiko.SSHClient() connection_string = "{}:{}".format(self.host, self.port)
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) root = "/home/{}".format(self.user)
ssh.connect(self.host, self.port, with SFTPFS(connection=connection_string, root_path=root,
username=self.user, password=self.pwd) username=self.user, password=self.pwd) as sftp_conn:
ftp_conn = ssh.open_sftp() files_to_process = self._get_files(sftp_conn, self.path)
path = self.path or '.'
files_to_process = self._get_files(ftp_conn, path)
for file_to_process in files_to_process: for file_to_process in files_to_process:
self._process_file(ftp_conn, file_to_process) self._process_file(sftp_conn, file_to_process)
class SftpExportTask(SftpTask): class SftpExportTask(SftpTask):
@ -146,44 +67,18 @@ class SftpExportTask(SftpTask):
_synchronize_type = 'export' _synchronize_type = 'export'
def _handle_existing_target(self, ftp_conn, target_name, filedata):
raise Exception("%s already exists" % target_name)
def _handle_new_target(self, ftp_conn, target_name, filedata):
try:
with ftp_conn.open(target_name, mode='wb') as fileobj:
fileobj.write(filedata)
_logger.info('wrote %s, size %d', target_name, len(filedata))
self.attachment_id.state = 'done'
self.attachment_id.state_message = ''
except IOError:
self.attachment_id.state = 'failed'
self.attachment_id.state_message = (
'The directory doesn\'t exist or had insufficient rights')
def _target_name(self, ftp_conn, upload_directory, filename):
return os.path.join(upload_directory, filename)
def _upload_file(self, host, port, user, pwd, path, filename, filedata):
upload_directory = path or '.'
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(self.host, self.port,
username=self.user, password=self.pwd)
ftp_conn = ssh.open_sftp()
target_name = self._target_name(ftp_conn,
upload_directory,
filename)
if filename in ftp_conn.listdir(upload_directory):
self._handle_existing_target(ftp_conn, target_name, filedata)
else:
self._handle_new_target(ftp_conn, target_name, filedata)
def run(self, async=True): def run(self, async=True):
for attachment in self.attachment_ids: for attachment in self.attachment_ids:
if attachment.state in ('pending', 'failed'): if attachment.state in ('pending', 'failed'):
self.attachment_id = attachment self.attachment_id = attachment
self._upload_file(self.host, self.port, self.user, self.pwd, connection_string = "{}:{}".format(self.host, self.port)
root = "/home/{}".format(self.user)
with SFTPFS(connection=connection_string, root_path=root,
username=self.user,
password=self.pwd) as sftp_conn:
self._upload_file(sftp_conn, self.host, self.port,
self.user,
self.pwd,
self.path, self.path,
attachment.datas_fname, attachment.datas_fname,
b64decode(attachment.datas)) b64decode(attachment.datas))