Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/net/KNet.Serialization.Avro/AvroSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ namespace MASES.KNet.Serialization.Avro
/// <typeparam name="T"></typeparam>
public class AvroSerDes<T> : KNetSerDes<T>
{
/// <summary>
/// Can manage any type in <typeparamref name="T"/>
/// </summary>
protected override bool ManagesAnyType => true;
/// <summary>
/// The extension uses <see cref="Headers"/>
/// </summary>
Expand Down
4 changes: 0 additions & 4 deletions src/net/KNet.Serialization.Json/JsonSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ namespace MASES.KNet.Serialization.Json
/// <typeparam name="T"></typeparam>
public class JsonSerDes<T> : KNetSerDes<T>
{
/// <summary>
/// Can manage any type in <typeparamref name="T"/>
/// </summary>
protected override bool ManagesAnyType => true;
/// <summary>
/// The extension uses <see cref="Headers"/>
/// </summary>
Expand Down
4 changes: 0 additions & 4 deletions src/net/KNet.Serialization.MessagePack/MessagePackSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ namespace MASES.KNet.Serialization.MessagePack
/// <typeparam name="T"></typeparam>
public class MessagePackSerDes<T> : KNetSerDes<T>
{
/// <summary>
/// Can manage any type in <typeparamref name="T"/>
/// </summary>
protected override bool ManagesAnyType => true;
/// <summary>
/// The extension uses <see cref="Headers"/>
/// </summary>
Expand Down
4 changes: 0 additions & 4 deletions src/net/KNet.Serialization.Protobuf/ProtobufSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ public class ProtobufSerDes<T> : KNetSerDes<T>
{
readonly MessageParser<T> _parser = new MessageParser<T>(() => new T());
/// <summary>
/// Can manage any type in <typeparamref name="T"/>
/// </summary>
protected override bool ManagesAnyType => true;
/// <summary>
/// The extension uses <see cref="Headers"/>
/// </summary>
public override bool UseHeaders => true;
Expand Down
15 changes: 2 additions & 13 deletions src/net/KNet/Specific/Serialization/KNetSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,10 @@ namespace MASES.KNet.Serialization
/// <typeparam name="T">The type to serialize/deserialize</typeparam>
public class KNetSerDes<T> : IKNetSerializer<T>, IKNetDeserializer<T>
{
readonly bool _ManagesAnyType = KNetSerialization.IsInternalManaged<T>();
readonly KNetSerialization.SerializationType _SerializationType = KNetSerialization.InternalSerDesType<T>();
Serializer<byte[]> _KafkaSerializer = new ByteArraySerializer();
Deserializer<byte[]> _KafkaDeserializer = new ByteArrayDeserializer();
/// <summary>
/// Initialize a new <see cref="KNetSerDes{T}"/>
/// </summary>
/// <exception cref="InvalidOperationException">The <typeparamref name="T"/> needs an external serializer</exception>
public KNetSerDes()
{
if (!ManagesAnyType) throw new InvalidOperationException($"{typeof(T)} needs an external serializer, use a different constructor.");
}
/// <summary>
/// External serialization function
/// </summary>
public Func<string, T, byte[]> OnSerialize { get; set; }
Expand Down Expand Up @@ -73,10 +64,6 @@ public void Dispose()
_KafkaDeserializer?.Dispose();
_KafkaDeserializer = null;
}
/// <summary>
/// Override in derived classes to indicate the class is able to manage complex types, default is the result of <see cref="KNetSerialization.IsInternalManaged{T}()"/>
/// </summary>
protected virtual bool ManagesAnyType => _ManagesAnyType;
/// <inheritdoc cref="IKNetSerializer{T}.KafkaSerializer"/>
public Serializer<byte[]> KafkaSerializer => _KafkaSerializer;
/// <inheritdoc cref="IKNetDeserializer{T}.KafkaDeserializer"/>
Expand All @@ -103,6 +90,7 @@ public virtual byte[] Serialize(string topic, T data)
KNetSerialization.SerializationType.String => KNetSerialization.SerializeString(topic, data as string),
KNetSerialization.SerializationType.Guid => KNetSerialization.SerializeGuid(topic, (Guid)Convert.ChangeType(data, typeof(Guid))),
KNetSerialization.SerializationType.Void => KNetSerialization.SerializeVoid(topic, data as Java.Lang.Void),
KNetSerialization.SerializationType.External => throw new InvalidOperationException($"{typeof(T)} needs an external serializer: set {nameof(OnSerialize)} or {nameof(OnSerializeWithHeaders)}."),
_ => default,
};
}
Expand Down Expand Up @@ -135,6 +123,7 @@ public virtual T Deserialize(string topic, byte[] data)
KNetSerialization.SerializationType.String => (T)(object)KNetSerialization.DeserializeString(topic, data),
KNetSerialization.SerializationType.Guid => (T)(object)KNetSerialization.DeserializeGuid(topic, data),
KNetSerialization.SerializationType.Void => (T)(object)KNetSerialization.DeserializeVoid(topic, data),
KNetSerialization.SerializationType.External => throw new InvalidOperationException($"{typeof(T)} needs an external deserializer: set {nameof(OnDeserialize)} or {nameof(OnDeserializeWithHeaders)}."),
_ => default,
};
}
Expand Down
44 changes: 28 additions & 16 deletions tests/KNetBenchmark/ProgramKNet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,22 @@ static IKNetProducer<long, byte[]> KNetProducer()
.WithValueSerializerClass("org.apache.kafka.common.serialization.ByteArraySerializer")
.ToProperties();

knetKeySerializer = new KNetSerDes<long>(serializeWithHeadersFun: (topic, headers, data) =>
knetKeySerializer = new KNetSerDes<long>()
{
var key = BitConverter.GetBytes(data);
return key;
});
knetValueSerializer = new KNetSerDes<byte[]>(serializeWithHeadersFun: (topic, headers, data) =>
OnSerializeWithHeaders = (topic, headers, data) =>
{
var key = BitConverter.GetBytes(data);
return key;
}
};
knetValueSerializer = new KNetSerDes<byte[]>()
{
// var value = Encoding.Unicode.GetBytes(data);
return data;
});
OnSerializeWithHeaders = (topic, headers, data) =>
{
// var value = Encoding.Unicode.GetBytes(data);
return data;
}
};
knetProducer = new KNetProducer<long, byte[]>(props, knetKeySerializer, knetValueSerializer);
}
return knetProducer;
Expand Down Expand Up @@ -186,16 +192,22 @@ static IKNetConsumer<long, byte[]> KNetConsumer()
.ToProperties();
if (UseSerdes)
{
knetKeyDeserializer = new KNetSerDes<long>(deserializeFun: (topic, data) =>
knetKeyDeserializer = new KNetSerDes<long>()
{
var key = BitConverter.ToInt32(data, 0);
return key;
});
knetValueDeserializer = new KNetSerDes<byte[]>(deserializeFun: (topic, data) =>
OnDeserialize = (topic, data) =>
{
var key = BitConverter.ToInt32(data, 0);
return key;
}
};
knetValueDeserializer = new KNetSerDes<byte[]>()
{
// var value = Encoding.Unicode.GetString(data);
return data;
});
OnDeserialize = (topic, data) =>
{
// var value = Encoding.Unicode.GetString(data);
return data;
}
};
}

knetConsumer = UseSerdes ? new KNetConsumer<long, byte[]>(props, knetKeyDeserializer, knetValueDeserializer) : new KNetConsumer<long, byte[]>(props);
Expand Down
11 changes: 9 additions & 2 deletions tests/KNetTest/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,15 @@ static void Main(string[] args)
serverToUse = args[0];
}

KNetSerDes<TestType> serializer = new KNetSerDes<TestType>((topic, type) => { return new byte[0]; });
KNetSerDes<TestType> deserializer = new KNetSerDes<TestType>((topic, data) => { return new TestType(0); });
KNetSerDes<TestType> serializer = new KNetSerDes<TestType>()
{
OnSerialize = (topic, type) => { return new byte[0]; }
};

KNetSerDes<TestType> deserializer = new KNetSerDes<TestType>()
{
OnDeserialize = (topic, data) => { return new TestType(0); }
};

CreateTopic();

Expand Down
8 changes: 4 additions & 4 deletions tests/KNetTestStreams/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ static void WordCountDemo()
}
};

KTable<string, long?> counts = source.FlatMapValues<string, string, Java.Lang.Iterable<string>, string>(valueMapper)
.GroupBy(keyValuemapper)
.Count();
KTable<string, Java.Lang.Long> counts = source.FlatMapValues<string, string, Java.Lang.Iterable<string>, string>(valueMapper)
.GroupBy(keyValuemapper)
.Count();

/***** version using Dynamic engine ******

Expand All @@ -174,7 +174,7 @@ static void WordCountDemo()
******************************************/

// need to override value serde to Long type
counts.ToStream().To(OUTPUT_TOPIC, Produced<string, long?>.With(Serdes.String(), Serdes.Long()));
counts.ToStream().To(OUTPUT_TOPIC, Produced<string, Java.Lang.Long>.With(Serdes.String(), Serdes.Long()));

using (var streams = new KafkaStreams(builder.Build(), props))
{
Expand Down