1
0
mirror of https://github.com/moodle/moodle.git synced 2025-04-24 09:55:33 +02:00

MDL-34055 add bulk DB->insert_records() method

This patch was inspired by patch by Simon Coggins from Totara.
This commit is contained in:
Petr Škoda 2014-01-18 16:03:31 +08:00 committed by Petr Skoda
parent 9788e26805
commit cc5dba8e54
4 changed files with 369 additions and 2 deletions

@ -1595,6 +1595,45 @@ abstract class moodle_database {
*/
public abstract function insert_record($table, $dataobject, $returnid=true, $bulk=false);
/**
* Insert multiple records into database as fast as possible.
*
* Order of inserts is maintained, but the operation is not atomic,
* use transactions if necessary.
*
* This method is intended for inserting of large number of small objects,
* do not use for huge objects with text or binary fields.
*
* @since 2.7
*
* @param string $table The database table to be inserted into
* @param array|Traversable $dataobjects list of objects to be inserted, must be compatible with foreach
* @return void does not return new record ids
*
* @throws coding_exception if data objects have different structure
* @throws dml_exception A DML specific exception is thrown for any errors.
*/
public function insert_records($table, $dataobjects) {
if (!is_array($dataobjects) and !($dataobjects instanceof Traversable)) {
throw new coding_exception('insert_records() passed non-traversable object');
}
$fields = null;
// Note: override in driver if there is a faster way.
foreach ($dataobjects as $dataobject) {
if (!is_array($dataobject) and !is_object($dataobject)) {
throw new coding_exception('insert_records() passed invalid record object');
}
$dataobject = (array)$dataobject;
if ($fields === null) {
$fields = array_keys($dataobject);
} else if ($fields !== array_keys($dataobject)) {
throw new coding_exception('All dataobjects in insert_records() must have the same structure!');
}
$this->insert_record($table, $dataobject, false);
}
}
/**
* Import a record into a table, id field is required.
* Safety checks are NOT carried out. Lobs are supported.

@ -506,7 +506,7 @@ class mysqli_native_moodle_database extends moodle_database {
* Returns detailed information about columns in table. This information is cached internally.
* @param string $table name
* @param bool $usecache
* @return array array of database_column_info objects indexed with column names
* @return database_column_info[] array of database_column_info objects indexed with column names
*/
public function get_columns($table, $usecache=true) {
@ -1122,6 +1122,124 @@ class mysqli_native_moodle_database extends moodle_database {
return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
}
/**
* Insert multiple records into database as fast as possible.
*
* Order of inserts is maintained, but the operation is not atomic,
* use transactions if necessary.
*
* This method is intended for inserting of large number of small objects,
* do not use for huge objects with text or binary fields.
*
* @since 2.7
*
* @param string $table The database table to be inserted into
* @param array|Traversable $dataobjects list of objects to be inserted, must be compatible with foreach
* @return void does not return new record ids
*
* @throws coding_exception if data objects have different structure
* @throws dml_exception A DML specific exception is thrown for any errors.
*/
public function insert_records($table, $dataobjects) {
if (!is_array($dataobjects) and !$dataobjects instanceof Traversable) {
throw new coding_exception('insert_records() passed non-traversable object');
}
// MySQL has a relatively small query length limit by default,
// make sure 'max_allowed_packet' in my.cnf is high enough
// if you change the following default...
static $chunksize = null;
if ($chunksize === null) {
if (!empty($this->dboptions['bulkinsertsize'])) {
$chunksize = (int)$this->dboptions['bulkinsertsize'];
} else {
if (PHP_INT_SIZE === 4) {
// Bad luck for Windows, we cannot do any maths with large numbers.
$chunksize = 5;
} else {
$sql = "SHOW VARIABLES LIKE 'max_allowed_packet'";
$this->query_start($sql, null, SQL_QUERY_AUX);
$result = $this->mysqli->query($sql);
$this->query_end($result);
$size = 0;
if ($rec = $result->fetch_assoc()) {
$size = $rec['Value'];
}
$result->close();
// Hopefully 200kb per object are enough.
$chunksize = (int)($size / 200000);
if ($chunksize > 50) {
$chunksize = 50;
}
}
}
}
$columns = $this->get_columns($table, true);
$fields = null;
$count = 0;
$chunk = array();
foreach ($dataobjects as $dataobject) {
if (!is_array($dataobject) and !is_object($dataobject)) {
throw new coding_exception('insert_records() passed invalid record object');
}
$dataobject = (array)$dataobject;
if ($fields === null) {
$fields = array_keys($dataobject);
$columns = array_intersect_key($columns, $dataobject);
unset($columns['id']);
} else if ($fields !== array_keys($dataobject)) {
throw new coding_exception('All dataobjects in insert_records() must have the same structure!');
}
$count++;
$chunk[] = $dataobject;
if ($count === $chunksize) {
$this->insert_chunk($table, $chunk, $columns);
$chunk = array();
$count = 0;
}
}
if ($count) {
$this->insert_chunk($table, $chunk, $columns);
}
}
/**
* Insert records in chunks.
*
* Note: can be used only from insert_records().
*
* @param string $table
* @param array $chunk
* @param database_column_info[] $columns
*/
protected function insert_chunk($table, array $chunk, array $columns) {
$fieldssql = '('.implode(',', array_keys($columns)).')';
$valuessql = '('.implode(',', array_fill(0, count($columns), '?')).')';
$valuessql = implode(',', array_fill(0, count($chunk), $valuessql));
$params = array();
foreach ($chunk as $dataobject) {
foreach ($columns as $field => $column) {
$params[] = $this->normalise_value($column, $dataobject[$field]);
}
}
$sql = "INSERT INTO {$this->prefix}$table $fieldssql VALUES $valuessql";
list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
$rawsql = $this->emulate_bound_params($sql, $params);
$this->query_start($sql, $params, SQL_QUERY_INSERT);
$result = $this->mysqli->query($rawsql);
$this->query_end($result);
}
/**
* Import a record into a table, id field is required.
* Safety checks are NOT carried out. Lobs are supported.

@ -381,7 +381,7 @@ class pgsql_native_moodle_database extends moodle_database {
* Returns detailed information about columns in table. This information is cached internally.
* @param string $table name
* @param bool $usecache
* @return array array of database_column_info objects indexed with column names
* @return database_column_info[] array of database_column_info objects indexed with column names
*/
public function get_columns($table, $usecache=true) {
if ($usecache) {
@ -919,6 +919,108 @@ class pgsql_native_moodle_database extends moodle_database {
}
/**
* Insert multiple records into database as fast as possible.
*
* Order of inserts is maintained, but the operation is not atomic,
* use transactions if necessary.
*
* This method is intended for inserting of large number of small objects,
* do not use for huge objects with text or binary fields.
*
* @since 2.7
*
* @param string $table The database table to be inserted into
* @param array|Traversable $dataobjects list of objects to be inserted, must be compatible with foreach
* @return void does not return new record ids
*
* @throws coding_exception if data objects have different structure
* @throws dml_exception A DML specific exception is thrown for any errors.
*/
public function insert_records($table, $dataobjects) {
if (!is_array($dataobjects) and !($dataobjects instanceof Traversable)) {
throw new coding_exception('insert_records() passed non-traversable object');
}
// PostgreSQL does not seem to have problems with huge queries.
$chunksize = 500;
if (!empty($this->dboptions['bulkinsertsize'])) {
$chunksize = (int)$this->dboptions['bulkinsertsize'];
}
$columns = $this->get_columns($table, true);
// Make sure there are no nasty blobs!
foreach ($columns as $column) {
if ($column->binary) {
parent::insert_records($table, $dataobjects);
return;
}
}
$fields = null;
$count = 0;
$chunk = array();
foreach ($dataobjects as $dataobject) {
if (!is_array($dataobject) and !is_object($dataobject)) {
throw new coding_exception('insert_records() passed invalid record object');
}
$dataobject = (array)$dataobject;
if ($fields === null) {
$fields = array_keys($dataobject);
$columns = array_intersect_key($columns, $dataobject);
unset($columns['id']);
} else if ($fields !== array_keys($dataobject)) {
throw new coding_exception('All dataobjects in insert_records() must have the same structure!');
}
$count++;
$chunk[] = $dataobject;
if ($count === $chunksize) {
$this->insert_chunk($table, $chunk, $columns);
$chunk = array();
$count = 0;
}
}
if ($count) {
$this->insert_chunk($table, $chunk, $columns);
}
}
/**
* Insert records in chunks, no binary support, strict param types...
*
* Note: can be used only from insert_records().
*
* @param string $table
* @param array $chunk
* @param database_column_info[] $columns
*/
protected function insert_chunk($table, array $chunk, array $columns) {
$i = 1;
$params = array();
$values = array();
foreach ($chunk as $dataobject) {
$vals = array();
foreach ($columns as $field => $column) {
$params[] = $this->normalise_value($column, $dataobject[$field]);
$vals[] = "\$".$i++;
}
$values[] = '('.implode(',', $vals).')';
}
$fieldssql = '('.implode(',', array_keys($columns)).')';
$valuessql = implode(',', $values);
$sql = "INSERT INTO {$this->prefix}$table $fieldssql VALUES $valuessql";
$this->query_start($sql, $params, SQL_QUERY_INSERT);
$result = pg_query_params($this->pgsql, $sql, $params);
$this->query_end($result);
pg_free_result($result);
}
/**
* Import a record into a table, id field is required.
* Safety checks are NOT carried out. Lobs are supported.

@ -2263,6 +2263,114 @@ class core_dml_testcase extends database_driver_testcase {
}
}
public function test_insert_records() {
$DB = $this->tdb;
$dbman = $DB->get_manager();
$table = $this->get_test_table();
$tablename = $table->getName();
$table->add_field('id', XMLDB_TYPE_INTEGER, '10', null, XMLDB_NOTNULL, XMLDB_SEQUENCE, null);
$table->add_field('course', XMLDB_TYPE_INTEGER, '10', null, XMLDB_NOTNULL, null, '0');
$table->add_field('oneint', XMLDB_TYPE_INTEGER, '10', null, null, null, 100);
$table->add_field('onenum', XMLDB_TYPE_NUMBER, '10,2', null, null, null, 200);
$table->add_field('onechar', XMLDB_TYPE_CHAR, '100', null, null, null, 'onestring');
$table->add_field('onetext', XMLDB_TYPE_TEXT, 'big', null, null, null);
$table->add_key('primary', XMLDB_KEY_PRIMARY, array('id'));
$dbman->create_table($table);
$this->assertCount(0, $DB->get_records($tablename));
$record = new stdClass();
$record->id = '1';
$record->course = '1';
$record->oneint = null;
$record->onenum = '1.00';
$record->onechar = 'a';
$record->onetext = 'aaa';
$expected = array();
$records = array();
for ($i = 1; $i <= 2000; $i++) { // This may take a while, it should be higher than defaults in DML drivers.
$rec = clone($record);
$rec->id = (string)$i;
$rec->oneint = (string)$i;
$expected[$i] = $rec;
$rec = clone($rec);
unset($rec->id);
$records[$i] = $rec;
}
$DB->insert_records($tablename, $records);
$stored = $DB->get_records($tablename, array(), 'id ASC');
$this->assertEquals($expected, $stored);
// Test there can be some extra properties including id.
$count = $DB->count_records($tablename);
$rec1 = (array)$record;
$rec1['xxx'] = 1;
$rec2 = (array)$record;
$rec2['xxx'] = 2;
$records = array($rec1, $rec2);
$DB->insert_records($tablename, $records);
$this->assertEquals($count + 2, $DB->count_records($tablename));
// Test not all properties are necessary.
$rec1 = (array)$record;
unset($rec1['course']);
$rec2 = (array)$record;
unset($rec2['course']);
$records = array($rec1, $rec2);
$DB->insert_records($tablename, $records);
// Make sure no changes in data object structure are tolerated.
$rec1 = (array)$record;
unset($rec1['id']);
$rec2 = (array)$record;
unset($rec2['id']);
$records = array($rec1, $rec2);
$DB->insert_records($tablename, $records);
$rec2['xx'] = '1';
$records = array($rec1, $rec2);
try {
$DB->insert_records($tablename, $records);
$this->fail('coding_exception expected when insert_records receives different object data structures');
} catch (moodle_exception $e) {
$this->assertInstanceOf('coding_exception', $e);
}
unset($rec2['xx']);
unset($rec2['course']);
$rec2['course'] = '1';
$records = array($rec1, $rec2);
try {
$DB->insert_records($tablename, $records);
$this->fail('coding_exception expected when insert_records receives different object data structures');
} catch (moodle_exception $e) {
$this->assertInstanceOf('coding_exception', $e);
}
$records = 1;
try {
$DB->insert_records($tablename, $records);
$this->fail('coding_exception expected when insert_records receives non-traversable data');
} catch (moodle_exception $e) {
$this->assertInstanceOf('coding_exception', $e);
}
$records = array(1);
try {
$DB->insert_records($tablename, $records);
$this->fail('coding_exception expected when insert_records receives non-objet record');
} catch (moodle_exception $e) {
$this->assertInstanceOf('coding_exception', $e);
}
}
public function test_import_record() {
// All the information in this test is fetched from DB by get_recordset() so we
// have such method properly tested against nulls, empties and friends...