Commit 38c5e5fe authored by Noah Orensa's avatar Noah Orensa
Browse files

fix runaway allocations in distributed unit test due to concurrent allocation and message send/recv

parent cf262165
......@@ -28,15 +28,16 @@ private:
size_t _allocateCount = 0;
size_t _freeCount = 0;
static thread_local bool _locked;
static thread_local size_t _locked;
inline bool _enter() {
if (! _track || _locked) return false;
_locked = true;
++_locked;
return true;
}
inline void _exit() {
_locked = false;
--_locked;
}
bool _canTrackAlloc(const CallStack &callstack);
......@@ -53,6 +54,14 @@ public:
_track = val;
}
inline void lock() {
++_locked;
}
inline void unlock() {
--_locked;
}
void track(void *ptr, size_t size);
void retrack(void *oldPtr, void *newPtr, size_t newSize);
......
......@@ -26,6 +26,8 @@ private:
void _exit();
inline void _fit(size_t sz) {
_enter();
size_t len = _buf - (uint8_t *) _allocBuf;
size_t rem = _allocLen - len;
if (rem < sz) {
......@@ -33,6 +35,8 @@ private:
_allocBuf = realloc(_allocBuf, _allocLen);
_buf = (uint8_t *) _allocBuf + len;
}
_exit();
}
inline void _dispose() {
......@@ -64,13 +68,9 @@ private:
public:
inline Message(size_t len = _DEFAULT_BUFFER_SIZE) {
_enter();
_allocBuf = malloc(len);
_allocLen = len;
_buf = (uint8_t *) _allocBuf + sizeof(size_t);
_exit();
}
inline Message(const Message &rhs) {
......@@ -133,46 +133,30 @@ public:
}
inline Message & put(const void *data, size_t len) {
_enter();
_fit(len);
memcpy(_buf, data, len);
_buf += len;
_exit();
return *this;
}
template <typename T>
inline Message & operator<<(const T &x) {
_enter();
_fit(sizeof(T));
*((T *) _buf) = x;
_buf += sizeof(T);
_exit();
return *this;
}
inline void * get(void *data, size_t len) {
_enter();
memcpy(data, _buf, len);
_buf += len;
_exit();
return data;
}
template <typename T>
inline Message & operator>>(T &x) {
_enter();
x = *((T *) _buf);
_buf += sizeof(T);
_exit();
return *this;
}
......
......@@ -23,8 +23,10 @@ private:
double _chance = 1;
uint64_t _holeDuration = 0;
static thread_local size_t _locked;
inline bool _enter() {
if (! _track) return false;
if (! _track || _locked) return false;
_mtx.lock();
return true;
}
......@@ -37,6 +39,14 @@ public:
Network();
inline void lock() {
++_locked;
}
inline void unlock() {
--_locked;
}
inline void trackActivity(bool val) {
_track = val;
}
......
......@@ -106,6 +106,10 @@ public:
void exitAll();
void lock();
void unlock();
bool run(
uint64_t timeoutNanos,
const std::function<void()> &func,
......
......@@ -6,7 +6,7 @@
using namespace dtest;
thread_local bool Memory::_locked = false;
thread_local size_t Memory::_locked = false;
namespace dtest {
Memory *_mmgr_instance = nullptr;
......
......@@ -4,11 +4,11 @@
using namespace dtest;
void Message::_enter() {
sandbox().exit();
sandbox().lock();
}
void Message::_exit() {
sandbox().enter();
sandbox().unlock();
}
void Message::send(Socket &socket) {
......
......@@ -4,6 +4,8 @@
using namespace dtest;
thread_local size_t Network::_locked = false;
static Network *instance = nullptr;
Network::Network()
......
......@@ -158,6 +158,16 @@ void Sandbox::exitAll() {
_mtx.unlock();
}
void Sandbox::lock() {
_memory.lock();
_network.lock();
}
void Sandbox::unlock() {
_memory.unlock();
_network.unlock();
}
bool Sandbox::run(
uint64_t timeoutNanos,
const std::function<void()> &func,
......
......@@ -547,28 +547,28 @@ void DriverContext::_join(Test *test) {
}
Message DriverContext::createUserMessage() {
sandbox().exit();
sandbox().lock();
Message m;
m << OpCode::USER_MESSAGE;
sandbox().enter();
sandbox().unlock();
return m;
}
void DriverContext::sendUserMessage(Message &message) {
sandbox().exit();
sandbox().lock();
for (auto &w : _allocatedWorkers) {
message.send(w.second._socket);
}
sandbox().enter();
sandbox().unlock();
}
Message DriverContext::getUserMessage() {
sandbox().exit();
sandbox().lock();
while (_userMessages.empty()) {
_waitForEvent();
......@@ -576,23 +576,23 @@ Message DriverContext::getUserMessage() {
Message m = std::move(_userMessages.front());
_userMessages.pop_front();
sandbox().enter();
sandbox().unlock();
return m;
}
void DriverContext::notify() {
sandbox().exit();
sandbox().lock();
for (auto &w : _allocatedWorkers) {
w.second.notify();
}
sandbox().enter();
sandbox().unlock();
}
void DriverContext::wait(uint32_t n) {
sandbox().exit();
sandbox().lock();
if (n == -1u) n = _allocatedWorkers.size();
......@@ -622,7 +622,7 @@ void DriverContext::wait(uint32_t n) {
}
delete &pulled;
sandbox().enter();
sandbox().unlock();
}
Lazy<DriverContext> DriverContext::instance([] { return new DriverContext(); });
......
#include <dtest.h>
#include <thread>
module("distributed-unit-test")
.dependsOn({
......@@ -265,3 +266,29 @@ dunit("distributed-unit-test", "udp-faulty")
}
close(fd);
});
dunit("distributed-unit-test", "runaway-allocations-during-message")
.workers(1)
.driver([] {
volatile bool run = true;
auto t = std::thread([&run] {
while (run) {
void *ptr = malloc(1);
free(ptr);
}
});
int x = 5;
for (auto i = 0; i < 1000; ++i) {
dtest_sendMsg(x);
}
run = false;
t.join();
})
.worker([] {
int x;
for (auto i = 0; i < 1000; ++i) {
dtest_recvMsg() >> x;
}
});
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment