Django
devlly, 2018-07-21 15:09:31

Asynchrony and shared resources: how to make state information global for workers?

I have a class that checks the site for new articles: it parses a list of URLs, finds the index of the previously seen last article, and if that index is non-zero, updates its stored last URL, writes the new URLs one by one to a file, and hands each one to another class. That class verifies they are really new (it reads the same file, extracts the other URLs one by one, skipping the last one since that is our current URL, compares pictures, and computes a heading-similarity metric).
The problem is that the comparison step is fairly slow, while the Celery task is scheduled every minute. When a second worker spawns, it starts from scratch and does not see the updated last URL (even though it is stored in a class variable) until the previous worker finishes. For now I have written an extra check, but that is probably not the right approach. So the question is: how do I make this state (the last URL) global across workers? Or how do I tell Celery not to start a new task until the previous one finishes, in case it did not fit into a minute (although asynchrony starts to suffer then, if I understand correctly)?
Maybe I should create a new FlowListener instance each time, so that it re-reads the last URL from the file?
code here:
in tasks

from celery import shared_task

listener = FlowListener()  # module level: one instance per worker process

@shared_task
def check_if_new():
    listener.start()

in the toolbox where the classes are
import requests
from bs4 import BeautifulSoup

class FlowListener():
    # Class attribute: read once at import time and shared by all instances
    # in the same process — this is the state the workers disagree about.
    last = open('last_article_log.txt').readlines()[-1].strip()

    def __init__(self,
                 url=config_url,
                 log='last_article_log.txt'):
        self.log = log
        self.url = url

    def start(self):
        self.soup = BeautifulSoup(requests.get(self.url).text, 'lxml')
        links = self.soup.table.find_all('a')
        self.urls = [link.get('href').strip() for link in links]
        try:
            num = self.urls.index(FlowListener.last)
            if num:  # num == 0 means the stored article is still the newest
                FlowListener.last = self.urls[0].strip()
                for url in reversed(self.urls[:num]):
                    with open(self.log, 'a') as log:
                        log.write(url + '\n')
                    Manager(url=url).manage()
        except ValueError:
            pass  # stored URL is no longer on the page
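One common way to keep a second run from starting while the first is still in progress (the alternative raised in the question) is a mutual-exclusion lock around the task body. Celery itself gives no once-only guarantee, so this is a minimal sketch using an atomically created lock file; the path `check_if_new.lock` is an assumption, and a Redis-based lock is the usual production choice:

```python
import os

LOCK_PATH = 'check_if_new.lock'  # hypothetical lock-file path

def acquire_lock(path=LOCK_PATH):
    """Atomically create the lock file; return a fd, or None if already held."""
    try:
        return os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return None

def release_lock(fd, path=LOCK_PATH):
    """Close and remove the lock file so the next run can proceed."""
    os.close(fd)
    os.remove(path)

# Inside the task, a run that finds the lock held simply skips this minute:
# fd = acquire_lock()
# if fd is None:
#     return
# try:
#     listener.start()
# finally:
#     release_lock(fd)
```

Note that a file lock only works when all workers share one filesystem; with workers on several machines the same pattern needs a shared store.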



2 answer(s)
devlly, 2018-07-21
@devlly

So far I have made it so that a new instance is created inside the task; the last URL is an instance variable that is re-read from the file on every run, and the manager class is handed one URL at a time, the earliest of the new ones. It seems to work as expected.
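The approach described above can be sketched as two small helpers: re-read the tail of the log on every run instead of caching it in a class attribute, and append each processed URL. The log file name is taken from the question; the helper names are illustrative:

```python
def read_last(log='last_article_log.txt'):
    """Return the most recently logged URL, or None if the log is empty/missing."""
    try:
        with open(log) as f:
            lines = [line.strip() for line in f if line.strip()]
        return lines[-1] if lines else None
    except FileNotFoundError:
        return None

def append_url(url, log='last_article_log.txt'):
    """Record a processed URL so the next task run picks up from here."""
    with open(log, 'a') as f:
        f.write(url + '\n')
```

Because the state now lives only in the file, every new instance (and every worker on the same machine) sees the same last URL, at the cost of a file read per run.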

Alexey Cheremisin, 2018-07-21
@leahch

Install redis and store everything in it!
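A sketch of what "store everything in Redis" could look like for this question: the last URL lives under a Redis key instead of a class attribute, so all workers see the same value. With a real server you would use `import redis; r = redis.Redis()`; here a minimal in-memory stand-in replaces the server (and the key name is an assumption) so the pattern runs anywhere:

```python
class FakeRedis:
    """In-memory stand-in for the tiny slice of redis-py used below."""
    def __init__(self):
        self.store = {}
    def get(self, key):
        return self.store.get(key)
    def set(self, key, value):
        self.store[key] = value

r = FakeRedis()  # with a real server: r = redis.Redis()

def new_urls(urls, key='last_article_url'):
    """Return unseen URLs oldest-first and advance the stored last URL.

    Mirrors the question's logic: nothing is processed on the first run
    or when the stored URL is no longer on the page.
    """
    last = r.get(key)
    if last is None or last not in urls:
        return []
    num = urls.index(last)
    if num == 0:
        return []  # stored article is still the newest
    r.set(key, urls[0])
    return list(reversed(urls[:num]))
```

A Redis `SET` is atomic, and redis-py also offers `r.lock(...)` if the runs additionally need to be serialized, which addresses both halves of the question.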
