Skip to content
Snippets Groups Projects
Verified Commit 7f7cf9ce authored by Nik | Klampfradler's avatar Nik | Klampfradler
Browse files

Enqueue all updates as one task

parent 02d06d14
No related branches found
No related tags found
No related merge requests found
__version__ = '0.21.1'
__version__ = '2.0rc1'
def version_hook(config):
......
......@@ -2,12 +2,15 @@ from django.db.models import signals
from haystack.signals import BaseSignalProcessor
from haystack.exceptions import NotHandled
from haystack.utils import get_identifier
from .utils import enqueue_task
from .utils import enqueue_tasks
from .indexes import CelerySearchIndex
class CelerySignalProcessor(BaseSignalProcessor):
def __init__(self):
self._queue = []
def setup(self):
signals.post_save.connect(self.enqueue_save)
......@@ -17,6 +20,8 @@ class CelerySignalProcessor(BaseSignalProcessor):
signals.post_save.disconnect(self.enqueue_save)
signals.post_delete.disconnect(self.enqueue_delete)
enqueue_tasks(self._queue)
def enqueue_save(self, sender, instance, **kwargs):
return self.enqueue('update', instance, sender, **kwargs)
......@@ -41,4 +46,4 @@ class CelerySignalProcessor(BaseSignalProcessor):
if isinstance(index, CelerySearchIndex):
if action == 'update' and not index.should_update(instance):
continue
enqueue_task(action, instance)
self._queue.append((action, get_identifier(instance)))
......@@ -82,7 +82,12 @@ class CeleryHaystackSignalHandler(current_app.Task):
raise ImproperlyConfigured("Couldn't find a SearchIndex for %s." %
model_class)
def run(self, action, identifier, **kwargs):
def run(self, queue, **kwargs):
"""Trigger actual index handler for a list of actions."""
for action, identifier in queue:
self._run_one(action, identifier)
def _run_one(self, action, identifier, **kwargs):
"""
Trigger the actual index handler depending on the
given action ('update' or 'delete').
......
......@@ -5,8 +5,6 @@ except ImportError:
from django.utils.importlib import import_module
from django.db import connection, transaction
from haystack.utils import get_identifier
from .conf import settings
......@@ -26,12 +24,11 @@ def get_update_task(task_path=None):
return task
def enqueue_task(action, instance, **kwargs):
def enqueue_tasks(queue, **kwargs):
"""
Common utility for enqueing a task for the given action and
model instance.
"""
identifier = get_identifier(instance)
options = {}
if settings.CELERY_HAYSTACK_QUEUE:
options['queue'] = settings.CELERY_HAYSTACK_QUEUE
......@@ -39,7 +36,7 @@ def enqueue_task(action, instance, **kwargs):
options['countdown'] = settings.CELERY_HAYSTACK_COUNTDOWN
task = get_update_task()
task_func = lambda: task.apply_async((action, identifier), kwargs, **options)
task_func = lambda: task.apply_async(queue, kwargs, **options)
if hasattr(transaction, 'on_commit'):
# Django 1.9 on_commit hook
......
Changelog
=========
v2.0 (2022-01-08)
-----------------
* The signal handler now enqueues update tasks on teardown, and the task
expects a list of (action, identifier) tuples to be passed. This ensures
that on batch updates, only one task is created on_commit instead of one
task per object.
v0.21 (2021-12-25)
------------------
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment