Opened 3 hours ago

#36219 new Bug

Django Celery: Kombu connection refused error. OperationalError: [Errno 111] Connection refused

Reported by: Anshul Owned by:
Component: Error reporting Version: 4.2
Severity: Normal Keywords: celery kombu
Cc: Triage Stage: Unreviewed
Has patch: no Needs documentation: no
Needs tests: no Patch needs improvement: no
Easy pickings: no UI/UX: no

Description

It seems a bit odd, maybe I'm missing something, but whenever I send tasks to celery queue it suddenly gives error:

AttributeError: 'ChannelPromise' object has no attribute 'value'

The issue is reproducible when I call the API request twice, which triggers the async task. The error appears starting from the second request. However, if I cancel the request in between and send another one (e.g., using a tool like Postman), the issue is resolved.

Celery Settings:

CELERY_RESULT_BACKEND           = 'django-db'
CELERY_BROKER_URL               = 'sqs://'
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'region' : 'us-south-1', #temp name
    'visibility-timeout' : 3600,
    'polling-interval' : 10
}
CELERY_ACCEPT_CONTENT           = ['application/json']
CELERY_TASK_SERIALIZER          = 'json'
CELERY_RESULT_SERIALIZER        = 'json'
CELERY_TIMEZONE                 = 'Asia/Kolkata'

Packages:

celery = ">=5.0.5" 
django = "==4.2.16" 
kombu = "==5.4.2"
django-celery-beat = ">=2.0.0" 
django-celery-results = ">=2.2.0"

djangoproject/djangoproject/init.py

from .celery import app as celery_app

__all__ = ['celery_app']

djangoproject/djangoproject/celery.py

import os

from celery import Celery

from project.settings import settings

# set the default Django settings module for the 'celery' program.
#settings are kept inside a separate folder for multiple envs [project/settings/settings_prod.py,project/settings/settings_stag.py,project/settings/settings.py]
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings.settings')

app = Celery('project')

app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    pass

Complete error trace:

kombu.exceptions.OperationalError: [Errno 111] Connection refused on next try in production. complete error trace: Traceback (most recent call last):
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/core/handlers/exception.py", line 55, in inner
    response = get_response(request)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/core/handlers/base.py", line 197, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/django/views.py", line 94, in sentry_wrapped_callback
    return callback(request, *args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/views/decorators/csrf.py", line 56, in wrapper_view
    return view_func(*args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/views/generic/base.py", line 104, in view
    return self.dispatch(request, *args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 509, in dispatch
    response = self.handle_exception(exc)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 469, in handle_exception
    self.raise_uncaught_exception(exc)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 480, in raise_uncaught_exception
    raise exc
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 506, in dispatch
    response = handler(request, *args, **kwargs)
  File "/var/app/current/appname/views/views_user.py", line 230, in patch
    my_task.delay()
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/task.py", line 444, in delay
    return self.apply_async(args, kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 290, in apply_async
    return f(*args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/task.py", line 594, in apply_async
    return app.send_task(
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 290, in apply_async
    return f(*args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/base.py", line 801, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/amqp.py", line 518, in send_task_message
    ret = producer.publish(
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/utils.py", line 1788, in runner
    return sentry_patched_function(*args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 527, in sentry_publish
    return original_publish(self, *args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 186, in publish
    return _publish(
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 556, in _ensured
    return fun(*args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 195, in _publish
    channel = self.channel
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 218, in _get_channel
    channel = self._channel = channel()
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/utils/functional.py", line 34, in __call__
    value = self.__value__ = self.__contract__()
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 234, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 953, in default_channel
    self._ensure_connection(**conn_opts)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 459, in _ensure_connection
    return retry_over_time(
  File "/usr/lib64/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 476, in _reraise_as_library_errors
    raise ConnectionError(str(exc)) from exc
kombu.exceptions.OperationalError: [Errno 111] Connection refused

celery-worker output

User information: uid=0 euid=0 gid=0 egid=0

  warnings.warn(SecurityWarning(ROOT_DISCOURAGED.format(
 
 -------------- celery@ip-128-232-22-11.ap-west-3.compute.internal v5.4.0 (opalescent)
--- ***** ----- 
-- ******* ---- Linux-6.1.128-136.201.amzn2023.x86_64-x86_64-with-glibc2.34 2025-02-28 19:19:41
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         djangoproject:0x4f535e7c8v70
- ** ---------- .> transport:   sqs://localhost//
- ** ---------- .> results:     
- *** --- * --- .> concurrency: 2 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . djangoproject.celery.debug_task
  . djangoproject.myapp.tasks.activity

[2025-02-28 19:19:41,982: WARNING/MainProcess] /var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2025-02-28 19:19:41,995: INFO/MainProcess] Found credentials in environment variables.
[2025-02-28 19:19:42,109: INFO/MainProcess] Connected to sqs://localhost//

Steps tried till now:

  1. starting worker on a separate server, right now it is running on the same with supervisor
  2. simplifying the settings.py to one file(removed prod, stag,etc complexity)
  3. added broker url and some other params to app = Celery() in celery.py
  4. tried upgrading python 3.8 to 3.9
  5. replacing task.delay with task.apply_async

I'm thinking that it might be due to version incompatibilty but not sure about that

Change History (0)

Note: See TracTickets for help on using tickets.
Back to Top