Commit 9ff8651d authored by Ciarán Ó Rourke's avatar Ciarán Ó Rourke
Browse files

GET works

parent 899241f9
......@@ -140,53 +140,43 @@ void FiPhobo::startGetOperation(std::string objectID)
descriptor.xd_op = PHO_XFER_OP_GET;
// descriptor.xd_objid = unconstedObjectID;
descriptor.xd_fd = fifo_descriptor;
int rc = phobos_getmd_cpp(&descriptor, 1, NULL, NULL);
descriptor.xd_size = 20662;
#ifdef DEBUG
std::cout << "Phobos object size = " << descriptor.xd_size << std::endl;
std::cout << "Starting async phobos_get_cpp" << std::endl;
std::cout << "Starting async phobos_get_cpp" << std::endl;
#endif
phobos_result = std::async(
[&]() {
std::cout << "Phobos Thread id " << std::this_thread::get_id()
<< std::endl;
return phobos_get_cpp(&descriptor, 1, NULL, NULL); });
#ifdef DEBUG
std::cout << "Phobos object size = " << descriptor.xd_size << std::endl;
#endif
}
std::unique_ptr<folly::IOBuf> FiPhobo::getDataFromFIFO()
ssize_t FiPhobo::getDataFromFIFO(void *buf, size_t count)
{
#ifdef DEBUG
std::cout << "Phobos_Layer::getDataFromFIFO" << std::endl;
#endif
ssize_t size = descriptor.xd_size;
get_result = std::async([&] {
std::unique_ptr<folly::IOBuf> buffer = folly::IOBuf::create(size);
int readRC = read(fifo_descriptor, buffer->writableData(), size);
if (readRC < 0) {
throw fiphoexceptions::FIFOException("read", readRC);
auto readRC = std::async([fd = fifo_descriptor, buffer = buf, size = count] () {
ssize_t rc = read(fd, buffer, size);
if (rc < 0) {
#ifdef DEBUG
std::cout << "getDataFromFIFO read failed" << std::endl;
#endif
throw fiphoexceptions::FIFOException("read", rc);
}
return std::move(buffer);
#ifdef DEBUG
std::cout << "Count = " << rc << std::endl;
#endif
return rc;
});
int phobosRC = phobos_result.get();
if (phobosRC) {
throw fiphoexceptions::PhobosException("pho_get", phobosRC);
}
std::unique_ptr<folly::IOBuf> buffer;
try {
buffer = std::move(get_result.get());
}
// TODO: Does this make any sense or is it ok if I don't catch this here but
// let the caller catch it - what he has to do anyway?
catch (fiphoexceptions::FIFOException& fifoException) {
throw fifoException;
}
return std::move(buffer);
readRC.wait();
return readRC.get();
}
FiPhobo::~FiPhobo()
......
......@@ -29,7 +29,7 @@ class FiPhobo {
void startGetOperation(std::string objectID);
std::unique_ptr<folly::IOBuf> getDataFromFIFO();
ssize_t getDataFromFIFO(void *buf, size_t count);
~FiPhobo();
......
......@@ -89,33 +89,54 @@ void GetRequestHandler::onRequest(
folly::EventBaseManager::get()->getEventBase()));
}
void GetRequestHandler::readFile(folly::EventBase* evb)
{
#ifdef DEBUG
std::cout << "Getting data from FIFO... " << std::endl;
#endif
std::unique_ptr<folly::IOBuf> buf;
try
{
buf = phobos.getDataFromFIFO();
}
catch(fiphoexceptions::FIFOException& fifoExcp)
{
proxygen::ResponseBuilder(downstream_)
.status(409, "Conflict")
.body(fifoExcp.what())
.sendWithEOM();
return;
void GetRequestHandler::readFile(folly::EventBase* evb) {
folly::IOBufQueue buf;
while (!paused_) {
// read 4k-ish chunks and foward each one to the client
auto data = buf.preallocate(4000, 4000);
auto rc = phobos.getDataFromFIFO(data.first, data.second);
if (rc < 0) {
// error
VLOG(4) << "Read error=" << rc;
evb->runInEventBaseThread([this] {
LOG(ERROR) << "Error reading file";
downstream_->sendAbort();
});
break;
} else if (rc == 0) {
// done
VLOG(4) << "Read EOF";
evb->runInEventBaseThread([this] {
proxygen::ResponseBuilder(downstream_)
.sendWithEOM();
});
break;
} else if (rc < 4000) {
// done
VLOG(4) << "Read EOF";
buf.postallocate(rc);
evb->runInEventBaseThread([this, body=buf.move()] () mutable {
proxygen::ResponseBuilder(downstream_)
.body(std::move(body))
.sendWithEOM();
});
break;
} else {
buf.postallocate(rc);
evb->runInEventBaseThread([this, body=buf.move()] () mutable {
proxygen::ResponseBuilder(downstream_)
.body(std::move(body))
.send();
});
}
}
#ifdef DEBUG
std::cout << "Sending data" << std::endl;
#endif
evb->runInEventBaseThread([this, body=std::move(buf)] () mutable {
proxygen::ResponseBuilder(downstream_)
.body(std::move(body))
.sendWithEOM();
// Notify the request thread that we terminated the readFile loop
evb->runInEventBaseThread([this] {
readFileScheduled_ = false;
if (!checkForCompletion() && !paused_) {
VLOG(4) << "Resuming deferred readFile";
onEgressResumed();
}
});
}
......
......@@ -62,6 +62,9 @@ class HandlerFactory : public proxygen::RequestHandlerFactory {
return new fiphoboserver::PushRequestHandler;
}
else {
#ifdef DEBUG
std::cout << "Unsupported Request" << std::endl;
#endif
return new fiphoboserver::UnsupportedRequestHandler;
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment