diff --git a/CMakeLists.txt b/CMakeLists.txt index 3dfebc424..9412ee3cf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -81,6 +81,12 @@ FetchContent_Declare( GIT_TAG v1.4.0 ) +FetchContent_Declare( + mtcl + GIT_REPOSITORY https://github.com/ParaGroup/MTCL + GIT_TAG e5f2bfeea0fc3d704554c7df02e7857f0a00bbba +) + ##################################### # Targets ##################################### diff --git a/capio/common/requests.hpp b/capio/common/requests.hpp index 2ae70083a..e73f8a8c6 100644 --- a/capio/common/requests.hpp +++ b/capio/common/requests.hpp @@ -37,4 +37,6 @@ constexpr const int CAPIO_SERVER_REQUEST_STAT_REPLY = 3; constexpr const int CAPIO_SERVER_NR_REQUEST = 4; +constexpr const int BACKEND_HAVE_FINISH_SEND_REQUEST = 4; + #endif // CAPIO_COMMON_REQUESTS_HPP diff --git a/capio/server/CMakeLists.txt b/capio/server/CMakeLists.txt index 7d6788fa5..d674b46ee 100644 --- a/capio/server/CMakeLists.txt +++ b/capio/server/CMakeLists.txt @@ -19,7 +19,7 @@ FetchContent_Declare( set(ARGS_BUILD_EXAMPLE OFF CACHE INTERNAL "") set(ARGS_BUILD_UNITTESTS OFF CACHE INTERNAL "") -FetchContent_MakeAvailable(args capio_cl) +FetchContent_MakeAvailable(args capio_cl mtcl) ##################################### # Target definition @@ -39,6 +39,7 @@ target_include_directories(${TARGET_NAME} PRIVATE ${MPI_INCLUDE_PATH} ${args_SOURCE_DIR} ${capio_cl_SOURCE_DIR} + ${mtcl_SOURCE_DIR}/include ) ##################################### diff --git a/capio/server/capio_server.cpp b/capio/server/capio_server.cpp index c5fb1e841..6c4f7d8b5 100644 --- a/capio/server/capio_server.cpp +++ b/capio/server/capio_server.cpp @@ -31,6 +31,7 @@ #include "common/requests.hpp" #include "common/semaphore.hpp" #include "remote/backend.hpp" +#include "remote/discovery.hpp" #include "storage/capio_file.hpp" #include "utils/common.hpp" #include "utils/env.hpp" @@ -39,6 +40,7 @@ ClientManager *client_manager; StorageManager *storage_manager; Backend *backend; +DiscoveryService *discovery_service; 
#include "handlers.hpp" #include "utils/location.hpp" @@ -289,6 +291,8 @@ int main(int argc, char **argv) { std::cout << CAPIO_LOG_SERVER_BANNER; + discovery_service = new DiscoveryService(); + parseCLI(argc, argv); START_LOG(gettid(), "call()"); @@ -306,6 +310,5 @@ int main(int argc, char **argv) { server_thread.join(); remote_listener_thread.join(); - delete backend; return 0; } \ No newline at end of file diff --git a/capio/server/include/remote/atomic_queue.hpp b/capio/server/include/remote/atomic_queue.hpp new file mode 100644 index 000000000..258dd1dc2 --- /dev/null +++ b/capio/server/include/remote/atomic_queue.hpp @@ -0,0 +1,72 @@ + +#ifndef CAPIO_BACKEND_ATOMIC_QUEUE_HPP +#define CAPIO_BACKEND_ATOMIC_QUEUE_HPP + +#include +#include +#include +#include +#include + +template struct AtomicQueueElement { + + AtomicQueueElement(T message, size_t message_size, const std::string &origin) { + this->object = message; + this->object_size = message_size; + this->target_or_source = origin; + } + + T object; + size_t object_size = 0; + std::string target_or_source; +}; + +template class AtomicQueue { + std::queue> _queue; + std::mutex _mutex; + std::condition_variable _lock_cond; + + bool _shutdown = false; + + public: + ~AtomicQueue() { + { + std::lock_guard lg(_mutex); + _shutdown = true; + } + _lock_cond.notify_all(); + } + + void push(T message, size_t message_size, const std::string &origin) { + { + std::lock_guard lg(_mutex); + if (_shutdown) { + return; + } + _queue.emplace(message, message_size, origin); + } + _lock_cond.notify_all(); + } + + AtomicQueueElement pop() { + std::unique_lock lock(_mutex); + _lock_cond.wait(lock, [this] { return !_queue.empty() || _shutdown; }); + auto s = std::move(_queue.front()); + _queue.pop(); + + return s; + } + + std::optional> try_pop() { + std::lock_guard lg(_mutex); + if (_queue.empty() || _shutdown) { + return std::nullopt; + } + + auto s = std::move(_queue.front()); + _queue.pop(); + return s; + } +}; + +#endif // 
CAPIO_BACKEND_ATOMIC_QUEUE_HPP diff --git a/capio/server/include/remote/backend.hpp b/capio/server/include/remote/backend.hpp index f1c827821..50b336b73 100644 --- a/capio/server/include/remote/backend.hpp +++ b/capio/server/include/remote/backend.hpp @@ -87,6 +87,12 @@ class Backend { * @param target */ virtual void send_request(const char *message, int message_len, const std::string &target) = 0; + + /** + * Connect this server instance to a remote server instance + * @param target Remote server instance identification + */ + virtual void connect_to(const std::string &target) = 0; }; #endif // CAPIO_SERVER_REMOTE_BACKEND_HPP diff --git a/capio/server/include/remote/backend/include.hpp b/capio/server/include/remote/backend/include.hpp index 5518624cf..7f04735c4 100644 --- a/capio/server/include/remote/backend/include.hpp +++ b/capio/server/include/remote/backend/include.hpp @@ -5,5 +5,7 @@ */ #include "mpi.hpp" +#include "mtcl.hpp" #include "none.hpp" + #endif // CAPIO_SERVER_REMOTE_BACKEND_INCLUDE_HPP diff --git a/capio/server/include/remote/backend/mpi.hpp b/capio/server/include/remote/backend/mpi.hpp index fca12a752..be185f01e 100644 --- a/capio/server/include/remote/backend/mpi.hpp +++ b/capio/server/include/remote/backend/mpi.hpp @@ -27,6 +27,7 @@ class MPIBackend : public Backend { void send_file(char *shm, long int nbytes, const std::string &target) override; void send_request(const char *message, int message_len, const std::string &target) override; void recv_file(char *shm, const std::string &source, long int bytes_expected) override; + void connect_to(const std::string &target) override; }; class MPISYNCBackend final : public MPIBackend { diff --git a/capio/server/include/remote/backend/mtcl.hpp b/capio/server/include/remote/backend/mtcl.hpp new file mode 100644 index 000000000..c90da12ba --- /dev/null +++ b/capio/server/include/remote/backend/mtcl.hpp @@ -0,0 +1,92 @@ +#ifndef MTCL_BACKEND_HPP +#define MTCL_BACKEND_HPP + +#include +#include +#include 
+#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/constants.hpp" +#include "common/logger.hpp" +#include "remote/atomic_queue.hpp" +#include "remote/backend.hpp" + +#include + +typedef unsigned long long int capio_off64_t; + +/** + * Forward declaration that avoids including the MTCL header here, since MTCL is header-only. + * This is the equivalent of C's extern, but for a class type. + */ +namespace MTCL { +class HandleUser; +} + +// TODO: extend backend class +class MTCLBackend : public Backend { + + int thread_sleep_times = 0; + bool continue_execution = true; + + const std::string selfToken, ownPort, usedProtocol; + + std::shared_mutex open_connections_lock; + std::unordered_map *> open_connections; + + std::thread *incoming_connection_thread = nullptr; + std::vector connection_threads; + + AtomicQueue incoming_request_queue; + + /** + * Waits for incoming new requests to connect to new server instances. When a new request + * arrives, it then handshakes with the remote servers, opening a new connection, and starting a + * new thread that will handle remote requests. If no request arrives within the sleep_time + * parameter, then the method will issue an advertisement on UDP multicast of its alive state + * so that other servers may instantiate a new connection with me. 
+ * + * @param ownPort + * @param usedProtocol + * @param continue_execution + * @param sleep_time + * @param open_connections + * @param open_connection_guard + * @param _connection_threads + * @param incoming_request_queue + */ + void static incomingMTCLConnectionListener( + const std::string &ownPort, const std::string &usedProtocol, const bool *continue_execution, + int sleep_time, + std::unordered_map *> *open_connections, + std::shared_mutex *open_connection_guard, std::vector *_connection_threads, + AtomicQueue *incoming_request_queue); + + public: + explicit MTCLBackend(const std::string &proto, const std::string &port, int sleep_time); + + ~MTCLBackend() override; + + RemoteRequest read_next_request() override; + + void handshake_servers() override; + + const std::set get_nodes() override; + + void send_request(const char *message, int message_len, const std::string &target) override; + + void send_file(char *shm, long int nbytes, const std::string &target) override; + + void recv_file(char *shm, const std::string &source, long int bytes_expected) override; + + void connect_to(const std::string &target_token) override; +}; + +#endif // MTCL_BACKEND_HPP \ No newline at end of file diff --git a/capio/server/include/remote/backend/none.hpp b/capio/server/include/remote/backend/none.hpp index 3faae205f..c0cd2298a 100644 --- a/capio/server/include/remote/backend/none.hpp +++ b/capio/server/include/remote/backend/none.hpp @@ -11,5 +11,6 @@ class NoneBackend final : public Backend { void send_file(char *shm, long int nbytes, const std::string &target) override; void send_request(const char *message, int message_len, const std::string &target) override; void recv_file(char *shm, const std::string &source, long int bytes_expected) override; + void connect_to(const std::string &target) override; }; #endif // CAPIO_SERVER_REMOTE_BACKEND_NONE_HPP diff --git a/capio/server/include/remote/discovery.hpp b/capio/server/include/remote/discovery.hpp new file mode 100644 
index 000000000..5a5e7b345 --- /dev/null +++ b/capio/server/include/remote/discovery.hpp @@ -0,0 +1,37 @@ +#ifndef CAPIO_DISCOVERY_HPP +#define CAPIO_DISCOVERY_HPP + +#include +#include + +class DiscoveryService { + bool terminate = false; + + /// @brief Handle for thread listening for other server instances + std::thread *listener_thread = nullptr; + /// @brief Handle for thread advertising this server instance + std::thread *advertisement_thread = nullptr; + + /// @brief Token to be advertised by this server + std::string advertisement_token; + + public: + DiscoveryService() = default; + ~DiscoveryService(); + + /** + * Set the token to be advertised so that other server instance may connect to this instance. + * Token needs to be provided by an instance of a backend, according to backend specification + * for incoming connection + * @param token + */ + void setAdvertisementToken(const std::string &token); + + /** + * Start to advertise the token, and to scan for tokens from other servers + * @param adv_delay Delay between each advertisement. 
+ */ + void start(unsigned int adv_delay); +}; + +#endif // CAPIO_DISCOVERY_HPP diff --git a/capio/server/include/remote/listener.hpp b/capio/server/include/remote/listener.hpp index 43a133bfb..b95cd1460 100644 --- a/capio/server/include/remote/listener.hpp +++ b/capio/server/include/remote/listener.hpp @@ -40,6 +40,15 @@ inline Backend *select_backend(const std::string &backend_name, int argc, char * return new MPIBackend(argc, argv); } + if (backend_name == "mtcl") { + LOG("backend selected: MTCL"); + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Starting CAPIO with MTCL backend" + << std::endl; + char hostname[HOST_NAME_MAX]{0}; + gethostname(hostname, HOST_NAME_MAX); + return new MTCLBackend("TCP", "1234", 1000000); + } + if (backend_name == "mpisync") { LOG("backend selected: mpisync"); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Starting CAPIO with MPI (SYNC) backend" diff --git a/capio/server/include/utils/signals.hpp b/capio/server/include/utils/signals.hpp index 4919b04fb..b12b66cdb 100644 --- a/capio/server/include/utils/signals.hpp +++ b/capio/server/include/utils/signals.hpp @@ -35,6 +35,7 @@ void sig_term_handler(int signum, siginfo_t *info, void *ptr) { #endif delete backend; + delete discovery_service; delete shm_canary; std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "shutdown completed" << std::endl; diff --git a/capio/server/src/discovery_service.cpp b/capio/server/src/discovery_service.cpp new file mode 100644 index 000000000..cba080fe6 --- /dev/null +++ b/capio/server/src/discovery_service.cpp @@ -0,0 +1,98 @@ +#include +#include + +#include "common/logger.hpp" +#include "remote/backend.hpp" +#include "remote/discovery.hpp" +#include "utils/common.hpp" + +extern Backend *backend; + +constexpr char CAPIO_MULTICAST_ADDRESS[] = "224.0.0.2"; +constexpr int CAPIO_MULTICAST_PORT = 22334; +int REUSE_MCAST_SOCKET = 1; + +void advertise(const bool *terminate, const unsigned int delay_ms, + const std::string &advertisement_token) { + const int 
advert_sock_fd = socket(AF_INET, SOCK_DGRAM, 0); + sockaddr_in advert_multicast_addr{}; + advert_multicast_addr.sin_family = AF_INET; + advert_multicast_addr.sin_port = htons(CAPIO_MULTICAST_PORT); + advert_multicast_addr.sin_addr.s_addr = inet_addr(CAPIO_MULTICAST_ADDRESS); + + while (!*terminate) { + std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); + sendto(advert_sock_fd, advertisement_token.data(), advertisement_token.size(), 0, + reinterpret_cast(&advert_multicast_addr), sizeof(advert_multicast_addr)); + } + + close(advert_sock_fd); +} + +void thread_discovery_service(const bool *terminate) { + START_LOG(gettid(), "call()"); + + int sockfd = socket(AF_INET, SOCK_DGRAM, 0); + + setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &REUSE_MCAST_SOCKET, sizeof(REUSE_MCAST_SOCKET)); + + timeval tv{}; + tv.tv_sec = 0; + tv.tv_usec = 100000; // 100,000 microseconds = 100ms + setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + + sockaddr_in local_addr{}; + local_addr.sin_family = AF_INET; + local_addr.sin_port = htons(CAPIO_MULTICAST_PORT); + local_addr.sin_addr.s_addr = htonl(INADDR_ANY); + bind(sockfd, reinterpret_cast(&local_addr), sizeof(local_addr)); + + ip_mreq mreq{}; + mreq.imr_multiaddr.s_addr = inet_addr(CAPIO_MULTICAST_ADDRESS); + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); + + char incoming_token[2 * HOST_NAME_MAX] = {0}; + + while (!*terminate) { + + bzero(incoming_token, 2 * HOST_NAME_MAX); + + if (recvfrom(sockfd, incoming_token, sizeof(incoming_token) - 1, 0, nullptr, nullptr) > 0) { + backend->connect_to(incoming_token); + } + } + close(sockfd); +} + +void DiscoveryService::start(unsigned int adv_delay) { + if (advertisement_token.empty()) { + throw std::runtime_error("Advertisement token is empty"); + } + + listener_thread = new std::thread(thread_discovery_service, &terminate); + advertisement_thread = + new std::thread(advertise, &terminate, adv_delay, 
std::ref(advertisement_token)); + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "DiscoveryService will advertise " + + advertisement_token + " every " + + std::to_string(adv_delay) + "ms."); +} + +DiscoveryService::~DiscoveryService() { + terminate = true; + + if (listener_thread->joinable()) { + listener_thread->join(); + listener_thread = nullptr; + } + + if (advertisement_thread->joinable()) { + advertisement_thread->join(); + advertisement_thread = nullptr; + } +} + +void DiscoveryService::setAdvertisementToken(const std::string &token) { + this->advertisement_token = token; +} diff --git a/capio/server/src/mpi_backend.cpp b/capio/server/src/mpi_backend.cpp index 9d8fc770a..3fe2eab62 100644 --- a/capio/server/src/mpi_backend.cpp +++ b/capio/server/src/mpi_backend.cpp @@ -114,6 +114,12 @@ void MPIBackend::recv_file(char *shm, const std::string &source, long int bytes_ } } +void MPIBackend::connect_to(const std::string &target) { + START_LOG(gettid(), "call(target=%s)", target.c_str()); + LOG("connect_to called on backend that is not dynamic. ignoring call"); + return; +} + MPISYNCBackend::MPISYNCBackend(int argc, char *argv[]) : MPIBackend(argc, argv) { START_LOG(gettid(), "call()"); LOG("Wrapped MPI backend with MPISYC backend"); diff --git a/capio/server/src/mtcl_backend.cpp b/capio/server/src/mtcl_backend.cpp new file mode 100644 index 000000000..ea63ee392 --- /dev/null +++ b/capio/server/src/mtcl_backend.cpp @@ -0,0 +1,318 @@ +#include "common/logger.hpp" +#include "common/requests.hpp" +#include "remote/backend.hpp" +#include "remote/backend/mtcl.hpp" +#include "remote/discovery.hpp" +#include "storage/manager.hpp" +#include "utils/common.hpp" + +#include +#include + +// TODO: THERE IS A MASSIVE MEMORY LEAK WHEN SENDING AND RECEIVING CONST CHAR*. 
FIX IT BEFORE MERGE + +// TODO: CLI args (with defaults) instead of hardcoded values + +constexpr int max_net_op = 10; + +extern Backend *backend; +extern DiscoveryService *discovery_service; +extern StorageManager *storage_service; + +RemoteRequest MTCLBackend::read_next_request() { + START_LOG(gettid(), "call()"); + + auto optional_request = incoming_request_queue.try_pop(); + while (!optional_request.has_value()) { + std::this_thread::sleep_for(std::chrono::milliseconds(thread_sleep_times)); + optional_request = incoming_request_queue.try_pop(); + } + + auto [req, req_size, source] = optional_request.value(); + LOG("Received %s from %d", req.c_str(), source.c_str()); + return {req.data(), source}; +} + +/** + * @brief Manages a dedicated P2P connection to a single remote capio_server instance. + * The communication logic follows a deterministic role-assignment algorithm: + * 1. **Initial Role Assignment:** The initial sender is determined by the lexicographical + * comparison of the two participating hostnames (the greater hostname starts as sender). + * 2. **Operational Phases:** The thread executes alternating phases of sending and receiving, + * processing up to `max_net_op` operations per phase. + * 3. **Role Switching:** Nodes synchronize a role swap using a `HAVE_FINISH_SEND_REQUEST` + * signal. This occurs when the current sender either exhausts its message queue or reaches + * the `max_net_op` threshold. + * 4. **Termination:** The loop persists as long as the remote handle remains valid and the + * `continue_execution` flag is true. + * @param HandlerPointer A valid MTCL HandleUser for the connection. + * @param remote_hostname The hostname of the remote endpoint. + * @param queue Pointer to the communication queue containing inbound and outbound sub-queues. + * @param sleep_time Microseconds to sleep between thread cycles to prevent CPU pinning. 
+ * @param continue_execution Reference to a boolean flag to know when to stop execution + * to signal execution shutdown. + * @param incoming_request_queue + */ +void serverConnectionHandler(MTCL::HandleUser HandlerPointer, const std::string &remote_hostname, + AtomicQueue *queue, const int sleep_time, + const bool *continue_execution, + AtomicQueue *incoming_request_queue) { + + char ownHostname[HOST_NAME_MAX]; + gethostname(ownHostname, HOST_NAME_MAX); + bool my_turn_to_send = ownHostname > remote_hostname; + + char request_has_finished_to_send[CAPIO_REQ_MAX_SIZE]{0}; + sprintf(request_has_finished_to_send, "%03d", BACKEND_HAVE_FINISH_SEND_REQUEST); + + START_LOG(gettid(), "call(remote_hostname=%s)", remote_hostname.c_str()); + + LOG("Will begin execution with %s phase", my_turn_to_send ? "sending" : "receiving"); + + while (HandlerPointer.isValid()) { + // execute up to N operation of send &/or receive, to avoid starvation + + if (my_turn_to_send) { + LOG("Send PHASE"); + for (int i = 0; i < max_net_op; i++) { + if (const auto request_opt = queue->try_pop(); request_opt.has_value()) { + const auto &[request, request_size, target] = request_opt.value(); + LOG("Request to be sent = %s to %s", request, target.c_str()); + + HandlerPointer.send(&request_size, sizeof(request_size)); + HandlerPointer.send(request, request_size); + } + } + LOG("Completed SEND PHASE"); + // Send message I have finished the max number of allowed consecutive io operations + HandlerPointer.send(request_has_finished_to_send, sizeof(request_has_finished_to_send)); + + } else { + + bool continue_receive_phase = true; + size_t receive_size = 0; + LOG("Receive PHASE"); + while (continue_receive_phase) { + // Receive phase + HandlerPointer.probe(receive_size, false); + if (receive_size > 0) { + LOG("A request is incoming"); + + ssize_t incoming_request_size = 0; + HandlerPointer.receive(&incoming_request_size, sizeof(incoming_request_size)); + + const auto incoming_request = new 
char[incoming_request_size]; + const auto resp_size = + HandlerPointer.receive(incoming_request, incoming_request_size); + LOG("Received request with size = %ld", incoming_request_size); + + if (const auto code = + RemoteRequest{incoming_request, remote_hostname}.get_code(); + code == BACKEND_HAVE_FINISH_SEND_REQUEST) { + // Finished sending data. Set continue_receive_phase = false to go to next + // phase + LOG("CTRL MSG received: Other has finished sending phase. Switching me " + "from receive to send"); + continue_receive_phase = false; + } else { + incoming_request_queue->push(incoming_request, resp_size, remote_hostname); + } + } + } + } + + // terminate phase + if (!*continue_execution) { + LOG("[TERM PHASE] Closing connection"); + HandlerPointer.close(); + LOG("[TERM PHASE] Terminating thread server_connection_handler"); + return; + } + + my_turn_to_send = !my_turn_to_send; + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); + } +} + +void MTCLBackend::incomingMTCLConnectionListener( + const std::string &ownPort, const std::string &usedProtocol, const bool *continue_execution, + int sleep_time, std::unordered_map *> *open_connections, + std::shared_mutex *open_connection_guard, std::vector *_connection_threads, + AtomicQueue *incoming_request_queue) { + + START_LOG(gettid(), "call(sleep_time=%d)", sleep_time); + + while (*continue_execution) { + + if (auto UserManager = MTCL::Manager::getNext(std::chrono::microseconds(sleep_time)); + UserManager.isValid()) { + // received MTCL handle + LOG("Handle user is valid"); + size_t remoteHostnameSize = -1; + if (UserManager.receive(&remoteHostnameSize, sizeof(remoteHostnameSize)) <= 0 || + remoteHostnameSize == 0 || remoteHostnameSize > HOST_NAME_MAX) { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Remote hostname size received is zero or negative"); + UserManager.close(); + continue; + } + + std::string remote_hostname(remoteHostnameSize, '\0'); + 
UserManager.receive(remote_hostname.data(), remoteHostnameSize); + LOG("Received connection hostname: %s", remote_hostname.c_str()); + + auto *queue = new AtomicQueue(); + { + const std::unique_lock lock(*open_connection_guard); + (*open_connections)[remote_hostname] = queue; + } + _connection_threads->push_back( + new std::thread(serverConnectionHandler, std::move(UserManager), remote_hostname, + queue, sleep_time, continue_execution, incoming_request_queue)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Connected to " + usedProtocol + ":" + + remote_hostname + ":" + ownPort + + " (incoming)"); + } + } +} + +MTCLBackend::MTCLBackend(const std::string &proto, const std::string &port, const int sleep_time) + : Backend(HOST_NAME_MAX), thread_sleep_times(sleep_time), selfToken(proto + ":0.0.0.0:" + port), + ownPort(port), usedProtocol(proto) { + START_LOG(gettid(), "INFO: instance of MTCLBackend"); + + LOG("My hostname is %s. Starting to listen on connection %s", node_name.c_str(), + selfToken.c_str()); + + std::string hostname_id("server-"); + hostname_id += node_name; + MTCL::Manager::init(hostname_id); + + MTCL::Manager::listen(selfToken); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "MTCL_backend initialization completed."); + + discovery_service->setAdvertisementToken(usedProtocol + ":" + node_name + ":" + ownPort); + discovery_service->start(1000); +} + +MTCLBackend::~MTCLBackend() { + START_LOG(gettid(), "call()"); + continue_execution = false; + + incoming_connection_thread->join(); + + for (const auto t : connection_threads) { + t->join(); + } + LOG("Terminated connection threads"); + + delete incoming_connection_thread; + + LOG("Handler closed."); + + MTCL::Manager::finalize(); + LOG("Finalizing MTCL backend"); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "MTCL backend cleanup completed."); +} + +void MTCLBackend::handshake_servers() { + incoming_connection_thread = + new std::thread(incomingMTCLConnectionListener, ownPort, usedProtocol, 
&continue_execution, + thread_sleep_times, &open_connections, &open_connections_lock, + &connection_threads, &incoming_request_queue); +} + +const std::set MTCLBackend::get_nodes() { + std::set keys; + shared_lock_guard slg(open_connections_lock); + for (const auto &[hostname, _handle] : open_connections) { + keys.insert(hostname); + } + + return keys; +} + +void MTCLBackend::send_request(const char *message, const int message_len, + const std::string &target) { + + START_LOG(gettid(), "call(target=%s, message=%s, message_len=%ld)", target.c_str(), message, + message_len); + + shared_lock_guard slg(open_connections_lock); + const auto queues = open_connections.at(target); + LOG("obtained access to queue"); + + queues->push(message, message_len, target); + LOG("Request pushed to output queue"); +} + +void MTCLBackend::send_file(char *shm, long int nbytes, const std::string &target) { + START_LOG(gettid(), "call(target=%s, nbytes=%ld)", target.c_str(), nbytes); + + shared_lock_guard slg(open_connections_lock); + const auto queue = open_connections.at(target); + queue->push(shm, nbytes, target); +} + +void MTCLBackend::recv_file(char *shm, const std::string &source, long int bytes_expected) { + shared_lock_guard slg(open_connections_lock); + const auto queues = open_connections.at(source); + const auto data = queues->pop(); + memcpy(shm, data.object, bytes_expected); +} + +void MTCLBackend::connect_to(const std::string &target_token) { + START_LOG(gettid(), "call(target=%s)", target_token.c_str()); + + if (std::string(target_token) == selfToken) { + LOG("Skipping to connect to self"); + return; + } + + std::string remoteHostname = + target_token.substr(target_token.find(':') + 1, // Drop proto + target_token.find_last_of(':') - target_token.find(':') - 1 // drop port + ); + + /* + * Connect to remote only if its hostname is lexically smaller than self hostname + * If current server hostname is equal to remoteHostname, avoid connection + * TODO: extend this to support 
also different workflows on same nodes. (NB: right now we expect + different MCAST groups ) + */ + if (node_name >= remoteHostname) { + return; + } + + { + shared_lock_guard slg(open_connections_lock); + if (open_connections.find(remoteHostname) != open_connections.end()) { + LOG("Remote host %s is already connected", remoteHostname.c_str()); + return; + } + } + + if (auto UserManager = MTCL::Manager::connect(target_token); UserManager.isValid()) { + LOG("Opened connection with: %s", target_token.c_str()); + + // send my hostname + const size_t ownHostnameLen = node_name.size(); + UserManager.send(&ownHostnameLen, sizeof(ownHostnameLen)); + UserManager.send(node_name.c_str(), ownHostnameLen); + + auto *queue = new AtomicQueue(); + { + const std::lock_guard lg(open_connections_lock); + open_connections[remoteHostname] = queue; + } + connection_threads.push_back( + new std::thread(serverConnectionHandler, std::move(UserManager), remoteHostname, queue, + thread_sleep_times, &continue_execution, &incoming_request_queue)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, + "Connected to " + target_token + " (outgoing)"); + } else { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Warning: tried to connect to " + + std::string(remoteHostname) + + " but connection is not valid"); + } +} diff --git a/capio/server/src/none_backend.cpp b/capio/server/src/none_backend.cpp index f4f81a28b..f33357b77 100644 --- a/capio/server/src/none_backend.cpp +++ b/capio/server/src/none_backend.cpp @@ -26,4 +26,6 @@ void NoneBackend::send_request(const char *message, const int message_len, void NoneBackend::recv_file(char *shm, const std::string &source, const long int bytes_expected) { START_LOG(gettid(), "call(shm=%ld, source=%s, bytes_expected=%ld)", shm, source.c_str(), bytes_expected); -} \ No newline at end of file +} + +void NoneBackend::connect_to(const std::string &target) {} \ No newline at end of file diff --git a/capio/tests/unit/server/CMakeLists.txt 
b/capio/tests/unit/server/CMakeLists.txt index 42f3f35fa..d9b6ac1a6 100644 --- a/capio/tests/unit/server/CMakeLists.txt +++ b/capio/tests/unit/server/CMakeLists.txt @@ -4,7 +4,7 @@ set(TARGET_NAME capio_server_unit_tests) find_package(MPI REQUIRED) -FetchContent_MakeAvailable(capio_cl) +FetchContent_MakeAvailable(capio_cl mtcl) set(TARGET_INCLUDE_FOLDER "${PROJECT_SOURCE_DIR}/capio/server") @@ -32,6 +32,7 @@ target_sources(${TARGET_NAME} PRIVATE target_include_directories(${TARGET_NAME} PRIVATE "${TARGET_INCLUDE_FOLDER}/include" ${capio_cl_SOURCE_DIR} + ${mtcl_SOURCE_DIR}/include ) ##################################### diff --git a/capio/tests/unit/server/src/capio_file.cpp b/capio/tests/unit/server/src/capio_file.cpp index 4a9abb4d1..57cefbae1 100644 --- a/capio/tests/unit/server/src/capio_file.cpp +++ b/capio/tests/unit/server/src/capio_file.cpp @@ -338,6 +338,7 @@ class MockBackend : public Backend { RemoteRequest read_next_request() override { return {nullptr, ""}; } void send_file(char *shm, long int nbytes, const std::string &target) override {} void send_request(const char *message, int message_len, const std::string &target) override {} + void connect_to(const std::string &target) override {} }; class MockBackendTestFixture : public ::testing::Test { diff --git a/capio/tests/unit/server/src/main.cpp b/capio/tests/unit/server/src/main.cpp index 28cb277e4..83be6583c 100644 --- a/capio/tests/unit/server/src/main.cpp +++ b/capio/tests/unit/server/src/main.cpp @@ -3,6 +3,7 @@ #include "capiocl.hpp" #include "capiocl/engine.h" #include "client-manager/client_manager.hpp" +#include "remote/discovery.hpp" #include "storage/manager.hpp" #include "utils/capiocl_adapter.hpp" #include "utils/location.hpp" @@ -11,6 +12,7 @@ capiocl::engine::Engine *capio_cl_engine = nullptr; StorageManager *storage_manager = nullptr; ClientManager *client_manager = nullptr; Backend *backend = nullptr; +DiscoveryService *discovery_service = nullptr; const capiocl::engine::Engine 
&CapioCLEngine::get() { return *capio_cl_engine; } @@ -19,15 +21,17 @@ class ServerUnitTestEnvironment : public testing::Environment { explicit ServerUnitTestEnvironment() = default; void SetUp() override { - capio_cl_engine = new capiocl::engine::Engine(false); - client_manager = new ClientManager(); - storage_manager = new StorageManager(); + capio_cl_engine = new capiocl::engine::Engine(false); + client_manager = new ClientManager(); + storage_manager = new StorageManager(); + discovery_service = new DiscoveryService(); } void TearDown() override { delete storage_manager; delete client_manager; delete capio_cl_engine; + delete discovery_service; } }; diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 000000000..327ab3e44 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,49 @@ +services: + node1: + image: hpio/capio:latest + container_name: node1 + hostname: "node1" + tty: true + working_dir: /shared + volumes: + - shared_data:/shared + networks: + capio_private_net: + ipv4_address: 10.10.0.10 + environment: + - CAPIO_LOG_LEVEL=-1 + - APP_TYPE=writer + - CAPIO_DIR=. + command: | + capio_server -b mtcl --no-config + + node2: + image: hpio/capio:latest + container_name: node2 + hostname: "node2" + tty: true + working_dir: /shared + volumes: + - shared_data:/shared + networks: + capio_private_net: + ipv4_address: 10.10.0.11 + environment: + - CAPIO_LOG_LEVEL=-1 + - APP_TYPE=reader + - CAPIO_DIR=. + command: | + capio_server -b mtcl --no-config + +volumes: + shared_data: + +networks: + capio_private_net: + driver: macvlan + driver_opts: + parent: dummy0 + ipam: + config: + - subnet: 10.10.0.0/24 + gateway: 10.10.0.1 \ No newline at end of file