Commit df959ea4 authored by Sophie Wenzel-Teuber's avatar Sophie Wenzel-Teuber
Browse files

Clean up a bit and add Exceptions

parent 7c547c4a
......@@ -22,4 +22,4 @@ target_compile_features(fiphobo PUBLIC cxx_std_14)
target_link_libraries(fiphobo PUBLIC phobos_store)
target_link_libraries(fiphobo PUBLIC phobos_cpp_wrapper)
target_link_libraries(fiphobo PUBLIC proxygen::proxygen)
\ No newline at end of file
target_link_libraries(fiphobo PUBLIC proxygen::proxygen)
#pragma once
#include <string>
#include <sstream>
namespace fiphoexceptions{
class PhobosException : std::exception
{
public:
PhobosException(std::string callingPhobosFunction, int rc) :
function(callingPhobosFunction),
rc(rc)
{}
const char* what() const noexcept override
{
std::stringstream ss;
ss <<"Phobos had an error in function " << function << ": " << std::strerror(rc);
return ss.str().c_str();
}
private:
int rc;
std::string function;
};
class FIFOException : std::exception
{
public:
FIFOException(std::string caller, int value) :
caller(caller),
value(value)
{}
const char* what() const noexcept override
{
std::stringstream ss;
ss <<"FIFO IO had an error in function " << caller << ": " << std::strerror(errno);
return ss.str().c_str();
}
private:
int value;
std::string caller;
};
}
\ No newline at end of file
......@@ -9,6 +9,8 @@
#include <stdlib.h>
#include <unistd.h>
#include "FiPhoExceptions.h"
namespace fiphoboserver
{
......@@ -26,8 +28,7 @@ void FiPhobo::setBucketName(std::string bucketName)
int rc = pho_attr_set(&descriptor.xd_attrs, "bucket", bucketName.c_str());
if(rc)
{
printf("pho_attr_set returned an error: %d\n", rc);
return;
throw fiphoexceptions::PhobosException("pho_attr_set", rc);
}
}
......@@ -42,8 +43,7 @@ void FiPhobo::setMetaData(std::vector<std::pair<std::string, std::string>> metaD
pair.second.c_str());
if(rc)
{
printf("pho_attr_set returned an error: %d\n", rc);
return;
throw fiphoexceptions::PhobosException("pho_attr_set", rc);
}
});
}
......@@ -67,55 +67,58 @@ void FiPhobo::startPutOperation(unsigned int size, std::string objectID)
std::cout << "Starting async phobos_put_cpp" << std::endl;
#endif
phobos_result = std::async([&](){
std::cout << "Phobos Thread id " << std::this_thread::get_id() << std::endl;
return phobos_put_cpp(&descriptor, 1, NULL, NULL);
});
std::cout << "Main Thread id " << std::this_thread::get_id() << std::endl;
//TODOOOO
// delete [] unconstedObjectID;
}
static long int fifoWriteTemp(std::unique_ptr<folly::IOBuf> buffer, int fifo_descriptor){
std::cout << "IO Thread id " << std::this_thread::get_id() << std::endl;
return write(fifo_descriptor, buffer->data(), buffer->length());
}
void FiPhobo::addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::addDataToFIFO" << std::endl;
#endif
if(ioRunning)
if(putRunning)
{
try
{
int ioRC = io_result.get();
if(ioRC < 0)
{
std::cout << "io async did return a bad result!" << std::endl;
//TODO error handling
}
}
catch(std::exception& e)
int ioRC = put_result.get();
if(ioRC < 0)
{
std::cerr << "Excpetion: " << e.what() << std::endl;
throw fiphoexceptions::FIFOException("write", ioRC);
}
}
ioRunning = true;
putRunning = true;
if(buffer && buffer->data())
if(buffer)
{
// char* tmp = new char[buffer->length()];
// memcpy(tmp, buffer->data(), buffer->length());
// std::cout << tmp << std::endl;
// buffer->coalesce();
io_result = std::async(fifoWriteTemp, std::move(buffer), fifo_descriptor);
put_result = std::async([fd = fifo_descriptor, buffer = std::move(buffer)]{
return write(fd, buffer->data(), buffer->length());
});
// io_result.wait();
}
}
void FiPhobo::finishPUT()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::finishPUT" << std::endl;
#endif
int ioRC = put_result.get();
if(ioRC < 0)
{
throw fiphoexceptions::FIFOException("write", ioRC);
}
int phobosRC = phobos_result.get();
if(phobosRC)
{
throw fiphoexceptions::PhobosException("pho_put", phobosRC);
}
putRunning = false;
}
void FiPhobo::startGetOperation(std::string& objectID)
{
#ifdef DEBUG
......@@ -123,7 +126,8 @@ void FiPhobo::startGetOperation(std::string& objectID)
#endif
startFIFO();
char* unconstedObjectID = (char*) objectID.c_str();
char *unconstedObjectID = new char[objectID.length() + 1];
strcpy(unconstedObjectID, objectID.c_str());
descriptor.xd_op = PHO_XFER_OP_GET;
descriptor.xd_objid = unconstedObjectID;
......@@ -139,61 +143,35 @@ std::unique_ptr<folly::IOBuf> FiPhobo::getDataFromFIFO(size_t size)
#ifdef DEBUG
std::cout << "Phobos_Layer::getDataFromFIFO" << std::endl;
#endif
std::unique_ptr<folly::IOBuf> buffer = folly::IOBuf::create(size);
if(ioRunning)
{
try
{
int ioRC = io_result.get();
if(ioRC < 0)
{
std::cout << "io async did return a bad result!" << std::endl;
//TODO error handling
}
}
catch(std::exception& e)
get_result = std::async([&]{
std::unique_ptr<folly::IOBuf> buffer = folly::IOBuf::create(size);
int readRC = read(fifo_descriptor, buffer->writableData(), size);
if(readRC < 0)
{
std::cerr << "Excpetion: " << e.what() << std::endl;
throw fiphoexceptions::FIFOException("read", readRC);
}
}
ioRunning = true;
io_result = std::async([&](){
buffer->unshare();
return read(fifo_descriptor, buffer->writableData(), size);
return std::move(buffer);
});
return std::move(buffer);
}
void FiPhobo::finishIO()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::finishIO" << std::endl;
#endif
std::unique_ptr<folly::IOBuf> buffer;
try
{
int ioRC = io_result.get();
if(ioRC < 0)
{
std::cout << "io async did return a bad result!" << std::endl;
//TODO error handling
}
int phobosRC = phobos_result.get();
if(phobosRC)
{
std::cout << "phobos async did return a bad result!" << std::endl;
//TODO error handling!
}
buffer = std::move(get_result.get());
}
//TODO: Does this make any sense or is it ok if I don't catch this here but
// let the caller catch it - what he has to do anyway?
catch(fiphoexceptions::FIFOException & fifoException){
throw fifoException;
}
catch(std::exception& e)
int phobosRC = phobos_result.get();
if(phobosRC)
{
std::cerr << "Excpetion: " << e.what() << std::endl;
throw fiphoexceptions::PhobosException("pho_get", phobosRC);
}
ioRunning = false;
return std::move(buffer);
}
FiPhobo::~FiPhobo()
......@@ -221,14 +199,19 @@ void FiPhobo::startFIFO()
int rc = mkfifo(fifoName.c_str(), 0777);
if(rc < 0)
{
printf("mkfifo returned an error: %s\n", std::strerror(errno));
// return;
if(errno != EEXIST)
{
throw fiphoexceptions::FIFOException("mkfifo", errno);
return;
}
}
fifo_descriptor = open(fifoName.c_str(), O_RDWR);
if (fifo_descriptor < 0)
{
std::cout << "Could not open fifo: " << fifoName << std::endl;
std::stringstream ss;
ss <<"open with filename " << fifoName.c_str();
throw fiphoexceptions::FIFOException(ss.str(), errno);
return;
}
......
......@@ -25,12 +25,12 @@ public:
void startPutOperation(unsigned int size, std::string objectID);
void addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer);
void finishPUT();
void startGetOperation(std::string& objectID);
std::unique_ptr<folly::IOBuf> getDataFromFIFO(size_t size);
void finishIO();
~FiPhobo();
......@@ -42,8 +42,9 @@ private:
std::string fifoName = "/tmp/fiphoboserver_fifotemp";
int fifo_descriptor = -1;
std::future<int> phobos_result;
std::future<long int> io_result;
bool ioRunning = false;
std::future<long int> put_result;
std::future<std::unique_ptr<folly::IOBuf> > get_result;
bool putRunning = false;
};
} // namespace fiphoboserver
......@@ -14,6 +14,7 @@
#include <proxygen/httpserver/ResponseBuilder.h>
#include "../S3_header.h"
#include "../fiphobo/FiPhoExceptions.h"
namespace fiphoboserver {
......@@ -31,7 +32,6 @@ void PushRequestHandler::onRequest(
std::cout << path << '\n';
headers->getHeaders().forEach([&] (const std::string& header, const std::string& val) {
// builder.header("OH:" + header, val);
std::cout << header << ": " << val << '\n';
});
#endif
......@@ -52,10 +52,6 @@ void PushRequestHandler::onRequest(
try {
#ifdef DEBUG
std::cout << "Set Values from headers" << std::endl;
#endif
#ifdef DEBUG
std::cout << "Bucket: " << s3_header.getBucket() << '\n';
std::cout << "Key: " << s3_header.getKey() << '\n';
......@@ -69,9 +65,7 @@ void PushRequestHandler::onRequest(
#endif
phobos.setBucketName(s3_header.getBucket());
phobos.setMetaData(s3_header.getMetaData());
// std::string key = s3_header.getKey();
#ifdef DEBUG
std::cout << "Send put to phobos layer" << std::endl;
#endif
......@@ -86,19 +80,42 @@ void PushRequestHandler::onRequest(
.sendWithEOM();
return;
}
catch (const fiphoexceptions::PhobosException& ex) {
proxygen::ResponseBuilder(downstream_)
.status(409, "Conflict")
.body(ex.what())
.sendWithEOM();
return;
}
}
void PushRequestHandler::onBody(std::unique_ptr<folly::IOBuf> body) noexcept
{
// if (body_) {
// body_->prependChain(std::move(body));
// } else {
// body_ = std::move(body);
// }
#ifdef DEBUG
std::cout << "Adding data to FIFO... " << std::endl;
if(body)
{
body->coalesce();
size_t totalBodyLength = body->computeChainDataLength();
std::cout << "Body Data Length " << totalBodyLength << '\n';
char* buffer = new char[totalBodyLength];
memcpy(buffer, body->data(), totalBodyLength);
std::cout << buffer << '\n';
std::cout << "End of body part! \n" << std::endl;
}
#endif
phobos.addDataToFIFO(std::move(body));
try
{
phobos.addDataToFIFO(std::move(body));
}
catch(fiphoexceptions::FIFOException& fifoExcp)
{
proxygen::ResponseBuilder(downstream_)
.status(409, "Conflict")
.body(fifoExcp.what())
.sendWithEOM();
return;
}
proxygen::ResponseBuilder(downstream_)
.status(100, "Continue")
......@@ -107,30 +124,26 @@ void PushRequestHandler::onBody(std::unique_ptr<folly::IOBuf> body) noexcept
void PushRequestHandler::onEOM() noexcept
{
#ifdef DEBUG
if(body_)
{
body_->coalesce();
size_t totalBodyLength = body_->computeChainDataLength();
std::cout << "Body Data Length " << totalBodyLength << '\n';
char* buffer = new char[totalBodyLength];
memcpy(buffer, body_->data(), totalBodyLength);
std::cout << buffer << '\n';
std::cout << "End of body! \n" << std::endl;
}
#endif
try
{
#ifdef DEBUG
std::cout << "Finish IO..." << std::endl;
#endif
phobos.finishIO();
phobos.finishPUT();
}
catch(fiphoexceptions::FIFOException& fifoExcp)
{
proxygen::ResponseBuilder(downstream_)
.status(409, "Conflict")
.body(fifoExcp.what())
.sendWithEOM();
return;
}
catch(std::exception)
catch(fiphoexceptions::PhobosException& phoExcp)
{
//TODO some error, depending on what went wrong
proxygen::ResponseBuilder(downstream_)
.status(200, "OK")
.status(409, "Conflict")
.body(phoExcp.what())
.sendWithEOM();
return;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment