Commit 6bb805d2 authored by Ciarán Ó Rourke's avatar Ciarán Ó Rourke
Browse files

Refactor source code to separate server, fifo, and backend logic

Change Log:
* All source files relating to server remain in src/server
* Old backend source code separated into backend (database) and fifo
files.
* All new source files relating to backend are in src/backend
* All new source files relating to fifo are in src/fifo
* Fixed race condition casing incomplete GET request file
* Discovered hidden issue with blocking FIFO read in GET request
parent dcb20739
......@@ -2,5 +2,6 @@ if(DEBUG)
add_compile_definitions(DEBUG)
endif(DEBUG)
add_subdirectory(fiphobo)
add_subdirectory(backend)
add_subdirectory(fifo)
add_subdirectory(server)
include_directories(${PHOBOS_INCLUDE_DIRECTORY})
include_directories(/usr/include/glib-2.0)
include_directories(/usr/lib64/glib-2.0/include)
add_subdirectory(phobos_cpp_wrapper)
add_library(
backend
backend.cc
)
target_compile_features(backend PUBLIC cxx_std_14)
target_link_libraries(backend PUBLIC phobos_store)
target_link_libraries(backend PUBLIC phobos_cpp_wrapper)
target_link_libraries(backend PUBLIC proxygen::proxygen)
#include "fiphobo.h"
#include "backend.h"
#include <cstdio>
#include <fcntl.h>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <vector>
#include <string>
#include <algorithm>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "FiPhoExceptions.h"
#include "../fifo/FiPhoExceptions.h"
namespace fiphoboserver {
void PutBackend::setMetaData(
std::vector<std::pair<std::string, std::string>> metaData)
PhobosDb::PhobosDb()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::setMetaData" << std::endl;
std::cout << "PhobosDb::PhobosDb" << std::endl;
#endif
std::for_each(
metaData.begin(), metaData.end(),
[&](std::pair<std::string, std::string> pair) {
int rc = pho_attr_set(
&descriptor.xd_attrs, pair.first.c_str(), pair.second.c_str());
if (rc) {
throw fiphoexceptions::PhobosException("pho_attr_set", rc);
}
});
memset(&descriptor, 0, sizeof(descriptor));
descriptor.xd_attrs = {0};
}
PhobosDb::~PhobosDb()
{
#ifdef DEBUG
std::cout << "PhobosDb::~PhobosDb" << std::endl;
#endif
// pho_attrs_free(&descriptor.xd_attrs); // TODO: Is that done in
// // desc_destroy??
pho_xfer_desc_destroy_cpp(&descriptor);
}
void PutBackend::startPutOperation(ssize_t size, std::string objectID)
void PhobosDb::db_put(ssize_t size, std::string objectID, int fd)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startPutOperation" << std::endl;
std::cout << "PhobosDb::db_put" << std::endl;
std::cout << "PhobosDb: Object size = " << size << std::endl;
std::cout << "PhobosDb: fifo_descriptor = " << fd << std::endl;
#endif
startFIFO();
char* unconstedObjectID = new char[objectID.length() + 1];
strcpy(unconstedObjectID, objectID.c_str());
descriptor.xd_op = PHO_XFER_OP_PUT;
descriptor.xd_objid = unconstedObjectID;
descriptor.xd_fd = fifo_descriptor;
descriptor.xd_fd = fd;
descriptor.xd_size = size;
#ifdef DEBUG
std::cout << "Starting async phobos_put_cpp" << std::endl;
std::cout << "Starting phobos_put_cpp" << std::endl;
#endif
phobos_result = std::async([&]() {
db_result = std::async([&]() {
std::cout << "Phobos Thread id " << std::this_thread::get_id()
<< std::endl;
return phobos_put_cpp(&descriptor, 1, NULL, NULL);
});
std::cout << "Main Thread id " << std::this_thread::get_id() << std::endl;
#ifdef DEBUG
std::cout << "Main Thread id " << std::this_thread::get_id()
<< std::endl;
#endif
// TODOOOO
// delete [] unconstedObjectID;
}
void PutBackend::addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer)
void PhobosDb::db_get(std::string objectID, int fd)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::addDataToFIFO" << std::endl;
std::cout << "PhobosDb::db_get" << std::endl;
std::cout << "PhobosDb: fifo_descriptor = " << fd << std::endl;
#endif
if (putRunning) {
int ioRC = put_result.get();
if (ioRC < 0) {
throw fiphoexceptions::FIFOException("write", ioRC);
}
}
putRunning = true;
if (buffer) {
put_result =
std::async([fd = fifo_descriptor, buffer = std::move(buffer)] {
return write(fd, buffer->data(), buffer->length());
});
// io_result.wait();
}
descriptor.xd_objid = new char[objectID.length() + 1];
strcpy(descriptor.xd_objid, objectID.c_str());
descriptor.xd_op = PHO_XFER_OP_GET;
descriptor.xd_fd = fd;
#ifdef DEBUG
std::cout << "Starting phobos_get_cpp" << std::endl;
#endif
db_result = std::async([&]() {
std::cout << "Phobos Thread id " << std::this_thread::get_id()
<< std::endl;
return phobos_get_cpp(&descriptor, 1, NULL, NULL);
});
}
void PutBackend::finishPut()
void PhobosDb::setBucketName(std::string bucketName)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::finishPUT" << std::endl;
std::cout << "PhobosDb::setBucketName" << std::endl;
#endif
int ioRC = put_result.get();
if (ioRC < 0) {
throw fiphoexceptions::FIFOException("write", ioRC);
}
int phobosRC = phobos_result.get();
if (phobosRC) {
throw fiphoexceptions::PhobosException("pho_put", phobosRC);
}
int rc = pho_attr_set(&descriptor.xd_attrs, "bucket", bucketName.c_str());
if (rc) {
throw fiphoexceptions::PhobosException("pho_attr_set", rc);
}
}
putRunning = false;
void PhobosDb::setMetaData(
std::vector<std::pair<std::string, std::string>> metaData)
{
#ifdef DEBUG
std::cout << "PhobosDb::setMetaData" << std::endl;
#endif
std::for_each(
metaData.begin(), metaData.end(),
[&](std::pair<std::string, std::string> pair) {
int rc = pho_attr_set(
&descriptor.xd_attrs, pair.first.c_str(), pair.second.c_str());
if (rc) {
throw fiphoexceptions::PhobosException("pho_attr_set", rc);
}
});
}
} // namespace fiphoboserver
#pragma once
#include <future>
#include <vector>
#include <string>
#include <algorithm>
extern "C" {
#include "phobos_cpp_wrapper/phobos_cpp_wrapper.h"
}
namespace fiphoboserver {
// TODO: Remember the m-versions: PUT and GET functions can work on more than
// one file. Then this structure is stupid. Maybe with a vector of descriptors?
class DbInterface {
public:
/* Interface functions */
virtual void db_put(ssize_t size, std::string objectID, int fd) = 0;
virtual void db_get(std::string, int fd) = 0;
/*
* virtual ssize_t db_getmd();
*/
std::future<int> get_db_result() { return std::move(db_result); }
protected:
std::future<int> db_result;
};
class PhobosDb : public DbInterface {
public:
PhobosDb();
~PhobosDb();
void db_put(ssize_t size, std::string objectID, int fd);
void db_get(std::string, int fd);
/*
* ssize_t db_getmd();
*/
void setBucketName(std::string bucketName);
void setMetaData(
std::vector<std::pair<std::string, std::string>> metaData);
private:
struct pho_xfer_desc descriptor = {0};
};
} // namespace fiphoboserver
include_directories(${PHOBOS_INCLUDE_DIRECTORY})
include_directories(/usr/include/glib-2.0)
include_directories(/usr/lib64/glib-2.0/include)
add_subdirectory(phobos_cpp_wrapper)
add_library(
fiphobo
fiphobo.cc
fifops
fifo_ops.cc
fifo_put.cc
fifo_get.cc
)
if(CUSTOM_OUTPUT_DIRECTORY)
set_target_properties( fiphobo
set_target_properties(fifops
PROPERTIES
ARCHIVE_OUTPUT_DIRECTORY "${CUSTOM_OUTPUT_DIRECTORY}/fiphoboserver"
LIBRARY_OUTPUT_DIRECTORY "${CUSTOM_OUTPUT_DIRECTORY}/fiphoboserver"
......@@ -18,8 +14,6 @@ if(CUSTOM_OUTPUT_DIRECTORY)
)
endif(CUSTOM_OUTPUT_DIRECTORY)
target_compile_features(fiphobo PUBLIC cxx_std_14)
target_compile_features(fifops PUBLIC cxx_std_14)
target_link_libraries(fiphobo PUBLIC phobos_store)
target_link_libraries(fiphobo PUBLIC phobos_cpp_wrapper)
target_link_libraries(fiphobo PUBLIC proxygen::proxygen)
target_link_libraries(fifops PUBLIC proxygen::proxygen)
......@@ -2,6 +2,7 @@
#include <sstream>
#include <string>
#include <cstring>
namespace fiphoexceptions {
......
#include "fiphobo.h"
#include "fifo_ops.h"
#include <cstdio>
#include <fcntl.h>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "FiPhoExceptions.h"
#include <poll.h>
namespace fiphoboserver {
void GetBackend::startGetOperation(std::string objectID)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startGetOperation" << std::endl;
#endif
startFIFO();
descriptor.xd_objid = new char[objectID.length() + 1];
strcpy(descriptor.xd_objid, objectID.c_str());
descriptor.xd_op = PHO_XFER_OP_GET;
descriptor.xd_fd = fifo_descriptor;
descriptor.xd_size = 20662;
#ifdef DEBUG
std::cout << "Starting async phobos_get_cpp" << std::endl;
#endif
phobos_result = std::async([&]() {
std::cout << "Phobos Thread id " << std::this_thread::get_id()
<< std::endl;
return phobos_get_cpp(&descriptor, 1, NULL, NULL);
});
#ifdef DEBUG
std::cout << "Phobos object size = " << descriptor.xd_size << std::endl;
#endif
}
ssize_t GetBackend::getDataFromFIFO(void* buf, size_t count)
ssize_t FIFOGet::getDataFromFIFO(void* buf, size_t count)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::getDataFromFIFO" << std::endl;
......
#pragma once
#include "fifo_ops.h"
#include <future>
#include <folly/io/IOBuf.h>
extern "C" {
#include "phobos_cpp_wrapper/phobos_cpp_wrapper.h"
}
#include <sys/stat.h>
#include <fcntl.h>
#include <iostream>
namespace fiphoboserver {
// TODO: Remember the m-versions: PUT and GET functions can work on more than
// one file. Then this structure is stupid. Maybe with a vector of descriptors?
class Backend {
public:
Backend();
void setBucketName(std::string bucketName);
~Backend();
protected:
void startFIFO();
struct pho_xfer_desc descriptor = {0};
std::string fifoName;
int fifo_descriptor = -1;
std::future<int> phobos_result;
};
Backend::Backend()
FIFO::FIFO()
{
memset(&descriptor, 0, sizeof(descriptor));
descriptor.xd_attrs = {0};
startFIFO();
}
Backend::~Backend()
FIFO::~FIFO()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::~Phobos_Layer" << std::endl;
std::cout << "FIFO::~FIFO" << std::endl;
#endif
// pho_attrs_free(&descriptor.xd_attrs); // TODO: Is that done in
// desc_destroy??
pho_xfer_desc_destroy_cpp(&descriptor);
if (fifo_descriptor > 0) {
close(fifo_descriptor);
}
if (access(fifoName.c_str(), F_OK) != -1) {
remove(fifoName.c_str());
}
}
void Backend::setBucketName(std::string bucketName)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::setBucketName" << std::endl;
#endif
int rc = pho_attr_set(&descriptor.xd_attrs, "bucket", bucketName.c_str());
if (rc) {
throw fiphoexceptions::PhobosException("pho_attr_set", rc);
if (fifo_descriptor > 0) {
close(fifo_descriptor);
}
}
void Backend::startFIFO()
void FIFO::startFIFO()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startFIFO" << std::endl;
std::cout << "FIFO::startFIFO" << std::endl;
#endif
fifoName = std::tmpnam(nullptr);
#ifdef DEBUG
......@@ -77,24 +45,14 @@ void Backend::startFIFO()
}
}
fifo_descriptor = open(fifoName.c_str(), O_RDWR);
if (fifo_descriptor < 0) {
std::stringstream ss;
ss << "open with filename " << fifoName.c_str();
throw fiphoexceptions::FIFOException(ss.str(), errno);
return;
}
}
class PutBackend : public Backend {
public:
void setMetaData(std::vector<std::pair<std::string, std::string>> metaData);
void startPutOperation(ssize_t size, std::string objectID);
void addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer);
void finishPut();
private:
std::future<long int> put_result;
bool putRunning = false;
};
class GetBackend : public Backend {
public:
void startGetOperation(std::string objectID);
ssize_t getDataFromFIFO(void* buf, size_t count);
};
} // namespace fiphoboserver
#pragma once
#include <future>
#include <vector>
#include <folly/io/IOBuf.h>
extern "C" {
#include "phobos_cpp_wrapper/phobos_cpp_wrapper.h"
}
#include "FiPhoExceptions.h"
namespace fiphoboserver {
// TODO: Remember the m-versions: PUT and GET functions can work on more than
// one file. Then this structure is stupid. Maybe with a vector of descriptors?
class Backend {
class FIFO {
public:
Backend();
FIFO();
void setBucketName(std::string bucketName);
~Backend();
int get_fifo_descriptor() { return fifo_descriptor; }
~FIFO();
protected:
void startFIFO();
struct pho_xfer_desc descriptor = {0};
std::string fifoName;
int fifo_descriptor = -1;
std::future<int> phobos_result;
};
class PutBackend : public Backend {
class FIFOPut : public FIFO {
public:
void setMetaData(std::vector<std::pair<std::string, std::string>> metaData);
void startPutOperation(ssize_t size, std::string objectID);
void addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer);
void finishPut();
......@@ -40,10 +30,13 @@ class PutBackend : public Backend {
bool putRunning = false;
};
class GetBackend : public Backend {
class FIFOGet: public FIFO {
public:
void startGetOperation(std::string objectID);
ssize_t getDataFromFIFO(void* buf, size_t count);
/*
* finishGet();
*/
};
} // namespace fiphoboserver
} // namespace fiphoboserver
#include "fifo_ops.h"
#include <cstdio>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
namespace fiphoboserver {
void FIFOPut::addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer)
{
#ifdef DEBUG
std::cout << "FIFOPut::addDataToFIFO" << std::endl;
#endif
if (putRunning) {
int ioRC = put_result.get();
if (ioRC < 0) {
throw fiphoexceptions::FIFOException("write", ioRC);
}
}
putRunning = true;
if (buffer) {
put_result =
std::async([fd = fifo_descriptor, buffer = std::move(buffer)] {
return write(fd, buffer->data(), buffer->length());
});
// io_result.wait();
}
}
void FIFOPut::finishPut()
{
#ifdef DEBUG
std::cout << "FIFOPut::finishPUT" << std::endl;
#endif
int ioRC = put_result.get();
if (ioRC < 0) {
throw fiphoexceptions::FIFOException("write", ioRC);
}
putRunning = false;
}
} // namespace fiphoboserver
#include "fiphobo.h"
#include <cstdio>
#include <fcntl.h>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "FiPhoExceptions.h"
namespace fiphoboserver {
Backend::Backend()
{
memset(&descriptor, 0, sizeof(descriptor));
descriptor.xd_attrs = {0};
}
Backend::~Backend()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::~Phobos_Layer" << std::endl;
#endif
// pho_attrs_free(&descriptor.xd_attrs); // TODO: Is that done in
// desc_destroy??
pho_xfer_desc_destroy_cpp(&descriptor);
if (fifo_descriptor > 0) {
close(fifo_descriptor);
}
if (access(fifoName.c_str(), F_OK) != -1) {
remove(fifoName.c_str());
}
}
void Backend::setBucketName(std::string bucketName)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::setBucketName" << std::endl;
#endif
int rc = pho_attr_set(&descriptor.xd_attrs, "bucket", bucketName.c_str());