Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
slapos
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Rafael Monnerat
slapos
Commits
9e118e67
Commit
9e118e67
authored
Nov 15, 2012
by
Marco Mariani
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
renamed variables for clarity, string constants, cleanup
parent
1daa1179
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
64 additions
and
55 deletions
+64
-55
slapos/recipe/addresiliency/__init__.py
slapos/recipe/addresiliency/__init__.py
+4
-4
slapos/recipe/addresiliency/bully.py
slapos/recipe/addresiliency/bully.py
+60
-51
No files found.
slapos/recipe/addresiliency/__init__.py
View file @
9e118e67
...
@@ -39,15 +39,15 @@ class Recipe(GenericSlapRecipe):
...
@@ -39,15 +39,15 @@ class Recipe(GenericSlapRecipe):
confpath
=
os
.
path
.
join
(
self
.
options
[
'script'
],
'bully.conf'
)
confpath
=
os
.
path
.
join
(
self
.
options
[
'script'
],
'bully.conf'
)
ip
=
self
.
parameter_dict
[
'ip-list'
].
split
(
' '
)
ip
_list
=
self
.
parameter_dict
[
'ip-list'
]
print
'Creating bully configuration with ips : %s
\
n
'
%
ip
print
'Creating bully configuration with ips : %s
\
n
'
%
ip
_list
conf
=
self
.
createFile
(
confpath
,
conf
=
self
.
createFile
(
confpath
,
self
.
substituteTemplate
(
self
.
substituteTemplate
(
self
.
getTemplateFilename
(
'bully.conf.in'
),
self
.
getTemplateFilename
(
'bully.conf.in'
),
{
{
'self_id'
:
int
(
self
.
parameter_dict
[
'number'
]),
'self_id'
:
int
(
self
.
parameter_dict
[
'number'
]),
'ip_list'
:
ip
'ip_list'
:
ip
_list
}
}
))
))
path_list
.
append
(
conf
)
path_list
.
append
(
conf
)
...
@@ -58,7 +58,7 @@ class Recipe(GenericSlapRecipe):
...
@@ -58,7 +58,7 @@ class Recipe(GenericSlapRecipe):
wrapper
=
self
.
createPythonScript
(
wrapper
=
self
.
createPythonScript
(
name
=
os
.
path
.
join
(
self
.
options
[
'bin'
],
self
.
parameter_dict
[
'wrapper'
]),
name
=
os
.
path
.
join
(
self
.
options
[
'bin'
],
self
.
parameter_dict
[
'wrapper'
]),
absolute_function
=
'slapos.recipe.addresiliency.bully.run'
,
absolute_function
=
'slapos.recipe.addresiliency.bully.run'
,
arguments
=
{
arguments
=
{
'confpath'
:
confpath
,
'confpath'
:
confpath
,
'server_url'
:
slap_connection
[
'server-url'
],
'server_url'
:
slap_connection
[
'server-url'
],
'key_file'
:
slap_connection
.
get
(
'key-file'
),
'key_file'
:
slap_connection
.
get
(
'key-file'
),
...
...
slapos/recipe/addresiliency/bully.py
View file @
9e118e67
...
@@ -8,12 +8,25 @@ import time
...
@@ -8,12 +8,25 @@ import time
from
slapos
import
slap
as
slapmodule
from
slapos
import
slap
as
slapmodule
import
slapos
import
slapos
log
=
logging
.
getLogger
(
__name__
)
logging
.
basicConfig
(
level
=
logging
.
DEBUG
)
BASE_PORT
=
50000
BASE_PORT
=
50000
SLEEPING_MINS
=
2
# XXX was 30, increase after testing
SLEEPING_MINS
=
2
# XXX was 30, increase after testing
log
=
logging
.
getLogger
(
__name__
)
MSG_PING
=
'ping'
logging
.
basicConfig
(
level
=
logging
.
DEBUG
)
MSG_HALT
=
'halt'
MSG_VICTORY
=
'victory'
MSG_OK
=
'ok'
STATE_NORMAL
=
'normal'
STATE_WAITINGCONFIRM
=
'waitingConfirm'
STATE_ELECTION
=
'election'
STATE_REORGANIZATION
=
'reorganization'
class
Renamer
(
object
):
class
Renamer
(
object
):
...
@@ -44,7 +57,7 @@ class Renamer(object):
...
@@ -44,7 +57,7 @@ class Renamer(object):
partition_reference
=
cp_old_name
)
partition_reference
=
cp_old_name
)
broken_new_name
=
'broken-{}'
.
format
(
time
.
strftime
(
"%d-%b_%H:%M:%S"
,
time
.
gmtime
()))
broken_new_name
=
'broken-{}'
.
format
(
time
.
strftime
(
"%d-%b_%H:%M:%S"
,
time
.
gmtime
()))
# XXX how
to print the old name
# XXX how
can we retrieve and log the old name?
log
.
debug
(
"Renaming {}: {}"
.
format
(
broken
.
getId
(),
broken_new_name
))
log
.
debug
(
"Renaming {}: {}"
.
format
(
broken
.
getId
(),
broken_new_name
))
broken
.
rename
(
broken_new_name
)
broken
.
rename
(
broken_new_name
)
...
@@ -69,38 +82,36 @@ class ResilientInstance(object):
...
@@ -69,38 +82,36 @@ class ResilientInstance(object):
def
__init__
(
self
,
comm
,
renamer
,
confpath
):
def
__init__
(
self
,
comm
,
renamer
,
confpath
):
self
.
comm
=
comm
self
.
comm
=
comm
self
.
id
=
0
self
.
participant_
id
=
0
self
.
state
=
'normal'
self
.
state
=
STATE_NORMAL
self
.
halter
=
0
self
.
halter
_id
=
0
self
.
inElection
=
False
self
.
inElection
=
False
self
.
alive
=
True
self
.
alive
=
True
self
.
lastPing
=
time
.
clock
()
self
.
lastPing
=
time
.
clock
()
self
.
mainCanal
=
self
.
comm
.
canal
([
'ping'
,
'halt'
,
'victory'
])
self
.
mainCanal
=
self
.
comm
.
canal
([
MSG_PING
,
MSG_HALT
,
MSG_VICTORY
])
self
.
renamer
=
renamer
self
.
renamer
=
renamer
self
.
okCanal
=
self
.
comm
.
canal
([
'ok'
])
self
.
okCanal
=
self
.
comm
.
canal
([
MSG_OK
])
self
.
confpath
=
confpath
self
.
confpath
=
confpath
self
.
loadConnectionInfo
()
self
.
loadConnectionInfo
()
def
loadConnectionInfo
(
self
):
def
loadConnectionInfo
(
self
):
file
=
open
(
self
.
confpath
,
'r'
)
params
=
open
(
self
.
confpath
,
'r'
).
readlines
()
params
=
file
.
read
().
split
(
'
\
n
'
)
self
.
total_participants
=
len
(
params
[
0
].
split
())
file
.
close
()
self
.
nbComp
=
len
([
x
.
strip
(
"' "
)
for
x
in
params
[
0
].
strip
(
'[],'
).
split
(
','
)])
new_id
=
int
(
params
[
1
])
new_id
=
int
(
params
[
1
])
if
self
.
id
!=
new_id
:
if
self
.
participant_
id
!=
new_id
:
self
.
halter
=
new_id
self
.
halter
_id
=
new_id
self
.
id
=
new_id
self
.
participant_
id
=
new_id
## Needs to be changed to use the master
## Needs to be changed to use the master
def
aliveManagement
(
self
):
def
aliveManagement
(
self
):
while
self
.
alive
:
while
self
.
alive
:
log
.
info
(
'XXX sleeping for %d minutes'
%
SLEEPING_MINS
)
log
.
info
(
'XXX sleeping for %d minutes'
%
SLEEPING_MINS
)
time
.
sleep
(
SLEEPING_MINS
*
60
)
time
.
sleep
(
SLEEPING_MINS
*
60
)
if
self
.
id
==
0
:
if
self
.
participant_
id
==
0
:
continue
continue
self
.
comm
.
send
(
'ping'
,
0
)
self
.
comm
.
send
(
MSG_PING
,
0
)
message
,
sender
=
self
.
okCanal
.
get
()
message
,
sender
=
self
.
okCanal
.
get
()
if
message
:
if
message
:
continue
continue
...
@@ -113,29 +124,29 @@ class ResilientInstance(object):
...
@@ -113,29 +124,29 @@ class ResilientInstance(object):
def
main
(
self
):
def
main
(
self
):
while
self
.
alive
:
while
self
.
alive
:
message
,
sender
=
self
.
mainCanal
.
get
()
message
,
sender
=
self
.
mainCanal
.
get
()
if
message
==
'ping'
:
if
message
==
MSG_PING
:
self
.
comm
.
send
(
'ok'
,
sender
)
self
.
comm
.
send
(
MSG_OK
,
sender
)
elif
message
==
'halt'
:
elif
message
==
MSG_HALT
:
self
.
state
=
'waitingConfirm'
self
.
state
=
STATE_WAITINGCONFIRM
self
.
halter
=
sender
self
.
halter
_id
=
int
(
sender
)
self
.
comm
.
send
(
'ok'
,
sender
)
self
.
comm
.
send
(
MSG_OK
,
sender
)
elif
message
==
'victory'
:
elif
message
==
MSG_VICTORY
:
if
int
(
sender
)
==
int
(
self
.
halter
)
and
self
.
state
==
'waitingConfirm'
:
if
int
(
sender
)
==
self
.
halter_id
and
self
.
state
==
STATE_WAITINGCONFIRM
:
log
.
info
(
'{} thinks {} is the leader'
.
format
(
self
.
id
,
sender
))
log
.
info
(
'{} thinks {} is the leader'
.
format
(
self
.
participant_
id
,
sender
))
self
.
comm
.
send
(
'ok'
,
sender
)
self
.
comm
.
send
(
MSG_OK
,
sender
)
self
.
state
=
'normal'
self
.
state
=
STATE_NORMAL
def
election
(
self
):
def
election
(
self
):
self
.
inElection
=
True
self
.
inElection
=
True
self
.
loadConnectionInfo
()
self
.
loadConnectionInfo
()
#Check if I'm the highest instance alive
#
Check if I'm the highest instance alive
for
higher
in
range
(
self
.
id
+
1
,
self
.
nbComp
):
for
higher
in
range
(
self
.
participant_id
+
1
,
self
.
total_participants
):
self
.
comm
.
send
(
'ping'
,
higher
)
self
.
comm
.
send
(
MSG_PING
,
higher
)
message
,
sender
=
self
.
okCanal
.
get
()
message
,
sender
=
self
.
okCanal
.
get
()
if
message
:
if
message
:
log
.
info
(
'{} is alive ({})'
.
format
(
higher
,
self
.
id
))
log
.
info
(
'{} is alive ({})'
.
format
(
higher
,
self
.
participant_
id
))
self
.
inElection
=
False
self
.
inElection
=
False
return
False
return
False
continue
continue
...
@@ -143,29 +154,29 @@ class ResilientInstance(object):
...
@@ -143,29 +154,29 @@ class ResilientInstance(object):
if
not
self
.
alive
:
if
not
self
.
alive
:
return
False
return
False
#I should be the new coordinator, halt those below me
#
I should be the new coordinator, halt those below me
log
.
info
(
'Should be ME : {}'
.
format
(
self
.
id
))
log
.
info
(
'Should be ME : {}'
.
format
(
self
.
participant_
id
))
self
.
state
=
'election'
self
.
state
=
STATE_ELECTION
self
.
halter
=
self
.
id
self
.
halter
_id
=
self
.
participant_
id
ups
=
[]
ups
=
[]
for
lower
in
range
(
self
.
id
):
for
lower
in
range
(
self
.
participant_
id
):
self
.
comm
.
send
(
'halt'
,
lower
)
self
.
comm
.
send
(
MSG_HALT
,
lower
)
message
,
sender
=
self
.
okCanal
.
get
()
message
,
sender
=
self
.
okCanal
.
get
()
if
message
:
if
message
:
ups
.
append
(
lower
)
ups
.
append
(
lower
)
#Broadcast Victory
#Broadcast Victory
self
.
state
=
'reorganization'
self
.
state
=
STATE_REORGANIZATION
for
up
in
ups
:
for
up
in
ups
:
self
.
comm
.
send
(
'victory'
,
up
)
self
.
comm
.
send
(
MSG_VICTORY
,
up
)
message
,
sender
=
self
.
okCanal
.
get
()
message
,
sender
=
self
.
okCanal
.
get
()
if
message
:
if
message
:
continue
continue
log
.
info
(
'Something is wrong... let
\
'
s start over'
)
log
.
info
(
'Something is wrong... let
\
'
s start over'
)
return
self
.
election
()
return
self
.
election
()
self
.
state
=
'normal'
self
.
state
=
STATE_NORMAL
self
.
active
=
True
self
.
active
=
True
log
.
info
(
'{} Is THE LEADER'
.
format
(
self
.
id
))
log
.
info
(
'{} Is THE LEADER'
.
format
(
self
.
participant_
id
))
self
.
renamer
.
failover
()
self
.
renamer
.
failover
()
...
@@ -204,23 +215,21 @@ class Wrapper(object):
...
@@ -204,23 +215,21 @@ class Wrapper(object):
def
__init__
(
self
,
confpath
,
timeout
=
20
):
def
__init__
(
self
,
confpath
,
timeout
=
20
):
self
.
canals
=
[]
self
.
canals
=
[]
self
.
ips
=
[]
self
.
ips
=
[]
self
.
id
=
0
self
.
participant_
id
=
0
self
.
timeout
=
timeout
self
.
timeout
=
timeout
self
.
confpath
=
confpath
self
.
confpath
=
confpath
self
.
getConnectionInfo
()
self
.
getConnectionInfo
()
self
.
socket
=
None
self
.
socket
=
None
def
getConnectionInfo
(
self
):
def
getConnectionInfo
(
self
):
file
=
open
(
self
.
confpath
,
'r'
)
params
=
open
(
self
.
confpath
,
'r'
).
readlines
()
params
=
file
.
read
().
split
(
'
\
n
'
)
self
.
ips
=
params
[
0
].
split
()
file
.
close
()
self
.
participant_id
=
int
(
params
[
1
])
self
.
ips
=
[
x
.
strip
(
"' "
)
for
x
in
params
[
0
].
strip
(
'[],'
).
split
(
','
)]
self
.
id
=
int
(
params
[
1
])
def
start
(
self
):
def
start
(
self
):
self
.
getConnectionInfo
()
self
.
getConnectionInfo
()
self
.
socket
=
socket
.
socket
(
socket
.
AF_INET6
,
socket
.
SOCK_STREAM
)
self
.
socket
=
socket
.
socket
(
socket
.
AF_INET6
,
socket
.
SOCK_STREAM
)
self
.
socket
.
bind
((
self
.
ips
[
self
.
id
],
BASE_PORT
+
self
.
id
))
self
.
socket
.
bind
((
self
.
ips
[
self
.
participant_id
],
BASE_PORT
+
self
.
participant_
id
))
self
.
socket
.
listen
(
5
)
self
.
socket
.
listen
(
5
)
def
send
(
self
,
message
,
number
):
def
send
(
self
,
message
,
number
):
...
@@ -228,7 +237,7 @@ class Wrapper(object):
...
@@ -228,7 +237,7 @@ class Wrapper(object):
try
:
try
:
s
=
socket
.
socket
(
socket
.
AF_INET6
,
socket
.
SOCK_STREAM
)
s
=
socket
.
socket
(
socket
.
AF_INET6
,
socket
.
SOCK_STREAM
)
s
.
connect
((
self
.
ips
[
number
],
BASE_PORT
+
number
))
s
.
connect
((
self
.
ips
[
number
],
BASE_PORT
+
number
))
s
.
send
(
message
+
(
' {}
\
n
'
.
format
(
self
.
id
)))
s
.
send
(
message
+
(
' {}
\
n
'
.
format
(
self
.
participant_
id
)))
except
(
socket
.
error
,
socket
.
herror
,
socket
.
gaierror
,
socket
.
timeout
):
except
(
socket
.
error
,
socket
.
herror
,
socket
.
gaierror
,
socket
.
timeout
):
pass
pass
finally
:
finally
:
...
@@ -269,7 +278,7 @@ def run(args):
...
@@ -269,7 +278,7 @@ def run(args):
computer
=
ResilientInstance
(
wrapper
,
renamer
=
renamer
,
confpath
=
confpath
)
computer
=
ResilientInstance
(
wrapper
,
renamer
=
renamer
,
confpath
=
confpath
)
# idle waiting for connection infos
# idle waiting for connection infos
while
computer
.
nbComp
<
2
:
while
computer
.
total_participants
<
2
:
computer
.
loadConnectionInfo
()
computer
.
loadConnectionInfo
()
time
.
sleep
(
30
)
time
.
sleep
(
30
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment