Parallel::DeadWorker #122

Open
ktruckenmiller opened this issue Feb 17, 2015 · 11 comments

Comments

@ktruckenmiller

Hi everyone! I'm very new to Ruby, but I've found Parallel very helpful for what I'm doing. I'm trying to load some CSV files into my PostgreSQL database and I'm running into an error. I download each CSV, manipulate it, and then try to load it into Postgres. I'm hitting the reconnect error, but I'm not sure how to fix it in my code. If I set the number of processes to 0 it works just fine, but since I'm going to be downloading multiple files it would be great to have it all done in parallel. :) Any ideas, or is there something glaring that I'm just too much of a newbie to see? Another idea would be to do the downloads first and then run the copy_from CSV-to-Postgres step without Parallel.

Parallel.map(reports, :in_processes => 6) do |report|
        begin
          ActiveRecord::Base.connection.reconnect!
        rescue
          ActiveRecord::Base.connection.reconnect!
        end
        remote_filename = aggregator.url + report.filename

        fileObj = http.get(remote_filename)

        fileCSV = dir + "/" + fileObj.filename
        newCSV = fileCSV + "_updated"
        fileObj.save(fileCSV)
        CSV.open(newCSV, "wb") do |csv|
            csv << ["sales_period", "posted_date", "store_name" ,"country_of_sale" ,"artist", "release_type", "release_title", "song_title", "label", "upc", "optional_upc", "tc_song_id", "optional_isrc", "sales_type", "num_units_sold", "per_unit_price", "net_sales", "net_sales_currency", "exchange_rate", "total_earned", "currency", "report_id", "user_id"]
            CSV.foreach(
                fileCSV,
                :headers => true,
                :header_converters => lambda { |header|
                    if header == '# Units Sold'
                        "num_units_sold"
                    else
                        header.downcase.gsub(' ', '_')
                    end
                }) do |row|
                # add columns here
                row['report_id'] = report.id
                row['user_id'] = user.id
                csv << row
            end
        end

        # Cell.copy_from pushes the CSV into my database
        Cell.copy_from newCSV
     end

Here's the error I get.

/Users/kmtruckenmiller/.rvm/gems/ruby-2.0.0-p598/gems/parallel-1.3.4/lib/parallel.rb:340:in `dump': no _dump_data is defined for class PG::Result (TypeError)

@grosser
Owner

grosser commented Feb 18, 2015

The error you are getting means it blows up when trying to send the Postgres results from one process to the other. Can you just use .each, or return something other than a PG::Result?
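To illustrate the point about sending results between processes: the traceback shows the failure inside `dump`, i.e. the worker's return value is being marshalled. The sketch below uses only the standard library and a Proc as a stand-in for PG::Result (an assumption for illustration; both wrap native data that Marshal cannot serialize). The helper name `marshal_error` is hypothetical.

```ruby
# Returns the TypeError message if `value` cannot be marshalled, else nil.
# `marshal_error` is a hypothetical helper, not part of Parallel.
def marshal_error(value)
  Marshal.dump(value)
  nil
rescue TypeError => e
  e.message
end

# A Proc stands in for PG::Result here: Marshal cannot serialize it.
unmarshallable_error = marshal_error(proc { :stand_in })

# A plain hash of ids and flags marshals without trouble.
plain_value_error = marshal_error({ report_id: 1, imported: true })

puts "native object: #{unmarshallable_error.inspect}"
puts "plain hash:    #{plain_value_error.inspect}"
```

In the code above, the block's last expression is `Cell.copy_from newCSV`, so ending the block with a plain value such as `report.id` or `nil` would likely avoid sending the PG::Result back.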

@ktruckenmiller
Author

I used :in_threads => instead and it seemed to work.

I think it might have something to do with writing the CSV file. When I use in_processes there is a permissions problem with the file (file.is_closed() = false). With threading, though, the file itself seems to be more globally protected and is released when it's not being used. Does that make sense, or am I way off?

@grosser
Owner

grosser commented Feb 18, 2015

Do you open the file before Parallel.map? If so, the forked processes might not have access to it.


@ktruckenmiller
Author

No, it downloads the file, creates a new one based on that file, and then should save it. This all happens within the map.

Since it's downloading up to 80 files or so, I thought doing a bunch at a time would be nice and speed things up.

@grosser
Owner

grosser commented Feb 18, 2015

And the files are all stored in different locations, with no overwriting each other?


@ktruckenmiller
Author

They don't overwrite each other, but they are stored in the same directory. I append a different filename and then delete the downloaded file after creating the new .csv.

@grosser
Owner

grosser commented Feb 18, 2015

Can you reproduce the error with fork { ... inner code ... }?
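A minimal sketch of that kind of reproduction, assuming a platform where `fork` is available (not Windows or JRuby). The helper name `run_in_fork` and the placeholder blocks are hypothetical; the real download and CSV code would go where the placeholders are.

```ruby
# Runs the given block in a forked child and returns true if the child
# exited cleanly. `run_in_fork` is a hypothetical helper, not part of Parallel.
def run_in_fork
  pid = fork do
    begin
      yield
      exit!(0) # exit the child immediately, skipping at_exit handlers
    rescue StandardError => e
      warn "child failed: #{e.class}: #{e.message}"
      exit!(1)
    end
  end
  _, status = Process.wait2(pid)
  status.success?
end

# Placeholder for the inner code from the Parallel.map block.
ok = run_in_fork { File.basename("/tmp/report.csv") }

# A simulated failure, to show how a problem in the child surfaces.
failed = run_in_fork { raise IOError, "simulated file problem" }

puts "clean run succeeded: #{ok}"
puts "failing run succeeded: #{failed}"
```

If the error shows up with a bare `fork`, the problem is probably in how the inner code behaves in a child process rather than in Parallel itself.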


@ktruckenmiller
Author

Inner code:

    Parallel.map(reports, :in_processes => 5) do |report|
        #filename = dir + "/" + report.month + report.report_type + ".csv"
        remote_filename = aggregator.url + report.filename
        #agent.get(remote_filename).save(filename)
        fileObj = agent.get(remote_filename)
        fileCSV = dir + "/" + fileObj.filename
        newCSV = fileCSV + "_updated.csv"
        fileObj.save(fileCSV)
        CSV.open(newCSV, "wb") do |csv|
            csv << ["sales_period", "posted_date", "store_name" ,"country_of_sale" ,"artist", "release_type", "release_title", "song_title", "label", "upc", "optional_upc", "tc_song_id", "optional_isrc", "sales_type", "num_units_sold", "per_unit_price", "net_sales", "net_sales_currency", "exchange_rate", "total_earned", "currency", "report_id", "user_id"]
            CSV.foreach(
                fileCSV,
                :headers => true,
                :header_converters => lambda { |header|
                    if header == '# Units Sold'
                        "num_units_sold"
                    else
                        header.downcase.gsub(' ', '_')
                    end
                }) do |row|
                # add columns here
                row['report_id'] = report.id
                row['user_id'] = user.id
                csv << row
            end
        end
        File.delete(fileCSV)
        #add report - downloaded = true
        puts report.id.to_s + " We downloaded this one"
    end
    reports.map { |report|
        report.update(:downloaded => true)
    }

ERROR:

63 We downloaded this one
1 We downloaded this one
62 We downloaded this one
61 We downloaded this one
60 We downloaded this one
2 We downloaded this one
64 We downloaded this one
65 We downloaded this one
3 We downloaded this one
4 We downloaded this one
66 We downloaded this one
5 We downloaded this one
67 We downloaded this one
6 We downloaded this one
(1.9ms) BEGIN
PG::ConnectionBad: PQconsumeInput() server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
: BEGIN
Completed 500 Internal Server Error in 6562ms

ActiveRecord::StatementInvalid (PG::ConnectionBad: PQconsumeInput() server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
: BEGIN):
app/helpers/welcome_helper.rb:128:in `block in downloadReports'
app/helpers/welcome_helper.rb:127:in `downloadReports'
app/controllers/welcome_controller.rb:62:in `import'

Rendered /Users/kmtruckenmiller/.rvm/gems/ruby-2.0.0-p598/gems/actionpack-4.0.13/lib/action_dispatch/middleware/templates/rescues/_source.erb (0.6ms)
Rendered /Users/kmtruckenmiller/.rvm/gems/ruby-2.0.0-p598/gems/actionpack-4.0.13/lib/action_dispatch/middleware/templates/rescues/_trace.erb (1.0ms)
Rendered /Users/kmtruckenmiller/.rvm/gems/ruby-2.0.0-p598/gems/actionpack-4.0.13/lib/action_dispatch/middleware/templates/rescues/_request_and_response.erb (0.9ms)
Rendered /Users/kmtruckenmiller/.rvm/gems/ruby-2.0.0-p598/gems/actionpack-4.0.13/lib/action_dispatch/middleware/templates/rescues/diagnostics.erb within rescues/layout (14.0ms)

@grosser
Owner

grosser commented Feb 21, 2015

Hmm, maybe keep the marking as downloaded outside of the threads: just return the downloaded reports and then mark them there.
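A rough sketch of that split, using only the standard library so it runs on its own. Plain threads stand in for `Parallel.map(reports, :in_threads => ...)`, and the `Report` struct is a hypothetical stand-in for the ActiveRecord model; neither is taken from the code above.

```ruby
# Hypothetical stand-in for the ActiveRecord model.
Report = Struct.new(:id, :filename, :downloaded)

reports = [
  Report.new(1, "a.csv", false),
  Report.new(2, "b.csv", false)
]

# Workers only do the file work and hand back a plain id.
# (Plain threads stand in for Parallel.map here.)
downloaded_ids = reports.map { |report|
  Thread.new do
    # the download and CSV rewrite would go here
    report.id
  end
}.map(&:value)

# Back in the parent: the only place that marks records as downloaded.
reports.each do |report|
  report.downloaded = true if downloaded_ids.include?(report.id)
end

puts "marked as downloaded: #{downloaded_ids.inspect}"
```

Keeping the database update in one place after the parallel section means the workers never need to touch the connection for that step.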

On Thu, Feb 19, 2015 at 11:51 AM, Kevin Truckenmiller <
[email protected]> wrote:

Inner code:

Parallel.map(reports, :in_threads => 6) do |report|
    #filename = dir + "/" + report.month + report.report_type + ".csv"
    remote_filename = aggregator.url + report.filename
    #agent.get(remote_filename).save(filename)
    fileObj = agent.get(remote_filename)
    fileCSV = dir + "/" + fileObj.filename
    newCSV = fileCSV + "_updated.csv"
    fileObj.save(fileCSV)
    CSV.open(newCSV, "wb") do |csv|
        csv << ["sales_period", "posted_date", "store_name" ,"country_of_sale" ,"artist", "release_type", "release_title", "song_title", "label", "upc", "optional_upc", "tc_song_id", "optional_isrc", "sales_type", "num_units_sold", "per_unit_price", "net_sales", "net_sales_currency", "exchange_rate", "total_earned", "currency", "report_id", "user_id"]
        CSV.foreach(
            fileCSV,
            :headers => true,
            :header_converters => lambda { |header|
                if header == '# Units Sold'
                    "num_units_sold"
                else
                    header.downcase.gsub(' ', '_')
                end
            }) do |row|
            # add columns here
            row['report_id'] = report.id
            row['user_id'] = user.id
            csv << row
        end
    end
    File.delete(fileCSV)
    #add report - downloaded = true
    puts report.id.to_s + " We downloaded this one"
end
reports.map { |report|
    report.update(:downloaded => true)
}

ERROR:

60 We downloaded this one
61 We downloaded this one
1 We downloaded this one
63 We downloaded this one
62 We downloaded this one
2 We downloaded this one
64 We downloaded this one
3 We downloaded this one
65 We downloaded this one
4 We downloaded this one
67 We downloaded this one
66 We downloaded this one
6 We downloaded this one
5 We downloaded this one
(0.1ms) BEGIN
(0.1ms) ROLLBACK
Completed 500 Internal Server Error in 5485ms

ActiveRecord::UnknownAttributeError (unknown attribute: download):
app/helpers/welcome_helper.rb:133:in `block in downloadReports'
app/helpers/welcome_helper.rb:132:in `downloadReports'
app/controllers/welcome_controller.rb:62:in `import'

Rendered /Users/kmtruckenmiller/.rvm/gems/ruby-2.0.0-p598/gems/actionpack-4.0.13/lib/action_dispatch/middleware/templates/rescues/_source.erb (0.5ms)
Rendered /Users/kmtruckenmiller/.rvm/gems/ruby-2.0.0-p598/gems/actionpack-4.0.13/lib/action_dispatch/middleware/templates/rescues/_trace.erb (1.0ms)
Rendered /Users/kmtruckenmiller/.rvm/gems/ruby-2.0.0-p598/gems/actionpack-4.0.13/lib/action_dispatch/middleware/templates/rescues/_request_and_response.erb (0.9ms)
Rendered /Users/kmtruckenmiller/.rvm/gems/ruby-2.0.0-p598/gems/actionpack-4.0.13/lib/action_dispatch/middleware/templates/rescues/diagnostics.erb within rescues/layout (13.3ms)

Then at



@evandrodp

Sorry for my delay, I had some problems!

This error is caused because the worker is waiting for an answer.

Always put a return value at the end of the block. The last line of the code above, "puts ...", works like a return.
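As a small illustration of that last point: in Ruby a block evaluates to its last expression, and `puts` returns nil. The ids below are made up for the example.

```ruby
# A block ending in `puts` evaluates to nil, because `puts` returns nil.
ends_with_puts = [1, 2].map { |id| puts "#{id} We downloaded this one" }

# Ending the block with a plain value makes the result explicit.
ends_with_id = [1, 2].map do |id|
  puts "#{id} We downloaded this one"
  id
end

puts ends_with_puts.inspect
puts ends_with_id.inspect
```

Note that this is about the block's final expression; a literal `return` keyword inside a block would try to return from the enclosing method instead.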

@jmaheshkumar

#186 (comment)
