Commit 9a5ea483 authored by Sebastien Gougeaud's avatar Sebastien Gougeaud Committed by Thomas Leibovici
Browse files

refactor: aggregate XFer PUT parameters in a new structure



To avoid the xfer_desc structure pollution with an increasing set
of parameters (with the incoming addition of family and layout options),
the PUT-only parameters are now aggregated in a pho_xfer_put_params
data structure.

This new data structure is part of a union in the xfer_desc structure to
allow the future utilization of GET/GETMD sets of parameters.

Change-Id: I0063611df3c87bf53c8c1d783741112a6a2faac0
Signed-off-by: default avatarSebastien Gougeaud <sebastien.gougeaud@cea.fr>
Reviewed-on: https://cws-fleury.labs.ocre.cea.fr/gerrit/6663


Reviewed-by: Linter
Reviewed-by: default avatarPatrice LUCAS <patrice.lucas@cea.fr>
Tested-by: default avatarJenkins s8open_nr <s8open_nr@ccc.ocre.cea.fr>
Reviewed-by: default avatarThomas Leibovici <thomas.leibovici@cea.fr>
parent c73783a7
......@@ -51,7 +51,7 @@ from phobos.core.dss import Client as DSSClient
from phobos.core.ffi import DevInfo, MediaInfo, ResourceFamily
from phobos.core.ldm import LibAdapter
from phobos.core.log import LogControl, DISABLED, WARNING, INFO, VERBOSE, DEBUG
from phobos.core.store import Client as XferClient, attrs_as_dict
from phobos.core.store import Client as XferClient, attrs_as_dict, PutParams
from phobos.output import dump_object_list
def phobos_log_handler(log_record):
......@@ -319,14 +319,13 @@ class StorePutHandler(StoreGenericPutHandler):
attrs = attr_convert(attrs)
self.logger.debug("Loaded attributes set %r", attrs)
tags = self.params.get('tags', [])
family = self.params.get('family')
layout = self.params.get('layout')
put_params = PutParams(self.params.get('family'),
self.params.get('layout'),
self.params.get('tags', []))
self.logger.debug("Inserting object '%s' to 'objid:%s'", src, oid)
self.client.put_register(oid, src, family=family, layout=layout,
attrs=attrs, tags=tags)
self.client.put_register(oid, src, attrs=attrs, put_params=put_params)
try:
self.client.run()
except IOError as err:
......@@ -356,9 +355,9 @@ class StoreMPutHandler(StoreGenericPutHandler):
else:
fin = open(path)
tags = self.params.get('tags', [])
family = self.params.get('family', [])
layout = self.params.get('layout', [])
put_params = PutParams(self.params.get('family'),
self.params.get('layout'),
self.params.get('tags', []))
for i, line in enumerate(fin):
# Skip empty lines and comments
......@@ -382,8 +381,8 @@ class StoreMPutHandler(StoreGenericPutHandler):
self.logger.debug("Loaded attributes set %r", attrs)
self.logger.debug("Inserting object '%s' to 'objid:%s'", src, oid)
self.client.put_register(oid, src, family=family, layout=layout,
attrs=attrs, tags=tags)
self.client.put_register(oid, src, attrs=attrs,
put_params=put_params)
if fin is not sys.stdin:
fin.close()
......
......@@ -27,6 +27,7 @@ import errno
import logging
import os
from collections import namedtuple
from ctypes import *
from phobos.core.ffi import LIBPHOBOS, Tags
......@@ -69,18 +70,48 @@ def attrs_as_dict(attrs):
LIBPHOBOS.pho_attrs_foreach(byref(attrs), cb, c_res)
return res
class XferPutParams(Structure):
"""Phobos PUT parameters of the XferDescriptor."""
_fields_ = [
("size", c_ssize_t),
("family", c_int),
("layout_name", c_char_p),
("tags", Tags),
]
def __init__(self, put_params):
self.size = -1
self.layout_name = put_params.layout
self.tags = Tags(put_params.tags)
if put_params.family is None:
self.family = str2rsc_family(cfg_get_val("store", "default_family"))
else:
self.family = str2rsc_family(put_params.family)
class PutParams(namedtuple('PutParams', 'family layout tags')):
"""
Transition data structure for put parameters between
the CLI and the XFer data structure.
"""
__slots__ = ()
PutParams.__new__.__defaults__ = (None,) * len(PutParams._fields)
class XferOpParams(Union):
"""Phobos operation parameters of the XferDescriptor."""
_fields_ = [
("put", XferPutParams),
]
class XferDescriptor(Structure):
"""phobos struct xfer_descriptor."""
_fields_ = [
("xd_objid", c_char_p),
("xd_op", c_int),
("xd_fd", c_int),
("xd_size", c_ssize_t),
("xd_layout_name", c_char_p),
("xd_family", c_int),
("xd_attrs", PhoAttrs),
("xd_params", XferOpParams),
("xd_flags", c_int),
("xd_tags", Tags),
("xd_rc", c_int),
]
......@@ -105,7 +136,6 @@ class XferDescriptor(Structure):
# in case of getmd, the file is not opened, return without exception
if self.xd_op == PHO_XFER_OP_GETMD:
self.xd_fd = -1
self.xd_size = -1
return
if not path:
......@@ -122,25 +152,26 @@ class XferDescriptor(Structure):
raise e
self.xd_fd = os.open(path, os.O_RDONLY)
self.xd_size = os.fstat(self.xd_fd).st_size
self.xd_params.put.size = os.fstat(self.xd_fd).st_size
def init_from_descriptor(self, desc):
"""
xfer_descriptor initialization by using python-list descriptor.
It opens the file descriptor of the given path. The python-list
contains the tuple (id, path, attrs, flags, tags, op) describing
the opened file.
contains the tuple (id, path, attrs, flags, op and put-only parameters)
describing the opened file.
"""
self.xd_op = desc[5]
if self.xd_op == PHO_XFER_OP_PUT:
self.xd_params.put = XferPutParams(desc[4]) if \
desc[4] is not None else \
XferPutParams(PutParams())
self.xd_objid = desc[0]
self.xd_op = desc[7]
self.xd_layout_name = desc[3]
self.xd_family = desc[2]
self.xd_flags = desc[5]
self.xd_tags = Tags(desc[6])
self.xd_flags = desc[3]
self.xd_rc = 0
if desc[4]:
for k, v in desc[4].iteritems():
if desc[2]:
for k, v in desc[2].iteritems():
rc = LIBPHOBOS.pho_attr_set(byref(self.xd_attrs),
str(k), str(v))
if rc:
......@@ -219,22 +250,19 @@ class Client(object):
def getmd_register(self, oid, data_path, attrs=None):
"""Enqueue a GETMD transfer."""
self.getmd_session.append((oid, data_path, -1, 0, attrs, 0, None,
PHO_XFER_OP_GETMD))
self.getmd_session.append((oid, data_path, attrs, 0, None,
PHO_XFER_OP_GETMD))
def get_register(self, oid, data_path, attrs=None):
"""Enqueue a GET transfer."""
self.get_session.append((oid, data_path, -1, 0, attrs, 0, None,
self.get_session.append((oid, data_path, attrs, 0, None,
PHO_XFER_OP_GET))
def put_register(self, oid, data_path, family=None, layout=None, attrs=None,
tags=None):
def put_register(self, oid, data_path, attrs=None,
put_params=PutParams()):
"""Enqueue a PUT transfert."""
if family is None:
family = cfg_get_val("store", "default_family")
self.put_session.append((oid, data_path, str2rsc_family(family), layout,
attrs, 0, tags, PHO_XFER_OP_PUT))
self.put_session.append((oid, data_path, attrs, 0, put_params,
PHO_XFER_OP_PUT))
def clear(self):
"""Release resources associated to the current queues."""
......
......@@ -27,7 +27,7 @@ import unittest
from contextlib import contextmanager
from tempfile import NamedTemporaryFile
from phobos.core.const import PHO_XFER_OP_GETMD
from phobos.core.const import PHO_XFER_OP_GETMD, PHO_XFER_OP_PUT
from phobos.core.store import XferDescriptor
class FileDescTest(unittest.TestCase):
......@@ -47,8 +47,9 @@ class FileDescTest(unittest.TestCase):
def _getsize_good(self, path, size):
xfr = XferDescriptor()
xfr.xd_op = PHO_XFER_OP_PUT
xfr.open_file(path)
self.assertEqual(xfr.xd_size, size)
self.assertEqual(xfr.xd_params.put.size, size)
os.close(xfr.xd_fd)
def test_open_good(self):
......
......@@ -50,41 +50,51 @@ enum pho_xfer_flags {
*/
typedef void (*pho_completion_cb_t)(void *u, const struct pho_xfer_desc *, int);
/**
* Phobos XFer operations.
*/
enum pho_xfer_op {
PHO_XFER_OP_PUT, /**< Put operation */
PHO_XFER_OP_GET, /**< Get operation */
PHO_XFER_OP_GETMD /**< Get metadata operation */
PHO_XFER_OP_PUT, /**< PUT operation. */
PHO_XFER_OP_GET, /**< GET operation. */
PHO_XFER_OP_GETMD /**< GET metadata operation. */
};
/**
* PUT parameters.
*/
struct pho_xfer_put_params {
ssize_t size; /**< Amount of data to write. */
enum rsc_family family; /**< Targeted resource family. */
const char *layout_name; /**< Name of the layout module to use. */
struct tags tags; /**< Tags to select a media to write. */
};
/**
* GET / PUT parameter.
* Operation parameters.
*/
union pho_xfer_params {
struct pho_xfer_put_params put; /**< PUT parameters. */
};
/**
* Xfer descriptor.
* The source/destination semantics of the fields vary
* depending on the nature of the operation.
* See below:
* - pĥobos_getmd()
* - phobos_get()
* - phobos_put()
*/
struct pho_xfer_desc {
char *xd_objid; /**< Object id to read or write */
enum pho_xfer_op xd_op; /**< Operation to perform
* (GET, GETMD or PUT)
*/
int xd_fd; /**< positive fd if xd_id_open */
ssize_t xd_size; /**< Amount of data to write (for the GET
* operation, the size read is equal to
* the size of the retrieved object)
*/
const char *xd_layout_name; /**< Name of the layout module to use
* (for put).
*/
enum rsc_family xd_family; /**< Targeted resource family (for PUT) */
struct pho_attrs xd_attrs; /**< User defined attribute to get / put */
enum pho_xfer_flags xd_flags; /**< See enum pho_xfer_flags doc */
struct tags xd_tags; /**< Tags to select a media to write */
int xd_rc; /**< Outcome of this xfer */
char *xd_objid; /**< Object ID to read or write. */
enum pho_xfer_op xd_op; /**< Operation to perform. */
int xd_fd; /**< FD of the source/destination. */
struct pho_attrs xd_attrs; /**< User defined attributes. */
union pho_xfer_params xd_params; /**< Operation parameters. */
enum pho_xfer_flags xd_flags; /**< See enum pho_xfer_flags doc. */
int xd_rc; /**< Outcome of this xfer. */
};
/**
* Put N files to the object store with minimal overhead.
* Each desc entry contains:
......
......@@ -795,7 +795,7 @@ static int raid1_enc_next_write_req(struct pho_encoder *enc, pho_req_t *req)
"write alloc");
for (i = 0; i < raid1->repl_count; ++i)
n_tags[i] = enc->xfer->xd_tags.n_tags;
n_tags[i] = enc->xfer->xd_params.put.tags.n_tags;
rc = pho_srl_request_write_alloc(req, raid1->repl_count, n_tags);
free(n_tags);
......@@ -805,8 +805,9 @@ static int raid1_enc_next_write_req(struct pho_encoder *enc, pho_req_t *req)
for (i = 0; i < raid1->repl_count; ++i) {
req->walloc->media[i]->size = raid1->to_write;
for (j = 0; j < enc->xfer->xd_tags.n_tags; ++j)
req->walloc->media[i]->tags[j] = strdup(enc->xfer->xd_tags.tags[j]);
for (j = 0; j < enc->xfer->xd_params.put.tags.n_tags; ++j)
req->walloc->media[i]->tags[j] =
strdup(enc->xfer->xd_params.put.tags.tags[j]);
}
return rc;
......@@ -1091,11 +1092,11 @@ static int layout_raid1_encode(struct pho_encoder *enc)
if (raid1->repl_count <= 0)
LOG_RETURN(-EINVAL, "Invalid # of replica (%d)", raid1->repl_count);
if (enc->xfer->xd_size < 0)
if (enc->xfer->xd_params.put.size < 0)
LOG_RETURN(-EINVAL, "bad input encoder size to write when building "
"raid1 encoder");
raid1->to_write = enc->xfer->xd_size;
raid1->to_write = enc->xfer->xd_params.put.size;
/* Allocate the extent array */
raid1->written_extents = g_array_new(FALSE, TRUE,
......
......@@ -367,14 +367,16 @@ static int simple_enc_next_write_req(struct pho_encoder *enc, pho_req_t *req)
int rc = 0, i;
/* Otherwise, generate the next request */
rc = pho_srl_request_write_alloc(req, 1, &enc->xfer->xd_tags.n_tags);
rc = pho_srl_request_write_alloc(req, 1,
&enc->xfer->xd_params.put.tags.n_tags);
if (rc)
return rc;
req->walloc->media[0]->size = simple->to_write;
for (i = 0; i < enc->xfer->xd_tags.n_tags; ++i)
req->walloc->media[0]->tags[i] = strdup(enc->xfer->xd_tags.tags[i]);
for (i = 0; i < enc->xfer->xd_params.put.tags.n_tags; ++i)
req->walloc->media[0]->tags[i] =
strdup(enc->xfer->xd_params.put.tags.tags[i]);
return rc;
}
......@@ -598,7 +600,7 @@ static int layout_simple_encode(struct pho_encoder *enc)
for (i = 0; i < enc->layout->ext_count; i++)
simple->to_write += enc->layout->extents[i].size;
} else {
ssize_t to_write = enc->xfer->xd_size;
ssize_t to_write = enc->xfer->xd_params.put.size;
if (to_write < 0)
return to_write;
......
......@@ -213,7 +213,7 @@ int layout_encode(struct pho_encoder *enc, struct pho_xfer_desc *xfer)
int rc;
/* Load new module if necessary */
rc = layout_module_lazy_load(xfer->xd_layout_name, &mod);
rc = layout_module_lazy_load(xfer->xd_params.put.layout_name, &mod);
if (rc)
return rc;
......@@ -233,7 +233,7 @@ int layout_encode(struct pho_encoder *enc, struct pho_xfer_desc *xfer)
if (enc->layout == NULL)
return -ENOMEM;
enc->layout->oid = xfer->xd_objid;
enc->layout->wr_size = xfer->xd_size;
enc->layout->wr_size = xfer->xd_params.put.size;
enc->layout->state = PHO_EXT_ST_PENDING;
rc = mod->ops->encode(enc);
......
......@@ -235,7 +235,7 @@ static int encoder_communicate(struct pho_encoder *enc,
pho_error(rc, "Error while communicating with encoder for %s",
enc->xfer->xd_objid);
family = enc->xfer->xd_family;
family = enc->xfer->xd_params.put.family;
/* Dispatch generated requests (even on error, if any) */
for (i = 0; i < n_reqs; i++) {
......@@ -793,11 +793,12 @@ int phobos_put(struct pho_xfer_desc *xfers, size_t n,
for (i = 0; i < n; i++) {
xfers[i].xd_op = PHO_XFER_OP_PUT;
if (xfers[i].xd_layout_name == NULL)
xfers[i].xd_layout_name = default_layout;
if (xfers[i].xd_family == PHO_RSC_INVAL)
xfers[i].xd_family = default_family_from_cfg();
if (xfers[i].xd_params.put.layout_name == NULL)
xfers[i].xd_params.put.layout_name = default_layout;
if (xfers[i].xd_params.put.family == PHO_RSC_INVAL)
xfers[i].xd_params.put.family = default_family_from_cfg();
}
return phobos_xfer(xfers, n, cb, udata);
......@@ -827,6 +828,7 @@ int phobos_getmd(struct pho_xfer_desc *xfers, size_t n,
void pho_xfer_desc_destroy(struct pho_xfer_desc *xfer)
{
tags_free(&xfer->xd_tags);
if (xfer->xd_op == PHO_XFER_OP_PUT)
tags_free(&xfer->xd_params.put.tags);
pho_attrs_free(&xfer->xd_attrs);
}
......@@ -504,7 +504,6 @@ function tape_drive_compat
rm /tmp/svc_lto5_from_lto5_drive
$phobos get svc_lto6 /tmp/svc_lto6_from_lto5_drive &&
error "getting data from lto6 tape with one lto5 drive must fail"
rm -f /tmp/svc_lto6_from_lto5_drive
#test with only one lto6 drive
lock_all_drives
......
......@@ -63,7 +63,6 @@ static int xfer_desc_open_path(struct pho_xfer_desc *xfer, const char *path,
if (path == NULL) {
xfer->xd_fd = -1;
xfer->xd_size = -1;
return -EINVAL;
}
......@@ -79,8 +78,10 @@ static int xfer_desc_open_path(struct pho_xfer_desc *xfer, const char *path,
if (xfer->xd_fd < 0)
LOG_RETURN(-errno, "open(%s) failed", path);
fstat(xfer->xd_fd, &st);
xfer->xd_size = st.st_size;
if (xfer->xd_op == PHO_XFER_OP_PUT) {
fstat(xfer->xd_fd, &st);
xfer->xd_params.put.size = st.st_size;
}
return xfer->xd_fd;
}
......
......@@ -74,8 +74,8 @@ int main(int argc, char **argv)
for (i = 2; i < argc; i++) {
xfer_desc_open_path(&xfer, argv[i], PHO_XFER_OP_PUT, 0);
xfer.xd_params.put.family = PHO_RSC_INVAL;
xfer.xd_objid = realpath(argv[i], NULL);
xfer.xd_family = PHO_RSC_INVAL;
xfer.xd_attrs = attrs;
rc = phobos_put(&xfer, 1, NULL, NULL);
......@@ -105,8 +105,8 @@ int main(int argc, char **argv)
for (i = 0; i < argc; i++) {
xfer_desc_open_path(xfer + i, argv[i], PHO_XFER_OP_PUT, 0);
xfer[i].xd_params.put.family = PHO_RSC_INVAL;
xfer[i].xd_objid = realpath(argv[i], NULL);
xfer[i].xd_family = PHO_RSC_INVAL;
xfer[i].xd_attrs = attrs;
}
......@@ -131,11 +131,11 @@ int main(int argc, char **argv)
exit(EXIT_FAILURE);
xfer_desc_open_path(&xfer, argv[2], PHO_XFER_OP_PUT, 0);
xfer.xd_params.put.family = PHO_RSC_INVAL;
xfer.xd_params.put.tags.tags = &argv[3];
xfer.xd_params.put.tags.n_tags = argc - 3;
xfer.xd_objid = realpath(argv[2], NULL);
xfer.xd_family = PHO_RSC_INVAL;
xfer.xd_attrs = attrs;
xfer.xd_tags.tags = &argv[3];
xfer.xd_tags.n_tags = argc - 3;
rc = phobos_put(&xfer, 1, NULL, NULL);
xfer_desc_close_fd(&xfer);
......
......@@ -68,8 +68,9 @@ static void reinit_xfer(struct pho_xfer_desc *xfer, const char *path,
xfer_desc_open_path(xfer, path, op, 0);
xfer->xd_op = op;
xfer->xd_family = PHO_RSC_INVAL;
xfer->xd_objid = realpath(objpath, NULL);
if (op == PHO_XFER_OP_PUT)
xfer->xd_params.put.family = PHO_RSC_INVAL;
}
static void add_dir(struct admin_handle *adm, struct dss_handle *dss,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment