Skip to content

Commit 9c79c6d

Browse files
authored
Update Kafka version to 4.0 (#61)
* Update Kafka version to 4.0 Signed-off-by: Jakub Scholz <www@scholzj.com> * Review comments Signed-off-by: Jakub Scholz <www@scholzj.com> --------- Signed-off-by: Jakub Scholz <www@scholzj.com>
1 parent 5b7025a commit 9c79c6d

8 files changed

Lines changed: 62 additions & 57 deletions

File tree

.github/workflows/build.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ jobs:
77
runs-on: ubuntu-latest
88
steps:
99
- uses: actions/checkout@v4
10-
- name: Set up JDK 11
10+
- name: Set up JDK 17
1111
uses: actions/setup-java@v4
1212
with:
1313
distribution: 'temurin'
14-
java-version: '11'
14+
java-version: '17'
1515
cache: 'gradle'
1616
- name: Download opa
1717
run: wget -O opa https://openpolicyagent.org/downloads/latest/opa_linux_amd64

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ Open Policy Agent (OPA) plugin for Kafka authorization.
77

88
### Prerequisites
99

10-
* Kafka 2.7.0+
11-
* Java 11 or above
10+
* Kafka 3.8.0+ (for older Kafka versions, please check previous release)
11+
* Java 17 or above
1212
* OPA installed and running on the brokers
1313

1414
## Installation

build.gradle

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ group 'org.openpolicyagent.kafka'
1111
version '1.5.1'
1212

1313
java {
14-
sourceCompatibility = JavaVersion.VERSION_11
15-
targetCompatibility = JavaVersion.VERSION_11
14+
sourceCompatibility = JavaVersion.VERSION_17
15+
targetCompatibility = JavaVersion.VERSION_17
1616
withJavadocJar()
1717
withSourcesJar()
1818
}
@@ -21,26 +21,28 @@ repositories {
2121
mavenCentral()
2222
}
2323

24-
// See versions used in Kafka here https://github.com/apache/kafka/blob/2.8.0/gradle/dependencies.gradle
24+
// See versions used in Kafka here https://github.com/apache/kafka/blob/4.0.0/gradle/dependencies.gradle
2525
dependencies {
26-
compileOnly group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.13', version: '2.10.5'
27-
compileOnly group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.8.0'
26+
compileOnly group: 'org.apache.kafka', name: 'kafka_2.13', version: '4.0.0'
27+
compileOnly group: 'com.typesafe.scala-logging', name: 'scala-logging_2.13', version: '3.9.5'
28+
implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.13', version: '2.16.2'
2829
implementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre'
2930

3031
testImplementation group: 'org.scalatest', name: 'scalatest_2.13', version: '3.2.17'
3132
testImplementation group: 'org.scalatestplus', name: 'junit-4-13_2.13', version: '3.2.17.0'
3233
testImplementation group: 'junit', name: 'junit', version: '4.12'
3334
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: '2.14.0'
34-
testImplementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.8.0'
35+
testImplementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '4.0.0'
36+
testImplementation group: 'org.apache.kafka', name: 'kafka-server', version: '4.0.0'
37+
testImplementation group: 'com.typesafe.scala-logging', name: 'scala-logging_2.13', version: '3.9.5'
3538
}
3639

3740
shadowJar {
3841
dependencies {
3942
exclude(dependency {
40-
!(it.moduleGroup in [
41-
'org.openpolicyagent.kafka',
42-
'com.google.guava'
43-
])
43+
!(it.moduleGroup in ['org.openpolicyagent.kafka', 'com.google.guava']
44+
|| (it.moduleGroup == 'com.fasterxml.jackson.module' && it.moduleName == 'jackson-module-scala_2.13')
45+
|| (it.moduleGroup == 'com.thoughtworks.paranamer' && it.moduleName == 'paranamer'))
4446
})
4547
}
4648
}

example/docker-compose.yaml

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,32 +19,29 @@ services:
1919
- "--set=bundles.authz.resource=bundle.tar.gz"
2020
depends_on:
2121
- nginx
22-
zookeeper:
23-
image: confluentinc/cp-zookeeper:6.2.1
24-
ports:
25-
- "2181:2181"
26-
environment:
27-
- ALLOW_ANONYMOUS_LOGIN=yes
28-
- ZOOKEEPER_CLIENT_PORT=2181
2922
broker:
3023
# If experiencing hangs on darwin/arm64, explicitly setting the platform here seems to help
3124
# platform: linux/amd64
32-
image: confluentinc/cp-kafka:6.2.1
25+
image: apache/kafka:4.0.0
3326
ports:
3427
- "9093:9093"
3528
environment:
3629
CLASSPATH: "/plugin/*"
3730
KAFKA_AUTHORIZER_CLASS_NAME: org.openpolicyagent.kafka.OpaAuthorizer
3831
KAFKA_OPA_AUTHORIZER_URL: http://opa:8181/v1/data/kafka/authz/allow
3932
KAFKA_OPA_AUTHORIZER_CACHE_EXPIRE_AFTER_SECONDS: 10 # For development only
40-
KAFKA_BROKER_ID: 1
41-
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
33+
KAFKA_NODE_ID: 1
34+
KAFKA_PROCESS_ROLES: broker,controller
35+
KAFKA_LISTENERS: CONTROLLER://broker:9092,SSL://broker:9093
4236
KAFKA_ADVERTISED_LISTENERS: SSL://localhost:9093
43-
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
37+
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
38+
KAFKA_INTER_BROKER_LISTENER_NAME: SSL
39+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,SSL:SSL
40+
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:9092
4441
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
45-
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
46-
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
4742
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
43+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
44+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
4845
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
4946
KAFKA_SSL_KEYSTORE_FILENAME: server.keystore
5047
KAFKA_SSL_KEYSTORE_CREDENTIALS: credentials.txt
@@ -57,4 +54,3 @@ services:
5754
- "./cert/server:/etc/kafka/secrets"
5855
depends_on:
5956
- opa
60-
- zookeeper

example/opa_tutorial/docker-compose.yaml

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,41 +18,36 @@ services:
1818
- "--set=bundles.authz.resource=bundle.tar.gz"
1919
depends_on:
2020
- nginx
21-
zookeeper:
22-
image: confluentinc/cp-zookeeper:6.2.1
23-
ports:
24-
- "2181:2181"
25-
environment:
26-
- ALLOW_ANONYMOUS_LOGIN=yes
27-
- ZOOKEEPER_CLIENT_PORT=2181
2821
broker:
2922
image: confluentinc/cp-kafka:6.2.1
3023
ports:
3124
- "9093:9093"
3225
environment:
33-
# Set cache expiry to low value for development in order to see decisions
34-
KAFKA_OPA_AUTHORIZER_CACHE_EXPIRE_AFTER_SECONDS: 10
35-
KAFKA_OPA_AUTHORIZER_URL: http://opa:8181/v1/data/kafka/authz/allow
26+
CLASSPATH: "/plugin/*"
3627
KAFKA_AUTHORIZER_CLASS_NAME: org.openpolicyagent.kafka.OpaAuthorizer
37-
KAFKA_BROKER_ID: 1
38-
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
39-
KAFKA_ADVERTISED_LISTENERS: SSL://broker:9093
40-
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
28+
KAFKA_OPA_AUTHORIZER_URL: http://opa:8181/v1/data/kafka/authz/allow
29+
KAFKA_OPA_AUTHORIZER_CACHE_EXPIRE_AFTER_SECONDS: 10 # For development only
30+
KAFKA_NODE_ID: 1
31+
KAFKA_PROCESS_ROLES: broker,controller
32+
KAFKA_LISTENERS: CONTROLLER://broker:9092,SSL://broker:9093
33+
KAFKA_ADVERTISED_LISTENERS: SSL://localhost:9093
34+
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
35+
KAFKA_INTER_BROKER_LISTENER_NAME: SSL
36+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,SSL:SSL
37+
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:9092
4138
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
42-
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
43-
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
4439
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
40+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
41+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
4542
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
4643
KAFKA_SSL_KEYSTORE_FILENAME: server.keystore
4744
KAFKA_SSL_KEYSTORE_CREDENTIALS: credentials.txt
4845
KAFKA_SSL_KEY_CREDENTIALS: credentials.txt
4946
KAFKA_SSL_TRUSTSTORE_FILENAME: server.truststore
5047
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: credentials.txt
5148
KAFKA_SSL_CLIENT_AUTH: required
52-
CLASSPATH: "/plugin/*"
5349
volumes:
5450
- "./plugin:/plugin"
5551
- "./cert/server:/etc/kafka/secrets"
5652
depends_on:
57-
- opa
58-
- zookeeper
53+
- opa

src/test/scala/org/openpolicyagent/kafka/AzRequestContext.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ case class AzServerInfo(
2020
brokerId: Int,
2121
clusterResource: ClusterResource,
2222
endpoints: Collection[Endpoint],
23-
interBrokerEndpoint: Endpoint) extends AuthorizerServerInfo
23+
interBrokerEndpoint: Endpoint,
24+
earlyStartListeners: Collection[String]) extends AuthorizerServerInfo

src/test/scala/org/openpolicyagent/kafka/OpaAuthorizerBenchmark.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ package org.openpolicyagent.kafka
33
import java.net.InetAddress
44
import java.util
55
import java.util.concurrent.TimeUnit
6-
7-
import kafka.network.RequestChannel
86
import org.apache.kafka.common.acl.AclOperation
97
import org.apache.kafka.common.resource.ResourcePattern
108
import org.apache.kafka.common.resource.PatternType
119
import org.apache.kafka.common.resource.ResourceType.TOPIC
1210
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
11+
import org.apache.kafka.network.Session
1312
import org.apache.kafka.server.authorizer.Action
13+
1414
import scala.jdk.CollectionConverters._
1515

1616
object OpaAuthorizerBenchmark {
@@ -45,7 +45,7 @@ class OpaAuthorizerBenchmark {
4545

4646
def createRequest = {
4747
val principal = new KafkaPrincipal("User", "user-" + new scala.util.Random().nextInt())
48-
val session = RequestChannel.Session(principal, InetAddress.getLoopbackAddress)
48+
val session = new Session(principal, InetAddress.getLoopbackAddress)
4949
val resource = new ResourcePattern(TOPIC, "my-topic", PatternType.LITERAL)
5050
val authzReqContext = new AzRequestContext(
5151
clientId = "rdkafka",

src/test/scala/org/openpolicyagent/kafka/OpaAuthorizerSpec.scala

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package org.openpolicyagent.kafka
22

3+
import com.fasterxml.jackson.databind.json.JsonMapper
4+
import com.fasterxml.jackson.databind.module.SimpleModule
5+
36
import java.net.{InetAddress, URI}
47
import java.net.http.HttpRequest.BodyPublishers
58
import java.net.http.HttpResponse.BodyHandlers
69
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
710
import org.junit.runner.RunWith
811
import org.scalatestplus.junit.JUnitRunner
912
import com.typesafe.scalalogging.LazyLogging
10-
import com.fasterxml.jackson.databind.ObjectMapper
11-
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
12-
import kafka.network.RequestChannel
13+
import com.fasterxml.jackson.module.scala.DefaultScalaModule
1314
import org.apache.kafka.common.acl.AclOperation
1415
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
1516
import org.apache.kafka.common.protocol.ApiKeys
@@ -18,10 +19,12 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
1819
import org.apache.kafka.common.resource.ResourcePattern
1920
import org.apache.kafka.common.resource.ResourceType
2021
import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
22+
import org.apache.kafka.network.Session
2123
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult}
2224
import org.scalatest._
2325
import matchers.should._
2426
import flatspec._
27+
import org.apache.kafka.common.message.RequestHeaderData
2528

2629
import java.lang.management.ManagementFactory
2730
import javax.management.ObjectName
@@ -35,7 +38,15 @@ import scala.jdk.CollectionConverters._
3538
class OpaAuthorizerSpec extends AnyFlatSpec with Matchers with PrivateMethodTester with LazyLogging {
3639

3740
private val opaUrl = "http://localhost:8181/v1/data/kafka/authz/allow"
38-
private val objectMapper = (new ObjectMapper() with ScalaObjectMapper).registerModule(DefaultScalaModule)
41+
private val requestSerializerModule = new SimpleModule()
42+
.addSerializer(classOf[ResourcePattern], new ResourcePatternSerializer)
43+
.addSerializer(classOf[Action], new ActionSerializer)
44+
.addSerializer(classOf[RequestContext], new RequestContextSerializer)
45+
.addSerializer(classOf[ClientInformation], new ClientInformationSerializer)
46+
.addSerializer(classOf[KafkaPrincipal], new KafkaPrincipalSerializer)
47+
.addSerializer(classOf[RequestHeader], new RequestHeaderSerializer)
48+
.addSerializer(classOf[RequestHeaderData], new RequestHeaderDataSerializer)
49+
private val objectMapper = JsonMapper.builder().addModule(requestSerializerModule).addModule(DefaultScalaModule).build()
3950
private val defaultCacheCapacity = 50000
4051
private lazy val opaResponse = testOpaConnection()
4152

@@ -295,8 +306,8 @@ class OpaAuthorizerSpec extends AnyFlatSpec with Matchers with PrivateMethodTest
295306

296307
def createRequest(username: String, actions: List[Action]): FullRequest = {
297308
val principal = new KafkaPrincipal("User", username)
298-
val session = RequestChannel.Session(principal, InetAddress.getLoopbackAddress)
299-
val authzReqContext = new AzRequestContext(
309+
val session = new Session(principal, InetAddress.getLoopbackAddress)
310+
val authzReqContext = AzRequestContext(
300311
clientId = "rdkafka",
301312
requestType = 1,
302313
listenerName = "SASL_PLAINTEXT",

0 commit comments

Comments
 (0)