MDL-67648 tasks: Fair-share scheduling with resource management

This patch changes the way adhoc tasks are chosen to run. It now calculates
how many runners each type of adhoc task should be allowed to use. In the
case that not all the runners are utilised, it attempts to infer which
tasks do not take a long time to run, and gives those to the vacant runners.

Thanks to Brendan Heywood for guidance and SQL help.
This commit is contained in:
Cameron Ball 2021-12-13 15:52:09 +08:00
parent 6c114e2a80
commit 8154aa2e03
6 changed files with 222 additions and 204 deletions

View File

@ -36,6 +36,31 @@ define('CORE_TASK_TASKS_FILENAME', 'db/tasks.php');
*/
class manager {
/**
* @var int Used to tell the adhoc task queue to fairly distribute tasks.
*/
const ADHOC_TASK_QUEUE_MODE_DISTRIBUTING = 0;
/**
* @var int Used to tell the adhoc task queue to try and fill unused capacity.
*/
const ADHOC_TASK_QUEUE_MODE_FILLING = 1;
/**
* @var array A cached queue of adhoc tasks
*/
public static $miniqueue;
/**
* @var int The last recorded number of unique adhoc tasks.
*/
public static $numtasks;
/**
* @var string Used to determine if the adhoc task queue is distributing or filling capacity.
*/
public static $mode;
/**
* Given a component name, will load the list of tasks in the db/tasks.php file for that component.
*
@ -542,8 +567,13 @@ class manager {
*
* @param array $records array of task records
* @param array $records array of same task records shuffled
* @deprecated since Moodle 4.1 MDL-67648 - please do not use this method anymore.
* @todo MDL-74843 This method will be deleted in Moodle 4.5
* @see \core\task\manager::get_next_adhoc_task
*/
public static function ensure_adhoc_task_qos(array $records): array {
debugging('The method \core\task\manager::ensure_adhoc_task_qos is deprecated.
Please use \core\task\manager::get_next_adhoc_task instead.', DEBUG_DEVELOPER);
$count = count($records);
if ($count == 0) {
@ -621,16 +651,114 @@ class manager {
public static function get_next_adhoc_task($timestart, $checklimits = true) {
global $DB;
$where = '(nextruntime IS NULL OR nextruntime < :timestart1)';
$params = array('timestart1' => $timestart);
$records = $DB->get_records_select('task_adhoc', $where, $params, 'nextruntime ASC, id ASC', '*', 0, 2000);
$records = self::ensure_adhoc_task_qos($records);
$concurrencylimit = get_config('core', 'task_adhoc_concurrency_limit');
$cachedqueuesize = 1200;
$uniquetasksinqueue = array_map(
['\core\task\manager', 'adhoc_task_from_record'],
$DB->get_records_sql(
'SELECT classname FROM {task_adhoc} WHERE nextruntime < :timestart GROUP BY classname',
['timestart' => $timestart]
)
);
if (!isset(self::$numtasks) || self::$numtasks !== count($uniquetasksinqueue)) {
self::$numtasks = count($uniquetasksinqueue);
self::$miniqueue = [];
}
$concurrencylimits = [];
if ($checklimits) {
$concurrencylimits = array_map(
function ($task) {
return $task->get_concurrency_limit();
},
$uniquetasksinqueue
);
}
/*
* The maximum number of cron runners that an individual task is allowed to use.
* For example if the concurrency limit is 20 and there are 5 unique types of tasks
* in the queue, each task should not be allowed to consume more than 3 (i.e., ⌊20/6).
* The + 1 is needed to prevent the queue from becoming full of only one type of class.
* i.e., if it wasn't there and there were 20 tasks of the same type in the queue, every
* runner would become consumed with the same (potentially long-running task) and no more
* tasks can run. This way, some resources are always available if some new types
* of tasks enter the queue.
*
* We use the short-ternary to force the value to 1 in the case when the number of tasks
* exceeds the runners (e.g., there are 8 tasks and 4 runners, ⌊4/(8+1) = 0).
*/
$slots = floor($concurrencylimit / (count($uniquetasksinqueue) + 1)) ?: 1;
if (empty(self::$miniqueue)) {
self::$mode = self::ADHOC_TASK_QUEUE_MODE_DISTRIBUTING;
self::$miniqueue = self::get_candidate_adhoc_tasks(
$timestart,
$cachedqueuesize,
$slots,
$concurrencylimits
);
}
// The query to cache tasks is expensive on big data sets, so we use this cheap
// query to get the ordering (which is the interesting part about the main query)
// We can use this information to filter the cache and also order it.
$runningtasks = $DB->get_records_sql(
'SELECT classname, COALESCE(COUNT(*), 0) running, MIN(timestarted) earliest
FROM {task_adhoc}
WHERE timestarted IS NOT NULL
AND nextruntime < :timestart
GROUP BY classname
ORDER BY running ASC, earliest DESC',
['timestart' => $timestart]
);
/*
* Each runner has a cache, so the same task can be in multiple runners' caches.
* We need to check that each task we have cached hasn't gone over its fair number
* of slots. This filtering is only applied during distributing mode as when we are
* filling capacity we intend for fast tasks to go over their slot limit.
*/
if (self::$mode === self::ADHOC_TASK_QUEUE_MODE_DISTRIBUTING) {
self::$miniqueue = array_filter(
self::$miniqueue,
function (\stdClass $task) use ($runningtasks, $slots) {
return !array_key_exists($task->classname, $runningtasks) || $runningtasks[$task->classname]->running < $slots;
}
);
}
/*
* If this happens that means each task has consumed its fair share of capacity, but there's still
* runners left over (and we are one of them). Fetch tasks without checking slot limits.
*/
if (empty(self::$miniqueue) && array_sum(array_column($runningtasks, 'running')) < $concurrencylimit) {
self::$mode = self::ADHOC_TASK_QUEUE_MODE_FILLING;
self::$miniqueue = self::get_candidate_adhoc_tasks(
$timestart,
$cachedqueuesize,
false,
$concurrencylimits
);
}
// Used below to order the cache.
$ordering = array_flip(array_keys($runningtasks));
// Order the queue so it's consistent with the ordering from the DB.
usort(
self::$miniqueue,
function ($a, $b) use ($ordering) {
return ($ordering[$a->classname] ?? -1) - ($ordering[$b->classname] ?? -1);
}
);
$cronlockfactory = \core\lock\lock_config::get_lock_factory('cron');
$skipclasses = array();
foreach ($records as $record) {
foreach (self::$miniqueue as $taskid => $record) {
if (in_array($record->classname, $skipclasses)) {
// Skip the task if it can't be started due to per-task concurrency limit.
@ -643,6 +771,7 @@ class manager {
$record = $DB->get_record('task_adhoc', array('id' => $record->id));
if (!$record) {
$lock->release();
unset(self::$miniqueue[$taskid]);
continue;
}
@ -650,6 +779,7 @@ class manager {
// Safety check in case the task in the DB does not match a real class (maybe something was uninstalled).
if (!$task) {
$lock->release();
unset(self::$miniqueue[$taskid]);
continue;
}
@ -661,6 +791,7 @@ class manager {
// Unable to obtain a concurrency lock.
mtrace("Skipping $record->classname adhoc task class as the per-task limit of $tasklimit is reached.");
$skipclasses[] = $record->classname;
unset(self::$miniqueue[$taskid]);
$lock->release();
continue;
}
@ -679,13 +810,76 @@ class manager {
} else {
$task->set_cron_lock($cronlock);
}
unset(self::$miniqueue[$taskid]);
return $task;
} else {
unset(self::$miniqueue[$taskid]);
}
}
return null;
}
/**
* Return a list of candidate adhoc tasks to run.
*
* @param int $timestart Only return tasks where nextruntime is less than this value
* @param int $limit Limit the list to this many results
* @param int|null $runmax Only return tasks that have less than this value currently running
* @param array $pertasklimits An array of classname => limit specifying how many instance of a task may be returned
* @return array Array of candidate tasks
*/
public static function get_candidate_adhoc_tasks(
int $timestart,
int $limit,
?int $runmax,
array $pertasklimits = []
): array {
global $DB;
$pertaskclauses = array_map(
function (string $class, int $limit, int $index): array {
$limitcheck = $limit > 0 ? " AND COALESCE(run.running, 0) < :running_$index" : "";
$limitparam = $limit > 0 ? ["running_$index" => $limit] : [];
return [
"sql" => "(q.classname = :classname_$index" . $limitcheck . ")",
"params" => ["classname_$index" => $class] + $limitparam
];
},
array_keys($pertasklimits),
$pertasklimits,
$pertasklimits ? range(1, count($pertasklimits)) : []
);
$pertasksql = implode(" OR ", array_column($pertaskclauses, 'sql'));
$pertaskparams = $pertaskclauses ? array_merge(...array_column($pertaskclauses, 'params')) : [];
$params = ['timestart' => $timestart] +
($runmax ? ['runmax' => $runmax] : []) +
$pertaskparams;
return $DB->get_records_sql(
"SELECT q.id, q.classname, q.timestarted, COALESCE(run.running, 0) running, run.earliest
FROM {task_adhoc} q
LEFT JOIN (
SELECT classname, COUNT(*) running, MIN(timestarted) earliest
FROM {task_adhoc} run
WHERE timestarted IS NOT NULL
GROUP BY classname
) run ON run.classname = q.classname
WHERE nextruntime < :timestart
AND q.timestarted IS NULL " .
(!empty($pertasksql) ? "AND (" . $pertasksql . ") " : "") .
($runmax ? "AND (COALESCE(run.running, 0)) < :runmax " : "") .
"ORDER BY COALESCE(run.running, 0) ASC, run.earliest DESC, q.nextruntime ASC, q.id ASC",
$params,
0,
$limit
);
}
/**
* This function will dispatch the next scheduled task in the queue. The task will be handed out
* with an open lock - possibly on the entire cron process. Make sure you call either

View File

@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8" ?>
<XMLDB PATH="lib/db" VERSION="20220510" COMMENT="XMLDB file for core Moodle tables"
<XMLDB PATH="lib/db" VERSION="20220524" COMMENT="XMLDB file for core Moodle tables"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="../../lib/xmldb/xmldb.xsd"
>
@ -3448,6 +3448,7 @@
</KEYS>
<INDEXES>
<INDEX NAME="nextruntime_idx" UNIQUE="false" FIELDS="nextruntime"/>
<INDEX NAME="timestarted_idx" UNIQUE="false" FIELDS="timestarted"/>
</INDEXES>
</TABLE>
<TABLE NAME="task_log" COMMENT="The log table for all tasks">

View File

@ -4506,5 +4506,20 @@ privatefiles,moodle|/user/files.php';
upgrade_main_savepoint(true, 2022051000.00);
}
if ($oldversion < 2022051900.01) {
// Define index timestarted_idx (not unique) to be added to task_adhoc.
$table = new xmldb_table('task_adhoc');
$index = new xmldb_index('timestarted_idx', XMLDB_INDEX_NOTUNIQUE, ['timestarted']);
// Conditionally launch add index timestarted_idx.
if (!$dbman->index_exists($table, $index)) {
$dbman->add_index($table, $index);
}
// Main savepoint reached.
upgrade_main_savepoint(true, 2022051900.01);
}
return true;
}

View File

@ -1,197 +0,0 @@
<?php
// This file is part of Moodle - http://moodle.org/
//
// Moodle is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Moodle is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Moodle. If not, see <http://www.gnu.org/licenses/>.
/**
* This file contains the unit tests for the task manager.
*
* @package core
* @copyright 2019 Brendan Heywood <brendan@catalyst-au.net>
* @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
*/
defined('MOODLE_INTERNAL') || die();
/**
* This file contains the unit tests for the task manager.
*
* @copyright 2019 Brendan Heywood <brendan@catalyst-au.net>
* @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
*/
class core_task_manager_testcase extends advanced_testcase {
public function test_ensure_adhoc_task_qos_provider() {
return [
[
[],
[],
],
// A queue with a lopside initial load that needs to be staggered.
[
[
(object)['id' => 1, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 2, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 10, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 11, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 12, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 13, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 14, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 15, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
],
[
(object)['id' => 1, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 2, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 10, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 11, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 12, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 13, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 14, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 15, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
],
],
// The same lopsided queue but now the first item is gone.
[
[
(object)['id' => 2, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
],
[
(object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 2, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'],
],
],
// The same lopsided queue but now the first two items is gone.
[
[
(object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
],
[
(object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'],
],
],
// The same lopsided queue but now the first three items are gone.
[
[
(object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
],
[
(object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'],
],
],
[
[
(object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
],
[
(object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'],
(object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
(object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
],
],
];
}
/**
* Reduces a list of tasks into a simpler string
*
* @param array $input array of tasks
* @return string list of task ids
*/
function flatten($tasks) {
$list = '';
foreach ($tasks as $id => $task) {
$list .= ' ' . $task->id;
}
return $list;
}
/**
* Test that the Quality of Service reordering works.
*
* @dataProvider test_ensure_adhoc_task_qos_provider
*
* @param array $input array of tasks
* @param array $expected array of reordered tasks
* @return void
*/
public function test_ensure_adhoc_task_qos(array $input, array $expected) {
$this->resetAfterTest();
$result = \core\task\manager::ensure_adhoc_task_qos($input);
$result = $this->flatten($result);
$expected = $this->flatten($expected);
$this->assertEquals($expected, $result);
}
}

View File

@ -1,6 +1,11 @@
This files describes API changes in core libraries and APIs,
information provided here is intended especially for developers.
=== 4.1 ===
* The method ensure_adhoc_task_qos() in lib/classes/task/manager.php has been deprecated, please use get_next_adhoc_task()
instead.
=== 4.0 ===
* To better detect wrong floats (like, for example, unformatted, using local-dependent separators ones) a number of

View File

@ -29,7 +29,7 @@
defined('MOODLE_INTERNAL') || die();
$version = 2022051900.00; // YYYYMMDD = weekly release date of this DEV branch.
$version = 2022051900.01; // YYYYMMDD = weekly release date of this DEV branch.
// RR = release increments - 00 in DEV branches.
// .XX = incremental changes.
$release = '4.1dev (Build: 20220519)'; // Human-friendly version name