How to handle processing of a large file on GAE?

December 15, 2016, at 12:44 PM

I'm looking for a powerful and fast way to handle the processing of large files in Google App Engine.

It works as follows (simplified workflow at the end):

  1. The customer sends a CSV file, which our server will treat line by line.
  2. Once the file is uploaded, an entry is added to the NDB datastore model Uploads with the CSV name, the file path (on Google Storage) and some basic information. Then a task called "pre-processing" is created.
  3. The pre-processing task loops over all the lines of the CSV file (possibly millions) and adds an NDB entry to the UploadEntries model for each line, with the CSV id, the line, the data to extract/treat, and boolean indicators telling whether this line has started and finished processing ("is_treating", "is_done"). A sketch of these models and of the pre-processing task is shown right after this list.
  4. Once the pre-processing task has ended, it updates the information shown to the client: "XXX lines will be processed".
  5. A call to Uploads.next() is made. The next method will:
    • Search for UploadEntries that have is_treating and is_done at False,
    • Add a task to a Redis datastore for the next line found (Redis is used because the work here is done on servers not managed by Google),
    • Also create a new entry in the Process-healthcheck task (this task is run after 5 minutes and checks that step 7 has been executed correctly; if not, it considers that the Redis/outside server has failed and does the same as step 7, but with "error" instead of the result),
    • Then update UploadEntries.is_treating to True for that entry.
  6. The outside server processes the data and returns the result by making a POST request to an endpoint on the main server.
  7. That endpoint updates the UploadEntries entry in the datastore (including "is_treating" and "is_done") and calls Uploads.next() to start the next line.
  8. In Uploads.next(), when searching for the next entries returns nothing, I consider the file to be finally treated and call the post-process task, which rebuilds the CSV with the treated data and returns it to the customer.
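
To make this more concrete, here is a minimal sketch of what the two NDB models and the pre-processing task roughly look like (field names are simplified, the GCS reading uses the cloudstorage client library, and writing entities one by one is only for illustration; treat this as a sketch rather than the exact code):

    import csv
    import cloudstorage  # GCS client library for App Engine
    from google.appengine.ext import ndb

    class Uploads(ndb.Model):
        csv_name = ndb.StringProperty()
        file_path = ndb.StringProperty()   # path on Google Storage
        line_count = ndb.IntegerProperty(default=0)

    class UploadEntries(ndb.Model):
        upload = ndb.KeyProperty(kind=Uploads)
        line_number = ndb.IntegerProperty()
        data = ndb.JsonProperty()          # the data to extract/treat
        is_treating = ndb.BooleanProperty(default=False)
        is_done = ndb.BooleanProperty(default=False)
        result = ndb.JsonProperty()

    def pre_process(upload_key):
        """Loop over the CSV on Google Storage, one UploadEntries per line."""
        upload = upload_key.get()
        count = 0
        with cloudstorage.open(upload.file_path) as fh:
            for i, row in enumerate(csv.reader(fh)):
                UploadEntries(upload=upload_key, line_number=i, data=row).put()
                count += 1
        upload.line_count = count
        upload.put()
        # ...then update the client: "<count> lines will be processed"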

Here are a few things to keep in mind:

  1. The servers that do the real work are outside of Google App Engine; that's why I had to come up with Redis.
  2. The current way of doing things gives me flexibility on the number of entries processed in parallel: in step 5, the Uploads.next() method takes a limit argument that lets me search for n lines to process in parallel. It can be 1, 5, 20, 50. (A simplified sketch of next() follows this list.)
  3. I can't just push all the lines from the pre-processing task directly to Redis, because in that case the next customer would have to wait for the first file to finish processing, and this would pile up and take too long.
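
A simplified sketch of what Uploads.next() does, continuing from the model sketch above (the Redis host, the "lines_to_process" queue name and the task URLs are placeholders, not the real values):

    import json
    import redis
    from google.appengine.api import taskqueue
    from google.appengine.ext import ndb

    redis_client = redis.StrictRedis(host='my-redis-host')  # placeholder host

    class Uploads(ndb.Model):
        # Same fields as in the earlier sketch, plus next().

        def next(self, limit=5):
            """Push up to `limit` untreated lines to Redis for the outside servers."""
            entries = UploadEntries.query(
                UploadEntries.upload == self.key,
                UploadEntries.is_treating == False,
                UploadEntries.is_done == False).fetch(limit)
            if not entries:
                # Nothing left: the whole file is treated, start post-processing.
                taskqueue.add(url='/tasks/post-process',
                              params={'upload': self.key.urlsafe()})
                return
            for entry in entries:
                redis_client.rpush('lines_to_process', json.dumps({
                    'entry': entry.key.urlsafe(), 'data': entry.data}))
                # Healthcheck task, unique per CSV + line (see the sketch further down).
                taskqueue.add(url='/tasks/healthcheck',
                              name='healthcheck-%s-%s' % (self.key.id(), entry.line_number),
                              countdown=5 * 60,
                              params={'entry': entry.key.urlsafe()})
                entry.is_treating = True
                entry.put()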

But this system has various issues, and that's why I'm turning to you for help:

  1. Sometimes this system is so fast that the Datastore is not yet updated when Uploads.next() is called, and the entries returned are already being processed (entry.is_treating = True just hasn't been pushed to the database yet).
  2. Redis or my server (I don't really know which) sometimes loses the task, or the POST request after processing is never made, so the task never gets to is_done = True. That's why I had to implement the healthcheck system, to ensure the line is correctly treated no matter what. This has a double advantage: the name of that task contains the CSV id and the line number, making it unique per file. If the Datastore is not up to date and the same task is run twice, the creation of the healthcheck fails because a task with the same name already exists, which tells me there is a concurrency issue, so I ignore that task since it means the Datastore is not yet up to date. (A sketch of this naming trick follows this list.)
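
To illustrate that naming trick, here is roughly how the healthcheck task can be created so that a duplicate name acts as a concurrency guard (the URL and name format are simplified placeholders):

    from google.appengine.api import taskqueue

    def schedule_healthcheck(csv_id, line_number, entry_key):
        """Create the healthcheck task, with a name unique per (CSV, line).

        If a task with the same name was already created, App Engine raises
        TaskAlreadyExistsError (or TombstonedTaskError if it already ran),
        which is treated as "this line is already handled, skip it".
        """
        try:
            taskqueue.add(url='/tasks/healthcheck',
                          name='healthcheck-%s-%s' % (csv_id, line_number),
                          countdown=5 * 60,  # run 5 minutes later
                          params={'entry': entry_key.urlsafe()})
            return True
        except (taskqueue.TaskAlreadyExistsError, taskqueue.TombstonedTaskError):
            # Duplicate name: the Datastore was simply not up to date yet.
            return False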

I initially thought about treating the file in one independent process, line by line, but this has the big disadvantage of not being able to run multiple lines in parallel. Moreover, Google limits a task run to 24 hours on dedicated targets (not the default one), and when the file is really big, it can run for longer than 24 hours.

For information, if it helps: I'm using Python.

And to simplify the workflow, here's what I'm trying to achieve in the best way possible:

  • Process a large file, running multiple parallel processes, one per line.
  • Send the work to an outside server using Redis. Once done, that outside server returns the result via a POST request to the main server.
  • The main server then updates the information about that line and goes on to the next line. (A sketch of the worker loop on the outside server follows.)
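
On the outside (non-Google) server, the worker loop is essentially this (the queue name, endpoint URL and treat() function are placeholders for illustration):

    import json
    import redis
    import requests

    redis_client = redis.StrictRedis(host='my-redis-host')  # placeholder host
    MAIN_SERVER_ENDPOINT = 'https://my-app.appspot.com/api/line-result'  # placeholder

    def treat(data):
        # The real processing of one CSV line goes here.
        return {'processed': data}

    while True:
        _, raw = redis_client.blpop('lines_to_process')  # blocking pop
        job = json.loads(raw)
        result = treat(job['data'])
        # Send the result back; the endpoint updates the entry and calls Uploads.next().
        requests.post(MAIN_SERVER_ENDPOINT,
                      json={'entry': job['entry'], 'result': result})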

I'd really appreciate it if someone had a better way of doing this. I really believe I'm not the first to do this kind of work, and I'm pretty sure I'm not doing it correctly.

(I believe Stack Overflow is the best Stack Exchange site to post this kind of question on, since it's an algorithm question, but it's also possible I didn't see a better network for it. If so, I'm sorry about that.)

Answer 1

The servers that do the real work are outside of Google App Engine

Have you considered using Google Cloud Dataflow for processing large files instead? It is a managed service that will handle the file splitting and processing for you.

Based on initial thoughts, here is an outline of the process:

  • The user uploads the file directly to Google Cloud Storage, using signed URLs or the Blobstore API.
  • A request from App Engine launches a small Compute Engine instance that initiates a blocking request (BlockingDataflowPipelineRunner) to launch the Dataflow task. (I'm afraid it needs to be a Compute Engine instance because of sandbox and blocking I/O issues.)
  • When the Dataflow task is finished, the Compute Engine instance is unblocked and posts a message to Pub/Sub.
  • The Pub/Sub message invokes a webhook on the App Engine service that changes the task's state from 'in progress' to 'complete', so the user can fetch their results. (A rough sketch of the pipeline follows.)
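
As a very rough illustration of the Dataflow step (untested, with process_line, the project id and the bucket paths as placeholders; the newer Beam Python SDK's with-block gives the blocking behaviour that BlockingDataflowPipelineRunner provided in the older SDK), the pipeline could look something like this:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    def process_line(line):
        # Treat one CSV line and return the enriched line (placeholder logic).
        return line.upper()

    options = PipelineOptions(
        runner='DataflowRunner',             # run on Google Cloud Dataflow
        project='my-project',                # placeholder project id
        temp_location='gs://my-bucket/tmp')  # placeholder bucket

    # The with-block runs the pipeline and waits for it to finish.
    with beam.Pipeline(options=options) as p:
        (p
         | 'Read CSV' >> beam.io.ReadFromText('gs://my-bucket/uploads/input.csv')
         | 'Treat lines' >> beam.Map(process_line)
         | 'Write results' >> beam.io.WriteToText('gs://my-bucket/results/output'))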