diff --git a/src/net/KNet.Serialization.Avro/AvroSerDes.cs b/src/net/KNet.Serialization.Avro/AvroSerDes.cs index 316787d31c..b7eb0c9a92 100644 --- a/src/net/KNet.Serialization.Avro/AvroSerDes.cs +++ b/src/net/KNet.Serialization.Avro/AvroSerDes.cs @@ -27,10 +27,6 @@ namespace MASES.KNet.Serialization.Avro /// public class AvroSerDes : KNetSerDes { - /// - /// Can manage any type in - /// - protected override bool ManagesAnyType => true; /// /// The extension uses /// diff --git a/src/net/KNet.Serialization.Json/JsonSerDes.cs b/src/net/KNet.Serialization.Json/JsonSerDes.cs index 46b9ec06b3..370b60d859 100644 --- a/src/net/KNet.Serialization.Json/JsonSerDes.cs +++ b/src/net/KNet.Serialization.Json/JsonSerDes.cs @@ -27,10 +27,6 @@ namespace MASES.KNet.Serialization.Json /// public class JsonSerDes : KNetSerDes { - /// - /// Can manage any type in - /// - protected override bool ManagesAnyType => true; /// /// The extension uses /// diff --git a/src/net/KNet.Serialization.MessagePack/MessagePackSerDes.cs b/src/net/KNet.Serialization.MessagePack/MessagePackSerDes.cs index 4eaab80985..ab5fef0011 100644 --- a/src/net/KNet.Serialization.MessagePack/MessagePackSerDes.cs +++ b/src/net/KNet.Serialization.MessagePack/MessagePackSerDes.cs @@ -28,10 +28,6 @@ namespace MASES.KNet.Serialization.MessagePack /// public class MessagePackSerDes : KNetSerDes { - /// - /// Can manage any type in - /// - protected override bool ManagesAnyType => true; /// /// The extension uses /// diff --git a/src/net/KNet.Serialization.Protobuf/ProtobufSerDes.cs b/src/net/KNet.Serialization.Protobuf/ProtobufSerDes.cs index 872634d081..636a35f556 100644 --- a/src/net/KNet.Serialization.Protobuf/ProtobufSerDes.cs +++ b/src/net/KNet.Serialization.Protobuf/ProtobufSerDes.cs @@ -31,10 +31,6 @@ public class ProtobufSerDes : KNetSerDes { readonly MessageParser _parser = new MessageParser(() => new T()); /// - /// Can manage any type in - /// - protected override bool ManagesAnyType => true; - /// /// The extension uses /// public override bool UseHeaders => true; diff --git a/src/net/KNet/Specific/Serialization/KNetSerDes.cs b/src/net/KNet/Specific/Serialization/KNetSerDes.cs index c4647fb62f..f427a2e412 100644 --- a/src/net/KNet/Specific/Serialization/KNetSerDes.cs +++ b/src/net/KNet/Specific/Serialization/KNetSerDes.cs @@ -29,19 +29,10 @@ namespace MASES.KNet.Serialization /// The type to serialize/deserialize public class KNetSerDes : IKNetSerializer, IKNetDeserializer { - readonly bool _ManagesAnyType = KNetSerialization.IsInternalManaged(); readonly KNetSerialization.SerializationType _SerializationType = KNetSerialization.InternalSerDesType(); Serializer _KafkaSerializer = new ByteArraySerializer(); Deserializer _KafkaDeserializer = new ByteArrayDeserializer(); /// - /// Initialize a new - /// - /// The needs an external serializer - public KNetSerDes() - { - if (!ManagesAnyType) throw new InvalidOperationException($"{typeof(T)} needs an external serializer, use a different constructor."); - } - /// /// External serialization function /// public Func OnSerialize { get; set; } @@ -73,10 +64,6 @@ public void Dispose() _KafkaDeserializer?.Dispose(); _KafkaDeserializer = null; } - /// - /// Override in derived classes to indicate the class is able to manage complex types, default is the result of - /// - protected virtual bool ManagesAnyType => _ManagesAnyType; /// public Serializer KafkaSerializer => _KafkaSerializer; /// @@ -103,6 +90,7 @@ public virtual byte[] Serialize(string topic, T data) KNetSerialization.SerializationType.String => KNetSerialization.SerializeString(topic, data as string), KNetSerialization.SerializationType.Guid => KNetSerialization.SerializeGuid(topic, (Guid)Convert.ChangeType(data, typeof(Guid))), KNetSerialization.SerializationType.Void => KNetSerialization.SerializeVoid(topic, data as Java.Lang.Void), + KNetSerialization.SerializationType.External => throw new InvalidOperationException($"{typeof(T)} needs an external serializer: set {nameof(OnSerialize)} or {nameof(OnSerializeWithHeaders)}."), _ => default, }; } @@ -135,6 +123,7 @@ public virtual T Deserialize(string topic, byte[] data) KNetSerialization.SerializationType.String => (T)(object)KNetSerialization.DeserializeString(topic, data), KNetSerialization.SerializationType.Guid => (T)(object)KNetSerialization.DeserializeGuid(topic, data), KNetSerialization.SerializationType.Void => (T)(object)KNetSerialization.DeserializeVoid(topic, data), + KNetSerialization.SerializationType.External => throw new InvalidOperationException($"{typeof(T)} needs an external deserializer: set {nameof(OnDeserialize)} or {nameof(OnDeserializeWithHeaders)}."), _ => default, }; } diff --git a/tests/KNetBenchmark/ProgramKNet.cs b/tests/KNetBenchmark/ProgramKNet.cs index d2fb6f9137..f253cb0ea0 100644 --- a/tests/KNetBenchmark/ProgramKNet.cs +++ b/tests/KNetBenchmark/ProgramKNet.cs @@ -57,16 +57,22 @@ static IKNetProducer KNetProducer() .WithValueSerializerClass("org.apache.kafka.common.serialization.ByteArraySerializer") .ToProperties(); - knetKeySerializer = new KNetSerDes(serializeWithHeadersFun: (topic, headers, data) => + knetKeySerializer = new KNetSerDes() { - var key = BitConverter.GetBytes(data); - return key; - }); - knetValueSerializer = new KNetSerDes(serializeWithHeadersFun: (topic, headers, data) => + OnSerializeWithHeaders = (topic, headers, data) => + { + var key = BitConverter.GetBytes(data); + return key; + } + }; + knetValueSerializer = new KNetSerDes() { - // var value = Encoding.Unicode.GetBytes(data); - return data; - }); + OnSerializeWithHeaders = (topic, headers, data) => + { + // var value = Encoding.Unicode.GetBytes(data); + return data; + } + }; knetProducer = new KNetProducer(props, knetKeySerializer, knetValueSerializer); } return knetProducer; @@ -186,16 +192,22 @@ static IKNetConsumer KNetConsumer() .ToProperties(); if (UseSerdes) { - knetKeyDeserializer = new KNetSerDes(deserializeFun: (topic, data) => + knetKeyDeserializer = new KNetSerDes() { - var key = BitConverter.ToInt32(data, 0); - return key; - }); - knetValueDeserializer = new KNetSerDes(deserializeFun: (topic, data) => + OnDeserialize = (topic, data) => + { + var key = BitConverter.ToInt32(data, 0); + return key; + } + }; + knetValueDeserializer = new KNetSerDes() { - // var value = Encoding.Unicode.GetString(data); - return data; - }); + OnDeserialize = (topic, data) => + { + // var value = Encoding.Unicode.GetString(data); + return data; + } + }; } knetConsumer = UseSerdes ? new KNetConsumer(props, knetKeyDeserializer, knetValueDeserializer) : new KNetConsumer(props); diff --git a/tests/KNetTest/Program.cs b/tests/KNetTest/Program.cs index 6b8ec29f7c..6d0d319287 100644 --- a/tests/KNetTest/Program.cs +++ b/tests/KNetTest/Program.cs @@ -71,8 +71,15 @@ static void Main(string[] args) serverToUse = args[0]; } - KNetSerDes serializer = new KNetSerDes((topic, type) => { return new byte[0]; }); - KNetSerDes deserializer = new KNetSerDes((topic, data) => { return new TestType(0); }); + KNetSerDes serializer = new KNetSerDes() + { + OnSerialize = (topic, type) => { return new byte[0]; } + }; + + KNetSerDes deserializer = new KNetSerDes() + { + OnDeserialize = (topic, data) => { return new TestType(0); } + }; CreateTopic(); diff --git a/tests/KNetTestStreams/Program.cs b/tests/KNetTestStreams/Program.cs index 9100f86e9a..bde0a9279a 100644 --- a/tests/KNetTestStreams/Program.cs +++ b/tests/KNetTestStreams/Program.cs @@ -161,9 +161,9 @@ static void WordCountDemo() } }; - KTable counts = source.FlatMapValues, string>(valueMapper) - .GroupBy(keyValuemapper) - .Count(); + KTable counts = source.FlatMapValues, string>(valueMapper) + .GroupBy(keyValuemapper) + .Count(); /***** version using Dynamic engine ****** @@ -174,7 +174,7 @@ static void WordCountDemo() ******************************************/ // need to override value serde to Long type - counts.ToStream().To(OUTPUT_TOPIC, Produced.With(Serdes.String(), Serdes.Long())); + counts.ToStream().To(OUTPUT_TOPIC, Produced.With(Serdes.String(), Serdes.Long())); using (var streams = new KafkaStreams(builder.Build(), props)) {