Opened 16 months ago

Closed 16 months ago

Last modified 3 months ago

#20161 closed Uncategorized (invalid)

save errors with multiprocessing

Reported by: oddghost@…
Owned by: nobody
Component: Database layer (models, ORM)
Version: 1.5
Severity: Normal
Keywords:
Cc:
Triage Stage: Unreviewed
Has patch: no
Needs documentation: no
Needs tests: no
Patch needs improvement: no
Easy pickings: no
UI/UX: no

Description

I get an exception when trying to process a large amount of data with multiprocessing:

    return self.get_query_set().get(*args, **kwargs)
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/models/base.py", line 546, in save
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/models/query.py", line 382, in get
    force_update=force_update, update_fields=update_fields)
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/models/base.py", line 654, in save_base
    transaction.commit_unless_managed(using=using)
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/transaction.py", line 134, in commit_unless_managed
    num = len(clone)
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/models/query.py", line 90, in __len__
    connection.commit_unless_managed()
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/backends/__init__.py", line 221, in commit_unless_managed
    self._result_cache = list(self.iterator())
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/models/query.py", line 301, in iterator
    for row in compiler.results_iter():
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 775, in results_iter
    self._commit()
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/backends/postgresql_psycopg2/base.py", line 240, in _commit
    return self.connection.commit()
DatabaseError: error with no message from the libpq
    for rows in self.execute_sql(MULTI):
  File "/opt/Envs/akkord/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 854, in <lambda>
    result = iter((lambda: cursor.fetchmany(GET_ITERATOR_CHUNK_SIZE)),
ProgrammingError: no results to fetch

#coding: utf8
__author__ = 'eri'

NUMBER_OF_PROCESSES = 1


import urllib2 as urllib
from BeautifulSoup import BeautifulSoup
import re
from django.core.management.base import BaseCommand, CommandError
from akkordi.models import *
from django.db import transaction
from multiprocessing import Process, Queue, current_process,Lock
import time

class Command(BaseCommand):
    help = "parses amdm.ru"

    def worker(self,input, output,lock):

        @transaction.commit_manually
        def artist(a):
            #some logic
            for line in lines:
                #logic and some .save()
                transaction.commit()

        def make_query(p,lock):
            listhtml = urllib.urlopen(_tableurl%p)
            listsoup = BeautifulSoup(listhtml)
            table    = listsoup.find('table',width="600")
            artists  = table.findAll('a',href=_match)


            for a in artists:
                artist(a)


        for page in iter(input.get, 'STOP'):
            result = make_query(page,lock)
            output.put(result)


    def handle(self, *args, **options):

        """Runs everything"""

        pages = range(1,26)

        # Create queues
        task_queue = Queue()
        done_queue = Queue()
        lock = Lock()

        for page in pages:
            task_queue.put(page)

        # Start worker processes
        for i in range(NUMBER_OF_PROCESSES):
            Process(target=self.worker, args=(task_queue, done_queue, lock)).start()

        # Get and print results
        self.stdout.write(u'Results:')
        for i in range(len(pages)):
            self.stdout.write(u'Ready №%s' % done_queue.get())


        # Tell child processes to stop
        for i in range(NUMBER_OF_PROCESSES):
            task_queue.put('STOP')
            print "Stopping Process #%s" % i

If I spawn only one worker process, it works fine, but slowly.

Attachments (0)

Change History (5)

comment:1 follow-up: Changed 16 months ago by aaugustin

  • Needs documentation unset
  • Needs tests unset
  • Patch needs improvement unset
  • Resolution set to invalid
  • Status changed from new to closed

Database connections don't survive forking. This isn't specific to Django.

comment:2 Changed 16 months ago by anonymous

It fails with PostgreSQL but works with MySQL.

comment:3 in reply to: ↑ 1 Changed 16 months ago by anonymous

Replying to aaugustin:

Database connections don't survive forking. This isn't specific to Django.

Even if I create a new django.db.connection.cursor for each fork?

comment:4 Changed 16 months ago by anonymous

You need to close the connection before forking.
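
One way this could look, as a minimal sketch rather than an official recipe: close every connection in the parent immediately before starting the workers, and let Django reopen a connection lazily on the next query in each process. The helper names close_db_connections and start_workers and the before_fork parameter are hypothetical; the only Django calls used are connections.all() and conn.close().

```python
from multiprocessing import Process


def close_db_connections():
    """Close every database connection this process currently holds."""
    # Imported lazily; this requires a configured Django project.
    from django.db import connections
    for conn in connections.all():
        conn.close()


def start_workers(target, args, count, before_fork=close_db_connections):
    """Start `count` worker processes after closing open connections.

    `before_fork` is a hypothetical hook; it defaults to the helper above.
    """
    # Close in the parent so that no child inherits an open socket.
    before_fork()
    workers = [Process(target=target, args=args) for _ in range(count)]
    for worker in workers:
        worker.start()
    return workers
```

In the command from the description, handle() would call start_workers(self.worker, (task_queue, done_queue, lock), NUMBER_OF_PROCESSES) instead of calling Process(...).start() directly. The parent must not run any query between the close and the start of the last worker, otherwise a new connection is opened and inherited again.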

comment:5 Changed 3 months ago by anonymous

Was there a valid workaround for this? I can get my code working with 2 to 3 workers but with 3 I see the django.db.utils.DatabaseError: error with no message from the libpq error. How do you close the connection before forking?

I am doing something like this. I am assuming that the update is the culprit because when removed it works:

locations = Location.objects.all()

from multiprocessing import Pool
p = Pool(3)
p.map(process_location, locations)

def process_location(location):

... some processing
Location.objects.filter(pk=location.id).update(is_online=True)
Location.objects.filter(pk=location.id).update(last_ping=loc_reporting_date)
... other processing
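
A hedged sketch of what "close the connection before forking" might look like with a Pool, assuming process_location is defined at module level and can accept a primary key instead of a model instance. The names close_db_connections, map_after_closing, before_fork and pool_cls are hypothetical and only serve the illustration; pool_cls exists so the helper can be tried with a different pool class.

```python
from multiprocessing import Pool


def close_db_connections():
    """Close every database connection this process currently holds."""
    # Imported lazily; this requires a configured Django project.
    from django.db import connections
    for conn in connections.all():
        conn.close()


def map_after_closing(func, items, processes=3,
                      before_fork=close_db_connections, pool_cls=Pool):
    """Run func over items in a pool created after connections are closed."""
    # Evaluate lazy iterables (such as a queryset) first, so that the
    # parent runs no query between closing and forking.
    items = list(items)
    before_fork()
    pool = pool_cls(processes)  # worker processes are created here
    try:
        return pool.map(func, items)
    finally:
        pool.close()
        pool.join()
```

A call could then look like map_after_closing(process_location, Location.objects.values_list('pk', flat=True)), with each worker looking up its own row by primary key. Every worker opens its own connection on its first query.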
