Uploaded image for project: 'Moodle'
  1. Moodle
  2. MDL-58279

Extend task API to allow the same tasks to run in parallel with itself





      I'm seeing an increasing trend across many cron tasks at large scale, in particular the forum task, the unoconv task, the search index tasks etc where large chunky processes are happening and in many cases are not keeping up with the addition of new work into the various queues. Other plugin examples we've worked on include the tool_objectfs with file migration into s3, and the tool_crawler constantly re-indexing a moodle looking for link issues.

      So for instance lag between forum comments being posted and actually being sent, or the delay between a document being submitted and it being sent off to being converted is way too long.

      The root cause is that tasks are inherently serial in moodle, but there is no real reason why this always needs to be the case. As long as a task is aware and handles things correctly there is no reason why there could not be say 4 unoconvs processing the queue at the same time. For forums the bottle neck is processing the emails, postfix could handle several orders of magnitude more email throughput. Unoconv could also handle way more throughput. This is generally the case for any external service and it is always the architecture of the task api which is the performance bottle neck.

      So what I am proposing is that each task when declared can opt in and say that it is 'parallel ready'. If that is the case then in the scheduled tasks gui (or forced in config.php) you can set an optional limit (would default to 1) to the number of parallel tasks of that instance which are running which can be tuned on the fly as required.

      When the task system fires of tasks it would pass in two numbers like '3 of 16', and the task can optionally use these to help divide up the queue. ie it could do something like find all id's whose modulo 16 == 3. It would choose whatever way is most appropriate to cut up it's particular work load. Or it can ignore it and have it's own method for sharing the work among the processes, like just popping the next off it's own an internal queue and locking it.

      The task system would guarantee that each cron tick each of the X parallel tasks would at least be attempted to start, but they may already be running.

      Internally the task locks would be something like '\mod_forum\task\cron_task-3' and each process would have it's own lock.

      There are also operational side benefits to this including:

      • lots of smaller tasks means that if a task explodes and important intermediate state is corrupted or lost then the damage is limited to a smaller subset (eg less emails lost)
      • when a release happens we need to turn off cron and wait for existing tasks to complete before running the upgrade. This waiting period isn't currently very deterministic, so we either need to turn cron off much earlier and so have a period of lower or zero throughput prior to a release, or sit and wait for cron to complete which means the actual outage timing isn't know perfectly in advance. By cutting up longer tasks into many smaller fast tasks we know we have to wait at most a couple minutes vs hours. The release process is then much more responsive and closer to the utopia of a continuous deployment regime while at the same time maintaining high throughput of background processes. See also MDL-57852 which introduces a 'fast shutdown' feature to cron 


      Design goals of any implementation:

      1. Must be able to scale more or less arbitrarily
      2. Must be able to scale up easily while under load
      3. The burden on a task developer to safely use this API must be very low
      4. Scaling down while under load is less important


      Proposed sharding concept / algorithm:

      1. Each task declares whether or not is supports a concept of sharding in db/tasks.php as a new optional attribute in the array
      2. In the task tool gui you can set a sharding level N, which will create 2 ^ N shards. We use powers of 2 so that every superset shard is a clean combination of mutually exclusive smaller shards. This satisfies design goal 1)
      3. The task table stores the current level of sharding as well as the desired shard level specified above
      4. In the task loop, if it comes across a task which is shardable, then it will 'shard' the task record into pieces and create new sub records to track lastruntime / nextruntime /faildelay for each shard.  Whether this is a new table or mixed into the existing table isn't too important.
      5. Whenever you attempt to execute a shard, you must first gain the cron lock (as normal) and then also try and gain the lock(s) for the larger subset shards. If and only if you can do this then you can proceed to run each smaller shard. For example if a shard level was 0 (ie the default single task) and then was ramped up to 1 (ie 2 shards) then you first gain the normal task lock (eg "forum_task"), if you have gained it then you can then proceed to try and gain the shard locks at the desired level. (eg "forum_task-1" or "forum_task2") and if you gain that then you execute it.
      6. If in the steps above the 'current' shard level is below the 'desired' level, and you gained all the superset locks then you can safely set the current level to desired'. This is a successful scale up.
      7. If the desired level is many steps up then you need to do this at each level between the current and desired level. Lets suppose you are currently on shard level 2 (ie there are 4 shards) and you set it to 4 (so there will eventually be 16 shards). The task loop initially would have run "forum_task11" then "forum_task-12" then "-21" ten "-22", but now will be attempting to run 1111 - 2222. Lets suppose that task 11 and 12 are still running, but 21 and 22 have completed. In this cron run, 11 & 12 will be skipped, but 21 and 22 can immediately be sharded into a total of 8 smaller shards 2111 - 2122, and 2211 - 2222 respectively. We do not have to wait until all of the other shards are complete before we scale up. This satisfies design goal 1.
      8. If we are scaling down on the other hand, then we do need to wait until all smaller shards are complete before we can "heal" a collection of shards into a bigger shard. If we are in this situation then in the task loop we first need to gain the cron lock as normal, and then the task locks for all sub-shards, and if and only if we gain all of them, can we combined the shard records together into a larger shard (we set lastruntime & nextruntime to be the minimum of the various timestamps, and faildelay to the maximum of the sub records). While we are in this state we will not execute any new sub-shards either, we must wait and let them all finish and synchronize and then merge them. When we successfully have gained all the sub-shard locks then we can safely modify the 'current' shard level. Note that if we attempt to go from say 64 shards to 8 shards, in one step there will be a major loss of throughput while we downscale, as many shards will simply not be running at all until the last shard finishes. To mitigate this we only ever downscale one shard level at a time, and at most we only need to synchronize 2 sub-shards in order to merge them into a larger sub shard.
      9. The execute() function will be extended to support two arguments execute($shard, $shards) and $shards will be 2 ^ N. Because of the locking strategy above the api guarantees that if you can cleanly partition you're work in a binary bisect fashion then you never need to worry about locking or concurrency in your task.


      Take the unoconv task which is fairly typical:


      public function execute() {
          global $CFG, $DB;
          require_once($CFG->dirroot . '/mod/assign/locallib.php');
          $records = $DB->get_records('assignfeedback_editpdf_queue');

      now with only 2 changed lines of code we now fully support parallel processing:

      public function execute($shard, $shards) {
          global $CFG, $DB;
          require_once($CFG->dirroot . '/mod/assign/locallib.php');
          $records = $DB->get_records_sql('SELECT * from {assignfeedback_editpdf_queue} WHERE ? = ' . $DB->sql_modulo('id', '?'), array($shard, $shards) );

      That's it!

      Because of the sharding strategy chosen above the module arithmetic will always ensure that as the number of shards is scaled up then each shard will also be mutually exclusive to all other shards regardless of size. ie "1 mod 2", is exactly equivalent to "1 mod 4" and "2 mod 4" combined.

      For the majority of cron tasks which are working from a single 1 dimensional queue, partitioning the queue in this way is very simple. So typically the burden on task developers to safely support parallelism is incredible low, so this satisfies design goal 3. Tasks of this type are "embarrassingly parallel" and will scale up extremely well. The only part of the task which will be a fixed cost is the initial DB call itself, so Amdahl's law will give us a very high ceiling on the level of scaling we can do.

      For some types of task, maybe something to do with processing multiple things inside a course, it might be inefficient to lookup some course data in lots of places, or invalidate a course cache multiple times. So instead it could make more sense for this task to group and shard all the tasks by courseid giving a slightly coarser grain of processing, but with a higher level of efficient as it would only load and invalidate course level info once. It is completely up the task to decide how to partition the workload as long the there are 2^n shards which are mutually exclusive and nothing is missed.

      For odd balls tasks like the forum which have a 2 dimensional workload, or higher, more work and thought is required. But as long as the workload can be partitioned in a way that satisfies the same binary bisection model of the module arithmetic then we are good. In the case of the forum task, we can at minimum split it into two tasks, one for digest which is still hard, but the second one for normal emails is embarrassingly parallel and so we can ramp it up massively.

      From an operational point of view, in really big systems, the number of cron processes could be allowed to auto scale to meet demand. As the duration of tasks goes up then the level or task sharding for those particular tasks would be increased so more run in parallel. As the load on the cron boxes goes up then more cron boxes are auto scaled out horizontally. Tuning these two numbers, the level of sharding and the number of cron boxes, means you can always maintain a fairly deterministic limit to how long any particular task will run.

      There isn't a massive cost to having too many shards. If they execute quickly then they will naturally fall back to being executed in a more serial fashion. Once you see multiple shards all being executed really quickly, perhaps within the same cron tick, then you know it's time to scale back down.





            brendanheywood Brendan Heywood
            brendanheywood Brendan Heywood
            Amaia Anabitarte, Carlos Escobedo, Ferran Recio, Ilya Tregubov, Laurent David, Sara Arjona (@sarjona)
            14 Vote for this issue
            22 Start watching this issue