Commit bf66169c authored by Ciarán Ó Rourke's avatar Ciarán Ó Rourke
Browse files

Merge commit '993050da'

parents df4563b9 993050da
cmake_minimum_required(VERSION 3.0)
project(fiphoboserver)
set(CUSTOM_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
find_package(proxygen REQUIRED)
add_subdirectory(src)
# FIPhoboServer
Build with
```
cmake -DDEBUG=ON -DCMAKE_PREFIX_PATH=/path/to/proxygen /path/to/CMakeLists/file/
make
```
from any directory you want and hope for the best! ;)
Oh, make sure Phobos include directories are under `../phobos/src/include` from the main `CMakeLists.txt`, or change that path in there
if(DEBUG)
add_compile_definitions(DEBUG)
endif(DEBUG)
add_subdirectory(storage)
add_subdirectory(stream)
add_subdirectory(server)
#pragma once
#include <proxygen/lib/http/HTTPMessage.h>
namespace fiphoboserver {
class S3_header {
public:
void setHeaders(std::unique_ptr<proxygen::HTTPMessage> newHeaders)
{
headers = std::move(newHeaders);
}
std::string getBucket()
{
if (!headers) {
return "";
}
std::string path = headers->getPath();
path = path.substr(1, path.size());
std::string bucketName = path.substr(0, path.find('/'));
return bucketName;
}
std::string getKey()
{
if (!headers) {
return "";
}
std::string path = headers->getPath();
path = path.substr(1, path.size());
std::string fileKey = path.substr(path.find('/') + 1, path.size() - 1);
if (fileKey.find('/') != std::string::npos
|| fileKey.find('\\') != std::string::npos) {
std::cout << "Bad Key! " << std::endl;
return "";
}
return fileKey;
}
bool isCreateBucketRequest()
{
if (!headers) {
return false;
}
if (getKey() == "") return true;
return false;
}
size_t getBodyLength()
{
std::string contentLength =
headers->getHeaders().getSingleOrEmpty("Content-Length");
return static_cast<size_t>(std::stoi(contentLength));
}
std::vector<std::pair<std::string, std::string>> getMetaData()
{
std::vector<std::pair<std::string, std::string>> metadata;
std::string metaSearchString = "x-amz-meta-";
headers->getHeaders().forEach([&](const std::string& header,
const std::string& val) {
if (header.find(metaSearchString) != std::string::npos) {
std::string metaName =
header.substr(metaSearchString.size(), header.size() - 1);
std::pair<std::string, std::string> currentPair(metaName, val);
metadata.push_back(currentPair);
}
});
return metadata;
}
private:
std::unique_ptr<proxygen::HTTPMessage> headers;
};
} // namespace fiphoboserver
include_directories(${PHOBOS_INCLUDE_DIRECTORY})
include_directories(/usr/include/glib-2.0)
include_directories(/usr/lib64/glib-2.0/include)
add_executable(
fiphoboserver
server.main.cc
get_request_handler.cc
put_request_handler.cc
unsupported_request_handler.cc
)
if(CUSTOM_OUTPUT_DIRECTORY)
set_target_properties( fiphoboserver
PROPERTIES
ARCHIVE_OUTPUT_DIRECTORY "${CUSTOM_OUTPUT_DIRECTORY}/fiphoboserver"
LIBRARY_OUTPUT_DIRECTORY "${CUSTOM_OUTPUT_DIRECTORY}/fiphoboserver"
RUNTIME_OUTPUT_DIRECTORY "${CUSTOM_OUTPUT_DIRECTORY}/fiphoboserver"
)
endif(CUSTOM_OUTPUT_DIRECTORY)
target_compile_features(fiphoboserver PUBLIC cxx_std_14)
target_link_libraries(fiphoboserver PUBLIC proxygen::proxygen)
target_link_libraries(fiphoboserver PUBLIC proxygen::proxygenhttpserver)
target_link_libraries(fiphoboserver PUBLIC stream)
target_link_libraries(fiphoboserver PUBLIC storage)
/*
* Copyright (c) Facebook, Inc. and its affiliates.
* All rights reserved.
* * This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#include "get_request_handler.h"
#include <cstdio>
#include <stdlib.h>
#include <folly/FileUtil.h>
#include <folly/executors/GlobalExecutor.h>
#include <folly/io/async/EventBaseManager.h>
#include <proxygen/httpserver/RequestHandler.h>
#include <proxygen/httpserver/ResponseBuilder.h>
#include "../S3_header.h"
#include "../stream/FiPhoExceptions.h"
namespace fiphoboserver {
void GetRequestHandler::onRequest(
std::unique_ptr<proxygen::HTTPMessage> headers) noexcept
{
if (headers->getMethod() != proxygen::HTTPMethod::GET) {
proxygen::ResponseBuilder(downstream_)
.status(400, "Bad method")
.body("Only GET is supported")
.sendWithEOM();
return;
}
/* Creating Bucket queries */
s3_header.setHeaders(std::move(headers));
try {
/* Send meta data to backend through the stream */
stream->set_storage_meta_data(s3_header.getMetaData(), s3_header.getBucket());
/* Tell stream to coordinate with backend to prepare for GET operation */
stream->start_get(s3_header.getKey());
file_closed_ = false; // TODO: Better way of communicating this
}
catch (const std::system_error& ex) {
proxygen::ResponseBuilder(downstream_)
.status(404, "Not Found")
.body(folly::to<std::string>(
"Could not find ", headers->getPathAsStringPiece(),
" ex=", folly::exceptionStr(ex)))
.sendWithEOM();
return;
}
catch (const fiphoexceptions::PhobosException& ex) {
proxygen::ResponseBuilder(downstream_)
.status(409, "Conflict")
.body(ex.what())
.sendWithEOM();
return;
}
proxygen::ResponseBuilder(downstream_).status(200, "Ok").send();
/* Initiating a read from the stream and creating a body to send */
readFileScheduled_ = true;
folly::getCPUExecutor()->add(std::bind(
&GetRequestHandler::readFile, this,
folly::EventBaseManager::get()->getEventBase()));
}
void GetRequestHandler::readFile(folly::EventBase* evb)
{
folly::IOBufQueue buf;
while (!file_closed_ && !paused_) {
/* read 4k-ish chunks and foward each one to the client */
auto data = buf.preallocate(4000, 4000);
/*
* rc is set to error code or bytes read
* This informs the server how to proceed
* Note: zero will only be returned if the
* file is empty AND the file is closed
* for writing
*/
auto rc = stream->get(data.first, data.second);
if (rc < 0) {
// error
VLOG(4) << "Read error=" << rc;
evb->runInEventBaseThread([this] {
LOG(ERROR) << "Error reading file";
downstream_->sendAbort();
});
break;
}
else if (rc == 0) {
// done
file_closed_ = true;
VLOG(4) << "Read EOF";
evb->runInEventBaseThread([this] {
proxygen::ResponseBuilder(downstream_).sendWithEOM();
});
break;
}
else {
buf.postallocate(rc);
evb->runInEventBaseThread([this, body = buf.move()]() mutable {
proxygen::ResponseBuilder(downstream_)
.body(std::move(body))
.send();
});
}
}
/* Notify the request thread that we terminated the readFile loop */
evb->runInEventBaseThread([this] {
readFileScheduled_ = false;
if (!checkForCompletion() && !paused_) {
VLOG(4) << "Resuming deferred readFile";
onEgressResumed();
}
});
}
void GetRequestHandler::onEgressPaused() noexcept
{
/* This will terminate readFile soon */
VLOG(4) << "GetRequestHandler pause";
paused_ = true;
}
void GetRequestHandler::onEgressResumed() noexcept
{
VLOG(4) << "GetRequestHandler resumed";
paused_ = false;
/* If readFileScheduled_, it will reschedule itself */
if (!readFileScheduled_) {
readFileScheduled_ = true;
folly::getCPUExecutor()->add(std::bind(
&GetRequestHandler::readFile, this,
folly::EventBaseManager::get()->getEventBase()));
}
else {
VLOG(4) << "Deferred scheduling readFile";
}
}
void GetRequestHandler::onBody(std::unique_ptr<folly::IOBuf> /*body*/) noexcept
{
// ignore, only support GET
}
void GetRequestHandler::onEOM() noexcept
{
}
void GetRequestHandler::onUpgrade(
proxygen::UpgradeProtocol /*protocol*/) noexcept
{
// handler doesn't support upgrades
}
void GetRequestHandler::requestComplete() noexcept
{
finished_ = true;
paused_ = true;
checkForCompletion();
}
void GetRequestHandler::onError(proxygen::ProxygenError /*err*/) noexcept
{
finished_ = true;
paused_ = true;
checkForCompletion();
}
bool GetRequestHandler::checkForCompletion()
{
if (finished_ && !readFileScheduled_) {
VLOG(4) << "deleting GetRequestHandler";
delete this;
return true;
}
return false;
}
} // namespace fiphoboserver
/*
* Copyright (c) Facebook, Inc. and its affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
//#include <folly/File.h>
#include <folly/Memory.h>
#include <proxygen/httpserver/RequestHandler.h>
#include "../S3_header.h"
#include "../stream/stream.h"
namespace proxygen {
class ResponseHandler;
}
namespace fiphoboserver {
class GetRequestHandler : public proxygen::RequestHandler {
public:
void onRequest(
std::unique_ptr<proxygen::HTTPMessage> headers) noexcept override;
void onBody(std::unique_ptr<folly::IOBuf> body) noexcept override;
void onEOM() noexcept override;
void onUpgrade(proxygen::UpgradeProtocol proto) noexcept override;
void requestComplete() noexcept override;
void onError(proxygen::ProxygenError err) noexcept override;
void onEgressPaused() noexcept override;
void onEgressResumed() noexcept override;
GetRequestHandler(std::unique_ptr<Stream> input_stream):
stream(std::move(input_stream)) {}
private:
void readFile(folly::EventBase* evb);
bool checkForCompletion();
std::unique_ptr<Stream> stream;
S3_header s3_header;
bool readFileScheduled_{false};
std::atomic<bool> paused_{false};
bool finished_{false};
bool file_closed_{false};
};
} // namespace fiphoboserver
/*
* Copyright (c) Facebook, Inc. and its affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#include "put_request_handler.h"
#include <folly/FileUtil.h>
#include <proxygen/httpserver/PushHandler.h>
#include <proxygen/httpserver/RequestHandler.h>
#include <proxygen/httpserver/ResponseBuilder.h>
#include "../S3_header.h"
#include "../stream/FiPhoExceptions.h"
namespace fiphoboserver {
void PutRequestHandler::onRequest(
std::unique_ptr<proxygen::HTTPMessage> headers) noexcept
{
if (!downstream_) {
// can't push
return;
}
/* Creating Bucket queries */
s3_header.setHeaders(std::move(headers));
if (s3_header.isCreateBucketRequest()) {
/*
* Ignore, since we don't really have buckets.
* Or do we want a list of the actual buckets to give an error, when a
* requested bucket doesn't exist?
*/
proxygen::ResponseBuilder(downstream_).status(200, "Ok").sendWithEOM();
return;
}
try {
/* Send meta data to backend through the stream */
stream->set_storage_meta_data(s3_header.getMetaData(), s3_header.getBucket());
/* Tell stream to coordinate with backend to prepare for PUT operation */
stream->start_put(s3_header.getBodyLength(), s3_header.getKey());
}
catch (const std::system_error& ex) {
proxygen::ResponseBuilder(downstream_)
.status(404, "Not Found")
.body(folly::to<std::string>(
"Could not find ", headers->getPathAsStringPiece(),
" ex=", folly::exceptionStr(ex)))
.sendWithEOM();
return;
}
catch (const fiphoexceptions::PhobosException& ex) {
proxygen::ResponseBuilder(downstream_)
.status(409, "Conflict")
.body(ex.what())
.sendWithEOM();
return;
}
}
void PutRequestHandler::onBody(std::unique_ptr<folly::IOBuf> body) noexcept
{
try {
/* Hand message body over to stream for PUT operation */
stream->put(std::move(body));
}
catch (fiphoexceptions::FIFOException& fifoExcp) {
proxygen::ResponseBuilder(downstream_)
.status(409, "Conflict")
.body(fifoExcp.what())
.sendWithEOM();
return;
}
proxygen::ResponseBuilder(downstream_).status(100, "Continue").send();
}
void PutRequestHandler::onEOM() noexcept
{
try {
/* Tell stream it's time to clean up */
stream->finish_put();
}
catch (fiphoexceptions::FIFOException& fifoExcp) {
proxygen::ResponseBuilder(downstream_)
.status(409, "Conflict")
.body(fifoExcp.what())
.sendWithEOM();
return;
}
catch (fiphoexceptions::PhobosException& phoExcp) {
proxygen::ResponseBuilder(downstream_)
.status(409, "Conflict")
.body(phoExcp.what())
.sendWithEOM();
return;
}
proxygen::ResponseBuilder(downstream_).status(200, "OK").sendWithEOM();
}
void PutRequestHandler::onUpgrade(
proxygen::UpgradeProtocol /*protocol*/) noexcept
{
// handler doesn't support upgrades
}
void PutRequestHandler::requestComplete() noexcept
{
delete this;
}
void PutRequestHandler::onError(proxygen::ProxygenError /*err*/) noexcept
{
delete this;
}
} // namespace fiphoboserver
/*
* Copyright (c) Facebook, Inc. and its affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <folly/Memory.h>
#include <proxygen/httpserver/RequestHandler.h>
#include "../S3_header.h"
#include "../stream/stream.h"
namespace proxygen {
class ResponseHandler;
}
namespace fiphoboserver {
class PutRequestHandler : public proxygen::RequestHandler {
public:
void onRequest(
std::unique_ptr<proxygen::HTTPMessage> headers) noexcept override;
void onBody(std::unique_ptr<folly::IOBuf> body) noexcept override;
void onEOM() noexcept override;
void onUpgrade(proxygen::UpgradeProtocol proto) noexcept override;
void requestComplete() noexcept override;
void onError(proxygen::ProxygenError err) noexcept override;
PutRequestHandler(std::unique_ptr<Stream> input_stream):
stream(std::move(input_stream)) {}
private:
S3_header s3_header;
std::unique_ptr<Stream> stream;
};
} // namespace fiphoboserver
/*
* Copyright (c) Facebook, Inc. and its affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <folly/Memory.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/GlobalExecutor.h>
#include <folly/init/Init.h>
#include <folly/io/async/EventBaseManager.h>
#include <folly/portability/GFlags.h>
#include <folly/portability/Unistd.h>
#include <proxygen/httpserver/HTTPServer.h>
#include <proxygen/httpserver/RequestHandlerFactory.h>
#include "get_request_handler.h"
#include "put_request_handler.h"
#include "unsupported_request_handler.h"
#include "../stream/fifo.h"
#include "../storage/phobos_file.h"
using folly::SocketAddress;
using Protocol = proxygen::HTTPServer::Protocol;
DEFINE_int32(http_port, 11000, "Port to listen on with HTTP protocol");
DEFINE_int32(h2_port, 11002, "Port to listen on with HTTP/2 protocol");
DEFINE_string(ip, "localhost", "IP/Hostname to bind to");
DEFINE_int32(
threads,
0,