Commit a48d51c2 authored by Kirill Smelkov's avatar Kirill Smelkov

Sync NEO/py

Sync with current NEO in Python implementation as the first step.

We'll be using some common bits and in particular on-the-wire protocol
must be the same and for py/go interoperability testing we'll also need
python parts.
parents 1617cecf 875fc1b9
[run]
branch = True
source = neo
omit =
neo/debug.py
neo/scripts/runner.py
[report]
exclude_lines =
pragma: no cover
if __name__ == .__main__.:
*.pyc
*.pyo
*.swp
*~
/.coverage
/build/
/dist/
/htmlcov/
/neo/tests/mock.py
/neoppod.egg-info/
Although NEO is considered ready for production use in most cases, there are
a few bugs to know because they concern basic features of ZODB (marked with Z),
or promised features of NEO (marked with N).
All the listed bugs will be fixed with high priority.
(N) A backup cell may be wrongly marked as corrupted while checking replicas
----------------------------------------------------------------------------
This happens in the following conditions:
1. a backup cluster starts to check replicas whereas a cell is outdated
2. this cell becomes updated, but only up to a tid smaller than the max tid
to check (this can't happen for a non-backup cluster)
3. the cluster actually starts to check the related partition
4. the cell is checked completely before it could replicate up to the max tid
to check
Workaround: make sure all cells are up-to-date before checking replicas.
Found by running testBackupNodeLost many times.
This diff is collapsed.
This diff is collapsed.
graft tools
include neo.conf CHANGELOG.rst TODO TESTS.txt ZODB3.patch
NEO is a distributed, redundant and scalable implementation of ZODB API.
NEO stands for Nexedi Enterprise Object.
Overview
========
A NEO cluster is composed of the following types of nodes:
- "master" nodes (mandatory, 1 or more)
Takes care of transactionality. Only one master node is really active
(the active master node is called "primary master") at any given time,
extra masters are spares (they are called "secondary masters").
- "storage" nodes (mandatory, 1 or more)
Stores data, preserving history. All available storage nodes are in use
simultaneously. This offers redundancy and data distribution.
Available backends: MySQL (InnoDB, RocksDB or TokuDB), SQLite
- "admin" nodes (mandatory for startup, optional after)
Accepts commands from neoctl tool and transmits them to the
primary master, and monitors cluster state.
- "client" nodes
Well... Something needing to store/load data in a NEO cluster.
ZODB API is fully implemented except:
- pack: only old revisions of objects are removed (it should be possible
to use `zc.zodbdgc <https://pypi.python.org/pypi/zc.zodbdgc>`_
for garbage collection)
- blobs: not implemented (not considered yet)
Any ZODB like FileStorage can be converted to NEO instantaneously,
which means the database is operational before all data are imported.
There's also a tool to convert back to FileStorage.
For more detailed information about features related to scalability,
see the `Architecture and Characteristics` section of https://neo.nexedi.com/.
Requirements
============
- Linux 2.6 or later
- Python 2.7.x (2.7.9 or later for SSL support)
- For storage nodes using MySQL backend:
- MySQLdb: https://github.com/PyMySQL/mysqlclient-python
- For client nodes: ZODB 3.10.x or later
Installation
============
a. NEO can be installed like any other egg (see setup.py). Or you can simply
make `neo` directory available for Python to import (for example, by
adding its container directory to the PYTHONPATH environment variable).
b. Write a neo.conf file like the example provided. If you use MySQL,
you'll also need create 1 database per storage node.
c. Start all required nodes::
$ neomaster -f neo.conf
$ neostorage -f neo.conf -s storage1
$ neostorage -f neo.conf -s storage2
$ neoadmin -f neo.conf
d. Tell the cluster to initialize storage nodes::
$ neoctl -a <admin> start
e. Clients can connect when the cluster is in RUNNING state::
$ neoctl -a <admin> print cluster
RUNNING
f. See `importer.conf` file to import an existing database,
or `neoctl` command for more administrative tasks.
Alternatively, you can use `neosimple` command to quickly setup a cluster for
testing.
How to use
==========
First make sure Python can import 'neo.client' package.
In zope
-------
a. Edit your zope.conf, add a neo import and edit the `zodb_db` section by
replacing its filestorage subsection by a NEOStorage one.
It should look like::
%import neo.client
<zodb_db main>
<NEOStorage>
master_nodes 127.0.0.1:10000
name <cluster name>
</NEOStorage>
mount-point /
</zodb_db>
b. Start zope
In a Python script
------------------
Just create the storage object and play with it::
from neo.client.Storage import Storage
s = Storage(master_nodes="127.0.0.1:10010", name="main")
...
"name" and "master_nodes" parameters have the same meaning as in
configuration file.
Shutting down
-------------
Before shutting down NEO, all clients like Zope instances should be stopped,
so that cluster become idle. This is required for multi-DB setups, to prevent
critical failures in second phase of TPC.
A cluster (i.e. masters+storages+admin) can be stopped gracefully by putting it
in STOPPING state using neoctl::
neoctl -a <admin> set cluster STOPPING
This can also be done manually, which helps if your cluster is in bad state:
- Stop all master nodes first with a SIGINT or SIGTERM, so that storage nodes
don't become in OUT_OF_DATE state.
- Next stop remaining nodes with a SIGINT or SIGTERM.
Master-slave asynchronous replication
-------------------------------------
This is the recommanded way to backup a NEO cluster.
Once a cluster with appropriate `upstream_cluster` & `upstream_masters`
configuration is started, you can switch it into backup mode
using::
neoctl -a <admin> set cluster STARTING_BACKUP
It remembers it is in such mode when it is stopped, and it can be put back into
normal mode (RUNNING) by setting it into STOPPING_BACKUP state.
Packs are currently not replicated, which means packing should always be done
up to a TID that is already fully replicated, so that the backup cluster has a
full history (and not random holes).
SSL support
-----------
In addition to any external solution like OpenVPN, NEO has builtin SSL support
to authenticate and encrypt communications between nodes.
All commands and configuration files have options to specify a CA certificate,
the node certificate and the node private key. A certificate can be shared
by several nodes.
NEO always uses the latest SSL protocol supported by the Python interpreter,
without fallback to older versions. A "SSL: WRONG_VERSION_NUMBER" error means
that a node runs in an older environment (Python + OpenSSL) than others.
Note also that you can't mix non-SSL nodes and SSL nodes, even between a
upstream cluster and a backup one. In doing so, connections can get stuck,
or fail with malformed packets or SSL handshake errors.
Deployment
==========
NEO has no built-in deployment features such as process daemonization. We use
`supervisor <http://supervisord.org/>`_ with configuration like below::
[group:neo]
programs=master_01,storage_01,admin
[program:storage_01]
priority=10
command=neostorage -s storage_01 -f /neo/neo.conf
[program:master_01]
priority=20
command=neomaster -s master_01 -f /neo/neo.conf
[program:admin]
priority=20
command=neoadmin -s admin -f /neo/neo.conf
Developers
==========
Developers interested in NEO may refer to
`NEO Web site <https://neo.nexedi.com/>`_ and subscribe to following mailing
lists:
- `neo-users <https://mail.tiolive.com/mailman/listinfo/neo-users>`_:
users discussion
- `neo-dev <https://mail.tiolive.com/mailman/listinfo/neo-dev>`_:
developers discussion
Automated test results are published at
https://www.erp5.com/quality/integration/P-ERP5.Com.Unit%20Tests/Base_viewListMode?proxy_form_id=WebSection_viewERP5UnitTestForm&proxy_field_id=listbox&proxy_field_selection_name=WebSection_viewERP5UnitTestForm_listbox_selection&reset=1&listbox_title=NEO-%25
Commercial Support
==================
Nexedi provides commercial support for NEO: https://www.nexedi.com/
Documentation
- Clarify the use of each error codes:
- NOT_READY removed (connection kept opened until ready)
- Split PROTOCOL_ERROR (BAD IDENTIFICATION, ...)
- Add docstrings (think of doctests)
Code
Code changes often impact more than just one node. They are categorised by
node where the most important changes are needed.
General
- Review XXX/TODO code tags (CODE)
- When all cells are OUT_OF_DATE in backup mode, the one with most data
could become UP_TO_DATE with appropriate backup_tid, so that the cluster
stays operational. (FEATURE)
- Finish renaming UUID into NID everywhere (CODE)
- Delayed connection acceptation even when a storage node is not ready ?
Currently, any node that connects too early to another that is busy for
some reasons is immediately rejected with the 'not ready' error code.
This is mainly the case for :
- Client rejected before the cluster is operational
- Empty storages rejected during recovery process
- Implement transaction garbage collection API (FEATURE)
NEO packing implementation does not update transaction metadata when
deleting object revisions. This inconsistency must be made possible to
clean up from a client application, much in the same way garbage
collection part of packing is done.
- Factorise node initialisation for admin, client and storage (CODE)
The same code to ask/receive node list and partition table exists in too
many places.
- Clarify handler methods to call when a connection is accepted from a
listening connection and when remote node is identified
(cf. neo/lib/bootstrap.py).
- Review PENDING/SHUTDOWN states, don't use notifyNodeInformation()
to do a state-switch, use a exception-based mechanism ? (CODE)
- Review handler split (CODE)
The current handler split is the result of small incremental changes. A
global review is required to make them square.
- Review transactional isolation of various methods
Some methods might not implement proper transaction isolation when they
should. An example is object history (undoLog), which can see data
committed by future transactions.
- Add a 'devid' storage configuration so that master do not distribute
replicated partitions on storages with same 'devid'.
Storage
- Use libmysqld instead of a stand-alone MySQL server.
- In backup mode, 2 simultaneous replication should be possible so that:
- outdated cells does not block backup for too long time
- constantly modified partitions does not prevent outdated cells to
replicate
Current behaviour is undefined and the above 2 scenarios may happen.
- Create a specialized PartitionTable that know the database and replicator
to remove duplicates and remove logic from handlers (CODE)
- Make listening address and port optional, and if they are not provided
listen on all interfaces on any available port.
- Make replication speed configurable (HIGH AVAILABILITY)
In its current implementation, replication runs at lowest priority, to
not degrade performance for client nodes. But when there's only 1 storage
left for a partition, it may be wanted to guarantee a minimum speed to
avoid complete data loss if another failure happens too early.
- Find a way not to always start replication from the beginning. Currently,
a temporarily down nodes can't replicate from where it was interrupted,
which is an issue on big databases. (SPEED)
- Pack segmentation & throttling (HIGH AVAILABILITY)
In its current implementation, pack runs in one call on all storage nodes
at the same time, which locks down the whole cluster. This task should
be split in chunks and processed in "background" on storage nodes.
Packing throttling should probably be at the lowest possible priority
(below interactive use and below replication).
- Verify data checksum on reception (FUNCTIONALITY)
In current implementation, client generates a checksum before storing,
which is only checked upon load. This doesn't prevent from storing
altered data, which misses the point of having a checksum, and creates
weird decisions (ex: if checksum verification fails on load, what should
be done ? hope to find a storage with valid checksum ? assume that data
is correct in storage but was altered when it travelled through network
as we loaded it ?).
- Check replicas: (HIGH AVAILABILITY)
- Automatically tell corrupted cells to fix their data when a good source
is known.
- Add an option to also check all rows of trans/obj/data, instead of only
keys (trans.tid & obj.{tid,oid}).
Master
- Implement back-channel for invalidations in read-only mode,
so that clients of backup clusters are notified of new data.
- Master node data redundancy (HIGH AVAILABILITY)
Secondary master nodes should replicate primary master data (ie, primary
master should inform them of such changes).
This data takes too long to extract from storage nodes, and losing it
increases the risk of starting from underestimated values.
This risk is (currently) unavoidable when all nodes stop running, but this
case must be avoided.
- If the cluster can't start automatically because the last partition table
is not operational, allow the user to select an older operational one,
and truncate the DB.
- Optimize operational status check by recording which rows are ready
instead of parsing the whole partition table. (SPEED)
Client
- Race conditions on the partition table ?
(update by the poll thread vs. access by other threads)
- Merge Application into Storage (SPEED)
- Optimize cache.py by rewriting it either in C or Cython (LOAD LATENCY)
- Use generic bootstrap module (CODE)
Admin
- Make admin node able to monitor multiple clusters simultaneously
- Send notifications (ie: mail) when a storage or master node is lost
- Add ctl command to list last transactions, like fstail for FileStorage.
Tests
- Split neo/tests/threaded/test.py
- Use another mock library: Python 3.3+ has unittest.mock, which is
available for earlier versions at https://pypi.python.org/pypi/mock
Later
- Consider auto-generating cluster name upon initial startup (it might
actually be a partition property).
- Consider ways to centralise the configuration file, or make the
configuration updatable automatically on all nodes.
- Consider storing some metadata on master nodes (partition table [version],
...). This data should be treated non-authoritatively, as a way to lower
the probability to use an outdated partition table.
- Decentralize primary master tasks as much as possible (consider
distributed lock mechanisms, ...)
- Choose how to compute the storage size
- Investigate delta compression for stored data
Idea would be to have a few most recent revisions being stored fully, and
older revision delta-compressed, in order to save space.
NEO 1.6
=======
The `ttrans` table in MySQL/SQLite backends is automatically upgraded at
startup.
However, because the algorithm to verify unfinished data has changed in a way
that such data can not be migrated, a storage node will refuse to start to
if either `ttrans` or `tobj` are not empty.
Therefore, you must first stop NEO cleanly, before upgrading the code. If any
temporary table is not empty, you are requested to restart with an older
version of NEO.
The change in `ttrans` is such that NEO < 1.6 is still usable after a
successful upgrade to NEO >= 1.6, provided you always make sure that `ttrans`
and `tobj` are empty when switching from one to another.
NEO 1.4
=======
The schema of `data` table in MySQL/SQLite backends has changed and there is
no transparent migration because the changes are optional and this table may
be huge. You can either use the following SQL commands to upgrade each storage,
or migrate data to new nodes with replication.
MySQL
-----
::
ALTER TABLE data
-- minor optimization
MODIFY value MEDIUMBLOB NOT NULL,
-- fix store of multiple values that only differ by the compression flag
DROP KEY hash, ADD UNIQUE (hash, compression);
SQLite
------
::
-- In SQLite, ALTER TABLE is limited so it's all or nothing
-- (here the added 'NOT NULL' on value column is cosmetic)
CREATE TABLE new_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
hash BLOB NOT NULL,
compression INTEGER NOT NULL,
value BLOB NOT NULL);
INSERT INTO new_data SELECT * FROM data;
CREATE UNIQUE INDEX IF NOT EXISTS _data_i1 ON new_data(hash, compression);
DROP TABLE data;
ALTER TABLE new_data RENAME TO data;
NEO 1.0
=======
The format of MySQL tables has changed in NEO 1.0 and there is no backward
compatibility or transparent migration, so you will have to use the following
SQL commands to migrate each storage from NEO 0.10.x::
-- make sure 'tobj' & 'ttrans' are empty first
-- and all storages have up-to-date partition tables
CREATE TABLE new_data (id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, hash BINARY(20) NOT NULL UNIQUE, compression TINYINT UNSIGNED NULL, value LONGBLOB NULL) ENGINE = InnoDB SELECT DISTINCT obj.hash as hash FROM obj, data WHERE obj.hash=data.hash ORDER BY serial;
UPDATE new_data, data SET new_data.compression=data.compression, new_data.value=data.value WHERE new_data.hash=data.hash;
DROP TABLE data;
RENAME TABLE new_data TO data;
CREATE TABLE new_obj (partition SMALLINT UNSIGNED NOT NULL, oid BIGINT UNSIGNED NOT NULL, tid BIGINT UNSIGNED NOT NULL, data_id BIGINT UNSIGNED NULL, value_tid BIGINT UNSIGNED NULL, PRIMARY KEY (partition, tid, oid), KEY (partition, oid, tid), KEY (data_id)) ENGINE = InnoDB SELECT partition, oid, serial as tid, data.id as data_id, value_serial as value_tid FROM obj LEFT JOIN data ON (obj.hash=data.hash);
DROP TABLE obj;
RENAME TABLE new_obj TO obj;
ALTER TABLE tobj CHANGE serial tid BIGINT UNSIGNED NOT NULL, CHANGE hash data_id BIGINT UNSIGNED NULL, CHANGE value_serial value_tid BIGINT UNSIGNED NULL;
ALTER TABLE trans ADD COLUMN ttid BIGINT UNSIGNED NOT NULL;
UPDATE trans SET ttid=tid;
ALTER TABLE ttrans ADD COLUMN ttid BIGINT UNSIGNED NOT NULL;
ALTER TABLE config MODIFY name VARBINARY(255) NOT NULL;
CREATE TEMPORARY TABLE nid (new INT NOT NULL AUTO_INCREMENT PRIMARY KEY, old CHAR(32) NOT NULL, KEY (old)) ENGINE = InnoDB SELECT DISTINCT uuid as old FROM pt ORDER BY uuid;
ALTER TABLE pt DROP PRIMARY KEY, ADD nid INT NOT NULL after rid;
UPDATE pt, nid SET pt.nid=nid.new, state=state-1 WHERE pt.uuid=nid.old;
ALTER TABLE pt DROP uuid, ADD PRIMARY KEY (rid, nid);
UPDATE config, nid SET config.name='nid', config.value=nid.new WHERE config.name='uuid' AND nid.old=config.value;
DELETE FROM config WHERE name='loid';
NEO 0.10
========
The format of MySQL tables has changed in NEO 0.10 and there is no backward
compatibility or transparent migration, so you will have to use the following
SQL commands to migrate each storage from NEO < 0.10::
-- make sure 'tobj' is empty first
DROP TABLE obj_short, tobj;
ALTER TABLE obj CHANGE checksum hash BINARY(20) NULL;
UPDATE obj SET value=NULL WHERE value='';
UPDATE obj SET hash=UNHEX(SHA1(value));
ALTER TABLE obj ADD KEY (hash(4));
CREATE TABLE data (hash BINARY(20) NOT NULL PRIMARY KEY, compression TINYINT UNSIGNED NULL, value LONGBLOB NULL) ENGINE = InnoDB SELECT DISTINCT hash, compression, value FROM obj WHERE hash IS NOT NULL;
ALTER TABLE obj DROP compression, DROP value;
UPDATE obj, obj as undone SET obj.hash=undone.hash WHERE obj.value_serial IS NOT NULL AND obj.partition=undone.partition AND obj.oid=undone.oid AND obj.value_serial=undone.serial;
If 'tobj' is not empty, this means your cluster was not shutdown properly.
Restart it to flush the last committed transaction.
This diff is collapsed.
# This file describes how to use the NEO Importer storage backend,
# which is the recommended way to import the data of an existing Zope database
# into a NEO cluster.
#
# Note that the 'neomigrate' command is another way to migrate to NEO but:
# - the database is unusable as long as migration is not finished;
# - it can not merge bases at mountpoints;
# - it can not resume an interrupted migration;
# - it does not preserve metadata that are specific to "undo" transactions
# (which would then behave like normal transactions);
# - it only supports conversion from FileStorage;
# - it is slower.
# The only advantage of 'neomigrate' over Importer is that data can be imported
# directly to a NEO cluster with replicas or several storage nodes.
# Importer backend can only be used with a single storage node.
#
# Here is how to proceed once this file is ready:
# 1. Restart ZODB clients to connect to new NEO cluster (not started yet).
# 2. Start NEO cluster (use 'neoctl -a <admin> start' command if necessary).
# 3. Your Zope applications should work and background migration of data
# started automatically. The only downtime depends on how fast you are to do
# the first 2 steps. The only limitations in this mode are that:
# - pack is not supported
# - IStorage.history() is not implemented (this is used for example by
# history tab in Zope Management Interface)
# 4. A warning message reporting that "All data are imported"
# is emitted to storage node's log when the migration is finished.
# This can take days with big databases.
# The following steps can be scheduled any time after migration is over,
# at your convenience:
# 5. Change NEO configuration to stop using Importer backend.
# 6. Stop clients.
# 7. Restart NEO cluster.
# 8. Start clients. This was the second, very short, downtime.
# 9. Optionally, add storage nodes and balance partitions.
#
# Data are migrated even if your ZODB clients are stopped.
# The first section describes the destination NEO storage.
# See neo.conf for description of parameters.
[neo]
# Once migration is finished, restart NEO storage to use the below directly
# (instead of adapter=Importer & database=/path_to_this_file).
adapter=MySQL
database=neo
# The other sections are for source databases.
[root]
# Example with FileStorage but this can be anything else.
# ZEO is possible but less efficient: ZEO servers must be stopped
# if NEO opens FileStorage DBs directly.
# Note that NEO uses 'new_oid' method to get the last OID, that's why the
# source DB can't be open read-only. NEO never modifies a FileStorage DB.
storage=
<filestorage>
path /path/to/root.fs
</filestorage>
# (leading spaces indicate value continuation)
# This file can stop here if your source DB is not splitted.
# Otherwise, you need to describe mountpoints and other ZODB.
# OID mapping can't be changed once the NEO cluster is started with this
# configuration.
# Mountpoints for this ZODB (see the use case at the end of this file).
# <section_name>=<oid>
foo=421
bar=666
# Following sections must define 'oid' parameter.
[foo]
# Any reference to oid 421 in 'root' is changed to point to oid 123 of 'foo'.
# Of course, original oid 421 in 'root' will become unreferenced.
oid=123
storage=
<filestorage>
path /path/to/foo.fs
</filestorage>
baz=1000
[bar]
oid=4567
storage=
<filestorage>
path /path/to/bar.fs
</filestorage>
[baz]
oid=2000
storage=
<filestorage>
path /path/to/baz.fs
</filestorage>
## Case of several databases linked with MountedObject objects
#
# MountedObject is provided by ZODBMountPoint Zope product.
# It relies on IAcquirer and ITraversable to fetch the real object.
#
# Given the following multi-base:
# - in 'foo', /a/b is the real object
# - in 'root', /c/d is a MountedObject object configured to point to
# /a/b in 'foo'
#
# So you need to get the oid of /a/b in 'foo':
# unrestrictedTraverse("/a/b")._p_oid
# which equals to 123 in the above example
#
# And that of /c/d in 'root':
# unrestrictedTraverse("/c").__dict__["d"]._p_oid -> 421
# The way to retrieve the mount point depends on the container type.
# For a BTreeFolder2, it would be: c._tree["d"]._p_oid
# Note: Unless otherwise noted, all parameters in this configuration file
# must be identical for all nodes in a given cluster.
# This file is optional: parameters can be given at the command line.
# See SafeConfigParser at https://docs.python.org/2/library/configparser.html
# for more information about the syntax.
# ~ and ~user constructions are expanded for all paths, even for those that
# may appear in 'database' option.
# Common parameters.
[DEFAULT]
# The cluster name
# This must be set.
# IMPORTANT: It must be a name unique to a given cluster, to prevent foreign
# misconfigured nodes from interfering.
cluster: test
# The list of master nodes
# This list should be identical for all nodes in a given cluster for
# maximum availability.
# With replicas, it is recommended to have 1 master node per machine
# (physical or not). Otherwise, 1 is enough (but more do not harm).
masters: 127.0.0.1:10000
# Partition table configuration
# Data in the cluster is distributed among nodes using a partition table, which
# has the following parameters.
# Replicas: How many copies of a partition should exist at a time.
# 0 means no redundancy
# 1 means there is a spare copy of all partitions
replicas: 0
# IMPORTANT: NEO does not try to spread replicas on different physical devices
# so replicas should not be used if you have at least 2 nodes
# storing data on the same device.
# Partitions: How data spreads among storage nodes.
# IMPORTANT: This can not be changed once the cluster contains data.
partitions: 12
# The maximum number of usable storage nodes is: partitions * (replicas + 1)
# Master-slave asynchronous replication
# The following 2 options are only required if you want to backup another
# NEO cluster.
;upstream_cluster: test2
;upstream_masters: 127.0.0.1:30000
# The 3 following options must be specified to enabled SSL.
# CA should be the same for all nodes, and it can be the concatenation of
# several CAs and CRLs.
;ca = ~/etc/ca.crt
;cert = ~/etc/cert.crt
;key = ~/etc/cert.key
# Individual nodes parameters
# Some parameters makes no sense to be defined in [DEFAULT] section.
# They are:
# bind: The ip:port the node will listen on.
# database: Storage nodes only. Syntax for:
# - MySQL: [user[:password]@]database[unix_socket]
# Database must be created manually.
# - SQLite: path
# engine: Optional parameter for MySQL.
# Can be InnoDB (default), RocksDB or TokuDB.
# Admin node
[admin]
bind: 127.0.0.1:9999
# Paths to log files can be specified here, but be careful not to do it in a
# common section.
;logfile: ~/log/admin.log
# Nodes can have their own certificates.
;cert = admin.crt
;key = admin.key
# Master nodes
[master]
bind: 127.0.0.1:10000
# Storage nodes
[storage]
database: neo:neo@neo1
bind: 127.0.0.1:20000
#
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.app import BaseApplication
from neo.lib.connection import ListeningConnection
from neo.lib.exception import PrimaryFailure
from .handler import AdminEventHandler, MasterEventHandler, \
MasterRequestEventHandler
from neo.lib.bootstrap import BootstrapManager
from neo.lib.pt import PartitionTable
from neo.lib.protocol import ClusterStates, Errors, NodeTypes, Packets
from neo.lib.debug import register as registerLiveDebugger
class Application(BaseApplication):
"""The storage node application."""
def __init__(self, config):
super(Application, self).__init__(
config.getSSL(), config.getDynamicMasterList())
for address in config.getMasters():
self.nm.createMaster(address=address)
self.name = config.getCluster()
self.server = config.getBind()
logging.debug('IP address is %s, port is %d', *self.server)
# The partition table is initialized after getting the number of
# partitions.
self.pt = None
self.uuid = config.getUUID()
self.request_handler = MasterRequestEventHandler(self)
self.master_event_handler = MasterEventHandler(self)
self.cluster_state = None
self.reset()
registerLiveDebugger(on_log=self.log)
def close(self):
self.listening_conn = None
super(Application, self).close()
def reset(self):
self.bootstrapped = False
self.master_conn = None
self.master_node = None
def log(self):
self.em.log()
self.nm.log()
if self.pt is not None:
self.pt.log()
def run(self):
try:
self._run()
except Exception:
logging.exception('Pre-mortem data:')
self.log()
logging.flush()
raise
def _run(self):
"""Make sure that the status is sane and start a loop."""
if len(self.name) == 0:
raise RuntimeError, 'cluster name must be non-empty'
# Make a listening port.
handler = AdminEventHandler(self)
self.listening_conn = ListeningConnection(self, handler, self.server)
while self.cluster_state != ClusterStates.STOPPING:
self.connectToPrimary()
try:
while True:
self.em.poll(1)
except PrimaryFailure:
logging.error('primary master is down')
self.listening_conn.close()
while not self.em.isIdle():
self.em.poll(1)
def connectToPrimary(self):
"""Find a primary master node, and connect to it.
If a primary master node is not elected or ready, repeat
the attempt of a connection periodically.
Note that I do not accept any connection from non-master nodes
at this stage.
"""
self.cluster_state = None
# search, find, connect and identify to the primary master
bootstrap = BootstrapManager(self, NodeTypes.ADMIN, self.server)
self.master_node, self.master_conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection()
if self.pt is None:
self.pt = PartitionTable(num_partitions, num_replicas)
elif self.pt.getPartitions() != num_partitions:
# XXX: shouldn't we recover instead of raising ?
raise RuntimeError('the number of partitions is inconsistent')
elif self.pt.getReplicas() != num_replicas:
# XXX: shouldn't we recover instead of raising ?
raise RuntimeError('the number of replicas is inconsistent')
# passive handler
self.master_conn.setHandler(self.master_event_handler)
self.master_conn.ask(Packets.AskClusterState())
self.master_conn.ask(Packets.AskPartitionTable())
def sendPartitionTable(self, conn, min_offset, max_offset, uuid):
# we have a pt
self.pt.log()
row_list = []
if max_offset == 0:
max_offset = self.pt.getPartitions()
try:
for offset in xrange(min_offset, max_offset):
row = []
try:
for cell in self.pt.getCellList(offset):
if uuid is None or cell.getUUID() == uuid:
row.append((cell.getUUID(), cell.getState()))
except TypeError:
pass
row_list.append((offset, row))
except IndexError:
conn.send(Errors.ProtocolError('invalid partition table offset'))
else:
conn.answer(Packets.AnswerPartitionList(self.pt.getID(), row_list))
#
# Copyright (C) 2009-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging, protocol
from neo.lib.handler import EventHandler
from neo.lib.protocol import uuid_str, Packets
from neo.lib.exception import PrimaryFailure
def check_primary_master(func):
def wrapper(self, *args, **kw):
if self.app.bootstrapped:
return func(self, *args, **kw)
raise protocol.NotReadyError('Not connected to a primary master.')
return wrapper
def forward_ask(klass):
return check_primary_master(lambda self, conn, *args, **kw:
self.app.master_conn.ask(klass(*args, **kw),
conn=conn, msg_id=conn.getPeerId()))
class AdminEventHandler(EventHandler):
"""This class deals with events for administrating cluster."""
@check_primary_master
def askPartitionList(self, conn, min_offset, max_offset, uuid):
logging.info("ask partition list from %s to %s for %s",
min_offset, max_offset, uuid_str(uuid))
self.app.sendPartitionTable(conn, min_offset, max_offset, uuid)
@check_primary_master
def askNodeList(self, conn, node_type):
if node_type is None:
node_type = 'all'
node_filter = None
else:
node_filter = lambda n: n.getType() is node_type
logging.info("ask list of %s nodes", node_type)
node_list = self.app.nm.getList(node_filter)
node_information_list = [node.asTuple() for node in node_list ]
p = Packets.AnswerNodeList(node_information_list)
conn.answer(p)
@check_primary_master
def askClusterState(self, conn):
conn.answer(Packets.AnswerClusterState(self.app.cluster_state))
@check_primary_master
def askPrimary(self, conn):
master_node = self.app.master_node
conn.answer(Packets.AnswerPrimary(master_node.getUUID()))
askLastIDs = forward_ask(Packets.AskLastIDs)
askLastTransaction = forward_ask(Packets.AskLastTransaction)
addPendingNodes = forward_ask(Packets.AddPendingNodes)
askRecovery = forward_ask(Packets.AskRecovery)
tweakPartitionTable = forward_ask(Packets.TweakPartitionTable)
setClusterState = forward_ask(Packets.SetClusterState)
setNodeState = forward_ask(Packets.SetNodeState)
checkReplicas = forward_ask(Packets.CheckReplicas)
truncate = forward_ask(Packets.Truncate)
repair = forward_ask(Packets.Repair)
class MasterEventHandler(EventHandler):
""" This class is just used to dispatch message to right handler"""
def _connectionLost(self, conn):
app = self.app
if app.listening_conn: # if running
assert app.master_conn in (conn, None)
conn.cancelRequests("connection to master lost")
app.reset()
app.uuid = None
raise PrimaryFailure
def connectionFailed(self, conn):
self._connectionLost(conn)
def connectionClosed(self, conn):
self._connectionLost(conn)
def dispatch(self, conn, packet, kw={}):
if 'conn' in kw:
# expected answer
if packet.isResponse():
packet.setId(kw['msg_id'])
kw['conn'].answer(packet)
else:
self.app.request_handler.dispatch(conn, packet, kw)
else:
# unexpected answers and notifications
super(MasterEventHandler, self).dispatch(conn, packet, kw)
def answerClusterState(self, conn, state):
self.app.cluster_state = state
def notifyPartitionChanges(self, conn, ptid, cell_list):
self.app.pt.update(ptid, cell_list, self.app.nm)
def answerPartitionTable(self, conn, ptid, row_list):
self.app.pt.load(ptid, row_list, self.app.nm)
self.app.bootstrapped = True
def sendPartitionTable(self, conn, ptid, row_list):
if self.app.bootstrapped:
self.app.pt.load(ptid, row_list, self.app.nm)
def notifyClusterInformation(self, conn, cluster_state):
self.app.cluster_state = cluster_state
class MasterRequestEventHandler(EventHandler):
""" This class handle all answer from primary master node"""
# XXX: to be deleted ?
#
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ZODB import BaseStorage, ConflictResolution, POSException
from zope.interface import implementer
import ZODB.interfaces
from neo.lib import logging
from .app import Application
from .exception import NEOStorageNotFoundError, NEOStorageDoesNotExistError
def raiseReadOnlyError(*args, **kw):
raise POSException.ReadOnlyError()
@implementer(
ZODB.interfaces.IStorage,
# ZODB.interfaces.IStorageRestoreable,
ZODB.interfaces.IStorageIteration,
ZODB.interfaces.IStorageUndoable,
ZODB.interfaces.IExternalGC,
ZODB.interfaces.ReadVerifyingStorage,
)
class Storage(BaseStorage.BaseStorage,
ConflictResolution.ConflictResolvingStorage):
"""Wrapper class for neoclient."""
def __init__(self, master_nodes, name, read_only=False,
compress=None, logfile=None, _app=None, **kw):
"""
Do not pass those parameters (used internally):
_app
"""
if compress is None:
compress = True
if logfile:
logging.setup(logfile)
BaseStorage.BaseStorage.__init__(self, 'NEOStorage(%s)' % (name, ))
# Warning: _is_read_only is used in BaseStorage, do not rename it.
self._is_read_only = read_only
if read_only:
for method_id in (
'new_oid',
'tpc_begin',
'tpc_vote',
'tpc_abort',
'store',
'deleteObject',
'undo',
'undoLog',
):
setattr(self, method_id, raiseReadOnlyError)
if _app is None:
ssl = [kw.pop(x, None) for x in ('ca', 'cert', 'key')]
_app = Application(master_nodes, name, compress=compress,
ssl=ssl if any(ssl) else None, **kw)
self.app = _app
@property
def _cache(self):
return self.app._cache
def load(self, oid, version=''):
# XXX: interface definition states that version parameter is
# mandatory, while some ZODB tests do not provide it. For now, make
# it optional.
assert version == '', 'Versions are not supported'
try:
return self.app.load(oid)[:2]
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
def new_oid(self):
return self.app.new_oid()
def tpc_begin(self, transaction, tid=None, status=' '):
"""
Note: never blocks in NEO.
"""
return self.app.tpc_begin(self, transaction, tid, status)
def tpc_vote(self, transaction):
return self.app.tpc_vote(transaction)
def tpc_abort(self, transaction):
return self.app.tpc_abort(transaction)
def tpc_finish(self, transaction, f=None):
return self.app.tpc_finish(transaction, f)
def store(self, oid, serial, data, version, transaction):
assert version == '', 'Versions are not supported'
return self.app.store(oid, serial, data, version, transaction)
def deleteObject(self, oid, serial, transaction):
self.app.store(oid, serial, None, None, transaction)
# multiple revisions
def loadSerial(self, oid, serial):
try:
return self.app.load(oid, serial)[0]
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
def loadBefore(self, oid, tid):
try:
return self.app.load(oid, None, tid)
except NEOStorageDoesNotExistError:
raise POSException.POSKeyError(oid)
except NEOStorageNotFoundError:
return None
@property
def iterator(self):
return self.app.iterator
# undo
def undo(self, transaction_id, txn):
return self.app.undo(transaction_id, txn)
def undoLog(self, first=0, last=-20, filter=None):
return self.app.undoLog(first, last, filter)
def supportsUndo(self):
return True
def supportsTransactionalUndo(self):
return True
def loadEx(self, oid, version):
try:
data, serial, _ = self.app.load(oid)
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
return data, serial, ''
def __len__(self):
return self.app.getObjectCount()
def registerDB(self, db, limit=None):
self.app.registerDB(db, limit)
def history(self, oid, *args, **kw):
try:
return self.app.history(oid, *args, **kw)
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
def sync(self):
return self.app.sync()
def copyTransactionsFrom(self, source, verbose=False):
""" Zope compliant API """
return self.importFrom(source)
def importFrom(self, source, start=None, stop=None, preindex=None):
""" Allow import only a part of the source storage """
return self.app.importFrom(self, source, start, stop, preindex)
def pack(self, t, referencesf, gc=False):
if gc:
logging.warning('Garbage Collection is not available in NEO,'
' please use an external tool. Packing without GC.')
self.app.pack(t)
def lastSerial(self):
# seems unused
raise NotImplementedError
def lastTransaction(self):
# Used in ZODB unit tests
return self.app.last_tid
def _clear_temp(self):
raise NotImplementedError
def set_max_oid(self, possible_new_max_oid):
# seems used only by FileStorage
raise NotImplementedError
def close(self):
# WARNING: This does not handle the case where an app is shared by
# several Storage instances, but this is something that only
# happens in threaded tests (and this method is not called on
# extra Storages).
app = self.app
if app is not None:
self.app = None
app.close()
def getTid(self, oid):
try:
return self.app.getLastTID(oid)
except NEOStorageNotFoundError:
raise KeyError
def checkCurrentSerialInTransaction(self, oid, serial, transaction):
self.app.checkCurrentSerialInTransaction(oid, serial, transaction)
##############################################################################
#
# Copyright (C) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
def patch():
from hashlib import md5
from ZODB.Connection import Connection
H = lambda f: md5(f.func_code.co_code).hexdigest()
# Allow serial to be returned as late as tpc_finish
#
# This makes possible for storage to allocate serial inside tpc_finish,
# removing the requirement to serialise second commit phase (tpc_vote
# to tpc_finish/tpc_abort).
h = H(Connection.tpc_finish)
def tpc_finish(self, transaction):
"""Indicate confirmation that the transaction is done."""
def callback(tid):
if self._mvcc_storage:
# Inter-connection invalidation is not needed when the
# storage provides MVCC.
return
d = dict.fromkeys(self._modified)
self._db.invalidate(tid, d, self)
# It's important that the storage calls the passed function
# while it still has its lock. We don't want another thread
# to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other
# connections!
# <patch>
serial = self._storage.tpc_finish(transaction, callback)
if serial is not None:
assert isinstance(serial, str), repr(serial)
for oid_iterator in (self._modified, self._creating):
for oid in oid_iterator:
obj = self._cache.get(oid, None)
# Ignore missing objects and don't update ghosts.
if obj is not None and obj._p_changed is not None:
obj._p_changed = 0
obj._p_serial = serial
# </patch>
self._tpc_cleanup()
global OLD_ZODB
OLD_ZODB = h in (
'ab9b1b8d82c40e5fffa84f7bc4ea3a8b', # Python 2.7
)
if OLD_ZODB:
Connection.tpc_finish = tpc_finish
elif hasattr(Connection, '_handle_serial'): # merged upstream ?
assert hasattr(Connection, '_warn_about_returned_serial')
# sync() is used to provide a "network barrier", which is required for
# NEO & ZEO to make sure our view of the storage includes all changes done
# so far by other clients. But a round-trip to the server introduces
# latency so it must not be done when it's not useful. Note also that a
# successful commit (which ends with a response from the master) already
# acts as a "network barrier".
# BBB: What this monkey-patch does has been merged in ZODB5.
if not hasattr(Connection, '_flush_invalidations'):
return
assert H(Connection.afterCompletion) in (
'cd3a080b80fd957190ff3bb867149448', # Python 2.7
)
def afterCompletion(self, *ignored):
self._readCurrent.clear()
# PATCH: do not call sync()
self._flush_invalidations()
Connection.afterCompletion = afterCompletion
patch()
import app # set up signal handlers early enough to do it in the main thread
This diff is collapsed.
This diff is collapsed.
<component prefix="neo.client.config">
<sectiontype name="NeoStorage" datatype=".NeoStorage"
implements="ZODB.storage">
<description>
A scalable storage for Zope
</description>
<key name="master_nodes" required="yes">
<description>
Give the list of the master node like ip:port ip:port...
</description>
</key>
<key name="name" required="yes">
<description>
Give the name of the cluster
</description>
</key>
<key name="compress" datatype="boolean">
<description>
If true, data is automatically compressed (unless compressed size is
not smaller). This is the default behaviour.
</description>
</key>
<key name="read-only" datatype="boolean">
<description>
If true, only reads may be executed against the storage. Note
that the "pack" operation is not considered a write operation
and is still allowed on a read-only neostorage.
</description>
</key>
<key name="logfile" datatype="existing-dirpath">
<description>
Log debugging information to specified SQLite DB.
</description>
</key>
<key name="cache-size" datatype="byte-size" default="20MB">
<description>
Storage cache size in bytes. Records are cached uncompressed.
Optional ``KB``, ``MB`` or ``GB`` suffixes can (and usually are)
used to specify units other than bytes.
</description>
</key>
<key name="dynamic_master_list" datatype="existing-dirpath">
<description>
The file designated by this option contains an updated list of master
nodes which are known to be part of current cluster, so new nodes can
be added/removed without requiring a config change each time.
</description>
</key>
<key name="ca" datatype="existing-file">
<description>
Certificate authority in PEM format.
</description>
</key>
<key name="cert" datatype="existing-file">
<description>
Certificate in PEM format.
</description>
</key>
<key name="key" datatype="existing-file">
<description>
Private key in PEM format.
</description>
</key>
</sectiontype>
</component>
#
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ZODB.config import BaseConfig
class NeoStorage(BaseConfig):
def open(self):
from .Storage import Storage
config = self.config
return Storage(**{k: getattr(config, k)
for k in config.getSectionAttributes()})
#
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ZODB import POSException
class NEOStorageError(POSException.StorageError):
pass
class NEOStorageReadRetry(NEOStorageError):
pass
class NEOStorageNotFoundError(NEOStorageError):
pass
class NEOStorageDoesNotExistError(NEOStorageNotFoundError):
"""
This error is a refinement of NEOStorageNotFoundError: this means
that some object was not found, but also that it does not exist at all.
"""
class NEOStorageCreationUndoneError(NEOStorageDoesNotExistError):
"""
This error is a refinement of NEOStorageDoesNotExistError: this means that
some object existed at some point, but its creation was undone.
"""
# TODO: Inherit from transaction.interfaces.TransientError
# (not recognized yet by ERP5 as a transient error).
class NEOPrimaryMasterLost(POSException.ReadConflictError):
"""
A read operation to a storage node failed because we get disconnected from
the primary master (which causes all connections to storages to be closed
as well). The connection failure to the master may be temporary so we'd
like to restart the transaction; if it isn't, we'll get failures when
trying to reconnect.
"""
#
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import handler
from ZODB.POSException import StorageError, ReadOnlyError
class AnswerBaseHandler(handler.AnswerBaseHandler): # XXX
def protocolError(self, conn, message):
raise StorageError("protocol error: %s" % message)
def readOnlyAccess(self, conn, message):
raise ReadOnlyError(message)
#
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.exception import PrimaryElected
from neo.lib.handler import MTEventHandler
from neo.lib.pt import MTPartitionTable as PartitionTable
from neo.lib.protocol import NodeStates
from . import AnswerBaseHandler
from ..exception import NEOStorageError
class PrimaryBootstrapHandler(AnswerBaseHandler):
""" Bootstrap handler used when looking for the primary master """
def answerPartitionTable(self, conn, ptid, row_list):
assert row_list
self.app.pt.load(ptid, row_list, self.app.nm)
def answerLastTransaction(*args):
pass
class PrimaryNotificationsHandler(MTEventHandler):
""" Handler that process the notifications from the primary master """
def notPrimaryMaster(self, *args):
try:
super(PrimaryNotificationsHandler, self).notPrimaryMaster(*args)
except PrimaryElected, e:
self.app.primary_master_node, = e.args
def _acceptIdentification(self, node, num_partitions, num_replicas):
self.app.pt = PartitionTable(num_partitions, num_replicas)
def answerLastTransaction(self, conn, ltid):
app = self.app
app_last_tid = app.__dict__.get('last_tid', '')
if app_last_tid != ltid:
# Either we're connecting or we already know the last tid
# via invalidations.
assert app.master_conn is None, app.master_conn
app._cache_lock_acquire()
try:
if app_last_tid < ltid:
app._cache.clear_current()
# In the past, we tried not to invalidate the
# Connection caches entirely, using the list of
# oids that are invalidated by clear_current.
# This was wrong because these caches may have
# entries that are not in the NEO cache anymore.
else:
# The DB was truncated. It happens so
# rarely that we don't need to optimize.
app._cache.clear()
# Make sure a parallel load won't refill the cache
# with garbage.
app._loading_oid = app._loading_invalidated = None
finally:
app._cache_lock_release()
db = app.getDB()
db is None or db.invalidateCache()
app.last_tid = ltid
app.ignore_invalidations = False
def answerTransactionFinished(self, conn, _, tid, callback, cache_dict):
app = self.app
app.last_tid = tid
# Update cache
cache = app._cache
app._cache_lock_acquire()
try:
for oid, data in cache_dict.iteritems():
# Update ex-latest value in cache
cache.invalidate(oid, tid)
if data is not None:
# Store in cache with no next_tid
cache.store(oid, data, tid, None)
if callback is not None:
callback(tid)
finally:
app._cache_lock_release()
def connectionClosed(self, conn):
app = self.app
if app.master_conn is not None:
msg = "connection to primary master node closed"
logging.critical(msg)
app.master_conn = None
for txn_context in app.txn_contexts():
txn_context.error = msg
try:
app.__dict__.pop('pt').clear()
except KeyError:
pass
app.primary_master_node = None
super(PrimaryNotificationsHandler, self).connectionClosed(conn)
def stopOperation(self, conn):
logging.critical("master node ask to stop operation")
def notifyClusterInformation(self, conn, state):
# TODO: on shutdown, abort any transaction that is not voted
logging.info("cluster switching to %s state", state)
def invalidateObjects(self, conn, tid, oid_list):
app = self.app
if app.ignore_invalidations:
return
app.last_tid = tid
app._cache_lock_acquire()
try:
invalidate = app._cache.invalidate
loading = app._loading_oid
for oid in oid_list:
invalidate(oid, tid)
if oid == loading:
app._loading_oid = None
app._loading_invalidated = tid
db = app.getDB()
if db is not None:
db.invalidate(tid, oid_list)
finally:
app._cache_lock_release()
def notifyPartitionChanges(self, conn, ptid, cell_list):
if self.app.pt.filled():
self.app.pt.update(ptid, cell_list, self.app.nm)
def notifyNodeInformation(self, conn, timestamp, node_list):
super(PrimaryNotificationsHandler, self).notifyNodeInformation(
conn, timestamp, node_list)
# XXX: 'update' automatically closes UNKNOWN nodes. Do we really want
# to do the same thing for nodes in other non-running states ?
getByUUID = self.app.nm.getByUUID
for node in node_list:
if node[3] != NodeStates.RUNNING:
node = getByUUID(node[2])
if node and node.isConnected():
node.getConnection().close()
def notifyDeadlock(self, conn, ttid, locking_tid):
for txn_context in self.app.txn_contexts():
if txn_context.ttid == ttid:
txn_context.conflict_dict[None] = locking_tid
txn_context.wakeup(conn)
break
class PrimaryAnswersHandler(AnswerBaseHandler):
""" Handle that process expected packets from the primary master """
def answerBeginTransaction(self, conn, ttid):
self.app.setHandlerData(ttid)
def answerNewOIDs(self, conn, oid_list):
oid_list.reverse()
self.app.new_oid_list = oid_list
def incompleteTransaction(self, conn, message):
raise NEOStorageError("storage nodes for which vote failed can not be"
" disconnected without making the cluster non-operational")
def answerTransactionFinished(self, conn, _, tid):
self.app.setHandlerData(tid)
def answerPack(self, conn, status):
if not status:
raise NEOStorageError('Already packing')
def answerLastTransaction(self, conn, ltid):
pass
def answerFinalTID(self, conn, tid):
self.app.setHandlerData(tid)
#
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from zlib import decompress
from ZODB.TimeStamp import TimeStamp
from neo.lib import logging
from neo.lib.protocol import Packets, uuid_str
from neo.lib.util import dump, makeChecksum
from neo.lib.exception import NodeNotReady
from neo.lib.handler import MTEventHandler
from . import AnswerBaseHandler
from ..transactions import Transaction
from ..exception import NEOStorageError, NEOStorageNotFoundError
from ..exception import NEOStorageReadRetry, NEOStorageDoesNotExistError
class StorageEventHandler(MTEventHandler):
def connectionLost(self, conn, new_state):
node = self.app.nm.getByAddress(conn.getAddress())
assert node is not None
self.app.cp.removeConnection(node)
super(StorageEventHandler, self).connectionLost(conn, new_state)
def connectionFailed(self, conn):
# Connection to a storage node failed
node = self.app.nm.getByAddress(conn.getAddress())
assert node is not None
self.app.cp.removeConnection(node)
super(StorageEventHandler, self).connectionFailed(conn)
def _acceptIdentification(*args):
pass
class StorageBootstrapHandler(AnswerBaseHandler):
""" Handler used when connecting to a storage node """
def notReady(self, conn, message):
conn.close()
raise NodeNotReady(message)
class StorageAnswersHandler(AnswerBaseHandler):
""" Handle all messages related to ZODB operations """
def answerObject(self, conn, oid, *args):
self.app.setHandlerData(args)
def answerStoreObject(self, conn, conflict, oid):
txn_context = self.app.getHandlerData()
if conflict:
# Conflicts can not be resolved now because 'conn' is locked.
# We must postpone the resolution (by queuing the conflict in
# 'conflict_dict') to avoid any deadlock with another thread that
# also resolves a conflict successfully to the same storage nodes.
# Warning: if a storage (S1) is much faster than another (S2), then
# we may process entirely a conflict with S1 (i.e. we received the
# answer to the store of the resolved object on S1) before we
# receive the conflict answer from the first store on S2.
logging.info('%s reports a conflict on %s:%s with %s',
uuid_str(conn.getUUID()), dump(oid),
dump(txn_context.ttid), dump(conflict))
# If this conflict is not already resolved, mark it for
# resolution.
if txn_context.resolved_dict.get(oid, '') < conflict:
txn_context.conflict_dict[oid] = conflict
else:
txn_context.written(self.app, conn.getUUID(), oid)
answerCheckCurrentSerial = answerStoreObject
def answerRebaseTransaction(self, conn, oid_list):
txn_context = self.app.getHandlerData()
ttid = txn_context.ttid
queue = txn_context.queue
try:
for oid in oid_list:
# We could have an extra parameter to tell the storage if we
# still have the data, and in this case revert what was done
# in Transaction.written. This would save bandwidth in case of
# conflict.
conn.ask(Packets.AskRebaseObject(ttid, oid),
queue=queue, oid=oid)
except ConnectionClosed:
txn_context.involved_nodes[conn.getUUID()] = 2
def answerRebaseObject(self, conn, conflict, oid):
if conflict:
txn_context = self.app.getHandlerData()
serial, conflict, data = conflict
assert serial and serial < conflict, (serial, conflict)
resolved = conflict <= txn_context.resolved_dict.get(oid, '')
try:
cached = txn_context.cache_dict.pop(oid)
except KeyError:
if resolved:
# We should still be waiting for an answer from this node.
assert conn.uuid in txn_context.data_dict[oid][2]
return
assert oid in txn_context.data_dict
if serial <= txn_context.conflict_dict.get(oid, ''):
# Another node already reported this conflict or a newer,
# by answering to this rebase or to the previous store.
return
# A node has not answered yet to a previous store. Do not wait
# it to report the conflict because it may fail before.
else:
# The data for this oid are now back on client side.
# Revert what was done in Transaction.written
assert not resolved
if data is None: # undo or CHECKED_SERIAL
data = cached
else:
compression, checksum, data = data
if checksum != makeChecksum(data):
raise NEOStorageError(
'wrong checksum while getting back data for'
' object %s during rebase of transaction %s'
% (dump(oid), dump(txn_context.ttid)))
if compression:
data = decompress(data)
size = len(data)
txn_context.data_size += size
if cached:
assert cached == data
txn_context.cache_size -= size
txn_context.data_dict[oid] = data, serial, None
txn_context.conflict_dict[oid] = conflict
def answerStoreTransaction(self, conn):
pass
answerVoteTransaction = answerStoreTransaction
def connectionClosed(self, conn):
txn_context = self.app.getHandlerData()
if type(txn_context) is Transaction:
txn_context.nodeLost(self.app, conn.getUUID())
super(StorageAnswersHandler, self).connectionClosed(conn)
def answerTIDsFrom(self, conn, tid_list):
logging.debug('Get %u TIDs from %r', len(tid_list), conn)
self.app.setHandlerData(tid_list)
def answerTransactionInformation(self, conn, tid,
user, desc, ext, packed, oid_list):
self.app.setHandlerData(({
'time': TimeStamp(tid).timeTime(),
'user_name': user,
'description': desc,
'id': tid,
'oids': oid_list,
'packed': packed,
}, ext))
def answerObjectHistory(self, conn, _, history_list):
# history_list is a list of tuple (serial, size)
self.app.setHandlerData(history_list)
def oidNotFound(self, conn, message):
# This can happen either when :
# - loading an object
# - asking for history
raise NEOStorageNotFoundError(message)
def oidDoesNotExist(self, conn, message):
raise NEOStorageDoesNotExistError(message)
def tidNotFound(self, conn, message):
# This can happen when requiring txn informations
raise NEOStorageNotFoundError(message)
def nonReadableCell(self, conn, message):
logging.info('non readable cell')
raise NEOStorageReadRetry(True)
def answerTIDs(self, conn, tid_list, tid_set):
tid_set.update(tid_list)
def answerObjectUndoSerial(self, conn, object_tid_dict, partition,
partition_oid_dict, undo_object_tid_dict):
del partition_oid_dict[partition]
undo_object_tid_dict.update(object_tid_dict)
def answerFinalTID(self, conn, tid):
self.app.setHandlerData(tid)
#
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ZODB import BaseStorage
from neo.lib.protocol import ZERO_TID, MAX_TID
from neo.lib.util import u64, add64
from .exception import NEOStorageCreationUndoneError, NEOStorageNotFoundError
CHUNK_LENGTH = 100
class Record(BaseStorage.DataRecord):
""" BaseStorage Transaction record yielded by the Transaction object """
def __str__(self):
oid = u64(self.oid)
tid = u64(self.tid)
args = (oid, tid, len(self.data), self.data_txn)
return 'Record %s:%s: %s (%s)' % args
class Transaction(BaseStorage.TransactionRecord):
""" Transaction object yielded by the NEO iterator """
def __init__(self, app, txn):
super(Transaction, self).__init__(txn['id'],
'p' if txn['packed'] else ' ',
txn['user_name'], txn['description'], txn['ext'])
self.app = app
self.oid_list = txn['oids']
def __iter__(self):
""" Iterate over the transaction records """
load = self.app._loadFromStorage
for oid in self.oid_list:
try:
data, _, _, data_tid = load(oid, self.tid, None)
except NEOStorageCreationUndoneError:
data = data_tid = None
except NEOStorageNotFoundError:
# Transactions are not updated after a pack, so their object
# will not be found in the database. Skip them.
continue
yield Record(oid, self.tid, data, data_tid)
def __str__(self):
return 'Transaction #%s: %s %s' \
% (u64(self.tid), self.user, self.status)
def iterator(app, start=None, stop=None):
"""NEO transaction iterator"""
if start is None:
start = ZERO_TID
stop = min(stop or MAX_TID, app.last_tid)
while 1:
max_tid, chunk = app.transactionLog(start, stop, CHUNK_LENGTH)
if not chunk:
break # nothing more
for txn in chunk:
yield Transaction(app, txn)
start = add64(max_tid, 1)
#
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import random, time
from neo.lib import logging
from neo.lib.locking import Lock
from neo.lib.protocol import NodeTypes, Packets
from neo.lib.connection import MTClientConnection, ConnectionClosed
from neo.lib.exception import NodeNotReady
from .exception import NEOPrimaryMasterLost
# How long before we might retry a connection to a node to which connection
# failed in the past.
MAX_FAILURE_AGE = 600
class ConnectionPool(object):
"""This class manages a pool of connections to storage nodes."""
def __init__(self, app):
self.app = app
self.connection_dict = {}
# Define a lock in order to create one connection to
# a storage node at a time to avoid multiple connections
# to the same node.
self._lock = Lock()
self.node_failure_dict = {}
def _initNodeConnection(self, node):
"""Init a connection to a given storage node."""
app = self.app
if app.master_conn is None:
raise NEOPrimaryMasterLost
conn = MTClientConnection(app, app.storage_event_handler, node,
dispatcher=app.dispatcher)
p = Packets.RequestIdentification(NodeTypes.CLIENT,
app.uuid, None, app.name, app.id_timestamp)
try:
app._ask(conn, p, handler=app.storage_bootstrap_handler)
except ConnectionClosed:
logging.error('Connection to %r failed', node)
except NodeNotReady:
logging.info('%r not ready', node)
else:
logging.info('Connected %r', node)
return conn
self.node_failure_dict[node.getUUID()] = time.time() + MAX_FAILURE_AGE
def getCellSortKey(self, cell, random=random.random):
# The use of 'random' suffles cells to randomise node to access.
uuid = cell.getUUID()
# First, prefer a connected node.
if uuid in self.connection_dict:
return random()
# Then one that didn't fail recently.
failure = self.node_failure_dict.get(uuid)
if failure:
if time.time() < failure:
# At last, order by date of connection failure.
return failure
# Do not use 'del' statement: we didn't lock, so another
# thread might have removed uuid from node_failure_dict.
self.node_failure_dict.pop(uuid, None)
return 1 + random()
def getConnForNode(self, node):
"""Return a locked connection object to a given node
If no connection exists, create a new one"""
if node.isRunning():
uuid = node.getUUID()
try:
# Already connected to node
return self.connection_dict[uuid]
except KeyError:
with self._lock:
# Second lookup, if another thread initiated connection
# while we were waiting for connection lock.
try:
return self.connection_dict[uuid]
except KeyError:
# Create new connection to node
conn = self._initNodeConnection(node)
if conn is not None:
self.connection_dict[uuid] = conn
return conn
def removeConnection(self, node):
self.connection_dict.pop(node.getUUID(), None)
def closeAll(self):
with self._lock:
while 1:
try:
conn = self.connection_dict.popitem()[1]
except KeyError:
break
conn.setReconnectionNoDelay()
conn.close()
#
# Copyright (C) 2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ZODB.POSException import StorageTransactionError
from neo.lib.connection import ConnectionClosed
from neo.lib.locking import SimpleQueue
from neo.lib.protocol import Packets
from .exception import NEOStorageError
@apply
class _WakeupPacket(object):
handler_method_name = 'pong'
decode = tuple
getId = int
class Transaction(object):
cache_size = 0 # size of data in cache_dict
data_size = 0 # size of data in data_dict
error = None
locking_tid = None
voted = False
ttid = None # XXX: useless, except for testBackupReadOnlyAccess
def __init__(self, txn):
self.queue = SimpleQueue()
self.txn = txn
# data being stored
self.data_dict = {} # {oid: (value, serial, [node_id])}
# data stored: this will go to the cache on tpc_finish
self.cache_dict = {} # {oid: value}
# conflicts to resolve
self.conflict_dict = {} # {oid: serial}
# resolved conflicts
self.resolved_dict = {} # {oid: serial}
# Keys are node ids instead of Node objects because a node may
# disappear from the cluster. In any case, we always have to check
# if the id is still known by the NodeManager.
# status: 0 -> check only, 1 -> store, 2 -> failed
self.involved_nodes = {} # {node_id: status}
def wakeup(self, conn):
self.queue.put((conn, _WakeupPacket, {}))
def write(self, app, packet, object_id, store=1, **kw):
uuid_list = []
pt = app.pt
involved = self.involved_nodes
object_id = pt.getPartition(object_id)
for cell in pt.getCellList(object_id):
node = cell.getNode()
uuid = node.getUUID()
status = involved.get(uuid, -1)
if status < store:
involved[uuid] = store
elif status > 1:
continue
conn = app.cp.getConnForNode(node)
if conn is not None:
try:
if status < 0 and self.locking_tid and 'oid' in kw:
# A deadlock happened but this node is not aware of it.
# Tell it to write-lock with the same locking tid as
# for the other nodes. The condition on kw is because
# we don't need that for transaction metadata.
conn.ask(Packets.AskRebaseTransaction(
self.ttid, self.locking_tid), queue=self.queue)
conn.ask(packet, queue=self.queue, **kw)
uuid_list.append(uuid)
continue
except ConnectionClosed:
pass
involved[uuid] = 2
if uuid_list:
return uuid_list
raise NEOStorageError(
'no storage available for write to partition %s' % object_id)
def written(self, app, uuid, oid):
# When a node that is being disconnected by the master because it was
# not part of the transaction that caused a conflict, we may receive a
# positive answer (not to be confused with lockless stores) before the
# conflict. Because we have no way to identify such case, we must keep
# the data in self.data_dict until all nodes have answered so we remain
# able to resolve conflicts.
try:
data, serial, uuid_list = self.data_dict[oid]
uuid_list.remove(uuid)
except KeyError:
# 1. store to S1 and S2
# 2. S2 reports a conflict
# 3. store to S1 and S2 # conflict resolution
# 4. S1 does not report a conflict (lockless)
# 5. S2 answers before S1 for the second store
return
except ValueError:
# The most common case for this exception is because nodeLost()
# tries all oids blindly. Other possible cases:
# - like above (KeyError), but with S2 answering last
# - answer to resolved conflict before the first answer from a
# node that was being disconnected by the master
return
if uuid_list:
return
del self.data_dict[oid]
if type(data) is str:
size = len(data)
self.data_size -= size
size += self.cache_size
if size < app._cache._max_size:
self.cache_size = size
else:
# Do not cache data past cache max size, as it
# would just flush it on tpc_finish. This also
# prevents memory errors for big transactions.
data = None
self.cache_dict[oid] = data
def nodeLost(self, app, uuid):
self.involved_nodes[uuid] = 2
for oid in list(self.data_dict):
self.written(app, uuid, oid)
class TransactionContainer(dict):
# IDEA: Drop this container and use the new set_data/data API on
# transactions (requires transaction >= 1.6).
def pop(self, txn):
return dict.pop(self, id(txn), None)
def get(self, txn):
try:
return self[id(txn)]
except KeyError:
raise StorageTransactionError("unknown transaction %r" % txn)
def new(self, txn):
key = id(txn)
if key in self:
raise StorageTransactionError("commit of transaction %r"
" already started" % txn)
context = self[key] = Transaction(txn)
return context
#
# Copyright (C) 2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""NEO URI resolver for zodburi
URI format:
neo://name@master1,master2,...,masterN?options
"""
import ZODB.config
import ZConfig
from cStringIO import StringIO
from collections import OrderedDict
from urlparse import urlsplit, parse_qsl
# neo_zconf_options returns set of zconfig options supported by NEO storage
def neo_zconf_options():
neo_schema = """<schema>
<import package="ZODB" />
<import package="neo.client" />
</schema>"""
neo_schema = StringIO(neo_schema)
neo_schema = ZConfig.loadSchemaFile(neo_schema)
neo_storage_zconf = neo_schema.gettype('NeoStorage')
options = {k for k, _ in neo_storage_zconf}
assert 'master_nodes' in options
assert 'name' in options
return options
# canonical_opt_name returns "oPtion_nAme" as "option-name"
def canonical_opt_name(name):
return name.lower().replace('_', '-')
# worker entrypoint for resolve_uri and tests
def _resolve_uri(uri):
scheme, netloc, path, query, frag = urlsplit(uri)
if scheme != "neo":
raise ValueError("invalid uri: %s : expected neo:// scheme" % uri)
if path != "":
raise ValueError("invalid uri: %s : non-empty path" % uri)
if frag != "":
raise ValueError("invalid uri: %s : non-empty fragment" % uri)
# extract master list and name from netloc
name, masterloc = netloc.split('@', 1)
master_list = masterloc.split(',')
neokw = OrderedDict()
neokw['master_nodes'] = ' '.join(master_list)
neokw['name'] = name
# get options from query: only those that are defined by NEO schema go to
# storage - rest are returned as database options
dbkw = {}
neo_options = neo_zconf_options()
for k, v in OrderedDict(parse_qsl(query)).items():
if k in neo_options:
neokw[k] = v
else:
# it might be option for storage, but not in canonical form e.g.
# read_only -> read-only (zodburi world settled on using "_" and
# ZConfig world on "-" as separators)
k2 = canonical_opt_name(k)
if k2 in neo_options:
neokw[k2] = v
# else keep this kv as db option
else:
dbkw[k] = v
# now we have everything. Let ZConfig do actual work for validation options
# and borning the storage
neozconf = """%import neo.client
<NEOStorage>
"""
for k, v in neokw.items():
neozconf += " %s\t%s\n" % (k, v)
neozconf += "</NEOStorage>\n"
return neozconf, dbkw
# resolve_uri resolves uri according to neo:// schema.
# see module docstring for uri format.
def resolve_uri(uri):
neozconf, dbkw = _resolve_uri(uri)
def factory():
return ZODB.config.storageFromString(neozconf)
return factory, dbkw
"""Example of script starting a debugger on RTMIN+3 signal
The pdb is launched in a separate thread in order not to trigger timeouts.
The prompt is accessible through network in case that the process is daemonized:
$ socat READLINE TCP:127.0.0.1:54930
> neo/debug.py(63)pdb()
-> app # this is Application instance
(Pdb) app
<neo.master.app.Application object at 0x1fc9750>
"""
import sys
def app_set():
try:
return sys.modules['neo.lib.threaded_app'].app_set
except KeyError:
f = sys._getframe(4)
try:
while f.f_code.co_name != 'run' or \
f.f_locals.get('self').__class__.__name__ != 'Application':
f = f.f_back
return f.f_locals['self'],
except AttributeError:
return ()
def defer(task):
def wrapper():
from traceback import print_exc
try:
task(app)
except:
print_exc()
for app in app_set():
app.em.wakeup(wrapper)
break
IF = 'pdb'
if IF == 'pdb':
# List of (module, callables) to break at.
# If empty, a thread is started with a breakpoint.
# All breakpoints are automatically removed on the first break,
# or when this module is reloaded.
BP = (#('ZODB.Connection', 'Connection.setstate'),
#('ZPublisher.Publish', 'publish_module_standard'),
)
import errno, socket, threading, weakref
# Unfortunately, IPython does not always print to given stdout.
#from neo.lib.debug import getPdb
from pdb import Pdb as getPdb
class Socket(object):
def __init__(self, socket):
# In case that the default timeout is not None.
socket.settimeout(None)
self._socket = socket
self._buf = ''
def write(self, data):
self._socket.send(data)
def readline(self):
recv = self._socket.recv
data = self._buf
while True:
i = 1 + data.find('\n')
if i:
self._buf = data[i:]
return data[:i]
d = recv(4096)
data += d
if not d:
self._buf = ''
return data
def flush(self):
pass
def closed(self):
self._socket.setblocking(0)
try:
self._socket.recv(0)
return True
except socket.error, (err, _):
if err != errno.EAGAIN:
raise
self._socket.setblocking(1)
return False
def pdb():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# For better security, maybe we should use a unix socket.
s.settimeout(60)
s.bind(('127.0.0.1', 0))
s.listen(0)
print 'Listening to %u' % s.getsockname()[1]
sys.stdout.flush() # BBB: On Python 3, print() takes a 'flush' arg.
_socket = Socket(s.accept()[0])
finally:
s.close()
try:
app, = app_set
except ValueError:
app = None
getPdb(stdin=_socket, stdout=_socket).set_trace()
app # this is Application instance (see 'app_set' if there are several)
app_set = app_set()
class setupBreakPoints(list):
def __init__(self, bp_list):
self._lock = threading.Lock()
for o, name in bp_list:
o = __import__(o, fromlist=('*',), level=0)
x = name.split('.')
name = x.pop()
for x in x:
o = getattr(o, x)
orig = getattr(o, name)
if orig.__module__ == __name__:
orig.__closure__[1].cell_contents._revert()
orig = getattr(o, name)
assert orig.__module__ != __name__, (o, name)
orig = getattr(orig, '__func__', orig)
self.append((o, name, orig))
setattr(o, name, self._wrap(orig))
print 'BP set on', orig
sys.stdout.flush()
self._hold = weakref.ref(pdb, self._revert)
def _revert(self, *_):
for x in self:
setattr(*x)
print 'BP removed on', x[2]
sys.stdout.flush()
del self[:]
def _wrap(self, orig):
return lambda *args, **kw: self(orig, *args, **kw)
def __call__(self, orig, *args, **kw):
stop = False
with self._lock:
if self:
stop = True
self._revert()
if stop:
pdb()
return orig(*args, **kw)
if BP:
setupBreakPoints(BP)
else:
threading.Thread(target=pdb).start()
elif IF == 'frames':
import traceback
write = sys.stderr.write
for thread_id, frame in sys._current_frames().iteritems():
write("Thread %s:\n" % thread_id)
traceback.print_stack(frame)
write("End of dump\n")
#
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .logger import logging
#
# Copyright (C) 2015-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from . import logging
from .event import EventManager
from .node import NodeManager
class BaseApplication(object):
server = None
ssl = None
def __init__(self, ssl=None, dynamic_master_list=None):
if ssl:
if not all(ssl):
raise ValueError("To enable encryption, 3 files must be"
" provided: the CA certificate, and the certificate"
" of this node with its private key.")
ca, cert, key = ssl
import ssl
version, version_name = max((getattr(ssl, k), k)
for k in dir(ssl) if k.startswith("PROTOCOL_TLSv"))
self.ssl = context = ssl.SSLContext(version)
context.options |= (0
| ssl.OP_CIPHER_SERVER_PREFERENCE
| ssl.OP_NO_COMPRESSION
)
context.set_ciphers(ssl._RESTRICTED_SERVER_CIPHERS)
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(ca)
context.load_cert_chain(cert, key)
context.verify_flags |= ssl.VERIFY_X509_STRICT | (
context.cert_store_stats()['crl'] and ssl.VERIFY_CRL_CHECK_LEAF)
logging.info("TLS %s enabled for %s",
float(version_name[13:].replace("_", ".")), self)
self._handlers = {}
self.em = EventManager()
self.nm = NodeManager(dynamic_master_list)
# XXX: Do not implement __del__ unless all references to the Application
# become weak.
# Due to cyclic references, Python < 3.4 would never call it unless
# it's closed explicitly, and in this case, there's nothing to do.
def close(self):
self.nm.close()
self.em.close()
self.__dict__.clear()
#
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
ATTRIBUTE_TRACKER_ENABLED = False
from .locking import LockUser
"""
Usage example:
from neo import attributeTracker
class Foo(object):
...
def assertBar(self, expected_value):
if self.bar_attr != expected_value:
attributeTracker.whoSet(self, 'bar_attr')
attributeTracker.track(Foo)
"""
MODIFICATION_CONTAINER_ID = '_attribute_tracker_dict'
def tracker_setattr(self, attr, value, setattr):
modification_container = getattr(self, MODIFICATION_CONTAINER_ID, None)
if modification_container is None:
modification_container = {}
setattr(self, MODIFICATION_CONTAINER_ID, modification_container)
modification_container[attr] = LockUser()
setattr(self, attr, value)
if ATTRIBUTE_TRACKER_ENABLED:
def track(klass):
original_setattr = klass.__setattr__
def klass_tracker_setattr(self, attr, value):
tracker_setattr(self, attr, value, original_setattr)
klass.__setattr__ = klass_tracker_setattr
else:
def track(klass):
pass
def whoSet(instance, attr):
result = getattr(instance, MODIFICATION_CONTAINER_ID, None)
if result is not None:
result = result.get(attr)
if result is not None:
result = result.formatStack()
return result
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
#
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
class NeoException(Exception):
pass
class PrimaryElected(NeoException):
pass
class PrimaryFailure(NeoException):
pass
class StoppedOperation(NeoException):
pass
class DatabaseFailure(NeoException):
pass
class NodeNotReady(NeoException):
pass
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
-----BEGIN CERTIFICATE-----
MIIC7TCCAdWgAwIBAgIJAL8e44sA7PDMMA0GCSqGSIb3DQEBCwUAMA0xCzAJBgNV
BAMMAkNBMB4XDTE1MDkzMDEzNTQzMFoXDTIxMDMyMjEzNTQzMFowDTELMAkGA1UE
AwwCQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCgT7DVKM4ViQt3
B0oJC4RGi10dNfpNZZpgA5iC2UJ1u6AqqCf0PCQkrmuIzW3l1TenlOiLNdVASkkT
wf1lekIgg4tR8/22oGTAnfY6R9r1C6jAMV72v1sffz8D6qfkMPzKchJt55zywdhm
KscUsMGzXPGIeKrG20m83dSIO4RmCmq/f4BcuWJu6Kkq4n9Wc2IsvpKk+lqEUxI/
QoqdT6OvMXooGs3t892uvKDu++muBj2Y/yyaXt1tCCjDFsRMLWl3Skks+4PeMCZ4
wugyXEBk3d5Yzdv5NsFzFBjAuRCGxJXEOEcfHj4Xj9qTCErZ1jKzgnuxJCtgdqRC
r4beX1U3AgMBAAGjUDBOMB0GA1UdDgQWBBSFY/jKvo0iSTEzzOIcZZUZCT8JfTAf
BgNVHSMEGDAWgBSFY/jKvo0iSTEzzOIcZZUZCT8JfTAMBgNVHRMEBTADAQH/MA0G
CSqGSIb3DQEBCwUAA4IBAQAB0LKDAuhodpyNVwEE9Yl+Q/IiPEPCaix6URJnRn1O
gQnXuZLo1xtJh6wJh1faG1/qNCFMxWEJ+0VkJ7r6v38cNXfYG9OcmD0S6YnNjSuO
VliAtqVVtj8MppJ4vMatLrNi4cvyYucebtNyBCzSIAi+6bkkHeaVgi1EtxXvq+AS
iZp3gl84oXv/gV7Bz4SXmVpFJnhsDMoQZG2KAULAgfZ2Am2I+ffG90cD/oEnS/3O
k3btqTvgIO8MWt8PY3sUOhJEoJYKnC9DppmhOhUTn4zzIIDSluKEOBHZiFb9AcmF
PvzL+8xiORCdUe1d6ANQQlUd0MM810BXZFYEXFbgKg8o
-----END CERTIFICATE-----
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment