Commit a61aef3d authored by Ciarán Ó Rourke's avatar Ciarán Ó Rourke
Browse files

Ran Clang format

parent 2a3cb78f
......@@ -5,7 +5,6 @@
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <cstdio>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
......@@ -14,13 +13,29 @@
namespace fiphoboserver {
FiPhobo::FiPhobo()
Backend::Backend()
{
memset(&descriptor, 0, sizeof(descriptor));
descriptor.xd_attrs = {0};
}
void FiPhobo::setBucketName(std::string bucketName)
Backend::~Backend()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::~Phobos_Layer" << std::endl;
#endif
// pho_attrs_free(&descriptor.xd_attrs); // TODO: Is that done in
// desc_destroy??
pho_xfer_desc_destroy_cpp(&descriptor);
if (fifo_descriptor > 0) {
close(fifo_descriptor);
}
if (access(fifoName.c_str(), F_OK) != -1) {
remove(fifoName.c_str());
}
}
void Backend::setBucketName(std::string bucketName)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::setBucketName" << std::endl;
......@@ -31,7 +46,7 @@ void FiPhobo::setBucketName(std::string bucketName)
}
}
void FiPhobo::setMetaData(
void PutBackend::setMetaData(
std::vector<std::pair<std::string, std::string>> metaData)
{
#ifdef DEBUG
......@@ -48,7 +63,35 @@ void FiPhobo::setMetaData(
});
}
void FiPhobo::startPutOperation(unsigned int size, std::string objectID)
void Backend::startFIFO()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startFIFO" << std::endl;
#endif
fifoName = std::tmpnam(nullptr);
int rc = mkfifo(fifoName.c_str(), 0777);
if (rc < 0) {
if (errno != EEXIST) {
throw fiphoexceptions::FIFOException("mkfifo", errno);
return;
}
}
fifo_descriptor = open(fifoName.c_str(), O_RDWR);
if (fifo_descriptor < 0) {
std::stringstream ss;
ss << "open with filename " << fifoName.c_str();
throw fiphoexceptions::FIFOException(ss.str(), errno);
return;
}
#ifdef DEBUG
std::cout << "End of Phobos_Layer::startFIFO" << std::endl;
#endif
}
void PutBackend::startPutOperation(ssize_t size, std::string objectID)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startPutOperation" << std::endl;
......@@ -77,7 +120,7 @@ void FiPhobo::startPutOperation(unsigned int size, std::string objectID)
// delete [] unconstedObjectID;
}
void FiPhobo::addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer)
void PutBackend::addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::addDataToFIFO" << std::endl;
......@@ -99,7 +142,7 @@ void FiPhobo::addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer)
}
}
void FiPhobo::finishPUT()
void PutBackend::finishPut()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::finishPUT" << std::endl;
......@@ -117,7 +160,7 @@ void FiPhobo::finishPUT()
putRunning = false;
}
void FiPhobo::startGetOperation(std::string objectID)
void GetBackend::startGetOperation(std::string objectID)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startGetOperation" << std::endl;
......@@ -127,25 +170,25 @@ void FiPhobo::startGetOperation(std::string objectID)
descriptor.xd_objid = new char[objectID.length() + 1];
strcpy(descriptor.xd_objid, objectID.c_str());
descriptor.xd_op = PHO_XFER_OP_GET;
descriptor.xd_fd = fifo_descriptor;
descriptor.xd_size = 20662;
descriptor.xd_op = PHO_XFER_OP_GET;
descriptor.xd_fd = fifo_descriptor;
descriptor.xd_size = 20662;
#ifdef DEBUG
std::cout << "Starting async phobos_get_cpp" << std::endl;
std::cout << "Starting async phobos_get_cpp" << std::endl;
#endif
phobos_result = std::async(
[&]() {
phobos_result = std::async([&]() {
std::cout << "Phobos Thread id " << std::this_thread::get_id()
<< std::endl;
return phobos_get_cpp(&descriptor, 1, NULL, NULL); });
return phobos_get_cpp(&descriptor, 1, NULL, NULL);
});
#ifdef DEBUG
std::cout << "Phobos object size = " << descriptor.xd_size << std::endl;
std::cout << "Phobos object size = " << descriptor.xd_size << std::endl;
#endif
}
ssize_t FiPhobo::getDataFromFIFO(void *buf, size_t count)
ssize_t GetBackend::getDataFromFIFO(void* buf, size_t count)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::getDataFromFIFO" << std::endl;
......@@ -164,47 +207,4 @@ ssize_t FiPhobo::getDataFromFIFO(void *buf, size_t count)
return rc;
}
FiPhobo::~FiPhobo()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::~Phobos_Layer" << std::endl;
#endif
// pho_attrs_free(&descriptor.xd_attrs); // TODO: Is that done in
// desc_destroy??
pho_xfer_desc_destroy_cpp(&descriptor);
if (fifo_descriptor > 0) {
close(fifo_descriptor);
}
if (access(fifoName.c_str(), F_OK) != -1) {
remove(fifoName.c_str());
}
}
void FiPhobo::startFIFO()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startFIFO" << std::endl;
#endif
fifoName = std::tmpnam(nullptr);
int rc = mkfifo(fifoName.c_str(), 0777);
if (rc < 0) {
if (errno != EEXIST) {
throw fiphoexceptions::FIFOException("mkfifo", errno);
return;
}
}
fifo_descriptor = open(fifoName.c_str(), O_RDWR);
if (fifo_descriptor < 0) {
std::stringstream ss;
ss << "open with filename " << fifoName.c_str();
throw fiphoexceptions::FIFOException(ss.str(), errno);
return;
}
#ifdef DEBUG
std::cout << "End of Phobos_Layer::startFIFO" << std::endl;
#endif
}
} // namespace fiphoboserver
......@@ -13,35 +13,37 @@ namespace fiphoboserver {
// TODO: Remember the m-versions: PUT and GET functions can work on more than
// one file. Then this structure is stupid. Maybe with a vector of descriptors?
class FiPhobo {
public:
FiPhobo();
class Backend {
public:
Backend();
void setBucketName(std::string bucketName);
~Backend();
void setMetaData(std::vector<std::pair<std::string, std::string>> metaData);
void startPutOperation(unsigned int size, std::string objectID);
void addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer);
void finishPUT();
void startGetOperation(std::string objectID);
ssize_t getDataFromFIFO(void *buf, size_t count);
~FiPhobo();
private:
protected:
void startFIFO();
struct pho_xfer_desc descriptor = {0};
std::string fifoName;
int fifo_descriptor = -1;
int fifo_descriptor = -1;
std::future<int> phobos_result;
}
class PutBackend : public Backend {
public:
void setMetaData(std::vector<std::pair<std::string, std::string>> metaData);
void startPutOperation(ssize_t size, std::string objectID);
void addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer);
void finishPut();
private:
std::future<long int> put_result;
bool putRunning = false;
};
class GetBackend : public Backend {
public:
void startGetOperation(std::string objectID);
ssize_t getDataFromFIFO(void* buf, size_t count);
};
} // namespace fiphoboserver
......@@ -57,12 +57,12 @@ void GetRequestHandler::onRequest(
std::cout << "Key: " << s3_header.getKey() << '\n';
std::cout << std::endl;
#endif
phobos.setBucketName(s3_header.getBucket());
backend.setBucketName(s3_header.getBucket());
#ifdef DEBUG
std::cout << "Send get to phobos layer" << std::endl;
#endif
phobos.startGetOperation(s3_header.getKey());
backend.startGetOperation(s3_header.getKey());
}
catch (const std::system_error& ex) {
proxygen::ResponseBuilder(downstream_)
......@@ -84,78 +84,84 @@ void GetRequestHandler::onRequest(
proxygen::ResponseBuilder(downstream_).status(200, "Ok").send();
readFileScheduled_ = true;
folly::getCPUExecutor()->add(
std::bind(&GetRequestHandler::readFile, this,
folly::EventBaseManager::get()->getEventBase()));
folly::getCPUExecutor()->add(std::bind(
&GetRequestHandler::readFile, this,
folly::EventBaseManager::get()->getEventBase()));
}
void GetRequestHandler::readFile(folly::EventBase* evb) {
void GetRequestHandler::readFile(folly::EventBase* evb)
{
folly::IOBufQueue buf;
while (!paused_) {
// read 4k-ish chunks and foward each one to the client
auto data = buf.preallocate(4000, 4000);
auto rc = phobos.getDataFromFIFO(data.first, data.second);
auto rc = backend.getDataFromFIFO(data.first, data.second);
if (rc < 0) {
// error
VLOG(4) << "Read error=" << rc;
evb->runInEventBaseThread([this] {
LOG(ERROR) << "Error reading file";
downstream_->sendAbort();
});
LOG(ERROR) << "Error reading file";
downstream_->sendAbort();
});
break;
} else if (rc == 0) {
}
else if (rc == 0) {
// done
VLOG(4) << "Read EOF";
evb->runInEventBaseThread([this] {
proxygen::ResponseBuilder(downstream_)
.sendWithEOM();
});
proxygen::ResponseBuilder(downstream_).sendWithEOM();
});
break;
} else if (rc < 4000) {
}
else if (rc < 4000) {
// done
VLOG(4) << "Read EOF";
buf.postallocate(rc);
evb->runInEventBaseThread([this, body=buf.move()] () mutable {
proxygen::ResponseBuilder(downstream_)
.body(std::move(body))
.sendWithEOM();
});
evb->runInEventBaseThread([this, body = buf.move()]() mutable {
proxygen::ResponseBuilder(downstream_)
.body(std::move(body))
.sendWithEOM();
});
break;
} else {
}
else {
buf.postallocate(rc);
evb->runInEventBaseThread([this, body=buf.move()] () mutable {
proxygen::ResponseBuilder(downstream_)
.body(std::move(body))
.send();
});
evb->runInEventBaseThread([this, body = buf.move()]() mutable {
proxygen::ResponseBuilder(downstream_)
.body(std::move(body))
.send();
});
}
}
// Notify the request thread that we terminated the readFile loop
evb->runInEventBaseThread([this] {
readFileScheduled_ = false;
if (!checkForCompletion() && !paused_) {
VLOG(4) << "Resuming deferred readFile";
onEgressResumed();
}
});
readFileScheduled_ = false;
if (!checkForCompletion() && !paused_) {
VLOG(4) << "Resuming deferred readFile";
onEgressResumed();
}
});
}
void GetRequestHandler::onEgressPaused() noexcept {
void GetRequestHandler::onEgressPaused() noexcept
{
// This will terminate readFile soon
VLOG(4) << "GetRequestHandler pause";
paused_ = true;
}
void GetRequestHandler::onEgressResumed() noexcept {
void GetRequestHandler::onEgressResumed() noexcept
{
VLOG(4) << "GetRequestHandler resumed";
paused_ = false;
// If readFileScheduled_, it will reschedule itself
if (!readFileScheduled_) {
readFileScheduled_ = true;
folly::getCPUExecutor()->add(
std::bind(&GetRequestHandler::readFile, this,
folly::EventBaseManager::get()->getEventBase()));
} else {
folly::getCPUExecutor()->add(std::bind(
&GetRequestHandler::readFile, this,
folly::EventBaseManager::get()->getEventBase()));
}
else {
VLOG(4) << "Deferred scheduling readFile";
}
}
......@@ -167,7 +173,8 @@ void GetRequestHandler::onBody(std::unique_ptr<folly::IOBuf> /*body*/) noexcept
void GetRequestHandler::onEOM() noexcept {}
void GetRequestHandler::onUpgrade(proxygen::UpgradeProtocol /*protocol*/) noexcept
void GetRequestHandler::onUpgrade(
proxygen::UpgradeProtocol /*protocol*/) noexcept
{
// handler doesn't support upgrades
}
......@@ -175,18 +182,19 @@ void GetRequestHandler::onUpgrade(proxygen::UpgradeProtocol /*protocol*/) noexce
void GetRequestHandler::requestComplete() noexcept
{
finished_ = true;
paused_ = true;
paused_ = true;
checkForCompletion();
}
void GetRequestHandler::onError(proxygen::ProxygenError /*err*/) noexcept
{
finished_ = true;
paused_ = true;
paused_ = true;
checkForCompletion();
}
bool GetRequestHandler::checkForCompletion() {
bool GetRequestHandler::checkForCompletion()
{
if (finished_ && !readFileScheduled_) {
VLOG(4) << "deleting GetRequestHandler";
delete this;
......
......@@ -45,7 +45,7 @@ class GetRequestHandler : public proxygen::RequestHandler {
bool checkForCompletion();
S3_header s3_header;
FiPhobo phobos;
GetBackend backend;
bool readFileScheduled_{false};
std::atomic<bool> paused_{false};
bool finished_{false};
......
......@@ -64,13 +64,14 @@ void PushRequestHandler::onRequest(
});
std::cout << std::endl;
#endif
phobos.setBucketName(s3_header.getBucket());
phobos.setMetaData(s3_header.getMetaData());
backend.setBucketName(s3_header.getBucket());
backend.setMetaData(s3_header.getMetaData());
#ifdef DEBUG
std::cout << "Send put to phobos layer" << std::endl;
#endif
phobos.startPutOperation(s3_header.getBodyLength(), s3_header.getKey());
backend.startPutOperation(
s3_header.getBodyLength(), s3_header.getKey());
}
catch (const std::system_error& ex) {
proxygen::ResponseBuilder(downstream_)
......@@ -105,7 +106,7 @@ void PushRequestHandler::onBody(std::unique_ptr<folly::IOBuf> body) noexcept
}
#endif
try {
phobos.addDataToFIFO(std::move(body));
backend.addDataToFIFO(std::move(body));
}
catch (fiphoexceptions::FIFOException& fifoExcp) {
proxygen::ResponseBuilder(downstream_)
......@@ -124,7 +125,7 @@ void PushRequestHandler::onEOM() noexcept
#ifdef DEBUG
std::cout << "Finish IO..." << std::endl;
#endif
phobos.finishPUT();
backend.finishPut();
}
catch (fiphoexceptions::FIFOException& fifoExcp) {
proxygen::ResponseBuilder(downstream_)
......
......@@ -37,7 +37,7 @@ class PushRequestHandler : public proxygen::RequestHandler {
private:
S3_header s3_header;
FiPhobo phobos;
PutBackend backend;
};
} // namespace fiphoboserver
......@@ -16,8 +16,8 @@
#include <proxygen/httpserver/HTTPServer.h>
#include <proxygen/httpserver/RequestHandlerFactory.h>
#include "push_request_handler.h"
#include "get_request_handler.h"
#include "push_request_handler.h"
#include "unsupported_request_handler.h"
......@@ -48,21 +48,21 @@ class HandlerFactory : public proxygen::RequestHandlerFactory {
{
#ifdef DEBUG
std::cout << "Choosing method: ";
#endif
#endif
if (headers->getMethod() == proxygen::HTTPMethod::GET) {
#ifdef DEBUG
#ifdef DEBUG
std::cout << "Chose GET" << std::endl;
#endif
return new fiphoboserver::GetRequestHandler;
}
else if (headers->getMethod() == proxygen::HTTPMethod::PUT) {
#ifdef DEBUG
#ifdef DEBUG
std::cout << "Chose PUT" << std::endl;
#endif
return new fiphoboserver::PushRequestHandler;
}
else {
#ifdef DEBUG
#ifdef DEBUG
std::cout << "Unsupported Request" << std::endl;
#endif
return new fiphoboserver::UnsupportedRequestHandler;
......
......@@ -9,8 +9,8 @@
#include "unsupported_request_handler.h"
#include <cstdio>
#include <stdlib.h>
#include <sstream>
#include <stdlib.h>
#include <proxygen/httpserver/RequestHandler.h>
#include <proxygen/httpserver/ResponseBuilder.h>
......@@ -23,13 +23,12 @@ void UnsupportedRequestHandler::onRequest(
std::string method = headers->getMethodString();
std::stringstream ss;
ss << "The method " << method << " is not supported!";
proxygen::ResponseBuilder(downstream_)
.status(200, "Ok")
.body(ss.str())
.sendWithEOM();
return;
}
} // namespace fiphoboserver
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment