Commit bff5c82f authored by Julien Muchembled's avatar Julien Muchembled

Add SSL support

parent 107ca7df
......@@ -40,23 +40,12 @@ There's also a tool to convert back to FileStorage.
See also http://www.neoppod.org/links for more detailed information about
features related to scalability.
Disclaimer
==========
In addition of the disclaimer contained in the licence this code is
released under, please consider the following.
NEO does not implement any authentication mechanism between its nodes, and
does not encrypt data exchanged between nodes either.
If you want to protect your cluster from malicious nodes, or your data from
being snooped, please consider encrypted tunelling (such as openvpn).
Requirements
============
- Linux 2.6 or later
- Python 2.7.x
- Python 2.7.x (2.7.9 or later for SSL support)
- For storage nodes using MySQL backend:
......@@ -166,6 +155,24 @@ Packs are currently not replicated, which means packing should always be done
up to a TID that is already fully replicated, so that the backup cluster has a
full history (and not random holes).
SSL support
-----------
In addition to any external solution like OpenVPN, NEO has builtin SSL support
to authenticate and encrypt communications between nodes.
All commands and configuration files have options to specify a CA certificate,
the node certificate and the node private key. A certificate can be shared
by several nodes.
NEO always uses the latest SSL protocol supported by the Python interpreter,
without fallback to older versions. A "SSL: WRONG_VERSION_NUMBER" error means
that a node runs in an older environment (Python + OpenSSL) than others.
Note also that you can't mix non-SSL nodes and SSL nodes, even between a
upstream cluster and a backup one. In doing so, connections can get stuck,
or fail with malformed packets or SSL handshake errors.
Deployment
==========
......
......@@ -168,6 +168,3 @@
- Investigate delta compression for stored data
Idea would be to have a few most recent revisions being stored fully, and
older revision delta-compressed, in order to save space.
- Consider using multicast for cluster-wide notifications. (BANDWITH)
Currently, multi-receivers notifications are sent in unicast to each
receiver. Multicast should be used.
......@@ -43,6 +43,13 @@ partitions: 12
;upstream_cluster: test2
;upstream_masters: 127.0.0.1:30000
# The 3 following options must be specified to enabled SSL.
# CA should be the same for all nodes, and it can be the concatenation of
# several CAs and CRLs.
;ca = ~/etc/ca.crt
;cert = ~/etc/cert.crt
;key = ~/etc/cert.key
# Individual nodes parameters
# Some parameters makes no sense to be defined in [DEFAULT] section.
# They are:
......@@ -60,6 +67,10 @@ bind: 127.0.0.1:9999
# common section.
;logfile: ~/log/admin.log
# Nodes can have their own certicates.
;cert = admin.crt
;key = admin.key
# Master nodes
[master]
bind: 127.0.0.1:10000
......
......@@ -30,7 +30,8 @@ class Application(BaseApplication):
"""The storage node application."""
def __init__(self, config):
super(Application, self).__init__(config.getDynamicMasterList())
super(Application, self).__init__(
config.getSSL(), config.getDynamicMasterList())
for address in config.getMasters():
self.nm.createMaster(address=address)
......
......@@ -65,7 +65,9 @@ class Storage(BaseStorage.BaseStorage,
):
setattr(self, method_id, raiseReadOnlyError)
if _app is None:
_app = Application(master_nodes, name, compress=compress, **kw)
ssl = [kw.pop(x, None) for x in ('ca', 'cert', 'key')]
_app = Application(master_nodes, name, compress=compress,
ssl=ssl if any(ssl) else None, **kw)
self.app = _app
@property
......
......@@ -39,5 +39,20 @@
be added/removed without requiring a config change each time.
</description>
</key>
<key name="ca" datatype="existing-file">
<description>
Certificate authority in PEM format.
</description>
</key>
<key name="cert" datatype="existing-file">
<description>
Certificate in PEM format.
</description>
</key>
<key name="key" datatype="existing-file">
<description>
Private key in PEM format.
</description>
</key>
</sectiontype>
</component>
......@@ -14,13 +14,38 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from . import logging
from .event import EventManager
from .node import NodeManager
class BaseApplication(object):
def __init__(self, dynamic_master_list=None):
ssl = None
def __init__(self, ssl=None, dynamic_master_list=None):
if ssl:
if not all(ssl):
raise ValueError("To enable encryption, 3 files must be"
" provided: the CA certificate, and the certificate"
" of this node with its private key.")
ca, cert, key = ssl
import ssl
version, version_name = max((getattr(ssl, k), k)
for k in dir(ssl) if k.startswith("PROTOCOL_TLSv"))
self.ssl = context = ssl.SSLContext(version)
context.options |= (0
| ssl.OP_CIPHER_SERVER_PREFERENCE
| ssl.OP_NO_COMPRESSION
)
context.set_ciphers(ssl._RESTRICTED_SERVER_CIPHERS)
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(ca)
context.load_cert_chain(cert, key)
context.verify_flags |= ssl.VERIFY_X509_STRICT | (
context.cert_store_stats()['crl'] and ssl.VERIFY_CRL_CHECK_LEAF)
logging.info("TLS %s enabled for %s",
float(version_name[13:].replace("_", ".")), self)
self._handlers = {}
self.em = EventManager()
self.nm = NodeManager(dynamic_master_list)
......
......@@ -25,6 +25,9 @@ def getOptionParser():
parser = OptionParser()
parser.add_option('-l', '--logfile',
help='log debugging information to specified SQLite DB')
parser.add_option('--ca', help='certificate authority in PEM format')
parser.add_option('--cert', help='certificate in PEM format')
parser.add_option('--key', help='private key in PEM format')
return parser
def getServerOptionParser():
......@@ -80,6 +83,11 @@ class ConfigurationManager(object):
def getLogfile(self):
return self.__getPath('logfile', True)
def getSSL(self):
r = [self.__getPath(key, True) for key in ('ca', 'cert', 'key')]
if any(r):
return r
def getMasters(self):
""" Get the master node list except itself """
masters = self.__get('masters')
......
......@@ -299,6 +299,7 @@ class ListeningConnection(BaseConnection):
"""A listen connection."""
def __init__(self, app, handler, addr):
self._ssl = app.ssl
logging.debug('listening to %s:%d', *addr)
connector = self.ConnectorClass(addr)
BaseConnection.__init__(self, app.em, handler, connector, addr)
......@@ -310,6 +311,9 @@ class ListeningConnection(BaseConnection):
logging.debug('accepted a connection from %s:%d', *addr)
handler = self.getHandler()
new_conn = ServerConnection(self.em, handler, connector, addr)
if self._ssl:
connector.ssl(self._ssl)
self.em.addWriter(new_conn)
handler.connectionAccepted(new_conn)
def getAddress(self):
......@@ -580,6 +584,7 @@ class ClientConnection(Connection):
client = True
def __init__(self, app, handler, node):
self._ssl = app.ssl
addr = node.getAddress()
connector = self.ConnectorClass(addr)
Connection.__init__(self, app.em, handler, connector, addr)
......@@ -619,6 +624,8 @@ class ClientConnection(Connection):
self.writable()
def _connectionCompleted(self):
if self._ssl:
self.connector.ssl(self._ssl)
self.writable = self.lockWrapper(super(ClientConnection, self).writable)
self.connecting = False
self.updateTimeout(time())
......
......@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import socket
import ssl
import errno
from time import time
from . import logging
......@@ -109,6 +110,14 @@ class SocketConnector(object):
self.socket.close()
self._error('listen', e)
def ssl(self, ssl):
self.socket = ssl.wrap_socket(self.socket,
server_side=self.is_server,
do_handshake_on_connect=False,
suppress_ragged_eofs=False)
self.__class__ = self.SSLHandshakeConnectorClass
self.queued or self.queued.append('')
def getError(self):
return self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
......@@ -212,6 +221,62 @@ class SocketConnectorIPv6(SocketConnector):
registerConnectorHandler(SocketConnectorIPv4)
registerConnectorHandler(SocketConnectorIPv6)
def overlay_connector_class(cls):
name = cls.__name__[1:]
alias = name + 'ConnectorClass'
for base in connector_registry.itervalues():
setattr(base, alias, type(name + base.__name__,
cls.__bases__ + (base,), cls.__dict__))
return cls
@overlay_connector_class
class _SSL:
def _error(self, op, exc):
if isinstance(exc, ssl.SSLError):
logging.debug("%s failed for %s: %s", op, self, exc)
raise ConnectorException
SocketConnector._error(self, op, exc)
def receive(self, read_buf):
try:
while 1:
read_buf.append(self.socket.recv(4096))
except ssl.SSLWantReadError:
pass
except socket.error, e:
self._error('recv', e)
@overlay_connector_class
class _SSLHandshake(_SSL):
def receive(self, read_buf=None):
# ???Writer | send | receive
# -----------+--------+--------
# want read | remove | -
# want write | - | add
try:
self.socket.do_handshake()
except ssl.SSLWantReadError:
return read_buf is None
except ssl.SSLWantWriteError:
return read_buf is not None
except socket.error, e:
self._error('SSL handshake', e)
if not self.queued[0]:
del self.queued[0]
self.__class__ = self.SSLConnectorClass
cipher, proto, bits = self.socket.cipher()
logging.debug("SSL handshake done for %s: %s %s", self, cipher, bits)
if read_buf is None:
return self.send()
self.receive(read_buf)
return self.queued
send = receive
class ConnectorException(Exception):
pass
......
......@@ -47,7 +47,8 @@ class Application(BaseApplication):
uuid = None
def __init__(self, config):
super(Application, self).__init__(config.getDynamicMasterList())
super(Application, self).__init__(
config.getSSL(), config.getDynamicMasterList())
self.tm = TransactionManager(self.onTransactionCommitted)
self.name = config.getCluster()
......
......@@ -74,6 +74,7 @@ class BackupApplication(object):
self.nm.createMaster(address=master_address)
em = property(lambda self: self.app.em)
ssl = property(lambda self: self.app.ssl)
def close(self):
self.nm.close()
......
......@@ -44,8 +44,8 @@ uuid_int = (lambda ns: lambda uuid:
)({str(k)[0]: v for k, v in UUID_NAMESPACES.iteritems()})
class TerminalNeoCTL(object):
def __init__(self, address):
self.neoctl = NeoCTL(address)
def __init__(self, *args, **kw):
self.neoctl = NeoCTL(*args, **kw)
def __del__(self):
self.neoctl.close()
......@@ -231,8 +231,8 @@ class TerminalNeoCTL(object):
class Application(object):
"""The storage node application."""
def __init__(self, address):
self.neoctl = TerminalNeoCTL(address)
def __init__(self, *args, **kw):
self.neoctl = TerminalNeoCTL(*args, **kw)
def execute(self, args):
"""Execute the command given."""
......
......@@ -27,8 +27,8 @@ class NeoCTL(BaseApplication):
connection = None
connected = False
def __init__(self, address):
super(NeoCTL, self).__init__()
def __init__(self, address, **kw):
super(NeoCTL, self).__init__(**kw)
self.server = self.nm.createAdmin(address=address)
self.handler = CommandEventHandler(self)
self.response_queue = []
......
......@@ -42,5 +42,6 @@ def main(args=None):
logging.setup(options.logfile)
from neo.neoctl.app import Application
print Application(address).execute(args)
ssl = options.ca, options.cert, options.key
print Application(address, ssl=ssl if any(ssl) else None).execute(args)
......@@ -66,6 +66,7 @@ UNIT_TEST_MODULES = [
'neo.tests.threaded.test',
'neo.tests.threaded.testImporter',
'neo.tests.threaded.testReplication',
'neo.tests.threaded.testSSL',
]
FUNC_TEST_MODULES = [
......
......@@ -41,7 +41,8 @@ class Application(BaseApplication):
"""The storage node application."""
def __init__(self, config):
super(Application, self).__init__(config.getDynamicMasterList())
super(Application, self).__init__(
config.getSSL(), config.getDynamicMasterList())
# set the cluster name
self.name = config.getCluster()
......
......@@ -68,6 +68,9 @@ IP_VERSION_FORMAT_DICT = {
ADDRESS_TYPE = socket.AF_INET
SSL = os.path.dirname(__file__) + os.sep
SSL = SSL + "ca.crt", SSL + "node.crt", SSL + "node.key"
logging.default_root_handler.handle = lambda record: None
logging.backlog(None, 1<<20)
......
-----BEGIN CERTIFICATE-----
MIIC7TCCAdWgAwIBAgIJAL8e44sA7PDMMA0GCSqGSIb3DQEBCwUAMA0xCzAJBgNV
BAMMAkNBMB4XDTE1MDkzMDEzNTQzMFoXDTIxMDMyMjEzNTQzMFowDTELMAkGA1UE
AwwCQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCgT7DVKM4ViQt3
B0oJC4RGi10dNfpNZZpgA5iC2UJ1u6AqqCf0PCQkrmuIzW3l1TenlOiLNdVASkkT
wf1lekIgg4tR8/22oGTAnfY6R9r1C6jAMV72v1sffz8D6qfkMPzKchJt55zywdhm
KscUsMGzXPGIeKrG20m83dSIO4RmCmq/f4BcuWJu6Kkq4n9Wc2IsvpKk+lqEUxI/
QoqdT6OvMXooGs3t892uvKDu++muBj2Y/yyaXt1tCCjDFsRMLWl3Skks+4PeMCZ4
wugyXEBk3d5Yzdv5NsFzFBjAuRCGxJXEOEcfHj4Xj9qTCErZ1jKzgnuxJCtgdqRC
r4beX1U3AgMBAAGjUDBOMB0GA1UdDgQWBBSFY/jKvo0iSTEzzOIcZZUZCT8JfTAf
BgNVHSMEGDAWgBSFY/jKvo0iSTEzzOIcZZUZCT8JfTAMBgNVHRMEBTADAQH/MA0G
CSqGSIb3DQEBCwUAA4IBAQAB0LKDAuhodpyNVwEE9Yl+Q/IiPEPCaix6URJnRn1O
gQnXuZLo1xtJh6wJh1faG1/qNCFMxWEJ+0VkJ7r6v38cNXfYG9OcmD0S6YnNjSuO
VliAtqVVtj8MppJ4vMatLrNi4cvyYucebtNyBCzSIAi+6bkkHeaVgi1EtxXvq+AS
iZp3gl84oXv/gV7Bz4SXmVpFJnhsDMoQZG2KAULAgfZ2Am2I+ffG90cD/oEnS/3O
k3btqTvgIO8MWt8PY3sUOhJEoJYKnC9DppmhOhUTn4zzIIDSluKEOBHZiFb9AcmF
PvzL+8xiORCdUe1d6ANQQlUd0MM810BXZFYEXFbgKg8o
-----END CERTIFICATE-----
#!/bin/sh -e
DAYS=2000
at_exit() { rm -f "$CAkey"; }
trap at_exit 0
CAkey=`mktemp`
openssl req -new -x509 -nodes -keyout "$CAkey" -out ca.crt -subj /CN=CA -days $DAYS
openssl req -new -nodes -keyout node.key -subj /CN=node |
openssl x509 -CA ca.crt -CAkey "$CAkey" -req -out node.crt -set_serial 1 -days $DAYS
......@@ -38,7 +38,7 @@ from neo.lib.protocol import ClusterStates, NodeTypes, CellStates, NodeStates, \
UUID_NAMESPACES
from neo.lib.util import dump
from .. import cluster, DB_USER, setupMySQLdb, NeoTestBase, buildUrlFromString, \
ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, getTempDirectory
ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, getTempDirectory, SSL
from neo.client.Storage import Storage
from neo.storage.database import buildDatabaseManager
......@@ -223,6 +223,8 @@ class NEOProcess(object):
class NEOCluster(object):
SSL = None
def __init__(self, db_list, master_count=1, partitions=1, replicas=0,
db_user=DB_USER, db_password='', name=None,
cleanup_on_delete=False, temp_dir=None, clear_databases=True,
......@@ -289,7 +291,7 @@ class NEOCluster(object):
self._newProcess(NodeTypes.STORAGE, logger and 'storage_%u' % i,
0, adapter=adapter, database=self.db_template(db))
# create neoctl
self.neoctl = NeoCTL((self.local_ip, admin_port))
self.neoctl = NeoCTL((self.local_ip, admin_port), ssl=self.SSL)
def _newProcess(self, node_type, logfile=None, port=None, **kw):
self.uuid_dict[node_type] = uuid = 1 + self.uuid_dict.get(node_type, 0)
......@@ -301,6 +303,8 @@ class NEOCluster(object):
kw['logfile'] = os.path.join(self.temp_dir, logfile + '.log')
if port is not None:
kw['bind'] = '%s:%u' % (buildUrlFromString(self.local_ip), port)
if self.SSL:
kw['ca'], kw['cert'], kw['key'] = self.SSL
self.process_dict.setdefault(node_type, []).append(
NEOProcess(command_dict[node_type], uuid, kw))
......@@ -402,6 +406,8 @@ class NEOCluster(object):
def getZODBStorage(self, **kw):
master_nodes = self.master_nodes.replace('/', ' ')
if self.SSL:
kw['ca'], kw['cert'], kw['key'] = self.SSL
result = Storage(
master_nodes=master_nodes,
name=self.cluster_name,
......@@ -633,6 +639,15 @@ class NEOCluster(object):
class NEOFunctionalTest(NeoTestBase):
def setUp(self):
if random.randint(0, 1):
NEOCluster.SSL = SSL
super(NEOFunctionalTest, self).setUp()
def _tearDown(self, success):
super(NEOFunctionalTest, self)._tearDown(success)
NEOCluster.SSL = None
def setupLog(self):
logging.setup(os.path.join(self.getTempDirectory(), 'test.log'))
......
-----BEGIN CERTIFICATE-----
MIICkDCCAXgCAQEwDQYJKoZIhvcNAQELBQAwDTELMAkGA1UEAwwCQ0EwHhcNMTUw
OTMwMTM1NDMwWhcNMjEwMzIyMTM1NDMwWjAPMQ0wCwYDVQQDDARub2RlMIIBIjAN
BgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuJ+ClJyjhJOJdGUyqHn79opMLP3m
1g27uBWKT+OGd4FcreVoRDxPVuxZxMtDCZcBfUHVvOoSlS06khwSxViEe1hxwHRa
n2qMWlwvaWeNY0CFH5V+DI4XSNojgny85Lb5jB69FuPcrHnwxLk2OFntrXEeNbEa
d7QSoNbPajbJIp5BS/WR9iu5Z5JYdumLWjTvOU+eZc4iA6Wa2kdDtbqkGi4wOJ1L
/ggATL+p+QFcubVptztPT8vq7gvDdGJgXLJ2lPHV0V/sdJB1FB4mSJEDDjSm2Hpp
qPVJSO1GrAy5Ld+0SnXIZZejhIUJIumocY08r+vzDSQ/8NnqXR4Odz1TWwIDAQAB
MA0GCSqGSIb3DQEBCwUAA4IBAQBlYkkInDDDcgnNRdUmzxwejs1PmEehZ3H5FkMp
TsmpoVC+oqM+QywMu8UJRtCjXnnJdAUbVYuZ1Tjm7qvFIhN+5OlIVxJ+8WcmZPSe
lj0N7Dv2nE1diTDS+qPZVPZ0demo1LafRmPomPWiM/CQRlMPxXnimuiYOROhWGn6
jsyoOwquMkAc6Ub++l4OCxLAP0eTgJFkivmqpaYZXG4o7zFvcQ3rQ66rQrMl69sR
8/MVqbT5Sq1CEJbepP4GaFfa5l3CVy7WH2MhCV1/9mNwcXafkTgx3q2HsPon4Dze
kNwiguNAM4L/j4dbIwz+CIVWcgpBCrfv2JYu+jGlRpxIDeWR
-----END CERTIFICATE-----
-----BEGIN PRIVATE KEY-----
MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQC4n4KUnKOEk4l0
ZTKoefv2ikws/ebWDbu4FYpP44Z3gVyt5WhEPE9W7FnEy0MJlwF9QdW86hKVLTqS
HBLFWIR7WHHAdFqfaoxaXC9pZ41jQIUflX4MjhdI2iOCfLzktvmMHr0W49ysefDE
uTY4We2tcR41sRp3tBKg1s9qNskinkFL9ZH2K7lnklh26YtaNO85T55lziIDpZra
R0O1uqQaLjA4nUv+CABMv6n5AVy5tWm3O09Py+ruC8N0YmBcsnaU8dXRX+x0kHUU
HiZIkQMONKbYemmo9UlI7UasDLkt37RKdchll6OEhQki6ahxjTyv6/MNJD/w2epd
Hg53PVNbAgMBAAECggEACCWh0YUIAjWwfx5oGd8oKzz3F5Usto1slzb8H4Je0K84
s8PH0hsHNULAw1pk3ut2+VwKXofFwid4yjHK8kJxti+09VUtGpPNFb+zp+cV6idS
uI4UPeGXTXOy1XNDsqQQZwqENZpghovrAANcTShKwLiZKK1kCZw8pjRUes9oGMrn
zLXvNPgpJbRUPPYFa34jJyaJtiBEgIBWBmAeEk4ccTYFk4Kon93Ljmobm/6H8WZE
bxomzJjFVMSjW/NmMWnalX5H7GegJVOtkOGvHf25dHBdbdU772ReY4PRZJEGwn8E
QcZjzaKkLB34IlbjWjH0nnanNa6DyjhulaqkHjAtIQKBgQDf3zPefVHss4FrajU8
Nvch6mpayOeJ3RzOAiMzmfI/O3zcJqqrlOOKQjvP5feAoLOJTcvu4hAMfUix/7LR
Qag5nALIllox6QnoMWmvKUbcekXunf/PUVFQ7SC3RJKtqh5oq5wRGJWcDPP6/Okk
t+64NeQOT3lPs2LNyy7KpB5YlQKBgQDTHln2fYKCLnJ66+3QPjDSJrwdUD6HFiwp
X5BfNUKpnxged+buFbgp5TB2vu6Z5A3AxwmWXBFiuvHLpl3uSAEIRbFQ+elagUXa
BoiPOnq1b00X/Vdkd3sX4czVemIG9DIA1uo31Gd9fFWMLHZGBOwqgNKiJOgQZXTq
dX/tAGrQLwKBgGt59aXf1j/j4cMWxx30aWq/5nVVNEtsetKwFgRE6RbQUV5DtfYP
0bljmOFzTwJSpD7LuZcisn+8efTyg/+QHNojevafsAd8EISHjGxKTbm1ffNTqSb3
rClE3kr9wclb/aNUl+VhPxoe4dbiKm+1WgbX4He6Ucwgm9OeswUYC3WNAoGAK1i/
/+wlL7V5q+NlIKykOYHafepL7FCRIK2OZv34gfs4aIkV0SyEc5WrLbZmJxK8ACjd
vxGIQE1B+B5gitwd2iT1Ezs8vmhsfyd4QnAvYbFIkvRhTS97BpxGAk7ucZ8R5To7
PNtPpGQy7GT0o8u+8bshhEkvnK44Iyuc6Hx9ceECgYAjM1jEreHzi96v8pmneJja
VIoNNu2PoTS1nTWN+/6//B62GxpKM5UCYZDRDC8urPXb6tXSC29I104yrNJQEGXz
/acVGnA6GJV639pXz++hf2NMHKandlKY+Cz2euZT/qUU80W+B5korsubIS6ATeee
+QU2yqNaC1ZvTO2L1OLYRg==
-----END PRIVATE KEY-----
......@@ -56,6 +56,7 @@ class ConnectionTests(NeoUnitTestBase):
def setUp(self):
NeoUnitTestBase.setUp(self)
self.app = Mock({'__repr__': 'Fake App'})
self.app.ssl = None
self.em = self.app.em = Mock({'__repr__': 'Fake Em'})
self.handler = Mock({'__repr__': 'Fake Handler'})
self.address = ("127.0.0.7", 93413)
......
......@@ -502,6 +502,8 @@ class ConnectionFilter(object):
class NEOCluster(object):
SSL = None
def __init__(orig, self): # temporary definition for SimpleQueue patch
orig(self)
lock = self._lock
......@@ -563,7 +565,8 @@ class NEOCluster(object):
self.master_nodes = ' '.join('%s:%s' % x for x in master_list)
weak_self = weakref.proxy(self)
kw = dict(cluster=weak_self, getReplicas=replicas, getAdapter=adapter,
getPartitions=partitions, getReset=clear_databases)
getPartitions=partitions, getReset=clear_databases,
getSSL=self.SSL)
if upstream is not None:
self.upstream = weakref.proxy(upstream)
kw.update(getUpstreamCluster=upstream.name,
......@@ -600,7 +603,7 @@ class NEOCluster(object):
self.storage_list = [StorageApplication(getDatabase=db % x, **kw)
for x in db_list]
self.admin_list = [AdminApplication(**kw)]
self.neoctl = NeoCTL(self.admin.getVirtualAddress())
self.neoctl = NeoCTL(self.admin.getVirtualAddress(), ssl=self.SSL)
def __repr__(self):
return "<%s(%s) at 0x%x>" % (self.__class__.__name__,
......@@ -634,7 +637,7 @@ class NEOCluster(object):
for node in getattr(self, node_type + '_list'):
node.resetNode(**kw)
self.neoctl.close()
self.neoctl = NeoCTL(self.admin.getVirtualAddress())
self.neoctl = NeoCTL(self.admin.getVirtualAddress(), ssl=self.SSL)
def start(self, storage_list=None, fast_startup=False):
self._patch()
......@@ -656,10 +659,13 @@ class NEOCluster(object):
assert state in (ClusterStates.RUNNING, ClusterStates.BACKINGUP), state
self.enableStorageList(storage_list)
def newClient(self):
return ClientApplication(name=self.name, master_nodes=self.master_nodes,
compress=self.compress, ssl=self.SSL)
@cached_property
def client(self):
client = ClientApplication(name=self.name,
master_nodes=self.master_nodes, compress=self.compress)
client = self.newClient()
# Make sure client won't be reused after it was closed.
def close():
client = self.client
......
......@@ -29,7 +29,7 @@ from neo.lib.connection import ConnectionClosed, MTClientConnection
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \
ZERO_TID
from .. import expectedFailure, _UnexpectedSuccess, Patch
from . import ClientApplication, NEOCluster, NEOThreadedTest
from . import NEOCluster, NEOThreadedTest
from neo.lib.util import add64, makeChecksum
from neo.client.exception import NEOStorageError
from neo.client.pool import CELL_CONNECTED, CELL_GOOD
......@@ -601,8 +601,7 @@ class Test(NEOThreadedTest):
# (at this time, we still have x=0 and y=1)
t2, c2 = cluster.getTransaction()
# Copy y to x using a different Master-Client connection
client = ClientApplication(name=cluster.name,
master_nodes=cluster.master_nodes)
client = cluster.newClient()
txn = transaction.Transaction()
client.tpc_begin(txn)
client.store(x1._p_oid, x1._p_serial, y, '', txn)
......@@ -715,8 +714,7 @@ class Test(NEOThreadedTest):
self.tic()
# modify x with another client
client = ClientApplication(name=cluster.name,
master_nodes=cluster.master_nodes)
client = cluster.newClient()
txn = transaction.Transaction()
client.tpc_begin(txn)
client.store(x1._p_oid, x1._p_serial, y, '', txn)
......@@ -804,8 +802,7 @@ class Test(NEOThreadedTest):
with cluster.master.filterConnection(cluster.storage) as m2s:
m2s.add(delayNotifyInformation)
cluster.client.master_conn.close()
client = ClientApplication(name=cluster.name,
master_nodes=cluster.master_nodes)
client = cluster.newClient()
p = Patch(client.storage_bootstrap_handler, notReady=notReady)
try:
p.apply()
......
#
# Copyright (C) 2015 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from .. import SSL
from . import NEOCluster, test, testReplication
class SSLMixin:
@classmethod
def setUpClass(cls):
NEOCluster.SSL = SSL
@classmethod
def tearDownClass(cls):
NEOCluster.SSL = None
class SSLTests(SSLMixin, test.Test):
# exclude expected failures
testDeadlockAvoidance = testStorageFailureDuringTpcFinish = None
class SSLReplicationTests(SSLMixin, testReplication.ReplicationTests):
# do not repeat slowest tests with SSL
testBackupNodeLost = testBackupNormalCase = None
if __name__ == "__main__":
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment