Commit 9a00f70e authored by Ciarán Ó Rourke's avatar Ciarán Ó Rourke
Browse files

Merge branch 'non_block_fifo' into 'devel'

Non block fifo

See merge request oilgas/ltfs/fiphoboserver!3
parents 6bb805d2 2836db9f
#include "backend.h"
#include <algorithm>
#include <cstdio>
#include <fcntl.h>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <vector>
#include <string>
#include <algorithm>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <vector>
#include "../fifo/FiPhoExceptions.h"
namespace fiphoboserver {
void DbInterface::openFile(std::string fileName, int flags)
{
#ifdef DEBUG
std::cout << "Backend::openFile" << std::endl;
#endif
file_descriptor = open(fileName.c_str(), flags);
if (file_descriptor < 0) {
std::stringstream ss;
ss << "open with filename " << fileName.c_str();
throw fiphoexceptions::FIFOException(ss.str(), errno);
return;
}
}
void DbInterface::closeFile()
{
if (file_descriptor > 0) {
close(file_descriptor);
}
}
PhobosDb::PhobosDb()
{
#ifdef DEBUG
......@@ -35,12 +56,14 @@ PhobosDb::~PhobosDb()
pho_xfer_desc_destroy_cpp(&descriptor);
}
void PhobosDb::db_put(ssize_t size, std::string objectID, int fd)
int PhobosDb::db_put(ssize_t size, std::string objectID, std::string fileName)
{
#ifdef DEBUG
std::cout << "PhobosDb::db_put" << std::endl;
#endif
openFile(fileName, O_RDONLY);
#ifdef DEBUG
std::cout << "PhobosDb: Object size = " << size << std::endl;
std::cout << "PhobosDb: fifo_descriptor = " << fd << std::endl;
#endif
char* unconstedObjectID = new char[objectID.length() + 1];
......@@ -48,46 +71,39 @@ void PhobosDb::db_put(ssize_t size, std::string objectID, int fd)
descriptor.xd_op = PHO_XFER_OP_PUT;
descriptor.xd_objid = unconstedObjectID;
descriptor.xd_fd = fd;
descriptor.xd_fd = file_descriptor;
descriptor.xd_size = size;
#ifdef DEBUG
std::cout << "Starting phobos_put_cpp" << std::endl;
std::cout << "PhobosDb: Starting phobos_put_cpp" << std::endl;
#endif
db_result = std::async([&]() {
std::cout << "Phobos Thread id " << std::this_thread::get_id()
<< std::endl;
return phobos_put_cpp(&descriptor, 1, NULL, NULL);
});
int rc = phobos_put_cpp(&descriptor, 1, NULL, NULL);
#ifdef DEBUG
std::cout << "Main Thread id " << std::this_thread::get_id()
<< std::endl;
#endif
// TODOOOO
// delete [] unconstedObjectID;
closeFile();
return rc;
}
void PhobosDb::db_get(std::string objectID, int fd)
int PhobosDb::db_get(std::string objectID, std::string fileName)
{
#ifdef DEBUG
std::cout << "PhobosDb::db_get" << std::endl;
std::cout << "PhobosDb: fifo_descriptor = " << fd << std::endl;
#endif
openFile(fileName, O_WRONLY);
descriptor.xd_objid = new char[objectID.length() + 1];
strcpy(descriptor.xd_objid, objectID.c_str());
descriptor.xd_op = PHO_XFER_OP_GET;
descriptor.xd_fd = fd;
descriptor.xd_op = PHO_XFER_OP_GET;
descriptor.xd_fd = file_descriptor;
#ifdef DEBUG
std::cout << "Starting phobos_get_cpp" << std::endl;
std::cout << "PhobosDB: Starting phobos_get_cpp" << std::endl;
#endif
db_result = std::async([&]() {
std::cout << "Phobos Thread id " << std::this_thread::get_id()
<< std::endl;
return phobos_get_cpp(&descriptor, 1, NULL, NULL);
});
int rc = phobos_get_cpp(&descriptor, 1, NULL, NULL);
closeFile();
return rc;
}
void PhobosDb::setBucketName(std::string bucketName)
......@@ -98,7 +114,7 @@ void PhobosDb::setBucketName(std::string bucketName)
int rc = pho_attr_set(&descriptor.xd_attrs, "bucket", bucketName.c_str());
if (rc) {
throw fiphoexceptions::PhobosException("pho_attr_set", rc);
}
}
}
void PhobosDb::setMetaData(
......
#pragma once
#include <algorithm>
#include <future>
#include <vector>
#include <string>
#include <algorithm>
#include <vector>
extern "C" {
#include "phobos_cpp_wrapper/phobos_cpp_wrapper.h"
......@@ -15,35 +15,36 @@ namespace fiphoboserver {
// one file. Then this structure is stupid. Maybe with a vector of descriptors?
class DbInterface {
public:
/* Interface functions */
virtual void db_put(ssize_t size, std::string objectID, int fd) = 0;
virtual void db_get(std::string, int fd) = 0;
/*
* virtual ssize_t db_getmd();
*/
std::future<int> get_db_result() { return std::move(db_result); }
protected:
std::future<int> db_result;
public:
/* Interface functions */
virtual int db_put(
ssize_t size, std::string objectID, std::string fileName) = 0;
virtual int db_get(std::string objectID, std::string fileName) = 0;
/*
* virtual ssize_t db_getmd();
*/
void openFile(std::string fileName, int flags);
void closeFile();
protected:
int file_descriptor = -1;
};
class PhobosDb : public DbInterface {
public:
PhobosDb();
~PhobosDb();
void db_put(ssize_t size, std::string objectID, int fd);
void db_get(std::string, int fd);
/*
* ssize_t db_getmd();
*/
void setBucketName(std::string bucketName);
void setMetaData(
std::vector<std::pair<std::string, std::string>> metaData);
private:
struct pho_xfer_desc descriptor = {0};
public:
PhobosDb();
~PhobosDb();
int db_put(ssize_t size, std::string objectID, std::string fileName);
int db_get(std::string, std::string fileName);
/*
* ssize_t db_getmd();
*/
void setBucketName(std::string bucketName);
void setMetaData(std::vector<std::pair<std::string, std::string>> metaData);
private:
struct pho_xfer_desc descriptor = {0};
};
} // namespace fiphoboserver
} // namespace fiphoboserver
#pragma once
#include <cstring>
#include <sstream>
#include <string>
#include <cstring>
namespace fiphoexceptions {
......
......@@ -2,20 +2,34 @@
#include <cstdio>
#include <fcntl.h>
#include <iostream>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <poll.h>
namespace fiphoboserver {
void FIFOGet::openFIFO()
{
#ifdef DEBUG
std::cout << "FIFOGet::openFIFO" << std::endl;
#endif
fifo_descriptor = open(fifoName.c_str(), O_RDONLY);
if (fifo_descriptor < 0) {
std::stringstream ss;
ss << "open with filename " << fifoName.c_str();
throw fiphoexceptions::FIFOException(ss.str(), errno);
return;
}
}
ssize_t FIFOGet::getDataFromFIFO(void* buf, size_t count)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::getDataFromFIFO" << std::endl;
std::cout << "FIFOGet::getDataFromFIFO" << std::endl;
#endif
ssize_t rc = read(fifo_descriptor, buf, count);
if (rc < 0) {
......@@ -24,10 +38,6 @@ ssize_t FIFOGet::getDataFromFIFO(void* buf, size_t count)
#endif
throw fiphoexceptions::FIFOException("read", rc);
}
#ifdef DEBUG
std::cout << "Count = " << rc << std::endl;
#endif
return rc;
}
......
#include "fifo_ops.h"
#include <sys/stat.h>
#include <fcntl.h>
#include <iostream>
#include <sys/stat.h>
namespace fiphoboserver {
......@@ -11,6 +11,9 @@ namespace fiphoboserver {
FIFO::FIFO()
{
#ifdef DEBUG
std::cout << "FIFO::FIFO" << std::endl;
#endif
startFIFO();
}
......@@ -33,9 +36,6 @@ void FIFO::startFIFO()
std::cout << "FIFO::startFIFO" << std::endl;
#endif
fifoName = std::tmpnam(nullptr);
#ifdef DEBUG
std::cout << "FIFO name: " << fifoName << std::endl;
#endif
int rc = mkfifo(fifoName.c_str(), 0777);
if (rc < 0) {
......@@ -44,15 +44,6 @@ void FIFO::startFIFO()
return;
}
}
fifo_descriptor = open(fifoName.c_str(), O_RDWR);
if (fifo_descriptor < 0) {
std::stringstream ss;
ss << "open with filename " << fifoName.c_str();
throw fiphoexceptions::FIFOException(ss.str(), errno);
return;
}
}
} // namespace fiphoboserver
} // namespace fiphoboserver
#pragma once
#include <future>
#include <folly/io/IOBuf.h>
#include "FiPhoExceptions.h"
#include <folly/io/IOBuf.h>
#include <future>
namespace fiphoboserver {
......@@ -10,7 +10,8 @@ class FIFO {
public:
FIFO();
void setBucketName(std::string bucketName);
int get_fifo_descriptor() { return fifo_descriptor; }
std::string getFifoName() { return fifoName; }
virtual void openFIFO() = 0;
~FIFO();
protected:
......@@ -22,6 +23,7 @@ class FIFO {
class FIFOPut : public FIFO {
public:
void setMetaData(std::vector<std::pair<std::string, std::string>> metaData);
void openFIFO();
void addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer);
void finishPut();
......@@ -30,13 +32,13 @@ class FIFOPut : public FIFO {
bool putRunning = false;
};
class FIFOGet: public FIFO {
class FIFOGet : public FIFO {
public:
void openFIFO();
ssize_t getDataFromFIFO(void* buf, size_t count);
/*
* finishGet();
*/
};
} // namespace fiphoboserver
} // namespace fiphoboserver
......@@ -2,15 +2,29 @@
#include <cstdio>
#include <fcntl.h>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
namespace fiphoboserver {
void FIFOPut::openFIFO()
{
#ifdef DEBUG
std::cout << "FIFOPut::openFIFO" << std::endl;
#endif
fifo_descriptor = open(fifoName.c_str(), O_WRONLY);
if (fifo_descriptor < 0) {
std::stringstream ss;
ss << "open with filename " << fifoName.c_str();
throw fiphoexceptions::FIFOException(ss.str(), errno);
return;
}
}
void FIFOPut::addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer)
{
#ifdef DEBUG
......
/*
* Copyright (c) Facebook, Inc. and its affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* * This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
......@@ -25,9 +24,6 @@ namespace fiphoboserver {
void GetRequestHandler::onRequest(
std::unique_ptr<proxygen::HTTPMessage> headers) noexcept
{
#ifdef DEBUG
std::cout << "Entered GetRequestHandler::onRequest" << std::endl;
#endif
if (headers->getMethod() != proxygen::HTTPMethod::GET) {
proxygen::ResponseBuilder(downstream_)
.status(400, "Bad method")
......@@ -49,7 +45,6 @@ void GetRequestHandler::onRequest(
s3_header.setHeaders(std::move(headers));
try {
#ifdef DEBUG
......@@ -60,9 +55,15 @@ void GetRequestHandler::onRequest(
backend.setBucketName(s3_header.getBucket());
#ifdef DEBUG
std::cout << "Send get to phobos layer" << std::endl;
std::cout << "Creating backend GET thread" << std::endl;
#endif
backend.db_get(s3_header.getKey(), fifo.get_fifo_descriptor());
db_result = std::async(
[this, key = s3_header.getKey(), fifoName = fifo.getFifoName()]() {
return backend.db_get(key, fifoName);
});
fifo.openFIFO();
file_closed_ = false;
}
catch (const std::system_error& ex) {
proxygen::ResponseBuilder(downstream_)
......@@ -91,11 +92,17 @@ void GetRequestHandler::onRequest(
void GetRequestHandler::readFile(folly::EventBase* evb)
{
#ifdef DEBUG
std::cout << "GetRequestHandler::readFile" << std::endl;
#endif
folly::IOBufQueue buf;
while (!paused_) {
while (!file_closed_ && !paused_) {
// read 4k-ish chunks and foward each one to the client
auto data = buf.preallocate(4000, 4000);
auto rc = fifo.getDataFromFIFO(data.first, data.second);
#ifdef DEBUG
std::cout << "getDataFromFIFO: rc = " << rc << std::endl;
#endif
if (rc < 0) {
// error
VLOG(4) << "Read error=" << rc;
......@@ -107,6 +114,7 @@ void GetRequestHandler::readFile(folly::EventBase* evb)
}
else if (rc == 0) {
// done
file_closed_ = true;
VLOG(4) << "Read EOF";
evb->runInEventBaseThread([this] {
proxygen::ResponseBuilder(downstream_).sendWithEOM();
......@@ -160,7 +168,10 @@ void GetRequestHandler::onBody(std::unique_ptr<folly::IOBuf> /*body*/) noexcept
// ignore, only support GET
}
void GetRequestHandler::onEOM() noexcept {}
void GetRequestHandler::onEOM() noexcept
{
db_result.wait();
}
void GetRequestHandler::onUpgrade(
proxygen::UpgradeProtocol /*protocol*/) noexcept
......
......@@ -51,6 +51,8 @@ class GetRequestHandler : public proxygen::RequestHandler {
bool readFileScheduled_{false};
std::atomic<bool> paused_{false};
bool finished_{false};
bool file_closed_{false};
std::future<int> db_result;
};
} // namespace fiphoboserver
......@@ -38,7 +38,7 @@ void PutRequestHandler::onRequest(
#endif
#ifdef DEBUG
std::cout << "Starting Create Bucket queries..." << std::endl;
std::cout << "Creating Bucket queries..." << std::endl;
#endif
s3_header.setHeaders(std::move(headers));
if (s3_header.isCreateBucketRequest()) {
......@@ -51,7 +51,6 @@ void PutRequestHandler::onRequest(
try {
#ifdef DEBUG
std::cout << "Bucket: " << s3_header.getBucket() << '\n';
std::cout << "Key: " << s3_header.getKey() << '\n';
......@@ -68,14 +67,14 @@ void PutRequestHandler::onRequest(
backend.setMetaData(s3_header.getMetaData());
#ifdef DEBUG
std::cout << "Send put to phobos layer asynchronously" << std::endl;
#endif
backend.db_put(
s3_header.getBodyLength(), s3_header.getKey(), fifo.get_fifo_descriptor());
#ifdef DEBUG
std::cout << "Past asynchronous put call" << std::endl;
std::cout << "Creating backend PUT thread" << std::endl;
#endif
db_result = std::async([this]() {
return backend.db_put(
s3_header.getBodyLength(), s3_header.getKey(),
fifo.getFifoName());
});
fifo.openFIFO();
}
catch (const std::system_error& ex) {
proxygen::ResponseBuilder(downstream_)
......@@ -102,7 +101,7 @@ void PutRequestHandler::onBody(std::unique_ptr<folly::IOBuf> body) noexcept
if (body) {
body->coalesce();
size_t totalBodyLength = body->computeChainDataLength();
std::cout << "Body Data Length " << totalBodyLength << '\n';
std::cout << "Body Data Length = " << totalBodyLength << '\n';
}
#endif
try {
......@@ -125,7 +124,7 @@ void PutRequestHandler::onEOM() noexcept
#ifdef DEBUG
std::cout << "Finish IO..." << std::endl;
#endif
backend.get_db_result().wait();
db_result.wait();
fifo.finishPut();
}
catch (fiphoexceptions::FIFOException& fifoExcp) {
......
......@@ -40,6 +40,7 @@ class PutRequestHandler : public proxygen::RequestHandler {
S3_header s3_header;
PhobosDb backend;
FIFOPut fifo;
std::future<int> db_result;
};
} // namespace fiphoboserver
......@@ -7,7 +7,7 @@
*/
#include <folly/Memory.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/GlobalExecutor.h>
#include <folly/init/Init.h>
#include <folly/io/async/EventBaseManager.h>
......@@ -51,13 +51,13 @@ class HandlerFactory : public proxygen::RequestHandlerFactory {
#endif
if (headers->getMethod() == proxygen::HTTPMethod::GET) {
#ifdef DEBUG
std::cout << "Chose GET" << std::endl;
std::cout << "GET" << std::endl;
#endif
return new fiphoboserver::GetRequestHandler;
}
else if (headers->getMethod() == proxygen::HTTPMethod::PUT) {
#ifdef DEBUG
std::cout << "Chose PUT" << std::endl;
std::cout << "PUT" << std::endl;
#endif
return new fiphoboserver::PutRequestHandler;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment