Commit c938649e authored by Sophie Wenzel-Teuber's avatar Sophie Wenzel-Teuber
Browse files

Clean up static handler, remove body output from push handler and make fiphobo compile again

parent df959ea4
......@@ -67,13 +67,21 @@ void FiPhobo::startPutOperation(unsigned int size, std::string objectID)
std::cout << "Starting async phobos_put_cpp" << std::endl;
#endif
phobos_result = std::async([&](){
std::cout << "Phobos Thread id " << std::this_thread::get_id() << std::endl;
return phobos_put_cpp(&descriptor, 1, NULL, NULL);
});
std::cout << "Main Thread id " << std::this_thread::get_id() << std::endl;
//TODOOOO
// delete [] unconstedObjectID;
}
static long int fifoWriteTemp(std::unique_ptr<folly::IOBuf> buffer, int fifo_descriptor){
std::cout << "IO Thread id " << std::this_thread::get_id() << std::endl;
return write(fifo_descriptor, buffer->data(), buffer->length());
}
void FiPhobo::addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer)
{
#ifdef DEBUG
......@@ -119,7 +127,7 @@ void FiPhobo::finishPUT()
putRunning = false;
}
void FiPhobo::startGetOperation(std::string& objectID)
void FiPhobo::startGetOperation(std::string objectID)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startGetOperation" << std::endl;
......@@ -154,6 +162,12 @@ std::unique_ptr<folly::IOBuf> FiPhobo::getDataFromFIFO(size_t size)
return std::move(buffer);
});
int phobosRC = phobos_result.get();
if(phobosRC)
{
throw fiphoexceptions::PhobosException("pho_get", phobosRC);
}
std::unique_ptr<folly::IOBuf> buffer;
try
{
......@@ -165,12 +179,6 @@ std::unique_ptr<folly::IOBuf> FiPhobo::getDataFromFIFO(size_t size)
throw fifoException;
}
int phobosRC = phobos_result.get();
if(phobosRC)
{
throw fiphoexceptions::PhobosException("pho_get", phobosRC);
}
return std::move(buffer);
}
......
......@@ -28,7 +28,7 @@ public:
void finishPUT();
void startGetOperation(std::string& objectID);
void startGetOperation(std::string objectID);
std::unique_ptr<folly::IOBuf> getDataFromFIFO(size_t size);
......
......@@ -38,10 +38,6 @@ class PushRequestHandler : public proxygen::RequestHandler {
private:
S3_header s3_header;
FiPhobo phobos;
std::unique_ptr<folly::IOBuf> body_;
// proxygen::ResponseHandler* downstreamPush_{nullptr};
};
} // namespace fiphoboserver
......@@ -17,17 +17,11 @@
#include <proxygen/httpserver/RequestHandler.h>
#include <proxygen/httpserver/ResponseBuilder.h>
#include "../fiphobo/fiphobo.h"
#include "../S3_header.h"
#include "../fiphobo/FiPhoExceptions.h"
namespace fiphoboserver {
/**
* Handles requests by serving the file named in path. Only supports GET.
* reads happen in a CPU thread pool since read(2) is blocking.
* If egress pauses, file reading is also paused.
*/
void StaticHandler::onRequest(std::unique_ptr<proxygen::HTTPMessage> headers) noexcept
{
if (headers->getMethod() != proxygen::HTTPMethod::GET) {
......@@ -44,23 +38,26 @@ void StaticHandler::onRequest(std::unique_ptr<proxygen::HTTPMessage> headers) no
std::cout << path << '\n';
headers->getHeaders().forEach([&] (const std::string& header, const std::string& val) {
// builder.header("OH:" + header, val);
std::cout << header << ": " << val << '\n';
});
#endif
s3_header.setHeaders(std::move(headers));
std::string bucketName;
std::string fileKey;
// a real webserver would validate this path didn't contain malicious
// characters like '//' or '..'
try {
// + 1 to kill leading /
bucketName = s3_header.getBucket();
fileKey = s3_header.getKey();
// file_ = std::make_unique<folly::File>(
// headers->getPathAsStringPiece().subpiece(1));
#ifdef DEBUG
std::cout << "Bucket: " << s3_header.getBucket() << '\n';
std::cout << "Key: " << s3_header.getKey() << '\n';
std::cout << std::endl;
#endif
phobos.setBucketName(s3_header.getBucket());
#ifdef DEBUG
std::cout << "Send get to phobos layer" << std::endl;
#endif
phobos.startGetOperation(s3_header.getKey());
}
catch (const std::system_error& ex) {
proxygen::ResponseBuilder(downstream_)
......@@ -71,93 +68,15 @@ void StaticHandler::onRequest(std::unique_ptr<proxygen::HTTPMessage> headers) no
.sendWithEOM();
return;
}
#ifdef DEBUG
std::cout << "Bucket: " << bucketName << '\n';
std::cout << "Key: " << fileKey << '\n';
std::cout << std::endl;
#endif
catch (const fiphoexceptions::PhobosException& ex) {
proxygen::ResponseBuilder(downstream_)
.status(409, "Conflict")
.body(ex.what())
.sendWithEOM();
return;
}
proxygen::ResponseBuilder(downstream_).status(200, "Ok").send();
// use a CPU executor since read(2) of a file can block
// readFileScheduled_ = true;
// folly::getCPUExecutor()->add(std::bind(
// &StaticHandler::readFile, this,
// folly::EventBaseManager::get()->getEventBase()));
readFileBlocking(bucketName, fileKey);
}
void StaticHandler::readFileBlocking(std::string bucketName, std::string fileKey)
{
// TODO phobos_layer
proxygen::ResponseBuilder(downstream_).sendWithEOM();
}
void StaticHandler::readFile(folly::EventBase* evb)
{
folly::IOBufQueue buf;
while (file_ && !paused_) {
// read 4k-ish chunks and foward each one to the client
auto data = buf.preallocate(4000, 4000);
auto rc = folly::readNoInt(file_->fd(), data.first, data.second);
if (rc < 0) {
// error
VLOG(4) << "Read error=" << rc;
file_.reset();
evb->runInEventBaseThread([this] {
LOG(ERROR) << "Error reading file";
downstream_->sendAbort();
});
break;
}
else if (rc == 0) {
// done
file_.reset();
VLOG(4) << "Read EOF";
evb->runInEventBaseThread(
[this] { proxygen::ResponseBuilder(downstream_).sendWithEOM(); });
break;
}
else {
buf.postallocate(rc);
evb->runInEventBaseThread([this, body = buf.move()]() mutable {
proxygen::ResponseBuilder(downstream_).body(std::move(body)).send();
});
}
}
// Notify the request thread that we terminated the readFile loop
evb->runInEventBaseThread([this] {
readFileScheduled_ = false;
if (!checkForCompletion() && !paused_) {
VLOG(4) << "Resuming deferred readFile";
onEgressResumed();
}
});
}
void StaticHandler::onEgressPaused() noexcept
{
// This will terminate readFile soon
VLOG(4) << "StaticHandler paused";
paused_ = true;
}
void StaticHandler::onEgressResumed() noexcept
{
VLOG(4) << "StaticHandler resumed";
paused_ = false;
// If readFileScheduled_, it will reschedule itself
if (!readFileScheduled_ && file_) {
readFileScheduled_ = true;
folly::getCPUExecutor()->add(std::bind(
&StaticHandler::readFile, this,
folly::EventBaseManager::get()->getEventBase()));
}
else {
VLOG(4) << "Deferred scheduling readFile";
}
}
......@@ -175,26 +94,12 @@ void StaticHandler::onUpgrade(proxygen::UpgradeProtocol /*protocol*/) noexcept
void StaticHandler::requestComplete() noexcept
{
finished_ = true;
paused_ = true;
checkForCompletion();
delete this;
}
void StaticHandler::onError(proxygen::ProxygenError /*err*/) noexcept
{
finished_ = true;
paused_ = true;
checkForCompletion();
}
bool StaticHandler::checkForCompletion()
{
if (finished_ && !readFileScheduled_) {
VLOG(4) << "deleting StaticHandler";
delete this;
return true;
}
return false;
delete this;
}
} // namespace fiphoboserver
......@@ -13,6 +13,7 @@
#include <proxygen/httpserver/RequestHandler.h>
#include "../S3_header.h"
#include "../fiphobo/fiphobo.h"
namespace proxygen {
class ResponseHandler;
......@@ -35,20 +36,9 @@ class StaticHandler : public proxygen::RequestHandler {
void onError(proxygen::ProxygenError err) noexcept override;
void onEgressPaused() noexcept override;
void onEgressResumed() noexcept override;
private:
void readFile(folly::EventBase* evb);
void readFileBlocking(std::string bucketName, std::string fileKey);
bool checkForCompletion();
S3_header s3_header;
std::unique_ptr<folly::File> file_;
bool readFileScheduled_{false};
std::atomic<bool> paused_{false};
bool finished_{false};
FiPhobo phobos;
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment