Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ CONFLUENT_NETWORK_SUBNET ?= 172.68.0.0/24
SCHEMA_REGISTRY_IPV4 ?= 172.68.0.103
KAFKA_BROKER_IPV4 ?= 172.68.0.102
ZOOKEEPER_IPV4 ?= 172.68.0.101
CONFLUENT_PLATFORM_WARMUP_SECONDS ?= 30
CONFLUENT_PLATFORM_SHUTDOWN_GRACE_SECONDS ?= 5
COMPOSER ?= bin/composer.phar
COMPOSER_VERSION ?= 1.10.13
PHP ?= bin/php
Expand Down Expand Up @@ -71,8 +73,9 @@ install-phars:

platform:
docker-compose down
sleep $(CONFLUENT_PLATFORM_SHUTDOWN_GRACE_SECONDS)
docker-compose up -d
sleep 25
sleep $(CONFLUENT_PLATFORM_WARMUP_SECONDS)

clean:
rm -rf build
Expand All @@ -81,6 +84,6 @@ clean:
benchmark:
docker-compose down
docker-compose up -d
sleep 15
sleep $(CONFLUENT_PLATFORM_WARMUP_SECONDS)
PHP_VERSION=$(PHP_VERSION) $(PHP) ./vendor/bin/phpbench run benchmarks/AvroEncodingBench.php --report=aggregate --retry-threshold=5
docker-compose down
30 changes: 15 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ major version upgrades will have incompatibilities that will be released in the
<?php

use GuzzleHttp\Client;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\GuzzlePromiseAsyncRegistry;
use FlixTech\SchemaRegistryApi\Exception\SchemaRegistryException;

$registry = new PromisingRegistry(
$registry = new GuzzlePromiseAsyncRegistry(
new Client(['base_uri' => 'registry.example.com'])
);

Expand Down Expand Up @@ -140,12 +140,12 @@ $schemaId = $registry->schemaId('test-subject', $schema)->wait();
```php
<?php

use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Decorators\BlockingDecorator;
use FlixTech\SchemaRegistryApi\Registry\GuzzlePromiseAsyncRegistry;
use GuzzleHttp\Client;

$registry = new BlockingRegistry(
new PromisingRegistry(
$registry = new BlockingDecorator(
new GuzzlePromiseAsyncRegistry(
new Client(['base_uri' => 'registry.example.com'])
)
);
Expand Down Expand Up @@ -173,30 +173,30 @@ It supports both async and sync APIs.
```php
<?php

use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\Decorators\BlockingDecorator;
use FlixTech\SchemaRegistryApi\Registry\GuzzlePromiseAsyncRegistry;
use FlixTech\SchemaRegistryApi\Registry\Decorators\CachingDecorator;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use FlixTech\SchemaRegistryApi\Registry\Cache\DoctrineCacheAdapter;
use Doctrine\Common\Cache\ArrayCache;
use GuzzleHttp\Client;

$asyncApi = new PromisingRegistry(
$asyncApi = new GuzzlePromiseAsyncRegistry(
new Client(['base_uri' => 'registry.example.com'])
);

$syncApi = new BlockingRegistry($asyncApi);
$syncApi = new BlockingDecorator($asyncApi);

$doctrineCachedSyncApi = new CachedRegistry(
$doctrineCachedSyncApi = new CachingDecorator(
$asyncApi,
new DoctrineCacheAdapter(
new ArrayCache()
)
);

// All adapters support both APIs, for async APIs additional fulfillment callbacks will be registered.
$avroObjectCachedAsyncApi = new CachedRegistry(
$syncApi,
$avroObjectCachedAsyncApi = new CachingDecorator(
$asyncApi,
new AvroObjectCacheAdapter()
);

Expand All @@ -212,7 +212,7 @@ $sha1HashFunction = static function (AvroSchema $schema) {
};

// Pass the hash function as optional 3rd parameter to the CachedRegistry constructor
$avroObjectCachedAsyncApi = new CachedRegistry(
$avroObjectCachedAsyncApi = new CachingDecorator(
$syncApi,
new AvroObjectCacheAdapter(),
$sha1HashFunction
Expand Down
5 changes: 3 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
"php": "^7.3",
"ext-curl": "*",
"ext-json": "*",
"guzzlehttp/guzzle": "~6.3",
"guzzlehttp/psr7": "<1.7",
"psr/http-client": "~1.0",
"guzzlehttp/guzzle": "~7.0",
"guzzlehttp/psr7": "^1.7",
"beberlei/assert": "~2.7|~3.0",
"flix-tech/avro-php": "^4.1"
},
Expand Down
8 changes: 4 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ services:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
avro-serializer-net:
schema-registry-net:
ipv4_address: ${ZOOKEEPER_IPV4}

broker:
Expand All @@ -25,7 +25,7 @@ services:
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://172.68.0.102:9092'
networks:
avro-serializer-net:
schema-registry-net:
ipv4_address: ${KAFKA_BROKER_IPV4}

schema_registry:
Expand All @@ -40,11 +40,11 @@ services:
SCHEMA_REGISTRY_HOST_NAME: ${SCHEMA_REGISTRY_IPV4}
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
networks:
avro-serializer-net:
schema-registry-net:
ipv4_address: ${SCHEMA_REGISTRY_IPV4}

networks:
avro-serializer-net:
schema-registry-net:
driver: bridge
ipam:
config:
Expand Down
3 changes: 0 additions & 3 deletions src/AsynchronousRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
use AvroSchema;
use GuzzleHttp\Promise\PromiseInterface;

/**
* {@inheritdoc}
*/
interface AsynchronousRegistry extends Registry
{
/**
Expand Down
6 changes: 4 additions & 2 deletions src/Constants/Constants.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@
const COMPATIBILITY_FULL = 'FULL';
const COMPATIBILITY_FULL_TRANSITIVE = 'FULL_TRANSITIVE';
const VERSION_LATEST = 'latest';
const ACCEPT_HEADER = ['Accept' => 'application/vnd.schemaregistry.v1+json'];
const CONTENT_TYPE_HEADER = ['Content-Type' => 'application/vnd.schemaregistry.v1+json'];
const ACCEPT = 'Accept';
const ACCEPT_HEADER = [ACCEPT => 'application/vnd.schemaregistry.v1+json'];
const CONTENT_TYPE = 'Content-Type';
const CONTENT_TYPE_HEADER = [CONTENT_TYPE => 'application/vnd.schemaregistry.v1+json'];
5 changes: 2 additions & 3 deletions src/Exception/AbstractSchemaRegistryException.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@

namespace FlixTech\SchemaRegistryApi\Exception;

use LogicException;
use RuntimeException;
use RuntimeException as PHPRuntimeException;

abstract class AbstractSchemaRegistryException extends RuntimeException implements SchemaRegistryException
abstract class AbstractSchemaRegistryException extends PHPRuntimeException implements SchemaRegistryException
{
public const ERROR_CODE = 0;

Expand Down
84 changes: 36 additions & 48 deletions src/Exception/ExceptionMap.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
namespace FlixTech\SchemaRegistryApi\Exception;

use Exception;
use GuzzleHttp\Exception\RequestException;
use Psr\Http\Message\ResponseInterface;
use RuntimeException;
use function array_key_exists;
use function FlixTech\SchemaRegistryApi\Requests\jsonDecode;
use function sprintf;

final class ExceptionMap
Expand All @@ -31,38 +30,56 @@ public static function instance(): ExceptionMap
return self::$instance;
}

/**
* @var array<int, callable>
*/
private $map;

private function __construct()
{
$factoryFn = static function (string $exceptionClass): callable {
return static function (int $errorCode, string $errorMessage) use ($exceptionClass): SchemaRegistryException {
return new $exceptionClass($errorMessage, $errorCode);
};
};

$this->map = [
IncompatibleAvroSchemaException::errorCode() => $factoryFn(IncompatibleAvroSchemaException::class),
BackendDataStoreException::errorCode() => $factoryFn(BackendDataStoreException::class),
OperationTimedOutException::errorCode() => $factoryFn(OperationTimedOutException::class),
MasterProxyException::errorCode() => $factoryFn(MasterProxyException::class),
InvalidVersionException::errorCode() => $factoryFn(InvalidVersionException::class),
InvalidAvroSchemaException::errorCode() => $factoryFn(InvalidAvroSchemaException::class),
SchemaNotFoundException::errorCode() => $factoryFn(SchemaNotFoundException::class),
SubjectNotFoundException::errorCode() => $factoryFn(SubjectNotFoundException::class),
VersionNotFoundException::errorCode() => $factoryFn(VersionNotFoundException::class),
InvalidCompatibilityLevelException::errorCode() => $factoryFn(InvalidCompatibilityLevelException::class),
];
}

/**
* Maps a RequestException to the internal SchemaRegistryException types.
* Maps a ResponseInterface to the internal SchemaRegistryException types.
*
* @param RequestException $exception
* @param ResponseInterface $response
*
* @return SchemaRegistryException
*
* @throws RuntimeException
*/
public function __invoke(RequestException $exception): SchemaRegistryException
public function exceptionFor(ResponseInterface $response): SchemaRegistryException
{
$response = $this->guardAgainstMissingResponse($exception);
$decodedBody = $this->guardAgainstMissingErrorCode($response);
$errorCode = $decodedBody[self::ERROR_CODE_FIELD_NAME];
$errorMessage = $decodedBody[self::ERROR_MESSAGE_FIELD_NAME];
$errorMessage = $decodedBody[self::ERROR_MESSAGE_FIELD_NAME] ?? "Unknown Error";

return $this->mapErrorCodeToException($errorCode, $errorMessage);
}

private function guardAgainstMissingResponse(RequestException $exception): ResponseInterface
public function hasMappableError(ResponseInterface $response): bool
{
$response = $exception->getResponse();
$statusCode = $response->getStatusCode();

if (!$response) {
throw new RuntimeException('RequestException has no response to inspect', 0, $exception);
}

return $response;
return $statusCode >= 400 && $statusCode < 600;
}

/**
Expand All @@ -72,7 +89,7 @@ private function guardAgainstMissingResponse(RequestException $exception): Respo
private function guardAgainstMissingErrorCode(ResponseInterface $response): array
{
try {
$decodedBody = \GuzzleHttp\json_decode((string) $response->getBody(), true);
$decodedBody = jsonDecode((string) $response->getBody());

if (!array_key_exists(self::ERROR_CODE_FIELD_NAME, $decodedBody)) {
throw new RuntimeException(
Expand All @@ -98,39 +115,10 @@ private function guardAgainstMissingErrorCode(ResponseInterface $response): arra

private function mapErrorCodeToException(int $errorCode, string $errorMessage): SchemaRegistryException
{
switch ($errorCode) {
case IncompatibleAvroSchemaException::errorCode():
return new IncompatibleAvroSchemaException($errorMessage, $errorCode);

case BackendDataStoreException::errorCode():
return new BackendDataStoreException($errorMessage, $errorCode);

case OperationTimedOutException::errorCode():
return new OperationTimedOutException($errorMessage, $errorCode);

case MasterProxyException::errorCode():
return new MasterProxyException($errorMessage, $errorCode);

case InvalidVersionException::errorCode():
return new InvalidVersionException($errorMessage, $errorCode);

case InvalidAvroSchemaException::errorCode():
return new InvalidAvroSchemaException($errorMessage, $errorCode);

case SchemaNotFoundException::errorCode():
return new SchemaNotFoundException($errorMessage, $errorCode);

case SubjectNotFoundException::errorCode():
return new SubjectNotFoundException($errorMessage, $errorCode);

case VersionNotFoundException::errorCode():
return new VersionNotFoundException($errorMessage, $errorCode);

case InvalidCompatibilityLevelException::errorCode():
return new InvalidCompatibilityLevelException($errorMessage, $errorCode);

default:
throw new RuntimeException(sprintf('Unknown error code "%d"', $errorCode));
if (!array_key_exists($errorCode, $this->map)) {
throw new RuntimeException(sprintf('Unknown error code "%d"', $errorCode));
}

return $this->map[$errorCode]($errorCode, $errorMessage);
}
}
17 changes: 17 additions & 0 deletions src/Exception/LogicException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace FlixTech\SchemaRegistryApi\Exception;

use LogicException as PHPLogicException;

class LogicException extends PHPLogicException implements SchemaRegistryException
{
public const ERROR_CODE = 99997;

public static function errorCode(): int
{
return self::ERROR_CODE;
}
}
17 changes: 17 additions & 0 deletions src/Exception/RuntimeException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace FlixTech\SchemaRegistryApi\Exception;

use RuntimeException as PHPRuntimeException;

class RuntimeException extends PHPRuntimeException implements SchemaRegistryException
{
public const ERROR_CODE = 99998;

public static function errorCode(): int
{
return self::ERROR_CODE;
}
}
4 changes: 0 additions & 4 deletions src/Registry/Cache/AvroObjectCacheAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@
namespace FlixTech\SchemaRegistryApi\Registry\Cache;

use AvroSchema;
use FlixTech\SchemaRegistryApi\Registry\CacheAdapter;

/**
* {@inheritdoc}
*/
class AvroObjectCacheAdapter implements CacheAdapter
{
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

declare(strict_types=1);

namespace FlixTech\SchemaRegistryApi\Registry;
namespace FlixTech\SchemaRegistryApi\Registry\Cache;

use AvroSchema;

Expand Down
1 change: 0 additions & 1 deletion src/Registry/Cache/CacheItemPoolAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use AvroSchema;
use AvroSchemaParseException;
use FlixTech\SchemaRegistryApi\Registry\CacheAdapter;
use Psr\Cache\CacheItemPoolInterface;
use Symfony\Component\Cache\Exception\InvalidArgumentException;

Expand Down
4 changes: 0 additions & 4 deletions src/Registry/Cache/DoctrineCacheAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@
use AvroSchema;
use AvroSchemaParseException;
use Doctrine\Common\Cache\Cache;
use FlixTech\SchemaRegistryApi\Registry\CacheAdapter;

/**
* {@inheritdoc}
*/
class DoctrineCacheAdapter implements CacheAdapter
{
/**
Expand Down
Loading