Commit e8f402f0 authored by Sebastien Gougeaud, committed by Thomas Leibovici
Browse files

phobosd: allow the utilization of multiple schedulers



Until now, the daemon could only manage one family of resources (tape or
directory) at a time. This family was defined using the
PHOBOS_LRS_default_family env variable.

Instead of setting only one scheduler, the daemon can now initialize a
scheduler per family, depending on those requested using the
PHOBOS_LRS_families env variable.

Users can now indicate to which family they send their data using the
PHOBOS_STORE_default_family env variable. This piece of information is
now a part of the write alloc request protocol.

Change-Id: I7851edddc8786b15e1df697d366834c4bf35b66f
Signed-off-by: Sebastien Gougeaud <sebastien.gougeaud@cea.fr>
Reviewed-on: https://cws-fleury.labs.ocre.cea.fr/gerrit/6547


Reviewed-by: Linter
Tested-by: Jenkins s8open_nr <s8open_nr@ccc.ocre.cea.fr>
Reviewed-by: Quentin Bouget <quentin.bouget@cea.fr>
Reviewed-by: Thomas Leibovici <thomas.leibovici@cea.fr>
parent d8cc4ede
......@@ -265,18 +265,18 @@ To specify the default family where data is to be written, specify it in the
phobos configuration file:
```
[lrs]
[store]
# indicate 'tape' or 'dir' family
default_family = tape
default_family = tape
```
Alternativelly, you can override this value in the environment (like any other
configuration parameter) by setting the `PHOBOS_LRS_default_family` variable.
Alternatively, you can override this value in the environment (like any other
configuration parameter) by setting the `PHOBOS_STORE_default_family` variable.
Example:
```
# put data to directory storage
PHOBOS_LRS_default_family=dir phobos put file.in obj123
PHOBOS_STORE_default_family=dir phobos put file.in obj123
```
### Configuring device and media types
......
......@@ -4,15 +4,15 @@ connect_string = dbname=phobos host=localhost user=phobos password=phobos
[lrs]
# prefix to mount phobos filesystems
mount_prefix = /mnt/phobos-
mount_prefix = /mnt/phobos-
# media selection policy: first_fit or best_fit
policy = best_fit
# default media family to put data (for now, 'dir' or 'tape')
default_family = tape
policy = best_fit
# handled resource families (comma-separated list without any space)
families = tape,dir
# path of SCSI library control device
lib_device = /dev/changer
lib_device = /dev/changer
# path of the LRS-server socket
server_socket = /run/phobosd/lrs
server_socket = /run/phobosd/lrs
[scsi]
# retry count for SCSI requests
......@@ -38,7 +38,9 @@ cmd_format = /usr/sbin/pho_ldm_helper format_ltfs "%s" "%s"
[store]
# default layout for put operations
layout = simple
default_layout = simple
# default resource family for put operations
default_family = tape
######### Tape/drive support and compatibility rules ########
# You should not modify the following configuration unless:
......
......@@ -75,6 +75,7 @@ class XferDescriptor(Structure):
("xd_fd", c_int),
("xd_size", c_ssize_t),
("xd_layout_name", c_char_p),
("xd_family", c_int),
("xd_attrs", PhoAttrs),
("xd_flags", c_int),
("xd_tags", Tags),
......@@ -131,6 +132,7 @@ class XferDescriptor(Structure):
self.xd_objid = desc[0]
self.xd_op = desc[5]
self.xd_layout_name = 0
self.xd_family = -1
self.xd_flags = desc[3]
self.xd_tags = Tags(desc[4])
self.xd_rc = 0
......
......@@ -77,6 +77,7 @@ struct pho_xfer_desc {
const char *xd_layout_name; /**< Name of the layout module to use
* (for put).
*/
enum rsc_family xd_family; /**< Targeted resource family (for PUT) */
struct pho_attrs xd_attrs; /**< User defined attribute to get / put */
enum pho_xfer_flags xd_flags; /**< See enum pho_xfer_flags doc */
struct tags xd_tags; /**< Tags to select a media to write */
......
......@@ -50,8 +50,8 @@
* - Communication info: stores info related to the communication with Store
*/
struct lrs {
struct lrs_sched sched; /*!< Scheduler part. */
struct pho_comm_info comm; /*!< Communication part. */
struct lrs_sched *sched[PHO_RSC_LAST]; /*!< Scheduler handles */
struct pho_comm_info comm; /*!< Communication handle */
};
struct lrs_params {
......@@ -95,46 +95,58 @@ bool running = true;
/* LRS helpers ****************************************************************/
/* ****************************************************************************/
static int _prepare_requests(struct lrs *lrs, const int n_data,
struct pho_comm_data *data)
static enum rsc_family _determine_family(const pho_req_t *req)
{
int rc = 0;
int i;
if (pho_request_is_write(req))
return req->walloc->family;
for (i = 0; i < n_data; ++i) {
struct req_container *req_cont;
int rc2 = 0;
if (pho_request_is_read(req)) {
if (!req->ralloc->n_med_ids)
return PHO_RSC_INVAL;
return req->ralloc->med_ids[0]->family;
}
if (data[i].buf.size == -1) /* close notification, ignore */
continue;
if (pho_request_is_release(req)) {
if (!req->release->n_media)
return PHO_RSC_INVAL;
return req->release->media[0]->med_id->family;
}
req_cont = malloc(sizeof(*req_cont));
if (!req_cont) {
pho_error(rc = -ENOMEM, "Cannot allocate request structure");
break;
}
if (pho_request_is_format(req))
return req->format->med_id->family;
/* request processing */
req_cont->token = data[i].fd;
req_cont->req = pho_srl_request_unpack(&data[i].buf);
if (!req_cont->req) {
pho_error(-EINVAL, "Request can not be unpacked");
free(req_cont);
rc = rc ? : -EINVAL;
continue;
}
if (pho_request_is_notify(req))
return req->notify->rsrc_id->family;
rc2 = sched_request_enqueue(&lrs->sched, req_cont);
if (rc2) {
pho_error(rc2, "Request can not be enqueue");
pho_srl_request_free(req_cont->req, true);
free(req_cont);
rc = rc ? : rc2;
continue;
}
}
return PHO_RSC_INVAL;
}
return rc;
/**
 * Fill an already allocated response container with an error response.
 *
 * The response inherits the token of the faulty request. When the request
 * itself is available, its identifier and its kind are copied as well.
 *
 * \param[out] resp_cont  Response container; resp_cont->resp must point to
 *                        allocated memory.
 * \param[in]  req_rc     Error code reported in the response.
 * \param[in]  req_cont   Container of the request which triggered the error;
 *                        req_cont->req may be NULL.
 *
 * \return 0 on success, the error code of the response allocation otherwise.
 */
static int _prepare_error(struct resp_container *resp_cont, int req_rc,
                          const struct req_container *req_cont)
{
    int rc;

    resp_cont->token = req_cont->token;

    rc = pho_srl_response_error_alloc(resp_cont->resp);
    if (rc)
        LOG_RETURN(rc, "Failed to allocate response");

    resp_cont->resp->error->rc = req_rc;

    /* No request means the error happened during request unpacking: there
     * is neither an id nor a kind to report.
     */
    if (req_cont->req != NULL) {
        resp_cont->resp->req_id = req_cont->req->id;

        if (pho_request_is_write(req_cont->req)) {
            resp_cont->resp->error->req_kind = PHO_REQUEST_KIND__RQ_WRITE;
        } else if (pho_request_is_read(req_cont->req)) {
            resp_cont->resp->error->req_kind = PHO_REQUEST_KIND__RQ_READ;
        } else if (pho_request_is_release(req_cont->req)) {
            resp_cont->resp->error->req_kind = PHO_REQUEST_KIND__RQ_RELEASE;
        } else if (pho_request_is_format(req_cont->req)) {
            resp_cont->resp->error->req_kind = PHO_REQUEST_KIND__RQ_FORMAT;
        } else if (pho_request_is_notify(req_cont->req)) {
            resp_cont->resp->error->req_kind = PHO_REQUEST_KIND__RQ_NOTIFY;
        }
    }

    return 0;
}
static int _send_responses(struct lrs *lrs, const int n_resp,
......@@ -153,7 +165,7 @@ static int _send_responses(struct lrs *lrs, const int n_resp,
pho_srl_response_free(resp_cont[i].resp, false);
free(resp_cont[i].resp);
if (rc2) {
pho_error(rc2, "Response can not be packed");
pho_error(rc2, "Response cannot be packed");
rc = rc ? : rc2;
continue;
}
......@@ -164,7 +176,7 @@ static int _send_responses(struct lrs *lrs, const int n_resp,
pho_debug("Client closed socket");
continue;
} else if (rc2) {
pho_error(rc2, "Response can not be sent");
pho_error(rc2, "Response cannot be sent");
rc = rc ? : rc2;
continue;
}
......@@ -173,10 +185,181 @@ static int _send_responses(struct lrs *lrs, const int n_resp,
return rc;
}
static int _send_error(struct lrs *lrs, struct req_container *req_cont)
{
struct resp_container resp_cont;
int rc;
resp_cont.resp = malloc(sizeof(*resp_cont.resp));
if (!resp_cont.resp)
LOG_RETURN(-ENOMEM, "Cannot allocate error response");
rc = _prepare_error(&resp_cont, -EINVAL, req_cont);
free(req_cont);
if (rc) {
free(resp_cont.resp);
LOG_RETURN(rc, "Cannot prepare error response");
}
rc = _send_responses(lrs, 1, &resp_cont);
if (rc)
LOG_RETURN(rc, "Error during response sending");
return rc;
}
static int _prepare_requests(struct lrs *lrs, const int n_data,
struct pho_comm_data *data)
{
enum rsc_family fam;
int rc = 0;
int i;
for (i = 0; i < n_data; ++i) {
struct req_container *req_cont;
int rc2 = 0;
if (data[i].buf.size == -1) /* close notification, ignore */
continue;
req_cont = malloc(sizeof(*req_cont));
if (!req_cont)
LOG_RETURN(-ENOMEM, "Cannot allocate request structure");
/* request processing */
req_cont->token = data[i].fd;
req_cont->req = pho_srl_request_unpack(&data[i].buf);
if (!req_cont->req) {
pho_error(-EINVAL, "Request cannot be unpacked");
rc = _send_error(lrs, req_cont);
if (rc)
LOG_RETURN(rc, "Cannot send error response");
continue;
}
fam = _determine_family(req_cont->req);
if (fam == PHO_RSC_INVAL) {
pho_error(-EINVAL, "Request type is not recognized");
rc = _send_error(lrs, req_cont);
if (rc)
LOG_RETURN(rc, "Cannot send error response");
continue;
}
if (!lrs->sched[fam]) {
pho_error(-EINVAL, "Requested family is not handled by the daemon");
rc = _send_error(lrs, req_cont);
if (rc)
LOG_RETURN(rc, "Cannot send error response");
continue;
}
rc2 = sched_request_enqueue(lrs->sched[fam], req_cont);
if (rc2) {
pho_srl_request_free(req_cont->req, true);
free(req_cont);
LOG_RETURN(rc2, "Request cannot be enqueue");
}
}
return rc;
}
/**
 * Allocate and initialize one scheduler per family listed in the
 * 'families' LRS configuration parameter (comma-separated list).
 *
 * Every slot of lrs->sched is first reset to NULL. A family listed several
 * times is initialized once and a warning is emitted for the duplicates.
 *
 * \param[in,out] lrs  LRS whose scheduler handles are filled.
 *
 * \return 0 on success (including when no family is listed),
 *         -EINVAL if the parameter is unavailable or a family is unknown,
 *         -ENOTSUP for the disk family, -ENOMEM or -errno on allocation
 *         failure, or the error code of sched_init().
 */
static int _load_schedulers(struct lrs *lrs)
{
    const char *list;
    char *parse_list;
    char *saveptr;
    char *item;
    /* Must start at 0: the loop below may never assign it (empty list, or
     * only duplicated entries).
     */
    int rc = 0;
    int i;

    for (i = 0; i < PHO_RSC_LAST; ++i)
        lrs->sched[i] = NULL;

    list = PHO_CFG_GET(cfg_lrs, PHO_CFG_LRS, families);
    if (list == NULL)
        LOG_RETURN(-EINVAL, "Unable to get the list of families");

    /* strtok_r() modifies its input, work on a private copy */
    parse_list = strdup(list);
    if (!parse_list)
        LOG_RETURN(-errno, "Error on family list duplication");

    /* Initialize a scheduler for each requested family */
    for (item = strtok_r(parse_list, ",", &saveptr);
         item != NULL;
         item = strtok_r(NULL, ",", &saveptr)) {
        int family = str2rsc_family(item);

        switch (family) {
        case PHO_RSC_DISK:
            LOG_GOTO(out_free, rc = -ENOTSUP,
                     "The family '%s' is not supported yet", item);
        case PHO_RSC_TAPE:
        case PHO_RSC_DIR:
            if (lrs->sched[family]) {
                pho_warn("The family '%s' was already processed, ignore it",
                         item);
                /* moves on to the next item of the enclosing for loop */
                continue;
            }

            lrs->sched[family] = malloc(sizeof(*lrs->sched[family]));
            if (!lrs->sched[family])
                LOG_GOTO(out_free, rc = -ENOMEM,
                         "Error on lrs scheduler allocation");

            rc = sched_init(lrs->sched[family], family);
            if (rc) {
                free(lrs->sched[family]);
                lrs->sched[family] = NULL;
                LOG_GOTO(out_free, rc, "Error on lrs scheduler initialization");
            }

            break;
        default:
            LOG_GOTO(out_free, rc = -EINVAL,
                     "The family '%s' is not recognized", item);
        }
    }

out_free:
    /* in case of error, allocated schedulers will be terminated in the error
     * handling of lrs_init()
     */
    free(parse_list);
    return rc;
}
/* ****************************************************************************/
/* LRS main functions *********************************************************/
/* ****************************************************************************/
/**
 * Free all resources associated with this LRS except for the dss, which must be
 * deinitialized by the caller if necessary.
 *
 * Every scheduler set up by lrs_init() is terminated and released, then the
 * communication handle is closed. Scheduler slots are reset to NULL, so
 * the scheduler part is not released twice if this function is called again
 * on the same LRS (lrs_init() already calls it on its error path).
 *
 * \param[in/out] lrs The LRS to be deinitialized.
 */
static void lrs_fini(struct lrs *lrs)
{
    int rc = 0;
    int i;

    if (lrs == NULL)
        return;

    for (i = 0; i < PHO_RSC_LAST; ++i) {
        /* Families which were not requested have no scheduler */
        if (!lrs->sched[i])
            continue;

        sched_fini(lrs->sched[i]);
        free(lrs->sched[i]);
        lrs->sched[i] = NULL;
    }

    rc = pho_comm_close(&lrs->comm);
    if (rc)
        pho_error(rc, "Error on closing the socket");
}
/**
* Initialize a new LRS.
*
......@@ -190,8 +373,8 @@ static int _send_responses(struct lrs *lrs, const int n_resp,
*/
static int lrs_init(struct lrs *lrs, struct lrs_params parm)
{
int rc;
const char *sock_path;
int rc;
/* Load configuration */
rc = pho_cfg_init_local(parm.cfg_path);
......@@ -202,17 +385,20 @@ static int lrs_init(struct lrs *lrs, struct lrs_params parm)
if (parm.use_syslog)
pho_log_callback_set(phobos_log_callback_def_with_sys);
sock_path = PHO_CFG_GET(cfg_lrs, PHO_CFG_LRS, server_socket);
rc = sched_init(&lrs->sched);
rc = _load_schedulers(lrs);
if (rc)
LOG_RETURN(rc, "Error on lrs scheduler initialization");
LOG_GOTO(err, rc, "Error while loading the schedulers");
sock_path = PHO_CFG_GET(cfg_lrs, PHO_CFG_LRS, server_socket);
rc = pho_comm_open(&lrs->comm, sock_path, true);
if (rc)
LOG_RETURN(rc, "Error on opening the socket");
LOG_GOTO(err, rc, "Error while opening the socket");
return 0;
return rc;
err:
lrs_fini(lrs);
return rc;
}
/**
......@@ -245,6 +431,7 @@ static int lrs_process(struct lrs *lrs)
struct resp_container *resp_cont;
int n_data, n_resp = 0;
int rc = 0;
int i;
/* request reception and accept handling */
rc = pho_comm_recv(&lrs->comm, &data, &n_data);
......@@ -257,39 +444,21 @@ static int lrs_process(struct lrs *lrs)
LOG_RETURN(rc, "Error during request enqueuing");
/* response processing */
rc = sched_responses_get(&lrs->sched, &n_resp, &resp_cont);
if (rc)
LOG_RETURN(rc, "Error during sched processing");
rc = _send_responses(lrs, n_resp, resp_cont);
free(resp_cont);
if (rc)
LOG_RETURN(rc, "Error during responses sending");
return rc;
}
/**
* Free all resources associated with this LRS except for the dss, which must be
* deinitialized by the caller if necessary.
*
* The LRS data structure is allocated in lrs_init()
* and deallocated in lrs_fini().
*
* \param[in/out] lrs The LRS to be deinitialized.
*/
static void lrs_fini(struct lrs *lrs)
{
int rc = 0;
for (i = 0; i < PHO_RSC_LAST; ++i) {
if (!lrs->sched[i])
continue;
if (lrs == NULL)
return;
rc = sched_responses_get(lrs->sched[i], &n_resp, &resp_cont);
if (rc)
LOG_RETURN(rc, "Error during sched processing");
sched_fini(&lrs->sched);
rc = _send_responses(lrs, n_resp, resp_cont);
free(resp_cont);
if (rc)
LOG_RETURN(rc, "Error during responses sending");
}
rc = pho_comm_close(&lrs->comm);
if (rc)
pho_error(rc, "Error on closing the socket");
return rc;
}
/* ****************************************************************************/
......@@ -412,7 +581,7 @@ int main(int argc, char **argv)
struct lrs_params parm;
struct sigaction sa;
int init_pipe[2];
struct lrs lrs;
struct lrs lrs = {};
pid_t pid;
int rc;
......
......@@ -39,10 +39,10 @@ const struct pho_config_item cfg_lrs[] = {
.name = "policy",
.value = "best_fit"
},
[PHO_CFG_LRS_default_family] = {
[PHO_CFG_LRS_families] = {
.section = "lrs",
.name = "default_family",
.value = "tape"
.name = "families",
.value = "tape,dir"
},
[PHO_CFG_LRS_lib_device] = {
.section = "lrs",
......
......@@ -34,7 +34,7 @@ enum pho_cfg_params_lrs {
/* lrs parameters */
PHO_CFG_LRS_mount_prefix = PHO_CFG_LRS_FIRST,
PHO_CFG_LRS_policy,
PHO_CFG_LRS_default_family,
PHO_CFG_LRS_families,
PHO_CFG_LRS_lib_device,
PHO_CFG_LRS_server_socket,
......
......@@ -87,18 +87,6 @@ static char *mount_point(const char *id)
return mnt_out;
}
/** return the default device family to write data */
static enum rsc_family default_family(void)
{
const char *fam_str;
fam_str = PHO_CFG_GET(cfg_lrs, PHO_CFG_LRS, default_family);
if (fam_str == NULL)
return PHO_RSC_INVAL;
return str2rsc_family(fam_str);
}
static struct utsname host_info;
/** get host name once (/!\ not thread-safe). */
......@@ -533,15 +521,13 @@ static int sched_load_dev_state(struct lrs_sched *sched)
{
struct dev_info *devs = NULL;
int dcnt = 0;
enum rsc_family family;
struct lib_adapter lib;
int i;
int rc;
ENTRY;
family = default_family();
if (family == PHO_RSC_INVAL)
if (sched->family == PHO_RSC_INVAL)
return -EINVAL;
/* If no device has previously been loaded, load the list of available
......@@ -559,7 +545,7 @@ static int sched_load_dev_state(struct lrs_sched *sched)
"]}",
get_hostname(),
rsc_adm_status2str(PHO_RSC_ADM_ST_UNLOCKED),
rsc_family2str(family));
rsc_family2str(sched->family));
if (rc)
return rc;
......@@ -571,7 +557,7 @@ static int sched_load_dev_state(struct lrs_sched *sched)
if (dcnt == 0) {
pho_info("No usable device found (%s): check devices status",
rsc_family2str(family));
rsc_family2str(sched->family));
GOTO(err, rc = -ENXIO);
}
......@@ -586,7 +572,7 @@ static int sched_load_dev_state(struct lrs_sched *sched)
}
/* get a handle to the library to query it */
rc = wrap_lib_open(family, &lib);
rc = wrap_lib_open(sched->family, &lib);
if (rc)
GOTO(err, rc);
......@@ -626,10 +612,12 @@ static void dev_descr_fini(gpointer ptr)
static __thread uint64_t sched_lock_number;
int sched_init(struct lrs_sched *sched)
int sched_init(struct lrs_sched *sched, enum rsc_family family)
{
int rc;
sched->family = family;
/* For the lock owner name to generate a collision, either the tid or the
* sched_lock_number has to loop in less than 1 second.
*
......@@ -1748,7 +1736,7 @@ static int sched_get_write_res(struct lrs_sched *sched, size_t size,
* Note: sched_select_media locks the media.
*/
pho_verb("Not enough space on loaded media: selecting another one");
rc = sched_select_media(sched, &pmedia, size, default_family(), tags,
rc = sched_select_media(sched, &pmedia, size, sched->family, tags,
devs, new_dev_index);
if (rc)
return rc;
......
......@@ -39,6 +39,7 @@ struct dev_descr;
*/
struct lrs_sched {
struct dss_handle dss; /**< Associated DSS */
enum rsc_family family; /**< Managed resource family */
GArray *devices; /**< List of available devices */
char *lock_owner; /**< Lock owner name for this LRS
* (contains hostname and tid)
......@@ -71,10 +72,11 @@ struct resp_container {
* Initialize a new sched bound to a given DSS.
*
* \param[in] sched The sched to be initialized.
* \param[in] family Resource family managed by the scheduler.
*
* \return 0 on success, -1 * posix error code on failure.
*/
int sched_init(struct lrs_sched *sched);
int sched_init(struct lrs_sched *sched, enum rsc_family family);
/**
* Free all resources associated with this sched except for the dss, which must
......
......@@ -33,12 +33,13 @@ message PhoRequest {
message Write {
/** Request for one write accessible medium. */
message Elt {