Commit 35d03487 authored by Aaron Dees's avatar Aaron Dees
Browse files

Merge branch 'refactor' into 'devel'

Refactor

See merge request oilgas/ltfs/fiphoboserver!5
parents 08fdc907 9a8740af
......@@ -2,6 +2,6 @@ if(DEBUG)
add_compile_definitions(DEBUG)
endif(DEBUG)
add_subdirectory(backend)
add_subdirectory(fifo)
add_subdirectory(storage)
add_subdirectory(stream)
add_subdirectory(server)
#pragma once
#include <algorithm>
#include <future>
#include <string>
#include <vector>
extern "C" {
#include "phobos_cpp_wrapper/phobos_cpp_wrapper.h"
}
namespace fiphoboserver {
// TODO: Remember the m-versions: PUT and GET functions can work on more than
// one file. Then this structure is stupid. Maybe with a vector of descriptors?
class DbInterface {
public:
/* Interface functions */
virtual int db_put(
ssize_t size, std::string objectID, std::string fileName) = 0;
virtual int db_get(std::string objectID, std::string fileName) = 0;
/*
* virtual ssize_t db_getmd();
*/
void openFile(std::string fileName, int flags);
void closeFile();
protected:
int file_descriptor = -1;
};
class PhobosDb : public DbInterface {
public:
PhobosDb();
~PhobosDb();
int db_put(ssize_t size, std::string objectID, std::string fileName);
int db_get(std::string, std::string fileName);
/*
* ssize_t db_getmd();
*/
void setBucketName(std::string bucketName);
void setMetaData(std::vector<std::pair<std::string, std::string>> metaData);
private:
struct pho_xfer_desc descriptor = {0};
};
} // namespace fiphoboserver
#include "fifo_ops.h"
#include <cstdio>
#include <fcntl.h>
#include <iostream>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
namespace fiphoboserver {
void FIFOGet::openFIFO()
{
#ifdef DEBUG
std::cout << "FIFOGet::openFIFO" << std::endl;
#endif
fifo_descriptor = open(fifoName.c_str(), O_RDONLY);
if (fifo_descriptor < 0) {
std::stringstream ss;
ss << "open with filename " << fifoName.c_str();
throw fiphoexceptions::FIFOException(ss.str(), errno);
return;
}
}
ssize_t FIFOGet::getDataFromFIFO(void* buf, size_t count)
{
#ifdef DEBUG
std::cout << "FIFOGet::getDataFromFIFO" << std::endl;
#endif
ssize_t rc = read(fifo_descriptor, buf, count);
if (rc < 0) {
#ifdef DEBUG
std::cout << "getDataFromFIFO read failed" << std::endl;
#endif
throw fiphoexceptions::FIFOException("read", rc);
}
return rc;
}
} // namespace fiphoboserver
#include "fifo_ops.h"
#include <fcntl.h>
#include <iostream>
#include <sys/stat.h>
namespace fiphoboserver {
// TODO: Remember the m-versions: PUT and GET functions can work on more than
// one file. Then this structure is stupid. Maybe with a vector of descriptors?
FIFO::FIFO()
{
#ifdef DEBUG
std::cout << "FIFO::FIFO" << std::endl;
#endif
startFIFO();
}
FIFO::~FIFO()
{
#ifdef DEBUG
std::cout << "FIFO::~FIFO" << std::endl;
#endif
if (access(fifoName.c_str(), F_OK) != -1) {
remove(fifoName.c_str());
}
if (fifo_descriptor > 0) {
close(fifo_descriptor);
}
}
void FIFO::startFIFO()
{
#ifdef DEBUG
std::cout << "FIFO::startFIFO" << std::endl;
#endif
fifoName = std::tmpnam(nullptr);
int rc = mkfifo(fifoName.c_str(), 0777);
if (rc < 0) {
if (errno != EEXIST) {
throw fiphoexceptions::FIFOException("mkfifo", errno);
return;
}
}
}
} // namespace fiphoboserver
#pragma once
#include "FiPhoExceptions.h"
#include <folly/io/IOBuf.h>
#include <future>
namespace fiphoboserver {
class FIFO {
public:
FIFO();
void setBucketName(std::string bucketName);
std::string getFifoName() { return fifoName; }
virtual void openFIFO() = 0;
~FIFO();
protected:
void startFIFO();
std::string fifoName;
int fifo_descriptor = -1;
};
class FIFOPut : public FIFO {
public:
void setMetaData(std::vector<std::pair<std::string, std::string>> metaData);
void openFIFO();
void addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer);
void finishPut();
private:
std::future<long int> put_result;
bool putRunning = false;
};
class FIFOGet : public FIFO {
public:
void openFIFO();
ssize_t getDataFromFIFO(void* buf, size_t count);
/*
* finishGet();
*/
};
} // namespace fiphoboserver
#include "fifo_ops.h"
#include <cstdio>
#include <fcntl.h>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
namespace fiphoboserver {
void FIFOPut::openFIFO()
{
#ifdef DEBUG
std::cout << "FIFOPut::openFIFO" << std::endl;
#endif
fifo_descriptor = open(fifoName.c_str(), O_WRONLY);
if (fifo_descriptor < 0) {
std::stringstream ss;
ss << "open with filename " << fifoName.c_str();
throw fiphoexceptions::FIFOException(ss.str(), errno);
return;
}
}
void FIFOPut::addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer)
{
#ifdef DEBUG
std::cout << "FIFOPut::addDataToFIFO" << std::endl;
#endif
if (putRunning) {
int ioRC = put_result.get();
if (ioRC < 0) {
throw fiphoexceptions::FIFOException("write", ioRC);
}
}
putRunning = true;
if (buffer) {
put_result =
std::async([fd = fifo_descriptor, buffer = std::move(buffer)] {
return write(fd, buffer->data(), buffer->length());
});
// io_result.wait();
}
}
void FIFOPut::finishPut()
{
#ifdef DEBUG
std::cout << "FIFOPut::finishPUT" << std::endl;
#endif
int ioRC = put_result.get();
if (ioRC < 0) {
throw fiphoexceptions::FIFOException("write", ioRC);
}
putRunning = false;
}
} // namespace fiphoboserver
......@@ -4,7 +4,7 @@ include_directories(/usr/lib64/glib-2.0/include)
add_executable(
fiphoboserver
server.cc
server.main.cc
get_request_handler.cc
put_request_handler.cc
unsupported_request_handler.cc
......@@ -24,5 +24,5 @@ target_compile_features(fiphoboserver PUBLIC cxx_std_14)
target_link_libraries(fiphoboserver PUBLIC proxygen::proxygen)
target_link_libraries(fiphoboserver PUBLIC proxygen::proxygenhttpserver)
target_link_libraries(fiphoboserver PUBLIC backend)
target_link_libraries(fiphoboserver PUBLIC fifops)
target_link_libraries(fiphoboserver PUBLIC stream)
target_link_libraries(fiphoboserver PUBLIC storage)
......@@ -17,7 +17,7 @@
#include <proxygen/httpserver/ResponseBuilder.h>
#include "../S3_header.h"
#include "../fifo/FiPhoExceptions.h"
#include "../stream/FiPhoExceptions.h"
namespace fiphoboserver {
......@@ -32,38 +32,16 @@ void GetRequestHandler::onRequest(
return;
}
#ifdef DEBUG
std::cout << "Got the following GET request: \n";
std::string path = headers->getPath();
std::cout << path << '\n';
headers->getHeaders().forEach(
[&](const std::string& header, const std::string& val) {
std::cout << header << ": " << val << '\n';
});
#endif
/* Creating Bucket queries */
s3_header.setHeaders(std::move(headers));
try {
#ifdef DEBUG
std::cout << "Bucket: " << s3_header.getBucket() << '\n';
std::cout << "Key: " << s3_header.getKey() << '\n';
std::cout << std::endl;
#endif
backend.setBucketName(s3_header.getBucket());
#ifdef DEBUG
std::cout << "Creating backend GET thread" << std::endl;
#endif
db_result = std::async(
[this, key = s3_header.getKey(), fifoName = fifo.getFifoName()]() {
return backend.db_get(key, fifoName);
});
/* Send meta data to backend through the stream */
stream->set_storage_meta_data(s3_header.getMetaData(), s3_header.getBucket());
/* Tell stream to coordinate with backend to prepare for GET operation */
stream->start_get(s3_header.getKey());
fifo.openFIFO();
file_closed_ = false;
file_closed_ = false; // TODO: Better way of communicating this
}
catch (const std::system_error& ex) {
proxygen::ResponseBuilder(downstream_)
......@@ -84,6 +62,7 @@ void GetRequestHandler::onRequest(
proxygen::ResponseBuilder(downstream_).status(200, "Ok").send();
/* Initiating a read from the stream and creating a body to send */
readFileScheduled_ = true;
folly::getCPUExecutor()->add(std::bind(
&GetRequestHandler::readFile, this,
......@@ -92,17 +71,18 @@ void GetRequestHandler::onRequest(
void GetRequestHandler::readFile(folly::EventBase* evb)
{
#ifdef DEBUG
std::cout << "GetRequestHandler::readFile" << std::endl;
#endif
folly::IOBufQueue buf;
while (!file_closed_ && !paused_) {
// read 4k-ish chunks and foward each one to the client
/* read 4k-ish chunks and foward each one to the client */
auto data = buf.preallocate(4000, 4000);
auto rc = fifo.getDataFromFIFO(data.first, data.second);
#ifdef DEBUG
std::cout << "getDataFromFIFO: rc = " << rc << std::endl;
#endif
/*
* rc is set to error code or bytes read
* This informs the server how to proceed
* Note: zero will only be returned if the
* file is empty AND the file is closed
* for writing
*/
auto rc = stream->get(data.first, data.second);
if (rc < 0) {
// error
VLOG(4) << "Read error=" << rc;
......@@ -130,7 +110,7 @@ void GetRequestHandler::readFile(folly::EventBase* evb)
});
}
}
// Notify the request thread that we terminated the readFile loop
/* Notify the request thread that we terminated the readFile loop */
evb->runInEventBaseThread([this] {
readFileScheduled_ = false;
if (!checkForCompletion() && !paused_) {
......@@ -142,7 +122,7 @@ void GetRequestHandler::readFile(folly::EventBase* evb)
void GetRequestHandler::onEgressPaused() noexcept
{
// This will terminate readFile soon
/* This will terminate readFile soon */
VLOG(4) << "GetRequestHandler pause";
paused_ = true;
}
......@@ -151,7 +131,7 @@ void GetRequestHandler::onEgressResumed() noexcept
{
VLOG(4) << "GetRequestHandler resumed";
paused_ = false;
// If readFileScheduled_, it will reschedule itself
/* If readFileScheduled_, it will reschedule itself */
if (!readFileScheduled_) {
readFileScheduled_ = true;
folly::getCPUExecutor()->add(std::bind(
......@@ -170,7 +150,7 @@ void GetRequestHandler::onBody(std::unique_ptr<folly::IOBuf> /*body*/) noexcept
void GetRequestHandler::onEOM() noexcept
{
db_result.wait();
}
void GetRequestHandler::onUpgrade(
......
......@@ -13,8 +13,7 @@
#include <proxygen/httpserver/RequestHandler.h>
#include "../S3_header.h"
#include "../backend/backend.h"
#include "../fifo/fifo_ops.h"
#include "../stream/stream.h"
namespace proxygen {
class ResponseHandler;
......@@ -41,18 +40,19 @@ class GetRequestHandler : public proxygen::RequestHandler {
void onEgressResumed() noexcept override;
GetRequestHandler(std::unique_ptr<Stream> input_stream):
stream(std::move(input_stream)) {}
private:
void readFile(folly::EventBase* evb);
bool checkForCompletion();
std::unique_ptr<Stream> stream;
S3_header s3_header;
PhobosDb backend;
FIFOGet fifo;
bool readFileScheduled_{false};
std::atomic<bool> paused_{false};
bool finished_{false};
bool file_closed_{false};
std::future<int> db_result;
};
} // namespace fiphoboserver
......@@ -14,7 +14,7 @@
#include <proxygen/httpserver/ResponseBuilder.h>
#include "../S3_header.h"
#include "../fifo/FiPhoExceptions.h"
#include "../stream/FiPhoExceptions.h"
namespace fiphoboserver {
......@@ -26,55 +26,22 @@ void PutRequestHandler::onRequest(
return;
}
#ifdef DEBUG
std::cout << "Got the following PUT request: \n";
std::string path = headers->getPath();
std::cout << path << '\n';
headers->getHeaders().forEach(
[&](const std::string& header, const std::string& val) {
std::cout << header << ": " << val << '\n';
});
#endif
#ifdef DEBUG
std::cout << "Creating Bucket queries..." << std::endl;
#endif
/* Creating Bucket queries */
s3_header.setHeaders(std::move(headers));
if (s3_header.isCreateBucketRequest()) {
// Ignore, since we don't really have buckets.
// Or do we want a list of the actual buckets to give an error, when a
// requested bucket doesn't exist?
/*
* Ignore, since we don't really have buckets.
* Or do we want a list of the actual buckets to give an error, when a
* requested bucket doesn't exist?
*/
proxygen::ResponseBuilder(downstream_).status(200, "Ok").sendWithEOM();
return;
}
try {
#ifdef DEBUG
std::cout << "Bucket: " << s3_header.getBucket() << '\n';
std::cout << "Key: " << s3_header.getKey() << '\n';
auto metaData = s3_header.getMetaData();
std::cout << "Metadata: " << '\n';
std::for_each(
metaData.begin(), metaData.end(),
[&](std::pair<std::string, std::string> pair) {
std::cout << pair.first << " : " << pair.second << '\n';
});
std::cout << std::endl;
#endif
backend.setBucketName(s3_header.getBucket());
backend.setMetaData(s3_header.getMetaData());
#ifdef DEBUG
std::cout << "Creating backend PUT thread" << std::endl;
#endif
db_result = std::async([this]() {
return backend.db_put(
s3_header.getBodyLength(), s3_header.getKey(),
fifo.getFifoName());
});
fifo.openFIFO();
/* Send meta data to backend through the stream */
stream->set_storage_meta_data(s3_header.getMetaData(), s3_header.getBucket());
/* Tell stream to coordinate with backend to prepare for PUT operation */
stream->start_put(s3_header.getBodyLength(), s3_header.getKey());
}
catch (const std::system_error& ex) {
proxygen::ResponseBuilder(downstream_)
......@@ -96,16 +63,9 @@ void PutRequestHandler::onRequest(
void PutRequestHandler::onBody(std::unique_ptr<folly::IOBuf> body) noexcept
{
#ifdef DEBUG
std::cout << "Adding data to FIFO... " << std::endl;
if (body) {
body->coalesce();
size_t totalBodyLength = body->computeChainDataLength();
std::cout << "Body Data Length = " << totalBodyLength << '\n';
}
#endif
try {
fifo.addDataToFIFO(std::move(body));
/* Hand message body over to stream for PUT operation */
stream->put(std::move(body));
}
catch (fiphoexceptions::FIFOException& fifoExcp) {
proxygen::ResponseBuilder(downstream_)
......@@ -121,11 +81,8 @@ void PutRequestHandler::onBody(std::unique_ptr<folly::IOBuf> body) noexcept
void PutRequestHandler::onEOM() noexcept
{
try {
#ifdef DEBUG
std::cout << "Finish IO..." << std::endl;
#endif
db_result.wait();
fifo.finishPut();
/* Tell stream it's time to clean up */
stream->finish_put();
}
catch (fiphoexceptions::FIFOException& fifoExcp) {
proxygen::ResponseBuilder(downstream_)
......
......@@ -12,8 +12,7 @@
#include <proxygen/httpserver/RequestHandler.h>
#include "../S3_header.h"
#include "../backend/backend.h"
#include "../fifo/fifo_ops.h"
#include "../stream/stream.h"
namespace proxygen {
class ResponseHandler;
......@@ -36,11 +35,12 @@ class PutRequestHandler : public proxygen::RequestHandler {
void onError(proxygen::ProxygenError err) noexcept override;
PutRequestHandler(std::unique_ptr<Stream> input_stream):
stream(std::move(input_stream)) {}
private:
S3_header s3_header;
PhobosDb backend;
FIFOPut fifo;
std::future<int> db_result;
std::unique_ptr<Stream> stream;
};
} // namespace fiphoboserver
......@@ -20,6 +20,8 @@
#include "put_request_handler.h"
#include "unsupported_request_handler.h"
#include "../stream/fifo.h"
#include "../storage/phobos_file.h"
using folly::SocketAddress;
......@@ -46,25 +48,13 @@ class HandlerFactory : public proxygen::RequestHandlerFactory {
proxygen::RequestHandler*,
proxygen::HTTPMessage* headers) noexcept override
{
#ifdef DEBUG
std::cout << "Choosing method: ";
#endif
if (headers->getMethod() == proxygen::HTTPMethod::GET) {
#ifdef DEBUG
std::cout << "GET" << std::endl;
#endif
return new fiphoboserver::GetRequestHandler;
return new fiphoboserver::GetRequestHandler(std::make_unique<fiphoboserver::Fifo>(std::make_unique<fiphoboserver::Phobos_file>()));
}
else if (headers->getMethod() == proxygen::HTTPMethod::PUT) {
#ifdef DEBUG
std::cout << "PUT" << std::endl;
#endif
return new fiphoboserver::PutRequestHandler;
return new fiphoboserver::PutRequestHandler(std::make_unique<fiphoboserver::Fifo>(std::make_unique<fiphoboserver::Phobos_file>()));
}
else {
#ifdef DEBUG
std::cout << "Unsupported Request" << std::endl;
#endif
return new fiphoboserver::UnsupportedRequestHandler;
}
}
......@@ -102,10 +92,7 @@ int main(int argc, char* argv[])
proxygen::HTTPServer server(std::move(options));
server.bind(IPs);
// Start HTTPServer mainloop in a separate thread
#ifdef DEBUG
std::cout << "main: Starting HTTPServer thread" << std::endl;
#endif
/* Start HTTPServer mainloop in a separate thread */
std::thread t([&]() { server.start(); });
t.join();
......