diff --git a/celery_haystack/__init__.py b/celery_haystack/__init__.py index c1414c0271f855673d9cf601540f44b49c67164f..1800555be7731c2b4dfe07bd94f69df5476e4e4c 100644 --- a/celery_haystack/__init__.py +++ b/celery_haystack/__init__.py @@ -1,4 +1,4 @@ -__version__ = '0.21.1' +__version__ = '2.0rc1' def version_hook(config): diff --git a/celery_haystack/signals.py b/celery_haystack/signals.py index f952f995b2056fa27f6939fe9b04a8f015f36558..79fa813da0db088a9f929d5d585f00260b1de3b9 100644 --- a/celery_haystack/signals.py +++ b/celery_haystack/signals.py @@ -2,12 +2,15 @@ from django.db.models import signals from haystack.signals import BaseSignalProcessor from haystack.exceptions import NotHandled +from haystack.utils import get_identifier -from .utils import enqueue_task +from .utils import enqueue_tasks from .indexes import CelerySearchIndex class CelerySignalProcessor(BaseSignalProcessor): + def __init__(self): + self._queue = [] def setup(self): signals.post_save.connect(self.enqueue_save) @@ -17,6 +20,8 @@ class CelerySignalProcessor(BaseSignalProcessor): signals.post_save.disconnect(self.enqueue_save) signals.post_delete.disconnect(self.enqueue_delete) + enqueue_tasks(self._queue) + def enqueue_save(self, sender, instance, **kwargs): return self.enqueue('update', instance, sender, **kwargs) @@ -41,4 +46,4 @@ class CelerySignalProcessor(BaseSignalProcessor): if isinstance(index, CelerySearchIndex): if action == 'update' and not index.should_update(instance): continue - enqueue_task(action, instance) + self._queue.append((action, get_identifier(instance))) diff --git a/celery_haystack/tasks.py b/celery_haystack/tasks.py index 29af2e26b6c7d4e89ab45bbbf9c1fa08a906cea0..fbfa03d67cf4eaa9a53c87095f47eceacf574a28 100644 --- a/celery_haystack/tasks.py +++ b/celery_haystack/tasks.py @@ -82,7 +82,12 @@ class CeleryHaystackSignalHandler(current_app.Task): raise ImproperlyConfigured("Couldn't find a SearchIndex for %s." % model_class) - def run(self, action, identifier, **kwargs): + def run(self, queue, **kwargs): + """Trigger actual index handler for a list of actions.""" + for action, identifier in queue: + self._run_one(action, identifier) + + def _run_one(self, action, identifier, **kwargs): """ Trigger the actual index handler depending on the given action ('update' or 'delete'). diff --git a/celery_haystack/utils.py b/celery_haystack/utils.py index b9f8ea0dd0c014c34af2eaa5208b61eb9339dbaa..7b6af695fca946c77e8f98addaedf2221cd4065d 100644 --- a/celery_haystack/utils.py +++ b/celery_haystack/utils.py @@ -5,8 +5,6 @@ except ImportError: from django.utils.importlib import import_module from django.db import connection, transaction -from haystack.utils import get_identifier - from .conf import settings @@ -26,12 +24,11 @@ def get_update_task(task_path=None): return task -def enqueue_task(action, instance, **kwargs): +def enqueue_tasks(queue, **kwargs): """ Common utility for enqueing a task for the given action and model instance. """ - identifier = get_identifier(instance) options = {} if settings.CELERY_HAYSTACK_QUEUE: options['queue'] = settings.CELERY_HAYSTACK_QUEUE @@ -39,7 +36,7 @@ def enqueue_task(action, instance, **kwargs): options['countdown'] = settings.CELERY_HAYSTACK_COUNTDOWN task = get_update_task() - task_func = lambda: task.apply_async((action, identifier), kwargs, **options) + task_func = lambda: task.apply_async(queue, kwargs, **options) if hasattr(transaction, 'on_commit'): # Django 1.9 on_commit hook diff --git a/docs/changelog.rst b/docs/changelog.rst index 3c7593618d0828f7c0ffde961d72bdd95f02a504..57398d9737f7927ad6071c4e2b1fda7b82430b4f 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,6 +1,14 @@ Changelog ========= +v2.0 (2022-01-08) +----------------- + +* The signal handler now enqueues update tasks on teardown, and the task + expects a list of (action, identifier) tuples to be passed. This ensures + that on batch updates, only one task is created on_commit instead of one + task per object. + v0.21 (2021-12-25) ------------------