#20161 closed Uncategorized (invalid)
save errors with multiprocessing
Reported by: | | Owned by: | nobody |
---|---|---|---|
Component: | Database layer (models, ORM) | Version: | 1.5 |
Severity: | Normal | Keywords: | |
Cc: | Triage Stage: | Unreviewed | |
Has patch: | no | Needs documentation: | no |
Needs tests: | no | Patch needs improvement: | no |
Easy pickings: | no | UI/UX: | no |
Description
I get an exception when trying to process a large amount of data with multiprocessing:
```
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/models/base.py", line 546, in save
    force_update=force_update, update_fields=update_fields)
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/models/base.py", line 654, in save_base
    transaction.commit_unless_managed(using=using)
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/transaction.py", line 134, in commit_unless_managed
    connection.commit_unless_managed()
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/backends/__init__.py", line 221, in commit_unless_managed
    self._commit()
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/backends/postgresql_psycopg2/base.py", line 240, in _commit
    return self.connection.commit()
DatabaseError: error with no message from the libpq

    return self.get_query_set().get(*args, **kwargs)
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/models/query.py", line 382, in get
    num = len(clone)
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/models/query.py", line 90, in __len__
    self._result_cache = list(self.iterator())
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/models/query.py", line 301, in iterator
    for row in compiler.results_iter():
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 775, in results_iter
    for rows in self.execute_sql(MULTI):
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 854, in <lambda>
    result = iter((lambda: cursor.fetchmany(GET_ITERATOR_CHUNK_SIZE)),
ProgrammingError: no results to fetch
```
```python
# coding: utf8
__author__ = 'eri'

NUMBER_OF_PROCESSES = 1

import re
import time
import urllib2 as urllib
from multiprocessing import Process, Queue, current_process, Lock

from BeautifulSoup import BeautifulSoup

from django.core.management.base import BaseCommand, CommandError
from django.db import transaction

from akkordi.models import *


class Command(BaseCommand):
    help = "parses amdm.ru"

    def worker(self, input, output, lock):

        @transaction.commit_manually
        def artist(a):
            # some logic
            for line in lines:
                # logic and some .save()
                pass
            transaction.commit()

        def make_query(p, lock):
            # _tableurl and _match are defined elsewhere (elided in the report)
            listhtml = urllib.urlopen(_tableurl % p)
            listsoup = BeautifulSoup(listhtml)
            table = listsoup.find('table', width="600")
            artists = table.findAll('a', href=_match)
            for a in artists:
                artist(a)

        for page in iter(input.get, 'STOP'):
            result = make_query(page, lock)
            output.put(result)

    def handle(self, *args, **options):
        """Runs everything"""
        pages = range(1, 26)

        # Create queues
        task_queue = Queue()
        done_queue = Queue()
        lock = Lock()

        for page in pages:
            task_queue.put(page)

        # Start worker processes
        for i in range(NUMBER_OF_PROCESSES):
            Process(target=self.worker,
                    args=(task_queue, done_queue, lock)).start()

        # Get and print results
        self.stdout.write(u'Results:')
        for i in range(len(pages)):
            self.stdout.write(u'Ready №%s' % done_queue.get())

        # Tell child processes to stop
        for i in range(NUMBER_OF_PROCESSES):
            task_queue.put('STOP')
            print "Stopping Process #%s" % i
```
With a single worker process it works fine, but slowly.
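A minimal sketch of the failure mode (not from the original report; `Artist` with a `name` field is a hypothetical stand-in for the real model): the parent process opens its PostgreSQL connection, every forked worker inherits that same socket, and their commits interleave on it:

```python
from multiprocessing import Process
from django.db import connection
from akkordi.models import Artist  # hypothetical model, used for illustration

connection.cursor()  # the parent opens its PostgreSQL connection here

def do_saves(n):
    # Each forked child inherits the SAME psycopg2 socket as the parent,
    # so commits from several processes interleave on one connection.
    for i in range(10):
        Artist.objects.create(name='artist-%d-%d' % (n, i))

procs = [Process(target=do_saves, args=(n,)) for n in range(4)]
for p in procs:
    p.start()
for p in procs:
    p.join()
```

With one process the socket has a single user and everything works; with several, libpq sees interleaved protocol traffic and fails with errors like the ones above.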
Change History (5)
comment:1 by aaugustin, 12 years ago (follow-up: comment:3)
Resolution: | → invalid |
---|---|
Status: | new → closed |
Database connections don't survive forking. This isn't specific to Django.

comment:3 by , 12 years ago
Replying to aaugustin:
> Database connections don't survive forking. This isn't specific to Django.
Even if I open a new django.db.connection.cursor in each fork?
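A new cursor would not be enough: the cursor still runs over the connection object the child inherited from its parent. The usual approach (a sketch, not from this ticket) is to close the parent's connection before forking, so that each worker's first query opens a fresh one:

```python
from multiprocessing import Process
from django.db import connection

# ... inside handle(), just before starting the workers ...
connection.close()  # close the parent's connection before forking
for i in range(NUMBER_OF_PROCESSES):
    # each child now opens its own connection on its first ORM query
    Process(target=self.worker, args=(task_queue, done_queue, lock)).start()
```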
comment:5 by , 11 years ago
Was there a valid workaround for this? I can get my code working with 2 workers, but with 3 I see the `django.db.utils.DatabaseError: error with no message from the libpq` error. How do you close the connection before forking?
I am doing something like this; I assume the update is the culprit, because it works when the update is removed:
```python
from multiprocessing import Pool

def process_location(location):
    # ... some processing
    Location.objects.filter(pk=location.id).update(is_online=True)
    Location.objects.filter(pk=location.id).update(last_ping=loc_reporting_date)
    # ... other processing

locations = Location.objects.all()
p = Pool(3)
p.map(process_location, locations)
```
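One commonly suggested workaround (a sketch assuming Django 1.5's one-connection-per-process model, not an official fix from this ticket) is to evaluate the queryset and close the connection before the pool forks, so each of the three workers reconnects lazily:

```python
from multiprocessing import Pool
from django.db import connection

locations = list(Location.objects.all())  # force evaluation in the parent
connection.close()  # Pool(3) forks below; each worker reopens on first query
p = Pool(3)
p.map(process_location, locations)
```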