Реализация очереди задач Redis
Прежде всего, рассмотрим требования к установке. Пройдемся по списку используемых инструментов:
- Redis (5.0.7)
- Python Redis (3.4.1)
- RQ (1.2.2) - простая библиотека для создания очереди задач
Начните с загрузки и установки Redis с официального сайта или через Homebrew (brew install redis). После установки запустите сервер Redis:
$ redis-server
Затем установите Python Redis и RQ в новом окне терминала:
$ cd flask-by-example $ python -m pip install redis==3.4.1 rq==1.2.2 $ python -m pip freeze > requirements.txt
Давайте начнем с настройки рабочего процесса (worker) для прослушивания поставленных в очередь задач. Создадим новый файл worker.py, и добавим этот код:
import os import redis from rq import Worker, Queue, Connection listen = ['default'] redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379') conn = redis.from_url(redis_url) if __name__ == '__main__': with Connection(conn): worker = Worker(list(map(Queue, listen))) worker.work()
Данный процесс показывает, что мы "прослушали очередь" под названием default и установили соединение с сервером Redis на локальном хосте:6379.
Запустим это в другом окне терминала:
$ cd flask-by-example $ python worker.py 17:01:29 RQ worker started, version 0.5.6 17:01:29 17:01:29 *** Listening on default...
Теперь нам нужно обновить наш app.py для отправки заданий в очередь.
Обновим app.py и добавим следующие данные import в app.py:
from rq import Queue from rq.job import Job from worker import conn
Затем обновим раздел конфигурации:
app = Flask(__name__) app.config.from_object(os.environ['APP_SETTINGS']) app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True db = SQLAlchemy(app) q = Queue(connection=conn) from models import *
q = Queue(connection=conn)(очередь) настройте соединение Redis и инициализируйте очередь на основе этого соединения.
Переместите функцию обработки текста из нашего маршрута индекса в новую функцию с именем count_and_save_words(). Эта функция принимает один аргумент, URL-адрес, который мы передадим ей при вызове из нашего маршрута индекса:
def count_and_save_words(url): errors = [] try: r = requests.get(url) except: errors.append( "Unable to get URL. Please make sure it's valid and try again." ) return {"error": errors} # text processing raw = BeautifulSoup(r.text).get_text() nltk.data.path.append('./nltk_data/') # set the path tokens = nltk.word_tokenize(raw) text = nltk.Text(tokens) # remove punctuation, count raw words nonPunct = re.compile('.*[A-Za-z].*') raw_words = [w for w in text if nonPunct.match(w)] raw_word_count = Counter(raw_words) # stop words no_stop_words = [w for w in raw_words if w.lower() not in stops] no_stop_words_count = Counter(no_stop_words) # save the results try: result = Result( url=url, result_all=raw_word_count, result_no_stop_words=no_stop_words_count ) db.session.add(result) db.session.commit() return result.id except: errors.append("Unable to add item to database.") return {"error": errors} @app.route('/', methods=['GET', 'POST']) def index(): results = {} if request.method == "POST": # this import solves a rq bug which currently exists from app import count_and_save_words # get url that the person has entered url = request.form['url'] if not url[:8].startswith(('https://', 'http://')): url = 'http://' + url job = q.enqueue_call( func=count_and_save_words, args=(url,), result_ttl=5000 ) print(job.get_id()) return render_template('index.html', results=results)
Также, лучше всего обратить внимание на следующий код:
job = q.enqueue_call( func=count_and_save_words, args=(url,), result_ttl=5000 ) print(job.get_id())
Нам нужно импортировать функцию count_and_save_words в наш индекс функций, так как пакет RQ в настоящее время имеет ошибку, из-за которой он не может найти функции в том же модуле.
Здесь мы использовали очередь, которую мы инициализировали ранее, и вызвали функцию enqueue_call (). Это добавило новое задание в очередь, и это задание запустило функцию count_and_save_words() с URL-адресом в качестве аргумента. Аргумент строки result_ttl=5000 указывает RQ, как долго нужно удерживать результат задания в течение - в данном случае 5000 секунд. Затем мы вывели идентификатор задания на терминал. Этот идентификатор необходим для проверки выполнения задания.
Получение результатов.
@app.route("/results/<job_key>", methods=['GET']) def get_results(job_key): job = Job.fetch(job_key, connection=conn) if job.is_finished: return str(job.result), 200 else: return "Nay!", 202
Время для проверки.
Запустите сервер, перейдите к http://localhost:5000/, используйте URL-адрес https://realpython.com, и возьмите идентификатор задания из терминала. Затем используйте этот идентификатор в конечной точке ‘/results/’, т. Е. http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.
Если до проверки состояния прошло менее 5000 секунд, вы должны увидеть идентификационный номер, который генерируется при добавлении результатов в базу данных:
# save the results try: from models import Result result = Result( url=url, result_all=raw_word_count, result_no_stop_words=no_stop_words_count ) db.session.add(result) db.session.commit() return result.id
Теперь давайте немного переработаем маршрут, чтобы вернуть фактические результаты из базы данных в JSON:
@app.route("/results/<job_key>", methods=['GET']) def get_results(job_key): job = Job.fetch(job_key, connection=conn) if job.is_finished: result = Result.query.filter_by(id=job.result).first() results = sorted( result.result_no_stop_words.items(), key=operator.itemgetter(1), reverse=True )[:10] return jsonify(results) else: return "Nay!", 202
from flask import jsonify
Проверьте это еще раз. Если все прошло хорошо, вы должны увидеть что-то похожее в своем браузере:
[ [ "Python", 315 ], [ "intermediate", 167 ], [ "python", 161 ], [ "basics", 118 ], [ "web-dev", 108 ], [ "data-science", 51 ], [ "best-practices", 49 ], [ "advanced", 45 ], [ "django", 43 ], [ "flask", 41 ] ]
Возвращаясь к части 5, в самом начале, стоит отметить, что мы объединим клиент и сервер, добавив Angular в микс, чтобы создать опросник, который каждые пять секунд будет отправлять запрос на конечную точку /results/<job_key> с запросом на обновления. Как только данные будут доступны, мы добавим их в DOM.
Это все! Ваше здоровье!
Если вам захочется посмотреть истоки этой статьи, то предлагаю Вашему внимаю ее оригинал: Реализация очереди задач Redis