Opened 3 hours ago
#36219 new Bug
Django Celery: Kombu connection refused error. OperationalError: [Errno 111] Connection refused
Reported by: | Anshul | Owned by: | |
---|---|---|---|
Component: | Error reporting | Version: | 4.2 |
Severity: | Normal | Keywords: | celery kombu |
Cc: | Triage Stage: | Unreviewed | |
Has patch: | no | Needs documentation: | no |
Needs tests: | no | Patch needs improvement: | no |
Easy pickings: | no | UI/UX: | no |
Description
It seems a bit odd, maybe I'm missing something, but whenever I send tasks to celery queue it suddenly gives error:
AttributeError: 'ChannelPromise' object has no attribute 'value'
The issue is reproducible when I call the API request twice, which triggers the async task. The error appears starting from the second request. However, if I cancel the request in between and send another one (e.g., using a tool like Postman), the issue is resolved.
Celery Settings:
CELERY_RESULT_BACKEND = 'django-db' CELERY_BROKER_URL = 'sqs://' CELERY_BROKER_TRANSPORT_OPTIONS = { 'region' : 'us-south-1', #temp name 'visibility-timeout' : 3600, 'polling-interval' : 10 } CELERY_ACCEPT_CONTENT = ['application/json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = 'Asia/Kolkata'
Packages:
celery = ">=5.0.5" django = "==4.2.16" kombu = "==5.4.2" django-celery-beat = ">=2.0.0" django-celery-results = ">=2.2.0"
djangoproject/djangoproject/init.py
from .celery import app as celery_app __all__ = ['celery_app']
djangoproject/djangoproject/celery.py
import os from celery import Celery from project.settings import settings # set the default Django settings module for the 'celery' program. #settings are kept inside a separate folder for multiple envs [project/settings/settings_prod.py,project/settings/settings_stag.py,project/settings/settings.py] os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings.settings') app = Celery('project') app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) @app.task(bind=True) def debug_task(self): pass
Complete error trace:
kombu.exceptions.OperationalError: [Errno 111] Connection refused on next try in production. complete error trace: Traceback (most recent call last): File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/core/handlers/exception.py", line 55, in inner response = get_response(request) File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/core/handlers/base.py", line 197, in _get_response response = wrapped_callback(request, *callback_args, **callback_kwargs) File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/django/views.py", line 94, in sentry_wrapped_callback return callback(request, *args, **kwargs) File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/views/decorators/csrf.py", line 56, in wrapper_view return view_func(*args, **kwargs) File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/views/generic/base.py", line 104, in view return self.dispatch(request, *args, **kwargs) File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 509, in dispatch response = self.handle_exception(exc) File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 469, in handle_exception self.raise_uncaught_exception(exc) File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 480, in raise_uncaught_exception raise exc File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 506, in dispatch response = handler(request, *args, **kwargs) File "/var/app/current/appname/views/views_user.py", line 230, in patch my_task.delay() File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/task.py", line 444, in delay return self.apply_async(args, kwargs) File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 290, in apply_async return f(*args, **kwargs) File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/task.py", line 594, in apply_async return app.send_task( File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 290, in apply_async return f(*args, **kwargs) File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/base.py", line 801, in send_task amqp.send_task_message(P, name, message, **options) File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/amqp.py", line 518, in send_task_message ret = producer.publish( File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/utils.py", line 1788, in runner return sentry_patched_function(*args, **kwargs) File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 527, in sentry_publish return original_publish(self, *args, **kwargs) File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 186, in publish return _publish( File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 556, in _ensured return fun(*args, **kwargs) File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 195, in _publish channel = self.channel File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 218, in _get_channel channel = self._channel = channel() File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/utils/functional.py", line 34, in __call__ value = self.__value__ = self.__contract__() File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 234, in <lambda> channel = ChannelPromise(lambda: connection.default_channel) File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 953, in default_channel self._ensure_connection(**conn_opts) File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 459, in _ensure_connection return retry_over_time( File "/usr/lib64/python3.8/contextlib.py", line 131, in __exit__ self.gen.throw(type, value, traceback) File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 476, in _reraise_as_library_errors raise ConnectionError(str(exc)) from exc kombu.exceptions.OperationalError: [Errno 111] Connection refused
celery-worker output
User information: uid=0 euid=0 gid=0 egid=0 warnings.warn(SecurityWarning(ROOT_DISCOURAGED.format( -------------- celery@ip-128-232-22-11.ap-west-3.compute.internal v5.4.0 (opalescent) --- ***** ----- -- ******* ---- Linux-6.1.128-136.201.amzn2023.x86_64-x86_64-with-glibc2.34 2025-02-28 19:19:41 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: djangoproject:0x4f535e7c8v70 - ** ---------- .> transport: sqs://localhost// - ** ---------- .> results: - *** --- * --- .> concurrency: 2 (solo) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . djangoproject.celery.debug_task . djangoproject.myapp.tasks.activity [2025-02-28 19:19:41,982: WARNING/MainProcess] /var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine whether broker connection retries are made during startup in Celery 6.0 and above. If you wish to retain the existing behavior for retrying connections on startup, you should set broker_connection_retry_on_startup to True. warnings.warn( [2025-02-28 19:19:41,995: INFO/MainProcess] Found credentials in environment variables. [2025-02-28 19:19:42,109: INFO/MainProcess] Connected to sqs://localhost//
Steps tried till now:
- starting worker on a separate server, right now it is running on the same with supervisor
- simplifying the settings.py to one file(removed prod, stag,etc complexity)
- added broker url and some other params to app = Celery() in celery.py
- tried upgrading python 3.8 to 3.9
- replacing task.delay with task.apply_async
I'm thinking that it might be due to version incompatibilty but not sure about that