1
0
mirror of https://github.com/guzzle/guzzle.git synced 2025-02-24 18:13:00 +01:00

Changing Adapters to send a single request

This commit is contained in:
Michael Dowling 2013-08-29 20:52:02 -07:00
parent f9e7750f6c
commit cd80605b20
14 changed files with 102 additions and 229 deletions

View File

@ -14,7 +14,7 @@ class Event extends SymfonyEvent implements ToArrayInterface, \ArrayAccess, \Ite
/**
* @param array $context Contextual information
*/
public function __construct(array $context = array())
public function __construct(array $context = [])
{
$this->data = $context;
}

View File

@ -8,7 +8,7 @@ namespace Guzzle\Http\Adapter;
interface AdapterInterface
{
/**
* Transfers one or more HTTP requests and populates responses
* Transfers an HTTP request and populates a response
*
* @param Transaction $transaction Transaction abject to populate
*

View File

@ -1,46 +0,0 @@
<?php
namespace Guzzle\Http\Adapter;
use Guzzle\Http\Adapter\Transaction;
/**
* Adapter that does not attempt to send more than a certain number of requests
* in parallel
*/
class BufferedAdapter implements AdapterInterface
{
private $adapter;
private $max;
/**
* @param AdapterInterface $adapter Adapter used for sending requests
* @param int $max Maximum number of requests per batch
*/
public function __construct(AdapterInterface $adapter, $max = 20)
{
$this->adapter = $adapter;
$this->max = $max;
}
public function send(Transaction $transaction)
{
$bufferCount = 0;
$buffer = new Transaction($transaction->getClient());
foreach ($transaction as $request) {
$buffer[$request] = $transaction[$request];
if (++$bufferCount >= $this->max) {
$transaction->addAll($this->adapter->send($buffer));
$buffer = new Transaction($transaction->getClient());
$bufferCount = 0;
}
}
if ($bufferCount > 0) {
$transaction->addAll($this->adapter->send($buffer));
}
return $transaction;
}
}

View File

@ -17,39 +17,34 @@ class StreamAdapter implements AdapterInterface
{
public function send(Transaction $transaction)
{
foreach ($transaction as $request) {
try {
$this->createResponse($request, $transaction[$request]);
$request->getEventDispatcher()->dispatch(
'request.after_send',
new RequestAfterSendEvent($request, $transaction)
);
} catch (RequestException $e) {
$transaction[$request] = $e;
$request->getEventDispatcher()->dispatch(
'request.error',
new RequestErrorEvent($request, $transaction)
);
try {
$this->createResponse($transaction);
$transaction->getRequest()->getEventDispatcher()->dispatch(
'request.after_send',
new RequestAfterSendEvent($transaction)
);
} catch (RequestException $e) {
if (!$transaction->getRequest()->getEventDispatcher()->dispatch(
'request.error',
new RequestErrorEvent($transaction, $e)
)->isPropagationStopped()) {
throw $e;
}
}
return $transaction;
return $transaction->getResponse();
}
/**
* @param RequestInterface $request Request to send
* @param ResponseInterface $response Response to populate
* @param Transaction
* @throws \LogicException if you attempt to stream and specify a write_to option
*/
private function createResponse(RequestInterface $request, ResponseInterface $response)
private function createResponse(Transaction $transaction)
{
$request = $transaction->getRequest();
$stream = $this->createStream($request, $http_response_header);
// Track the response headers of the request
if (isset($http_response_header)) {
$this->processResponseHeaders($http_response_header, $response);
}
$response = $this->createResponseObject($http_response_header, $transaction, $stream);
$request->dispatch(RequestEvents::GOT_HEADERS, ['request' => $request, 'response' => $response]);
if ($request->getConfig()['stream']) {
@ -92,17 +87,25 @@ class StreamAdapter implements AdapterInterface
$response->setBody($saveTo);
}
private function processResponseHeaders($headers, ResponseInterface $response)
private function createResponseObject($headers, Transaction $transaction, $stream)
{
$parts = explode(' ', array_shift($headers), 3);
$response->setProtocolVersion(substr($parts[0], -3));
$response->setStatus($parts[1], isset($parts[2]) ? $parts[2] : null);
$options = ['protocol_version' => substr($parts[0], -3)];
if (isset($parts[2])) {
$options['reason_phrase'] = $parts[2];
}
// Set the size on the stream if it was returned in the response
$responseHeaders = [];
foreach ($headers as $header) {
$parts = explode(':', $header, 2);
$response->addHeader($parts[0], isset($parts[1]) ? $parts[1] : '');
$headerParts = explode(':', $header, 2);
$responseHeaders[$headerParts[0]] = isset($headerParts[1]) ? $headerParts[1] : '';
}
$response = $transaction->getMessageFactory()->createResponse($parts[1], $responseHeaders, $stream, $options);
$transaction->setResponse($response);
return $response;
}
/**

View File

@ -3,7 +3,7 @@
namespace Guzzle\Http\Adapter;
/**
* Sends all streaming requests to a streaming compatible adapter while sending all other requests to a default
* Sends streaming requests to a streaming compatible adapter while sending all other requests to a default
* adapter. This, for example, could be useful for taking advantage of the performance benefits of the CurlAdapter
* while still supporing true streaming through the StreamAdapter.
*/
@ -24,35 +24,8 @@ class StreamingProxyAdapter implements AdapterInterface
public function send(Transaction $transaction)
{
$streaming = $default = array();
foreach ($transaction as $request) {
if ($request->getConfig()['stream']) {
$streaming[] = $request;
} else {
$default[] = $request;
}
}
if (!$streaming) {
return $this->defaultAdapter->send($transaction);
}
$streamingTransaction = new Transaction($transaction->getClient());
foreach ($streaming as $request) {
$streamingTransaction[$request] = $transaction[$request];
}
$this->streamingAdapter->send($streamingTransaction);
if ($default) {
$defaultTransaction = new Transaction($transaction->getClient());
foreach ($default as $request) {
$defaultTransaction[$request] = $transaction[$request];
}
$streamingTransaction->addAll($this->defaultAdapter->send($defaultTransaction));
}
return $streamingTransaction;
return $transaction->getRequest()->getConfig()['stream']
? $this->streamingAdapter->send($transaction)
: $this->defaultAdapter->send($transaction);
}
}

View File

@ -6,26 +6,22 @@ use Guzzle\Http\ClientInterface;
use Guzzle\Http\Message\MessageFactoryInterface;
use Guzzle\Http\Message\RequestInterface;
use Guzzle\Http\Message\ResponseInterface;
use Guzzle\Stream\StreamInterface;
class Transaction
{
/** @var ClientInterface */
private $client;
/** @var RequestInterface */
private $request;
/** @var ResponseInterface */
private $response;
/** @var MessageFactoryInterface */
private $messageFactory;
/**
* @param ClientInterface $client Client that is used to send the requests
* @param RequestInterface $request
* @param MessageFactoryInterface $messageFactory
* @param MessageFactoryInterface $messageFactory Message factory used with the Transaction
*/
public function __construct(
ClientInterface $client,
@ -56,14 +52,11 @@ class Transaction
/**
* Set a response on the transaction
*
* @param string $statusCode HTTP response status code
* @param string $reasonPhrase Response reason phrase
* @param array $headers Headers of the response
* @param StreamInterface $body Response body
* @param ResponseInterface $response Response to set
*/
public function setResponse($statusCode, $reasonPhrase, array $headers, StreamInterface $body)
public function setResponse(ResponseInterface $response)
{
$this->response = $this->messageFactory->createResponse($statusCode, $reasonPhrase, $headers, $body);
$this->response = $response;
}
/**
@ -73,4 +66,12 @@ class Transaction
{
return $this->client;
}
/**
* @return MessageFactoryInterface
*/
public function getMessageFactory()
{
return $this->messageFactory;
}
}

View File

@ -188,48 +188,19 @@ class Client implements ClientInterface
public function send(RequestInterface $request)
{
$transaction = new Transaction($this);
if (!$this->preSend($request, $transaction)->isPropagationStopped()) {
$transaction[$request] = $this->messageFactory->createResponse();
$transaction = new Transaction($this, $request, $this->messageFactory);
if (!$request->getEventDispatcher()->dispatch(
'request.before_send',
new RequestBeforeSendEvent($transaction)
)->isPropagationStopped()) {
$this->adapter->send($transaction);
}
if ($transaction[$request] instanceof \Exception) {
throw $transaction[$request];
}
$response = $transaction->getResponse();
$this->addEffectiveUrl($request, $response);
$this->addEffectiveUrl($request, $transaction[$request]);
return $transaction[$request];
}
public function batch(array $requests)
{
$transaction = new Transaction($this);
$intercepted = new Transaction($this);
foreach ($requests as $request) {
if ($this->preSend($request, $transaction)->isPropagationStopped()) {
$this->addEffectiveUrl($request, $transaction[$request]);
$intercepted[$request] = $transaction[$request];
unset($transaction[$request]);
} else {
$transaction[$request] = $this->messageFactory->createResponse();
$this->addEffectiveUrl($request, $transaction[$request]);
}
}
if (count($transaction)) {
$this->adapter->send($transaction);
}
$transaction->addAll($intercepted);
if ($transaction->hasExceptions()) {
throw new BatchException($transaction);
}
return $transaction;
return $response;
}
/**
@ -259,22 +230,6 @@ class Client implements ClientInterface
}
}
/**
* Emits a request.before_send event, prepares for sending over the wire, and emits a request.prepared event
*
* @param RequestInterface $request Request about to be sent
* @param Transaction $transaction Transaction
*
* @return RequestBeforeSendEvent
*/
private function preSend(RequestInterface $request, Transaction $transaction)
{
return $request->getEventDispatcher()->dispatch(
'request.before_send',
new RequestBeforeSendEvent($request, $transaction)
);
}
/**
* Expand a URI template
*

View File

@ -4,9 +4,7 @@ namespace Guzzle\Http;
use Guzzle\Common\Collection;
use Guzzle\Http\Exception\AdapterException;
use Guzzle\Http\Exception\BatchException;
use Guzzle\Common\HasDispatcherInterface;
use Guzzle\Http\Adapter\Transaction;
use Guzzle\Http\Message\RequestInterface;
use Guzzle\Http\Message\ResponseInterface;
use Guzzle\Stream\StreamInterface;
@ -113,16 +111,6 @@ interface ClientInterface extends HasDispatcherInterface
*/
public function send(RequestInterface $request);
/**
* Send one or more requests in parallel
*
* @param array $requests RequestInterface objects to send
*
* @return Transaction Returns a hash map object of request to response objects
* @throws BatchException
*/
public function batch(array $requests);
/**
* Get the client's base URL
*

View File

@ -5,25 +5,21 @@ namespace Guzzle\Http\Event;
use Guzzle\Common\Event;
use Guzzle\Http\Adapter\Transaction;
use Guzzle\Http\ClientInterface;
use Guzzle\Http\Exception\RequestException;
use Guzzle\Http\Message\RequestInterface;
abstract class AbstractRequestEvent extends Event
{
/** @var Transaction */
protected $transaction;
/** @var RequestInterface $request */
private $request;
private $transaction;
/**
* @param RequestInterface $request
* @param Transaction $transaction Transaction that contains the request
* @param Transaction $transaction Transaction that contains the request
*/
public function __construct(RequestInterface $request, Transaction $transaction)
public function __construct(Transaction $transaction)
{
parent::__construct();
$this->transaction = $transaction;
$this->request = $request;
}
/**
@ -43,17 +39,25 @@ abstract class AbstractRequestEvent extends Event
*/
public function getRequest()
{
return $this->request;
return $this->transaction->getRequest();
}
/**
* @return Transaction
*/
protected function getTransaction()
{
return $this->transaction;
}
/**
* Emit an error event
*/
protected function emitError()
protected function emitError(RequestException $exception)
{
$this->request->getEventDispatcher()->dispatch(
$this->transaction->getRequest()->getEventDispatcher()->dispatch(
'request.error',
new RequestErrorEvent($this->request, $this->transaction)
new RequestErrorEvent($this->transaction, $exception)
);
}
@ -62,9 +66,9 @@ abstract class AbstractRequestEvent extends Event
*/
protected function emitAfterSend()
{
$this->request->getEventDispatcher()->dispatch(
$this->transaction->getRequest()->getEventDispatcher()->dispatch(
'request.after_send',
new RequestAfterSendEvent($this->request, $this->transaction)
new RequestAfterSendEvent($this->transaction)
);
}
}

View File

@ -19,11 +19,11 @@ class RequestAfterSendEvent extends AbstractRequestEvent
*/
public function intercept($result)
{
$this->transaction[$this->getRequest()] = $result;
$this->stopPropagation();
if ($result instanceof RequestException) {
$this->emitError();
$this->emitError($result);
} else {
$this->getTransaction()->setResponse($result);
$this->stopPropagation();
}
}
@ -34,6 +34,6 @@ class RequestAfterSendEvent extends AbstractRequestEvent
*/
public function getResponse()
{
return $this->transaction[$this->getRequest()];
return $this->getTransaction()->getResponse();
}
}

View File

@ -21,14 +21,13 @@ class RequestBeforeSendEvent extends AbstractRequestEvent
*/
public function intercept($result)
{
$request = $this->getRequest();
$this->transaction[$request] = $result;
$this->stopPropagation();
if ($result instanceof ResponseInterface) {
$this->getTransaction()->setResponse($result);
$this->emitAfterSend();
} else {
$this->emitError();
$this->emitError($result);
}
}
}

View File

@ -4,6 +4,7 @@ namespace Guzzle\Http\Event;
use Guzzle\Http\Exception\RequestException;
use Guzzle\Http\Message\ResponseInterface;
use Guzzle\Http\Adapter\Transaction;
/**
* Event object emitted after a request has been sent and an error was encountered
@ -12,6 +13,18 @@ use Guzzle\Http\Message\ResponseInterface;
*/
class RequestErrorEvent extends AbstractRequestEvent
{
private $exception;
/**
* @param Transaction $transaction Transaction that contains the request
* @param RequestException $e Exception encountered
*/
public function __construct(Transaction $transaction, RequestException $e)
{
parent::__construct($transaction);
$this->exception = $e;
}
/**
* Intercept the exception and inject a response
*
@ -19,8 +32,7 @@ class RequestErrorEvent extends AbstractRequestEvent
*/
public function intercept(ResponseInterface $response)
{
$request = $this->getRequest();
$this->transaction[$request] = $response;
$this->getTransaction()->setResponse($response);
$this->stopPropagation();
$this->emitAfterSend();
}
@ -32,17 +44,7 @@ class RequestErrorEvent extends AbstractRequestEvent
*/
public function getException()
{
return $this->transaction[$this->getRequest()];
}
/**
* Check if the exception has a response
*
* @return bool
*/
public function hasResponse()
{
return $this->getException()->hasResponse();
return $this->exception;
}
/**

View File

@ -46,14 +46,9 @@ class MessageFactory implements MessageFactoryInterface
$this->redirectPlugin = new RedirectPlugin();
}
public function createResponse($statusCode = null, array $options = [])
public function createResponse($statusCode = null, array $headers = [], $body = null, array $options = [])
{
return new Response(
$statusCode,
isset($options['headers']) ? $options['headers'] : [],
isset($options['body']) ? $options['body'] : null,
$options
);
return new Response($statusCode, $headers, $body, $options);
}
public function createRequest(

View File

@ -59,14 +59,13 @@ interface MessageFactoryInterface
* Creates a response
*
* @param string $statusCode HTTP status code
* @param array $headers Response headers
* @param mixed $body Response body
* @param array $options Response options
* - reason_phrase: Response reason phrase
* - protocol_version: HTTP protocol version
* - headers: Array of headers
* - body: Response body
* - header_factory: Factory used to create headers
*
* @return ResponseInterface
*/
public function createResponse($statusCode = null, array $options = []);
public function createResponse($statusCode = null, array $headers = [], $body = null, array $options = []);
}