Commit 13fccc8a authored by Noah Orensa's avatar Noah Orensa
Browse files

Merge branch 'feature/remote-workers'

parents cb0bf6ab 5dd5da9a
......@@ -62,3 +62,7 @@ bin/
# log files
*.log
*.log.*
# script environments and configurations
sbin/.env
sbin/workers
......@@ -108,7 +108,12 @@ public:
static std::string ipv4_to_str(const sockaddr &addr);
static sockaddr str_to_ipv4(const std::string &ip, uint16_t port);
static sockaddr str_to_ipv4(const std::string &host, const std::string &port);
static sockaddr str_to_ipv4(const std::string &str);
static uint16_t get_port(const sockaddr &addr);
static sockaddr set_port(const sockaddr &addr, uint16_t port);
};
struct sockaddr_in_hash {
......
......@@ -307,6 +307,11 @@ private:
Socket _socket;
Socket _superSocket;
uint16_t _port = 0;
sockaddr _address;
sockaddr _superAddress;
DriverContext() = default;
WorkerHandle _spawnWorker();
......@@ -326,6 +331,15 @@ private:
void _join(Test *test);
public:
void setPort(uint16_t port) {
_port = port;
}
void setAddress(const char *address);
void addWorker(uint32_t id) {
_workers[id] = WorkerHandle(id);
}
Message createUserMessage() override;
......@@ -353,8 +367,6 @@ private:
uint32_t _notifyCount = 0;
std::list<Message> _userMessages;
Socket _socket;
Socket _driverSocket;
Socket _superDriverSocket;
std::unordered_map<std::string, Test *> _tests;
bool _inTest = false;
......
CLUSTER_USER=
HOME_DIR=
PORT=11110
#!/bin/bash
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
cd $DIR
source ./.env
cd ..
EXEC=$1
TEST_DIR=$2
#remove empty lines from sbin/workers
sed -i '/^$/d' sbin/workers
for m in $(cat sbin/workers) ; do
ssh -A $CLUSTER_USER@$m "sh -c 'mkdir -p $HOME_DIR/tmp'" &
done
wait
for m in $(cat sbin/workers) ; do
scp -r $TEST_DIR $CLUSTER_USER@$m:$HOME_DIR/tmp > /dev/null &
done
wait
$EXEC --port $PORT --workers $(wc -l sbin/workers | awk '{ print $1 }') $TEST_DIR &
DRIVER=$!
id=0
WORKERS=()
for m in $(cat sbin/workers) ; do
ssh -A $CLUSTER_USER@$m "sh -c 'nohup $HOME_DIR/$EXEC --driver $(hostname):$PORT --worker-id $id $HOME_DIR/tmp > /dev/null 2>&1 & echo \$! > $HOME_DIR/tmp/pid'" &
let id=id+1
done
wait $DRIVER
for m in $(cat sbin/workers) ; do
ssh -A $CLUSTER_USER@$m "sh -c 'kill \$(cat $HOME_DIR/tmp/pid) ; rm -rf $HOME_DIR/tmp'" &
done
wait
......@@ -15,6 +15,9 @@ using namespace dtest;
static std::vector<std::string> dynamicTests;
bool runWorker = false;
uint32_t workerId = 0;
static void loadTests(const char *path) {
std::cerr << "Loading " << path << "\n";
......@@ -61,19 +64,80 @@ static void findTests(const char *path) {
}
}
int main(int argc, char *argv[]) {
void printHelp() {
std::cout <<
"Usage: dtest <options> <test directories or files>\n"
"\n"
"Options\n"
" --port <port-num> Specifies the port number for the test driver.\n"
" --workers <num-workers> Specifies the number of remote workers available.\n"
" --driver <address> Connects to a remote test driver at <address>.\n"
" --worker-id <id> Runs a test worker, using <id> as its unique\n"
" identifier.\n"
"\n\n"
;
}
char cwd[PATH_MAX];
getcwd(cwd, PATH_MAX);
void parseArguments(int argc, char *argv[], const char *cwd) {
bool gotTestLocation = false;
if (argc > 1) {
for (int i = 1; i < argc; ++i) {
for (int i = 0; i < argc; ++i) {
if (argv[i][0] == '-' || strncmp(argv[i], "--", 2) == 0) {
if (strcasecmp(argv[i], "--port") == 0) {
DriverContext::instance->setPort(atoi(argv[++i]));
}
else if (strcasecmp(argv[i], "--workers") == 0) {
uint32_t numWorkers = atoi(argv[++i]);
for (uint32_t i = 0; i < numWorkers; ++i) {
DriverContext::instance->addWorker(i);
}
}
else if (strcasecmp(argv[i], "--driver") == 0) {
runWorker = true;
DriverContext::instance->setAddress(argv[++i]);
}
else if (strcasecmp(argv[i], "--worker-id") == 0) {
workerId = atoi(argv[++i]);
}
else if (strcasecmp(argv[i], "-h") == 0 || strcasecmp(argv[i], "--help") == 0) {
printHelp();
exit(0);
}
else {
std::cerr << "Unknown option '" << argv[i] << "'\n\n";
exit(1);
}
}
else if (access(argv[i], F_OK) == 0) {
findTests(argv[i]);
gotTestLocation = true;
}
else {
std::cerr << "Argument '" << argv[i] << "' is not a valid option or test location.\n\n";
exit(1);
}
}
else {
if (! gotTestLocation) {
findTests(cwd);
}
}
int main(int argc, char *argv[]) {
char cwd[PATH_MAX];
getcwd(cwd, PATH_MAX);
parseArguments(argc - 1, argv + 1, cwd);
if (runWorker) {
try {
Test::runWorker(workerId);
exit(0);
}
catch (...) {
exit(1);
}
}
Test::logStatsToStderr(true);
......
......@@ -31,6 +31,7 @@ static TrackingException allocEx[] = {
{ 2, "GOMP_parallel", 0x26 },
{ 2, "GOMP_parallel", 0x2a },
{ 2, "GOMP_parallel", 0x3a },
{ 2, "GOMP_parallel", 0x3d },
{ 2, "GOMP_parallel", 0x41 },
{ 1, "__tls_get_addr", 0x38 },
{ 1, "__tls_get_addr", 0x3c },
......
......@@ -8,6 +8,7 @@
#include <unistd.h>
#include <poll.h>
#include <unordered_map>
#include <netdb.h>
using namespace dtest;
......@@ -149,6 +150,9 @@ Socket::Socket(uint16_t port, int maxWaitingQueueLength) {
std::string("Error getting socket address. ") + strerror(errno)
);
}
port = get_port(_addr);
_addr = self_address_ipv4(port);
}
void Socket::send(void *data, size_t len) {
......@@ -267,6 +271,9 @@ Socket * Socket::pollOrAcceptOrTimeout() {
_openConnections[incoming] = new Socket(incoming, addr);
}
}
else if ((p.revents & POLLIN)) {
incoming = p.fd;
}
else if (
(p.revents & POLLRDHUP)
|| (p.revents & POLLERR)
......@@ -278,9 +285,6 @@ Socket * Socket::pollOrAcceptOrTimeout() {
delete c;
_openConnections.erase(p.fd);
}
else if ((p.revents & POLLIN)) {
incoming = p.fd;
}
}
return (incoming == -1) ? nullptr : _openConnections[incoming];
......@@ -355,20 +359,58 @@ sockaddr Socket::str_to_ipv4(const std::string &ip, uint16_t port) {
return addr;
}
sockaddr Socket::str_to_ipv4(const std::string &host, const std::string &port) {
addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = 0;
hints.ai_flags = 0;
hints.ai_protocol = 0;
addrinfo *result;
int s = getaddrinfo(host.c_str(), port.c_str(), &hints, &result);
if (s != 0) {
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
exit(EXIT_FAILURE);
}
if (result != nullptr) {
sockaddr addr = *result->ai_addr;
freeaddrinfo(result);
return addr;
}
freeaddrinfo(result);
throw std::invalid_argument(
"Failed to resolve host '" + host + ':' + port + "'."
);
}
sockaddr Socket::str_to_ipv4(const std::string &str) {
char ip[INET_ADDRSTRLEN + 6];
strncpy(ip, str.c_str(), INET_ADDRSTRLEN + 6);
char *colon = std::strchr(ip, ':');
char host[INET_ADDRSTRLEN + 6];
strncpy(host, str.c_str(), INET_ADDRSTRLEN + 6);
char *colon = std::strchr(host, ':');
if (colon == NULL) {
throw std::invalid_argument(
"Error parsing IP address '" + std::string(ip) + "'. "
"Error parsing address '" + std::string(host) + "'. "
);
}
*colon = '\0';
uint16_t port = std::atoi(colon + 1);
return str_to_ipv4(host, colon + 1);
}
return str_to_ipv4(ip, port);
uint16_t Socket::get_port(const sockaddr &addr) {
sockaddr_in *addr_in = (sockaddr_in *) &addr;
return ntohs(addr_in->sin_port);
}
sockaddr Socket::set_port(const sockaddr &addr, uint16_t port) {
sockaddr a = addr;
sockaddr_in *addr_in = (sockaddr_in *) &a;
addr_in->sin_port = htons(port);
return a;
}
......@@ -407,8 +407,11 @@ DriverContext::WorkerHandle DriverContext::_spawnWorker() {
}
void DriverContext::_start() {
_socket = Socket(0, 128);
_superSocket = Socket(0, 128);
_socket = Socket(_port, 128);
_address = _socket.address();
_superSocket = Socket(_port == 0 ? 0 : _port + 1, 128);
_superAddress = _superSocket.address();
}
uint32_t DriverContext::_waitForSuperEvent() {
......@@ -546,6 +549,13 @@ void DriverContext::_join(Test *test) {
}
}
void DriverContext::setAddress(const char *address) {
_address = Socket::str_to_ipv4(address);
_port = Socket::get_port(_address);
_superAddress = Socket::set_port(_address, _port == 0 ? 0 : _port + 1);
}
Message DriverContext::createUserMessage() {
sandbox().lock();
......@@ -633,12 +643,11 @@ void WorkerContext::_start(uint32_t id) {
_id = id;
_socket = Socket(0, 128);
_driverSocket = Socket(DriverContext::instance->_socket.address());
_superDriverSocket = Socket(DriverContext::instance->_superSocket.address());
auto superDriverSocket = Socket(DriverContext::instance->_superAddress);
Message m;
m << OpCode::WORKER_STARTED << _id << _socket.address();
m.send(_superDriverSocket);
m.send(superDriverSocket);
}
void WorkerContext::_waitForEvent() {
......@@ -678,9 +687,10 @@ void WorkerContext::_waitForEvent() {
Test *t = (*it)->copy();
t->_run();
auto superDriverSocket = Socket(DriverContext::instance->_superAddress);
Message m;
m << OpCode::FINISHED_TEST << _id << t->_status << t->_detailedReport;
m.send(_superDriverSocket);
m.send(superDriverSocket);
}
_inTest = false;
......@@ -723,7 +733,10 @@ Message WorkerContext::createUserMessage() {
void WorkerContext::sendUserMessage(Message &message) {
sandbox().lock();
message.send(_driverSocket);
{
auto driverSocket = Socket(DriverContext::instance->_address);
message.send(driverSocket);
}
sandbox().unlock();
}
......@@ -745,9 +758,12 @@ Message WorkerContext::getUserMessage() {
void WorkerContext::notify() {
sandbox().lock();
Message m;
m << OpCode::NOTIFY << _id;
m.send(_driverSocket);
{
auto driverSocket = Socket(DriverContext::instance->_address);
Message m;
m << OpCode::NOTIFY << _id;
m.send(driverSocket);
}
sandbox().unlock();
}
......
#include <dtest.h>
#include <thread>
#include <dtest_core/socket.h>
module("distributed-unit-test")
.dependsOn({
......@@ -158,7 +159,7 @@ static sockaddr get_sock_addr(int fd) {
sockaddr addr;
socklen_t len = sizeof(addr);
assert(getsockname(fd, &addr, &len) != -1);
return addr;
return dtest::Socket::self_address_ipv4(dtest::Socket::get_port(addr));
}
dunit("distributed-unit-test", "tcp")
......
......@@ -25,20 +25,20 @@ perf("performance-test", "too-slow")
perf("performance-test", "pass-ratio")
.performanceMarginAsBaselineRatio(0.7)
.body([] {
for (int i = 0; i < 4000000; ++i);
for (int i = 0; i < 20000000; ++i);
})
.baseline([] {
for (int i = 0; i < 8000000; ++i);
for (int i = 0; i < 80000000; ++i);
});
perf("performance-test", "too-slow-ratio")
.performanceMarginAsBaselineRatio(0.7)
.expect(Status::TOO_SLOW)
.body([] {
for (int i = 0; i < 8000000; ++i);
for (int i = 0; i < 80000000; ++i);
})
.baseline([] {
for (int i = 0; i < 4000000; ++i);
for (int i = 0; i < 20000000; ++i);
});
perf("performance-test", "error-msg")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment