Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 85 additions & 25 deletions components/audit_log_filter/audit_log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,72 @@
#include <string>
#include <vector>

namespace {

/**
SAX parser handler for audit JSON logs.
Saves "timestamp" field from the first encountered event, and stops parsing.
*/
class AuditJsonFirstTimestampHandler
: public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
AuditJsonFirstTimestampHandler> {
public:
bool Default() {
m_expect_timestamp = false;

// We want to make sure the first parsed token is "open-array".
// If we encounter something else first, we stop parsing.
return m_array_open;
}

bool String(const char *str, rapidjson::SizeType length,
[[maybe_unused]] bool copy) {
if (m_expect_timestamp) {
timestamp.assign(str, length);
return false;
}
return Default();
}

bool StartObject() {
++m_depth;
return Default();
}

bool Key(const char *str, rapidjson::SizeType length,
[[maybe_unused]] bool copy) {
// Depth 2 means we're in "[ { ...".
m_expect_timestamp =
m_depth == 2 && std::string_view(str, length) == "timestamp";
return true;
}

bool EndObject([[maybe_unused]] rapidjson::SizeType memberCount) {
--m_depth;
return Default();
}

bool StartArray() {
m_array_open = true;
++m_depth;
return Default();
}

bool EndArray([[maybe_unused]] rapidjson::SizeType elementCount) {
--m_depth;
return Default();
}

std::string timestamp;

private:
int m_depth = 0;
bool m_expect_timestamp = false;
bool m_array_open = false;
};

} // namespace

namespace audit_log_filter {

void AuditLogReader::set_files_to_read_list(
Expand All @@ -41,12 +107,14 @@ void AuditLogReader::set_files_to_read_list(
return;
}

std::vector<std::string> tp_list;
for (auto item = m_timestamp_to_file_map.cbegin();
item != m_timestamp_to_file_map.cend(); ++item) {
auto next_item = std::next(item);
bool is_last_item = next_item == m_timestamp_to_file_map.cend();

for (const auto &item : m_timestamp_to_file_map) {
if (item.second->first_timestamp >=
reader_context->next_event_bookmark.timestamp) {
auto *file_info = item.second.get();
if (is_last_item || reader_context->next_event_bookmark.timestamp <=
next_item->second->first_timestamp) {
auto *file_info = item->second.get();

if (file_info->is_encrypted && file_info->encryption_options == nullptr) {
continue;
Expand Down Expand Up @@ -149,16 +217,9 @@ bool AuditLogReader::init() noexcept {
}

for (const auto &log_name : new_files) {
bool is_current_log =
log_name.find(log_current_file_name) != std::string::npos;
bool is_compressed = log_name.find(".gz") != std::string::npos;
bool is_encrypted = log_name.find(".enc") != std::string::npos;

if (is_current_log && is_encrypted) {
// TODO: Improve handling of currently opened encrypted log
continue;
}

auto encryption_options_id =
audit_keyring::get_options_id_for_file_name(log_name);

Expand All @@ -183,23 +244,15 @@ bool AuditLogReader::init() noexcept {
auto json_reader_guard =
create_scope_guard([&] { json_reader_stream->close(); });

rapidjson::Document json_doc;
json_doc.ParseStream(*json_reader_stream);
rapidjson::Reader json_reader;
AuditJsonFirstTimestampHandler first_timestamp_handler;
json_reader.Parse(*json_reader_stream, first_timestamp_handler);

if (json_doc.HasParseError() || json_doc.Empty() || !json_doc.IsArray() ||
json_doc.GetArray().Empty()) {
if (first_timestamp_handler.timestamp.empty()) {
continue;
}

auto *first_event = json_doc.GetArray().Begin();

if (!first_event->IsObject() || !first_event->HasMember("timestamp") ||
!first_event->GetObject()["timestamp"].IsString()) {
continue;
}

file_info->first_timestamp =
first_event->GetObject()["timestamp"].GetString();
file_info->first_timestamp = first_timestamp_handler.timestamp;

try {
auto ts = LogFileTimestamp(log_name);
Expand Down Expand Up @@ -278,6 +331,13 @@ bool AuditLogReader::read(AuditLogReaderContext *reader_context) noexcept {
*reader_context->audit_json_read_stream,
*reader_context->audit_json_handler);

// We don't want reading to fail because of EOF during parsing.
// This would break reading of currently open log file, as its
// top-level array isn't closed yet.
if (reader_context->audit_json_read_stream->check_eof_reached()) {
break;
}

if (reader_context->reader->HasParseError()) {
return false;
}
Expand Down
9 changes: 8 additions & 1 deletion components/audit_log_filter/audit_log_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,17 @@
namespace audit_log_filter {

struct AuditLogReaderArgs {
enum class Command {
ContinueRead,
ReadFromBookmark,
ReadFromTimestamp,
CloseSeq
};

Command command{Command::ContinueRead};
std::string timestamp{};
uint64_t id{0};
uint64_t max_array_length{0};
bool close_read_sequence{false};
};

struct AuditLogReaderContext {
Expand Down
7 changes: 4 additions & 3 deletions components/audit_log_filter/audit_udf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -893,8 +893,8 @@ char *AuditUdf::audit_log_read_udf(AuditUdf *udf [[maybe_unused]],
return result;
}

reader_args->command = AuditLogReaderArgs::Command::ReadFromTimestamp;
reader_args->timestamp = json_doc["start"]["timestamp"].GetString();
reader_args->id = 0;
} else if (has_timestamp_tag) {
if (!json_doc["timestamp"].IsString() || !json_doc["id"].IsUint64()) {
my_error(ER_UDF_ERROR, MYF(0), "audit_log_read",
Expand All @@ -903,6 +903,7 @@ char *AuditUdf::audit_log_read_udf(AuditUdf *udf [[maybe_unused]],
return result;
}

reader_args->command = AuditLogReaderArgs::Command::ReadFromBookmark;
reader_args->timestamp = json_doc["timestamp"].GetString();
reader_args->id = json_doc["id"].GetUint();
}
Expand All @@ -927,7 +928,7 @@ char *AuditUdf::audit_log_read_udf(AuditUdf *udf [[maybe_unused]],
}
}
} else if (json_doc.IsNull()) {
reader_args->close_read_sequence = true;
reader_args->command = AuditLogReaderArgs::Command::CloseSeq;
} else {
my_error(ER_UDF_ERROR, MYF(0), "audit_log_read", "Wrong argument format");
*error = 1;
Expand All @@ -940,7 +941,7 @@ char *AuditUdf::audit_log_read_udf(AuditUdf *udf [[maybe_unused]],
}

if (udf_args->arg_count == 1) {
if (reader_args->close_read_sequence) {
if (reader_args->command == AuditLogReaderArgs::Command::CloseSeq) {
if (reader_context != nullptr) {
log_reader->close_reader_session(reader_context);
SysVars::set_log_reader_context(thd, nullptr);
Expand Down
18 changes: 13 additions & 5 deletions components/audit_log_filter/json_reader/audit_json_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,19 @@ void AuditJsonHandler::clear_current_event() {

bool AuditJsonHandler::check_reading_start_reached() {
if (!m_reading_start_reached) {
m_reading_start_reached = m_reader_context->next_event_bookmark.timestamp ==
m_current_event_bookmark.timestamp &&
(m_reader_context->next_event_bookmark.id == 0 ||
m_reader_context->next_event_bookmark.id ==
m_current_event_bookmark.id);
switch (m_reader_context->batch_reader_args->command) {
case AuditLogReaderArgs::Command::ReadFromBookmark:
m_reading_start_reached =
m_reader_context->next_event_bookmark == m_current_event_bookmark;
break;
case AuditLogReaderArgs::Command::ReadFromTimestamp:
m_reading_start_reached =
m_reader_context->next_event_bookmark.timestamp <=
m_current_event_bookmark.timestamp;
break;
default:
assert(false);
}
}

return m_reading_start_reached;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ FileReaderDecompressing::FileReaderDecompressing(
std::unique_ptr<FileReaderBase> file_reader)
: FileReaderDecoratorBase(std::move(file_reader)) {}

FileReaderDecompressing::~FileReaderDecompressing() {
if (is_opened) {
close();
}
}
FileReaderDecompressing::~FileReaderDecompressing() { close(); }

bool FileReaderDecompressing::init() noexcept {
return FileReaderDecoratorBase::init();
Expand Down Expand Up @@ -66,9 +62,11 @@ bool FileReaderDecompressing::open(FileInfo *file_info) noexcept {
}

void FileReaderDecompressing::close() noexcept {
is_opened = false;
inflateEnd(&m_strm);
FileReaderDecoratorBase::close();
if (is_opened) {
is_opened = false;
inflateEnd(&m_strm);
FileReaderDecoratorBase::close();
}
}

ReadStatus FileReaderDecompressing::read(unsigned char *out_buffer,
Expand All @@ -95,7 +93,7 @@ ReadStatus FileReaderDecompressing::read(unsigned char *out_buffer,

*read_size = out_buffer_size - m_strm.avail_out;

if (ret == Z_STREAM_END) {
if (ret == Z_STREAM_END || status == ReadStatus::Eof) {
status = ReadStatus::Eof;
} else if (ret != Z_OK) {
status = ReadStatus::Error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#include "components/audit_log_filter/json_reader/file_reader_decrypting.h"
#include "components/audit_log_filter/log_writer/file_name.h"

#include "components/audit_log_filter/audit_encryption.h"
#include "components/audit_log_filter/audit_error_log.h"
Expand Down Expand Up @@ -160,6 +161,8 @@ bool FileReaderDecrypting::open(FileInfo *file_info) noexcept {
return false;
}

m_is_rotated = log_writer::FileName::from_path(file_info->name).is_rotated();

return true;
}

Expand Down Expand Up @@ -200,7 +203,7 @@ ReadStatus FileReaderDecrypting::read(unsigned char *out_buffer,

*read_size = decrypted_size;

if (status == ReadStatus::Eof) {
if (status == ReadStatus::Eof && m_is_rotated) {
int final_size = 0;

if (EVP_DecryptFinal(m_ctx, out_buffer + decrypted_size, &final_size) !=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class FileReaderDecrypting final : public FileReaderDecoratorBase {
std::unique_ptr<unsigned char[]> m_iv;
std::unique_ptr<unsigned char[]> m_in_buff;
const size_t m_in_buf_size;
bool m_is_rotated;
};

} // namespace audit_log_filter::json_reader
Expand Down
55 changes: 53 additions & 2 deletions components/audit_log_filter/log_writer/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ void LogWriterFile::write(const std::string &record,

void LogWriterFile::do_write(const std::string &record,
bool print_separator) noexcept {
size_t written_size = 0;
std::string payload;
if (print_separator && !m_is_log_empty) {
const auto separator = get_formatter()->get_record_separator();
Expand All @@ -237,9 +238,12 @@ void LogWriterFile::do_write(const std::string &record,
payload.append(record);

m_file_writer->write(payload.c_str(), payload.length());
written_size += payload.length();

SysVars::update_current_log_size(payload.length());
SysVars::update_total_log_size(payload.length());
written_size += write_padding();

SysVars::update_current_log_size(written_size);
SysVars::update_total_log_size(written_size);

if (m_is_log_empty) {
m_is_log_empty = false;
Expand All @@ -258,6 +262,53 @@ void LogWriterFile::do_write(const std::string &record,
}
}

size_t LogWriterFile::write_padding() {
// This function writes whitespace padding after each logged event when
// log file encryption is enabled.
// This is necessary in order to make newly logged events be immediately
// available for reading by AuditLogReader. Padding pushes internal buffer of
// encryption context over the threshold after which EVP_EncryptUpdate is
// guaranteed to produce complete encrypted log event.

size_t written_size = 0;

// We add padding only for formats supported by audit log reader.
// Currently, it's only JSON and JSONL.
if (SysVars::get_format_type() != AuditLogFormatType::Json &&
SysVars::get_format_type() != AuditLogFormatType::Jsonl) {
return written_size;
}

// Padding is only needed for logs encrypted with certain block ciphers.
if (SysVars::get_encryption_type() != AuditLogEncryptionType::Aes) {
return written_size;
}

auto write_spaces = [&](size_t count) {
static constexpr char padding[] = " ";
m_file_writer->write(padding, count);
written_size += count;
};

switch (SysVars::get_compression_type()) {
case AuditLogCompressionType::None:
write_spaces(32);
break;
case AuditLogCompressionType::Gzip:
// Writes need to be done in separate chunks, each producing its own
// gzip block. Otherwise, all spaces would get compressed.
// Five such chunks are enough to produce needed padding.
for (int chunk = 0; chunk < 5; ++chunk) {
write_spaces(2);
}
break;
default:
assert(false);
}

return written_size;
}

uint64_t LogWriterFile::get_log_size() const noexcept {
std::lock_guard<std::mutex> write_guard{m_write_lock};
return do_get_log_size();
Expand Down
7 changes: 7 additions & 0 deletions components/audit_log_filter/log_writer/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ class LogWriter<AuditLogHandlerType::File> : public LogWriterBase {
*/
[[nodiscard]] uint64_t do_get_log_size() const noexcept;

/**
* @brief Write whitespace padding.
*
* @return Number of written bytes
*/
size_t write_padding();

/**
* @brief Implement actual file rotation logic.
*
Expand Down
Loading