mirror of
https://github.com/guzzle/guzzle.git
synced 2025-05-01 20:28:41 +02:00
[Http] Adding a BatchQueuePlugin to make it easier to queue requests and flush the queue implicitly or explicitly
This commit is contained in:
parent
7605ef08a7
commit
d7e332bb64
118
src/Guzzle/Http/Plugin/BatchQueuePlugin.php
Normal file
118
src/Guzzle/Http/Plugin/BatchQueuePlugin.php
Normal file
@ -0,0 +1,118 @@
|
||||
<?php
|
||||
|
||||
namespace Guzzle\Http\Plugin;
|
||||
|
||||
use Guzzle\Common\Event;
|
||||
use Guzzle\Http\Message\RequestInterface;
|
||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||
|
||||
/**
|
||||
* Queues requests and sends them in parallel when a flush event is recevied.
|
||||
* You can call the flush() method on the plugin or emit a 'flush' event from
|
||||
* the client on which the plugin is attached.
|
||||
*
|
||||
* This plugin probably will not work well with plugins that implicitly
|
||||
* send requests (ExponentialBackoffPlugin, CachePlugin) or CommandSets.
|
||||
*/
|
||||
class BatchQueuePlugin implements EventSubscriberInterface, \Countable
|
||||
{
|
||||
private $autoFlushCount;
|
||||
private $queue = array();
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public static function getSubscribedEvents()
|
||||
{
|
||||
return array(
|
||||
'client.create_request' => 'onRequestCreate',
|
||||
'request.before_send' => 'onRequestBeforeSend',
|
||||
'flush' => 'flush'
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int $autoFlushCount (optional) Set to >0 to automatically flush
|
||||
* the queue when the number of requests is > $autoFlushCount
|
||||
*/
|
||||
public function __construct($autoFlushCount = 0)
|
||||
{
|
||||
$this->autoFlushCount = $autoFlushCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a count of the requests in queue
|
||||
*
|
||||
* @return int
|
||||
*/
|
||||
public function count()
|
||||
{
|
||||
return count($this->queue);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a request from the queue
|
||||
*
|
||||
* @param RequestInterface $request Request to remove
|
||||
*
|
||||
* @return BatchQueuePlugin
|
||||
*/
|
||||
public function removeRequest(RequestInterface $request)
|
||||
{
|
||||
$this->queue = array_filter($this->queue, function($r) use ($request) {
|
||||
return $r !== $request;
|
||||
});
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Add request to the queue
|
||||
*
|
||||
* @param Event $event
|
||||
*/
|
||||
public function onRequestCreate(Event $event)
|
||||
{
|
||||
if ($event['request']) {
|
||||
$this->queue[] = $event['request'];
|
||||
if ($this->autoFlushCount && count($this->queue) >= $this->autoFlushCount) {
|
||||
$this->flush();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensures that queued requests that get sent outside of the context
|
||||
* of the batch plugin get removed from the queue
|
||||
*
|
||||
* @param Event $event
|
||||
*/
|
||||
public function onRequestBeforeSend(Event $event)
|
||||
{
|
||||
if ($event['request']) {
|
||||
$this->removeRequest($event['request']);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush the queue
|
||||
*
|
||||
* @param Event $event
|
||||
*/
|
||||
public function flush()
|
||||
{
|
||||
$multis = array();
|
||||
// Prepare each request for their respective curl multi objects
|
||||
while ($request = array_shift($this->queue)) {
|
||||
$multi = $request->getClient()->getCurlMulti();
|
||||
$multi->add($request, true);
|
||||
if (!in_array($multi, $multis)) {
|
||||
$multis[] = $multi;
|
||||
}
|
||||
}
|
||||
foreach ($multis as $multi) {
|
||||
$multi->send();
|
||||
}
|
||||
}
|
||||
}
|
194
tests/Guzzle/Tests/Http/Plugin/BatchQueuePluginTest.php
Normal file
194
tests/Guzzle/Tests/Http/Plugin/BatchQueuePluginTest.php
Normal file
@ -0,0 +1,194 @@
|
||||
<?php
|
||||
|
||||
namespace Guzzle\Tests\Http\Plugin;
|
||||
|
||||
use Guzzle\Http\Plugin\MockPlugin;
|
||||
use Guzzle\Http\Plugin\BatchQueuePlugin;
|
||||
use Guzzle\Common\Event;
|
||||
use Guzzle\Http\Client;
|
||||
use Guzzle\Http\Message\Response;
|
||||
|
||||
class BatchQueuePluginTest extends \Guzzle\Tests\GuzzleTestCase
|
||||
{
|
||||
/**
|
||||
* @covers Guzzle\Http\Plugin\BatchQueuePlugin::getSubscribedEvents
|
||||
*/
|
||||
public function testSubscribesToEvents()
|
||||
{
|
||||
$events = BatchQueuePlugin::getSubscribedEvents();
|
||||
$this->assertArrayHasKey('flush', $events);
|
||||
$this->assertArrayHasKey('client.create_request', $events);
|
||||
$this->assertArrayHasKey('request.before_send', $events);
|
||||
}
|
||||
|
||||
/**
|
||||
* @covers Guzzle\Http\Plugin\BatchQueuePlugin::count
|
||||
* @covers Guzzle\Http\Plugin\BatchQueuePlugin::onRequestCreate
|
||||
*/
|
||||
public function testAddsRequestToQueue()
|
||||
{
|
||||
$plugin = new BatchQueuePlugin();
|
||||
$this->assertEquals(0, count($plugin));
|
||||
|
||||
$client = new Client('http://test.com/');
|
||||
$request = $client->get('/');
|
||||
|
||||
$event = new Event(array(
|
||||
'request' => $request
|
||||
));
|
||||
$plugin->onRequestCreate($event);
|
||||
|
||||
$this->assertEquals(1, count($plugin));
|
||||
}
|
||||
|
||||
/**
|
||||
* @covers Guzzle\Http\Plugin\BatchQueuePlugin::removeRequest
|
||||
* @covers Guzzle\Http\Plugin\BatchQueuePlugin::onRequestBeforeSend
|
||||
*/
|
||||
public function testRemovesRequestsWhenTheyAreSentOutOfContext()
|
||||
{
|
||||
$plugin = new BatchQueuePlugin();
|
||||
$client = new Client('http://test.com/');
|
||||
|
||||
// Create an event to use for our notifications
|
||||
$event = new Event(array(
|
||||
'request' => $client->get('/')
|
||||
));
|
||||
|
||||
// Add a request to the queue
|
||||
$plugin->onRequestCreate($event);
|
||||
$this->assertEquals(1, count($plugin));
|
||||
|
||||
// Fake that the request is being sent outside of the queue
|
||||
$plugin->onRequestBeforeSend($event);
|
||||
// Ensure that the request is no longer queued
|
||||
$this->assertEquals(0, count($plugin));
|
||||
}
|
||||
|
||||
/**
|
||||
* @covers Guzzle\Http\Plugin\BatchQueuePlugin::flush
|
||||
*/
|
||||
public function testFlushSendsRequestsInQueue()
|
||||
{
|
||||
$this->getServer()->flush();
|
||||
$plugin = new BatchQueuePlugin();
|
||||
$client = new Client($this->getServer()->getUrl());
|
||||
|
||||
// Create some test requests
|
||||
$requests = array(
|
||||
$client->get('/'),
|
||||
$client->get('/')
|
||||
);
|
||||
|
||||
// Add the requests to the batch queue
|
||||
foreach ($requests as $request) {
|
||||
$plugin->onRequestCreate(new Event(array(
|
||||
'request' => $request
|
||||
)));
|
||||
$responses[] = new Response(200);
|
||||
}
|
||||
|
||||
// Queue the test responses on node.js
|
||||
$this->getServer()->enqueue($responses);
|
||||
|
||||
// Explicitly call flush to send the queued requests
|
||||
$plugin->flush();
|
||||
$this->assertEquals(count($requests), count($this->getServer()->getReceivedRequests()));
|
||||
$this->assertEquals(0, count($plugin));
|
||||
}
|
||||
|
||||
/**
|
||||
* @covers Guzzle\Http\Plugin\BatchQueuePlugin::__construct
|
||||
* @covers Guzzle\Http\Plugin\BatchQueuePlugin::onRequestCreate
|
||||
*/
|
||||
public function testImplicitlyFlushesRequests()
|
||||
{
|
||||
$this->getServer()->flush();
|
||||
$plugin = new BatchQueuePlugin(2);
|
||||
$client = new Client($this->getServer()->getUrl());
|
||||
|
||||
$this->getServer()->enqueue(array(
|
||||
new Response(200),
|
||||
new Response(200),
|
||||
new Response(200)
|
||||
));
|
||||
|
||||
$plugin->onRequestCreate(new Event(array(
|
||||
'request' => $client->get('/')
|
||||
)));
|
||||
|
||||
$plugin->onRequestCreate(new Event(array(
|
||||
'request' => $client->get('/')
|
||||
)));
|
||||
|
||||
$this->assertEquals(0, count($plugin));
|
||||
$this->assertEquals(2, count($this->getServer()->getReceivedRequests()));
|
||||
|
||||
$plugin->onRequestCreate(new Event(array(
|
||||
'request' => $client->get('/')
|
||||
)));
|
||||
|
||||
$this->assertEquals(1, count($plugin));
|
||||
}
|
||||
|
||||
/**
|
||||
* @covers Guzzle\Http\Plugin\BatchQueuePlugin::onRequestCreate
|
||||
* @covers Guzzle\Http\Plugin\BatchQueuePlugin::onRequestBeforeSend
|
||||
*/
|
||||
public function testWorksUsingEvents()
|
||||
{
|
||||
// Queue up some test responses
|
||||
$this->getServer()->flush();
|
||||
$this->getServer()->enqueue(array(
|
||||
new Response(200),
|
||||
new Response(200),
|
||||
new Response(200)
|
||||
));
|
||||
|
||||
$plugin = new BatchQueuePlugin(2);
|
||||
$client = new Client($this->getServer()->getUrl());
|
||||
$client->getEventDispatcher()->addSubscriber($plugin);
|
||||
|
||||
$client->get('/');
|
||||
$client->get('/');
|
||||
// Ensure that the requests were sent implicitly
|
||||
$this->assertEquals(0, count($plugin));
|
||||
$this->assertEquals(2, count($this->getServer()->getReceivedRequests()));
|
||||
|
||||
// Add a single request and ensure that it is in queue and not sent
|
||||
$client->get('/');
|
||||
$this->assertEquals(1, count($plugin));
|
||||
$this->assertEquals(2, count($this->getServer()->getReceivedRequests()));
|
||||
|
||||
// Explicitly flush the queued requests
|
||||
$client->dispatch('flush');
|
||||
$this->assertEquals(0, count($plugin));
|
||||
$this->assertEquals(3, count($this->getServer()->getReceivedRequests()));
|
||||
}
|
||||
|
||||
/**
|
||||
* @covers Guzzle\Http\Plugin\BatchQueuePlugin::flush
|
||||
*/
|
||||
public function testWorksWithMockResponses()
|
||||
{
|
||||
$this->getServer()->flush();
|
||||
$mock = new MockPlugin(array(
|
||||
new Response(200),
|
||||
new Response(201)
|
||||
));
|
||||
|
||||
$plugin = new BatchQueuePlugin();
|
||||
$client = new Client($this->getServer()->getUrl());
|
||||
$client->getEventDispatcher()->addSubscriber($plugin);
|
||||
$client->getEventDispatcher()->addSubscriber($mock);
|
||||
|
||||
$request1 = $client->get('/');
|
||||
$request2 = $client->get('/');
|
||||
$plugin->flush();
|
||||
$this->assertEquals(0, count($plugin));
|
||||
$this->assertEquals(0, count($this->getServer()->getReceivedRequests()));
|
||||
|
||||
$this->assertEquals(200, $request1->getResponse()->getStatusCode());
|
||||
$this->assertEquals(201, $request2->getResponse()->getStatusCode());
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user