Commit e9af596e authored by Kirill Smelkov's avatar Kirill Smelkov

X go/neo: tests: Terminate spawned processses with SIGTERM instead of SIGKILL

For example NEOCluster needs to shutdown gracefully, else there are
processes left for e.g. storage nodes and they dump somthing as below on
the terminal after tests completion:

    === RUN   TestLoad
    2020/10/21 14:33:00 zodb: FIXME: open ../zodb/storage/fs1/testdata/1.fs: raw cache is not ready for invalidations -> NoCache forced
    === RUN   TestLoad/py
    I: runneo.py: /tmp/neo445013868/1: Started master(s): 127.0.0.1:24661
    WARNING: This is not the recommended way to import data to NEO: you should use the Importer backend instead.
    NEO also does not implement IStorageRestoreable interface, which means that undo information is not preserved when using this tool: conflict resolution could happen when undoing an old transaction.
    Migrating from ../zodb/storage/fs1/testdata/1.fs to 127.0.0.1:24661
    Migration done in 0.19877
    --- PASS: TestLoad (0.75s)
        --- PASS: TestLoad/py (0.74s)
    PASS
    ok      lab.nexedi.com/kirr/neo/go/neo  0.749s
    (neo) (z-dev) (g.env) kirr@deco:~/src/neo/src/lab.nexedi.com/kirr/neo/go/neo$ Traceback (most recent call last):
      File "/home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/neo/tests/functional/__init__.py", line 182, in start
        getattr(neo.scripts,  command).main()
      File "/home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/neo/scripts/neostorage.py", line 66, in main
        app.run()
      File "/home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/neo/storage/app.py", line 147, in run
        self._run()
      File "/home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/neo/storage/app.py", line 178, in _run
        self.doOperation()
      File "/home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/neo/storage/app.py", line 266, in doOperation
        poll()
      File "/home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/neo/storage/app.py", line 87, in _poll
        self.em.poll(1)
      File "/home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/neo/lib/event.py", line 155, in poll
        self._poll(blocking)
      File "/home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/neo/lib/event.py", line 253, in _poll
        timeout_object.onTimeout()
      File "/home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/neo/lib/event.py", line 259, in onTimeout
        on_timeout()
      File "/home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/neo/storage/database/manager.py", line 207, in _deferredCommit
        self.commit()
      File "/home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/neo/storage/database/manager.py", line 193, in commit
        self._commit()
      File "/home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/neo/storage/database/sqlite.py", line 90, in _commit
        retry_if_locked(self.conn.commit)
      File "/home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/neo/storage/database/sqlite.py", line 45, in retry_if_locked
        return f(*args)
    OperationalError: disk I/O error
parent 2c2885a1
// Copyright (C) 2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package xexec complements stdlib package os/exec.
// TODO move -> go123
package xexec
import (
"context"
"os/exec"
"syscall"
)
// Cmd is similar to exec.Cmd and is created by Command.
type Cmd struct {
*exec.Cmd
done chan struct{} // after wait completes
}
// Command is similar to exec.Command.
func Command(name string, argv ...string) *Cmd {
cmd := &Cmd{
Cmd: exec.Command(name, argv...),
done: make(chan struct{}),
}
return cmd
}
// XXX Cmd.CombinedOutput
// XXX Cmd.Output
// XXX Cmd.Run
// Start is similar to exec.Command.Start - it starts the specified command.
// Started command will be signalled with SIGTERM upon ctx cancel.
func (cmd *Cmd) Start(ctx context.Context) error {
err := cmd.Cmd.Start()
if err != nil {
return err
}
// program started - propagate ctx.cancel -> SIGTERM
go func() {
select {
case <-ctx.Done():
cmd.Process.Signal(syscall.SIGTERM) // XXX err
case <-cmd.done:
// ok
}
}()
return nil
}
// Wait is the same as exec.Command.Wait.
func (cmd *Cmd) Wait() error {
defer close(cmd.done)
return cmd.Cmd.Wait()
}
// WaitOrKill it wait for spawned process to exit, but kills it with SIGKILL on killCtx cancel.
func (cmd *Cmd) WaitOrKill(ctxKill context.Context) error {
// `kill -9` on ctx cancel
go func() {
select {
case <-ctxKill.Done():
_ = cmd.Process.Kill()
case <-cmd.done:
//ok
}
}()
err := cmd.Wait()
// return "context canceled" if we were forced to kill the child
if ectx := ctxKill.Err(); ectx != nil {
err = ectx
}
return err
}
...@@ -29,6 +29,7 @@ import ( ...@@ -29,6 +29,7 @@ import (
"testing" "testing"
"time" "time"
"lab.nexedi.com/kirr/neo/go/internal/xexec"
"lab.nexedi.com/kirr/neo/go/internal/xtesting" "lab.nexedi.com/kirr/neo/go/internal/xtesting"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
...@@ -47,7 +48,7 @@ type NEOSrv interface { ...@@ -47,7 +48,7 @@ type NEOSrv interface {
// //
// Create it with StartNEOPySrv(XXX). // Create it with StartNEOPySrv(XXX).
type NEOPySrv struct { type NEOPySrv struct {
pysrv *exec.Cmd // spawned `runneo.py` pysrv *xexec.Cmd // spawned `runneo.py`
workdir string // location for database and log files workdir string // location for database and log files
clusterName string // name of the cluster clusterName string // name of the cluster
opt NEOPyOptions // options for spawned server opt NEOPyOptions // options for spawned server
...@@ -89,14 +90,15 @@ func StartNEOPySrv(workdir, clusterName string, opt NEOPyOptions) (_ *NEOPySrv, ...@@ -89,14 +90,15 @@ func StartNEOPySrv(workdir, clusterName string, opt NEOPyOptions) (_ *NEOPySrv,
} }
n := &NEOPySrv{workdir: workdir, clusterName: clusterName, cancel: cancel, done: make(chan struct{})} n := &NEOPySrv{workdir: workdir, clusterName: clusterName, cancel: cancel, done: make(chan struct{})}
n.pysrv = exec.CommandContext(ctx, "python", "./py/runneo.py", workdir, clusterName) // XXX +opt // XXX $PYTHONPATH to top, so that `import neo` works?
n.pysrv = xexec.Command("./py/runneo.py", workdir, clusterName) // XXX +opt
n.opt = opt n.opt = opt
// $TEMP -> workdir (else NEO/py creates another one for e.g. coverage) // $TEMP -> workdir (else NEO/py creates another one for e.g. coverage)
n.pysrv.Env = append(os.Environ(), "TEMP="+workdir) n.pysrv.Env = append(os.Environ(), "TEMP="+workdir)
n.pysrv.Stdin = nil n.pysrv.Stdin = nil
n.pysrv.Stdout = os.Stdout n.pysrv.Stdout = os.Stdout
n.pysrv.Stderr = os.Stderr n.pysrv.Stderr = os.Stderr
err = n.pysrv.Start() err = n.pysrv.Start(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
#!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2020 Nexedi SA and Contributors. # Copyright (C) 2020 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
...@@ -20,7 +21,6 @@ ...@@ -20,7 +21,6 @@
"""runneo.py runs NEO/py cluster for NEO/go testing. """runneo.py runs NEO/py cluster for NEO/go testing.
Usage: runneo.py <workdir> <cluster-name> XXX + (**kw for NEOCluster) Usage: runneo.py <workdir> <cluster-name> XXX + (**kw for NEOCluster)
XXX
""" """
from neo.tests.functional import NEOCluster from neo.tests.functional import NEOCluster
...@@ -28,6 +28,7 @@ from golang import func, defer ...@@ -28,6 +28,7 @@ from golang import func, defer
import sys, os import sys, os
from time import sleep from time import sleep
from signal import signal, SIGTERM
@func @func
...@@ -36,12 +37,20 @@ def main(): ...@@ -36,12 +37,20 @@ def main():
clusterName = sys.argv[2] clusterName = sys.argv[2]
readyf = workdir + "/ready" readyf = workdir + "/ready"
def sinfo(msg): return "I: runneo.py: %s/%s: %s" % (workdir, clusterName, msg)
def info(msg): print(sinfo(msg))
# SIGTERM -> exit gracefully, so that defers are run
def _(sig, frame):
raise SystemExit(sinfo("terminated"))
signal(SIGTERM, _)
cluster = NEOCluster([clusterName], adapter='SQLite', name=clusterName, temp_dir=workdir) # XXX +kw cluster = NEOCluster([clusterName], adapter='SQLite', name=clusterName, temp_dir=workdir) # XXX +kw
cluster.start() cluster.start()
defer(cluster.stop) defer(cluster.stop)
cluster.expectClusterRunning() cluster.expectClusterRunning()
print("I: runneo.py: %s/%s: Started master(s): %s" % (workdir, clusterName, cluster.master_nodes)) info("started master(s): %s" % (cluster.master_nodes,))
# dump information about ready cluster into readyfile # dump information about ready cluster into readyfile
with open("%s.tmp" % readyf, "w") as f: with open("%s.tmp" % readyf, "w") as f:
......
...@@ -29,6 +29,7 @@ import ( ...@@ -29,6 +29,7 @@ import (
"testing" "testing"
"time" "time"
"lab.nexedi.com/kirr/neo/go/internal/xexec"
"lab.nexedi.com/kirr/neo/go/internal/xtesting" "lab.nexedi.com/kirr/neo/go/internal/xtesting"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
_ "lab.nexedi.com/kirr/neo/go/zodb/storage/fs1" _ "lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
...@@ -50,7 +51,7 @@ type ZEOSrv interface { ...@@ -50,7 +51,7 @@ type ZEOSrv interface {
// //
// Create it with StartZEOPySrv(fs1path). // Create it with StartZEOPySrv(fs1path).
type ZEOPySrv struct { type ZEOPySrv struct {
pysrv *exec.Cmd // spawned `runzeo -f fs1path` pysrv *xexec.Cmd // spawned `runzeo -f fs1path`
fs1path string // filestorage location fs1path string // filestorage location
opt ZEOPyOptions // options for spawned server opt ZEOPyOptions // options for spawned server
cancel func() // to stop pysrv cancel func() // to stop pysrv
...@@ -78,7 +79,7 @@ func StartZEOPySrv(fs1path string, opt ZEOPyOptions) (_ *ZEOPySrv, err error) { ...@@ -78,7 +79,7 @@ func StartZEOPySrv(fs1path string, opt ZEOPyOptions) (_ *ZEOPySrv, err error) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
z := &ZEOPySrv{fs1path: fs1path, cancel: cancel, done: make(chan struct{})} z := &ZEOPySrv{fs1path: fs1path, cancel: cancel, done: make(chan struct{})}
z.pysrv = exec.CommandContext(ctx, "python", "-m", "ZEO.runzeo", "-f", fs1path, "-a", z.Addr()) z.pysrv = xexec.Command("python", "-m", "ZEO.runzeo", "-f", fs1path, "-a", z.Addr())
z.opt = opt z.opt = opt
msgpack := "" msgpack := ""
if opt.msgpack { if opt.msgpack {
...@@ -88,7 +89,7 @@ func StartZEOPySrv(fs1path string, opt ZEOPyOptions) (_ *ZEOPySrv, err error) { ...@@ -88,7 +89,7 @@ func StartZEOPySrv(fs1path string, opt ZEOPyOptions) (_ *ZEOPySrv, err error) {
z.pysrv.Stdin = nil z.pysrv.Stdin = nil
z.pysrv.Stdout = os.Stdout z.pysrv.Stdout = os.Stdout
z.pysrv.Stderr = os.Stderr z.pysrv.Stderr = os.Stderr
err = z.pysrv.Start() err = z.pysrv.Start(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -44,12 +44,14 @@ def main(args=None): ...@@ -44,12 +44,14 @@ def main(args=None):
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
from neo.client.Storage import Storage as NEOStorage from neo.client.Storage import Storage as NEOStorage
if os.path.exists(source): if os.path.exists(source):
""" (remove vvv warning from neo/go test output)
print("WARNING: This is not the recommended way to import data to NEO:" print("WARNING: This is not the recommended way to import data to NEO:"
" you should use the Importer backend instead.\n" " you should use the Importer backend instead.\n"
"NEO also does not implement IStorageRestoreable interface," "NEO also does not implement IStorageRestoreable interface,"
" which means that undo information is not preserved when using" " which means that undo information is not preserved when using"
" this tool: conflict resolution could happen when undoing an" " this tool: conflict resolution could happen when undoing an"
" old transaction.") " old transaction.")
"""
src = FileStorage(file_name=source, read_only=True) src = FileStorage(file_name=source, read_only=True)
dst = NEOStorage(master_nodes=destination, name=cluster, dst = NEOStorage(master_nodes=destination, name=cluster,
logfile=options.logfile) logfile=options.logfile)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment