aboutsummaryrefslogtreecommitdiff
path: root/net/ceph
diff options
context:
space:
mode:
authorJeff Layton <jlayton@kernel.org>2022-07-01 06:30:12 -0400
committerIlya Dryomov <idryomov@gmail.com>2023-08-22 09:01:48 +0200
commitdee0c5f834605ce9b384ee8b9c7032ffd8db4eca (patch)
tree644479891b3fe248ded8e7fb04755c55107a0970 /net/ceph
parent4c793d4c58b7e57e1eb616e3d032df6b3789a4a3 (diff)
libceph: add new iov_iter-based ceph_msg_data_type and ceph_osd_data_type
Add an iov_iter to the unions in ceph_msg_data and ceph_msg_data_cursor. Instead of requiring a list of pages or bvecs, we can just use an iov_iter directly, and avoid extra allocations. We assume that the pages represented by the iter are pinned such that they shouldn't incur page faults, which is the case for the iov_iters created by netfs. While working on this, Al Viro informed me that he was going to change iov_iter_get_pages to auto-advance the iterator as that pattern is more or less required for ITER_PIPE anyway. We emulate that here for now by advancing in the _next op and tracking that amount in the "lastlen" field. In the event that _next is called twice without an intervening _advance, we revert the iov_iter by the remaining lastlen before calling iov_iter_get_pages. Cc: Al Viro <viro@zeniv.linux.org.uk> Cc: David Howells <dhowells@redhat.com> Signed-off-by: Jeff Layton <jlayton@kernel.org> Reviewed-by: Xiubo Li <xiubli@redhat.com> Reviewed-and-tested-by: Luís Henriques <lhenriques@suse.de> Reviewed-by: Milind Changire <mchangir@redhat.com> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/messenger.c77
-rw-r--r--net/ceph/osd_client.c27
2 files changed, 104 insertions, 0 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 2eb10d7518e8..10a41cd9c523 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -969,6 +969,62 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
return true;
}
+static void ceph_msg_data_iter_cursor_init(struct ceph_msg_data_cursor *cursor,
+ size_t length)
+{
+ struct ceph_msg_data *data = cursor->data;
+
+ cursor->iov_iter = data->iter;
+ cursor->lastlen = 0;
+ iov_iter_truncate(&cursor->iov_iter, length);
+ cursor->resid = iov_iter_count(&cursor->iov_iter);
+}
+
+static struct page *ceph_msg_data_iter_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length)
+{
+ struct page *page;
+ ssize_t len;
+
+ if (cursor->lastlen)
+ iov_iter_revert(&cursor->iov_iter, cursor->lastlen);
+
+ len = iov_iter_get_pages2(&cursor->iov_iter, &page, PAGE_SIZE,
+ 1, page_offset);
+ BUG_ON(len < 0);
+
+ cursor->lastlen = len;
+
+ /*
+ * FIXME: The assumption is that the pages represented by the iov_iter
+ * are pinned, with the references held by the upper-level
+ * callers, or by virtue of being under writeback. Eventually,
+ * we'll get an iov_iter_get_pages2 variant that doesn't take
+ * page refs. Until then, just put the page ref.
+ */
+ VM_BUG_ON_PAGE(!PageWriteback(page) && page_count(page) < 2, page);
+ put_page(page);
+
+ *length = min_t(size_t, len, cursor->resid);
+ return page;
+}
+
+static bool ceph_msg_data_iter_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
+{
+ BUG_ON(bytes > cursor->resid);
+ cursor->resid -= bytes;
+
+ if (bytes < cursor->lastlen) {
+ cursor->lastlen -= bytes;
+ } else {
+ iov_iter_advance(&cursor->iov_iter, bytes - cursor->lastlen);
+ cursor->lastlen = 0;
+ }
+
+ return cursor->resid;
+}
+
/*
* Message data is handled (sent or received) in pieces, where each
* piece resides on a single page. The network layer might not
@@ -996,6 +1052,9 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
case CEPH_MSG_DATA_BVECS:
ceph_msg_data_bvecs_cursor_init(cursor, length);
break;
+ case CEPH_MSG_DATA_ITER:
+ ceph_msg_data_iter_cursor_init(cursor, length);
+ break;
case CEPH_MSG_DATA_NONE:
default:
/* BUG(); */
@@ -1043,6 +1102,9 @@ struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
case CEPH_MSG_DATA_BVECS:
page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
break;
+ case CEPH_MSG_DATA_ITER:
+ page = ceph_msg_data_iter_next(cursor, page_offset, length);
+ break;
case CEPH_MSG_DATA_NONE:
default:
page = NULL;
@@ -1081,6 +1143,9 @@ void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)
case CEPH_MSG_DATA_BVECS:
new_piece = ceph_msg_data_bvecs_advance(cursor, bytes);
break;
+ case CEPH_MSG_DATA_ITER:
+ new_piece = ceph_msg_data_iter_advance(cursor, bytes);
+ break;
case CEPH_MSG_DATA_NONE:
default:
BUG();
@@ -1880,6 +1945,18 @@ void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
}
EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
+void ceph_msg_data_add_iter(struct ceph_msg *msg,
+ struct iov_iter *iter)
+{
+ struct ceph_msg_data *data;
+
+ data = ceph_msg_data_add(msg);
+ data->type = CEPH_MSG_DATA_ITER;
+ data->iter = *iter;
+
+ msg->data_length += iov_iter_count(&data->iter);
+}
+
/*
* construct a new message with given type, size
* the new msg has a ref count of 1.
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 0aacbadcab06..684faf8553de 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -171,6 +171,13 @@ static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
osd_data->num_bvecs = num_bvecs;
}
+static void ceph_osd_iter_init(struct ceph_osd_data *osd_data,
+ struct iov_iter *iter)
+{
+ osd_data->type = CEPH_OSD_DATA_TYPE_ITER;
+ osd_data->iter = *iter;
+}
+
static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
@@ -264,6 +271,22 @@ void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);
+/**
+ * osd_req_op_extent_osd_iter - Set up an operation with an iterator buffer
+ * @osd_req: The request to set up
+ * @which: Index of the operation in which to set the iter
+ * @iter: The buffer iterator
+ */
+void osd_req_op_extent_osd_iter(struct ceph_osd_request *osd_req,
+ unsigned int which, struct iov_iter *iter)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+ ceph_osd_iter_init(osd_data, iter);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_iter);
+
static void osd_req_op_cls_request_info_pagelist(
struct ceph_osd_request *osd_req,
unsigned int which, struct ceph_pagelist *pagelist)
@@ -346,6 +369,8 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
#endif /* CONFIG_BLOCK */
case CEPH_OSD_DATA_TYPE_BVECS:
return osd_data->bvec_pos.iter.bi_size;
+ case CEPH_OSD_DATA_TYPE_ITER:
+ return iov_iter_count(&osd_data->iter);
default:
WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
return 0;
@@ -954,6 +979,8 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
#endif
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
+ } else if (osd_data->type == CEPH_OSD_DATA_TYPE_ITER) {
+ ceph_msg_data_add_iter(msg, &osd_data->iter);
} else {
BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
}