Commit 69dd773b authored by Igor Babaev's avatar Igor Babaev

An implementation of index intersect via a modified Unique class.

This code is planned to be used for mwl#21.
parent 5719ca5c
......@@ -31,6 +31,7 @@ extern "C" {
#define tree_set_pointer(element,ptr) *((uchar **) (element+1))=((uchar*) (ptr))
#define TREE_NO_DUPS 1
#define TREE_ONLY_DUPS 2
typedef enum { left_root_right, right_root_left } TREE_WALK;
typedef uint32 element_count;
......
......@@ -221,6 +221,8 @@ TREE_ELEMENT *tree_insert(TREE *tree, void *key, uint key_size,
}
if (element == &tree->null_element)
{
if (tree->flag & TREE_ONLY_DUPS)
return((TREE_ELEMENT *) 1);
uint alloc_size=sizeof(TREE_ELEMENT)+key_size+tree->size_of_element;
tree->allocated+=alloc_size;
......
......@@ -50,10 +50,6 @@ static int write_keys(SORTPARAM *param,uchar * *sort_keys,
uint count, IO_CACHE *buffer_file, IO_CACHE *tempfile);
static void make_sortkey(SORTPARAM *param,uchar *to, uchar *ref_pos);
static void register_used_fields(SORTPARAM *param);
static int merge_index(SORTPARAM *param,uchar *sort_buffer,
BUFFPEK *buffpek,
uint maxbuffer,IO_CACHE *tempfile,
IO_CACHE *outfile);
static bool save_index(SORTPARAM *param,uchar **sort_keys, uint count,
FILESORT_INFO *table_sort);
static uint suffix_length(ulong string_length);
......@@ -145,6 +141,7 @@ ha_rows filesort(THD *thd, TABLE *table, SORT_FIELD *sortorder, uint s_length,
/* filesort cannot handle zero-length records. */
DBUG_ASSERT(param.sort_length);
param.ref_length= table->file->ref_length;
param.min_dupl_count= 0;
param.addon_field= 0;
param.addon_length= 0;
if (!(table->file->ha_table_flags() & HA_FAST_KEY_READ) &&
......@@ -1216,7 +1213,13 @@ int merge_buffers(SORTPARAM *param, IO_CACHE *from_file,
rec_length= param->rec_length;
res_length= param->res_length;
sort_length= param->sort_length;
offset= rec_length-res_length;
element_count dupl_count;
uchar *src;
uint dupl_count_ofs= rec_length-sizeof(element_count);
uint min_dupl_count= param->min_dupl_count;
offset= rec_length-
(flag && min_dupl_count ? sizeof(dupl_count) : 0)-res_length;
uint wr_len= flag ? res_length : rec_length;
maxcount= (ulong) (param->keys/((uint) (Tb-Fb) +1));
to_start_filepos= my_b_tell(to_file);
strpos= sort_buffer;
......@@ -1262,16 +1265,20 @@ int merge_buffers(SORTPARAM *param, IO_CACHE *from_file,
*/
buffpek= (BUFFPEK*) queue_top(&queue);
memcpy(param->unique_buff, buffpek->key, rec_length);
if (my_b_write(to_file, (uchar*) buffpek->key, rec_length))
{
error=1; goto err; /* purecov: inspected */
}
if (min_dupl_count)
memcpy(&dupl_count, param->unique_buff+dupl_count_ofs,
sizeof(dupl_count));
buffpek->key+= rec_length;
buffpek->mem_count--;
if (!--max_rows)
if (! --buffpek->mem_count)
{
error= 0; /* purecov: inspected */
goto end; /* purecov: inspected */
if (!(error= (int) read_to_buffer(from_file,buffpek,
rec_length)))
{
VOID(queue_remove(&queue,0));
reuse_freed_buff(&queue, buffpek, rec_length);
}
else if (error == -1)
goto err; /* purecov: inspected */
}
queue_replaced(&queue); // Top element has been used
}
......@@ -1287,27 +1294,42 @@ int merge_buffers(SORTPARAM *param, IO_CACHE *from_file,
for (;;)
{
buffpek= (BUFFPEK*) queue_top(&queue);
src= buffpek->key;
if (cmp) // Remove duplicates
{
if (!(*cmp)(first_cmp_arg, &(param->unique_buff),
(uchar**) &buffpek->key))
{
if (min_dupl_count)
{
element_count cnt;
memcpy(&cnt, (uchar *) buffpek->key+dupl_count_ofs, sizeof(cnt));
dupl_count+= cnt;
}
goto skip_duplicate;
memcpy(param->unique_buff, (uchar*) buffpek->key, rec_length);
}
if (flag == 0)
if (min_dupl_count)
{
if (my_b_write(to_file,(uchar*) buffpek->key, rec_length))
{
error=1; goto err; /* purecov: inspected */
memcpy(param->unique_buff+dupl_count_ofs, &dupl_count,
sizeof(dupl_count));
}
src= param->unique_buff;
}
else
if (!flag || !min_dupl_count || dupl_count >= min_dupl_count)
{
if (my_b_write(to_file, (uchar*) buffpek->key+offset, res_length))
if (my_b_write(to_file, src+(flag ? offset : 0), wr_len))
{
error=1; goto err; /* purecov: inspected */
}
}
if (cmp)
{
memcpy(param->unique_buff, (uchar*) buffpek->key, rec_length);
if (min_dupl_count)
memcpy(&dupl_count, param->unique_buff+dupl_count_ofs,
sizeof(dupl_count));
}
if (!--max_rows)
{
error= 0; /* purecov: inspected */
......@@ -1343,9 +1365,33 @@ int merge_buffers(SORTPARAM *param, IO_CACHE *from_file,
{
if (!(*cmp)(first_cmp_arg, &(param->unique_buff), (uchar**) &buffpek->key))
{
buffpek->key+= rec_length; // Remove duplicate
if (min_dupl_count)
{
element_count cnt;
memcpy(&cnt, (uchar *) buffpek->key+dupl_count_ofs, sizeof(cnt));
dupl_count+= cnt;
}
buffpek->key+= rec_length;
--buffpek->mem_count;
}
if (min_dupl_count)
memcpy(param->unique_buff+dupl_count_ofs, &dupl_count,
sizeof(dupl_count));
if (!flag || !min_dupl_count || dupl_count >= min_dupl_count)
{
src= param->unique_buff;
if (my_b_write(to_file, src+(flag ? offset : 0), wr_len))
{
error=1; goto err; /* purecov: inspected */
}
if (!--max_rows)
{
error= 0;
goto end;
}
}
}
do
......@@ -1367,12 +1413,17 @@ int merge_buffers(SORTPARAM *param, IO_CACHE *from_file,
else
{
register uchar *end;
strpos= buffpek->key+offset;
for (end= strpos+buffpek->mem_count*rec_length ;
strpos != end ;
strpos+= rec_length)
{
if (my_b_write(to_file, strpos, res_length))
src= buffpek->key+offset;
for (end= src+buffpek->mem_count*rec_length ;
src != end ;
src+= rec_length)
{
if (flag && min_dupl_count &&
memcmp(&min_dupl_count, src+dupl_count_ofs,
sizeof(dupl_count_ofs))<0)
continue;
if (my_b_write(to_file, src, wr_len))
{
error=1; goto err;
}
......@@ -1393,7 +1444,7 @@ err:
/* Do a merge to output-file (save only positions) */
static int merge_index(SORTPARAM *param, uchar *sort_buffer,
int merge_index(SORTPARAM *param, uchar *sort_buffer,
BUFFPEK *buffpek, uint maxbuffer,
IO_CACHE *tempfile, IO_CACHE *outfile)
{
......
This diff is collapsed.
......@@ -274,12 +274,13 @@ public:
enum {
QS_TYPE_RANGE = 0,
QS_TYPE_INDEX_MERGE = 1,
QS_TYPE_RANGE_DESC = 2,
QS_TYPE_FULLTEXT = 3,
QS_TYPE_ROR_INTERSECT = 4,
QS_TYPE_ROR_UNION = 5,
QS_TYPE_GROUP_MIN_MAX = 6
QS_TYPE_INDEX_INTERSECT = 1,
QS_TYPE_INDEX_MERGE = 2,
QS_TYPE_RANGE_DESC = 3,
QS_TYPE_FULLTEXT = 4,
QS_TYPE_ROR_INTERSECT = 5,
QS_TYPE_ROR_UNION = 6,
QS_TYPE_GROUP_MIN_MAX = 7
};
/* Get type of this quick select - one of the QS_TYPE_* values */
......@@ -393,8 +394,17 @@ protected:
friend QUICK_RANGE_SELECT *get_quick_select(PARAM*,uint idx,
SEL_ARG *key_tree,
MEM_ROOT *alloc);
friend
int read_keys_and_merge_scans(THD *thd, TABLE *head,
List<QUICK_RANGE_SELECT> quick_selects,
QUICK_RANGE_SELECT *pk_quick_select,
READ_RECORD *read_record,
bool intersection,
Unique **unique_ptr);
friend class QUICK_SELECT_DESC;
friend class QUICK_INDEX_MERGE_SELECT;
friend class QUICK_INDEX_INTERSECT_SELECT;
friend class QUICK_ROR_INTERSECT_SELECT;
friend class QUICK_GROUP_MIN_MAX_SELECT;
......@@ -545,6 +555,45 @@ public:
READ_RECORD read_record;
};
class QUICK_INDEX_INTERSECT_SELECT : public QUICK_SELECT_I
{
Unique *unique;
public:
QUICK_INDEX_INTERSECT_SELECT(THD *thd, TABLE *table);
~QUICK_INDEX_INTERSECT_SELECT();
int init();
int reset(void);
int get_next();
bool reverse_sorted() { return false; }
bool unique_key_range() { return false; }
int get_type() { return QS_TYPE_INDEX_INTERSECT; }
void add_keys_and_lengths(String *key_names, String *used_lengths);
void add_info_string(String *str);
bool is_keys_used(const MY_BITMAP *fields);
#ifndef DBUG_OFF
void dbug_dump(int indent, bool verbose);
#endif
bool push_quick_back(QUICK_RANGE_SELECT *quick_sel_range);
/* range quick selects this index_merge read consists of */
List<QUICK_RANGE_SELECT> quick_selects;
/* quick select that uses clustered primary key (NULL if none) */
QUICK_RANGE_SELECT* pk_quick_select;
/* true if this select is currently doing a clustered PK scan */
bool doing_pk_scan;
MEM_ROOT alloc;
THD *thd;
int read_keys_and_merge();
/* used to get rows collected in Unique */
READ_RECORD read_record;
};
/*
Rowid-Ordered Retrieval (ROR) index intersection quick select.
......
......@@ -2949,6 +2949,7 @@ class user_var_entry
DTCollation collation;
};
/*
Unique -- class for unique (removing of duplicates).
Puts all values to the TREE. If the tree becomes too big,
......@@ -2967,11 +2968,14 @@ class Unique :public Sql_alloc
uchar *record_pointers;
bool flush();
uint size;
uint full_size;
uint min_dupl_count;
public:
ulong elements;
Unique(qsort_cmp2 comp_func, void *comp_func_fixed_arg,
uint size_arg, ulonglong max_in_memory_size_arg);
uint size_arg, ulonglong max_in_memory_size_arg,
uint min_dupl_count_arg= 0);
~Unique();
ulong elements_in_tree() { return tree.elements_in_tree; }
inline bool unique_add(void *ptr)
......@@ -2983,6 +2987,9 @@ public:
DBUG_RETURN(!tree_insert(&tree, ptr, 0, tree.custom_arg));
}
bool is_in_memory() { return (my_b_tell(&file) == 0); }
void close_for_expansion() { tree.flag= TREE_ONLY_DUPS; }
bool get(TABLE *table);
static double get_use_cost(uint *buffer, uint nkeys, uint key_size,
ulonglong max_in_memory_size);
......@@ -3002,6 +3009,11 @@ public:
friend int unique_write_to_file(uchar* key, element_count count, Unique *unique);
friend int unique_write_to_ptrs(uchar* key, element_count count, Unique *unique);
friend int unique_write_to_file_with_count(uchar* key, element_count count,
Unique *unique);
friend int unique_intersect_write_to_ptrs(uchar* key, element_count count,
Unique *unique);
};
......
......@@ -13637,6 +13637,7 @@ test_if_skip_sort_order(JOIN_TAB *tab,ORDER *order,ha_rows select_limit,
*/
if (quick_type == QUICK_SELECT_I::QS_TYPE_INDEX_MERGE ||
quick_type == QUICK_SELECT_I::QS_TYPE_INDEX_INTERSECT ||
quick_type == QUICK_SELECT_I::QS_TYPE_ROR_UNION ||
quick_type == QUICK_SELECT_I::QS_TYPE_ROR_INTERSECT)
DBUG_RETURN(0);
......@@ -14042,6 +14043,7 @@ check_reverse_order:
QUICK_SELECT_DESC *tmp;
int quick_type= select->quick->get_type();
if (quick_type == QUICK_SELECT_I::QS_TYPE_INDEX_MERGE ||
quick_type == QUICK_SELECT_I::QS_TYPE_INDEX_INTERSECT ||
quick_type == QUICK_SELECT_I::QS_TYPE_ROR_INTERSECT ||
quick_type == QUICK_SELECT_I::QS_TYPE_ROR_UNION ||
quick_type == QUICK_SELECT_I::QS_TYPE_GROUP_MIN_MAX)
......@@ -16810,6 +16812,7 @@ static void select_describe(JOIN *join, bool need_tmp_table, bool need_order,
{
quick_type= tab->select->quick->get_type();
if ((quick_type == QUICK_SELECT_I::QS_TYPE_INDEX_MERGE) ||
(quick_type == QUICK_SELECT_I::QS_TYPE_ROR_INTERSECT) ||
(quick_type == QUICK_SELECT_I::QS_TYPE_ROR_INTERSECT) ||
(quick_type == QUICK_SELECT_I::QS_TYPE_ROR_UNION))
tab->type = JT_INDEX_MERGE;
......@@ -17015,6 +17018,7 @@ static void select_describe(JOIN *join, bool need_tmp_table, bool need_order,
{
if (quick_type == QUICK_SELECT_I::QS_TYPE_ROR_UNION ||
quick_type == QUICK_SELECT_I::QS_TYPE_ROR_INTERSECT ||
quick_type == QUICK_SELECT_I::QS_TYPE_INDEX_INTERSECT ||
quick_type == QUICK_SELECT_I::QS_TYPE_INDEX_MERGE)
{
extra.append(STRING_WITH_LEN("; Using "));
......
......@@ -57,6 +57,7 @@ typedef struct st_sort_param {
uint addon_length; /* Length of added packed fields */
uint res_length; /* Length of records in final sorted file/buffer */
uint keys; /* Max keys / buffer */
element_count min_dupl_count;
ha_rows max_rows,examined_rows;
TABLE *sort_form; /* For quicker make_sortkey */
SORT_FIELD *local_sortorder;
......@@ -80,4 +81,9 @@ int merge_buffers(SORTPARAM *param,IO_CACHE *from_file,
IO_CACHE *to_file, uchar *sort_buffer,
BUFFPEK *lastbuff,BUFFPEK *Fb,
BUFFPEK *Tb,int flag);
int merge_index(SORTPARAM *param, uchar *sort_buffer,
BUFFPEK *buffpek, uint maxbuffer,
IO_CACHE *tempfile, IO_CACHE *outfile);
void reuse_freed_buff(QUEUE *queue, BUFFPEK *reuse, uint key_length);
......@@ -33,7 +33,6 @@
#include "mysql_priv.h"
#include "sql_sort.h"
int unique_write_to_file(uchar* key, element_count count, Unique *unique)
{
/*
......@@ -45,6 +44,12 @@ int unique_write_to_file(uchar* key, element_count count, Unique *unique)
return my_b_write(&unique->file, key, unique->size) ? 1 : 0;
}
int unique_write_to_file_with_count(uchar* key, element_count count, Unique *unique)
{
return my_b_write(&unique->file, key, unique->size) ||
my_b_write(&unique->file, &count, sizeof(element_count)) ? 1 : 0;
}
int unique_write_to_ptrs(uchar* key, element_count count, Unique *unique)
{
memcpy(unique->record_pointers, key, unique->size);
......@@ -52,10 +57,26 @@ int unique_write_to_ptrs(uchar* key, element_count count, Unique *unique)
return 0;
}
int unique_intersect_write_to_ptrs(uchar* key, element_count count, Unique *unique)
{
if (count >= unique->min_dupl_count)
{
memcpy(unique->record_pointers, key, unique->size);
unique->record_pointers+=unique->size;
}
return 0;
}
Unique::Unique(qsort_cmp2 comp_func, void * comp_func_fixed_arg,
uint size_arg, ulonglong max_in_memory_size_arg)
uint size_arg, ulonglong max_in_memory_size_arg,
uint min_dupl_count_arg)
:max_in_memory_size(max_in_memory_size_arg), size(size_arg), elements(0)
{
min_dupl_count= min_dupl_count_arg;
full_size= size;
if (min_dupl_count_arg)
full_size+= sizeof(element_count);
my_b_clear(&file);
init_tree(&tree, (ulong) (max_in_memory_size / 16), 0, size, comp_func, 0,
NULL, comp_func_fixed_arg);
......@@ -276,7 +297,11 @@ double Unique::get_use_cost(uint *buffer, uint nkeys, uint key_size,
result= 2*log2_n_fact(last_tree_elems + 1.0);
if (n_full_trees)
result+= n_full_trees * log2_n_fact(max_elements_in_tree + 1.0);
#if 1
result /= TIME_FOR_COMPARE_ROWID;
#else
result /= TIME_FOR_COMPARE_ROWID * 10;
#endif
DBUG_PRINT("info",("unique trees sizes: %u=%u*%lu + %lu", nkeys,
n_full_trees, n_full_trees?max_elements_in_tree:0,
......@@ -327,7 +352,10 @@ bool Unique::flush()
file_ptr.count=tree.elements_in_tree;
file_ptr.file_pos=my_b_tell(&file);
if (tree_walk(&tree, (tree_walk_action) unique_write_to_file,
tree_walk_action action= min_dupl_count ?
(tree_walk_action) unique_write_to_file_with_count :
(tree_walk_action) unique_write_to_file;
if (tree_walk(&tree, action,
(void*) this, left_root_right) ||
insert_dynamic(&file_ptrs, (uchar*) &file_ptr))
return 1;
......@@ -357,6 +385,7 @@ Unique::reset()
reinit_io_cache(&file, WRITE_CACHE, 0L, 0, 1);
}
elements= 0;
tree.flag= 0;
}
/*
......@@ -576,14 +605,16 @@ bool Unique::get(TABLE *table)
{
SORTPARAM sort_param;
table->sort.found_records=elements+tree.elements_in_tree;
if (my_b_tell(&file) == 0)
{
/* Whole tree is in memory; Don't use disk if you don't need to */
if ((record_pointers=table->sort.record_pointers= (uchar*)
my_malloc(size * tree.elements_in_tree, MYF(0))))
{
(void) tree_walk(&tree, (tree_walk_action) unique_write_to_ptrs,
tree_walk_action action= min_dupl_count ?
(tree_walk_action) unique_intersect_write_to_ptrs :
(tree_walk_action) unique_write_to_ptrs;
(void) tree_walk(&tree, action,
this, left_root_right);
return 0;
}
......@@ -614,7 +645,10 @@ bool Unique::get(TABLE *table)
sort_param.max_rows= elements;
sort_param.sort_form=table;
sort_param.rec_length= sort_param.sort_length= sort_param.ref_length=
size;
sort_param.rec_length= sort_param.sort_length= sort_param.ref_length=
full_size;
sort_param.min_dupl_count= min_dupl_count;
sort_param.res_length= 0;
sort_param.keys= (uint) (max_in_memory_size / sort_param.sort_length);
sort_param.not_killable=1;
......@@ -635,8 +669,9 @@ bool Unique::get(TABLE *table)
if (flush_io_cache(&file) ||
reinit_io_cache(&file,READ_CACHE,0L,0,0))
goto err;
if (merge_buffers(&sort_param, &file, outfile, sort_buffer, file_ptr,
file_ptr, file_ptr+maxbuffer,0))
sort_param.res_length= sort_param.rec_length-
(min_dupl_count ? sizeof(min_dupl_count) : 0);
if (merge_index(&sort_param, sort_buffer, file_ptr, maxbuffer, &file, outfile))
goto err;
error=0;
err:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment