Answer the question
In order to leave comments, you need to log in
Why stream hangs after file upload error on FTP?
Created a module for uploading files to FTP in multiple streams. The module is used in a daemon that regularly checks for new files and uploads them to the server. The daemon is packaged with pyinstaller and runs on a Windows server. Sometimes the file download fails with the error '[WinError 10054] The remote host forcibly terminated the existing connection', in which case the upload_callback
stream hangs somewhere after execution. I tried to reproduce the situation on my computer, but the hang does not occur. What could be the reason?
from threading import Thread, RLock#, Lock
from queue import Queue
from ftplib import FTP
import logging
from os.path import basename
class _FTPMultiUploaderWorker(Thread):
def __init__(self, host, port, login, password, directory, upload_callback, files_queue, lock):
super().__init__()
self._host = host
self._port = port
self._login = login
self._password = password
self._directory = directory
self._upload_callback = upload_callback
self._files_queue = files_queue
self._lock = lock
self._ftp = FTP()
def run(self):
logging.debug('Поток %s запущен', self.name)
self._ftp_connect()
try:
while not self._files_queue.empty():
file = self._files_queue.get()
try:
self._upload_file(file)
except Exception as e:
logging.exception('Ошибка при загрузке файла %s', file)
success = False
#raise e
else:
success = True
finally:
with self._lock:
self._upload_callback(file, success)
self._files_queue.task_done()
finally:
self._ftp_disconnect()
logging.debug('Поток %s завершён', self.name)
def _upload_file(self, file):
filename = basename(file)
logging.debug('Начинаем загружать файл %s', filename)
with open(file, 'rb') as fobj:
self._ftp.storbinary('STOR ' + filename, fobj)
logging.info('Файл %s загружен', filename)
def _ftp_connect(self):
self._ftp.connect(self._host, self._port)
self._ftp.login(self._login, self._password)
logging.debug('Соединение с FTP установлено')
self._ftp.cwd(self._directory)
def _ftp_disconnect(self):
self._ftp.quit()
logging.debug('Соединение с FTP закрыто')
class FTPMultiUploader():
def __init__(self, host, port, login, password, directory='/', threads_count=3, upload_callback=lambda file, success: None):
self._host = host
self._port = port
self._login = login
self._password = password
self._directory = directory
self._threads_count = threads_count
self._upload_callback = upload_callback
self._files_queue = Queue()
self._lock = RLock()
def add_file(self, file):
self._files_queue.put(file)
def run(self):
if self._files_queue.empty():
return
threads = []
for i in range(self._threads_count):
thr = _FTPMultiUploaderWorker(
host=self._host,
port=self._port,
login=self._login,
password=self._password,
directory=self._directory,
upload_callback=self._upload_callback,
files_queue=self._files_queue,
lock=self._lock
)
thr.start()
threads.append(thr)
for thr in threads:
thr.join()
ftp_uploader = FTPMultiUploader(
host=self.config['ftp']['host'],
port=int(self.config['ftp']['port']),
login=self.config['ftp']['login'],
password=self.config['ftp']['password'],
directory=self.config['ftp']['dir'],
upload_callback=self.on_uploaded,
threads_count=int(self.config['main']['threads'])
)
for file_id in new_files_ids:
stock = 'shutterstock'
file = self.get_file_by_id(stock, file_id)
if file is None:
logging.warning('Файл #%s не найден', file_id)
self.set_status(file_id, self.STATUS_ERROR)
continue
logging.info('Найден файл %s', file)
# Помещаем в ZIP
zip_file = os.path.join('temp/', '{}_{}.zip'.format(stock, file_id))
logging.debug('Начинаем архивирование')
with zipfile.ZipFile(zip_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
zipf.write(file, os.path.basename(file))
logging.debug('Архив создан: %s', zip_file)
ftp_uploader.add_file(zip_file)
logging.debug('Начинаем загрузку файлов на FTP')
ftp_uploader.run()
logging.debug('Загрузка файлов завершена')
Answer the question
In order to leave comments, you need to log in
Didn't find what you were looking for?
Ask your questionAsk a Question
731 491 924 answers to any question