Commit df4563b9 authored by Ciarán Ó Rourke's avatar Ciarán Ó Rourke
Browse files

Prepare fiphoboserver pull

parent ddbbf8c0
project(fiphoboserver)
get_filename_component(PHOBOS_INCLUDE_DIRECTORY ../phobos/src/include ABSOLUTE
${CMAKE_CURRENT_SOURCE_DIR})
add_subdirectory(src)
# FIPhoboServer
Build with
```
cmake -DDEBUG=ON -DCMAKE_PREFIX_PATH=/path/to/proxygen /path/to/CMakeLists/file/
make
```
from any directory you want and hope for the best! ;)
Oh, make sure Phobos include directories are under `../phobos/src/include` from the main `CMakeLists.txt`, or change that path in there
if(DEBUG)
add_compile_definitions(DEBUG)
endif(DEBUG)
add_subdirectory(fiphobo)
add_subdirectory(server)
#pragma once
#include <proxygen/lib/http/HTTPMessage.h>
namespace fiphoboserver {
class S3_header {
public:
void setHeaders(std::unique_ptr<proxygen::HTTPMessage> newHeaders)
{
headers = std::move(newHeaders);
}
std::string getBucket()
{
if (!headers) {
return "";
}
std::string path = headers->getPath();
path = path.substr(1, path.size());
std::string bucketName = path.substr(0, path.find('/'));
return bucketName;
}
std::string getKey()
{
if (!headers) {
return "";
}
std::string path = headers->getPath();
path = path.substr(1, path.size());
std::string fileKey = path.substr(path.find('/') + 1, path.size() - 1);
if (fileKey.find('/') != std::string::npos
|| fileKey.find('\\') != std::string::npos) {
std::cout << "Bad Key! " << std::endl;
return "";
}
return fileKey;
}
bool isCreateBucketRequest()
{
if (!headers) {
return false;
}
if (getKey() == "") return true;
return false;
}
size_t getBodyLength()
{
std::string contentLength =
headers->getHeaders().getSingleOrEmpty("Content-Length");
return static_cast<size_t>(std::stoi(contentLength));
}
std::vector<std::pair<std::string, std::string>> getMetaData()
{
std::vector<std::pair<std::string, std::string>> metadata;
std::string metaSearchString = "x-amz-meta-";
headers->getHeaders().forEach([&](const std::string& header,
const std::string& val) {
if (header.find(metaSearchString) != std::string::npos) {
std::string metaName =
header.substr(metaSearchString.size(), header.size() - 1);
std::pair<std::string, std::string> currentPair(metaName, val);
metadata.push_back(currentPair);
}
});
return metadata;
}
private:
std::unique_ptr<proxygen::HTTPMessage> headers;
};
} // namespace fiphoboserver
include_directories(${PHOBOS_INCLUDE_DIRECTORY})
include_directories(/usr/include/glib-2.0)
include_directories(/usr/lib64/glib-2.0/include)
add_subdirectory(phobos_cpp_wrapper)
add_library(
fiphobo
fiphobo.cc
)
if(CUSTOM_OUTPUT_DIRECTORY)
set_target_properties( fiphobo
PROPERTIES
ARCHIVE_OUTPUT_DIRECTORY "${CUSTOM_OUTPUT_DIRECTORY}/fiphoboserver"
LIBRARY_OUTPUT_DIRECTORY "${CUSTOM_OUTPUT_DIRECTORY}/fiphoboserver"
RUNTIME_OUTPUT_DIRECTORY "${CUSTOM_OUTPUT_DIRECTORY}/fiphoboserver"
)
endif(CUSTOM_OUTPUT_DIRECTORY)
target_compile_features(fiphobo PUBLIC cxx_std_14)
target_link_libraries(fiphobo PUBLIC phobos_store)
target_link_libraries(fiphobo PUBLIC phobos_cpp_wrapper)
target_link_libraries(fiphobo PUBLIC proxygen::proxygen)
#pragma once
#include <sstream>
#include <string>
namespace fiphoexceptions {
class PhobosException : std::exception {
public:
PhobosException(std::string callingPhobosFunction, int rc) :
function(callingPhobosFunction),
rc(rc)
{
}
const char* what() const noexcept override
{
std::stringstream ss;
ss << "Phobos had an error in function " << function << ": "
<< std::strerror(rc);
return ss.str().c_str();
}
private:
int rc;
std::string function;
};
class FIFOException : std::exception {
public:
FIFOException(std::string caller, int value) : caller(caller), value(value)
{
}
const char* what() const noexcept override
{
std::stringstream ss;
ss << "FIFO IO had an error in function " << caller << ": "
<< std::strerror(errno);
return ss.str().c_str();
}
private:
int value;
std::string caller;
};
} // namespace fiphoexceptions
#include "fiphobo.h"
#include <cstdio>
#include <fcntl.h>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <cstdio>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "FiPhoExceptions.h"
namespace fiphoboserver {
FiPhobo::FiPhobo()
{
memset(&descriptor, 0, sizeof(descriptor));
descriptor.xd_attrs = {0};
}
void FiPhobo::setBucketName(std::string bucketName)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::setBucketName" << std::endl;
#endif
int rc = pho_attr_set(&descriptor.xd_attrs, "bucket", bucketName.c_str());
if (rc) {
throw fiphoexceptions::PhobosException("pho_attr_set", rc);
}
}
void FiPhobo::setMetaData(
std::vector<std::pair<std::string, std::string>> metaData)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::setMetaData" << std::endl;
#endif
std::for_each(
metaData.begin(), metaData.end(),
[&](std::pair<std::string, std::string> pair) {
int rc = pho_attr_set(
&descriptor.xd_attrs, pair.first.c_str(), pair.second.c_str());
if (rc) {
throw fiphoexceptions::PhobosException("pho_attr_set", rc);
}
});
}
void FiPhobo::startPutOperation(unsigned int size, std::string objectID)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startPutOperation" << std::endl;
#endif
startFIFO();
char* unconstedObjectID = new char[objectID.length() + 1];
strcpy(unconstedObjectID, objectID.c_str());
descriptor.xd_op = PHO_XFER_OP_PUT;
descriptor.xd_objid = unconstedObjectID;
descriptor.xd_fd = fifo_descriptor;
descriptor.xd_size = size;
#ifdef DEBUG
std::cout << "Starting async phobos_put_cpp" << std::endl;
#endif
phobos_result = std::async([&]() {
std::cout << "Phobos Thread id " << std::this_thread::get_id()
<< std::endl;
return phobos_put_cpp(&descriptor, 1, NULL, NULL);
});
std::cout << "Main Thread id " << std::this_thread::get_id() << std::endl;
// TODOOOO
// delete [] unconstedObjectID;
}
void FiPhobo::addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::addDataToFIFO" << std::endl;
#endif
if (putRunning) {
int ioRC = put_result.get();
if (ioRC < 0) {
throw fiphoexceptions::FIFOException("write", ioRC);
}
}
putRunning = true;
if (buffer) {
put_result =
std::async([fd = fifo_descriptor, buffer = std::move(buffer)] {
return write(fd, buffer->data(), buffer->length());
});
// io_result.wait();
}
}
void FiPhobo::finishPUT()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::finishPUT" << std::endl;
#endif
int ioRC = put_result.get();
if (ioRC < 0) {
throw fiphoexceptions::FIFOException("write", ioRC);
}
int phobosRC = phobos_result.get();
if (phobosRC) {
throw fiphoexceptions::PhobosException("pho_put", phobosRC);
}
putRunning = false;
}
void FiPhobo::startGetOperation(std::string objectID)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startGetOperation" << std::endl;
#endif
startFIFO();
descriptor.xd_objid = new char[objectID.length() + 1];
strcpy(descriptor.xd_objid, objectID.c_str());
descriptor.xd_op = PHO_XFER_OP_GET;
descriptor.xd_fd = fifo_descriptor;
descriptor.xd_size = 20662;
#ifdef DEBUG
std::cout << "Starting async phobos_get_cpp" << std::endl;
#endif
phobos_result = std::async(
[&]() {
std::cout << "Phobos Thread id " << std::this_thread::get_id()
<< std::endl;
return phobos_get_cpp(&descriptor, 1, NULL, NULL); });
#ifdef DEBUG
std::cout << "Phobos object size = " << descriptor.xd_size << std::endl;
#endif
}
ssize_t FiPhobo::getDataFromFIFO(void *buf, size_t count)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::getDataFromFIFO" << std::endl;
#endif
ssize_t rc = read(fifo_descriptor, buf, count);
if (rc < 0) {
#ifdef DEBUG
std::cout << "getDataFromFIFO read failed" << std::endl;
#endif
throw fiphoexceptions::FIFOException("read", rc);
}
#ifdef DEBUG
std::cout << "Count = " << rc << std::endl;
#endif
return rc;
}
FiPhobo::~FiPhobo()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::~Phobos_Layer" << std::endl;
#endif
// pho_attrs_free(&descriptor.xd_attrs); // TODO: Is that done in
// desc_destroy??
pho_xfer_desc_destroy_cpp(&descriptor);
if (fifo_descriptor > 0) {
close(fifo_descriptor);
}
if (access(fifoName.c_str(), F_OK) != -1) {
remove(fifoName.c_str());
}
}
void FiPhobo::startFIFO()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startFIFO" << std::endl;
#endif
fifoName = std::tmpnam(nullptr);
int rc = mkfifo(fifoName.c_str(), 0777);
if (rc < 0) {
if (errno != EEXIST) {
throw fiphoexceptions::FIFOException("mkfifo", errno);
return;
}
}
fifo_descriptor = open(fifoName.c_str(), O_RDWR);
if (fifo_descriptor < 0) {
std::stringstream ss;
ss << "open with filename " << fifoName.c_str();
throw fiphoexceptions::FIFOException(ss.str(), errno);
return;
}
#ifdef DEBUG
std::cout << "End of Phobos_Layer::startFIFO" << std::endl;
#endif
}
} // namespace fiphoboserver
#pragma once
#include <future>
#include <vector>
#include <folly/io/IOBuf.h>
extern "C" {
#include "phobos_cpp_wrapper/phobos_cpp_wrapper.h"
}
namespace fiphoboserver {
// TODO: Remember the m-versions: PUT and GET functions can work on more than
// one file. Then this structure is stupid. Maybe with a vector of descriptors?
class FiPhobo {
public:
FiPhobo();
void setBucketName(std::string bucketName);
void setMetaData(std::vector<std::pair<std::string, std::string>> metaData);
void startPutOperation(unsigned int size, std::string objectID);
void addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer);
void finishPUT();
void startGetOperation(std::string objectID);
ssize_t getDataFromFIFO(void *buf, size_t count);
~FiPhobo();
private:
void startFIFO();
struct pho_xfer_desc descriptor = {0};
std::string fifoName;
int fifo_descriptor = -1;
std::future<int> phobos_result;
std::future<long int> put_result;
bool putRunning = false;
};
} // namespace fiphoboserver
#include "fiphobo_blocking.h"
#include <cstdio>
#include <fcntl.h>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
namespace fiphoboserver {
FiPhobo::FiPhobo()
{
memset(&descriptor, 0, sizeof(descriptor));
descriptor.xd_attrs = {0};
}
void FiPhobo::setBucketName(std::string bucketName)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::setBucketName" << std::endl;
#endif
int rc = pho_attr_set(&descriptor.xd_attrs, "bucket", bucketName.c_str());
if (rc) {
printf("pho_attr_set returned an error: %d\n", rc);
return;
}
}
void FiPhobo::setMetaData(
std::vector<std::pair<std::string, std::string>> metaData)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::setMetaData" << std::endl;
#endif
std::for_each(
metaData.begin(), metaData.end(),
[&](std::pair<std::string, std::string> pair) {
int rc = pho_attr_set(
&descriptor.xd_attrs, pair.first.c_str(), pair.second.c_str());
if (rc) {
printf("pho_attr_set returned an error: %d\n", rc);
return;
}
});
}
void FiPhobo::startPutOperation(int size, std::string objectID)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startPutOperation" << std::endl;
#endif
startFIFO();
char* unconstedObjectID = new char[objectID.length() + 1];
strcpy(unconstedObjectID, objectID.c_str());
descriptor.xd_op = PHO_XFER_OP_PUT;
descriptor.xd_objid = unconstedObjectID;
descriptor.xd_fd = fifo_descriptor;
descriptor.xd_size = size;
// #ifdef DEBUG
// std::cout << "Starting async phobos_put_cpp" << std::endl;
// #endif
// phobos_result = std::async([&](){
// return phobos_put_cpp(&descriptor, 1, NULL, NULL);
// });
//
// TODOOOO
// delete [] unconstedObjectID;
}
void FiPhobo::addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::addDataToFIFO" << std::endl;
#endif
// if(ioRunning)
// {
// int ioRC = io_result.get();
// if(ioRC < 0)
// {
// //TODO error handling
// }
// }
// ioRunning = true;
//
// buffer->coalesce();
// io_result = std::async([&](){
// return write(fifo_descriptor, buffer->data(), buffer->length());
// });
size_t written = write(fifo_descriptor, buffer->data(), buffer->length());
}
void FiPhobo::startGetOperation(std::string& objectID)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startGetOperation" << std::endl;
#endif
startFIFO();
char* unconstedObjectID = new char[objectID.length() + 1];
strcpy(unconstedObjectID, objectID.c_str());
descriptor.xd_op = PHO_XFER_OP_GET;
descriptor.xd_objid = unconstedObjectID;
descriptor.xd_fd = fifo_descriptor;
// phobos_result = std::async([&](){
// return phobos_get_cpp(&descriptor, 1, NULL, NULL);
// });
}
std::unique_ptr<folly::IOBuf> FiPhobo::getDataFromFIFO(size_t size)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::getDataFromFIFO" << std::endl;
#endif
std::unique_ptr<folly::IOBuf> buffer = folly::IOBuf::create(size);
// if(ioRunning)
// {
// int ioRC = io_result.get();
// if(ioRC < 0)
// {
// //TODO error handling
// }
// }
// ioRunning = true;
//
// io_result = std::async([&](){
// buffer->unshare();
// return read(fifo_descriptor, buffer->writableData(), size);
// });
int phobos_return = phobos_get_cpp(&descriptor, 1, NULL, NULL);
if (phobos_return < 0) {
std::cout << "GET: schade schade schade " << std::endl;
// error handling
}
lseek(fifo_descriptor, 0, SEEK_SET);
buffer->unshare();
read(fifo_descriptor, buffer->writableData(), size);
return std::move(buffer);
}