1
0
mirror of https://github.com/guzzle/guzzle.git synced 2025-02-24 18:13:00 +01:00

Fixing race condition that failed to drain the queue

Adding a simple test to ensure requests are sent from a queue
Removing conditional as it will already throw an exception if the transaction is not found in the batch context
Adding a test to see if iterator functionality has changed by version
Setting error priority higher
This commit is contained in:
Michael Dowling 2014-03-09 14:46:28 -07:00
parent 69f358ae43
commit a9d93d2733
2 changed files with 41 additions and 13 deletions

View File

@ -117,15 +117,18 @@ class MultiAdapter implements AdapterInterface, ParallelAdapterInterface
if ($mrc != CURLM_OK && $mrc != CURLM_CALL_MULTI_PERFORM) {
self::throwMultiError($mrc);
}
// Need to check if there are pending transactions before processing
// them so that we don't bail from the loop too early.
$pending = $context->hasPending();
$this->processMessages($context);
if ($active && curl_multi_select($multi, $this->selectTimeout) === -1) {
// Perform a usleep if a select returns -1.
// See: https://bugs.php.net/bug.php?id=61141
usleep(250);
}
} while ($active || $context->hasPending());
} while ($active || $pending);
$this->releaseMultiHandle($context->getMultiHandle());
$this->releaseMultiHandle($multi);
}
private function processMessages(BatchContext $context)
@ -133,12 +136,11 @@ class MultiAdapter implements AdapterInterface, ParallelAdapterInterface
$multi = $context->getMultiHandle();
while ($done = curl_multi_info_read($multi)) {
if ($transaction = $context->findTransaction($done['handle'])) {
$this->processResponse($transaction, $done, $context);
// Add the next transaction if there are more in the queue
if ($next = $context->nextPending()) {
$this->addHandle($next, $context);
}
$transaction = $context->findTransaction($done['handle']);
$this->processResponse($transaction, $done, $context);
// Add the next transaction if there are more in the queue
if ($next = $context->nextPending()) {
$this->addHandle($next, $context);
}
}
}
@ -240,8 +242,9 @@ class MultiAdapter implements AdapterInterface, ParallelAdapterInterface
// Add a new handle
$handle = curl_multi_init();
$this->multiHandles[(int) $handle] = $handle;
$this->multiOwned[(int) $handle] = true;
$id = (int) $handle;
$this->multiHandles[$id] = $handle;
$this->multiOwned[$id] = true;
return $handle;
}

View File

@ -74,6 +74,31 @@ class MultiAdapterTest extends AbstractCurl
}
}
public function testSendsParallelRequestsFromQueue()
{
$c = new Client();
self::$server->flush();
self::$server->enqueue([
"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n",
"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n",
"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n",
"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"
]);
$transactions = [
new Transaction($c, new Request('GET', self::$server->getUrl())),
new Transaction($c, new Request('PUT', self::$server->getUrl())),
new Transaction($c, new Request('HEAD', self::$server->getUrl())),
new Transaction($c, new Request('GET', self::$server->getUrl()))
];
$a = new MultiAdapter(new MessageFactory());
$a->sendAll(new \ArrayIterator($transactions), 2);
foreach ($transactions as $t) {
$response = $t->getResponse();
$this->assertNotNull($response);
$this->assertEquals(200, $response->getStatusCode());
}
}
public function testCreatesAndReleasesHandlesWhenNeeded()
{
$a = new MultiAdapter(new MessageFactory());
@ -101,7 +126,7 @@ class MultiAdapterTest extends AbstractCurl
'events' => [
'headers' => function () use ($a, $c, $ef) {
$r = $c->createRequest('GET', '/', [
'events' => ['error' => $ef]
'events' => ['error' => ['fn' => $ef, 'priority' => 9999]]
]);
$r->getEmitter()->once('headers', function () use ($a, $c, $r) {
$a->send(new Transaction($c, $r));
@ -109,8 +134,8 @@ class MultiAdapterTest extends AbstractCurl
$a->send(new Transaction($c, $r));
// Now, reuse an existing handle
$a->send(new Transaction($c, $r));
},
'error' => $ef
},
'error' => ['fn' => $ef, 'priority' => 9999]
]
])));
});