Commit e7ef3238 authored by reggie@big_geek's avatar reggie@big_geek

Merge rburnett@bk-internal.mysql.com:/home/bk/mysql-5.1-new

into  big_geek.:C:/Work/mysql/mysql-5.1
parents 3eb9e170 0e70717e
......@@ -4189,8 +4189,8 @@ int ha_ndbcluster::create(const char *name,
if ((my_errno= write_ndb_file(name)))
DBUG_RETURN(my_errno);
#ifdef HAVE_NDB_BINLOG
if (ndb_binlog_thread_running > 0)
ndbcluster_create_binlog_setup(get_ndb(), name2, m_dbname, m_tabname, 0);
ndbcluster_create_binlog_setup(get_ndb(), name2, strlen(name2),
m_dbname, m_tabname, FALSE);
#endif /* HAVE_NDB_BINLOG */
DBUG_RETURN(my_errno);
}
......@@ -4386,6 +4386,8 @@ int ha_ndbcluster::create(const char *name,
" Event: %s", name2);
/* a warning has been issued to the client */
}
if (share && ndb_binlog_thread_running <= 0)
share->flags|= NSF_NO_BINLOG;
ndbcluster_log_schema_op(current_thd, share,
current_thd->query, current_thd->query_length,
share->db, share->table_name,
......@@ -5460,7 +5462,7 @@ int ndbcluster_find_all_files(THD *thd)
/* no such database defined, skip table */
continue;
}
strxnmov(end, FN_LEN-1-(key-end), "/", elmt.name, NullS);
end= strxnmov(end, FN_LEN-1-(end-key), "/", elmt.name, NullS);
const void *data= 0, *pack_data= 0;
uint length, pack_length;
int discover= 0;
......@@ -5486,41 +5488,25 @@ int ndbcluster_find_all_files(THD *thd)
my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR));
my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR));
pthread_mutex_lock(&LOCK_open);
if (discover)
{
/* ToDo 4.1 database needs to be created if missing */
pthread_mutex_lock(&LOCK_open);
if (ndb_create_table_from_engine(thd, elmt.database, elmt.name))
{
/* ToDo 4.1 handle error */
}
pthread_mutex_unlock(&LOCK_open);
}
#ifdef HAVE_NDB_BINLOG
else if (ndb_binlog_thread_running > 0)
else
{
/* set up replication for this table */
NDB_SHARE *share;
pthread_mutex_lock(&ndbcluster_mutex);
if (((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables,
(byte*) key, strlen(key)))
&& share->op == 0 && share->op_old == 0 && ! (share->flags & NSF_NO_BINLOG))
|| share == 0)
{
/*
there is no binlog creation setup for this table
attempt to do it
*/
pthread_mutex_unlock(&ndbcluster_mutex);
pthread_mutex_lock(&LOCK_open);
ndbcluster_create_binlog_setup(ndb, key, elmt.database, elmt.name,
share);
pthread_mutex_unlock(&LOCK_open);
}
else
pthread_mutex_unlock(&ndbcluster_mutex);
ndbcluster_create_binlog_setup(ndb, key, end-key,
elmt.database, elmt.name,
TRUE);
}
#endif
pthread_mutex_unlock(&LOCK_open);
}
}
while (unhandled && retries--);
......@@ -5635,36 +5621,18 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
#ifdef HAVE_NDB_BINLOG
/* setup logging to binlog for all discovered tables */
if (ndb_binlog_thread_running > 0)
{
char *end;
char *end1=
char *end, *end1=
strxnmov(name, sizeof(name), mysql_data_home, "/", db, "/", NullS);
NDB_SHARE *share;
pthread_mutex_lock(&ndbcluster_mutex);
for (i= 0; i < ok_tables.records; i++)
{
file_name= (char*)hash_element(&ok_tables, i);
end= strxnmov(end1, sizeof(name) - (end1 - name), file_name, NullS);
if ((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables,
(byte*)name, end - name))
&& share->op == 0 && share->op_old == 0 && ! (share->flags & NSF_NO_BINLOG))
{
/*
there is no binlog creation setup for this table
attempt to do it
*/
pthread_mutex_unlock(&ndbcluster_mutex);
pthread_mutex_lock(&LOCK_open);
ndbcluster_create_binlog_setup(ndb, name, db, file_name, share);
pthread_mutex_unlock(&LOCK_open);
pthread_mutex_lock(&ndbcluster_mutex);
}
/* Table existed in the mysqld so there should be a share */
DBUG_ASSERT(share != NULL);
pthread_mutex_lock(&LOCK_open);
ndbcluster_create_binlog_setup(ndb, name, end-name,
db, file_name, TRUE);
pthread_mutex_unlock(&LOCK_open);
}
pthread_mutex_unlock(&ndbcluster_mutex);
}
#endif
......
......@@ -239,11 +239,17 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
share->table= 0;
if (ndb_binlog_thread_running <= 0)
{
DBUG_ASSERT(_table != 0);
if (_table->s->primary_key == MAX_KEY)
share->flags|= NSF_HIDDEN_PK;
if (_table->s->blob_fields != 0)
share->flags|= NSF_BLOB_FLAG;
if (_table)
{
if (_table->s->primary_key == MAX_KEY)
share->flags|= NSF_HIDDEN_PK;
if (_table->s->blob_fields != 0)
share->flags|= NSF_BLOB_FLAG;
}
else
{
share->flags|= NSF_NO_BINLOG;
}
return;
}
while (1)
......@@ -1626,32 +1632,51 @@ ndb_rep_event_name(String *event_name,const char *db, const char *tbl)
create/discover.
*/
int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
uint key_len,
const char *db,
const char *table_name,
NDB_SHARE *share)
my_bool share_may_exist)
{
DBUG_ENTER("ndbcluster_create_binlog_setup");
DBUG_PRINT("enter",("key: %s key_len: %d %s.%s share_may_exist: %d",
key, key_len, db, table_name, share_may_exist));
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));
DBUG_ASSERT(strlen(key) == key_len);
pthread_mutex_lock(&ndbcluster_mutex);
/* Handle any trailing share */
if (share == 0)
NDB_SHARE *share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
(byte*) key, key_len);
if (share && share_may_exist)
{
share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
(byte*) key, strlen(key));
if (share)
handle_trailing_share(share);
if (share->flags & NSF_NO_BINLOG ||
share->op != 0 ||
share->op_old != 0)
{
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(0); // replication already setup, or should not
}
}
else
if (share)
{
handle_trailing_share(share);
}
/* Create share which is needed to hold replication information */
if (!(share= get_share(key, 0, true, true)))
{
sql_print_error("NDB Binlog: "
"allocating table share for %s failed", key);
}
if (ndb_binlog_thread_running <= 0)
{
share->flags|= NSF_NO_BINLOG;
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(0);
}
pthread_mutex_unlock(&ndbcluster_mutex);
while (share && !IS_TMP_PREFIX(table_name))
......@@ -1749,6 +1774,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
sql_print_error("NDB Binlog: logging of table %s "
"with no PK and blob attributes is not supported",
share->key);
share->flags|= NSF_NO_BINLOG;
DBUG_RETURN(-1);
}
/* No primary key, subscribe for all attributes */
......
......@@ -72,9 +72,10 @@ void ndbcluster_binlog_init_handlerton();
void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *table);
int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
uint key_len,
const char *db,
const char *table_name,
NDB_SHARE *share);
my_bool share_may_exist);
int ndbcluster_create_event(Ndb *ndb, const NDBTAB *table,
const char *event_name, NDB_SHARE *share);
int ndbcluster_create_event_ops(NDB_SHARE *share,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment