Commit b7ef5f3a authored by Filipe Manana's avatar Filipe Manana Committed by David Sterba

btrfs: loop only once over data sizes array when inserting an item batch

When inserting a batch of items into a btree, we end up looping over the
data sizes array 3 times:

1) Once in the caller of btrfs_insert_empty_items(), when it populates the
   array with the data sizes for each item;

2) Once at btrfs_insert_empty_items() to sum the elements of the data
   sizes array and compute the total data size;

3) And then once again at setup_items_for_insert(), where we do exactly
   the same as what we do at btrfs_insert_empty_items(), to compute the
   total data size.

That is not bad for small arrays, but when the arrays have hundreds of
elements, the time spent on looping is not negligible. For example when
doing batch inserts of delayed items for dir index items or when logging
a directory, it's common to have 200 to 260 dir index items in a single
batch when using a leaf size of 16K and using file names between 8 and 12
characters. For a 64K leaf size, multiply that by 4. Taking into account
that during directory logging or when flushing delayed dir index items we
can have many of those large batches, the time spent on the looping adds
up quickly.

It's also more important to avoid it at setup_items_for_insert(), since
we are holding a write lock on a leaf and, in some cases, on upper nodes
of the btree, which causes us to block other tasks that want to access
the leaf and nodes for longer than necessary.

So change the code so that setup_items_for_insert() and
btrfs_insert_empty_items() no longer compute the total data size, and
instead rely on the caller to supply it. This makes us loop over the
array only once, where we can both populate the data size array and
compute the total data size, taking advantage of spatial and temporal
locality. To make this more manageable, use a structure to contain
all the relevant details for a batch of items (keys array, data sizes
array, total data size, number of items), and use it as an argument
for btrfs_insert_empty_items() and setup_items_for_insert().

This patch is part of a small patchset that is comprised of the following
patches:

  btrfs: loop only once over data sizes array when inserting an item batch
  btrfs: unexport setup_items_for_insert()
  btrfs: use single bulk copy operations when logging directories

This is patch 1/3 and performance results, and the specific tests, are
included in the changelog of patch 3/3.
Signed-off-by: default avatarFilipe Manana <fdmanana@suse.com>
Signed-off-by: default avatarDavid Sterba <dsterba@suse.com>
parent 6a258d72
......@@ -3605,7 +3605,7 @@ int btrfs_duplicate_item(struct btrfs_trans_handle *trans,
return ret;
path->slots[0]++;
setup_items_for_insert(root, path, new_key, &item_size, 1);
setup_item_for_insert(root, path, new_key, item_size);
leaf = path->nodes[0];
memcpy_extent_buffer(leaf,
btrfs_item_ptr_offset(leaf, path->slots[0]),
......@@ -3785,13 +3785,10 @@ void btrfs_extend_item(struct btrfs_path *path, u32 data_size)
*
* @root: root we are inserting items to
* @path: points to the leaf/slot where we are going to insert new items
* @cpu_key: array of keys for items to be inserted
* @data_size: size of the body of each item we are going to insert
* @nr: size of @cpu_key/@data_size arrays
* @batch: information about the batch of items to insert
*/
void setup_items_for_insert(struct btrfs_root *root, struct btrfs_path *path,
const struct btrfs_key *cpu_key, u32 *data_size,
int nr)
const struct btrfs_item_batch *batch)
{
struct btrfs_fs_info *fs_info = root->fs_info;
struct btrfs_item *item;
......@@ -3803,14 +3800,14 @@ void setup_items_for_insert(struct btrfs_root *root, struct btrfs_path *path,
int slot;
struct btrfs_map_token token;
u32 total_size;
u32 total_data = 0;
for (i = 0; i < nr; i++)
total_data += data_size[i];
total_size = total_data + (nr * sizeof(struct btrfs_item));
/*
* Before anything else, update keys in the parent and other ancestors
* if needed, then release the write locks on them, so that other tasks
* can use them while we modify the leaf.
*/
if (path->slots[0] == 0) {
btrfs_cpu_key_to_disk(&disk_key, cpu_key);
btrfs_cpu_key_to_disk(&disk_key, &batch->keys[0]);
fixup_low_keys(path, &disk_key, 1);
}
btrfs_unlock_up_safe(path, 1);
......@@ -3820,6 +3817,7 @@ void setup_items_for_insert(struct btrfs_root *root, struct btrfs_path *path,
nritems = btrfs_header_nritems(leaf);
data_end = leaf_data_end(leaf);
total_size = batch->total_data_size + (batch->nr * sizeof(struct btrfs_item));
if (btrfs_leaf_free_space(leaf) < total_size) {
btrfs_print_leaf(leaf);
......@@ -3849,31 +3847,32 @@ void setup_items_for_insert(struct btrfs_root *root, struct btrfs_path *path,
item = btrfs_item_nr(i);
ioff = btrfs_token_item_offset(&token, item);
btrfs_set_token_item_offset(&token, item,
ioff - total_data);
ioff - batch->total_data_size);
}
/* shift the items */
memmove_extent_buffer(leaf, btrfs_item_nr_offset(slot + nr),
memmove_extent_buffer(leaf, btrfs_item_nr_offset(slot + batch->nr),
btrfs_item_nr_offset(slot),
(nritems - slot) * sizeof(struct btrfs_item));
/* shift the data */
memmove_extent_buffer(leaf, BTRFS_LEAF_DATA_OFFSET +
data_end - total_data, BTRFS_LEAF_DATA_OFFSET +
data_end, old_data - data_end);
data_end - batch->total_data_size,
BTRFS_LEAF_DATA_OFFSET + data_end,
old_data - data_end);
data_end = old_data;
}
/* setup the item for the new data */
for (i = 0; i < nr; i++) {
btrfs_cpu_key_to_disk(&disk_key, cpu_key + i);
for (i = 0; i < batch->nr; i++) {
btrfs_cpu_key_to_disk(&disk_key, &batch->keys[i]);
btrfs_set_item_key(leaf, &disk_key, slot + i);
item = btrfs_item_nr(slot + i);
data_end -= data_size[i];
data_end -= batch->data_sizes[i];
btrfs_set_token_item_offset(&token, item, data_end);
btrfs_set_token_item_size(&token, item, data_size[i]);
btrfs_set_token_item_size(&token, item, batch->data_sizes[i]);
}
btrfs_set_header_nritems(leaf, nritems + nr);
btrfs_set_header_nritems(leaf, nritems + batch->nr);
btrfs_mark_buffer_dirty(leaf);
if (btrfs_leaf_free_space(leaf) < 0) {
......@@ -3889,20 +3888,14 @@ void setup_items_for_insert(struct btrfs_root *root, struct btrfs_path *path,
int btrfs_insert_empty_items(struct btrfs_trans_handle *trans,
struct btrfs_root *root,
struct btrfs_path *path,
const struct btrfs_key *cpu_key, u32 *data_size,
int nr)
const struct btrfs_item_batch *batch)
{
int ret = 0;
int slot;
int i;
u32 total_size = 0;
u32 total_data = 0;
for (i = 0; i < nr; i++)
total_data += data_size[i];
u32 total_size;
total_size = total_data + (nr * sizeof(struct btrfs_item));
ret = btrfs_search_slot(trans, root, cpu_key, path, total_size, 1);
total_size = batch->total_data_size + (batch->nr * sizeof(struct btrfs_item));
ret = btrfs_search_slot(trans, root, &batch->keys[0], path, total_size, 1);
if (ret == 0)
return -EEXIST;
if (ret < 0)
......@@ -3911,7 +3904,7 @@ int btrfs_insert_empty_items(struct btrfs_trans_handle *trans,
slot = path->slots[0];
BUG_ON(slot < 0);
setup_items_for_insert(root, path, cpu_key, data_size, nr);
setup_items_for_insert(root, path, batch);
return 0;
}
......
......@@ -2897,16 +2897,56 @@ static inline int btrfs_del_item(struct btrfs_trans_handle *trans,
return btrfs_del_items(trans, root, path, path->slots[0], 1);
}
/*
* Describes a batch of items to insert in a btree. This is used by
* btrfs_insert_empty_items() and setup_items_for_insert().
*/
struct btrfs_item_batch {
/*
* Pointer to an array containing the keys of the items to insert (in
* sorted order).
*/
const struct btrfs_key *keys;
/* Pointer to an array containing the data size for each item to insert. */
const u32 *data_sizes;
/*
* The sum of data sizes for all items. The caller can compute this while
* setting up the data_sizes array, so it ends up being more efficient
* than having btrfs_insert_empty_items() or setup_item_for_insert()
* doing it, as it would avoid an extra loop over a potentially large
* array, and in the case of setup_item_for_insert(), we would be doing
* it while holding a write lock on a leaf and often on upper level nodes
* too, unnecessarily increasing the size of a critical section.
*/
u32 total_data_size;
/* Size of the keys and data_sizes arrays (number of items in the batch). */
int nr;
};
void setup_items_for_insert(struct btrfs_root *root, struct btrfs_path *path,
const struct btrfs_key *cpu_key, u32 *data_size,
int nr);
const struct btrfs_item_batch *batch);
static inline void setup_item_for_insert(struct btrfs_root *root,
struct btrfs_path *path,
const struct btrfs_key *key,
u32 data_size)
{
struct btrfs_item_batch batch;
batch.keys = key;
batch.data_sizes = &data_size;
batch.total_data_size = data_size;
batch.nr = 1;
setup_items_for_insert(root, path, &batch);
}
int btrfs_insert_item(struct btrfs_trans_handle *trans, struct btrfs_root *root,
const struct btrfs_key *key, void *data, u32 data_size);
int btrfs_insert_empty_items(struct btrfs_trans_handle *trans,
struct btrfs_root *root,
struct btrfs_path *path,
const struct btrfs_key *cpu_key, u32 *data_size,
int nr);
const struct btrfs_item_batch *batch);
static inline int btrfs_insert_empty_item(struct btrfs_trans_handle *trans,
struct btrfs_root *root,
......@@ -2914,7 +2954,14 @@ static inline int btrfs_insert_empty_item(struct btrfs_trans_handle *trans,
const struct btrfs_key *key,
u32 data_size)
{
return btrfs_insert_empty_items(trans, root, path, key, &data_size, 1);
struct btrfs_item_batch batch;
batch.keys = key;
batch.data_sizes = &data_size;
batch.total_data_size = data_size;
batch.nr = 1;
return btrfs_insert_empty_items(trans, root, path, &batch);
}
int btrfs_prev_leaf(struct btrfs_root *root, struct btrfs_path *path);
......
......@@ -679,19 +679,18 @@ static int btrfs_insert_delayed_item(struct btrfs_trans_handle *trans,
struct btrfs_path *path,
struct btrfs_delayed_item *first_item)
{
LIST_HEAD(batch);
LIST_HEAD(item_list);
struct btrfs_delayed_item *curr;
struct btrfs_delayed_item *next;
const int max_size = BTRFS_LEAF_DATA_SIZE(root->fs_info);
struct btrfs_item_batch batch;
int total_size;
int nitems;
char *ins_data = NULL;
struct btrfs_key *ins_keys;
u32 *ins_sizes;
int ret;
list_add_tail(&first_item->tree_list, &batch);
nitems = 1;
list_add_tail(&first_item->tree_list, &item_list);
batch.total_data_size = first_item->data_len;
batch.nr = 1;
total_size = first_item->data_len + sizeof(struct btrfs_item);
curr = first_item;
......@@ -706,39 +705,43 @@ static int btrfs_insert_delayed_item(struct btrfs_trans_handle *trans,
if (total_size + next_size > max_size)
break;
list_add_tail(&next->tree_list, &batch);
nitems++;
list_add_tail(&next->tree_list, &item_list);
batch.nr++;
total_size += next_size;
batch.total_data_size += next->data_len;
curr = next;
}
if (nitems == 1) {
ins_keys = &first_item->key;
ins_sizes = &first_item->data_len;
if (batch.nr == 1) {
batch.keys = &first_item->key;
batch.data_sizes = &first_item->data_len;
} else {
struct btrfs_key *ins_keys;
u32 *ins_sizes;
int i = 0;
ins_data = kmalloc(nitems * sizeof(u32) +
nitems * sizeof(struct btrfs_key), GFP_NOFS);
ins_data = kmalloc(batch.nr * sizeof(u32) +
batch.nr * sizeof(struct btrfs_key), GFP_NOFS);
if (!ins_data) {
ret = -ENOMEM;
goto out;
}
ins_sizes = (u32 *)ins_data;
ins_keys = (struct btrfs_key *)(ins_data + nitems * sizeof(u32));
list_for_each_entry(curr, &batch, tree_list) {
ins_keys = (struct btrfs_key *)(ins_data + batch.nr * sizeof(u32));
batch.keys = ins_keys;
batch.data_sizes = ins_sizes;
list_for_each_entry(curr, &item_list, tree_list) {
ins_keys[i] = curr->key;
ins_sizes[i] = curr->data_len;
i++;
}
}
ret = btrfs_insert_empty_items(trans, root, path, ins_keys, ins_sizes,
nitems);
ret = btrfs_insert_empty_items(trans, root, path, &batch);
if (ret)
goto out;
list_for_each_entry(curr, &batch, tree_list) {
list_for_each_entry(curr, &item_list, tree_list) {
char *data_ptr;
data_ptr = btrfs_item_ptr(path->nodes[0], path->slots[0], char);
......@@ -754,7 +757,7 @@ static int btrfs_insert_delayed_item(struct btrfs_trans_handle *trans,
*/
btrfs_release_path(path);
list_for_each_entry_safe(curr, next, &batch, tree_list) {
list_for_each_entry_safe(curr, next, &item_list, tree_list) {
list_del(&curr->tree_list);
btrfs_delayed_item_release_metadata(root, curr);
btrfs_release_delayed_item(curr);
......
......@@ -1020,8 +1020,7 @@ int btrfs_drop_extents(struct btrfs_trans_handle *trans,
if (btrfs_comp_cpu_keys(&key, &slot_key) > 0)
path->slots[0]++;
}
setup_items_for_insert(root, path, &key,
&args->extent_item_size, 1);
setup_item_for_insert(root, path, &key, args->extent_item_size);
args->extent_inserted = true;
}
......
......@@ -6445,7 +6445,7 @@ static struct inode *btrfs_new_inode(struct btrfs_trans_handle *trans,
struct btrfs_inode_ref *ref;
struct btrfs_key key[2];
u32 sizes[2];
int nitems = name ? 2 : 1;
struct btrfs_item_batch batch;
unsigned long ptr;
unsigned int nofs_flag;
int ret;
......@@ -6537,7 +6537,11 @@ static struct inode *btrfs_new_inode(struct btrfs_trans_handle *trans,
goto fail;
}
ret = btrfs_insert_empty_items(trans, root, path, key, sizes, nitems);
batch.keys = &key[0];
batch.data_sizes = &sizes[0];
batch.total_data_size = sizes[0] + (name ? sizes[1] : 0);
batch.nr = name ? 2 : 1;
ret = btrfs_insert_empty_items(trans, root, path, &batch);
if (ret != 0)
goto fail_unlock;
......
......@@ -60,7 +60,7 @@ static int test_btrfs_split_item(u32 sectorsize, u32 nodesize)
key.type = BTRFS_EXTENT_CSUM_KEY;
key.offset = 0;
setup_items_for_insert(root, path, &key, &value_len, 1);
setup_item_for_insert(root, path, &key, value_len);
item = btrfs_item_nr(0);
write_extent_buffer(eb, value, btrfs_item_ptr_offset(eb, 0),
value_len);
......
......@@ -33,7 +33,7 @@ static void insert_extent(struct btrfs_root *root, u64 start, u64 len,
key.type = BTRFS_EXTENT_DATA_KEY;
key.offset = start;
setup_items_for_insert(root, &path, &key, &value_len, 1);
setup_item_for_insert(root, &path, &key, value_len);
fi = btrfs_item_ptr(leaf, slot, struct btrfs_file_extent_item);
btrfs_set_file_extent_generation(leaf, fi, 1);
btrfs_set_file_extent_type(leaf, fi, type);
......@@ -63,7 +63,7 @@ static void insert_inode_item_key(struct btrfs_root *root)
key.type = BTRFS_INODE_ITEM_KEY;
key.offset = 0;
setup_items_for_insert(root, &path, &key, &value_len, 1);
setup_item_for_insert(root, &path, &key, value_len);
}
/*
......
......@@ -3668,8 +3668,7 @@ static int flush_dir_items_batch(struct btrfs_trans_handle *trans,
int count)
{
char *ins_data = NULL;
struct btrfs_key *ins_keys;
u32 *ins_sizes;
struct btrfs_item_batch batch;
struct extent_buffer *dst;
struct btrfs_key key;
u32 item_size;
......@@ -3677,13 +3676,18 @@ static int flush_dir_items_batch(struct btrfs_trans_handle *trans,
int i;
ASSERT(count > 0);
batch.nr = count;
if (count == 1) {
btrfs_item_key_to_cpu(src, &key, start_slot);
item_size = btrfs_item_size_nr(src, start_slot);
ins_keys = &key;
ins_sizes = &item_size;
batch.keys = &key;
batch.data_sizes = &item_size;
batch.total_data_size = item_size;
} else {
struct btrfs_key *ins_keys;
u32 *ins_sizes;
ins_data = kmalloc(count * sizeof(u32) +
count * sizeof(struct btrfs_key), GFP_NOFS);
if (!ins_data)
......@@ -3691,17 +3695,20 @@ static int flush_dir_items_batch(struct btrfs_trans_handle *trans,
ins_sizes = (u32 *)ins_data;
ins_keys = (struct btrfs_key *)(ins_data + count * sizeof(u32));
batch.keys = ins_keys;
batch.data_sizes = ins_sizes;
batch.total_data_size = 0;
for (i = 0; i < count; i++) {
const int slot = start_slot + i;
btrfs_item_key_to_cpu(src, &ins_keys[i], slot);
ins_sizes[i] = btrfs_item_size_nr(src, slot);
batch.total_data_size += ins_sizes[i];
}
}
ret = btrfs_insert_empty_items(trans, log, dst_path, ins_keys, ins_sizes,
count);
ret = btrfs_insert_empty_items(trans, log, dst_path, &batch);
if (ret)
goto out;
......@@ -3712,7 +3719,8 @@ static int flush_dir_items_batch(struct btrfs_trans_handle *trans,
dst_offset = btrfs_item_ptr_offset(dst, dst_path->slots[0]);
src_offset = btrfs_item_ptr_offset(src, start_slot + i);
copy_extent_buffer(dst, src, dst_offset, src_offset, ins_sizes[i]);
copy_extent_buffer(dst, src, dst_offset, src_offset,
batch.data_sizes[i]);
dst_path->slots[0]++;
}
btrfs_release_path(dst_path);
......@@ -4322,6 +4330,7 @@ static noinline int copy_items(struct btrfs_trans_handle *trans,
int ret;
struct btrfs_key *ins_keys;
u32 *ins_sizes;
struct btrfs_item_batch batch;
char *ins_data;
int i;
struct list_head ordered_sums;
......@@ -4336,13 +4345,17 @@ static noinline int copy_items(struct btrfs_trans_handle *trans,
ins_sizes = (u32 *)ins_data;
ins_keys = (struct btrfs_key *)(ins_data + nr * sizeof(u32));
batch.keys = ins_keys;
batch.data_sizes = ins_sizes;
batch.total_data_size = 0;
batch.nr = nr;
for (i = 0; i < nr; i++) {
ins_sizes[i] = btrfs_item_size_nr(src, i + start_slot);
batch.total_data_size += ins_sizes[i];
btrfs_item_key_to_cpu(src, ins_keys + i, i + start_slot);
}
ret = btrfs_insert_empty_items(trans, log, dst_path,
ins_keys, ins_sizes, nr);
ret = btrfs_insert_empty_items(trans, log, dst_path, &batch);
if (ret) {
kfree(ins_data);
return ret;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment