mirror of
https://github.com/moodle/moodle.git
synced 2025-01-18 05:58:34 +01:00
MDL-67483 tasks: Improvements to adhoc task queue at very high scale
This commit is contained in:
parent
6aacd8d6d1
commit
c7ce7601a3
@ -479,7 +479,7 @@ class manager {
|
||||
* relative order of tasks queued by timestamp.
|
||||
*
|
||||
* @param array $records array of task records
|
||||
* @return void
|
||||
* @param array $records array of same task records shuffled
|
||||
*/
|
||||
public static function ensure_adhoc_task_qos(array $records): array {
|
||||
|
||||
@ -508,6 +508,7 @@ class manager {
|
||||
$qos = []; // Our new queue with ensured quality of service.
|
||||
$seed = $count % $limittotal; // Which task queue to shuffle from first?
|
||||
|
||||
$move = 1; // How many tasks to shuffle at a time.
|
||||
do {
|
||||
$shuffled = 0;
|
||||
|
||||
@ -529,11 +530,16 @@ class manager {
|
||||
$seed += 1;
|
||||
continue;
|
||||
}
|
||||
$task = array_splice($queues[$type], 0, 1);
|
||||
$qos = array_merge($qos, $task);
|
||||
$tasks = array_splice($queues[$type], 0, $move);
|
||||
$qos = array_merge($qos, $tasks);
|
||||
|
||||
// Stop if we didn't move any tasks onto the main queue.
|
||||
$shuffled += count($task);
|
||||
$shuffled += count($tasks);
|
||||
}
|
||||
// Generally the only tasks that matter are those that are near the start so
|
||||
// after we have shuffled the first few 1 by 1, start shuffling larger groups.
|
||||
if (count($qos) >= (4 * count($limits))) {
|
||||
$move *= 2;
|
||||
}
|
||||
} while ($shuffled > 0);
|
||||
|
||||
@ -550,11 +556,6 @@ class manager {
|
||||
*/
|
||||
public static function get_next_adhoc_task($timestart) {
|
||||
global $DB;
|
||||
$cronlockfactory = \core\lock\lock_config::get_lock_factory('cron');
|
||||
|
||||
if (!$cronlock = $cronlockfactory->get_lock('core_cron', 10)) {
|
||||
throw new \moodle_exception('locktimeout');
|
||||
}
|
||||
|
||||
$where = '(nextruntime IS NULL OR nextruntime < :timestart1)';
|
||||
$params = array('timestart1' => $timestart);
|
||||
@ -562,6 +563,11 @@ class manager {
|
||||
|
||||
$records = self::ensure_adhoc_task_qos($records);
|
||||
|
||||
$cronlockfactory = \core\lock\lock_config::get_lock_factory('cron');
|
||||
if (!$cronlock = $cronlockfactory->get_lock('core_cron', 10)) {
|
||||
throw new \moodle_exception('locktimeout');
|
||||
}
|
||||
|
||||
foreach ($records as $record) {
|
||||
|
||||
if ($lock = $cronlockfactory->get_lock('adhoc_' . $record->id, 0)) {
|
||||
|
@ -50,6 +50,12 @@ class core_task_manager_testcase extends advanced_testcase {
|
||||
(object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
|
||||
(object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
|
||||
(object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
|
||||
(object)['id' => 10, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
|
||||
(object)['id' => 11, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
|
||||
(object)['id' => 12, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
|
||||
(object)['id' => 13, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
|
||||
(object)['id' => 14, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
|
||||
(object)['id' => 15, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
|
||||
],
|
||||
[
|
||||
(object)['id' => 1, 'classname' => '\core\task\asynchronous_backup_task'],
|
||||
@ -59,8 +65,14 @@ class core_task_manager_testcase extends advanced_testcase {
|
||||
(object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'],
|
||||
(object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
|
||||
(object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'],
|
||||
(object)['id' => 10, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
|
||||
(object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'],
|
||||
(object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'],
|
||||
(object)['id' => 11, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
|
||||
(object)['id' => 12, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
|
||||
(object)['id' => 13, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
|
||||
(object)['id' => 14, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
|
||||
(object)['id' => 15, 'classname' => '\tool_dataprivacy\task\process_data_request_task'],
|
||||
],
|
||||
],
|
||||
// The same lopsided queue but now the first item is gone.
|
||||
@ -147,6 +159,20 @@ class core_task_manager_testcase extends advanced_testcase {
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* Reduces a list of tasks into a simpler string
|
||||
*
|
||||
* @param array $input array of tasks
|
||||
* @return string list of task ids
|
||||
*/
|
||||
function flatten($tasks) {
|
||||
$list = '';
|
||||
foreach ($tasks as $id => $task) {
|
||||
$list .= ' ' . $task->id;
|
||||
}
|
||||
return $list;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the Quality of Service reordering works.
|
||||
*
|
||||
@ -159,6 +185,11 @@ class core_task_manager_testcase extends advanced_testcase {
|
||||
public function test_ensure_adhoc_task_qos(array $input, array $expected) {
|
||||
$this->resetAfterTest();
|
||||
$result = \core\task\manager::ensure_adhoc_task_qos($input);
|
||||
|
||||
|
||||
$result = $this->flatten($result);
|
||||
$expected = $this->flatten($expected);
|
||||
|
||||
$this->assertEquals($expected, $result);
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user