Commit 2a3cb78f authored by Ciarán Ó Rourke's avatar Ciarán Ó Rourke
Browse files

Blocking files obsolete now

parent af247189
#include "fiphobo_blocking.h"
#include <cstdio>
#include <fcntl.h>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
namespace fiphoboserver {
FiPhobo::FiPhobo()
{
memset(&descriptor, 0, sizeof(descriptor));
descriptor.xd_attrs = {0};
}
void FiPhobo::setBucketName(std::string bucketName)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::setBucketName" << std::endl;
#endif
int rc = pho_attr_set(&descriptor.xd_attrs, "bucket", bucketName.c_str());
if (rc) {
printf("pho_attr_set returned an error: %d\n", rc);
return;
}
}
void FiPhobo::setMetaData(
std::vector<std::pair<std::string, std::string>> metaData)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::setMetaData" << std::endl;
#endif
std::for_each(
metaData.begin(), metaData.end(),
[&](std::pair<std::string, std::string> pair) {
int rc = pho_attr_set(
&descriptor.xd_attrs, pair.first.c_str(), pair.second.c_str());
if (rc) {
printf("pho_attr_set returned an error: %d\n", rc);
return;
}
});
}
void FiPhobo::startPutOperation(int size, std::string objectID)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startPutOperation" << std::endl;
#endif
startFIFO();
char* unconstedObjectID = new char[objectID.length() + 1];
strcpy(unconstedObjectID, objectID.c_str());
descriptor.xd_op = PHO_XFER_OP_PUT;
descriptor.xd_objid = unconstedObjectID;
descriptor.xd_fd = fifo_descriptor;
descriptor.xd_size = size;
// #ifdef DEBUG
// std::cout << "Starting async phobos_put_cpp" << std::endl;
// #endif
// phobos_result = std::async([&](){
// return phobos_put_cpp(&descriptor, 1, NULL, NULL);
// });
//
// TODOOOO
// delete [] unconstedObjectID;
}
void FiPhobo::addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::addDataToFIFO" << std::endl;
#endif
// if(ioRunning)
// {
// int ioRC = io_result.get();
// if(ioRC < 0)
// {
// //TODO error handling
// }
// }
// ioRunning = true;
//
// buffer->coalesce();
// io_result = std::async([&](){
// return write(fifo_descriptor, buffer->data(), buffer->length());
// });
size_t written = write(fifo_descriptor, buffer->data(), buffer->length());
}
void FiPhobo::startGetOperation(std::string& objectID)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startGetOperation" << std::endl;
#endif
startFIFO();
char* unconstedObjectID = new char[objectID.length() + 1];
strcpy(unconstedObjectID, objectID.c_str());
descriptor.xd_op = PHO_XFER_OP_GET;
descriptor.xd_objid = unconstedObjectID;
descriptor.xd_fd = fifo_descriptor;
// phobos_result = std::async([&](){
// return phobos_get_cpp(&descriptor, 1, NULL, NULL);
// });
}
std::unique_ptr<folly::IOBuf> FiPhobo::getDataFromFIFO(size_t size)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::getDataFromFIFO" << std::endl;
#endif
std::unique_ptr<folly::IOBuf> buffer = folly::IOBuf::create(size);
// if(ioRunning)
// {
// int ioRC = io_result.get();
// if(ioRC < 0)
// {
// //TODO error handling
// }
// }
// ioRunning = true;
//
// io_result = std::async([&](){
// buffer->unshare();
// return read(fifo_descriptor, buffer->writableData(), size);
// });
int phobos_return = phobos_get_cpp(&descriptor, 1, NULL, NULL);
if (phobos_return < 0) {
std::cout << "GET: schade schade schade " << std::endl;
// error handling
}
lseek(fifo_descriptor, 0, SEEK_SET);
buffer->unshare();
read(fifo_descriptor, buffer->writableData(), size);
return std::move(buffer);
}
void FiPhobo::finishIO()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::finishIO" << std::endl;
#endif
// int ioRC = io_result.get();
// if(ioRC < 0)
// {
// //TODO error handling
// }
//
// int phobosRC = phobos_result.get();
// if(phobosRC)
// {
// //TODO error handling!
// }
// ioRunning = false;
if (descriptor.xd_op == PHO_XFER_OP_PUT) {
lseek(fifo_descriptor, 0, SEEK_SET);
int phobos_return = phobos_put_cpp(&descriptor, 1, NULL, NULL);
if (phobos_return < 0) {
std::cout << "PUT: schade schade schade " << std::endl;
// error handling
}
}
}
FiPhobo::~FiPhobo()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::~Phobos_Layer" << std::endl;
#endif
// pho_attrs_free(&descriptor.xd_attrs); // TODO: Is that done in
// desc_destroy??
pho_xfer_desc_destroy_cpp(&descriptor);
if (fifo_descriptor > 0) {
close(fifo_descriptor);
}
// if( access( fifoName.c_str(), F_OK ) != -1 )
// {
// remove(fifoName.c_str());
// }
}
void FiPhobo::startFIFO()
{
#ifdef DEBUG
std::cout << "Phobos_Layer::startFIFO" << std::endl;
#endif
// int rc = mkfifo(fifoName.c_str(), 0777);
// if(rc)
// {
// printf("mkfifo returned an error: %s\n", std::strerror(errno));
// return;
// }
fifo_descriptor = open(fifoName.c_str(), O_RDWR);
if (fifo_descriptor < 0) {
printf("Could not open file: %s\n", fifoName);
return;
}
}
} // namespace fiphoboserver
#pragma once
#include <future>
#include <vector>
#include <folly/io/IOBuf.h>
extern "C" {
#include "phobos_cpp_wrapper/phobos_cpp_wrapper.h"
}
namespace fiphoboserver {
// TODO: Remember the m-versions: PUT and GET functions can work on more than
// one file. Then this structure is stupid. Maybe with a vector of descriptors?
class FiPhobo {
public:
FiPhobo();
void setBucketName(std::string bucketName);
void setMetaData(std::vector<std::pair<std::string, std::string>> metaData);
void startPutOperation(int size, std::string objectID);
void addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer);
void startGetOperation(std::string& objectID);
std::unique_ptr<folly::IOBuf> getDataFromFIFO(size_t size);
void finishIO();
~FiPhobo();
private:
void startFIFO();
struct pho_xfer_desc descriptor = {0};
std::string fifoName = "/tmp/fiphoboserver_fifotemp";
int fifo_descriptor = -1;
std::future<int> phobos_result;
std::future<long int> io_result;
bool ioRunning = false;
};
} // namespace fiphoboserver
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment