Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ FetchContent_Declare(
GIT_TAG v1.4.0
)

FetchContent_Declare(
mtcl
GIT_REPOSITORY https://github.com/ParaGroup/MTCL
GIT_TAG e5f2bfeea0fc3d704554c7df02e7857f0a00bbba
)

#####################################
# Targets
#####################################
Expand Down
2 changes: 2 additions & 0 deletions capio/common/requests.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ constexpr const int CAPIO_SERVER_REQUEST_STAT_REPLY = 3;

constexpr const int CAPIO_SERVER_NR_REQUEST = 4;

constexpr const int BACKEND_HAVE_FINISH_SEND_REQUEST = 4;

#endif // CAPIO_COMMON_REQUESTS_HPP
3 changes: 2 additions & 1 deletion capio/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ FetchContent_Declare(
set(ARGS_BUILD_EXAMPLE OFF CACHE INTERNAL "")
set(ARGS_BUILD_UNITTESTS OFF CACHE INTERNAL "")

FetchContent_MakeAvailable(args capio_cl)
FetchContent_MakeAvailable(args capio_cl mtcl)

#####################################
# Target definition
Expand All @@ -39,6 +39,7 @@ target_include_directories(${TARGET_NAME} PRIVATE
${MPI_INCLUDE_PATH}
${args_SOURCE_DIR}
${capio_cl_SOURCE_DIR}
${mtcl_SOURCE_DIR}/include
)

#####################################
Expand Down
5 changes: 4 additions & 1 deletion capio/server/capio_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "common/requests.hpp"
#include "common/semaphore.hpp"
#include "remote/backend.hpp"
#include "remote/discovery.hpp"
#include "storage/capio_file.hpp"
#include "utils/common.hpp"
#include "utils/env.hpp"
Expand All @@ -39,6 +40,7 @@
ClientManager *client_manager;
StorageManager *storage_manager;
Backend *backend;
DiscoveryService *discovery_service;

#include "handlers.hpp"
#include "utils/location.hpp"
Expand Down Expand Up @@ -289,6 +291,8 @@ int main(int argc, char **argv) {

std::cout << CAPIO_LOG_SERVER_BANNER;

discovery_service = new DiscoveryService();

parseCLI(argc, argv);

START_LOG(gettid(), "call()");
Expand All @@ -306,6 +310,5 @@ int main(int argc, char **argv) {
server_thread.join();
remote_listener_thread.join();

delete backend;
return 0;
}
72 changes: 72 additions & 0 deletions capio/server/include/remote/atomic_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@

#ifndef CAPIO_BACKEND_ATOMIC_QUEUE_HPP
#define CAPIO_BACKEND_ATOMIC_QUEUE_HPP

#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <string>

template <typename T> struct AtomicQueueElement {

AtomicQueueElement(T message, size_t message_size, const std::string &origin) {
this->object = message;
this->object_size = message_size;
this->target_or_source = origin;
}

T object;
size_t object_size = 0;
std::string target_or_source;
};

template <typename T> class AtomicQueue {
std::queue<AtomicQueueElement<T>> _queue;
std::mutex _mutex;
std::condition_variable _lock_cond;

bool _shutdown = false;

public:
~AtomicQueue() {
{
std::lock_guard lg(_mutex);
_shutdown = true;
}
_lock_cond.notify_all();
}

void push(T message, size_t message_size, const std::string &origin) {
{
std::lock_guard lg(_mutex);
if (_shutdown) {
return;
}
_queue.emplace(message, message_size, origin);
}
_lock_cond.notify_all();
}

AtomicQueueElement<T> pop() {
std::unique_lock lock(_mutex);
_lock_cond.wait(lock, [this] { return !_queue.empty() || _shutdown; });
auto s = std::move(_queue.front());
_queue.pop();

return s;
}

std::optional<AtomicQueueElement<T>> try_pop() {
std::lock_guard lg(_mutex);
if (_queue.empty() || _shutdown) {
return std::nullopt;
}

auto s = std::move(_queue.front());
_queue.pop();
return s;
}
};

#endif // CAPIO_BACKEND_ATOMIC_QUEUE_HPP
6 changes: 6 additions & 0 deletions capio/server/include/remote/backend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ class Backend {
* @param target
*/
virtual void send_request(const char *message, int message_len, const std::string &target) = 0;

/**
* Connect this server instance to a remote server instance
* @param target Remote server instance identification
*/
virtual void connect_to(const std::string &target) = 0;
};

#endif // CAPIO_SERVER_REMOTE_BACKEND_HPP
2 changes: 2 additions & 0 deletions capio/server/include/remote/backend/include.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@
*/

#include "mpi.hpp"
#include "mtcl.hpp"
#include "none.hpp"

#endif // CAPIO_SERVER_REMOTE_BACKEND_INCLUDE_HPP
1 change: 1 addition & 0 deletions capio/server/include/remote/backend/mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class MPIBackend : public Backend {
void send_file(char *shm, long int nbytes, const std::string &target) override;
void send_request(const char *message, int message_len, const std::string &target) override;
void recv_file(char *shm, const std::string &source, long int bytes_expected) override;
void connect_to(const std::string &target) override;
};

class MPISYNCBackend final : public MPIBackend {
Expand Down
92 changes: 92 additions & 0 deletions capio/server/include/remote/backend/mtcl.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#ifndef MTCL_BACKEND_HPP
#define MTCL_BACKEND_HPP

#include <condition_variable>
#include <filesystem>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <thread>
#include <unistd.h>
#include <unordered_map>
#include <utility>
#include <vector>

#include "common/constants.hpp"
#include "common/logger.hpp"
#include "remote/atomic_queue.hpp"
#include "remote/backend.hpp"

#include <shared_mutex>

typedef unsigned long long int capio_off64_t;

/**
* This avoids it to include the MTCL library here as it is a header-only library.
* this is equivalent to use extern in C but for class
*/
namespace MTCL {
class HandleUser;
}

// TODO: extend backend class
class MTCLBackend : public Backend {

int thread_sleep_times = 0;
bool continue_execution = true;

const std::string selfToken, ownPort, usedProtocol;

std::shared_mutex open_connections_lock;
std::unordered_map<std::string, AtomicQueue<const char *> *> open_connections;

std::thread *incoming_connection_thread = nullptr;
std::vector<std::thread *> connection_threads;

AtomicQueue<std::string> incoming_request_queue;

/**
* Waits for incoming new requests to connect to new server instances. When a new request
* arrives, it then handshakes with the remote servers, opening a new connection, and starting a
* new thread that will handle remote requests. If no request arrives within the sleep_time
* parameter, then the method will issue an advertisement on UDP multicast of its alive state
* so that other servers may instantiate a new connection with me.
*
* @param ownPort
* @param usedProtocol
* @param continue_execution
* @param sleep_time
* @param open_connections
* @param open_connection_guard
* @param _connection_threads
* @param incoming_request_queue
*/
void static incomingMTCLConnectionListener(
const std::string &ownPort, const std::string &usedProtocol, const bool *continue_execution,
int sleep_time,
std::unordered_map<std::string, AtomicQueue<const char *> *> *open_connections,
std::shared_mutex *open_connection_guard, std::vector<std::thread *> *_connection_threads,
AtomicQueue<std::string> *incoming_request_queue);

public:
explicit MTCLBackend(const std::string &proto, const std::string &port, int sleep_time);

~MTCLBackend() override;

RemoteRequest read_next_request() override;

void handshake_servers() override;

const std::set<std::string> get_nodes() override;

void send_request(const char *message, int message_len, const std::string &target) override;

void send_file(char *shm, long int nbytes, const std::string &target) override;

void recv_file(char *shm, const std::string &source, long int bytes_expected) override;

void connect_to(const std::string &target_token) override;
};

#endif // MTCL_BACKEND_HPP
1 change: 1 addition & 0 deletions capio/server/include/remote/backend/none.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ class NoneBackend final : public Backend {
void send_file(char *shm, long int nbytes, const std::string &target) override;
void send_request(const char *message, int message_len, const std::string &target) override;
void recv_file(char *shm, const std::string &source, long int bytes_expected) override;
void connect_to(const std::string &target) override;
};
#endif // CAPIO_SERVER_REMOTE_BACKEND_NONE_HPP
37 changes: 37 additions & 0 deletions capio/server/include/remote/discovery.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#ifndef CAPIO_DISCOVERY_HPP
#define CAPIO_DISCOVERY_HPP

#include <string>
#include <thread>

class DiscoveryService {
bool terminate = false;

/// @brief Handle for thread listening for other server instances
std::thread *listener_thread = nullptr;
/// @brief Handle for thread advertising this server instance
std::thread *advertisement_thread = nullptr;

/// @brief Token to be advertised by this server
std::string advertisement_token;

public:
DiscoveryService() = default;
~DiscoveryService();

/**
* Set the token to be advertised so that other server instance may connect to this instance.
* Token needs to be provided by an instance of a backend, according to backend specification
* for incoming connection
* @param token
*/
void setAdvertisementToken(const std::string &token);

/**
* Start to advertise the token, and to scan for tokens from other servers
* @param adv_delay Delay between each advertisement.
*/
void start(unsigned int adv_delay);
};

#endif // CAPIO_DISCOVERY_HPP
9 changes: 9 additions & 0 deletions capio/server/include/remote/listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ inline Backend *select_backend(const std::string &backend_name, int argc, char *
return new MPIBackend(argc, argv);
}

if (backend_name == "mtcl") {
LOG("backend selected: MTCL");
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Starting CAPIO with MTCL backend"
<< std::endl;
char hostname[HOST_NAME_MAX]{0};
gethostname(hostname, HOST_NAME_MAX);
return new MTCLBackend("TCP", "1234", 1000000);
}

if (backend_name == "mpisync") {
LOG("backend selected: mpisync");
std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Starting CAPIO with MPI (SYNC) backend"
Expand Down
1 change: 1 addition & 0 deletions capio/server/include/utils/signals.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ void sig_term_handler(int signum, siginfo_t *info, void *ptr) {
#endif

delete backend;
delete discovery_service;
delete shm_canary;

std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "shutdown completed" << std::endl;
Expand Down
Loading
Loading