@@ -23,7 +23,8 @@
import java.util.Map;

/**
* The result of the {@link Admin#deleteConsumerGroups(Collection <String>, DeleteConsumerGroupsOptions)} call.
* The result of the {@link Admin#deleteConsumerGroups(Collection, DeleteConsumerGroupsOptions)} call for
* {@code Collection<String>} input.
*/
public class DeleteConsumerGroupsResult {
private final Map<String, KafkaFuture<Void>> futures;
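
For context, a minimal usage sketch of the call whose result type is documented here; the bootstrap address and group ids are placeholders and error handling is elided:

    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;

    public class DeleteGroupsExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            try (Admin admin = Admin.create(props)) {
                DeleteConsumerGroupsResult result =
                    admin.deleteConsumerGroups(List.of("group-a", "group-b")); // hypothetical group ids
                // all() completes only if every requested group was deleted;
                // per-group futures are available via result.deletedGroups()
                result.all().get();
            }
        }
    }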
@@ -452,7 +452,7 @@
* this.consumer = consumer;
* }
*
* {@literal}@Override
* {@literal @Override}
* public void run() {
* try {
* consumer.subscribe(Arrays.asList("topic"));
@@ -248,7 +248,7 @@
* information to Kafka.
*
* <h3>Reading Transactional Records</h3>
* The way that share groups handle transactional records is controlled by the {@code group.share.isolation.level}</code>
* The way that share groups handle transactional records is controlled by the {@code group.share.isolation.level}
* configuration property. In a share group, the isolation level applies to the entire share group, not just individual
* consumers.
* <p>
@@ -326,7 +326,7 @@
* exceeds the acquisition lock duration. Consumers which use renewal acknowledgements can impact the delivery
* progress of the share group. If the leadership of the partition for a record being delivered changes or the
* application's connection to the leader broker is disconnected, the current delivery attempt ends.

*
* <h3><a name="multithreaded">Multithreaded Processing</a></h3>
* The consumer is NOT thread-safe. It is the responsibility of the user to ensure that multithreaded access
* is properly synchronized. Unsynchronized access will result in {@link java.util.ConcurrentModificationException}.
@@ -345,7 +345,7 @@
* this.consumer = consumer;
* }
*
* {@literal}@Override
* {@literal @Override}
* public void run() {
* try {
* consumer.subscribe(Arrays.asList("topic"));
@@ -1210,10 +1210,11 @@ public String toStringBase() {

/**
* <p>This is used to stage the unsent {@link OffsetCommitRequestState} and {@link OffsetFetchRequestState}.
* <li>unsentOffsetCommits holds the offset commit requests that have not been sent out</>
* <li>unsentOffsetFetches holds the offset fetch requests that have not been sent out</li>
* <li>inflightOffsetFetches holds the offset fetch requests that have been sent out but not completed</>.
* <p>
* <ul>
* <li>unsentOffsetCommits holds the offset commit requests that have not been sent out</li>
* <li>unsentOffsetFetches holds the offset fetch requests that have not been sent out</li>
* <li>inflightOffsetFetches holds the offset fetch requests that have been sent out but not completed</li>
* </ul>
* {@code addOffsetFetchRequest} dedupes the requests to avoid sending the same requests.
*/

@@ -173,10 +173,11 @@ public static <K, V> List<ConsumerInterceptor<K, V>> configuredConsumerIntercept

/**
* Update subscription state and metadata using the provided committed offsets:
* <li>Update partition offsets with the committed offsets</li>
* <li>Update the metadata with any newer leader epoch discovered in the committed offsets
* metadata</li>
* </p>
* <ul>
* <li>Update partition offsets with the committed offsets</li>
* <li>Update the metadata with any newer leader epoch discovered in the committed offsets
* metadata</li>
* </ul>
* This will ignore any partition included in the <code>offsetsAndMetadata</code> parameter that
* may no longer be assigned.
*
@@ -78,8 +78,6 @@ public FetchCollector(final LogContext logContext,
* Return the fetched {@link ConsumerRecord records}, empty the {@link FetchBuffer record buffer}, and
* update the consumed position.
*
* </p>
*
* NOTE: returning an {@link Fetch#empty() empty} fetch guarantees the consumed position is not updated.
*
* @param fetchBuffer {@link FetchBuffer} from which to retrieve the {@link ConsumerRecord records}
@@ -147,12 +147,11 @@ public Fetch<K, V> collectFetch() {
}

/**
* This method is called by {@link #close(Timer)} which is guarded by the {@link IdempotentCloser}) such as to only
* This method is called by {@link #close(Timer)} which is guarded by the {@link IdempotentCloser} such as to only
* be executed once the first time that any of the {@link #close()} methods are called. Subclasses can override
* this method without the need for extra synchronization at the instance level.
*
* <p/>
*
* <p>
* <em>Note</em>: this method is <code>synchronized</code> to reinstitute the 3.5 behavior:
*
* <blockquote>
@@ -171,9 +170,8 @@ protected synchronized void closeInternal(Timer timer) {
}

/**
* Creates the {@link FetchRequest.Builder fetch request},
* {@link NetworkClient#send(ClientRequest, long) enqueues/sends it, and adds the {@link RequestFuture callback}
* for the response.
* Creates the {@link FetchRequest.Builder fetch request}, {@link NetworkClient#send(ClientRequest, long)}
* enqueues/sends it, and adds the {@link RequestFuture} callback for the response.
*
* @param fetchRequests {@link Map} of {@link Node nodes} to their
* {@link FetchSessionHandler.FetchRequestData request data}
@@ -209,4 +207,4 @@ public void onFailure(RuntimeException e) {

return requestFutures;
}
}
}
@@ -23,13 +23,13 @@
* On authentication failure, clients abort the operation requested and raise one
* of the subclasses of this exception:
* <ul>
* </li>{@link SaslAuthenticationException} if SASL handshake fails with invalid credentials
* <li>{@link SaslAuthenticationException} if SASL handshake fails with invalid credentials
* or any other failure specific to the SASL mechanism used for authentication</li>
* <li>{@link UnsupportedSaslMechanismException} if the SASL mechanism requested by the client
* is not supported on the broker.</li>
* <li>{@link IllegalSaslStateException} if an unexpected request is received on during SASL
* handshake. This could be due to misconfigured security protocol.</li>
* <li>{@link SslAuthenticationException} if SSL handshake failed due to any {@link SSLException}.
* <li>{@link SslAuthenticationException} if SSL handshake failed due to any {@link SSLException}.</li>
* </ul>
*/
public class AuthenticationException extends InvalidConfigurationException {
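
As an illustration of how these exceptions typically surface to a client, a small sketch using an admin client future; the connection properties are placeholders and the real configuration would carry the SASL/SSL settings:

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.common.errors.AuthenticationException;
    import org.apache.kafka.common.errors.SaslAuthenticationException;

    public class AuthCheck {
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9093"); // placeholder; SASL/SSL configs assumed filled in
            try (Admin admin = Admin.create(props)) {
                admin.describeCluster().nodes().get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof SaslAuthenticationException) {
                    // invalid credentials: fix the configuration rather than retrying blindly
                } else if (e.getCause() instanceof AuthenticationException) {
                    // other authentication failures (SSL handshake, unsupported SASL mechanism, ...)
                }
            }
        }
    }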
@@ -681,8 +681,8 @@ public static Properties loadProps(String filename, List<String> onlyIncludeKeys
}

/**
* Converts a Properties object to a Map<String, String>, calling {@link #toString} to ensure all keys and values
* are Strings.
* Converts a Properties object to a {@code Map<String, String>}, calling {@link #toString} to ensure all keys and
* values are Strings.
*/
public static Map<String, String> propsToStringMap(Properties props) {
Map<String, String> result = new HashMap<>();
@@ -1406,7 +1406,7 @@ public static Set<Byte> from32BitField(final int intValue) {
* @param <K> The Map key type
* @param <V> The Map value type
* @param <M> The type of the Map itself.
* @return new Collector<Map.Entry<K, V>, M, M>
* @return new {@code Collector<Map.Entry<K, V>, M, M>}
*/
public static <K, V, M extends Map<K, V>> Collector<Map.Entry<K, V>, M, M> entriesToMap(final Supplier<M> mapSupplier) {
return new Collector<>() {
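
A brief illustration of the two helpers whose Javadoc is touched in this file, using the signatures shown above; the input values are made up:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.common.utils.Utils;

    public class UtilsExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("retries", 3); // non-String value; toString() turns it into "3"
            Map<String, String> asStrings = Utils.propsToStringMap(props);

            Map<String, Integer> source = Map.of("a", 1, "b", -2, "c", 3);
            // Keep only positive entries, collecting back into a HashMap
            Map<String, Integer> positives = source.entrySet().stream()
                .filter(e -> e.getValue() > 0)
                .collect(Utils.entriesToMap(HashMap::new));
            System.out.println(asStrings + " " + positives);
        }
    }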
@@ -33,14 +33,11 @@
* <p>
* Properties passed to these methods are used to construct internal {@link Admin} and {@link Consumer} clients.
* Sub-configs like "admin.xyz" are also supported. For example:
* </p>
* <pre>
* bootstrap.servers = host1:9092
* consumer.client.id = mm2-client
* </pre>
* <p>
* @see MirrorClientConfig for additional properties used by the internal MirrorClient.
* </p>
*/
public final class RemoteClusterUtils {
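
For orientation, a sketch of how such properties might be passed in; the entry point and its parameter order (properties, remote cluster alias, consumer group, timeout) are assumed here, and the alias, group, and host values are placeholders:

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.mirror.RemoteClusterUtils;

    public class MirrorOffsetsExample {
        public static void main(String[] args) throws Exception {
            Map<String, Object> props = new HashMap<>();
            props.put("bootstrap.servers", "host1:9092");  // placeholder
            props.put("consumer.client.id", "mm2-client"); // sub-config forwarded to the internal consumer
            // Assumed signature: translateOffsets(properties, remoteClusterAlias, consumerGroupId, timeout)
            Map<TopicPartition, OffsetAndMetadata> translated =
                RemoteClusterUtils.translateOffsets(props, "source", "my-group", Duration.ofSeconds(30));
            System.out.println(translated);
        }
    }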

@@ -132,7 +132,7 @@ public abstract class RestServerConfig extends AbstractConfig {

/**
* Add the properties related to a user-facing server to the given {@link ConfigDef}.
* </p>
* <p>
* This automatically adds the properties for intra-cluster communication; it is not necessary to
* invoke both {@link #addInternalConfig(ConfigDef)} and this method on the same {@link ConfigDef}.
* @param configDef the {@link ConfigDef} to add the properties to; may not be null
@@ -43,15 +43,15 @@ public static VerificationMode anyTimes() {
* <p>
* Sample usage:
* <p>
* <pre>
* <pre>{@code
* Producer<byte[], byte[]> producer = mock(Producer.class);
* // ... run through some test case that uses the mocked producer
* assertEquals(
* "Producer should have aborted every record it sent",
* countInvocations(producer, "abortTransaction"),
* countInvocations(producer, "send", ProducerRecord.class, Callback.class)
* );
* </pre>
* }</pre>
* @param mock the mock object whose method invocations should be counted; may not be null
* @param methodName the name of the method whose invocations should be counted; may not be null
* @param parameters the types of the parameters for the method whose invocations should be counted;
@@ -53,7 +53,7 @@
*
* <p>The co-partitioned assignment will be:
* <ul>
* <li<code> M0: [T1P0, T1P1, T2P0, T2P1] </code></li>
* <li><code> M0: [T1P0, T1P1, T2P0, T2P1] </code></li>
* <li><code> M1: [T1P2, T2P2] </code></li>
* </ul>
*
@@ -64,8 +64,8 @@
*
* <p>The assignment could be completely shuffled to:
* <ul>
* <li><code> M3 (was M0): [T1P2, T2P2] (before it was [T1P0, T1P1, T2P0, T2P1]) </code>
* <li><code> M2 (was M1): [T1P0, T1P1, T2P0, T2P1] (before it was [T1P2, T2P2]) </code>
* <li><code> M3 (was M0): [T1P2, T2P2] (before it was [T1P0, T1P1, T2P0, T2P1]) </code></li>
* <li><code> M2 (was M1): [T1P0, T1P1, T2P0, T2P1] (before it was [T1P2, T2P2]) </code></li>
* </ul>
*
* The assignment change was caused by the change of <code>member.id</code> relative order, and
@@ -77,8 +77,8 @@
*
* <p>The assignment will always be:
* <ul>
* <li><code> I0: [T1P0, T1P1, T2P0, T2P1] </code>
* <li><code> I1: [T1P2, T2P2] </code>
* <li><code> I0: [T1P0, T1P1, T2P0, T2P1] </code></li>
* <li><code> I1: [T1P2, T2P2] </code></li>
* </ul>
* <p>
*/
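
A small, hedged configuration sketch for the static-membership scenario described in the Javadoc above; the bootstrap address and group id are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class StaticMemberConfig {
        public static Properties forInstance(String instanceId) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // placeholder
            // Fixing group.instance.id (e.g. "I0", "I1") keeps each member's identity stable
            // across restarts, which is what makes the assignment above deterministic.
            props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, instanceId);
            return props;
        }
    }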
@@ -101,7 +101,7 @@ long generateRandomLongWithExactBytesSet(int bytesSet) {
/**
* Generates a random int32 number which occupies exactly bytesSet in the variable length encoding for int32
*
* @see {@link #generateRandomLongWithExactBytesSet(int)} for implementation details.
* @see #generateRandomLongWithExactBytesSet(int) for implementation details.
*
*/
int generateRandomIntWithExactBytesSet(int bytesSet) {
@@ -144,7 +144,7 @@ static ControllerResult<Void> recordsForNonEmptyLog(

/**
* Generate the set of activation records.
* </p>
* <p>
* If the metadata version is empty, write the bootstrap records. If the metadata version is not empty, do some validation and
* possibly write some records to put the log into a valid state. For bootstrap records, if KIP-868
* metadata transactions are supported, use them. Otherwise, write the bootstrap records as an
@@ -116,7 +116,7 @@ public final void resetToImage(MetadataImage image) {
* translate between batch offsets and record offsets, and track the number of bytes we
* have read. Additionally, there is the chance that one of the records is a metadata
* version change which needs to be handled differently.
* </p>
* <p>
* If this batch starts a transaction, any records preceding the transaction in this
* batch will be implicitly added to the transaction.
*
@@ -24,19 +24,22 @@
import java.util.Optional;

/**
* A simple wrapper over Jackson's JsonNode that enables type safe parsing via the `DecodeJson` type
* A simple wrapper over Jackson's JsonNode that enables type safe parsing via the {@code DecodeJson} type
* class.
* <br>
* Typical usage would be something like:
* <pre><code>
* // Given a jsonNode containing a parsed JSON:
* JsonObject jsonObject = JsonValue.apply(jsonNode).asJsonObject();
* Integer intField = jsonObject.apply("int_field").to(new DecodeJson.DecodeInteger());
* Optional<Integer> optionLongField = jsonObject.apply("option_long_field").to(DecodeJson.decodeOptional(new DecodeJson.DecodeInteger()));
* Map<String, Integer> mapStringIntField = jsonObject.apply("map_string_int_field").to(DecodeJson.decodeMap(new DecodeJson.DecodeInteger()));
* List<String> seqStringField = jsonObject.apply("seq_string_field").to(DecodeJson.decodeList(new DecodeJson.DecodeString()));
* Optional&lt;Integer&gt; optionLongField =
* jsonObject.apply("option_long_field").to(DecodeJson.decodeOptional(new DecodeJson.DecodeInteger()));
* Map&lt;String, Integer&gt; mapStringIntField =
* jsonObject.apply("map_string_int_field").to(DecodeJson.decodeMap(new DecodeJson.DecodeInteger()));
* List&lt;String&gt; seqStringField =
* jsonObject.apply("seq_string_field").to(DecodeJson.decodeList(new DecodeJson.DecodeString()));
* </code></pre>
* The `to` method throws an exception if the value cannot be converted to the requested type.
* The {@code to} method throws an exception if the value cannot be converted to the requested type.
*/

public interface JsonValue {
@@ -55,7 +58,8 @@ default JsonObject asJsonObject() throws JsonMappingException {
}

/**
* If this is a JSON object, return a JsonObject wrapped by an `Optional`. Otherwise, return Empty.
* If this is a JSON object, return a JsonObject wrapped by an {@code Optional}. Otherwise, return
* {@code Optional.empty()}.
*/
default Optional<JsonObject> asJsonObjectOptional() {
if (this instanceof JsonObject) {
@@ -77,7 +81,8 @@ default JsonArray asJsonArray() throws JsonMappingException {
}

/**
* If this is a JSON array, return a JsonArray wrapped by an `Optional`. Otherwise, return Empty.
* If this is a JSON array, return a JsonArray wrapped by an {@code Optional}. Otherwise, return
* {@code Optional.empty()}.
*/
default Optional<JsonArray> asJsonArrayOptional() {
if (this instanceof JsonArray) {
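
A short sketch of the Optional-returning accessors documented in this file; the JSON literal is made up and the package names are assumed from the usual layout of these utilities:

    import java.util.Optional;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.server.util.json.JsonObject;
    import org.apache.kafka.server.util.json.JsonValue;

    public class JsonValueExample {
        public static void main(String[] args) throws Exception {
            JsonNode node = new ObjectMapper().readTree("{\"int_field\": 42}");
            Optional<JsonObject> obj = JsonValue.apply(node).asJsonObjectOptional();
            // Empty when the node is not a JSON object, so no exception handling is needed
            obj.ifPresent(o -> System.out.println("parsed object"));
        }
    }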
@@ -203,20 +203,20 @@ public void processThrottledChannelReaperDoWork() {
/**
* Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics
* for all clients.
* <p/>
* Quotas can be set at <user, client-id>, user or client-id levels. For a given client connection,
* the most specific quota matching the connection will be applied. For example, if both a <user, client-id>
* and a user quota match a connection, the <user, client-id> quota will be used. Otherwise, user quota takes
* precedence over client-id quota. The order of precedence is:
* <p>
* Quotas can be set at {@code <user, client-id>}, user or client-id levels. For a given client connection,
* the most specific quota matching the connection will be applied. For example, if both a
* {@code <user, client-id>} and a user quota match a connection, the {@code <user, client-id>} quota will be
* used. Otherwise, user quota takes precedence over client-id quota. The order of precedence is:
* <ul>
* <li>/config/users/<user>/clients/<client-id>
* <li>/config/users/<user>/clients/<default>
* <li>/config/users/<user>
* <li>/config/users/<default>/clients/<client-id>
* <li>/config/users/<default>/clients/<default>
* <li>/config/users/<default>
* <li>/config/clients/<client-id>
* <li>/config/clients/<default>
* <li>{@code /config/users/<user>/clients/<client-id>}</li>
* <li>{@code /config/users/<user>/clients/<default>}</li>
* <li>{@code /config/users/<user>}</li>
* <li>{@code /config/users/<default>/clients/<client-id>}</li>
* <li>{@code /config/users/<default>/clients/<default>}</li>
* <li>{@code /config/users/<default>}</li>
* <li>{@code /config/clients/<client-id>}</li>
* <li>{@code /config/clients/<default>}</li>
* </ul>
* Quota limits including defaults may be updated dynamically. The implementation is optimized for the case
* where a single level of quotas is configured.
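
As a hedged illustration of defining a quota at the most specific user plus client-id level through the public Admin API; the user, client id, quota key, and bootstrap address are placeholders:

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.common.quota.ClientQuotaAlteration;
    import org.apache.kafka.common.quota.ClientQuotaEntity;

    public class QuotaExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            try (Admin admin = Admin.create(props)) {
                ClientQuotaEntity entity = new ClientQuotaEntity(
                    Map.of(ClientQuotaEntity.USER, "alice", ClientQuotaEntity.CLIENT_ID, "client-1"));
                ClientQuotaAlteration alteration = new ClientQuotaAlteration(
                    entity, List.of(new ClientQuotaAlteration.Op("consumer_byte_rate", 1024.0 * 1024)));
                admin.alterClientQuotas(List.of(alteration)).all().get();
            }
        }
    }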
@@ -508,11 +508,12 @@ protected Sensor getOrCreateSensor(String sensorName, long expirationTimeSeconds
}

/**
* Overrides quotas for <user>, <client-id> or <user, client-id> or the dynamic defaults
* Overrides quotas for {@code <user>}, {@code <client-id>} or {@code <user, client-id>} or the dynamic defaults
* for any of these levels.
*
* @param userEntity user to override if quota applies to <user> or <user, client-id>
* @param clientEntity sanitized client entity to override if quota applies to <client-id> or <user, client-id>
* @param userEntity user to override if quota applies to {@code <user>} or {@code <user, client-id>}
* @param clientEntity sanitized client entity to override if quota applies to {@code <client-id>} or
* {@code <user, client-id>}
* @param quota custom quota to apply or None if quota override is being removed
*/
public void updateQuota(
@@ -895,4 +896,4 @@ public Map<String, String> quotaMetricTags(String sanitizedUser, String clientId
@Override
public void close() {}
}
}
}