Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion ydb/core/http_proxy/http_req.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,9 @@ namespace NKikimr::NHttpProxy {
, ProtoCall(protoCall)
, Method(method)
{
if (Signature && Signature->Empty()) {
Signature.Reset();
}
}

TStringBuilder LogPrefix() const {
Expand Down Expand Up @@ -1385,10 +1388,17 @@ namespace NKikimr::NHttpProxy {
"stream '" << ExtractStreamName<TProtoRequest>(Request) << "'");

ReportInputCounters(ctx);
if (HttpContext.IamToken.empty() && Signature) {
if (!HttpContext.IamToken.empty() || Signature) {
AuthActor = ctx.Register(AppData(ctx)->DataStreamsAuthFactory->CreateAuthActor(
ctx.SelfID, HttpContext, std::move(Signature)));
} else {
if (AppData(ctx)->EnforceUserTokenRequirement || AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
return ReplyWithMessageQueueError(
ctx,
NSQS::NErrors::INCOMPLETE_SIGNATURE.HttpStatusCode,
NSQS::NErrors::INCOMPLETE_SIGNATURE.ErrorCode,
NSQS::NErrors::INCOMPLETE_SIGNATURE.DefaultMessage);
}
SendGrpcRequestNoDriver(ctx);
}

Expand Down
118 changes: 89 additions & 29 deletions ydb/core/http_proxy/ut/datastreams_fixture/datastreams_fixture.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,44 @@ THttpProxyTestMock::THttpProxyTestMock() = default;
THttpProxyTestMock::~THttpProxyTestMock() = default;

void THttpProxyTestMock::TearDown(NUnitTest::TTestContext&) {
Monitoring->Stop();
GRpcServer->Stop();
if (Monitoring) {
Monitoring->Stop();
}
if (GRpcServer) {
GRpcServer->Stop();
}
}

void THttpProxyTestMock::SetUp(NUnitTest::TTestContext&) {
InitAll();
InitAll(TInitParameters{});
}

void THttpProxyTestMock::InitAll(bool yandexCloudMode, bool enableMetering, bool enableSqsTopic) {
void THttpProxyTestMock::InitAll(const TInitParameters initParameters) {
AccessServicePort = PortManager.GetPort(8443);
AccessServiceEndpoint = "127.0.0.1:" + ToString(AccessServicePort);
InitKikimr(yandexCloudMode, enableMetering);
InitKikimr(initParameters.YandexCloudMode, initParameters.EnableMetering, initParameters.EnforceUserTokenRequirement);
InitAccessServiceService();
InitHttpServer(yandexCloudMode, enableSqsTopic);
InitHttpServer(initParameters.YandexCloudMode, initParameters.EnableSqsTopic);
}

TString THttpProxyTestMock::FormAuthorizationStr(const TString& region) {
TString THttpProxyTestMock::FormAuthorizationStr(const TString& region) const {
if (!SendAuthorizationStr) {
return "";
}
return TStringBuilder() <<
"Authorization: AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20150830/" << region <<
"/service/aws4_request, SignedHeaders=host;x-amz-date, Signature="
"5da7c1a2acd57cee7505fc6676e4e544621c30862966e37dddb68e92efbe5d6b)__";
}

void THttpProxyTestMock::EnableAuthorization() {
SendAuthorizationStr = true;
}

void THttpProxyTestMock::DisableAuthorization() {
SendAuthorizationStr = false;
}

NJson::TJsonValue THttpProxyTestMock::CreateCreateStreamRequest() {
NJson::TJsonValue record;
record["StreamName"] = "testtopic";
Expand Down Expand Up @@ -319,6 +334,7 @@ TMaybe<NYdb::TResultSet> THttpProxyTestMock::RunYqlDataQuery(TString query) {
TString endpoint = TStringBuilder() << "localhost:" << KikimrGrpcPort;
auto driverConfig = NYdb::TDriverConfig()
.SetEndpoint(endpoint)
.SetAuthToken("root@builtin")
.SetLog(std::unique_ptr<TLogBackend>(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release()));
NYdb::TDriver driver(driverConfig);
auto tableClient = NYdb::NTable::TTableClient(driver);
Expand All @@ -343,7 +359,7 @@ TMaybe<NYdb::TResultSet> THttpProxyTestMock::RunYqlDataQuery(TString query) {
return resultSet;
}

void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering, bool enforceUserTokenRequirement) {
AuthFactory = std::make_shared<NKikimr::NHttpProxy::TIamAuthFactory>();
NKikimrConfig::TAppConfig appConfig;
appConfig.MutablePQConfig()->SetTopicsAreFirstClassCitizen(true);
Expand All @@ -354,6 +370,10 @@ void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
appConfig.MutablePQConfig()->MutableBillingMeteringConfig()->SetEnabled(true);

appConfig.MutableFeatureFlags()->SetEnableTopicMessageLevelParallelism(true);
if (enforceUserTokenRequirement) {
auto* securityConfig = appConfig.MutableDomainsConfig()->MutableSecurityConfig();
securityConfig->SetEnforceUserTokenRequirement(true);
}

appConfig.MutableSqsConfig()->SetEnableSqs(true);
appConfig.MutableSqsConfig()->SetYandexCloudMode(yandexCloudMode);
Expand Down Expand Up @@ -423,12 +443,12 @@ void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK,
client.AlterUserAttributes("/", "Root", {{"folder_id", "folder4"},
{"cloud_id", "cloud4"},
{"database_id", "database4"}}));
{"database_id", "database4"}}, {}, {}, "root@builtin"));
NACLib::TDiffACL acl;
acl.AddAccess(NACLib::EAccessType::Allow, NACLib::GenericFull, "Service1_id@as");
acl.AddAccess(NACLib::EAccessType::Allow, NACLib::GenericFull, "proxy_sa@as");

client.ModifyACL("/", "Root", acl.SerializeAsString());
client.ModifyACL("/", "Root", acl.SerializeAsString(), "root@builtin");

client.MkDir("/Root", "SQS");

Expand All @@ -450,7 +470,9 @@ void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
"Columns { Name: \"DlqName\" Type: \"Utf8\"}"
"Columns { Name: \"TablesFormat\" Type: \"Uint32\"}"
"Columns { Name: \"Tags\" Type: \"Utf8\"}"
"KeyColumnNames: [\"Account\", \"QueueName\"]"
"KeyColumnNames: [\"Account\", \"QueueName\"]",
TDuration::Seconds(5000),
"root@builtin"
);

client.CreateTable("/Root/SQS",
Expand All @@ -466,7 +488,9 @@ void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
"Columns { Name: \"TablesFormat\" Type: \"Uint32\"}"
"Columns { Name: \"StartProcessTimestamp\" Type: \"Uint64\"}"
"Columns { Name: \"NodeProcess\" Type: \"Uint32\"}"
"KeyColumnNames: [\"RemoveTimestamp\", \"QueueIdNumber\"]"
"KeyColumnNames: [\"RemoveTimestamp\", \"QueueIdNumber\"]",
TDuration::Seconds(5000),
"root@builtin"
);

client.MkDir("/Root/SQS", ".STD");
Expand All @@ -479,7 +503,9 @@ void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
"Columns { Name: \"RandomId\" Type: \"Uint64\"}"
"Columns { Name: \"SentTimestamp\" Type: \"Uint64\"}"
"Columns { Name: \"DelayDeadline\" Type: \"Uint64\"}"
"KeyColumnNames: [\"QueueIdNumberAndShardHash\", \"QueueIdNumber\", \"Shard\", \"Offset\"]"
"KeyColumnNames: [\"QueueIdNumberAndShardHash\", \"QueueIdNumber\", \"Shard\", \"Offset\"]",
TDuration::Seconds(5000),
"root@builtin"
);

client.MkDir("/Root/SQS", ".FIFO");
Expand All @@ -495,7 +521,9 @@ void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
"Columns { Name: \"ReceiveCount\" Type: \"Uint32\"}"
"Columns { Name: \"FirstReceiveTimestamp\" Type: \"Uint64\"}"
"Columns { Name: \"SentTimestamp\" Type: \"Uint64\"}"
"KeyColumnNames: [\"QueueIdNumberHash\", \"QueueIdNumber\", \"Offset\"]"
"KeyColumnNames: [\"QueueIdNumberHash\", \"QueueIdNumber\", \"Offset\"]",
TDuration::Seconds(5000),
"root@builtin"
);

client.CreateTable("/Root/SQS/.FIFO",
Expand All @@ -506,7 +534,9 @@ void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
"Columns { Name: \"Deadline\" Type: \"Uint64\"}"
"Columns { Name: \"Offset\" Type: \"Uint64\"}"
"Columns { Name: \"MessageId\" Type: \"String\"}"
"KeyColumnNames: [\"QueueIdNumberHash\", \"QueueIdNumber\", \"DedupId\"]"
"KeyColumnNames: [\"QueueIdNumberHash\", \"QueueIdNumber\", \"DedupId\"]",
TDuration::Seconds(5000),
"root@builtin"
);

client.CreateTable("/Root/SQS/.FIFO",
Expand All @@ -520,7 +550,9 @@ void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
"Columns { Name: \"Tail\" Type: \"Uint64\"}"
"Columns { Name: \"ReceiveAttemptId\" Type: \"Utf8\"}"
"Columns { Name: \"LockTimestamp\" Type: \"Uint64\"}"
"KeyColumnNames: [\"QueueIdNumberHash\", \"QueueIdNumber\", \"GroupId\"]"
"KeyColumnNames: [\"QueueIdNumberHash\", \"QueueIdNumber\", \"GroupId\"]",
TDuration::Seconds(5000),
"root@builtin"
);

client.CreateTable("/Root/SQS/.FIFO",
Expand All @@ -534,7 +566,9 @@ void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
"Columns { Name: \"Attributes\" Type: \"String\"}"
"Columns { Name: \"Data\" Type: \"String\"}"
"Columns { Name: \"MessageId\" Type: \"String\"}"
"KeyColumnNames: [\"QueueIdNumberHash\", \"QueueIdNumber\", \"RandomId\", \"Offset\"]"
"KeyColumnNames: [\"QueueIdNumberHash\", \"QueueIdNumber\", \"RandomId\", \"Offset\"]",
TDuration::Seconds(5000),
"root@builtin"
);

client.CreateTable("/Root/SQS/.FIFO",
Expand All @@ -543,22 +577,28 @@ void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
"Columns { Name: \"QueueIdNumber\" Type: \"Uint64\"}"
"Columns { Name: \"ReceiveAttemptId\" Type: \"Utf8\"}"
"Columns { Name: \"Deadline\" Type: \"Uint64\"}"
"KeyColumnNames: [\"QueueIdNumberHash\", \"QueueIdNumber\", \"ReceiveAttemptId\"]"
"KeyColumnNames: [\"QueueIdNumberHash\", \"QueueIdNumber\", \"ReceiveAttemptId\"]",
TDuration::Seconds(5000),
"root@builtin"
);

client.CreateTable("/Root/SQS",
"Name: \".Settings\""
"Columns { Name: \"Account\" Type: \"Utf8\"}"
"Columns { Name: \"Name\" Type: \"Utf8\"}"
"Columns { Name: \"Value\" Type: \"Utf8\"}"
"KeyColumnNames: [\"Account\", \"Name\"]"
"KeyColumnNames: [\"Account\", \"Name\"]",
TDuration::Seconds(5000),
"root@builtin"
);

client.CreateTable("/Root/SQS",
"Name: \".AtomicCounter\""
"Columns { Name: \"counter_key\" Type: \"Uint64\"}"
"Columns { Name: \"value\" Type: \"Uint64\"}"
"KeyColumnNames: [\"counter_key\"]"
"KeyColumnNames: [\"counter_key\"]",
TDuration::Seconds(5000),
"root@builtin"
);
RunYqlDataQuery("INSERT INTO `/Root/SQS/.AtomicCounter` (counter_key, value) VALUES (0, 0)");

Expand All @@ -577,8 +617,14 @@ void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
"Columns { Name: \"MaxReceiveCount\" Type: \"Uint64\"}"
"Columns { Name: \"ShowDetailedCountersDeadline\" Type: \"Uint64\"}"
"KeyColumnNames: [\"QueueIdNumberHash\", \"QueueIdNumber\"]";
client.CreateTable("/Root/SQS/.STD", attributesTable);
client.CreateTable("/Root/SQS/.FIFO", attributesTable);
client.CreateTable("/Root/SQS/.STD",
attributesTable,
TDuration::Seconds(5000),
"root@builtin");
client.CreateTable("/Root/SQS/.FIFO",
attributesTable,
TDuration::Seconds(5000),
"root@builtin");

client.CreateTable("/Root/SQS",
"Name: \".Events\""
Expand All @@ -589,7 +635,9 @@ void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
"Columns { Name: \"EventTimestamp\" Type: \"Uint64\"}"
"Columns { Name: \"FolderId\" Type: \"Utf8\"}"
"Columns { Name: \"Labels\" Type: \"Utf8\"}"
"KeyColumnNames: [\"Account\", \"QueueName\", \"EventType\"]"
"KeyColumnNames: [\"Account\", \"QueueName\", \"EventType\"]",
TDuration::Seconds(5000),
"root@builtin"
);

auto stateTableCommon =
Expand All @@ -610,12 +658,16 @@ void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
TStringBuilder()
<< stateTableCommon
<< "Columns { Name: \"Shard\" Type: \"Uint32\"}"
<< "KeyColumnNames: [\"QueueIdNumberHash\", \"QueueIdNumber\", \"Shard\"]"
<< "KeyColumnNames: [\"QueueIdNumberHash\", \"QueueIdNumber\", \"Shard\"]",
TDuration::Seconds(5000),
"root@builtin"
);
client.CreateTable("/Root/SQS/.FIFO",
TStringBuilder()
<< stateTableCommon
<< "KeyColumnNames: [\"QueueIdNumberHash\", \"QueueIdNumber\"]"
<< "KeyColumnNames: [\"QueueIdNumberHash\", \"QueueIdNumber\"]",
TDuration::Seconds(5000),
"root@builtin"
);


Expand All @@ -633,7 +685,9 @@ void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
"Columns { Name: \"SentTimestamp\" Type: \"Uint64\"}"
"Columns { Name: \"VisibilityDeadline\" Type: \"Uint64\"}"
"Columns { Name: \"DelayDeadline\" Type: \"Uint64\"}"
"KeyColumnNames: [\"QueueIdNumberAndShardHash\", \"QueueIdNumber\", \"Shard\", \"Offset\"]"
"KeyColumnNames: [\"QueueIdNumberAndShardHash\", \"QueueIdNumber\", \"Shard\", \"Offset\"]",
TDuration::Seconds(5000),
"root@builtin"
);

auto sentTimestampIdxCommonColumns=
Expand All @@ -649,7 +703,9 @@ void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
TStringBuilder()
<< "Name: \"SentTimestampIdx\""
<< sentTimestampIdxCommonColumns
<< sendTimestampIdsKeys
<< sendTimestampIdsKeys,
TDuration::Seconds(5000),
"root@builtin"
);

client.CreateTable("/Root/SQS/.FIFO",
Expand All @@ -661,7 +717,9 @@ void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
"Columns { Name: \"GroupId\" Type: \"String\"}"
"Columns { Name: \"RandomId\" Type: \"Uint64\"}"
"Columns { Name: \"DelayDeadline\" Type: \"Uint64\"}"
"KeyColumnNames: [\"QueueIdNumberHash\", \"QueueIdNumber\", \"SentTimestamp\", \"Offset\"]"
"KeyColumnNames: [\"QueueIdNumberHash\", \"QueueIdNumber\", \"SentTimestamp\", \"Offset\"]",
TDuration::Seconds(5000),
"root@builtin"
);

client.CreateTable("/Root/SQS/.STD",
Expand All @@ -675,7 +733,9 @@ void THttpProxyTestMock::InitKikimr(bool yandexCloudMode, bool enableMetering) {
"Columns { Name: \"Data\" Type: \"String\"}"
"Columns { Name: \"MessageId\" Type: \"String\"}"
"Columns { Name: \"SenderId\" Type: \"String\"}"
"KeyColumnNames: [\"QueueIdNumberAndShardHash\", \"QueueIdNumber\", \"Shard\", \"RandomId\", \"Offset\"]"
"KeyColumnNames: [\"QueueIdNumberAndShardHash\", \"QueueIdNumber\", \"Shard\", \"RandomId\", \"Offset\"]",
TDuration::Seconds(5000),
"root@builtin"
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,19 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {

void SetUp(NUnitTest::TTestContext&) override;

void InitAll(bool yandexCloudMode = true, bool enableMetering = false, bool extendedQueueUrl = false);
struct TInitParameters {
bool YandexCloudMode : 1 = true;
bool EnableMetering : 1 = false;
bool EnableSqsTopic : 1 = false;
bool EnforceUserTokenRequirement : 1 = false;
};

static TString FormAuthorizationStr(const TString& region);
void InitAll(const TInitParameters initParameters);

TString FormAuthorizationStr(const TString& region) const;

void EnableAuthorization();
void DisableAuthorization();

static NJson::TJsonValue CreateCreateStreamRequest();

Expand Down Expand Up @@ -223,7 +233,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
private:
TMaybe<NYdb::TResultSet> RunYqlDataQuery(TString query);

void InitKikimr(bool yandexCloudMode, bool enableMetering);
void InitKikimr(bool yandexCloudMode, bool enableMetering, bool enforceUserTokenRequirement);

void InitAccessServiceService();

Expand Down Expand Up @@ -251,22 +261,32 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
ui16 MonPort = 0;
ui16 KikimrGrpcPort = 0;
bool SqsTopicMode = false;
bool SendAuthorizationStr = true;
};

class THttpProxyTestMockForSQS : public THttpProxyTestMock {
public:
void SetUp(NUnitTest::TTestContext&) override {
InitAll(false);
InitAll(TInitParameters{
.YandexCloudMode = false,
});
}
};

class THttpProxyTestMockWithMetering : public THttpProxyTestMock {
public:
void SetUp(NUnitTest::TTestContext&) override {
InitAll(true, true);
InitAll(TInitParameters{
.EnableMetering = true,
});
}
};

class THttpProxyTestMockForSQSTopic : public THttpProxyTestMock {
public:
void SetUp(NUnitTest::TTestContext&) override {
InitAll(true, false, true);
InitAll(TInitParameters{
.EnableSqsTopic = true,
});
}
};
Loading
Loading