fiphobo.cc 5.79 KB
Newer Older
1
2
#include "fiphobo.h"

Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
3
4
#include <cstdio>
#include <fcntl.h>
5
#include <iostream>
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
6
7
#include <stdio.h>
#include <stdlib.h>
8
9
#include <sys/stat.h>
#include <sys/types.h>
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
10
#include <unistd.h>
11

12
13
#include "FiPhoExceptions.h"

Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
14
namespace fiphoboserver {
15
16
17
18
19
20
21
22
23
24
25
26
27

FiPhobo::FiPhobo()
{
    memset(&descriptor, 0, sizeof(descriptor));
    descriptor.xd_attrs = {0};
}

void FiPhobo::setBucketName(std::string bucketName)
{
#ifdef DEBUG
    std::cout << "Phobos_Layer::setBucketName" << std::endl;
#endif
    int rc = pho_attr_set(&descriptor.xd_attrs, "bucket", bucketName.c_str());
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
28
    if (rc) {
29
        throw fiphoexceptions::PhobosException("pho_attr_set", rc);
30
31
32
    }
}

Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
33
34
void FiPhobo::setMetaData(
    std::vector<std::pair<std::string, std::string>> metaData)
35
36
37
38
{
#ifdef DEBUG
    std::cout << "Phobos_Layer::setMetaData" << std::endl;
#endif
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
39
40
41
42
43
44
    std::for_each(
        metaData.begin(), metaData.end(),
        [&](std::pair<std::string, std::string> pair) {
            int rc = pho_attr_set(
                &descriptor.xd_attrs, pair.first.c_str(), pair.second.c_str());
            if (rc) {
45
                throw fiphoexceptions::PhobosException("pho_attr_set", rc);
46
47
48
49
50
51
52
53
54
55
56
            }
        });
}

void FiPhobo::startPutOperation(unsigned int size, std::string objectID)
{
#ifdef DEBUG
    std::cout << "Phobos_Layer::startPutOperation" << std::endl;
#endif
    startFIFO();

Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
57
    char* unconstedObjectID = new char[objectID.length() + 1];
58
    strcpy(unconstedObjectID, objectID.c_str());
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
59
60

    descriptor.xd_op    = PHO_XFER_OP_PUT;
61
    descriptor.xd_objid = unconstedObjectID;
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
62
63
    descriptor.xd_fd    = fifo_descriptor;
    descriptor.xd_size  = size;
64
65
66
67

#ifdef DEBUG
    std::cout << "Starting async phobos_put_cpp" << std::endl;
#endif
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
68
69
70
    phobos_result = std::async([&]() {
        std::cout << "Phobos Thread id " << std::this_thread::get_id()
                  << std::endl;
71
72
        return phobos_put_cpp(&descriptor, 1, NULL, NULL);
    });
73
    std::cout << "Main Thread id " << std::this_thread::get_id() << std::endl;
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
74
75

    // TODOOOO
76
77
78
    // delete [] unconstedObjectID;
}

79

Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
80
81
82
static long int fifoWriteTemp(
    std::unique_ptr<folly::IOBuf> buffer, int fifo_descriptor)
{
83
84
85
86
    std::cout << "IO Thread id " << std::this_thread::get_id() << std::endl;
    return write(fifo_descriptor, buffer->data(), buffer->length());
}

87
88
89
90
91
void FiPhobo::addDataToFIFO(std::unique_ptr<folly::IOBuf> buffer)
{
#ifdef DEBUG
    std::cout << "Phobos_Layer::addDataToFIFO" << std::endl;
#endif
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
92
    if (putRunning) {
93
        int ioRC = put_result.get();
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
94
        if (ioRC < 0) {
95
            throw fiphoexceptions::FIFOException("write", ioRC);
96
97
        }
    }
98
    putRunning = true;
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
99
100
101
102
103
104

    if (buffer) {
        put_result =
            std::async([fd = fifo_descriptor, buffer = std::move(buffer)] {
                return write(fd, buffer->data(), buffer->length());
            });
105
106
        // io_result.wait();
    }
107
108
}

Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
109

110
111
112
113
114
115
void FiPhobo::finishPUT()
{
#ifdef DEBUG
    std::cout << "Phobos_Layer::finishPUT" << std::endl;
#endif
    int ioRC = put_result.get();
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
116
    if (ioRC < 0) {
117
118
119
120
        throw fiphoexceptions::FIFOException("write", ioRC);
    }

    int phobosRC = phobos_result.get();
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
121
    if (phobosRC) {
122
123
        throw fiphoexceptions::PhobosException("pho_put", phobosRC);
    }
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
124

125
126
127
    putRunning = false;
}

128
void FiPhobo::startGetOperation(std::string objectID)
129
130
131
132
133
134
{
#ifdef DEBUG
    std::cout << "Phobos_Layer::startGetOperation" << std::endl;
#endif
    startFIFO();

Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
135
    char* unconstedObjectID = new char[objectID.length() + 1];
136
    strcpy(unconstedObjectID, objectID.c_str());
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
137
138

    descriptor.xd_op    = PHO_XFER_OP_GET;
139
    descriptor.xd_objid = unconstedObjectID;
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
140
    descriptor.xd_fd    = fifo_descriptor;
141

Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
142
143
144
#ifdef DEBUG
    std::cout << "Starting async phobos_get_cpp" << std::endl;
#endif
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
145
    phobos_result = std::async(
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
146
        [&]() { return phobos_get_cpp(&descriptor, 1, NULL, NULL); });
147
148
149
150
151
152
153
}

std::unique_ptr<folly::IOBuf> FiPhobo::getDataFromFIFO(size_t size)
{
#ifdef DEBUG
    std::cout << "Phobos_Layer::getDataFromFIFO" << std::endl;
#endif
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
154
155

    get_result = std::async([&] {
156
157
        std::unique_ptr<folly::IOBuf> buffer = folly::IOBuf::create(size);
        int readRC = read(fifo_descriptor, buffer->writableData(), size);
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
158
        if (readRC < 0) {
159
            throw fiphoexceptions::FIFOException("read", readRC);
160
        }
161
        return std::move(buffer);
162
    });
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
163

164
    int phobosRC = phobos_result.get();
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
165
    if (phobosRC) {
166
167
        throw fiphoexceptions::PhobosException("pho_get", phobosRC);
    }
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
168

169
    std::unique_ptr<folly::IOBuf> buffer;
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
170
    try {
171
172
        buffer = std::move(get_result.get());
    }
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
173
    // TODO: Does this make any sense or is it ok if I don't catch this here but
174
    // let the caller catch it - what he has to do anyway?
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
175
    catch (fiphoexceptions::FIFOException& fifoException) {
176
        throw fifoException;
177
    }
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
178

179
    return std::move(buffer);
180
181
182
183
184
185
186
}

FiPhobo::~FiPhobo()
{
#ifdef DEBUG
    std::cout << "Phobos_Layer::~Phobos_Layer" << std::endl;
#endif
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
187
188
189
190
191
    // pho_attrs_free(&descriptor.xd_attrs); // TODO: Is that done in
    // desc_destroy??
    pho_xfer_desc_destroy_cpp(&descriptor);
    if (fifo_descriptor > 0) {
        close(fifo_descriptor);
192
    }
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
193
    if (access(fifoName.c_str(), F_OK) != -1) {
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
194
        remove(fifoName.c_str());
195
196
    }
}
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
197

198
199
200
201
202
203
void FiPhobo::startFIFO()
{
#ifdef DEBUG
    std::cout << "Phobos_Layer::startFIFO" << std::endl;
#endif
    int rc = mkfifo(fifoName.c_str(), 0777);
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
204
205
    if (rc < 0) {
        if (errno != EEXIST) {
206
207
208
            throw fiphoexceptions::FIFOException("mkfifo", errno);
            return;
        }
209
    }
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
210

Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
211
    fifo_descriptor = open(fifoName.c_str(), O_RDWR);
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
212
    if (fifo_descriptor < 0) {
213
        std::stringstream ss;
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
214
        ss << "open with filename " << fifoName.c_str();
215
        throw fiphoexceptions::FIFOException(ss.str(), errno);
216
217
        return;
    }
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
218

219
220
221
222
#ifdef DEBUG
    std::cout << "End of Phobos_Layer::startFIFO" << std::endl;
#endif
}
Ciarán Ó Rourke's avatar
Ciarán Ó Rourke committed
223
}  // namespace fiphoboserver