Commit dcb20739 authored by Ciarán Ó Rourke's avatar Ciarán Ó Rourke
Browse files

Split put and get backend functions into separate files

parent 741edde0
#pragma once
#include <future>
#include <folly/io/IOBuf.h>
extern "C" {
#include "phobos_cpp_wrapper/phobos_cpp_wrapper.h"
}
namespace fiphoboserver {
// TODO: Remember the m-versions: PUT and GET functions can work on more than
// one file. Then this structure is stupid. Maybe with a vector of descriptors?
class Backend {
public:
Backend();
void setBucketName(std::string bucketName);
~Backend();
protected:
void startFIFO();
struct pho_xfer_desc descriptor = {0};
std::string fifoName;
int fifo_descriptor = -1;
std::future<int> phobos_result;
};
Backend::Backend()
{
memset(&descriptor, 0, sizeof(descriptor));
descriptor.xd_attrs = {0};
}
Backend::~Backend()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::~Phobos_Layer" << std::endl;
#endif
// pho_attrs_free(&descriptor.xd_attrs); // TODO: Is that done in
// desc_destroy??
pho_xfer_desc_destroy_cpp(&descriptor);
if (fifo_descriptor > 0) {
close(fifo_descriptor);
}
if (access(fifoName.c_str(), F_OK) != -1) {
remove(fifoName.c_str());
}
}
void Backend::setBucketName(std::string bucketName)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::setBucketName" << std::endl;
#endif
int rc = pho_attr_set(&descriptor.xd_attrs, "bucket", bucketName.c_str());
if (rc) {
throw fiphoexceptions::PhobosException("pho_attr_set", rc);
}
}
void Backend::startFIFO()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startFIFO" << std::endl;
#endif
fifoName = std::tmpnam(nullptr);
#ifdef DEBUG
std::cout << "FIFO name: " << fifoName << std::endl;
#endif
int rc = mkfifo(fifoName.c_str(), 0777);
if (rc < 0) {
if (errno != EEXIST) {
throw fiphoexceptions::FIFOException("mkfifo", errno);
return;
}
}
}
class PutBackend : public Backend {
public:
void setMetaData(std::vector<std::pair<std::string, std::string>> metaData);
void startPutOperation(ssize_t size, std::string objectID);
void addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer);
void finishPut();
private:
std::future<long int> put_result;
bool putRunning = false;
};
class GetBackend : public Backend {
public:
void startGetOperation(std::string objectID);
ssize_t getDataFromFIFO(void* buf, size_t count);
};
} // namespace fiphoboserver
#include "fiphobo.h"
#include <cstdio>
#include <fcntl.h>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "FiPhoExceptions.h"
namespace fiphoboserver {
void GetBackend::startGetOperation(std::string objectID)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startGetOperation" << std::endl;
#endif
startFIFO();
descriptor.xd_objid = new char[objectID.length() + 1];
strcpy(descriptor.xd_objid, objectID.c_str());
descriptor.xd_op = PHO_XFER_OP_GET;
descriptor.xd_fd = fifo_descriptor;
descriptor.xd_size = 20662;
#ifdef DEBUG
std::cout << "Starting async phobos_get_cpp" << std::endl;
#endif
phobos_result = std::async([&]() {
std::cout << "Phobos Thread id " << std::this_thread::get_id()
<< std::endl;
return phobos_get_cpp(&descriptor, 1, NULL, NULL);
});
#ifdef DEBUG
std::cout << "Phobos object size = " << descriptor.xd_size << std::endl;
#endif
}
ssize_t GetBackend::getDataFromFIFO(void* buf, size_t count)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::getDataFromFIFO" << std::endl;
#endif
ssize_t rc = read(fifo_descriptor, buf, count);
if (rc < 0) {
#ifdef DEBUG
std::cout << "getDataFromFIFO read failed" << std::endl;
#endif
throw fiphoexceptions::FIFOException("read", rc);
}
#ifdef DEBUG
std::cout << "Count = " << rc << std::endl;
#endif
return rc;
}
} // namespace fiphoboserver
#include "fiphobo.h"
#include <cstdio>
#include <fcntl.h>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "FiPhoExceptions.h"
namespace fiphoboserver {
void PutBackend::setMetaData(
std::vector<std::pair<std::string, std::string>> metaData)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::setMetaData" << std::endl;
#endif
std::for_each(
metaData.begin(), metaData.end(),
[&](std::pair<std::string, std::string> pair) {
int rc = pho_attr_set(
&descriptor.xd_attrs, pair.first.c_str(), pair.second.c_str());
if (rc) {
throw fiphoexceptions::PhobosException("pho_attr_set", rc);
}
});
}
void PutBackend::startPutOperation(ssize_t size, std::string objectID)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startPutOperation" << std::endl;
#endif
startFIFO();
char* unconstedObjectID = new char[objectID.length() + 1];
strcpy(unconstedObjectID, objectID.c_str());
descriptor.xd_op = PHO_XFER_OP_PUT;
descriptor.xd_objid = unconstedObjectID;
descriptor.xd_fd = fifo_descriptor;
descriptor.xd_size = size;
#ifdef DEBUG
std::cout << "Starting async phobos_put_cpp" << std::endl;
#endif
phobos_result = std::async([&]() {
std::cout << "Phobos Thread id " << std::this_thread::get_id()
<< std::endl;
return phobos_put_cpp(&descriptor, 1, NULL, NULL);
});
std::cout << "Main Thread id " << std::this_thread::get_id() << std::endl;
// TODOOOO
// delete [] unconstedObjectID;
}
void PutBackend::addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::addDataToFIFO" << std::endl;
#endif
if (putRunning) {
int ioRC = put_result.get();
if (ioRC < 0) {
throw fiphoexceptions::FIFOException("write", ioRC);
}
}
putRunning = true;
if (buffer) {
put_result =
std::async([fd = fifo_descriptor, buffer = std::move(buffer)] {
return write(fd, buffer->data(), buffer->length());
});
// io_result.wait();
}
}
void PutBackend::finishPut()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::finishPUT" << std::endl;
#endif
int ioRC = put_result.get();
if (ioRC < 0) {
throw fiphoexceptions::FIFOException("write", ioRC);
}
int phobosRC = phobos_result.get();
if (phobosRC) {
throw fiphoexceptions::PhobosException("pho_put", phobosRC);
}
putRunning = false;
}
} // namespace fiphoboserver
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment