A
A
artem782018-02-07 23:27:29
Python
artem78, 2018-02-07 23:27:29

Why stream hangs after file upload error on FTP?

Created a module for uploading files to FTP in multiple streams. The module is used in a daemon that regularly checks for new files and uploads them to the server. The daemon is packaged with pyinstaller and runs on a Windows server. Sometimes the file download fails with the error '[WinError 10054] The remote host forcibly terminated the existing connection', in which case the upload_callbackstream hangs somewhere after execution. I tried to reproduce the situation on my computer, but the hang does not occur. What could be the reason?

from threading import Thread, RLock#, Lock
from queue import Queue
from ftplib import FTP
import logging
from os.path import basename


class _FTPMultiUploaderWorker(Thread):

    def __init__(self, host, port, login, password, directory, upload_callback, files_queue, lock):
        super().__init__()

        self._host = host
        self._port = port
        self._login = login
        self._password = password
        self._directory = directory
        self._upload_callback = upload_callback
        self._files_queue = files_queue
        self._lock = lock

        self._ftp = FTP()

    def run(self):
        logging.debug('Поток %s запущен', self.name)

        self._ftp_connect()

        try:
            while not self._files_queue.empty():
                file = self._files_queue.get()
                try:
                    self._upload_file(file)
                except Exception as e:
                    logging.exception('Ошибка при загрузке файла %s', file)
                    success = False
                    #raise e
                else:
                    success = True
                finally:
                    with self._lock:
                        self._upload_callback(file, success)

                    self._files_queue.task_done()

        finally:
            self._ftp_disconnect()

        logging.debug('Поток %s завершён', self.name)

    def _upload_file(self, file):
        filename = basename(file)
        logging.debug('Начинаем загружать файл %s', filename)
        with open(file, 'rb') as fobj:
            self._ftp.storbinary('STOR ' + filename, fobj)
        logging.info('Файл %s загружен', filename)

    def _ftp_connect(self):
        self._ftp.connect(self._host, self._port)
        self._ftp.login(self._login, self._password)
        logging.debug('Соединение с FTP установлено')
        self._ftp.cwd(self._directory)

    def _ftp_disconnect(self):
        self._ftp.quit()
        logging.debug('Соединение с FTP закрыто')


class FTPMultiUploader():

    def __init__(self, host, port, login, password, directory='/', threads_count=3, upload_callback=lambda file, success: None):
        self._host = host
        self._port = port
        self._login = login
        self._password = password
        self._directory = directory

        self._threads_count = threads_count
        self._upload_callback = upload_callback

        self._files_queue = Queue()
        self._lock = RLock()

    def add_file(self, file):
        self._files_queue.put(file)

    def run(self):
        if self._files_queue.empty():
            return

        threads = []
        for i in range(self._threads_count):
            thr = _FTPMultiUploaderWorker(
                host=self._host,
                port=self._port,
                login=self._login,
                password=self._password,
                directory=self._directory,
                upload_callback=self._upload_callback,
                files_queue=self._files_queue,
                lock=self._lock
            )
            thr.start()
            threads.append(thr)

        for thr in threads:
            thr.join()

Sending code:
ftp_uploader = FTPMultiUploader(
    host=self.config['ftp']['host'],
    port=int(self.config['ftp']['port']),
    login=self.config['ftp']['login'],
    password=self.config['ftp']['password'],
    directory=self.config['ftp']['dir'],
    upload_callback=self.on_uploaded,
    threads_count=int(self.config['main']['threads'])
)

for file_id in new_files_ids:
    stock = 'shutterstock'
    file = self.get_file_by_id(stock, file_id)
    if file is None:
        logging.warning('Файл #%s не найден', file_id)
        self.set_status(file_id, self.STATUS_ERROR)
        continue
    logging.info('Найден файл %s', file)

    # Помещаем в ZIP
    zip_file = os.path.join('temp/', '{}_{}.zip'.format(stock, file_id))
    logging.debug('Начинаем архивирование')

    with zipfile.ZipFile(zip_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
        zipf.write(file, os.path.basename(file))
    logging.debug('Архив создан: %s', zip_file)

    ftp_uploader.add_file(zip_file)

logging.debug('Начинаем загрузку файлов на FTP')
ftp_uploader.run()
logging.debug('Загрузка файлов завершена')

Answer the question

In order to leave comments, you need to log in

1 answer(s)
D
Dimonchik, 2018-02-07
@dimonchik2013

the number of connections to FTP is limited
connect this and directory listing also
replace ftplib with pycurl

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question