Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
cdd76505
Commit
cdd76505
authored
Jun 09, 2004
by
tomas@poseidon.bredbandsbolaget.se
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
re-enginered ndb restore to remove new/deletes and data copy
parent
2bb2af5b
Changes
12
Show whitespace changes
Inline
Side-by-side
Showing
12 changed files
with
1830 additions
and
1132 deletions
+1830
-1132
ndb/src/kernel/blocks/backup/restore/Makefile.am
ndb/src/kernel/blocks/backup/restore/Makefile.am
+1
-1
ndb/src/kernel/blocks/backup/restore/Makefile_old
ndb/src/kernel/blocks/backup/restore/Makefile_old
+0
-20
ndb/src/kernel/blocks/backup/restore/Restore.cpp
ndb/src/kernel/blocks/backup/restore/Restore.cpp
+227
-78
ndb/src/kernel/blocks/backup/restore/Restore.hpp
ndb/src/kernel/blocks/backup/restore/Restore.hpp
+36
-16
ndb/src/kernel/blocks/backup/restore/consumer.cpp
ndb/src/kernel/blocks/backup/restore/consumer.cpp
+107
-0
ndb/src/kernel/blocks/backup/restore/consumer.hpp
ndb/src/kernel/blocks/backup/restore/consumer.hpp
+40
-0
ndb/src/kernel/blocks/backup/restore/consumer_printer.cpp
ndb/src/kernel/blocks/backup/restore/consumer_printer.cpp
+96
-0
ndb/src/kernel/blocks/backup/restore/consumer_printer.hpp
ndb/src/kernel/blocks/backup/restore/consumer_printer.hpp
+50
-0
ndb/src/kernel/blocks/backup/restore/consumer_restore.cpp
ndb/src/kernel/blocks/backup/restore/consumer_restore.cpp
+508
-0
ndb/src/kernel/blocks/backup/restore/consumer_restore.hpp
ndb/src/kernel/blocks/backup/restore/consumer_restore.hpp
+79
-0
ndb/src/kernel/blocks/backup/restore/consumer_restorem.cpp
ndb/src/kernel/blocks/backup/restore/consumer_restorem.cpp
+652
-0
ndb/src/kernel/blocks/backup/restore/main.cpp
ndb/src/kernel/blocks/backup/restore/main.cpp
+34
-1017
No files found.
ndb/src/kernel/blocks/backup/restore/Makefile.am
View file @
cdd76505
ndbtools_PROGRAMS
=
ndb_restore
ndb_restore_SOURCES
=
main.cpp Restore.cpp
ndb_restore_SOURCES
=
main.cpp
consumer.cpp consumer_restore.cpp consumer_printer.cpp
Restore.cpp
LDADD_LOC
=
$(top_builddir)
/ndb/src/libndbclient.la
...
...
ndb/src/kernel/blocks/backup/restore/Makefile_old
deleted
100644 → 0
View file @
2bb2af5b
include .defs.mk
TYPE := *
BIN_TARGET := restore
BIN_TARGET_LIBS :=
BIN_TARGET_ARCHIVES := NDB_API
CCFLAGS_LOC = -I.. -I$(NDB_TOP)/src/ndbapi -I$(NDB_TOP)/include/ndbapi -I$(NDB_TOP)/include/util -I$(NDB_TOP)/include/portlib -I$(NDB_TOP)/include/kernel
#ifneq ($(MYSQLCLUSTER_TOP),)
#CCFLAGS_LOC +=-I$(MYSQLCLUSTER_TOP)/include -D USE_MYSQL
#LDFLAGS_LOC += -L$(MYSQLCLUSTER_TOP)/libmysql_r/ -lmysqlclient_r
#endif
SOURCES = main.cpp Restore.cpp
include $(NDB_TOP)/Epilogue.mk
ndb/src/kernel/blocks/backup/restore/Restore.cpp
View file @
cdd76505
...
...
@@ -33,32 +33,32 @@ Uint32 Twiddle32(Uint32 in); // Byte shift 32-bit data
Uint64
Twiddle64
(
Uint64
in
);
// Byte shift 64-bit data
bool
BackupFile
::
Twiddle
(
AttributeS
*
attr
,
Uint32
arraySize
){
BackupFile
::
Twiddle
(
const
AttributeDesc
*
attr_desc
,
AttributeData
*
attr_data
,
Uint32
arraySize
){
if
(
m_hostByteOrder
)
return
true
;
if
(
arraySize
==
0
){
arraySize
=
attr
->
D
esc
->
arraySize
;
arraySize
=
attr
_d
esc
->
arraySize
;
}
switch
(
attr
->
D
esc
->
size
){
switch
(
attr
_d
esc
->
size
){
case
8
:
return
true
;
case
16
:
for
(
unsigned
i
=
0
;
i
<
arraySize
;
i
++
){
attr
->
Data
.
u_int16_value
[
i
]
=
Twiddle16
(
attr
->
Data
.
u_int16_value
[
i
]);
attr
_data
->
u_int16_value
[
i
]
=
Twiddle16
(
attr_data
->
u_int16_value
[
i
]);
}
return
true
;
case
32
:
for
(
unsigned
i
=
0
;
i
<
arraySize
;
i
++
){
attr
->
Data
.
u_int32_value
[
i
]
=
Twiddle32
(
attr
->
Data
.
u_int32_value
[
i
]);
attr
_data
->
u_int32_value
[
i
]
=
Twiddle32
(
attr_data
->
u_int32_value
[
i
]);
}
return
true
;
case
64
:
for
(
unsigned
i
=
0
;
i
<
arraySize
;
i
++
){
attr
->
Data
.
u_int64_value
[
i
]
=
Twiddle64
(
attr
->
Data
.
u_int64_value
[
i
]);
attr
_data
->
u_int64_value
[
i
]
=
Twiddle64
(
attr_data
->
u_int64_value
[
i
]);
}
return
true
;
default:
...
...
@@ -208,7 +208,7 @@ TableS::TableS(NdbTableImpl* tableImpl)
m_dictTable
=
tableImpl
;
m_noOfNullable
=
m_nullBitmaskSize
=
0
;
for
(
Uint32
i
=
0
;
i
<
tableImpl
->
getNoOfColumns
();
i
++
)
for
(
int
i
=
0
;
i
<
tableImpl
->
getNoOfColumns
();
i
++
)
createAttr
(
tableImpl
->
getColumn
(
i
));
}
...
...
@@ -246,56 +246,112 @@ RestoreMetaData::parseTableDescriptor(const Uint32 * data, Uint32 len)
}
// Constructor
RestoreDataIterator
::
RestoreDataIterator
(
const
RestoreMetaData
&
md
)
:
m_metaData
(
md
)
RestoreDataIterator
::
RestoreDataIterator
(
const
RestoreMetaData
&
md
,
void
(
*
_free_data_callback
)()
)
:
m_metaData
(
md
)
,
free_data_callback
(
_free_data_callback
)
{
debug
<<
"RestoreDataIterator constructor"
<<
endl
;
setDataFile
(
md
,
0
);
m_buffer_sz
=
64
*
1024
;
m_buffer
=
malloc
(
m_buffer_sz
);
m_buffer_ptr
=
m_buffer
;
m_buffer_data_left
=
0
;
}
RestoreDataIterator
::~
RestoreDataIterator
()
{
if
(
m_buffer
)
free
(
m_buffer
);
}
RestoreDataIterator
::~
RestoreDataIterator
(){
TupleS
&
TupleS
::
operator
=
(
const
TupleS
&
tuple
)
{
prepareRecord
(
*
tuple
.
m_currentTable
);
if
(
allAttrData
)
{
allAttrData
=
new
AttributeData
[
getNoOfAttributes
()];
memcpy
(
allAttrData
,
tuple
.
allAttrData
,
getNoOfAttributes
()
*
sizeof
(
AttributeData
));
}
return
*
this
;
};
int
TupleS
::
getNoOfAttributes
()
const
{
if
(
m_currentTable
==
0
)
return
0
;
return
m_currentTable
->
getNoOfAttributes
();
};
const
TableS
*
TupleS
::
getTable
()
const
{
return
m_currentTable
;
};
const
AttributeDesc
*
TupleS
::
getDesc
(
int
i
)
const
{
return
m_currentTable
->
allAttributesDesc
[
i
];
}
AttributeData
*
TupleS
::
getData
(
int
i
)
const
{
return
&
(
allAttrData
[
i
]);
};
bool
TupleS
::
prepareRecord
(
const
TableS
&
tab
){
m_currentTable
=
&
tab
;
for
(
int
i
=
0
;
i
<
allAttributes
.
size
();
i
++
)
{
if
(
allAttributes
[
i
]
!=
NULL
)
delete
allAttributes
[
i
];
}
allAttributes
.
clear
();
AttributeS
*
a
;
for
(
int
i
=
0
;
i
<
tab
.
getNoOfAttributes
();
i
++
){
a
=
new
AttributeS
;
if
(
a
==
NULL
)
{
ndbout_c
(
"Restore: Failed to allocate memory"
);
return
false
;
}
a
->
Desc
=
tab
[
i
];
allAttributes
.
push_back
(
a
);
if
(
allAttrData
)
{
delete
[]
allAttrData
;
m_currentTable
=
0
;
}
allAttrData
=
new
AttributeData
[
tab
.
getNoOfAttributes
()];
if
(
allAttrData
==
0
)
return
false
;
m_currentTable
=
&
tab
;
return
true
;
}
const
TupleS
*
RestoreDataIterator
::
getNextTuple
(
int
&
res
)
{
TupleS
*
tup
=
new
TupleS
();
if
(
tup
==
NULL
)
{
ndbout_c
(
"Restore: Failed to allocate memory"
);
res
=
-
1
;
return
NULL
;
}
if
(
!
tup
->
prepareRecord
(
*
m_currentTable
))
{
res
=-
1
;
return
NULL
;
Uint32
RestoreDataIterator
::
get_buffer_ptr
(
void
**
p_buf_ptr
,
Uint32
size
,
Uint32
nmemb
,
FILE
*
stream
)
{
Uint32
sz
=
size
*
nmemb
;
if
(
sz
>
m_buffer_data_left
)
{
if
(
free_data_callback
)
(
*
free_data_callback
)();
memcpy
(
m_buffer
,
m_buffer_ptr
,
m_buffer_data_left
);
size_t
r
=
fread
(((
char
*
)
m_buffer
)
+
m_buffer_data_left
,
1
,
m_buffer_sz
-
m_buffer_data_left
,
stream
);
m_buffer_data_left
+=
r
;
m_buffer_ptr
=
m_buffer
;
if
(
sz
>
m_buffer_data_left
)
sz
=
size
*
(
m_buffer_data_left
/
size
);
}
*
p_buf_ptr
=
m_buffer_ptr
;
m_buffer_ptr
=
((
char
*
)
m_buffer_ptr
)
+
sz
;
m_buffer_data_left
-=
sz
;
return
sz
/
size
;
}
Uint32
RestoreDataIterator
::
fread_buffer
(
void
*
ptr
,
Uint32
size
,
Uint32
nmemb
,
FILE
*
stream
)
{
void
*
buf_ptr
;
Uint32
r
=
get_buffer_ptr
(
&
buf_ptr
,
size
,
nmemb
,
stream
);
memcpy
(
ptr
,
buf_ptr
,
r
*
size
);
return
r
;
}
const
TupleS
*
RestoreDataIterator
::
getNextTuple
(
int
&
res
)
{
Uint32
dataLength
=
0
;
// Read record length
if
(
fread
(
&
dataLength
,
sizeof
(
dataLength
),
1
,
m_file
)
!=
1
){
if
(
fread
_buffer
(
&
dataLength
,
sizeof
(
dataLength
),
1
,
m_file
)
!=
1
){
err
<<
"getNextTuple:Error reading length of data part"
<<
endl
;
delete
tup
;
res
=
-
1
;
return
NULL
;
}
// if
...
...
@@ -309,34 +365,34 @@ RestoreDataIterator::getNextTuple(int & res) {
// End of this data fragment
debug
<<
"End of fragment"
<<
endl
;
res
=
0
;
delete
tup
;
return
NULL
;
}
// if
tup
->
createDataRecord
(
dataLenBytes
);
// Read tuple data
if
(
fread
(
tup
->
getDataRecord
(),
1
,
dataLenBytes
,
m_file
)
!=
dataLenBytes
)
{
void
*
_buf_ptr
;
if
(
get_buffer_ptr
(
&
_buf_ptr
,
1
,
dataLenBytes
,
m_file
)
!=
dataLenBytes
)
{
err
<<
"getNextTuple:Read error: "
<<
endl
;
delete
tup
;
res
=
-
1
;
return
NULL
;
}
Uint32
*
ptr
=
tup
->
getDataRecord
()
;
Uint32
*
buf_ptr
=
(
Uint32
*
)
_buf_ptr
,
*
ptr
=
buf_ptr
;
ptr
+=
m_currentTable
->
m_nullBitmaskSize
;
for
(
int
i
=
0
;
i
<
m_currentTable
->
m_fixedKeys
.
size
();
i
++
){
assert
(
ptr
<
tup
->
getDataRecord
()
+
dataLength
);
assert
(
ptr
<
buf_ptr
+
dataLength
);
const
Uint32
attrId
=
m_currentTable
->
m_fixedKeys
[
i
]
->
attrId
;
AttributeS
*
attr
=
tup
->
allAttributes
[
attrId
];
const
Uint32
sz
=
attr
->
Desc
->
getSizeInWords
();
AttributeData
*
attr_data
=
m_tuple
.
getData
(
attrId
);
const
AttributeDesc
*
attr_desc
=
m_tuple
.
getDesc
(
attrId
);
const
Uint32
sz
=
attr_desc
->
getSizeInWords
();
attr
->
Data
.
null
=
false
;
attr
->
Data
.
void_value
=
ptr
;
attr
_data
->
null
=
false
;
attr
_data
->
void_value
=
ptr
;
if
(
!
Twiddle
(
attr
))
if
(
!
Twiddle
(
attr
_desc
,
attr_data
))
{
res
=
-
1
;
return
NULL
;
...
...
@@ -345,17 +401,19 @@ RestoreDataIterator::getNextTuple(int & res) {
}
for
(
int
i
=
0
;
i
<
m_currentTable
->
m_fixedAttribs
.
size
();
i
++
){
assert
(
ptr
<
tup
->
getDataRecord
()
+
dataLength
);
assert
(
ptr
<
buf_ptr
+
dataLength
);
const
Uint32
attrId
=
m_currentTable
->
m_fixedAttribs
[
i
]
->
attrId
;
AttributeS
*
attr
=
tup
->
allAttributes
[
attrId
];
const
Uint32
sz
=
attr
->
Desc
->
getSizeInWords
();
AttributeData
*
attr_data
=
m_tuple
.
getData
(
attrId
);
const
AttributeDesc
*
attr_desc
=
m_tuple
.
getDesc
(
attrId
);
attr
->
Data
.
null
=
false
;
attr
->
Data
.
void_value
=
ptr
;
const
Uint32
sz
=
attr_desc
->
getSizeInWords
();
if
(
!
Twiddle
(
attr
))
attr_data
->
null
=
false
;
attr_data
->
void_value
=
ptr
;
if
(
!
Twiddle
(
attr_desc
,
attr_data
))
{
res
=
-
1
;
return
NULL
;
...
...
@@ -366,19 +424,21 @@ RestoreDataIterator::getNextTuple(int & res) {
for
(
int
i
=
0
;
i
<
m_currentTable
->
m_variableAttribs
.
size
();
i
++
){
const
Uint32
attrId
=
m_currentTable
->
m_variableAttribs
[
i
]
->
attrId
;
AttributeS
*
attr
=
tup
->
allAttributes
[
attrId
];
if
(
attr
->
Desc
->
m_column
->
getNullable
()){
const
Uint32
ind
=
attr
->
Desc
->
m_nullBitIndex
;
AttributeData
*
attr_data
=
m_tuple
.
getData
(
attrId
);
const
AttributeDesc
*
attr_desc
=
m_tuple
.
getDesc
(
attrId
);
if
(
attr_desc
->
m_column
->
getNullable
()){
const
Uint32
ind
=
attr_desc
->
m_nullBitIndex
;
if
(
BitmaskImpl
::
get
(
m_currentTable
->
m_nullBitmaskSize
,
tup
->
getDataRecord
()
,
ind
)){
attr
->
Data
.
null
=
true
;
attr
->
Data
.
void_value
=
NULL
;
buf_ptr
,
ind
)){
attr
_data
->
null
=
true
;
attr
_data
->
void_value
=
NULL
;
continue
;
}
}
assert
(
ptr
<
tup
->
getDataRecord
()
+
dataLength
);
assert
(
ptr
<
buf_ptr
+
dataLength
);
typedef
BackupFormat
::
DataFile
::
VariableData
VarData
;
VarData
*
data
=
(
VarData
*
)
ptr
;
...
...
@@ -386,15 +446,15 @@ RestoreDataIterator::getNextTuple(int & res) {
Uint32
id
=
ntohl
(
data
->
Id
);
assert
(
id
==
attrId
);
attr
->
Data
.
null
=
false
;
attr
->
Data
.
void_value
=
&
data
->
Data
[
0
];
attr
_data
->
null
=
false
;
attr
_data
->
void_value
=
&
data
->
Data
[
0
];
/**
* Compute array size
*/
const
Uint32
arraySize
=
(
4
*
sz
)
/
(
attr
->
D
esc
->
size
/
8
);
assert
(
arraySize
>=
attr
->
D
esc
->
arraySize
);
if
(
!
Twiddle
(
attr
,
attr
->
D
esc
->
arraySize
))
const
Uint32
arraySize
=
(
4
*
sz
)
/
(
attr
_d
esc
->
size
/
8
);
assert
(
arraySize
>=
attr
_d
esc
->
arraySize
);
if
(
!
Twiddle
(
attr
_desc
,
attr_data
,
attr_d
esc
->
arraySize
))
{
res
=
-
1
;
return
NULL
;
...
...
@@ -405,7 +465,7 @@ RestoreDataIterator::getNextTuple(int & res) {
m_count
++
;
res
=
0
;
return
tup
;
return
&
m_tuple
;
}
// RestoreDataIterator::getNextTuple
BackupFile
::
BackupFile
(){
...
...
@@ -558,7 +618,7 @@ RestoreDataIterator::readFragmentHeader(int & ret)
debug
<<
"RestoreDataIterator::getNextFragment"
<<
endl
;
if
(
fread
(
&
Header
,
sizeof
(
Header
),
1
,
m_file
)
!=
1
){
if
(
fread
_buffer
(
&
Header
,
sizeof
(
Header
),
1
,
m_file
)
!=
1
){
ret
=
0
;
return
false
;
}
// if
...
...
@@ -581,6 +641,12 @@ RestoreDataIterator::readFragmentHeader(int & ret)
return
false
;
}
if
(
!
m_tuple
.
prepareRecord
(
*
m_currentTable
))
{
ret
=-
1
;
return
false
;
}
info
<<
"_____________________________________________________"
<<
endl
<<
"Restoring data in table: "
<<
m_currentTable
->
getTableName
()
<<
"("
<<
Header
.
TableId
<<
") fragment "
...
...
@@ -588,6 +654,7 @@ RestoreDataIterator::readFragmentHeader(int & ret)
m_count
=
0
;
ret
=
0
;
return
true
;
}
// RestoreDataIterator::getNextFragment
...
...
@@ -596,7 +663,7 @@ bool
RestoreDataIterator
::
validateFragmentFooter
()
{
BackupFormat
::
DataFile
::
FragmentFooter
footer
;
if
(
fread
(
&
footer
,
sizeof
(
footer
),
1
,
m_file
)
!=
1
){
if
(
fread
_buffer
(
&
footer
,
sizeof
(
footer
),
1
,
m_file
)
!=
1
){
err
<<
"getFragmentFooter:Error reading fragment footer"
<<
endl
;
return
false
;
}
...
...
@@ -787,14 +854,14 @@ RestoreLogIterator::getNextLogEntry(int & res) {
const
Uint32
sz
=
ah
->
getDataSize
();
if
(
sz
==
0
){
attr
->
Data
.
null
=
true
;
attr
->
Data
.
void_value
=
NULL
;
attr
->
Data
->
null
=
true
;
attr
->
Data
->
void_value
=
NULL
;
}
else
{
attr
->
Data
.
null
=
false
;
attr
->
Data
.
void_value
=
ah
->
getDataPtr
();
attr
->
Data
->
null
=
false
;
attr
->
Data
->
void_value
=
ah
->
getDataPtr
();
}
Twiddle
(
attr
);
Twiddle
(
attr
->
Desc
,
attr
->
Data
);
m_logEntry
.
m_values
.
push_back
(
attr
);
ah
=
ah
->
getNext
();
...
...
@@ -804,3 +871,85 @@ RestoreLogIterator::getNextLogEntry(int & res) {
res
=
0
;
return
&
m_logEntry
;
}
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
AttributeS
&
attr
){
const
AttributeData
&
data
=
*
(
attr
.
Data
);
const
AttributeDesc
&
desc
=
*
(
attr
.
Desc
);
if
(
data
.
null
)
{
ndbout
<<
"<NULL>"
;
return
ndbout
;
}
NdbRecAttr
tmprec
;
tmprec
.
setup
(
desc
.
m_column
,
(
char
*
)
data
.
void_value
);
ndbout
<<
tmprec
;
return
ndbout
;
}
// Print tuple data
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
TupleS
&
tuple
)
{
ndbout
<<
tuple
.
getTable
()
->
getTableName
()
<<
"; "
;
for
(
int
i
=
0
;
i
<
tuple
.
getNoOfAttributes
();
i
++
)
{
AttributeData
*
attr_data
=
tuple
.
getData
(
i
);
const
AttributeDesc
*
attr_desc
=
tuple
.
getDesc
(
i
);
const
AttributeS
attr
=
{
attr_desc
,
attr_data
};
debug
<<
i
<<
" "
<<
attr_desc
->
m_column
->
getName
();
ndbout
<<
attr
;
if
(
i
!=
(
tuple
.
getNoOfAttributes
()
-
1
))
ndbout
<<
delimiter
<<
" "
;
}
// for
return
ndbout
;
}
// Print tuple data
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
LogEntry
&
logE
)
{
switch
(
logE
.
m_type
)
{
case
LogEntry
:
:
LE_INSERT
:
ndbout
<<
"INSERT "
<<
logE
.
m_table
->
getTableName
()
<<
" "
;
break
;
case
LogEntry
:
:
LE_DELETE
:
ndbout
<<
"DELETE "
<<
logE
.
m_table
->
getTableName
()
<<
" "
;
break
;
case
LogEntry
:
:
LE_UPDATE
:
ndbout
<<
"UPDATE "
<<
logE
.
m_table
->
getTableName
()
<<
" "
;
break
;
default:
ndbout
<<
"Unknown log entry type (not insert, delete or update)"
;
}
for
(
int
i
=
0
;
i
<
logE
.
m_values
.
size
();
i
++
)
{
const
AttributeS
*
attr
=
logE
.
m_values
[
i
];
ndbout
<<
attr
->
Desc
->
m_column
->
getName
()
<<
"="
;
ndbout
<<
(
*
attr
);
if
(
i
<
(
logE
.
m_values
.
size
()
-
1
))
ndbout
<<
", "
;
}
return
ndbout
;
}
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
TableS
&
table
){
ndbout
<<
endl
<<
"Table: "
<<
table
.
getTableName
()
<<
endl
;
for
(
int
j
=
0
;
j
<
table
.
getNoOfAttributes
();
j
++
)
{
const
AttributeDesc
*
desc
=
table
[
j
];
ndbout
<<
desc
->
m_column
->
getName
()
<<
": "
<<
desc
->
m_column
->
getType
();
ndbout
<<
" key: "
<<
desc
->
m_column
->
getPrimaryKey
();
ndbout
<<
" array: "
<<
desc
->
arraySize
;
ndbout
<<
" size: "
<<
desc
->
size
<<
endl
;
}
// for
return
ndbout
;
}
ndb/src/kernel/blocks/backup/restore/Restore.hpp
View file @
cdd76505
...
...
@@ -18,6 +18,7 @@
#define RESTORE_H
#include <ndb_global.h>
#include <NdbOut.hpp>
#include <BackupFormat.hpp>
#include <NdbApi.hpp>
#include "myVector.hpp"
...
...
@@ -25,6 +26,8 @@
#include <ndb_version.h>
#include <version.h>
static
const
char
*
delimiter
=
";"
;
// Delimiter in file dump
const
int
FileNameLenC
=
256
;
const
int
TableNameLenC
=
256
;
const
int
AttrNameLenC
=
256
;
...
...
@@ -82,26 +85,30 @@ public:
struct
AttributeS
{
const
AttributeDesc
*
Desc
;
AttributeData
Data
;
AttributeData
*
Data
;
};
class
TupleS
{
private:
friend
class
RestoreDataIterator
;
const
TableS
*
m_currentTable
;
myVector
<
AttributeS
*>
allAttributes
;
Uint32
*
dataRecord
;
const
TableS
*
m_currentTable
;
AttributeData
*
allAttrData
;
bool
prepareRecord
(
const
TableS
&
);
public:
TupleS
()
{
dataRecord
=
NULL
;};
~
TupleS
()
{
if
(
dataRecord
!=
NULL
)
delete
[]
dataRecord
;};
int
getNoOfAttributes
()
const
{
return
allAttributes
.
size
();
};
const
TableS
*
getTable
()
const
{
return
m_currentTable
;};
const
AttributeS
*
operator
[](
int
i
)
const
{
return
allAttributes
[
i
];};
Uint32
*
getDataRecord
()
{
return
dataRecord
;};
void
createDataRecord
(
Uint32
bytes
)
{
dataRecord
=
new
Uint32
[
bytes
];};
TupleS
()
{};
~
TupleS
()
{
if
(
allAttrData
)
delete
[]
allAttrData
;
};
TupleS
(
const
TupleS
&
tuple
);
// disable copy constructor
TupleS
&
operator
=
(
const
TupleS
&
tuple
);
int
getNoOfAttributes
()
const
;
const
TableS
*
getTable
()
const
;
const
AttributeDesc
*
getDesc
(
int
i
)
const
;
AttributeData
*
getData
(
int
i
)
const
;
};
// class TupleS
class
TableS
{
...
...
@@ -206,7 +213,7 @@ public:
const
char
*
getFilename
()
const
{
return
m_fileName
;}
Uint32
getNodeId
()
const
{
return
m_nodeId
;}
const
BackupFormat
::
FileHeader
&
getFileHeader
()
const
{
return
m_fileHeader
;}
bool
Twiddle
(
AttributeS
*
attr
,
Uint32
arraySize
=
0
);
bool
Twiddle
(
const
AttributeDesc
*
attr_desc
,
AttributeData
*
attr_data
,
Uint32
arraySize
=
0
);
};
class
RestoreMetaData
:
public
BackupFile
{
...
...
@@ -243,20 +250,28 @@ public:
class
RestoreDataIterator
:
public
BackupFile
{
const
RestoreMetaData
&
m_metaData
;
Uint32
m_count
;
TupleS
m_tuple
;
const
TableS
*
m_currentTable
;
TupleS
m_tuple
;
void
*
m_buffer
;
void
*
m_buffer_ptr
;
Uint32
m_buffer_sz
;
Uint32
m_buffer_data_left
;
void
(
*
free_data_callback
)();
public:
// Constructor
RestoreDataIterator
(
const
RestoreMetaData
&
);
RestoreDataIterator
(
const
RestoreMetaData
&
,
void
(
*
free_data_callback
)()
);
~
RestoreDataIterator
();
// Read data file fragment header
bool
readFragmentHeader
(
int
&
res
);
bool
validateFragmentFooter
();
Uint32
get_buffer_ptr
(
void
**
p_buf_ptr
,
Uint32
size
,
Uint32
nmemb
,
FILE
*
stream
);
Uint32
fread_buffer
(
void
*
ptr
,
Uint32
size
,
Uint32
nmemb
,
FILE
*
stream
);
const
TupleS
*
getNextTuple
(
int
&
res
);
};
...
...
@@ -286,6 +301,11 @@ public:
const
LogEntry
*
getNextLogEntry
(
int
&
res
);
};
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
TableS
&
);
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
TupleS
&
);
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
LogEntry
&
);
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
RestoreMetaData
&
);
#endif
ndb/src/kernel/blocks/backup/restore/consumer.cpp
0 → 100644
View file @
cdd76505
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "consumer.hpp"
#ifdef USE_MYSQL
int
BackupConsumer
::
create_table_string
(
const
TableS
&
table
,
char
*
tableName
,
char
*
buf
){
int
pos
=
0
;
int
pos2
=
0
;
char
buf2
[
2048
];
pos
+=
sprintf
(
buf
+
pos
,
"%s%s"
,
"CREATE TABLE "
,
tableName
);
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"("
);
pos2
+=
sprintf
(
buf2
+
pos2
,
"%s"
,
" primary key("
);
for
(
int
j
=
0
;
j
<
table
.
getNoOfAttributes
();
j
++
)
{
const
AttributeDesc
*
desc
=
table
[
j
];
// ndbout << desc->name << ": ";
pos
+=
sprintf
(
buf
+
pos
,
"%s%s"
,
desc
->
m_column
->
getName
(),
" "
);
switch
(
desc
->
m_column
->
getType
()){
case
NdbDictionary
:
:
Column
::
Int
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"int"
);
break
;
case
NdbDictionary
:
:
Column
::
Unsigned
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"int unsigned"
);
break
;
case
NdbDictionary
:
:
Column
::
Float
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"float"
);
break
;
case
NdbDictionary
:
:
Column
::
Decimal
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"decimal"
);
break
;
case
NdbDictionary
:
:
Column
::
Char
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"char"
);
break
;
case
NdbDictionary
:
:
Column
::
Varchar
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"varchar"
);
break
;
case
NdbDictionary
:
:
Column
::
Binary
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"binary"
);
break
;
case
NdbDictionary
:
:
Column
::
Varbinary
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"varchar binary"
);
break
;
case
NdbDictionary
:
:
Column
::
Bigint
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"bigint"
);
break
;
case
NdbDictionary
:
:
Column
::
Bigunsigned
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"bigint unsigned"
);
break
;
case
NdbDictionary
:
:
Column
::
Double
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"double"
);
break
;
case
NdbDictionary
:
:
Column
::
Datetime
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"datetime"
);
break
;
case
NdbDictionary
:
:
Column
::
Timespec
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"time"
);
break
;
case
NdbDictionary
:
:
Column
::
Undefined
:
// pos += sprintf(buf+pos, "%s", "varchar binary");
return
-
1
;
break
;
default:
//pos += sprintf(buf+pos, "%s", "varchar binary");
return
-
1
;
}
if
(
desc
->
arraySize
>
1
)
{
int
attrSize
=
desc
->
arraySize
;
pos
+=
sprintf
(
buf
+
pos
,
"%s%u%s"
,
"("
,
attrSize
,
")"
);
}
if
(
desc
->
m_column
->
getPrimaryKey
())
{
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
" not null"
);
pos2
+=
sprintf
(
buf2
+
pos2
,
"%s%s"
,
desc
->
m_column
->
getName
(),
","
);
}
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
","
);
}
// for
pos2
--
;
// remove trailing comma
pos2
+=
sprintf
(
buf2
+
pos2
,
"%s"
,
")"
);
// pos--; // remove trailing comma
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
buf2
);
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
") type=ndbcluster"
);
return
0
;
}
#endif // USE_MYSQL
ndb/src/kernel/blocks/backup/restore/consumer.hpp
0 → 100644
View file @
cdd76505
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef CONSUMER_HPP
#define CONSUMER_HPP
#include "Restore.hpp"
class
BackupConsumer
{
public:
virtual
bool
init
()
{
return
true
;}
virtual
bool
table
(
const
TableS
&
){
return
true
;}
#ifdef USE_MYSQL
virtual
bool
table
(
const
TableS
&
,
MYSQL
*
mysqlp
)
{
return
true
;};
#endif
virtual
void
tuple
(
const
TupleS
&
){}
virtual
void
tuple_free
(){}
virtual
void
endOfTuples
(){}
virtual
void
logEntry
(
const
LogEntry
&
){}
virtual
void
endOfLogEntrys
(){}
protected:
#ifdef USE_MYSQL
int
create_table_string
(
const
TableS
&
table
,
char
*
,
char
*
);
#endif
};
#endif
ndb/src/kernel/blocks/backup/restore/consumer_printer.cpp
0 → 100644
View file @
cdd76505
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "consumer_printer.hpp"
bool
BackupPrinter
::
table
(
const
TableS
&
tab
)
{
if
(
m_print
||
m_print_meta
)
{
m_ndbout
<<
tab
;
ndbout_c
(
"Successfully printed table: %s"
,
tab
.
m_dictTable
->
getName
());
}
return
true
;
}
#ifdef USE_MYSQL
bool
BackupPrinter
::
table
(
const
TableS
&
tab
,
MYSQL
*
mysql
)
{
if
(
m_print
||
m_print_meta
)
{
char
tmpTabName
[
MAX_TAB_NAME_SIZE
*
2
];
sprintf
(
tmpTabName
,
"%s"
,
tab
.
getTableName
());
char
*
database
=
strtok
(
tmpTabName
,
"/"
);
char
*
schema
=
strtok
(
NULL
,
"/"
);
char
*
tableName
=
strtok
(
NULL
,
"/"
);
/**
* this means that the user did not specify schema
* and it is a v2x backup
*/
if
(
database
==
NULL
)
return
false
;
if
(
schema
==
NULL
)
return
false
;
if
(
tableName
==
NULL
)
tableName
=
schema
;
char
stmtCreateDB
[
255
];
sprintf
(
stmtCreateDB
,
"CREATE DATABASE %s"
,
database
);
ndbout_c
(
"%s"
,
stmtCreateDB
);
char
buf
[
2048
];
create_table_string
(
tab
,
tableName
,
buf
);
ndbout_c
(
"%s"
,
buf
);
ndbout_c
(
"Successfully printed table: %s"
,
tab
.
m_dictTable
->
getName
());
}
return
true
;
}
#endif
void
BackupPrinter
::
tuple
(
const
TupleS
&
tup
)
{
m_dataCount
++
;
if
(
m_print
||
m_print_data
)
m_ndbout
<<
tup
<<
endl
;
}
void
BackupPrinter
::
logEntry
(
const
LogEntry
&
logE
)
{
if
(
m_print
||
m_print_log
)
m_ndbout
<<
logE
<<
endl
;
m_logCount
++
;
}
void
BackupPrinter
::
endOfLogEntrys
()
{
if
(
m_print
||
m_print_log
)
{
ndbout
<<
"Printed "
<<
m_dataCount
<<
" tuples and "
<<
m_logCount
<<
" log entries"
<<
" to stdout."
<<
endl
;
}
}
ndb/src/kernel/blocks/backup/restore/consumer_printer.hpp
0 → 100644
View file @
cdd76505
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef CONSUMER_PRINTER_HPP
#define CONSUMER_PRINTER_HPP
#include "consumer.hpp"
class
BackupPrinter
:
public
BackupConsumer
{
NdbOut
&
m_ndbout
;
public:
BackupPrinter
(
NdbOut
&
out
=
ndbout
)
:
m_ndbout
(
out
)
{
m_print
=
false
;
m_print_log
=
false
;
m_print_data
=
false
;
m_print_meta
=
false
;
}
virtual
bool
table
(
const
TableS
&
);
#ifdef USE_MYSQL
virtual
bool
table
(
const
TableS
&
,
MYSQL
*
mysqlp
);
#endif
virtual
void
tuple
(
const
TupleS
&
);
virtual
void
logEntry
(
const
LogEntry
&
);
virtual
void
endOfTuples
()
{};
virtual
void
endOfLogEntrys
();
bool
m_print
;
bool
m_print_log
;
bool
m_print_data
;
bool
m_print_meta
;
Uint32
m_logCount
;
Uint32
m_dataCount
;
};
#endif
ndb/src/kernel/blocks/backup/restore/consumer_restore.cpp
0 → 100644
View file @
cdd76505
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "consumer_restore.hpp"
#include <NdbSleep.h>
extern
FilteredNdbOut
err
;
extern
FilteredNdbOut
info
;
extern
FilteredNdbOut
debug
;
static
void
callback
(
int
,
NdbConnection
*
,
void
*
);
bool
BackupRestore
::
init
()
{
release
();
if
(
!
m_restore
&&
!
m_restore_meta
)
return
true
;
m_ndb
=
new
Ndb
();
if
(
m_ndb
==
NULL
)
return
false
;
// Turn off table name completion
m_ndb
->
useFullyQualifiedNames
(
false
);
m_ndb
->
init
(
1024
);
if
(
m_ndb
->
waitUntilReady
(
30
)
!=
0
)
{
err
<<
"Failed to connect to ndb!!"
<<
endl
;
return
false
;
}
info
<<
"Connected to ndb!!"
<<
endl
;
m_callback
=
new
restore_callback_t
[
m_parallelism
];
if
(
m_callback
==
0
)
{
err
<<
"Failed to allocate callback structs"
<<
endl
;
return
false
;
}
m_tuples
=
new
TupleS
[
m_parallelism
];
if
(
m_tuples
==
0
)
{
err
<<
"Failed to allocate tuples"
<<
endl
;
return
false
;
}
m_free_callback
=
m_callback
;
for
(
Uint32
i
=
0
;
i
<
m_parallelism
;
i
++
)
{
m_callback
[
i
].
restore
=
this
;
m_callback
[
i
].
connection
=
0
;
m_callback
[
i
].
tup
=
&
m_tuples
[
i
];
if
(
i
>
0
)
m_callback
[
i
-
1
].
next
=
&
(
m_callback
[
i
]);
}
m_callback
[
m_parallelism
-
1
].
next
=
0
;
return
true
;
}
void
BackupRestore
::
release
()
{
if
(
m_ndb
)
{
delete
m_ndb
;
m_ndb
=
0
;
}
if
(
m_callback
)
{
delete
[]
m_callback
;
m_callback
=
0
;
}
if
(
m_tuples
)
{
delete
[]
m_tuples
;
m_tuples
=
0
;
}
}
BackupRestore
::~
BackupRestore
()
{
release
();
}
bool
BackupRestore
::
table
(
const
TableS
&
table
){
if
(
!
m_restore_meta
)
{
return
true
;
}
NdbDictionary
::
Dictionary
*
dict
=
m_ndb
->
getDictionary
();
if
(
dict
->
createTable
(
*
table
.
m_dictTable
)
==
-
1
)
{
err
<<
"Create table "
<<
table
.
getTableName
()
<<
" failed: "
<<
dict
->
getNdbError
()
<<
endl
;
return
false
;
}
info
<<
"Successfully restored table "
<<
table
.
getTableName
()
<<
endl
;
return
true
;
}
void
BackupRestore
::
tuple
(
const
TupleS
&
tup
)
{
if
(
!
m_restore
)
return
;
restore_callback_t
*
cb
=
m_free_callback
;
if
(
cb
==
0
)
assert
(
false
);
m_free_callback
=
cb
->
next
;
cb
->
retries
=
0
;
*
(
cb
->
tup
)
=
tup
;
// must do copy!
tuple_a
(
cb
);
if
(
m_free_callback
==
0
)
{
// send-poll all transactions
// close transaction is done in callback
m_ndb
->
sendPollNdb
(
3000
,
1
);
}
}
void
BackupRestore
::
tuple_a
(
restore_callback_t
*
cb
)
{
while
(
cb
->
retries
<
10
)
{
/**
* start transactions
*/
cb
->
connection
=
m_ndb
->
startTransaction
();
if
(
cb
->
connection
==
NULL
)
{
/*
if (errorHandler(cb))
{
continue;
}
*/
exitHandler
();
}
// if
const
TupleS
&
tup
=
*
(
cb
->
tup
);
const
TableS
*
table
=
tup
.
getTable
();
NdbOperation
*
op
=
cb
->
connection
->
getNdbOperation
(
table
->
getTableName
());
if
(
op
==
NULL
)
{
if
(
errorHandler
(
cb
))
continue
;
exitHandler
();
}
// if
if
(
op
->
writeTuple
()
==
-
1
)
{
if
(
errorHandler
(
cb
))
continue
;
exitHandler
();
}
// if
int
ret
=
0
;
for
(
int
j
=
0
;
j
<
2
;
j
++
)
{
for
(
int
i
=
0
;
i
<
tup
.
getNoOfAttributes
();
i
++
)
{
const
AttributeDesc
*
attr_desc
=
tup
.
getDesc
(
i
);
const
AttributeData
*
attr_data
=
tup
.
getData
(
i
);
int
size
=
attr_desc
->
size
;
int
arraySize
=
attr_desc
->
arraySize
;
char
*
dataPtr
=
attr_data
->
string_value
;
Uint32
length
=
(
size
*
arraySize
)
/
8
;
if
(
attr_desc
->
m_column
->
getPrimaryKey
())
{
if
(
j
==
1
)
continue
;
ret
=
op
->
equal
(
i
,
dataPtr
,
length
);
}
else
{
if
(
j
==
0
)
continue
;
if
(
attr_data
->
null
)
ret
=
op
->
setValue
(
i
,
NULL
,
0
);
else
ret
=
op
->
setValue
(
i
,
dataPtr
,
length
);
}
if
(
ret
<
0
)
{
ndbout_c
(
"Column: %d type %d"
,
i
,
attr_desc
->
m_column
->
getType
());
break
;
}
}
if
(
ret
<
0
)
break
;
}
if
(
ret
<
0
)
{
if
(
errorHandler
(
cb
))
continue
;
exitHandler
();
}
// Prepare transaction (the transaction is NOT yet sent to NDB)
cb
->
connection
->
executeAsynchPrepare
(
Commit
,
&
callback
,
cb
);
m_transactions
++
;
return
;
}
err
<<
"Unable to recover from errors. Exiting..."
<<
endl
;
exitHandler
();
}
void
BackupRestore
::
cback
(
int
result
,
restore_callback_t
*
cb
)
{
m_transactions
--
;
if
(
result
<
0
)
{
/**
* Error. temporary or permanent?
*/
if
(
errorHandler
(
cb
))
tuple_a
(
cb
);
// retry
else
{
err
<<
"Restore: Failed to restore data due to a unrecoverable error. Exiting..."
<<
endl
;
exitHandler
();
}
}
else
{
/**
* OK! close transaction
*/
m_ndb
->
closeTransaction
(
cb
->
connection
);
cb
->
connection
=
0
;
cb
->
next
=
m_free_callback
;
m_free_callback
=
cb
;
m_dataCount
++
;
}
}
/**
* returns true if is recoverable,
* Error handling based on hugo
* false if it is an error that generates an abort.
*/
bool
BackupRestore
::
errorHandler
(
restore_callback_t
*
cb
)
{
NdbError
error
=
cb
->
connection
->
getNdbError
();
m_ndb
->
closeTransaction
(
cb
->
connection
);
cb
->
connection
=
0
;
cb
->
retries
++
;
switch
(
error
.
status
)
{
case
NdbError
:
:
Success
:
return
false
;
// ERROR!
break
;
case
NdbError
:
:
TemporaryError
:
NdbSleep_MilliSleep
(
10
);
return
true
;
// RETRY
break
;
case
NdbError
:
:
UnknownResult
:
err
<<
error
<<
endl
;
return
false
;
// ERROR!
break
;
default:
case
NdbError
:
:
PermanentError
:
switch
(
error
.
code
)
{
case
499
:
case
250
:
NdbSleep_MilliSleep
(
10
);
return
true
;
//temp errors?
default:
break
;
}
//ERROR
err
<<
error
<<
endl
;
return
false
;
break
;
}
return
false
;
}
void
BackupRestore
::
exitHandler
()
{
release
();
exit
(
-
1
);
}
void
BackupRestore
::
tuple_free
()
{
if
(
!
m_restore
)
return
;
// Send all transactions to NDB
if
(
m_transactions
>
0
)
m_ndb
->
sendPreparedTransactions
(
0
);
// Poll all transactions
while
(
m_transactions
>
0
)
m_ndb
->
pollNdb
(
3000
,
m_transactions
);
}
void
BackupRestore
::
endOfTuples
()
{
tuple_free
();
}
void
BackupRestore
::
logEntry
(
const
LogEntry
&
tup
)
{
if
(
!
m_restore
)
return
;
NdbConnection
*
trans
=
m_ndb
->
startTransaction
();
if
(
trans
==
NULL
)
{
// Deep shit, TODO: handle the error
err
<<
"Cannot start transaction"
<<
endl
;
exit
(
-
1
);
}
// if
const
TableS
*
table
=
tup
.
m_table
;
NdbOperation
*
op
=
trans
->
getNdbOperation
(
table
->
getTableName
());
if
(
op
==
NULL
)
{
err
<<
"Cannot get operation: "
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
// if
int
check
=
0
;
switch
(
tup
.
m_type
)
{
case
LogEntry
:
:
LE_INSERT
:
check
=
op
->
insertTuple
();
break
;
case
LogEntry
:
:
LE_UPDATE
:
check
=
op
->
updateTuple
();
break
;
case
LogEntry
:
:
LE_DELETE
:
check
=
op
->
deleteTuple
();
break
;
default:
err
<<
"Log entry has wrong operation type."
<<
" Exiting..."
;
exit
(
-
1
);
}
for
(
int
i
=
0
;
i
<
tup
.
m_values
.
size
();
i
++
)
{
const
AttributeS
*
attr
=
tup
.
m_values
[
i
];
int
size
=
attr
->
Desc
->
size
;
int
arraySize
=
attr
->
Desc
->
arraySize
;
const
char
*
dataPtr
=
attr
->
Data
->
string_value
;
const
Uint32
length
=
(
size
/
8
)
*
arraySize
;
if
(
attr
->
Desc
->
m_column
->
getPrimaryKey
())
op
->
equal
(
attr
->
Desc
->
attrId
,
dataPtr
,
length
);
else
op
->
setValue
(
attr
->
Desc
->
attrId
,
dataPtr
,
length
);
}
#if 1
trans
->
execute
(
Commit
);
#else
const
int
ret
=
trans
->
execute
(
Commit
);
// Both insert update and delete can fail during log running
// and it's ok
if
(
ret
!=
0
)
{
err
<<
"execute failed: "
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
#endif
m_ndb
->
closeTransaction
(
trans
);
m_logCount
++
;
}
void
BackupRestore
::
endOfLogEntrys
()
{
if
(
m_restore
)
{
info
<<
"Restored "
<<
m_dataCount
<<
" tuples and "
<<
m_logCount
<<
" log entries"
<<
endl
;
}
}
/*
* callback : This is called when the transaction is polled
*
* (This function must have three arguments:
* - The result of the transaction,
* - The NdbConnection object, and
* - A pointer to an arbitrary object.)
*/
static
void
callback
(
int
result
,
NdbConnection
*
trans
,
void
*
aObject
)
{
restore_callback_t
*
cb
=
(
restore_callback_t
*
)
aObject
;
(
cb
->
restore
)
->
cback
(
result
,
cb
);
}
#if 0 // old tuple impl
void
BackupRestore::tuple(const TupleS & tup)
{
if (!m_restore)
return;
while (1)
{
NdbConnection * trans = m_ndb->startTransaction();
if (trans == NULL)
{
// Deep shit, TODO: handle the error
ndbout << "Cannot start transaction" << endl;
exit(-1);
} // if
const TableS * table = tup.getTable();
NdbOperation * op = trans->getNdbOperation(table->getTableName());
if (op == NULL)
{
ndbout << "Cannot get operation: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
// TODO: check return value and handle error
if (op->writeTuple() == -1)
{
ndbout << "writeTuple call failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data->string_value;
const Uint32 length = (size * arraySize) / 8;
if (attr->Desc->m_column->getPrimaryKey())
op->equal(i, dataPtr, length);
}
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data->string_value;
const Uint32 length = (size * arraySize) / 8;
if (!attr->Desc->m_column->getPrimaryKey())
if (attr->Data->null)
op->setValue(i, NULL, 0);
else
op->setValue(i, dataPtr, length);
}
int ret = trans->execute(Commit);
if (ret != 0)
{
ndbout << "execute failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
}
m_ndb->closeTransaction(trans);
if (ret == 0)
break;
}
m_dataCount++;
}
#endif
ndb/src/kernel/blocks/backup/restore/consumer_restore.hpp
0 → 100644
View file @
cdd76505
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef CONSUMER_RESTORE_HPP
#define CONSUMER_RESTORE_HPP
#include "consumer.hpp"
struct
restore_callback_t
{
class
BackupRestore
*
restore
;
class
TupleS
*
tup
;
class
NdbConnection
*
connection
;
int
retries
;
restore_callback_t
*
next
;
};
class
BackupRestore
:
public
BackupConsumer
{
public:
BackupRestore
(
Uint32
parallelism
=
1
)
{
m_ndb
=
0
;
m_logCount
=
m_dataCount
=
0
;
m_restore
=
false
;
m_restore_meta
=
false
;
m_parallelism
=
parallelism
;
m_callback
=
0
;
m_tuples
=
0
;
m_free_callback
=
0
;
m_transactions
=
0
;
}
virtual
~
BackupRestore
();
virtual
bool
init
();
virtual
void
release
();
virtual
bool
table
(
const
TableS
&
);
#ifdef USE_MYSQL
virtual
bool
table
(
const
TableS
&
,
MYSQL
*
mysqlp
);
#endif
virtual
void
tuple
(
const
TupleS
&
);
virtual
void
tuple_free
();
virtual
void
tuple_a
(
restore_callback_t
*
cb
);
virtual
void
cback
(
int
result
,
restore_callback_t
*
cb
);
virtual
bool
errorHandler
(
restore_callback_t
*
cb
);
virtual
void
exitHandler
();
virtual
void
endOfTuples
();
virtual
void
logEntry
(
const
LogEntry
&
);
virtual
void
endOfLogEntrys
();
void
connectToMysql
();
Ndb
*
m_ndb
;
bool
m_restore
;
bool
m_restore_meta
;
Uint32
m_logCount
;
Uint32
m_dataCount
;
Uint32
m_parallelism
;
Uint32
m_transactions
;
TupleS
*
m_tuples
;
restore_callback_t
*
m_callback
;
restore_callback_t
*
m_free_callback
;
};
#endif
ndb/src/kernel/blocks/backup/restore/consumer_restorem.cpp
0 → 100644
View file @
cdd76505
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "consumer_restore.hpp"
#include <NdbSleep.h>
extern
FilteredNdbOut
err
;
extern
FilteredNdbOut
info
;
extern
FilteredNdbOut
debug
;
static
bool
asynchErrorHandler
(
NdbConnection
*
trans
,
Ndb
*
ndb
);
static
void
callback
(
int
result
,
NdbConnection
*
trans
,
void
*
aObject
);
bool
BackupRestore
::
init
()
{
if
(
!
m_restore
&&
!
m_restore_meta
)
return
true
;
m_ndb
=
new
Ndb
();
if
(
m_ndb
==
NULL
)
return
false
;
// Turn off table name completion
m_ndb
->
useFullyQualifiedNames
(
false
);
m_ndb
->
init
(
1024
);
if
(
m_ndb
->
waitUntilReady
(
30
)
!=
0
)
{
ndbout
<<
"Failed to connect to ndb!!"
<<
endl
;
return
false
;
}
ndbout
<<
"Connected to ndb!!"
<<
endl
;
#if USE_MYSQL
if
(
use_mysql
)
{
if
(
mysql_thread_safe
()
==
0
)
{
ndbout
<<
"Not thread safe mysql library..."
<<
endl
;
exit
(
-
1
);
}
ndbout
<<
"Connecting to MySQL..."
<<
endl
;
/**
* nwe param:
* port
* host
* user
*/
bool
returnValue
=
true
;
mysql_init
(
&
mysql
);
{
int
portNo
=
3306
;
if
(
mysql_real_connect
(
&
mysql
,
ga_host
,
ga_user
,
ga_password
,
ga_database
,
ga_port
,
::
ga_socket
,
0
)
==
NULL
)
{
ndbout_c
(
"Connect failed: %s"
,
mysql_error
(
&
mysql
));
returnValue
=
false
;
}
ndbout
<<
"Connected to MySQL!!!"
<<
endl
;
}
/* if(returnValue){
mysql_set_server_option(&mysql, MYSQL_OPTION_MULTI_STATEMENTS_ON);
}
*/
return
returnValue
;
}
#endif
if
(
m_callback
)
{
delete
[]
m_callback
;
m_callback
=
0
;
}
m_callback
=
new
restore_callback_t
[
m_parallelism
];
if
(
m_callback
==
0
)
{
ndbout
<<
"Failed to allocate callback structs"
<<
endl
;
return
false
;
}
m_free_callback
=
m_callback
;
for
(
int
i
=
0
;
i
<
m_parallelism
;
i
++
)
{
m_callback
[
i
].
restore
=
this
;
m_callback
[
i
].
connection
=
0
;
m_callback
[
i
].
retries
=
0
;
if
(
i
>
0
)
m_callback
[
i
-
1
].
next
=
&
(
m_callback
[
i
]);
}
m_callback
[
m_parallelism
-
1
].
next
=
0
;
return
true
;
}
BackupRestore
::~
BackupRestore
()
{
if
(
m_ndb
!=
0
)
delete
m_ndb
;
if
(
m_callback
)
delete
[]
m_callback
;
}
#ifdef USE_MYSQL
bool
BackupRestore
::
table
(
const
TableS
&
table
,
MYSQL
*
mysqlp
){
if
(
!
m_restore_meta
)
{
return
true
;
}
char
tmpTabName
[
MAX_TAB_NAME_SIZE
*
2
];
sprintf
(
tmpTabName
,
"%s"
,
table
.
getTableName
());
char
*
database
=
strtok
(
tmpTabName
,
"/"
);
char
*
schema
=
strtok
(
NULL
,
"/"
);
char
*
tableName
=
strtok
(
NULL
,
"/"
);
/**
* this means that the user did not specify schema
* and it is a v2x backup
*/
if
(
database
==
NULL
)
return
false
;
if
(
schema
==
NULL
)
return
false
;
if
(
tableName
==
NULL
)
tableName
=
schema
;
char
stmtCreateDB
[
255
];
sprintf
(
stmtCreateDB
,
"CREATE DATABASE %s"
,
database
);
/*ignore return value. mysql_select_db will trap errors anyways*/
if
(
mysql_query
(
mysqlp
,
stmtCreateDB
)
==
0
)
{
//ndbout_c("%s", stmtCreateDB);
}
if
(
mysql_select_db
(
&
mysql
,
database
)
!=
0
)
{
ndbout_c
(
"Error: %s"
,
mysql_error
(
&
mysql
));
return
false
;
}
char
buf
[
2048
];
/**
* create table ddl
*/
if
(
create_table_string
(
table
,
tableName
,
buf
))
{
ndbout_c
(
"Unable to create a table definition since the "
"backup contains undefined types"
);
return
false
;
}
//ndbout_c("%s", buf);
if
(
mysql_query
(
mysqlp
,
buf
)
!=
0
)
{
ndbout_c
(
"Error: %s"
,
mysql_error
(
&
mysql
));
return
false
;
}
else
{
ndbout_c
(
"Successfully restored table %s into database %s"
,
tableName
,
database
);
}
return
true
;
}
#endif
bool
BackupRestore
::
table
(
const
TableS
&
table
){
if
(
!
m_restore_meta
)
{
return
true
;
}
NdbDictionary
::
Dictionary
*
dict
=
m_ndb
->
getDictionary
();
if
(
dict
->
createTable
(
*
table
.
m_dictTable
)
==
-
1
)
{
err
<<
"Create table "
<<
table
.
getTableName
()
<<
" failed: "
<<
dict
->
getNdbError
()
<<
endl
;
return
false
;
}
info
<<
"Successfully restored table "
<<
table
.
getTableName
()
<<
endl
;
return
true
;
}
void
BackupRestore
::
tuple
(
const
TupleS
&
tup
)
{
if
(
!
m_restore
)
{
delete
&
tup
;
return
;
}
restore_callback_t
*
cb
=
m_free_callback
;
if
(
cb
)
{
m_free_callback
=
cb
->
next
;
cb
->
retries
=
0
;
cb
->
tup
=
&
tup
;
tuple_a
(
cb
);
}
if
(
m_free_callback
==
0
)
{
// send-poll all transactions
// close transaction is done in callback
m_ndb
->
sendPollNdb
(
3000
,
1
);
}
}
void
BackupRestore
::
tuple_a
(
restore_callback_t
*
cb
)
{
while
(
cb
->
retries
<
10
)
{
/**
* start transactions
*/
cb
->
connection
=
m_ndb
->
startTransaction
();
if
(
cb
->
connection
==
NULL
)
{
/*
if (asynchErrorHandler(cb->connection, m_ndb))
{
cb->retries++;
continue;
}
*/
asynchExitHandler
();
}
// if
const
TupleS
&
tup
=
*
(
cb
->
tup
);
const
TableS
*
table
=
tup
.
getTable
();
NdbOperation
*
op
=
cb
->
connection
->
getNdbOperation
(
table
->
getTableName
());
if
(
op
==
NULL
)
{
if
(
asynchErrorHandler
(
cb
->
connection
,
m_ndb
))
{
cb
->
retries
++
;
continue
;
}
asynchExitHandler
();
}
// if
if
(
op
->
writeTuple
()
==
-
1
)
{
if
(
asynchErrorHandler
(
cb
->
connection
,
m_ndb
))
{
cb
->
retries
++
;
continue
;
}
asynchExitHandler
();
}
// if
Uint32
ret
=
0
;
for
(
int
i
=
0
;
i
<
tup
.
getNoOfAttributes
();
i
++
)
{
const
AttributeS
*
attr
=
tup
[
i
];
int
size
=
attr
->
Desc
->
size
;
int
arraySize
=
attr
->
Desc
->
arraySize
;
char
*
dataPtr
=
attr
->
Data
.
string_value
;
Uint32
length
=
(
size
*
arraySize
)
/
8
;
if
(
attr
->
Desc
->
m_column
->
getPrimaryKey
())
{
ret
=
op
->
equal
(
i
,
dataPtr
,
length
);
}
else
{
if
(
attr
->
Data
.
null
)
ret
=
op
->
setValue
(
i
,
NULL
,
0
);
else
ret
=
op
->
setValue
(
i
,
dataPtr
,
length
);
}
if
(
ret
<
0
)
{
ndbout_c
(
"Column: %d type %d"
,
i
,
tup
.
getTable
()
->
m_dictTable
->
getColumn
(
i
)
->
getType
());
if
(
asynchErrorHandler
(
cb
->
connection
,
m_ndb
))
{
cb
->
retries
++
;
break
;
}
asynchExitHandler
();
}
}
if
(
ret
<
0
)
continue
;
// Prepare transaction (the transaction is NOT yet sent to NDB)
cb
->
connection
->
executeAsynchPrepare
(
Commit
,
&
callback
,
cb
);
m_transactions
++
;
}
ndbout_c
(
"Unable to recover from errors. Exiting..."
);
asynchExitHandler
();
}
void
BackupRestore
::
cback
(
int
result
,
restore_callback_t
*
cb
)
{
if
(
result
<
0
)
{
/**
* Error. temporary or permanent?
*/
if
(
asynchErrorHandler
(
cb
->
connection
,
m_ndb
))
{
cb
->
retries
++
;
tuple_a
(
cb
);
}
else
{
ndbout_c
(
"Restore: Failed to restore data "
"due to a unrecoverable error. Exiting..."
);
delete
m_ndb
;
delete
cb
->
tup
;
exit
(
-
1
);
}
}
else
{
/**
* OK! close transaction
*/
m_ndb
->
closeTransaction
(
cb
->
connection
);
delete
cb
->
tup
;
m_transactions
--
;
}
}
void
BackupRestore
::
asynchExitHandler
()
{
if
(
m_ndb
!=
NULL
)
delete
m_ndb
;
exit
(
-
1
);
}
#if 0 // old tuple impl
void
BackupRestore::tuple(const TupleS & tup)
{
if (!m_restore)
return;
while (1)
{
NdbConnection * trans = m_ndb->startTransaction();
if (trans == NULL)
{
// Deep shit, TODO: handle the error
ndbout << "Cannot start transaction" << endl;
exit(-1);
} // if
const TableS * table = tup.getTable();
NdbOperation * op = trans->getNdbOperation(table->getTableName());
if (op == NULL)
{
ndbout << "Cannot get operation: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
// TODO: check return value and handle error
if (op->writeTuple() == -1)
{
ndbout << "writeTuple call failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size * arraySize) / 8;
if (attr->Desc->m_column->getPrimaryKey())
op->equal(i, dataPtr, length);
}
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size * arraySize) / 8;
if (!attr->Desc->m_column->getPrimaryKey())
if (attr->Data.null)
op->setValue(i, NULL, 0);
else
op->setValue(i, dataPtr, length);
}
int ret = trans->execute(Commit);
if (ret != 0)
{
ndbout << "execute failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
}
m_ndb->closeTransaction(trans);
if (ret == 0)
break;
}
m_dataCount++;
}
#endif
void
BackupRestore
::
endOfTuples
()
{
if
(
!
m_restore
)
return
;
// Send all transactions to NDB
m_ndb
->
sendPreparedTransactions
(
0
);
// Poll all transactions
m_ndb
->
pollNdb
(
3000
,
m_transactions
);
// Close all transactions
// for (int i = 0; i < nPreparedTransactions; i++)
// m_ndb->closeTransaction(asynchTrans[i]);
}
void
BackupRestore
::
logEntry
(
const
LogEntry
&
tup
)
{
if
(
!
m_restore
)
return
;
NdbConnection
*
trans
=
m_ndb
->
startTransaction
();
if
(
trans
==
NULL
)
{
// Deep shit, TODO: handle the error
ndbout
<<
"Cannot start transaction"
<<
endl
;
exit
(
-
1
);
}
// if
const
TableS
*
table
=
tup
.
m_table
;
NdbOperation
*
op
=
trans
->
getNdbOperation
(
table
->
getTableName
());
if
(
op
==
NULL
)
{
ndbout
<<
"Cannot get operation: "
;
ndbout
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
// if
int
check
=
0
;
switch
(
tup
.
m_type
)
{
case
LogEntry
:
:
LE_INSERT
:
check
=
op
->
insertTuple
();
break
;
case
LogEntry
:
:
LE_UPDATE
:
check
=
op
->
updateTuple
();
break
;
case
LogEntry
:
:
LE_DELETE
:
check
=
op
->
deleteTuple
();
break
;
default:
ndbout
<<
"Log entry has wrong operation type."
<<
" Exiting..."
;
exit
(
-
1
);
}
for
(
int
i
=
0
;
i
<
tup
.
m_values
.
size
();
i
++
)
{
const
AttributeS
*
attr
=
tup
.
m_values
[
i
];
int
size
=
attr
->
Desc
->
size
;
int
arraySize
=
attr
->
Desc
->
arraySize
;
const
char
*
dataPtr
=
attr
->
Data
.
string_value
;
const
Uint32
length
=
(
size
/
8
)
*
arraySize
;
if
(
attr
->
Desc
->
m_column
->
getPrimaryKey
())
op
->
equal
(
attr
->
Desc
->
attrId
,
dataPtr
,
length
);
else
op
->
setValue
(
attr
->
Desc
->
attrId
,
dataPtr
,
length
);
}
#if 1
trans
->
execute
(
Commit
);
#else
const
int
ret
=
trans
->
execute
(
Commit
);
// Both insert update and delete can fail during log running
// and it's ok
if
(
ret
!=
0
)
{
ndbout
<<
"execute failed: "
;
ndbout
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
#endif
m_ndb
->
closeTransaction
(
trans
);
m_logCount
++
;
}
void
BackupRestore
::
endOfLogEntrys
()
{
if
(
m_restore
)
{
ndbout
<<
"Restored "
<<
m_dataCount
<<
" tuples and "
<<
m_logCount
<<
" log entries"
<<
endl
;
}
}
#if 0
/*****************************************
*
* Callback function for asynchronous transactions
*
* Idea for error handling: Transaction objects have to be stored globally when
* they are prepared.
* In the callback function if the transaction:
* succeeded: delete the object from global storage
* failed but can be retried: execute the object that is in global storage
* failed but fatal: delete the object from global storage
*
******************************************/
static void restoreCallback(int result, // Result for transaction
NdbConnection *object, // Transaction object
void *anything) // Not used
{
static Uint32 counter = 0;
debug << "restoreCallback function called " << counter << " time(s)" << endl;
++counter;
if (result == -1)
{
ndbout << " restoreCallback (" << counter;
if ((counter % 10) == 1)
{
ndbout << "st";
} // if
else if ((counter % 10) == 2)
{
ndbout << "nd";
} // else if
else if ((counter % 10 ) ==3)
{
ndbout << "rd";
} // else if
else
{
ndbout << "th";
} // else
err << " time: error detected " << object->getNdbError() << endl;
} // if
} // restoreCallback
#endif
/*
* callback : This is called when the transaction is polled
*
* (This function must have three arguments:
* - The result of the transaction,
* - The NdbConnection object, and
* - A pointer to an arbitrary object.)
*/
static
void
callback
(
int
result
,
NdbConnection
*
trans
,
void
*
aObject
)
{
restore_callback_t
*
cb
=
(
restore_callback_t
*
)
aObject
;
(
cb
->
restore
)
->
cback
(
result
,
cb
);
}
/**
* returns true if is recoverable,
* Error handling based on hugo
* false if it is an error that generates an abort.
*/
static
bool
asynchErrorHandler
(
NdbConnection
*
trans
,
Ndb
*
ndb
)
{
NdbError
error
=
trans
->
getNdbError
();
ndb
->
closeTransaction
(
trans
);
switch
(
error
.
status
)
{
case
NdbError
:
:
Success
:
return
false
;
// ERROR!
break
;
case
NdbError
:
:
TemporaryError
:
NdbSleep_MilliSleep
(
10
);
return
true
;
// RETRY
break
;
case
NdbError
:
:
UnknownResult
:
ndbout
<<
error
<<
endl
;
return
false
;
// ERROR!
break
;
default:
case
NdbError
:
:
PermanentError
:
switch
(
error
.
code
)
{
case
499
:
case
250
:
NdbSleep_MilliSleep
(
10
);
return
true
;
//temp errors?
default:
break
;
}
//ERROR
ndbout
<<
error
<<
endl
;
return
false
;
break
;
}
return
false
;
}
ndb/src/kernel/blocks/backup/restore/main.cpp
View file @
cdd76505
...
...
@@ -14,9 +14,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "Restore.hpp"
#include <getarg.h>
#include <NdbSleep.h>
#include <Vector.hpp>
#include <ndb_limits.h>
#include <NdbTCP.h>
...
...
@@ -26,18 +24,15 @@
#include <NdbOut.hpp>
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
TupleS
&
tuple
);
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
LogEntry
&
logEntry
);
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
RestoreMetaData
&
);
#include "consumer_restore.hpp"
#include "consumer_printer.hpp"
extern
FilteredNdbOut
err
;
extern
FilteredNdbOut
info
;
extern
FilteredNdbOut
debug
;
static
const
char
*
delimiter
=
";"
;
// Delimiter in file dump
static
int
ga_nodeId
=
0
;
static
int
ga_nParallelism
=
1
;
static
int
ga_nParallelism
=
1
28
;
static
int
ga_backupId
=
0
;
static
bool
ga_dont_ignore_systab_0
=
false
;
static
myVector
<
class
BackupConsumer
*>
g_consumers
;
...
...
@@ -58,19 +53,11 @@ static MYSQL mysql;
#ifdef NDB_WIN32
static
const
char
*
ga_backupPath
=
".
\\
"
;
static
const
char
*
ga_backupPath
=
".
"
DIR_SEPARATOR
;
#else
static
const
char
*
ga_backupPath
=
".
/"
;
static
const
char
*
ga_backupPath
=
".
"
DIR_SEPARATOR
;
#endif
typedef
struct
{
void
*
ndb
;
void
*
restore
;
TupleS
*
tup
;
int
transaction
;
int
retries
;
}
restore_callback_t
;
static
const
char
*
ga_connect_NDB
=
NULL
;
/**
...
...
@@ -78,102 +65,9 @@ static const char* ga_connect_NDB = NULL;
*/
static
bool
ga_restore
=
false
;
static
bool
ga_print
=
false
;
class
BackupConsumer
{
public:
virtual
bool
init
()
{
return
true
;}
virtual
bool
table
(
const
TableS
&
){
return
true
;}
#ifdef USE_MYSQL
virtual
bool
table
(
const
TableS
&
,
MYSQL
*
mysqlp
)
{
return
true
;};
#endif
virtual
void
tuple
(
const
TupleS
&
){}
virtual
void
tupleAsynch
(
const
TupleS
&
,
restore_callback_t
*
callback
)
{};
// virtual bool asynchErrorHandler(NdbConnection * trans){return true;};
virtual
void
asynchExitHandler
(){};
virtual
void
endOfTuples
(){}
virtual
void
logEntry
(
const
LogEntry
&
){}
virtual
void
endOfLogEntrys
(){}
protected:
#ifdef USE_MYSQL
int
create_table_string
(
const
TableS
&
table
,
char
*
,
char
*
);
#endif
};
class
BackupPrinter
:
public
BackupConsumer
{
NdbOut
&
m_ndbout
;
public:
BackupPrinter
(
NdbOut
&
out
=
ndbout
)
:
m_ndbout
(
out
)
{
m_print
=
false
;
m_print_log
=
false
;
m_print_data
=
false
;
m_print_meta
=
false
;
}
virtual
bool
table
(
const
TableS
&
);
#ifdef USE_MYSQL
virtual
bool
table
(
const
TableS
&
,
MYSQL
*
mysqlp
);
#endif
virtual
void
tuple
(
const
TupleS
&
);
virtual
void
logEntry
(
const
LogEntry
&
);
virtual
void
endOfTuples
()
{};
virtual
void
endOfLogEntrys
();
virtual
void
tupleAsynch
(
const
TupleS
&
,
restore_callback_t
*
callback
);
bool
m_print
;
bool
m_print_log
;
bool
m_print_data
;
bool
m_print_meta
;
Uint32
m_logCount
;
Uint32
m_dataCount
;
};
class
BackupRestore
:
public
BackupConsumer
{
public:
BackupRestore
()
{
m_ndb
=
0
;
m_logCount
=
m_dataCount
=
0
;
m_restore
=
false
;
m_restore_meta
=
false
;
}
virtual
~
BackupRestore
();
virtual
bool
init
();
virtual
bool
table
(
const
TableS
&
);
#ifdef USE_MYSQL
virtual
bool
table
(
const
TableS
&
,
MYSQL
*
mysqlp
);
#endif
virtual
void
tuple
(
const
TupleS
&
);
virtual
void
tupleAsynch
(
const
TupleS
&
,
restore_callback_t
*
callback
);
virtual
void
asynchExitHandler
();
virtual
void
endOfTuples
();
virtual
void
logEntry
(
const
LogEntry
&
);
virtual
void
endOfLogEntrys
();
void
connectToMysql
();
Ndb
*
m_ndb
;
bool
m_restore
;
bool
m_restore_meta
;
Uint32
m_logCount
;
Uint32
m_dataCount
;
};
bool
readArguments
(
const
int
argc
,
const
char
**
argv
)
{
BackupPrinter
*
printer
=
new
BackupPrinter
();
if
(
printer
==
NULL
)
return
false
;
BackupRestore
*
restore
=
new
BackupRestore
();
if
(
restore
==
NULL
)
{
delete
printer
;
return
false
;
}
int
_print
=
0
;
int
_print_meta
=
0
;
...
...
@@ -238,8 +132,17 @@ readArguments(const int argc, const char** argv)
{
arg_printusage
(
args
,
num_args
,
argv
[
0
],
"<path to backup files>
\n
"
);
return
false
;
}
BackupPrinter
*
printer
=
new
BackupPrinter
();
if
(
printer
==
NULL
)
return
false
;
BackupRestore
*
restore
=
new
BackupRestore
(
ga_nParallelism
);
if
(
restore
==
NULL
)
{
delete
printer
;
delete
restore
;
return
false
;
}
...
...
@@ -321,10 +224,7 @@ clearConsumers()
g_consumers
.
clear
();
}
static
bool
asynchErrorHandler
(
NdbConnection
*
trans
,
Ndb
*
ndb
);
static
NdbConnection
*
asynchTrans
[
1024
];
bool
static
bool
checkSysTable
(
const
char
*
tableName
)
{
return
ga_dont_ignore_systab_0
||
...
...
@@ -335,6 +235,13 @@ checkSysTable(const char *tableName)
}
static
void
free_data_callback
()
{
for
(
int
i
=
0
;
i
<
g_consumers
.
size
();
i
++
)
g_consumers
[
i
]
->
tuple_free
();
}
int
main
(
int
argc
,
const
char
**
argv
)
{
...
...
@@ -343,6 +250,12 @@ main(int argc, const char** argv)
return
-
1
;
}
if
(
ga_connect_NDB
!=
NULL
)
{
// Use connection string
Ndb
::
setConnectString
(
ga_connect_NDB
);
}
/**
* we must always load meta data, even if we will only print it to stdout
*/
...
...
@@ -414,13 +327,11 @@ main(int argc, const char** argv)
}
}
if
(
ga_restore
||
ga_print
)
{
if
(
ga_restore
)
{
RestoreDataIterator
dataIter
(
metaData
);
RestoreDataIterator
dataIter
(
metaData
,
&
free_data_callback
);
// Read data file header
if
(
!
dataIter
.
readHeader
())
...
...
@@ -432,17 +343,13 @@ main(int argc, const char** argv)
while
(
dataIter
.
readFragmentHeader
(
res
))
{
const
TupleS
*
tuple
=
0
;
const
TupleS
*
tuple
;
while
((
tuple
=
dataIter
.
getNextTuple
(
res
))
!=
NULL
)
{
if
(
checkSysTable
(
tuple
->
getTable
()
->
getTableName
()))
{
for
(
int
i
=
0
;
i
<
g_consumers
.
size
();
i
++
)
{
g_consumers
[
i
]
->
tupleAsynch
(
*
tuple
,
0
);
}
}
}
while
(
tuple
!=
NULL
);
for
(
int
i
=
0
;
i
<
g_consumers
.
size
();
i
++
)
g_consumers
[
i
]
->
tuple
(
*
tuple
);
}
// while (tuple != NULL);
if
(
res
<
0
)
{
...
...
@@ -504,893 +411,3 @@ main(int argc, const char** argv)
return
1
;
}
// main
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
AttributeS
&
attr
){
const
AttributeData
&
data
=
attr
.
Data
;
const
AttributeDesc
&
desc
=
*
attr
.
Desc
;
if
(
data
.
null
)
{
ndbout
<<
"<NULL>"
;
return
ndbout
;
}
NdbRecAttr
tmprec
;
tmprec
.
setup
(
desc
.
m_column
,
(
char
*
)
data
.
void_value
);
ndbout
<<
tmprec
;
return
ndbout
;
}
// Print tuple data
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
TupleS
&
tuple
)
{
ndbout
<<
tuple
.
getTable
()
->
getTableName
()
<<
"; "
;
for
(
int
i
=
0
;
i
<
tuple
.
getNoOfAttributes
();
i
++
)
{
const
AttributeS
*
attr
=
tuple
[
i
];
debug
<<
i
<<
" "
<<
attr
->
Desc
->
m_column
->
getName
();
ndbout
<<
(
*
attr
);
if
(
i
!=
(
tuple
.
getNoOfAttributes
()
-
1
))
ndbout
<<
delimiter
<<
" "
;
}
// for
return
ndbout
;
}
// Print tuple data
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
LogEntry
&
logE
)
{
switch
(
logE
.
m_type
)
{
case
LogEntry
:
:
LE_INSERT
:
ndbout
<<
"INSERT "
<<
logE
.
m_table
->
getTableName
()
<<
" "
;
break
;
case
LogEntry
:
:
LE_DELETE
:
ndbout
<<
"DELETE "
<<
logE
.
m_table
->
getTableName
()
<<
" "
;
break
;
case
LogEntry
:
:
LE_UPDATE
:
ndbout
<<
"UPDATE "
<<
logE
.
m_table
->
getTableName
()
<<
" "
;
break
;
default:
ndbout
<<
"Unknown log entry type (not insert, delete or update)"
;
}
for
(
int
i
=
0
;
i
<
logE
.
m_values
.
size
();
i
++
)
{
const
AttributeS
*
attr
=
logE
.
m_values
[
i
];
ndbout
<<
attr
->
Desc
->
m_column
->
getName
()
<<
"="
;
ndbout
<<
(
*
attr
);
if
(
i
<
(
logE
.
m_values
.
size
()
-
1
))
ndbout
<<
", "
;
}
return
ndbout
;
}
NdbOut
&
operator
<<
(
NdbOut
&
ndbout
,
const
TableS
&
table
){
ndbout
<<
endl
<<
"Table: "
<<
table
.
getTableName
()
<<
endl
;
for
(
int
j
=
0
;
j
<
table
.
getNoOfAttributes
();
j
++
)
{
const
AttributeDesc
*
desc
=
table
[
j
];
ndbout
<<
desc
->
m_column
->
getName
()
<<
": "
<<
desc
->
m_column
->
getType
();
ndbout
<<
" key: "
<<
desc
->
m_column
->
getPrimaryKey
();
ndbout
<<
" array: "
<<
desc
->
arraySize
;
ndbout
<<
" size: "
<<
desc
->
size
<<
endl
;
}
// for
return
ndbout
;
}
#if 0
/*****************************************
*
* Callback function for asynchronous transactions
*
* Idea for error handling: Transaction objects have to be stored globally when
* they are prepared.
* In the callback function if the transaction:
* succeeded: delete the object from global storage
* failed but can be retried: execute the object that is in global storage
* failed but fatal: delete the object from global storage
*
******************************************/
static void restoreCallback(int result, // Result for transaction
NdbConnection *object, // Transaction object
void *anything) // Not used
{
static Uint32 counter = 0;
debug << "restoreCallback function called " << counter << " time(s)" << endl;
++counter;
if (result == -1)
{
ndbout << " restoreCallback (" << counter;
if ((counter % 10) == 1)
{
ndbout << "st";
} // if
else if ((counter % 10) == 2)
{
ndbout << "nd";
} // else if
else if ((counter % 10 ) ==3)
{
ndbout << "rd";
} // else if
else
{
ndbout << "th";
} // else
err << " time: error detected " << object->getNdbError() << endl;
} // if
} // restoreCallback
#endif
bool
BackupPrinter
::
table
(
const
TableS
&
tab
)
{
if
(
m_print
||
m_print_meta
)
{
m_ndbout
<<
tab
;
ndbout_c
(
"Successfully printed table: %s"
,
tab
.
m_dictTable
->
getName
());
}
return
true
;
}
#ifdef USE_MYSQL
bool
BackupPrinter
::
table
(
const
TableS
&
tab
,
MYSQL
*
mysql
)
{
if
(
m_print
||
m_print_meta
)
{
char
tmpTabName
[
MAX_TAB_NAME_SIZE
*
2
];
sprintf
(
tmpTabName
,
"%s"
,
tab
.
getTableName
());
char
*
database
=
strtok
(
tmpTabName
,
"/"
);
char
*
schema
=
strtok
(
NULL
,
"/"
);
char
*
tableName
=
strtok
(
NULL
,
"/"
);
/**
* this means that the user did not specify schema
* and it is a v2x backup
*/
if
(
database
==
NULL
)
return
false
;
if
(
schema
==
NULL
)
return
false
;
if
(
tableName
==
NULL
)
tableName
=
schema
;
char
stmtCreateDB
[
255
];
sprintf
(
stmtCreateDB
,
"CREATE DATABASE %s"
,
database
);
ndbout_c
(
"%s"
,
stmtCreateDB
);
char
buf
[
2048
];
create_table_string
(
tab
,
tableName
,
buf
);
ndbout_c
(
"%s"
,
buf
);
ndbout_c
(
"Successfully printed table: %s"
,
tab
.
m_dictTable
->
getName
());
}
return
true
;
}
#endif
void
BackupPrinter
::
tuple
(
const
TupleS
&
tup
)
{
if
(
m_print
||
m_print_data
)
m_ndbout
<<
tup
<<
endl
;
}
void
BackupPrinter
::
logEntry
(
const
LogEntry
&
logE
)
{
if
(
m_print
||
m_print_log
)
m_ndbout
<<
logE
<<
endl
;
m_logCount
++
;
}
bool
BackupRestore
::
init
()
{
if
(
!
m_restore
&&
!
m_restore_meta
)
return
true
;
if
(
ga_connect_NDB
!=
NULL
)
{
// Use connection string
Ndb
::
setConnectString
(
ga_connect_NDB
);
}
m_ndb
=
new
Ndb
();
if
(
m_ndb
==
NULL
)
return
false
;
// Turn off table name completion
m_ndb
->
useFullyQualifiedNames
(
false
);
m_ndb
->
init
(
1024
);
if
(
m_ndb
->
waitUntilReady
(
30
)
!=
0
)
{
ndbout
<<
"Failed to connect to ndb!!"
<<
endl
;
delete
m_ndb
;
return
false
;
}
ndbout
<<
"Connected to ndb!!"
<<
endl
;
#if USE_MYSQL
if
(
use_mysql
)
{
if
(
mysql_thread_safe
()
==
0
)
{
ndbout
<<
"Not thread safe mysql library..."
<<
endl
;
exit
(
-
1
);
}
ndbout
<<
"Connecting to MySQL..."
<<
endl
;
/**
* nwe param:
* port
* host
* user
*/
bool
returnValue
=
true
;
mysql_init
(
&
mysql
);
{
int
portNo
=
3306
;
if
(
mysql_real_connect
(
&
mysql
,
ga_host
,
ga_user
,
ga_password
,
ga_database
,
ga_port
,
ga_socket
,
0
)
==
NULL
)
{
ndbout_c
(
"Connect failed: %s"
,
mysql_error
(
&
mysql
));
returnValue
=
false
;
}
ndbout
<<
"Connected to MySQL!!!"
<<
endl
;
}
/* if(returnValue){
mysql_set_server_option(&mysql, MYSQL_OPTION_MULTI_STATEMENTS_ON);
}
*/
return
returnValue
;
}
#endif
return
true
;
}
BackupRestore
::~
BackupRestore
()
{
if
(
m_ndb
!=
0
)
delete
m_ndb
;
}
#ifdef USE_MYSQL
bool
BackupRestore
::
table
(
const
TableS
&
table
,
MYSQL
*
mysqlp
){
if
(
!
m_restore_meta
)
{
return
true
;
}
char
tmpTabName
[
MAX_TAB_NAME_SIZE
*
2
];
sprintf
(
tmpTabName
,
"%s"
,
table
.
getTableName
());
char
*
database
=
strtok
(
tmpTabName
,
"/"
);
char
*
schema
=
strtok
(
NULL
,
"/"
);
char
*
tableName
=
strtok
(
NULL
,
"/"
);
/**
* this means that the user did not specify schema
* and it is a v2x backup
*/
if
(
database
==
NULL
)
return
false
;
if
(
schema
==
NULL
)
return
false
;
if
(
tableName
==
NULL
)
tableName
=
schema
;
char
stmtCreateDB
[
255
];
sprintf
(
stmtCreateDB
,
"CREATE DATABASE %s"
,
database
);
/*ignore return value. mysql_select_db will trap errors anyways*/
if
(
mysql_query
(
mysqlp
,
stmtCreateDB
)
==
0
)
{
//ndbout_c("%s", stmtCreateDB);
}
if
(
mysql_select_db
(
&
mysql
,
database
)
!=
0
)
{
ndbout_c
(
"Error: %s"
,
mysql_error
(
&
mysql
));
return
false
;
}
char
buf
[
2048
];
/**
* create table ddl
*/
if
(
create_table_string
(
table
,
tableName
,
buf
))
{
ndbout_c
(
"Unable to create a table definition since the "
"backup contains undefined types"
);
return
false
;
}
//ndbout_c("%s", buf);
if
(
mysql_query
(
mysqlp
,
buf
)
!=
0
)
{
ndbout_c
(
"Error: %s"
,
mysql_error
(
&
mysql
));
return
false
;
}
else
{
ndbout_c
(
"Successfully restored table %s into database %s"
,
tableName
,
database
);
}
return
true
;
}
int
BackupConsumer
::
create_table_string
(
const
TableS
&
table
,
char
*
tableName
,
char
*
buf
){
int
pos
=
0
;
int
pos2
=
0
;
char
buf2
[
2048
];
pos
+=
sprintf
(
buf
+
pos
,
"%s%s"
,
"CREATE TABLE "
,
tableName
);
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"("
);
pos2
+=
sprintf
(
buf2
+
pos2
,
"%s"
,
" primary key("
);
for
(
int
j
=
0
;
j
<
table
.
getNoOfAttributes
();
j
++
)
{
const
AttributeDesc
*
desc
=
table
[
j
];
// ndbout << desc->name << ": ";
pos
+=
sprintf
(
buf
+
pos
,
"%s%s"
,
desc
->
m_column
->
getName
(),
" "
);
switch
(
desc
->
m_column
->
getType
()){
case
NdbDictionary
:
:
Column
::
Int
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"int"
);
break
;
case
NdbDictionary
:
:
Column
::
Unsigned
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"int unsigned"
);
break
;
case
NdbDictionary
:
:
Column
::
Float
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"float"
);
break
;
case
NdbDictionary
:
:
Column
::
Decimal
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"decimal"
);
break
;
case
NdbDictionary
:
:
Column
::
Char
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"char"
);
break
;
case
NdbDictionary
:
:
Column
::
Varchar
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"varchar"
);
break
;
case
NdbDictionary
:
:
Column
::
Binary
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"binary"
);
break
;
case
NdbDictionary
:
:
Column
::
Varbinary
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"varchar binary"
);
break
;
case
NdbDictionary
:
:
Column
::
Bigint
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"bigint"
);
break
;
case
NdbDictionary
:
:
Column
::
Bigunsigned
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"bigint unsigned"
);
break
;
case
NdbDictionary
:
:
Column
::
Double
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"double"
);
break
;
case
NdbDictionary
:
:
Column
::
Datetime
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"datetime"
);
break
;
case
NdbDictionary
:
:
Column
::
Timespec
:
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
"time"
);
break
;
case
NdbDictionary
:
:
Column
::
Undefined
:
// pos += sprintf(buf+pos, "%s", "varchar binary");
return
-
1
;
break
;
default:
//pos += sprintf(buf+pos, "%s", "varchar binary");
return
-
1
;
}
if
(
desc
->
arraySize
>
1
)
{
int
attrSize
=
desc
->
arraySize
;
pos
+=
sprintf
(
buf
+
pos
,
"%s%u%s"
,
"("
,
attrSize
,
")"
);
}
if
(
desc
->
m_column
->
getPrimaryKey
())
{
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
" not null"
);
pos2
+=
sprintf
(
buf2
+
pos2
,
"%s%s"
,
desc
->
m_column
->
getName
(),
","
);
}
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
","
);
}
// for
pos2
--
;
// remove trailing comma
pos2
+=
sprintf
(
buf2
+
pos2
,
"%s"
,
")"
);
// pos--; // remove trailing comma
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
buf2
);
pos
+=
sprintf
(
buf
+
pos
,
"%s"
,
") type=ndbcluster"
);
return
0
;
}
#endif // USE_MYSQL
bool
BackupRestore
::
table
(
const
TableS
&
table
){
if
(
!
m_restore_meta
)
{
return
true
;
}
NdbDictionary
::
Dictionary
*
dict
=
m_ndb
->
getDictionary
();
if
(
dict
->
createTable
(
*
table
.
m_dictTable
)
==
-
1
)
{
err
<<
"Create table "
<<
table
.
getTableName
()
<<
" failed: "
<<
dict
->
getNdbError
()
<<
endl
;
return
false
;
}
info
<<
"Successfully restored table "
<<
table
.
getTableName
()
<<
endl
;
return
true
;
}
/*
* callback : This is called when the transaction is polled
*
* (This function must have three arguments:
* - The result of the transaction,
* - The NdbConnection object, and
* - A pointer to an arbitrary object.)
*/
static
void
callback
(
int
result
,
NdbConnection
*
trans
,
void
*
aObject
)
{
restore_callback_t
*
cbData
=
(
restore_callback_t
*
)
aObject
;
if
(
result
<
0
)
{
/**
* Error. temporary or permanent?
*/
if
(
asynchErrorHandler
(
trans
,
(
Ndb
*
)
cbData
->
ndb
))
{
((
Ndb
*
)
cbData
->
ndb
)
->
closeTransaction
(
asynchTrans
[
cbData
->
transaction
]);
cbData
->
retries
++
;
((
BackupRestore
*
)
cbData
)
->
tupleAsynch
(
*
(
TupleS
*
)(
cbData
->
tup
),
cbData
);
}
else
{
ndbout_c
(
"Restore: Failed to restore data "
"due to a unrecoverable error. Exiting..."
);
delete
(
Ndb
*
)
cbData
->
ndb
;
delete
cbData
->
tup
;
delete
cbData
;
exit
(
-
1
);
}
}
else
{
/**
* OK! close transaction
*/
((
Ndb
*
)
cbData
->
ndb
)
->
closeTransaction
(
asynchTrans
[
cbData
->
transaction
]);
delete
cbData
->
tup
;
delete
cbData
;
}
}
static
int
nPreparedTransactions
=
0
;
void
BackupPrinter
::
tupleAsynch
(
const
TupleS
&
tup
,
restore_callback_t
*
callback
)
{
m_dataCount
++
;
if
(
m_print
||
m_print_data
)
m_ndbout
<<
tup
<<
endl
;
}
void
BackupRestore
::
tupleAsynch
(
const
TupleS
&
tup
,
restore_callback_t
*
cbData
)
{
if
(
!
m_restore
)
{
delete
&
tup
;
return
;
}
Uint32
retries
;
if
(
cbData
!=
0
)
retries
=
cbData
->
retries
;
else
retries
=
0
;
while
(
retries
<
10
)
{
/**
* start transactions
*/
asynchTrans
[
nPreparedTransactions
]
=
m_ndb
->
startTransaction
();
if
(
asynchTrans
[
nPreparedTransactions
]
==
NULL
)
{
if
(
asynchErrorHandler
(
asynchTrans
[
nPreparedTransactions
],
m_ndb
))
{
retries
++
;
continue
;
}
asynchExitHandler
();
}
// if
const
TableS
*
table
=
tup
.
getTable
();
NdbOperation
*
op
=
asynchTrans
[
nPreparedTransactions
]
->
getNdbOperation
(
table
->
getTableName
());
if
(
op
==
NULL
)
{
if
(
asynchErrorHandler
(
asynchTrans
[
nPreparedTransactions
],
m_ndb
))
{
retries
++
;
continue
;
}
asynchExitHandler
();
}
// if
if
(
op
->
writeTuple
()
==
-
1
)
{
if
(
asynchErrorHandler
(
asynchTrans
[
nPreparedTransactions
],
m_ndb
))
{
retries
++
;
continue
;
}
asynchExitHandler
();
}
// if
Uint32
ret
=
0
;
for
(
int
i
=
0
;
i
<
tup
.
getNoOfAttributes
();
i
++
)
{
const
AttributeS
*
attr
=
tup
[
i
];
int
size
=
attr
->
Desc
->
size
;
int
arraySize
=
attr
->
Desc
->
arraySize
;
char
*
dataPtr
=
attr
->
Data
.
string_value
;
Uint32
length
=
(
size
*
arraySize
)
/
8
;
if
(
attr
->
Desc
->
m_column
->
getPrimaryKey
())
{
ret
=
op
->
equal
(
i
,
dataPtr
,
length
);
if
(
ret
<
0
)
{
ndbout_c
(
"Column: %d type %d"
,
i
,
tup
.
getTable
()
->
m_dictTable
->
getColumn
(
i
)
->
getType
());
if
(
asynchErrorHandler
(
asynchTrans
[
nPreparedTransactions
],
m_ndb
))
{
retries
++
;
continue
;
}
asynchExitHandler
();
}
}
}
for
(
int
i
=
0
;
i
<
tup
.
getNoOfAttributes
();
i
++
)
{
const
AttributeS
*
attr
=
tup
[
i
];
int
size
=
attr
->
Desc
->
size
;
int
arraySize
=
attr
->
Desc
->
arraySize
;
char
*
dataPtr
=
attr
->
Data
.
string_value
;
Uint32
length
=
(
size
*
arraySize
)
/
8
;
if
(
!
attr
->
Desc
->
m_column
->
getPrimaryKey
())
if
(
attr
->
Data
.
null
)
ret
=
op
->
setValue
(
i
,
NULL
,
0
);
else
ret
=
op
->
setValue
(
i
,
dataPtr
,
length
);
if
(
ret
<
0
)
{
ndbout_c
(
"Column: %d type %d"
,
i
,
tup
.
getTable
()
->
m_dictTable
->
getColumn
(
i
)
->
getType
());
if
(
asynchErrorHandler
(
asynchTrans
[
nPreparedTransactions
],
m_ndb
))
{
retries
++
;
continue
;
}
asynchExitHandler
();
}
}
restore_callback_t
*
cb
;
if
(
cbData
==
0
)
{
cb
=
new
restore_callback_t
;
cb
->
retries
=
0
;
}
else
cb
=
cbData
;
cb
->
ndb
=
m_ndb
;
cb
->
restore
=
this
;
cb
->
tup
=
(
TupleS
*
)
&
tup
;
cb
->
transaction
=
nPreparedTransactions
;
// Prepare transaction (the transaction is NOT yet sent to NDB)
asynchTrans
[
nPreparedTransactions
]
->
executeAsynchPrepare
(
Commit
,
&
callback
,
cb
);
if
(
nPreparedTransactions
==
ga_nParallelism
-
1
)
{
// send-poll all transactions
// close transaction is done in callback
m_ndb
->
sendPollNdb
(
3000
,
ga_nParallelism
);
nPreparedTransactions
=
0
;
}
else
nPreparedTransactions
++
;
m_dataCount
++
;
return
;
}
ndbout_c
(
"Unable to recover from errors. Exiting..."
);
asynchExitHandler
();
}
void
BackupRestore
::
asynchExitHandler
()
{
if
(
m_ndb
!=
NULL
)
delete
m_ndb
;
exit
(
-
1
);
}
/**
* returns true if is recoverable,
* Error handling based on hugo
* false if it is an error that generates an abort.
*/
static
bool
asynchErrorHandler
(
NdbConnection
*
trans
,
Ndb
*
ndb
)
{
NdbError
error
=
trans
->
getNdbError
();
ndb
->
closeTransaction
(
trans
);
switch
(
error
.
status
)
{
case
NdbError
:
:
Success
:
return
false
;
// ERROR!
break
;
case
NdbError
:
:
TemporaryError
:
NdbSleep_MilliSleep
(
10
);
return
true
;
// RETRY
break
;
case
NdbError
:
:
UnknownResult
:
ndbout
<<
error
<<
endl
;
return
false
;
// ERROR!
break
;
default:
case
NdbError
:
:
PermanentError
:
switch
(
error
.
code
)
{
case
499
:
case
250
:
NdbSleep_MilliSleep
(
10
);
return
true
;
//temp errors?
default:
break
;
}
//ERROR
ndbout
<<
error
<<
endl
;
return
false
;
break
;
}
return
false
;
}
void
BackupRestore
::
tuple
(
const
TupleS
&
tup
)
{
if
(
!
m_restore
)
return
;
while
(
1
)
{
NdbConnection
*
trans
=
m_ndb
->
startTransaction
();
if
(
trans
==
NULL
)
{
// Deep shit, TODO: handle the error
ndbout
<<
"Cannot start transaction"
<<
endl
;
exit
(
-
1
);
}
// if
const
TableS
*
table
=
tup
.
getTable
();
NdbOperation
*
op
=
trans
->
getNdbOperation
(
table
->
getTableName
());
if
(
op
==
NULL
)
{
ndbout
<<
"Cannot get operation: "
;
ndbout
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
// if
// TODO: check return value and handle error
if
(
op
->
writeTuple
()
==
-
1
)
{
ndbout
<<
"writeTuple call failed: "
;
ndbout
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
// if
for
(
int
i
=
0
;
i
<
tup
.
getNoOfAttributes
();
i
++
)
{
const
AttributeS
*
attr
=
tup
[
i
];
int
size
=
attr
->
Desc
->
size
;
int
arraySize
=
attr
->
Desc
->
arraySize
;
const
char
*
dataPtr
=
attr
->
Data
.
string_value
;
const
Uint32
length
=
(
size
*
arraySize
)
/
8
;
if
(
attr
->
Desc
->
m_column
->
getPrimaryKey
())
op
->
equal
(
i
,
dataPtr
,
length
);
}
for
(
int
i
=
0
;
i
<
tup
.
getNoOfAttributes
();
i
++
)
{
const
AttributeS
*
attr
=
tup
[
i
];
int
size
=
attr
->
Desc
->
size
;
int
arraySize
=
attr
->
Desc
->
arraySize
;
const
char
*
dataPtr
=
attr
->
Data
.
string_value
;
const
Uint32
length
=
(
size
*
arraySize
)
/
8
;
if
(
!
attr
->
Desc
->
m_column
->
getPrimaryKey
())
if
(
attr
->
Data
.
null
)
op
->
setValue
(
i
,
NULL
,
0
);
else
op
->
setValue
(
i
,
dataPtr
,
length
);
}
int
ret
=
trans
->
execute
(
Commit
);
if
(
ret
!=
0
)
{
ndbout
<<
"execute failed: "
;
ndbout
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
m_ndb
->
closeTransaction
(
trans
);
if
(
ret
==
0
)
break
;
}
m_dataCount
++
;
}
void
BackupRestore
::
endOfTuples
()
{
if
(
!
m_restore
)
return
;
// Send all transactions to NDB
m_ndb
->
sendPreparedTransactions
(
0
);
// Poll all transactions
m_ndb
->
pollNdb
(
3000
,
nPreparedTransactions
);
// Close all transactions
// for (int i = 0; i < nPreparedTransactions; i++)
// m_ndb->closeTransaction(asynchTrans[i]);
nPreparedTransactions
=
0
;
}
void
BackupRestore
::
logEntry
(
const
LogEntry
&
tup
)
{
if
(
!
m_restore
)
return
;
NdbConnection
*
trans
=
m_ndb
->
startTransaction
();
if
(
trans
==
NULL
)
{
// Deep shit, TODO: handle the error
ndbout
<<
"Cannot start transaction"
<<
endl
;
exit
(
-
1
);
}
// if
const
TableS
*
table
=
tup
.
m_table
;
NdbOperation
*
op
=
trans
->
getNdbOperation
(
table
->
getTableName
());
if
(
op
==
NULL
)
{
ndbout
<<
"Cannot get operation: "
;
ndbout
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
// if
int
check
=
0
;
switch
(
tup
.
m_type
)
{
case
LogEntry
:
:
LE_INSERT
:
check
=
op
->
insertTuple
();
break
;
case
LogEntry
:
:
LE_UPDATE
:
check
=
op
->
updateTuple
();
break
;
case
LogEntry
:
:
LE_DELETE
:
check
=
op
->
deleteTuple
();
break
;
default:
ndbout
<<
"Log entry has wrong operation type."
<<
" Exiting..."
;
exit
(
-
1
);
}
for
(
int
i
=
0
;
i
<
tup
.
m_values
.
size
();
i
++
)
{
const
AttributeS
*
attr
=
tup
.
m_values
[
i
];
int
size
=
attr
->
Desc
->
size
;
int
arraySize
=
attr
->
Desc
->
arraySize
;
const
char
*
dataPtr
=
attr
->
Data
.
string_value
;
const
Uint32
length
=
(
size
/
8
)
*
arraySize
;
if
(
attr
->
Desc
->
m_column
->
getPrimaryKey
())
op
->
equal
(
attr
->
Desc
->
attrId
,
dataPtr
,
length
);
else
op
->
setValue
(
attr
->
Desc
->
attrId
,
dataPtr
,
length
);
}
#if 1
trans
->
execute
(
Commit
);
#else
const
int
ret
=
trans
->
execute
(
Commit
);
// Both insert update and delete can fail during log running
// and it's ok
if
(
ret
!=
0
)
{
ndbout
<<
"execute failed: "
;
ndbout
<<
trans
->
getNdbError
()
<<
endl
;
exit
(
-
1
);
}
#endif
m_ndb
->
closeTransaction
(
trans
);
m_logCount
++
;
}
void
BackupRestore
::
endOfLogEntrys
()
{
if
(
ga_restore
)
{
ndbout
<<
"Restored "
<<
m_dataCount
<<
" tuples and "
<<
m_logCount
<<
" log entries"
<<
endl
;
}
}
void
BackupPrinter
::
endOfLogEntrys
()
{
if
(
m_print
||
m_print_log
)
{
ndbout
<<
"Printed "
<<
m_dataCount
<<
" tuples and "
<<
m_logCount
<<
" log entries"
<<
" to stdout."
<<
endl
;
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment