diff --git a/src/net/KNet.Serialization.Avro/AvroSerDes.cs b/src/net/KNet.Serialization.Avro/AvroSerDes.cs
index 316787d31c..b7eb0c9a92 100644
--- a/src/net/KNet.Serialization.Avro/AvroSerDes.cs
+++ b/src/net/KNet.Serialization.Avro/AvroSerDes.cs
@@ -27,10 +27,6 @@ namespace MASES.KNet.Serialization.Avro
///
public class AvroSerDes : KNetSerDes
{
- ///
- /// Can manage any type in
- ///
- protected override bool ManagesAnyType => true;
///
/// The extension uses
///
diff --git a/src/net/KNet.Serialization.Json/JsonSerDes.cs b/src/net/KNet.Serialization.Json/JsonSerDes.cs
index 46b9ec06b3..370b60d859 100644
--- a/src/net/KNet.Serialization.Json/JsonSerDes.cs
+++ b/src/net/KNet.Serialization.Json/JsonSerDes.cs
@@ -27,10 +27,6 @@ namespace MASES.KNet.Serialization.Json
///
public class JsonSerDes : KNetSerDes
{
- ///
- /// Can manage any type in
- ///
- protected override bool ManagesAnyType => true;
///
/// The extension uses
///
diff --git a/src/net/KNet.Serialization.MessagePack/MessagePackSerDes.cs b/src/net/KNet.Serialization.MessagePack/MessagePackSerDes.cs
index 4eaab80985..ab5fef0011 100644
--- a/src/net/KNet.Serialization.MessagePack/MessagePackSerDes.cs
+++ b/src/net/KNet.Serialization.MessagePack/MessagePackSerDes.cs
@@ -28,10 +28,6 @@ namespace MASES.KNet.Serialization.MessagePack
///
public class MessagePackSerDes : KNetSerDes
{
- ///
- /// Can manage any type in
- ///
- protected override bool ManagesAnyType => true;
///
/// The extension uses
///
diff --git a/src/net/KNet.Serialization.Protobuf/ProtobufSerDes.cs b/src/net/KNet.Serialization.Protobuf/ProtobufSerDes.cs
index 872634d081..636a35f556 100644
--- a/src/net/KNet.Serialization.Protobuf/ProtobufSerDes.cs
+++ b/src/net/KNet.Serialization.Protobuf/ProtobufSerDes.cs
@@ -31,10 +31,6 @@ public class ProtobufSerDes : KNetSerDes
{
readonly MessageParser _parser = new MessageParser(() => new T());
///
- /// Can manage any type in
- ///
- protected override bool ManagesAnyType => true;
- ///
/// The extension uses
///
public override bool UseHeaders => true;
diff --git a/src/net/KNet/Specific/Serialization/KNetSerDes.cs b/src/net/KNet/Specific/Serialization/KNetSerDes.cs
index c4647fb62f..f427a2e412 100644
--- a/src/net/KNet/Specific/Serialization/KNetSerDes.cs
+++ b/src/net/KNet/Specific/Serialization/KNetSerDes.cs
@@ -29,19 +29,10 @@ namespace MASES.KNet.Serialization
/// The type to serialize/deserialize
public class KNetSerDes : IKNetSerializer, IKNetDeserializer
{
- readonly bool _ManagesAnyType = KNetSerialization.IsInternalManaged();
readonly KNetSerialization.SerializationType _SerializationType = KNetSerialization.InternalSerDesType();
Serializer _KafkaSerializer = new ByteArraySerializer();
Deserializer _KafkaDeserializer = new ByteArrayDeserializer();
///
- /// Initialize a new
- ///
- /// The needs an external serializer
- public KNetSerDes()
- {
- if (!ManagesAnyType) throw new InvalidOperationException($"{typeof(T)} needs an external serializer, use a different constructor.");
- }
- ///
/// External serialization function
///
public Func OnSerialize { get; set; }
@@ -73,10 +64,6 @@ public void Dispose()
_KafkaDeserializer?.Dispose();
_KafkaDeserializer = null;
}
- ///
- /// Override in derived classes to indicate the class is able to manage complex types, default is the result of
- ///
- protected virtual bool ManagesAnyType => _ManagesAnyType;
///
public Serializer KafkaSerializer => _KafkaSerializer;
///
@@ -103,6 +90,7 @@ public virtual byte[] Serialize(string topic, T data)
KNetSerialization.SerializationType.String => KNetSerialization.SerializeString(topic, data as string),
KNetSerialization.SerializationType.Guid => KNetSerialization.SerializeGuid(topic, (Guid)Convert.ChangeType(data, typeof(Guid))),
KNetSerialization.SerializationType.Void => KNetSerialization.SerializeVoid(topic, data as Java.Lang.Void),
+ KNetSerialization.SerializationType.External => throw new InvalidOperationException($"{typeof(T)} needs an external serializer: set {nameof(OnSerialize)} or {nameof(OnSerializeWithHeaders)}."),
_ => default,
};
}
@@ -135,6 +123,7 @@ public virtual T Deserialize(string topic, byte[] data)
KNetSerialization.SerializationType.String => (T)(object)KNetSerialization.DeserializeString(topic, data),
KNetSerialization.SerializationType.Guid => (T)(object)KNetSerialization.DeserializeGuid(topic, data),
KNetSerialization.SerializationType.Void => (T)(object)KNetSerialization.DeserializeVoid(topic, data),
+ KNetSerialization.SerializationType.External => throw new InvalidOperationException($"{typeof(T)} needs an external deserializer: set {nameof(OnDeserialize)} or {nameof(OnDeserializeWithHeaders)}."),
_ => default,
};
}
diff --git a/tests/KNetBenchmark/ProgramKNet.cs b/tests/KNetBenchmark/ProgramKNet.cs
index d2fb6f9137..f253cb0ea0 100644
--- a/tests/KNetBenchmark/ProgramKNet.cs
+++ b/tests/KNetBenchmark/ProgramKNet.cs
@@ -57,16 +57,22 @@ static IKNetProducer KNetProducer()
.WithValueSerializerClass("org.apache.kafka.common.serialization.ByteArraySerializer")
.ToProperties();
- knetKeySerializer = new KNetSerDes(serializeWithHeadersFun: (topic, headers, data) =>
+ knetKeySerializer = new KNetSerDes()
{
- var key = BitConverter.GetBytes(data);
- return key;
- });
- knetValueSerializer = new KNetSerDes(serializeWithHeadersFun: (topic, headers, data) =>
+ OnSerializeWithHeaders = (topic, headers, data) =>
+ {
+ var key = BitConverter.GetBytes(data);
+ return key;
+ }
+ };
+ knetValueSerializer = new KNetSerDes()
{
- // var value = Encoding.Unicode.GetBytes(data);
- return data;
- });
+ OnSerializeWithHeaders = (topic, headers, data) =>
+ {
+ // var value = Encoding.Unicode.GetBytes(data);
+ return data;
+ }
+ };
knetProducer = new KNetProducer(props, knetKeySerializer, knetValueSerializer);
}
return knetProducer;
@@ -186,16 +192,22 @@ static IKNetConsumer KNetConsumer()
.ToProperties();
if (UseSerdes)
{
- knetKeyDeserializer = new KNetSerDes(deserializeFun: (topic, data) =>
+ knetKeyDeserializer = new KNetSerDes()
{
- var key = BitConverter.ToInt32(data, 0);
- return key;
- });
- knetValueDeserializer = new KNetSerDes(deserializeFun: (topic, data) =>
+ OnDeserialize = (topic, data) =>
+ {
+ var key = BitConverter.ToInt32(data, 0);
+ return key;
+ }
+ };
+ knetValueDeserializer = new KNetSerDes()
{
- // var value = Encoding.Unicode.GetString(data);
- return data;
- });
+ OnDeserialize = (topic, data) =>
+ {
+ // var value = Encoding.Unicode.GetString(data);
+ return data;
+ }
+ };
}
knetConsumer = UseSerdes ? new KNetConsumer(props, knetKeyDeserializer, knetValueDeserializer) : new KNetConsumer(props);
diff --git a/tests/KNetTest/Program.cs b/tests/KNetTest/Program.cs
index 6b8ec29f7c..6d0d319287 100644
--- a/tests/KNetTest/Program.cs
+++ b/tests/KNetTest/Program.cs
@@ -71,8 +71,15 @@ static void Main(string[] args)
serverToUse = args[0];
}
- KNetSerDes serializer = new KNetSerDes((topic, type) => { return new byte[0]; });
- KNetSerDes deserializer = new KNetSerDes((topic, data) => { return new TestType(0); });
+ KNetSerDes serializer = new KNetSerDes()
+ {
+ OnSerialize = (topic, type) => { return new byte[0]; }
+ };
+
+ KNetSerDes deserializer = new KNetSerDes()
+ {
+ OnDeserialize = (topic, data) => { return new TestType(0); }
+ };
CreateTopic();
diff --git a/tests/KNetTestStreams/Program.cs b/tests/KNetTestStreams/Program.cs
index 9100f86e9a..bde0a9279a 100644
--- a/tests/KNetTestStreams/Program.cs
+++ b/tests/KNetTestStreams/Program.cs
@@ -161,9 +161,9 @@ static void WordCountDemo()
}
};
- KTable counts = source.FlatMapValues, string>(valueMapper)
- .GroupBy(keyValuemapper)
- .Count();
+ KTable counts = source.FlatMapValues, string>(valueMapper)
+ .GroupBy(keyValuemapper)
+ .Count();
/***** version using Dynamic engine ******
@@ -174,7 +174,7 @@ static void WordCountDemo()
******************************************/
// need to override value serde to Long type
- counts.ToStream().To(OUTPUT_TOPIC, Produced.With(Serdes.String(), Serdes.Long()));
+ counts.ToStream().To(OUTPUT_TOPIC, Produced.With(Serdes.String(), Serdes.Long()));
using (var streams = new KafkaStreams(builder.Build(), props))
{