Commit 4b132aac authored by Chuck Lever's avatar Chuck Lever

tools: Add xdrgen

Add a Python-based tool for translating XDR specifications into XDR
encoder and decoder functions written in the Linux kernel's C coding
style. The generator attempts to match the usual C coding style of
the Linux kernel's SunRPC consumers.

This approach is similar to the netlink code generator in
tools/net/ynl .

The maintainability benefits of machine-generated XDR code include:

- Stronger type checking
- Reduces the number of bugs introduced by human error
- Makes the XDR code easier to audit and analyze
- Enables rapid prototyping of new RPC-based protocols
- Hardens the layering between protocol logic and marshaling
- Makes it easier to add observability on demand
- Unit tests might be built for both the tool and (automatically)
  for the generated code

In addition, converting the XDR layer to use memory-safe languages
such as Rust will be easier if much of the code can be converted
automatically.
Tested-by: default avatarJeff Layton <jlayton@kernel.org>
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent 45bb63ed
/* SPDX-License-Identifier: GPL-2.0 */
/*
* Copyright (c) 2024 Oracle and/or its affiliates.
*
* This header defines XDR data type primitives specified in
* Section 4 of RFC 4506, used by RPC programs implemented
* in the Linux kernel.
*/
#ifndef _SUNRPC_XDRGEN__BUILTINS_H_
#define _SUNRPC_XDRGEN__BUILTINS_H_
#include <linux/sunrpc/xdr.h>
static inline bool
xdrgen_decode_void(struct xdr_stream *xdr)
{
return true;
}
static inline bool
xdrgen_encode_void(struct xdr_stream *xdr)
{
return true;
}
static inline bool
xdrgen_decode_bool(struct xdr_stream *xdr, bool *ptr)
{
__be32 *p = xdr_inline_decode(xdr, XDR_UNIT);
if (unlikely(!p))
return false;
*ptr = (*p != xdr_zero);
return true;
}
static inline bool
xdrgen_encode_bool(struct xdr_stream *xdr, bool val)
{
__be32 *p = xdr_reserve_space(xdr, XDR_UNIT);
if (unlikely(!p))
return false;
*p = val ? xdr_one : xdr_zero;
return true;
}
static inline bool
xdrgen_decode_int(struct xdr_stream *xdr, s32 *ptr)
{
__be32 *p = xdr_inline_decode(xdr, XDR_UNIT);
if (unlikely(!p))
return false;
*ptr = be32_to_cpup(p);
return true;
}
static inline bool
xdrgen_encode_int(struct xdr_stream *xdr, s32 val)
{
__be32 *p = xdr_reserve_space(xdr, XDR_UNIT);
if (unlikely(!p))
return false;
*p = cpu_to_be32(val);
return true;
}
static inline bool
xdrgen_decode_unsigned_int(struct xdr_stream *xdr, u32 *ptr)
{
__be32 *p = xdr_inline_decode(xdr, XDR_UNIT);
if (unlikely(!p))
return false;
*ptr = be32_to_cpup(p);
return true;
}
static inline bool
xdrgen_encode_unsigned_int(struct xdr_stream *xdr, u32 val)
{
__be32 *p = xdr_reserve_space(xdr, XDR_UNIT);
if (unlikely(!p))
return false;
*p = cpu_to_be32(val);
return true;
}
static inline bool
xdrgen_decode_long(struct xdr_stream *xdr, s32 *ptr)
{
__be32 *p = xdr_inline_decode(xdr, XDR_UNIT);
if (unlikely(!p))
return false;
*ptr = be32_to_cpup(p);
return true;
}
static inline bool
xdrgen_encode_long(struct xdr_stream *xdr, s32 val)
{
__be32 *p = xdr_reserve_space(xdr, XDR_UNIT);
if (unlikely(!p))
return false;
*p = cpu_to_be32(val);
return true;
}
static inline bool
xdrgen_decode_unsigned_long(struct xdr_stream *xdr, u32 *ptr)
{
__be32 *p = xdr_inline_decode(xdr, XDR_UNIT);
if (unlikely(!p))
return false;
*ptr = be32_to_cpup(p);
return true;
}
static inline bool
xdrgen_encode_unsigned_long(struct xdr_stream *xdr, u32 val)
{
__be32 *p = xdr_reserve_space(xdr, XDR_UNIT);
if (unlikely(!p))
return false;
*p = cpu_to_be32(val);
return true;
}
static inline bool
xdrgen_decode_hyper(struct xdr_stream *xdr, s64 *ptr)
{
__be32 *p = xdr_inline_decode(xdr, XDR_UNIT * 2);
if (unlikely(!p))
return false;
*ptr = get_unaligned_be64(p);
return true;
}
static inline bool
xdrgen_encode_hyper(struct xdr_stream *xdr, s64 val)
{
__be32 *p = xdr_reserve_space(xdr, XDR_UNIT * 2);
if (unlikely(!p))
return false;
put_unaligned_be64(val, p);
return true;
}
static inline bool
xdrgen_decode_unsigned_hyper(struct xdr_stream *xdr, u64 *ptr)
{
__be32 *p = xdr_inline_decode(xdr, XDR_UNIT * 2);
if (unlikely(!p))
return false;
*ptr = get_unaligned_be64(p);
return true;
}
static inline bool
xdrgen_encode_unsigned_hyper(struct xdr_stream *xdr, u64 val)
{
__be32 *p = xdr_reserve_space(xdr, XDR_UNIT * 2);
if (unlikely(!p))
return false;
put_unaligned_be64(val, p);
return true;
}
static inline bool
xdrgen_decode_string(struct xdr_stream *xdr, string *ptr, u32 maxlen)
{
__be32 *p;
u32 len;
if (unlikely(xdr_stream_decode_u32(xdr, &len) != XDR_UNIT))
return false;
if (unlikely(maxlen && len > maxlen))
return false;
if (len != 0) {
p = xdr_inline_decode(xdr, len);
if (unlikely(!p))
return false;
ptr->data = (unsigned char *)p;
}
ptr->len = len;
return true;
}
static inline bool
xdrgen_encode_string(struct xdr_stream *xdr, string val, u32 maxlen)
{
__be32 *p = xdr_reserve_space(xdr, XDR_UNIT + xdr_align_size(val.len));
if (unlikely(!p))
return false;
xdr_encode_opaque(p, val.data, val.len);
return true;
}
static inline bool
xdrgen_decode_opaque(struct xdr_stream *xdr, opaque *ptr, u32 maxlen)
{
__be32 *p;
u32 len;
if (unlikely(xdr_stream_decode_u32(xdr, &len) != XDR_UNIT))
return false;
if (unlikely(maxlen && len > maxlen))
return false;
if (len != 0) {
p = xdr_inline_decode(xdr, len);
if (unlikely(!p))
return false;
ptr->data = (u8 *)p;
}
ptr->len = len;
return true;
}
static inline bool
xdrgen_encode_opaque(struct xdr_stream *xdr, opaque val)
{
__be32 *p = xdr_reserve_space(xdr, XDR_UNIT + xdr_align_size(val.len));
if (unlikely(!p))
return false;
xdr_encode_opaque(p, val.data, val.len);
return true;
}
#endif /* _SUNRPC_XDRGEN__BUILTINS_H_ */
/* SPDX-License-Identifier: GPL-2.0 */
/*
* Copyright (c) 2024 Oracle and/or its affiliates.
*
* This header defines XDR data type primitives specified in
* Section 4 of RFC 4506, used by RPC programs implemented
* in the Linux kernel.
*/
#ifndef _SUNRPC_XDRGEN__DEFS_H_
#define _SUNRPC_XDRGEN__DEFS_H_
#define TRUE (true)
#define FALSE (false)
typedef struct {
u32 len;
unsigned char *data;
} string;
typedef struct {
u32 len;
u8 *data;
} opaque;
#endif /* _SUNRPC_XDRGEN__DEFS_H_ */
__pycache__
generators/__pycache__
xdrgen - Linux Kernel XDR code generator
Introduction
------------
SunRPC programs are typically specified using a language defined by
RFC 4506. In fact, all IETF-published NFS specifications provide a
description of the specified protocol using this language.
Since the 1990's, user space consumers of SunRPC have had access to
a tool that could read such XDR specifications and then generate C
code that implements the RPC portions of that protocol. This tool is
called rpcgen.
This RPC-level code is code that handles input directly from the
network, and thus a high degree of memory safety and sanity checking
is needed to help ensure proper levels of security. Bugs in this
code can have significant impact on security and performance.
However, it is code that is repetitive and tedious to write by hand.
The C code generated by rpcgen makes extensive use of the facilities
of the user space TI-RPC library and libc. Furthermore, the dialect
of the generated code is very traditional K&R C.
The Linux kernel's implementation of SunRPC-based protocols hand-roll
their XDR implementation. There are two main reasons for this:
1. libtirpc (and its predecessors) operate only in user space. The
kernel's RPC implementation and its API are significantly
different than libtirpc.
2. rpcgen-generated code is believed to be less efficient than code
that is hand-written.
These days, gcc and its kin are capable of optimizing code better
than human authors. There are only a few instances where writing
XDR code by hand will make a measurable performance different.
In addition, the current hand-written code in the Linux kernel is
difficult to audit and prove that it implements exactly what is in
the protocol specification.
In order to accrue the benefits of machine-generated XDR code in the
kernel, a tool is needed that will output C code that works against
the kernel's SunRPC implementation rather than libtirpc.
Enter xdrgen.
Dependencies
------------
These dependencies are typically packaged by Linux distributions:
- python3
- python3-lark
- python3-jinja2
These dependencies are available via PyPi:
- pip install 'lark[interegular]'
XDR Specifications
------------------
When adding a new protocol implementation to the kernel, the XDR
specification can be derived by feeding a .txt copy of the RFC to
the script located in tools/net/sunrpc/extract.sh.
$ extract.sh < rfc0001.txt > new2.x
Operation
---------
Once a .x file is available, use xdrgen to generate source and
header files containing an implementation of XDR encoding and
decoding functions for the specified protocol.
$ ./xdrgen definitions new2.x > include/linux/sunrpc/xdrgen/new2.h
$ ./xdrgen declarations new2.x > new2xdr_gen.h
and
$ ./xdrgen source new2.x > new2xdr_gen.c
The files are ready to use for a server-side protocol implementation,
or may be used as a guide for implementing these routines by hand.
By default, the only comments added to this code are kdoc comments
that appear directly in front of the public per-procedure APIs. For
deeper introspection, specifying the "--annotate" flag will insert
additional comments in the generated code to help readers match the
generated code to specific parts of the XDR specification.
Because the generated code is targeted for the Linux kernel, it
is tagged with a GPLv2-only license.
The xdrgen tool can also provide lexical and syntax checking of
an XDR specification:
$ ./xdrgen lint xdr/new.x
How It Works
------------
xdrgen does not use machine learning to generate source code. The
translation is entirely deterministic.
RFC 4506 Section 6 contains a BNF grammar of the XDR specification
language. The grammar has been adapted for use by the Python Lark
module.
The xdr.ebnf file in this directory contains the grammar used to
parse XDR specifications. xdrgen configures Lark using the grammar
in xdr.ebnf. Lark parses the target XDR specification using this
grammar, creating a parse tree.
xdrgen then transforms the parse tree into an abstract syntax tree.
This tree is passed to a series of code generators.
The generators are implemented as Python classes residing in the
generators/ directory. Each generator emits code created from Jinja2
templates stored in the templates/ directory.
The source code is generated in the same order in which they appear
in the specification to ensure the generated code compiles. This
conforms with the behavior of rpcgen.
xdrgen assumes that the generated source code is further compiled by
a compiler that can optimize in a number of ways, including:
- Unused functions are discarded (ie, not added to the executable)
- Aggressive function inlining removes unnecessary stack frames
- Single-arm switch statements are replaced by a single conditional
branch
And so on.
Pragmas
-------
Pragma directives specify exceptions to the normal generation of
encoding and decoding functions. Currently one directive is
implemented: "public".
Pragma exclude
------ -------
pragma exclude <RPC procedure> ;
In some cases, a procedure encoder or decoder function might need
special processing that cannot be automatically generated. The
automatically-generated functions might conflict or interfere with
the hand-rolled function. To avoid editing the generated source code
by hand, a pragma can specify that the procedure's encoder and
decoder functions are not included in the generated header and
source.
For example:
pragma exclude NFSPROC3_READDIRPLUS;
Excludes the decoder function for the READDIRPLUS argument and the
encoder function for the READDIRPLUS result.
Note that because data item encoder and decoder functions are
defined "static __maybe_unused", subsequent compilation
automatically excludes data item encoder and decoder functions that
are used only by excluded procedure.
Pragma header
------ ------
pragma header <string> ;
Provide a name to use for the header file. For example:
pragma header nlm4;
Adds
#include "nlm4xdr_gen.h"
to the generated source file.
Pragma public
------ ------
pragma public <XDR data item> ;
Normally XDR encoder and decoder functions are "static". In case an
implementer wants to call these functions from other source code,
s/he can add a public pragma in the input .x file to indicate a set
of functions that should get a prototype in the generated header,
and the function definitions will not be declared static.
For example:
pragma public nfsstat3;
Adds these prototypes in the generated header:
bool xdrgen_decode_nfsstat3(struct xdr_stream *xdr, enum nfsstat3 *ptr);
bool xdrgen_encode_nfsstat3(struct xdr_stream *xdr, enum nfsstat3 value);
And, in the generated source code, both of these functions appear
without the "static __maybe_unused" modifiers.
Future Work
-----------
Finish implementing XDR pointer and list types.
Generate client-side procedure functions
Expand the README into a user guide similar to rpcgen(1)
Add more pragma directives:
* @pages -- use xdr_read/write_pages() for the specified opaque
field
* @skip -- do not decode, but rather skip, the specified argument
field
Enable something like a #include to dynamically insert the content
of other specification files
Properly support line-by-line pass-through via the "%" decorator
Build a unit test suite for verifying translation of XDR language
into compilable code
Add a command-line option to insert trace_printk call sites in the
generated source code, for improved (temporary) observability
Generate kernel Rust code as well as C code
# SPDX-License-Identifier: GPL-2.0
# Just to make sphinx-apidoc document this directory
# SPDX-License-Identifier: GPL-2.0
"""Define a base code generator class"""
import sys
from jinja2 import Environment, FileSystemLoader, Template
from xdr_ast import _XdrAst, Specification, _RpcProgram, _XdrTypeSpecifier
from xdr_ast import public_apis, pass_by_reference, get_header_name
from xdr_parse import get_xdr_annotate
def create_jinja2_environment(language: str, xdr_type: str) -> Environment:
"""Open a set of templates based on output language"""
match language:
case "C":
environment = Environment(
loader=FileSystemLoader(sys.path[0] + "/templates/C/" + xdr_type + "/"),
trim_blocks=True,
lstrip_blocks=True,
)
environment.globals["annotate"] = get_xdr_annotate()
environment.globals["public_apis"] = public_apis
environment.globals["pass_by_reference"] = pass_by_reference
return environment
case _:
raise NotImplementedError("Language not supported")
def get_jinja2_template(
environment: Environment, template_type: str, template_name: str
) -> Template:
"""Retrieve a Jinja2 template for emitting source code"""
return environment.get_template(template_type + "/" + template_name + ".j2")
def find_xdr_program_name(root: Specification) -> str:
"""Retrieve the RPC program name from an abstract syntax tree"""
raw_name = get_header_name()
if raw_name != "none":
return raw_name.lower()
for definition in root.definitions:
if isinstance(definition.value, _RpcProgram):
raw_name = definition.value.name
return raw_name.lower().removesuffix("_program").removesuffix("_prog")
return "noprog"
def header_guard_infix(filename: str) -> str:
"""Extract the header guard infix from the specification filename"""
basename = filename.split("/")[-1]
program = basename.replace(".x", "")
return program.upper()
def kernel_c_type(spec: _XdrTypeSpecifier) -> str:
"""Return name of C type"""
builtin_native_c_type = {
"bool": "bool",
"int": "s32",
"unsigned_int": "u32",
"long": "s32",
"unsigned_long": "u32",
"hyper": "s64",
"unsigned_hyper": "u64",
}
if spec.type_name in builtin_native_c_type:
return builtin_native_c_type[spec.type_name]
return spec.type_name
class Boilerplate:
"""Base class to generate boilerplate for source files"""
def __init__(self, language: str, peer: str):
"""Initialize an instance of this class"""
raise NotImplementedError("No language support defined")
def emit_declaration(self, filename: str, root: Specification) -> None:
"""Emit declaration header boilerplate"""
raise NotImplementedError("Header boilerplate generation not supported")
def emit_definition(self, filename: str, root: Specification) -> None:
"""Emit definition header boilerplate"""
raise NotImplementedError("Header boilerplate generation not supported")
def emit_source(self, filename: str, root: Specification) -> None:
"""Emit generic source code for this XDR type"""
raise NotImplementedError("Source boilerplate generation not supported")
class SourceGenerator:
"""Base class to generate header and source code for XDR types"""
def __init__(self, language: str, peer: str):
"""Initialize an instance of this class"""
raise NotImplementedError("No language support defined")
def emit_declaration(self, node: _XdrAst) -> None:
"""Emit one function declaration for this XDR type"""
raise NotImplementedError("Declaration generation not supported")
def emit_decoder(self, node: _XdrAst) -> None:
"""Emit one decoder function for this XDR type"""
raise NotImplementedError("Decoder generation not supported")
def emit_definition(self, node: _XdrAst) -> None:
"""Emit one definition for this XDR type"""
raise NotImplementedError("Definition generation not supported")
def emit_encoder(self, node: _XdrAst) -> None:
"""Emit one encoder function for this XDR type"""
raise NotImplementedError("Encoder generation not supported")
#!/usr/bin/env python3
# ex: set filetype=python:
"""Generate code to handle XDR constants"""
from generators import SourceGenerator, create_jinja2_environment
from xdr_ast import _XdrConstant
class XdrConstantGenerator(SourceGenerator):
"""Generate source code for XDR constants"""
def __init__(self, language: str, peer: str):
"""Initialize an instance of this class"""
self.environment = create_jinja2_environment(language, "constants")
self.peer = peer
def emit_definition(self, node: _XdrConstant) -> None:
"""Emit one definition for a constant"""
template = self.environment.get_template("definition.j2")
print(template.render(name=node.name, value=node.value))
#!/usr/bin/env python3
# ex: set filetype=python:
"""Generate code to handle XDR enum types"""
from generators import SourceGenerator, create_jinja2_environment
from xdr_ast import _XdrEnum, public_apis
class XdrEnumGenerator(SourceGenerator):
"""Generate source code for XDR enum types"""
def __init__(self, language: str, peer: str):
"""Initialize an instance of this class"""
self.environment = create_jinja2_environment(language, "enum")
self.peer = peer
def emit_declaration(self, node: _XdrEnum) -> None:
"""Emit one declaration pair for an XDR enum type"""
if node.name in public_apis:
template = self.environment.get_template("declaration/close.j2")
print(template.render(name=node.name))
def emit_definition(self, node: _XdrEnum) -> None:
"""Emit one definition for an XDR enum type"""
template = self.environment.get_template("definition/open.j2")
print(template.render(name=node.name))
template = self.environment.get_template("definition/enumerator.j2")
for enumerator in node.enumerators:
print(template.render(name=enumerator.name, value=enumerator.value))
template = self.environment.get_template("definition/close.j2")
print(template.render(name=node.name))
def emit_decoder(self, node: _XdrEnum) -> None:
"""Emit one decoder function for an XDR enum type"""
template = self.environment.get_template("decoder/enum.j2")
print(template.render(name=node.name))
def emit_encoder(self, node: _XdrEnum) -> None:
"""Emit one encoder function for an XDR enum type"""
template = self.environment.get_template("encoder/enum.j2")
print(template.render(name=node.name))
#!/usr/bin/env python3
# ex: set filetype=python:
"""Generate header bottom boilerplate"""
import os.path
import time
from generators import Boilerplate, header_guard_infix
from generators import create_jinja2_environment, get_jinja2_template
from xdr_ast import Specification
class XdrHeaderBottomGenerator(Boilerplate):
"""Generate header boilerplate"""
def __init__(self, language: str, peer: str):
"""Initialize an instance of this class"""
self.environment = create_jinja2_environment(language, "header_bottom")
self.peer = peer
def emit_declaration(self, filename: str, root: Specification) -> None:
"""Emit the bottom header guard"""
template = get_jinja2_template(self.environment, "declaration", "header")
print(template.render(infix=header_guard_infix(filename)))
def emit_definition(self, filename: str, root: Specification) -> None:
"""Emit the bottom header guard"""
template = get_jinja2_template(self.environment, "definition", "header")
print(template.render(infix=header_guard_infix(filename)))
def emit_source(self, filename: str, root: Specification) -> None:
pass
#!/usr/bin/env python3
# ex: set filetype=python:
"""Generate header top boilerplate"""
import os.path
import time
from generators import Boilerplate, header_guard_infix
from generators import create_jinja2_environment, get_jinja2_template
from xdr_ast import Specification
class XdrHeaderTopGenerator(Boilerplate):
"""Generate header boilerplate"""
def __init__(self, language: str, peer: str):
"""Initialize an instance of this class"""
self.environment = create_jinja2_environment(language, "header_top")
self.peer = peer
def emit_declaration(self, filename: str, root: Specification) -> None:
"""Emit the top header guard"""
template = get_jinja2_template(self.environment, "declaration", "header")
print(
template.render(
infix=header_guard_infix(filename),
filename=filename,
mtime=time.ctime(os.path.getmtime(filename)),
)
)
def emit_definition(self, filename: str, root: Specification) -> None:
"""Emit the top header guard"""
template = get_jinja2_template(self.environment, "definition", "header")
print(
template.render(
infix=header_guard_infix(filename),
filename=filename,
mtime=time.ctime(os.path.getmtime(filename)),
)
)
def emit_source(self, filename: str, root: Specification) -> None:
pass
#!/usr/bin/env python3
# ex: set filetype=python:
"""Generate code to handle XDR pointer types"""
from jinja2 import Environment
from generators import SourceGenerator, kernel_c_type
from generators import create_jinja2_environment, get_jinja2_template
from xdr_ast import _XdrBasic, _XdrVariableLengthString
from xdr_ast import _XdrFixedLengthOpaque, _XdrVariableLengthOpaque
from xdr_ast import _XdrFixedLengthArray, _XdrVariableLengthArray
from xdr_ast import _XdrOptionalData, _XdrPointer, _XdrDeclaration
from xdr_ast import public_apis
def emit_pointer_declaration(environment: Environment, node: _XdrPointer) -> None:
"""Emit a declaration pair for an XDR pointer type"""
if node.name in public_apis:
template = get_jinja2_template(environment, "declaration", "close")
print(template.render(name=node.name))
def emit_pointer_member_definition(
environment: Environment, field: _XdrDeclaration
) -> None:
"""Emit a definition for one field in an XDR struct"""
if isinstance(field, _XdrBasic):
template = get_jinja2_template(environment, "definition", field.template)
print(
template.render(
name=field.name,
type=kernel_c_type(field.spec),
classifier=field.spec.c_classifier,
)
)
elif isinstance(field, _XdrFixedLengthOpaque):
template = get_jinja2_template(environment, "definition", field.template)
print(
template.render(
name=field.name,
size=field.size,
)
)
elif isinstance(field, _XdrVariableLengthOpaque):
template = get_jinja2_template(environment, "definition", field.template)
print(template.render(name=field.name))
elif isinstance(field, _XdrVariableLengthString):
template = get_jinja2_template(environment, "definition", field.template)
print(template.render(name=field.name))
elif isinstance(field, _XdrFixedLengthArray):
template = get_jinja2_template(environment, "definition", field.template)
print(
template.render(
name=field.name,
type=kernel_c_type(field.spec),
size=field.size,
)
)
elif isinstance(field, _XdrVariableLengthArray):
template = get_jinja2_template(environment, "definition", field.template)
print(
template.render(
name=field.name,
type=kernel_c_type(field.spec),
classifier=field.spec.c_classifier,
)
)
elif isinstance(field, _XdrOptionalData):
template = get_jinja2_template(environment, "definition", field.template)
print(
template.render(
name=field.name,
type=kernel_c_type(field.spec),
classifier=field.spec.c_classifier,
)
)
def emit_pointer_definition(environment: Environment, node: _XdrPointer) -> None:
"""Emit a definition for an XDR pointer type"""
template = get_jinja2_template(environment, "definition", "open")
print(template.render(name=node.name))
for field in node.fields[0:-1]:
emit_pointer_member_definition(environment, field)
template = get_jinja2_template(environment, "definition", "close")
print(template.render(name=node.name))
def emit_pointer_member_decoder(
environment: Environment, field: _XdrDeclaration
) -> None:
"""Emit a decoder for one field in an XDR pointer"""
if isinstance(field, _XdrBasic):
template = get_jinja2_template(environment, "decoder", field.template)
print(
template.render(
name=field.name,
type=field.spec.type_name,
classifier=field.spec.c_classifier,
)
)
elif isinstance(field, _XdrFixedLengthOpaque):
template = get_jinja2_template(environment, "decoder", field.template)
print(
template.render(
name=field.name,
size=field.size,
)
)
elif isinstance(field, _XdrVariableLengthOpaque):
template = get_jinja2_template(environment, "decoder", field.template)
print(
template.render(
name=field.name,
maxsize=field.maxsize,
)
)
elif isinstance(field, _XdrVariableLengthString):
template = get_jinja2_template(environment, "decoder", field.template)
print(
template.render(
name=field.name,
maxsize=field.maxsize,
)
)
elif isinstance(field, _XdrFixedLengthArray):
template = get_jinja2_template(environment, "decoder", field.template)
print(
template.render(
name=field.name,
type=field.spec.type_name,
size=field.size,
classifier=field.spec.c_classifier,
)
)
elif isinstance(field, _XdrVariableLengthArray):
template = get_jinja2_template(environment, "decoder", field.template)
print(
template.render(
name=field.name,
type=field.spec.type_name,
maxsize=field.maxsize,
classifier=field.spec.c_classifier,
)
)
elif isinstance(field, _XdrOptionalData):
template = get_jinja2_template(environment, "decoder", field.template)
print(
template.render(
name=field.name,
type=field.spec.type_name,
classifier=field.spec.c_classifier,
)
)
def emit_pointer_decoder(environment: Environment, node: _XdrPointer) -> None:
"""Emit one decoder function for an XDR pointer type"""
template = get_jinja2_template(environment, "decoder", "open")
print(template.render(name=node.name))
for field in node.fields[0:-1]:
emit_pointer_member_decoder(environment, field)
template = get_jinja2_template(environment, "decoder", "close")
print(template.render())
def emit_pointer_member_encoder(
environment: Environment, field: _XdrDeclaration
) -> None:
"""Emit an encoder for one field in a XDR pointer"""
if isinstance(field, _XdrBasic):
template = get_jinja2_template(environment, "encoder", field.template)
print(
template.render(
name=field.name,
type=field.spec.type_name,
)
)
elif isinstance(field, _XdrFixedLengthOpaque):
template = get_jinja2_template(environment, "encoder", field.template)
print(
template.render(
name=field.name,
size=field.size,
)
)
elif isinstance(field, _XdrVariableLengthOpaque):
template = get_jinja2_template(environment, "encoder", field.template)
print(
template.render(
name=field.name,
maxsize=field.maxsize,
)
)
elif isinstance(field, _XdrVariableLengthString):
template = get_jinja2_template(environment, "encoder", field.template)
print(
template.render(
name=field.name,
maxsize=field.maxsize,
)
)
elif isinstance(field, _XdrFixedLengthArray):
template = get_jinja2_template(environment, "encoder", field.template)
print(
template.render(
name=field.name,
type=field.spec.type_name,
size=field.size,
)
)
elif isinstance(field, _XdrVariableLengthArray):
template = get_jinja2_template(environment, "encoder", field.template)
print(
template.render(
name=field.name,
type=field.spec.type_name,
maxsize=field.maxsize,
)
)
elif isinstance(field, _XdrOptionalData):
template = get_jinja2_template(environment, "encoder", field.template)
print(
template.render(
name=field.name,
type=field.spec.type_name,
classifier=field.spec.c_classifier,
)
)
def emit_pointer_encoder(environment: Environment, node: _XdrPointer) -> None:
"""Emit one encoder function for an XDR pointer type"""
template = get_jinja2_template(environment, "encoder", "open")
print(template.render(name=node.name))
for field in node.fields[0:-1]:
emit_pointer_member_encoder(environment, field)
template = get_jinja2_template(environment, "encoder", "close")
print(template.render())
class XdrPointerGenerator(SourceGenerator):
"""Generate source code for XDR pointer"""
def __init__(self, language: str, peer: str):
"""Initialize an instance of this class"""
self.environment = create_jinja2_environment(language, "pointer")
self.peer = peer
def emit_declaration(self, node: _XdrPointer) -> None:
"""Emit one declaration pair for an XDR pointer type"""
emit_pointer_declaration(self.environment, node)
def emit_definition(self, node: _XdrPointer) -> None:
"""Emit one declaration for an XDR pointer type"""
emit_pointer_definition(self.environment, node)
def emit_decoder(self, node: _XdrPointer) -> None:
"""Emit one decoder function for an XDR pointer type"""
emit_pointer_decoder(self.environment, node)
def emit_encoder(self, node: _XdrPointer) -> None:
"""Emit one encoder function for an XDR pointer type"""
emit_pointer_encoder(self.environment, node)
#!/usr/bin/env python3
# ex: set filetype=python:
"""Generate code for an RPC program's procedures"""
from jinja2 import Environment
from generators import SourceGenerator, create_jinja2_environment
from xdr_ast import _RpcProgram, _RpcVersion, excluded_apis
def emit_version_definitions(
environment: Environment, program: str, version: _RpcVersion
) -> None:
"""Emit procedure numbers for each RPC version's procedures"""
template = environment.get_template("definition/open.j2")
print(template.render(program=program.upper()))
template = environment.get_template("definition/procedure.j2")
for procedure in version.procedures:
if procedure.name not in excluded_apis:
print(
template.render(
name=procedure.name,
value=procedure.number,
)
)
template = environment.get_template("definition/close.j2")
print(template.render())
def emit_version_declarations(
environment: Environment, program: str, version: _RpcVersion
) -> None:
"""Emit declarations for each RPC version's procedures"""
arguments = set()
for procedure in version.procedures:
if procedure.name not in excluded_apis:
arguments.add(procedure.argument.type_name)
if len(arguments) > 0:
print("")
template = environment.get_template("declaration/argument.j2")
for argument in arguments:
print(template.render(program=program, argument=argument))
results = set()
for procedure in version.procedures:
if procedure.name not in excluded_apis:
results.add(procedure.result.type_name)
if len(results) > 0:
print("")
template = environment.get_template("declaration/result.j2")
for result in results:
print(template.render(program=program, result=result))
def emit_version_argument_decoders(
environment: Environment, program: str, version: _RpcVersion
) -> None:
"""Emit server argument decoders for each RPC version's procedures"""
arguments = set()
for procedure in version.procedures:
if procedure.name not in excluded_apis:
arguments.add(procedure.argument.type_name)
template = environment.get_template("decoder/argument.j2")
for argument in arguments:
print(template.render(program=program, argument=argument))
def emit_version_result_decoders(
environment: Environment, program: str, version: _RpcVersion
) -> None:
"""Emit client result decoders for each RPC version's procedures"""
results = set()
for procedure in version.procedures:
if procedure.name not in excluded_apis:
results.add(procedure.result.type_name)
template = environment.get_template("decoder/result.j2")
for result in results:
print(template.render(program=program, result=result))
def emit_version_argument_encoders(
environment: Environment, program: str, version: _RpcVersion
) -> None:
"""Emit client argument encoders for each RPC version's procedures"""
arguments = set()
for procedure in version.procedures:
if procedure.name not in excluded_apis:
arguments.add(procedure.argument.type_name)
template = environment.get_template("encoder/argument.j2")
for argument in arguments:
print(template.render(program=program, argument=argument))
def emit_version_result_encoders(
environment: Environment, program: str, version: _RpcVersion
) -> None:
"""Emit server result encoders for each RPC version's procedures"""
results = set()
for procedure in version.procedures:
if procedure.name not in excluded_apis:
results.add(procedure.result.type_name)
template = environment.get_template("encoder/result.j2")
for result in results:
print(template.render(program=program, result=result))
class XdrProgramGenerator(SourceGenerator):
"""Generate source code for an RPC program's procedures"""
def __init__(self, language: str, peer: str):
"""Initialize an instance of this class"""
self.environment = create_jinja2_environment(language, "program")
self.peer = peer
def emit_definition(self, node: _RpcProgram) -> None:
"""Emit procedure numbers for each of an RPC programs's procedures"""
raw_name = node.name
program = raw_name.lower().removesuffix("_program").removesuffix("_prog")
for version in node.versions:
emit_version_definitions(self.environment, program, version)
def emit_declaration(self, node: _RpcProgram) -> None:
"""Emit a declaration pair for each of an RPC programs's procedures"""
raw_name = node.name
program = raw_name.lower().removesuffix("_program").removesuffix("_prog")
for version in node.versions:
emit_version_declarations(self.environment, program, version)
def emit_decoder(self, node: _RpcProgram) -> None:
"""Emit all decoder functions for an RPC program's procedures"""
raw_name = node.name
program = raw_name.lower().removesuffix("_program").removesuffix("_prog")
match self.peer:
case "server":
for version in node.versions:
emit_version_argument_decoders(
self.environment, program, version,
)
case "client":
for version in node.versions:
emit_version_result_decoders(
self.environment, program, version,
)
def emit_encoder(self, node: _RpcProgram) -> None:
"""Emit all encoder functions for an RPC program's procedures"""
raw_name = node.name
program = raw_name.lower().removesuffix("_program").removesuffix("_prog")
match self.peer:
case "server":
for version in node.versions:
emit_version_result_encoders(
self.environment, program, version,
)
case "client":
for version in node.versions:
emit_version_argument_encoders(
self.environment, program, version,
)
#!/usr/bin/env python3
# ex: set filetype=python:
"""Generate source code boilerplate"""
import os.path
import time
from generators import Boilerplate
from generators import find_xdr_program_name, create_jinja2_environment
from xdr_ast import _RpcProgram, Specification, get_header_name
class XdrSourceTopGenerator(Boilerplate):
"""Generate source code boilerplate"""
def __init__(self, language: str, peer: str):
"""Initialize an instance of this class"""
self.environment = create_jinja2_environment(language, "source_top")
self.peer = peer
def emit_source(self, filename: str, root: Specification) -> None:
"""Emit the top source boilerplate"""
name = find_xdr_program_name(root)
template = self.environment.get_template(self.peer + ".j2")
print(
template.render(
program=name,
filename=filename,
mtime=time.ctime(os.path.getmtime(filename)),
)
)
#!/usr/bin/env python3
# ex: set filetype=python:
"""Generate code to handle XDR struct types"""
from jinja2 import Environment
from generators import SourceGenerator, kernel_c_type
from generators import create_jinja2_environment, get_jinja2_template
from xdr_ast import _XdrBasic, _XdrVariableLengthString
from xdr_ast import _XdrFixedLengthOpaque, _XdrVariableLengthOpaque
from xdr_ast import _XdrFixedLengthArray, _XdrVariableLengthArray
from xdr_ast import _XdrOptionalData, _XdrStruct, _XdrDeclaration
from xdr_ast import public_apis
def emit_struct_declaration(environment: Environment, node: _XdrStruct) -> None:
"""Emit one declaration pair for an XDR struct type"""
if node.name in public_apis:
template = get_jinja2_template(environment, "declaration", "close")
print(template.render(name=node.name))
def emit_struct_member_definition(
environment: Environment, field: _XdrDeclaration
) -> None:
"""Emit a definition for one field in an XDR struct"""
if isinstance(field, _XdrBasic):
template = get_jinja2_template(environment, "definition", field.template)
print(
template.render(
name=field.name,
type=kernel_c_type(field.spec),
classifier=field.spec.c_classifier,
)
)
elif isinstance(field, _XdrFixedLengthOpaque):
template = get_jinja2_template(environment, "definition", field.template)
print(
template.render(
name=field.name,
size=field.size,
)
)
elif isinstance(field, _XdrVariableLengthOpaque):
template = get_jinja2_template(environment, "definition", field.template)
print(template.render(name=field.name))
elif isinstance(field, _XdrVariableLengthString):
template = get_jinja2_template(environment, "definition", field.template)
print(template.render(name=field.name))
elif isinstance(field, _XdrFixedLengthArray):
template = get_jinja2_template(environment, "definition", field.template)
print(
template.render(
name=field.name,
type=kernel_c_type(field.spec),
size=field.size,
)
)
elif isinstance(field, _XdrVariableLengthArray):
template = get_jinja2_template(environment, "definition", field.template)
print(
template.render(
name=field.name,
type=kernel_c_type(field.spec),
classifier=field.spec.c_classifier,
)
)
elif isinstance(field, _XdrOptionalData):
template = get_jinja2_template(environment, "definition", field.template)
print(
template.render(
name=field.name,
type=kernel_c_type(field.spec),
classifier=field.spec.c_classifier,
)
)
def emit_struct_definition(environment: Environment, node: _XdrStruct) -> None:
"""Emit one definition for an XDR struct type"""
template = get_jinja2_template(environment, "definition", "open")
print(template.render(name=node.name))
for field in node.fields:
emit_struct_member_definition(environment, field)
template = get_jinja2_template(environment, "definition", "close")
print(template.render(name=node.name))
def emit_struct_member_decoder(
environment: Environment, field: _XdrDeclaration
) -> None:
"""Emit a decoder for one field in an XDR struct"""
if isinstance(field, _XdrBasic):
template = get_jinja2_template(environment, "decoder", field.template)
print(
template.render(
name=field.name,
type=field.spec.type_name,
classifier=field.spec.c_classifier,
)
)
elif isinstance(field, _XdrFixedLengthOpaque):
template = get_jinja2_template(environment, "decoder", field.template)
print(
template.render(
name=field.name,
size=field.size,
)
)
elif isinstance(field, _XdrVariableLengthOpaque):
template = get_jinja2_template(environment, "decoder", field.template)
print(
template.render(
name=field.name,
maxsize=field.maxsize,
)
)
elif isinstance(field, _XdrVariableLengthString):
template = get_jinja2_template(environment, "decoder", field.template)
print(
template.render(
name=field.name,
maxsize=field.maxsize,
)
)
elif isinstance(field, _XdrFixedLengthArray):
template = get_jinja2_template(environment, "decoder", field.template)
print(
template.render(
name=field.name,
type=field.spec.type_name,
size=field.size,
classifier=field.spec.c_classifier,
)
)
elif isinstance(field, _XdrVariableLengthArray):
template = get_jinja2_template(environment, "decoder", field.template)
print(
template.render(
name=field.name,
type=field.spec.type_name,
maxsize=field.maxsize,
classifier=field.spec.c_classifier,
)
)
elif isinstance(field, _XdrOptionalData):
template = get_jinja2_template(environment, "decoder", field.template)
print(
template.render(
name=field.name,
type=field.spec.type_name,
classifier=field.spec.c_classifier,
)
)
def emit_struct_decoder(environment: Environment, node: _XdrStruct) -> None:
"""Emit one decoder function for an XDR struct type"""
template = get_jinja2_template(environment, "decoder", "open")
print(template.render(name=node.name))
for field in node.fields:
emit_struct_member_decoder(environment, field)
template = get_jinja2_template(environment, "decoder", "close")
print(template.render())
def emit_struct_member_encoder(
environment: Environment, field: _XdrDeclaration
) -> None:
"""Emit an encoder for one field in an XDR struct"""
if isinstance(field, _XdrBasic):
template = get_jinja2_template(environment, "encoder", field.template)
print(
template.render(
name=field.name,
type=field.spec.type_name,
)
)
elif isinstance(field, _XdrFixedLengthOpaque):
template = get_jinja2_template(environment, "encoder", field.template)
print(
template.render(
name=field.name,
size=field.size,
)
)
elif isinstance(field, _XdrVariableLengthOpaque):
template = get_jinja2_template(environment, "encoder", field.template)
print(
template.render(
name=field.name,
maxsize=field.maxsize,
)
)
elif isinstance(field, _XdrVariableLengthString):
template = get_jinja2_template(environment, "encoder", field.template)
print(
template.render(
name=field.name,
maxsize=field.maxsize,
)
)
elif isinstance(field, _XdrFixedLengthArray):
template = get_jinja2_template(environment, "encoder", field.template)
print(
template.render(
name=field.name,
type=field.spec.type_name,
size=field.size,
)
)
elif isinstance(field, _XdrVariableLengthArray):
template = get_jinja2_template(environment, "encoder", field.template)
print(
template.render(
name=field.name,
type=field.spec.type_name,
maxsize=field.maxsize,
)
)
elif isinstance(field, _XdrOptionalData):
template = get_jinja2_template(environment, "encoder", field.template)
print(
template.render(
name=field.name,
type=field.spec.type_name,
classifier=field.spec.c_classifier,
)
)
def emit_struct_encoder(environment: Environment, node: _XdrStruct) -> None:
"""Emit one encoder function for an XDR struct type"""
template = get_jinja2_template(environment, "encoder", "open")
print(template.render(name=node.name))
for field in node.fields:
emit_struct_member_encoder(environment, field)
template = get_jinja2_template(environment, "encoder", "close")
print(template.render())
class XdrStructGenerator(SourceGenerator):
"""Generate source code for XDR structs"""
def __init__(self, language: str, peer: str):
"""Initialize an instance of this class"""
self.environment = create_jinja2_environment(language, "struct")
self.peer = peer
def emit_declaration(self, node: _XdrStruct) -> None:
"""Emit one declaration pair for an XDR struct type"""
emit_struct_declaration(self.environment, node)
def emit_definition(self, node: _XdrStruct) -> None:
"""Emit one definition for an XDR struct type"""
emit_struct_definition(self.environment, node)
def emit_decoder(self, node: _XdrStruct) -> None:
"""Emit one decoder function for an XDR struct type"""
emit_struct_decoder(self.environment, node)
def emit_encoder(self, node: _XdrStruct) -> None:
"""Emit one encoder function for an XDR struct type"""
emit_struct_encoder(self.environment, node)
#!/usr/bin/env python3
# ex: set filetype=python:
"""Generate code to handle XDR typedefs"""
from jinja2 import Environment
from generators import SourceGenerator, kernel_c_type
from generators import create_jinja2_environment, get_jinja2_template
from xdr_ast import _XdrBasic, _XdrTypedef, _XdrVariableLengthString
from xdr_ast import _XdrFixedLengthOpaque, _XdrVariableLengthOpaque
from xdr_ast import _XdrFixedLengthArray, _XdrVariableLengthArray
from xdr_ast import _XdrOptionalData, _XdrVoid, _XdrDeclaration
from xdr_ast import public_apis
def emit_typedef_declaration(environment: Environment, node: _XdrDeclaration) -> None:
"""Emit a declaration pair for one XDR typedef"""
if node.name not in public_apis:
return
if isinstance(node, _XdrBasic):
template = get_jinja2_template(environment, "declaration", node.template)
print(
template.render(
name=node.name,
type=kernel_c_type(node.spec),
classifier=node.spec.c_classifier,
)
)
elif isinstance(node, _XdrVariableLengthString):
template = get_jinja2_template(environment, "declaration", node.template)
print(template.render(name=node.name))
elif isinstance(node, _XdrFixedLengthOpaque):
template = get_jinja2_template(environment, "declaration", node.template)
print(template.render(name=node.name, size=node.size))
elif isinstance(node, _XdrVariableLengthOpaque):
template = get_jinja2_template(environment, "declaration", node.template)
print(template.render(name=node.name))
elif isinstance(node, _XdrFixedLengthArray):
template = get_jinja2_template(environment, "declaration", node.template)
print(
template.render(
name=node.name,
type=node.spec.type_name,
size=node.size,
)
)
elif isinstance(node, _XdrVariableLengthArray):
template = get_jinja2_template(environment, "declaration", node.template)
print(
template.render(
name=node.name,
type=node.spec.type_name,
classifier=node.spec.c_classifier,
)
)
elif isinstance(node, _XdrOptionalData):
raise NotImplementedError("<optional_data> typedef not yet implemented")
elif isinstance(node, _XdrVoid):
raise NotImplementedError("<void> typedef not yet implemented")
else:
raise NotImplementedError("typedef: type not recognized")
def emit_type_definition(environment: Environment, node: _XdrDeclaration) -> None:
"""Emit a definition for one XDR typedef"""
if isinstance(node, _XdrBasic):
template = get_jinja2_template(environment, "definition", node.template)
print(
template.render(
name=node.name,
type=kernel_c_type(node.spec),
classifier=node.spec.c_classifier,
)
)
elif isinstance(node, _XdrVariableLengthString):
template = get_jinja2_template(environment, "definition", node.template)
print(template.render(name=node.name))
elif isinstance(node, _XdrFixedLengthOpaque):
template = get_jinja2_template(environment, "definition", node.template)
print(template.render(name=node.name, size=node.size))
elif isinstance(node, _XdrVariableLengthOpaque):
template = get_jinja2_template(environment, "definition", node.template)
print(template.render(name=node.name))
elif isinstance(node, _XdrFixedLengthArray):
template = get_jinja2_template(environment, "definition", node.template)
print(
template.render(
name=node.name,
type=node.spec.type_name,
size=node.size,
)
)
elif isinstance(node, _XdrVariableLengthArray):
template = get_jinja2_template(environment, "definition", node.template)
print(
template.render(
name=node.name,
type=node.spec.type_name,
classifier=node.spec.c_classifier,
)
)
elif isinstance(node, _XdrOptionalData):
raise NotImplementedError("<optional_data> typedef not yet implemented")
elif isinstance(node, _XdrVoid):
raise NotImplementedError("<void> typedef not yet implemented")
else:
raise NotImplementedError("typedef: type not recognized")
def emit_typedef_decoder(environment: Environment, node: _XdrDeclaration) -> None:
"""Emit a decoder function for one XDR typedef"""
if isinstance(node, _XdrBasic):
template = get_jinja2_template(environment, "decoder", node.template)
print(
template.render(
name=node.name,
type=node.spec.type_name,
)
)
elif isinstance(node, _XdrVariableLengthString):
template = get_jinja2_template(environment, "decoder", node.template)
print(
template.render(
name=node.name,
maxsize=node.maxsize,
)
)
elif isinstance(node, _XdrFixedLengthOpaque):
template = get_jinja2_template(environment, "decoder", node.template)
print(
template.render(
name=node.name,
size=node.size,
)
)
elif isinstance(node, _XdrVariableLengthOpaque):
template = get_jinja2_template(environment, "decoder", node.template)
print(
template.render(
name=node.name,
maxsize=node.maxsize,
)
)
elif isinstance(node, _XdrFixedLengthArray):
template = get_jinja2_template(environment, "decoder", node.template)
print(
template.render(
name=node.name,
type=node.spec.type_name,
size=node.size,
classifier=node.spec.c_classifier,
)
)
elif isinstance(node, _XdrVariableLengthArray):
template = get_jinja2_template(environment, "decoder", node.template)
print(
template.render(
name=node.name,
type=node.spec.type_name,
maxsize=node.maxsize,
)
)
elif isinstance(node, _XdrOptionalData):
raise NotImplementedError("<optional_data> typedef not yet implemented")
elif isinstance(node, _XdrVoid):
raise NotImplementedError("<void> typedef not yet implemented")
else:
raise NotImplementedError("typedef: type not recognized")
def emit_typedef_encoder(environment: Environment, node: _XdrDeclaration) -> None:
"""Emit an encoder function for one XDR typedef"""
if isinstance(node, _XdrBasic):
template = get_jinja2_template(environment, "encoder", node.template)
print(
template.render(
name=node.name,
type=node.spec.type_name,
)
)
elif isinstance(node, _XdrVariableLengthString):
template = get_jinja2_template(environment, "encoder", node.template)
print(
template.render(
name=node.name,
maxsize=node.maxsize,
)
)
elif isinstance(node, _XdrFixedLengthOpaque):
template = get_jinja2_template(environment, "encoder", node.template)
print(
template.render(
name=node.name,
size=node.size,
)
)
elif isinstance(node, _XdrVariableLengthOpaque):
template = get_jinja2_template(environment, "encoder", node.template)
print(
template.render(
name=node.name,
maxsize=node.maxsize,
)
)
elif isinstance(node, _XdrFixedLengthArray):
template = get_jinja2_template(environment, "encoder", node.template)
print(
template.render(
name=node.name,
type=node.spec.type_name,
size=node.size,
)
)
elif isinstance(node, _XdrVariableLengthArray):
template = get_jinja2_template(environment, "encoder", node.template)
print(
template.render(
name=node.name,
type=node.spec.type_name,
maxsize=node.maxsize,
)
)
elif isinstance(node, _XdrOptionalData):
raise NotImplementedError("<optional_data> typedef not yet implemented")
elif isinstance(node, _XdrVoid):
raise NotImplementedError("<void> typedef not yet implemented")
else:
raise NotImplementedError("typedef: type not recognized")
class XdrTypedefGenerator(SourceGenerator):
"""Generate source code for XDR typedefs"""
def __init__(self, language: str, peer: str):
"""Initialize an instance of this class"""
self.environment = create_jinja2_environment(language, "typedef")
self.peer = peer
def emit_declaration(self, node: _XdrTypedef) -> None:
"""Emit one declaration pair for an XDR enum type"""
emit_typedef_declaration(self.environment, node.declaration)
def emit_definition(self, node: _XdrTypedef) -> None:
"""Emit one definition for an XDR typedef"""
emit_type_definition(self.environment, node.declaration)
def emit_decoder(self, node: _XdrTypedef) -> None:
"""Emit one decoder function for an XDR typedef"""
emit_typedef_decoder(self.environment, node.declaration)
def emit_encoder(self, node: _XdrTypedef) -> None:
"""Emit one encoder function for an XDR typedef"""
emit_typedef_encoder(self.environment, node.declaration)
#!/usr/bin/env python3
# ex: set filetype=python:
"""Generate code to handle XDR unions"""
from jinja2 import Environment
from generators import SourceGenerator
from generators import create_jinja2_environment, get_jinja2_template
from xdr_ast import _XdrBasic, _XdrUnion, _XdrVoid
from xdr_ast import _XdrDeclaration, _XdrCaseSpec, public_apis
def emit_union_declaration(environment: Environment, node: _XdrUnion) -> None:
"""Emit one declaration pair for an XDR union type"""
if node.name in public_apis:
template = get_jinja2_template(environment, "declaration", "close")
print(template.render(name=node.name))
def emit_union_switch_spec_definition(
environment: Environment, node: _XdrDeclaration
) -> None:
"""Emit a definition for an XDR union's discriminant"""
assert isinstance(node, _XdrBasic)
template = get_jinja2_template(environment, "definition", "switch_spec")
print(
template.render(
name=node.name,
type=node.spec.type_name,
classifier=node.spec.c_classifier,
)
)
def emit_union_case_spec_definition(
environment: Environment, node: _XdrDeclaration
) -> None:
"""Emit a definition for an XDR union's case arm"""
if isinstance(node.arm, _XdrVoid):
return
assert isinstance(node.arm, _XdrBasic)
template = get_jinja2_template(environment, "definition", "case_spec")
print(
template.render(
name=node.arm.name,
type=node.arm.spec.type_name,
classifier=node.arm.spec.c_classifier,
)
)
def emit_union_definition(environment: Environment, node: _XdrUnion) -> None:
"""Emit one XDR union definition"""
template = get_jinja2_template(environment, "definition", "open")
print(template.render(name=node.name))
emit_union_switch_spec_definition(environment, node.discriminant)
for case in node.cases:
emit_union_case_spec_definition(environment, case)
if node.default is not None:
emit_union_case_spec_definition(environment, node.default)
template = get_jinja2_template(environment, "definition", "close")
print(template.render(name=node.name))
def emit_union_switch_spec_decoder(
environment: Environment, node: _XdrDeclaration
) -> None:
"""Emit a decoder for an XDR union's discriminant"""
assert isinstance(node, _XdrBasic)
template = get_jinja2_template(environment, "decoder", "switch_spec")
print(template.render(name=node.name, type=node.spec.type_name))
def emit_union_case_spec_decoder(environment: Environment, node: _XdrCaseSpec) -> None:
"""Emit decoder functions for an XDR union's case arm"""
if isinstance(node.arm, _XdrVoid):
return
template = get_jinja2_template(environment, "decoder", "case_spec")
for case in node.values:
print(template.render(case=case))
assert isinstance(node.arm, _XdrBasic)
template = get_jinja2_template(environment, "decoder", node.arm.template)
print(
template.render(
name=node.arm.name,
type=node.arm.spec.type_name,
classifier=node.arm.spec.c_classifier,
)
)
template = get_jinja2_template(environment, "decoder", "break")
print(template.render())
def emit_union_default_spec_decoder(environment: Environment, node: _XdrUnion) -> None:
"""Emit a decoder function for an XDR union's default arm"""
default_case = node.default
# Avoid a gcc warning about a default case with boolean discriminant
if default_case is None and node.discriminant.spec.type_name == "bool":
return
template = get_jinja2_template(environment, "decoder", "default_spec")
print(template.render())
if default_case is None or isinstance(default_case.arm, _XdrVoid):
template = get_jinja2_template(environment, "decoder", "break")
print(template.render())
return
assert isinstance(default_case.arm, _XdrBasic)
template = get_jinja2_template(environment, "decoder", default_case.arm.template)
print(
template.render(
name=default_case.arm.name,
type=default_case.arm.spec.type_name,
classifier=default_case.arm.spec.c_classifier,
)
)
def emit_union_decoder(environment: Environment, node: _XdrUnion) -> None:
"""Emit one XDR union decoder"""
template = get_jinja2_template(environment, "decoder", "open")
print(template.render(name=node.name))
emit_union_switch_spec_decoder(environment, node.discriminant)
for case in node.cases:
emit_union_case_spec_decoder(environment, case)
emit_union_default_spec_decoder(environment, node)
template = get_jinja2_template(environment, "decoder", "close")
print(template.render())
def emit_union_switch_spec_encoder(
environment: Environment, node: _XdrDeclaration
) -> None:
"""Emit an encoder for an XDR union's discriminant"""
assert isinstance(node, _XdrBasic)
template = get_jinja2_template(environment, "encoder", "switch_spec")
print(template.render(name=node.name, type=node.spec.type_name))
def emit_union_case_spec_encoder(environment: Environment, node: _XdrCaseSpec) -> None:
"""Emit encoder functions for an XDR union's case arm"""
if isinstance(node.arm, _XdrVoid):
return
template = get_jinja2_template(environment, "encoder", "case_spec")
for case in node.values:
print(template.render(case=case))
assert isinstance(node.arm, _XdrBasic)
template = get_jinja2_template(environment, "encoder", node.arm.template)
print(
template.render(
name=node.arm.name,
type=node.arm.spec.type_name,
)
)
template = get_jinja2_template(environment, "encoder", "break")
print(template.render())
def emit_union_default_spec_encoder(environment: Environment, node: _XdrUnion) -> None:
"""Emit an encoder function for an XDR union's default arm"""
default_case = node.default
# Avoid a gcc warning about a default case with boolean discriminant
if default_case is None and node.discriminant.spec.type_name == "bool":
return
template = get_jinja2_template(environment, "encoder", "default_spec")
print(template.render())
if default_case is None or isinstance(default_case.arm, _XdrVoid):
template = get_jinja2_template(environment, "encoder", "break")
print(template.render())
return
assert isinstance(default_case.arm, _XdrBasic)
template = get_jinja2_template(environment, "encoder", default_case.arm.template)
print(
template.render(
name=default_case.arm.name,
type=default_case.arm.spec.type_name,
)
)
def emit_union_encoder(environment, node: _XdrUnion) -> None:
"""Emit one XDR union encoder"""
template = get_jinja2_template(environment, "encoder", "open")
print(template.render(name=node.name))
emit_union_switch_spec_encoder(environment, node.discriminant)
for case in node.cases:
emit_union_case_spec_encoder(environment, case)
emit_union_default_spec_encoder(environment, node)
template = get_jinja2_template(environment, "encoder", "close")
print(template.render())
class XdrUnionGenerator(SourceGenerator):
"""Generate source code for XDR unions"""
def __init__(self, language: str, peer: str):
"""Initialize an instance of this class"""
self.environment = create_jinja2_environment(language, "union")
self.peer = peer
def emit_declaration(self, node: _XdrUnion) -> None:
"""Emit one declaration pair for an XDR union"""
emit_union_declaration(self.environment, node)
def emit_definition(self, node: _XdrUnion) -> None:
"""Emit one definition for an XDR union"""
emit_union_definition(self.environment, node)
def emit_decoder(self, node: _XdrUnion) -> None:
"""Emit one decoder function for an XDR union"""
emit_union_decoder(self.environment, node)
def emit_encoder(self, node: _XdrUnion) -> None:
"""Emit one encoder function for an XDR union"""
emit_union_encoder(self.environment, node)
// A Lark grammar for the XDR specification language based on
// https://tools.ietf.org/html/rfc4506 Section 6.3
declaration : "opaque" identifier "[" value "]" -> fixed_length_opaque
| "opaque" identifier "<" [ value ] ">" -> variable_length_opaque
| "string" identifier "<" [ value ] ">" -> variable_length_string
| type_specifier identifier "[" value "]" -> fixed_length_array
| type_specifier identifier "<" [ value ] ">" -> variable_length_array
| type_specifier "*" identifier -> optional_data
| type_specifier identifier -> basic
| "void" -> void
value : decimal_constant
| hexadecimal_constant
| octal_constant
| identifier
constant : decimal_constant | hexadecimal_constant | octal_constant
type_specifier : unsigned_hyper
| unsigned_long
| unsigned_int
| hyper
| long
| int
| float
| double
| quadruple
| bool
| enum_type_spec
| struct_type_spec
| union_type_spec
| identifier
unsigned_hyper : "unsigned" "hyper"
unsigned_long : "unsigned" "long"
unsigned_int : "unsigned" "int"
hyper : "hyper"
long : "long"
int : "int"
float : "float"
double : "double"
quadruple : "quadruple"
bool : "bool"
enum_type_spec : "enum" enum_body
enum_body : "{" ( identifier "=" value ) ( "," identifier "=" value )* "}"
struct_type_spec : "struct" struct_body
struct_body : "{" ( declaration ";" )+ "}"
union_type_spec : "union" union_body
union_body : switch_spec "{" case_spec+ [ default_spec ] "}"
switch_spec : "switch" "(" declaration ")"
case_spec : ( "case" value ":" )+ declaration ";"
default_spec : "default" ":" declaration ";"
constant_def : "const" identifier "=" value ";"
type_def : "typedef" declaration ";" -> typedef
| "enum" identifier enum_body ";" -> enum
| "struct" identifier struct_body ";" -> struct
| "union" identifier union_body ";" -> union
specification : definition*
definition : constant_def
| type_def
| program_def
| pragma_def
//
// RPC program definitions not specified in RFC 4506
//
program_def : "program" identifier "{" version_def+ "}" "=" constant ";"
version_def : "version" identifier "{" procedure_def+ "}" "=" constant ";"
procedure_def : type_specifier identifier "(" type_specifier ")" "=" constant ";"
pragma_def : "pragma" directive identifier [ identifier ] ";"
directive : exclude_directive
| header_directive
| pages_directive
| public_directive
| skip_directive
exclude_directive : "exclude"
header_directive : "header"
pages_directive : "pages"
public_directive : "public"
skip_directive : "skip"
//
// XDR language primitives
//
identifier : /([a-z]|[A-Z])(_|[a-z]|[A-Z]|[0-9])*/
decimal_constant : /[\+-]?(0|[1-9][0-9]*)/
hexadecimal_constant : /0x([a-f]|[A-F]|[0-9])+/
octal_constant : /0[0-7]+/
PASSTHRU : "%" | "%" /.+/
%ignore PASSTHRU
%import common.C_COMMENT
%ignore C_COMMENT
%import common.WS
%ignore WS
# SPDX-License-Identifier: GPL-2.0
# Just to make sphinx-apidoc document this directory
#!/usr/bin/env python3
# ex: set filetype=python:
"""Translate an XDR specification into executable code that
can be compiled for the Linux kernel."""
import logging
from argparse import Namespace
from lark import logger
from lark.exceptions import UnexpectedInput
from generators.constant import XdrConstantGenerator
from generators.enum import XdrEnumGenerator
from generators.header_bottom import XdrHeaderBottomGenerator
from generators.header_top import XdrHeaderTopGenerator
from generators.pointer import XdrPointerGenerator
from generators.program import XdrProgramGenerator
from generators.typedef import XdrTypedefGenerator
from generators.struct import XdrStructGenerator
from generators.union import XdrUnionGenerator
from xdr_ast import transform_parse_tree, _RpcProgram, Specification
from xdr_ast import _XdrConstant, _XdrEnum, _XdrPointer
from xdr_ast import _XdrTypedef, _XdrStruct, _XdrUnion
from xdr_parse import xdr_parser, set_xdr_annotate
logger.setLevel(logging.INFO)
def emit_header_declarations(
root: Specification, language: str, peer: str
) -> None:
"""Emit header declarations"""
for definition in root.definitions:
if isinstance(definition.value, _XdrEnum):
gen = XdrEnumGenerator(language, peer)
elif isinstance(definition.value, _XdrPointer):
gen = XdrPointerGenerator(language, peer)
elif isinstance(definition.value, _XdrTypedef):
gen = XdrTypedefGenerator(language, peer)
elif isinstance(definition.value, _XdrStruct):
gen = XdrStructGenerator(language, peer)
elif isinstance(definition.value, _XdrUnion):
gen = XdrUnionGenerator(language, peer)
elif isinstance(definition.value, _RpcProgram):
gen = XdrProgramGenerator(language, peer)
else:
continue
gen.emit_declaration(definition.value)
def handle_parse_error(e: UnexpectedInput) -> bool:
"""Simple parse error reporting, no recovery attempted"""
print(e)
return True
def subcmd(args: Namespace) -> int:
"""Generate definitions and declarations"""
set_xdr_annotate(args.annotate)
parser = xdr_parser()
with open(args.filename, encoding="utf-8") as f:
parse_tree = parser.parse(f.read(), on_error=handle_parse_error)
ast = transform_parse_tree(parse_tree)
gen = XdrHeaderTopGenerator(args.language, args.peer)
gen.emit_declaration(args.filename, ast)
emit_header_declarations(ast, args.language, args.peer)
gen = XdrHeaderBottomGenerator(args.language, args.peer)
gen.emit_declaration(args.filename, ast)
return 0
#!/usr/bin/env python3
# ex: set filetype=python:
"""Translate an XDR specification into executable code that
can be compiled for the Linux kernel."""
import logging
from argparse import Namespace
from lark import logger
from lark.exceptions import UnexpectedInput
from generators.constant import XdrConstantGenerator
from generators.enum import XdrEnumGenerator
from generators.header_bottom import XdrHeaderBottomGenerator
from generators.header_top import XdrHeaderTopGenerator
from generators.pointer import XdrPointerGenerator
from generators.program import XdrProgramGenerator
from generators.typedef import XdrTypedefGenerator
from generators.struct import XdrStructGenerator
from generators.union import XdrUnionGenerator
from xdr_ast import transform_parse_tree, Specification
from xdr_ast import _RpcProgram, _XdrConstant, _XdrEnum, _XdrPointer
from xdr_ast import _XdrTypedef, _XdrStruct, _XdrUnion
from xdr_parse import xdr_parser, set_xdr_annotate
logger.setLevel(logging.INFO)
def emit_header_definitions(
root: Specification, language: str, peer: str
) -> None:
"""Emit header definitions"""
for definition in root.definitions:
if isinstance(definition.value, _XdrConstant):
gen = XdrConstantGenerator(language, peer)
elif isinstance(definition.value, _XdrEnum):
gen = XdrEnumGenerator(language, peer)
elif isinstance(definition.value, _XdrPointer):
gen = XdrPointerGenerator(language, peer)
elif isinstance(definition.value, _RpcProgram):
gen = XdrProgramGenerator(language, peer)
elif isinstance(definition.value, _XdrTypedef):
gen = XdrTypedefGenerator(language, peer)
elif isinstance(definition.value, _XdrStruct):
gen = XdrStructGenerator(language, peer)
elif isinstance(definition.value, _XdrUnion):
gen = XdrUnionGenerator(language, peer)
else:
continue
gen.emit_definition(definition.value)
def handle_parse_error(e: UnexpectedInput) -> bool:
"""Simple parse error reporting, no recovery attempted"""
print(e)
return True
def subcmd(args: Namespace) -> int:
"""Generate definitions"""
set_xdr_annotate(args.annotate)
parser = xdr_parser()
with open(args.filename, encoding="utf-8") as f:
parse_tree = parser.parse(f.read(), on_error=handle_parse_error)
ast = transform_parse_tree(parse_tree)
gen = XdrHeaderTopGenerator(args.language, args.peer)
gen.emit_definition(args.filename, ast)
emit_header_definitions(ast, args.language, args.peer)
gen = XdrHeaderBottomGenerator(args.language, args.peer)
gen.emit_definition(args.filename, ast)
return 0
#!/usr/bin/env python3
# ex: set filetype=python:
"""Translate an XDR specification into executable code that
can be compiled for the Linux kernel."""
import logging
from argparse import Namespace
from lark import logger
from lark.exceptions import UnexpectedInput
from xdr_parse import xdr_parser
from xdr_ast import transform_parse_tree
logger.setLevel(logging.DEBUG)
def handle_parse_error(e: UnexpectedInput) -> bool:
"""Simple parse error reporting, no recovery attempted"""
print(e)
return True
def subcmd(args: Namespace) -> int:
"""Lexical and syntax check of an XDR specification"""
parser = xdr_parser()
with open(args.filename, encoding="utf-8") as f:
parse_tree = parser.parse(f.read(), on_error=handle_parse_error)
transform_parse_tree(parse_tree)
return 0
#!/usr/bin/env python3
# ex: set filetype=python:
"""Translate an XDR specification into executable code that
can be compiled for the Linux kernel."""
import logging
from argparse import Namespace
from lark import logger
from lark.exceptions import UnexpectedInput
from generators.source_top import XdrSourceTopGenerator
from generators.enum import XdrEnumGenerator
from generators.pointer import XdrPointerGenerator
from generators.program import XdrProgramGenerator
from generators.typedef import XdrTypedefGenerator
from generators.struct import XdrStructGenerator
from generators.union import XdrUnionGenerator
from xdr_ast import transform_parse_tree, _RpcProgram, Specification
from xdr_ast import _XdrAst, _XdrEnum, _XdrPointer
from xdr_ast import _XdrStruct, _XdrTypedef, _XdrUnion
from xdr_parse import xdr_parser, set_xdr_annotate
logger.setLevel(logging.INFO)
def emit_source_decoder(node: _XdrAst, language: str, peer: str) -> None:
"""Emit one XDR decoder function for a source file"""
if isinstance(node, _XdrEnum):
gen = XdrEnumGenerator(language, peer)
elif isinstance(node, _XdrPointer):
gen = XdrPointerGenerator(language, peer)
elif isinstance(node, _XdrTypedef):
gen = XdrTypedefGenerator(language, peer)
elif isinstance(node, _XdrStruct):
gen = XdrStructGenerator(language, peer)
elif isinstance(node, _XdrUnion):
gen = XdrUnionGenerator(language, peer)
elif isinstance(node, _RpcProgram):
gen = XdrProgramGenerator(language, peer)
else:
return
gen.emit_decoder(node)
def emit_source_encoder(node: _XdrAst, language: str, peer: str) -> None:
"""Emit one XDR encoder function for a source file"""
if isinstance(node, _XdrEnum):
gen = XdrEnumGenerator(language, peer)
elif isinstance(node, _XdrPointer):
gen = XdrPointerGenerator(language, peer)
elif isinstance(node, _XdrTypedef):
gen = XdrTypedefGenerator(language, peer)
elif isinstance(node, _XdrStruct):
gen = XdrStructGenerator(language, peer)
elif isinstance(node, _XdrUnion):
gen = XdrUnionGenerator(language, peer)
elif isinstance(node, _RpcProgram):
gen = XdrProgramGenerator(language, peer)
else:
return
gen.emit_encoder(node)
def generate_server_source(filename: str, root: Specification, language: str) -> None:
"""Generate server-side source code"""
gen = XdrSourceTopGenerator(language, "server")
gen.emit_source(filename, root)
for definition in root.definitions:
emit_source_decoder(definition.value, language, "server")
for definition in root.definitions:
emit_source_encoder(definition.value, language, "server")
def generate_client_source(filename: str, root: Specification, language: str) -> None:
"""Generate server-side source code"""
gen = XdrSourceTopGenerator(language, "client")
gen.emit_source(filename, root)
# cel: todo: client needs XDR size macros
for definition in root.definitions:
emit_source_encoder(definition.value, language, "client")
for definition in root.definitions:
emit_source_decoder(definition.value, language, "client")
# cel: todo: client needs PROC macros
def handle_parse_error(e: UnexpectedInput) -> bool:
"""Simple parse error reporting, no recovery attempted"""
print(e)
return True
def subcmd(args: Namespace) -> int:
"""Generate encoder and decoder functions"""
set_xdr_annotate(args.annotate)
parser = xdr_parser()
with open(args.filename, encoding="utf-8") as f:
parse_tree = parser.parse(f.read(), on_error=handle_parse_error)
ast = transform_parse_tree(parse_tree)
match args.peer:
case "server":
generate_server_source(args.filename, ast, args.language)
case "client":
generate_client_source(args.filename, ast, args.language)
case _:
print("Code generation for", args.peer, "is not yet supported")
return 0
{# SPDX-License-Identifier: GPL-2.0 #}
enum { {{ name }} = {{ value }} };
{# SPDX-License-Identifier: GPL-2.0 #}
bool xdrgen_decode_{{ name }}(struct xdr_stream *xdr, enum {{ name }} *ptr);
bool xdrgen_encode_{{ name }}(struct xdr_stream *xdr, enum {{ name }} value);
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* enum {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_decode_{{ name }}(struct xdr_stream *xdr, enum {{ name }} *ptr)
{
u32 val;
if (xdr_stream_decode_u32(xdr, &val) < 0)
return false;
*ptr = val;
return true;
}
{# SPDX-License-Identifier: GPL-2.0 #}
{{ name }} = {{ value }},
{# SPDX-License-Identifier: GPL-2.0 #}
enum {{ name }} {
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* enum {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_encode_{{ name }}(struct xdr_stream *xdr, enum {{ name }} value)
{
return xdr_stream_encode_u32(xdr, value) == XDR_UNIT;
}
{# SPDX-License-Identifier: GPL-2.0 #}
#endif /* _LINUX_XDRGEN_{{ infix }}_DECL_H */
{# SPDX-License-Identifier: GPL-2.0 #}
#endif /* _LINUX_XDRGEN_{{ infix }}_DEF_H */
/* SPDX-License-Identifier: GPL-2.0 */
/* Generated by xdrgen. Manual edits will be lost. */
/* XDR specification file: {{ filename }} */
/* XDR specification modification time: {{ mtime }} */
#ifndef _LINUX_XDRGEN_{{ infix }}_DECL_H
#define _LINUX_XDRGEN_{{ infix }}_DECL_H
#include <linux/types.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/xdrgen/_defs.h>
#include <linux/sunrpc/xdrgen/_builtins.h>
#include <linux/sunrpc/xdrgen/{{ infix.lower() }}.h>
/* SPDX-License-Identifier: GPL-2.0 */
/* Generated by xdrgen. Manual edits will be lost. */
/* XDR specification file: {{ filename }} */
/* XDR specification modification time: {{ mtime }} */
#ifndef _LINUX_XDRGEN_{{ infix }}_DEF_H
#define _LINUX_XDRGEN_{{ infix }}_DEF_H
#include <linux/types.h>
#include <linux/sunrpc/xdrgen/_defs.h>
{# SPDX-License-Identifier: GPL-2.0 #}
bool xdrgen_decode_{{ name }}(struct xdr_stream *xdr, struct {{ name }} *ptr);
bool xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const struct {{ name }} *value);
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (basic) */
{% endif %}
if (!xdrgen_decode_{{ type }}(xdr, &ptr->{{ name }}))
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
return true;
};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (fixed-length array) */
{% endif %}
for (u32 i = 0; i < {{ size }}; i++) {
if (xdrgen_decode_{{ type }}(xdr, &ptr->{{ name }}.items[i]) < 0)
return false;
}
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (fixed-length opaque) */
{% endif %}
if (xdr_stream_decode_opaque_fixed(xdr, ptr->{{ name }}, {{ size }}) < 0)
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* pointer {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_decode_{{ name }}(struct xdr_stream *xdr, struct {{ name }} *ptr)
{
bool opted;
{% if annotate %}
/* opted */
{% endif %}
if (!xdrgen_decode_bool(xdr, &opted))
return false;
if (!opted)
return true;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (optional data) */
{% endif %}
if (!xdrgen_decode_{{ type }}(xdr, ptr->{{ name }}))
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (variable-length array) */
{% endif %}
if (xdr_stream_decode_u32(xdr, &ptr->{{ name }}.count) != XDR_UNIT)
return false;
{% if maxsize != "0" %}
if (ptr->{{ name }}.count > {{ maxsize }})
return false;
{% endif %}
for (u32 i = 0; i < ptr->{{ name }}.count; i++)
if (!xdrgen_decode_{{ type }}(xdr, &ptr->{{ name }}.element[i]))
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (variable-length opaque) */
{% endif %}
if (!xdrgen_decode_opaque(xdr, (opaque *)ptr, {{ maxsize }}))
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (variable-length string) */
{% endif %}
if (!xdrgen_decode_string(xdr, (string *)ptr, {{ maxsize }}))
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* (basic) */
{% endif %}
{{ classifier }}{{ type }} {{ name }};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* (fixed-length array) */
{% endif %}
{{ type }} {{ name }}[{{ size }}];
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* (fixed-length opaque) */
{% endif %}
u8 {{ name }}[{{ size }}];
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* pointer {{ name }} */
{% endif %}
struct {{ name }} {
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* (optional data) */
{% endif %}
{{ classifier }}{{ type }} *{{ name }};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* (variable-length array) */
{% endif %}
struct {
u32 count;
{{ classifier }}{{ type }} *element;
} {{ name }};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* (variable-length opaque) */
{% endif %}
opaque {{ name }};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* (variable-length string) */
{% endif %}
string {{ name }};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (basic) */
{% endif %}
{% if type in pass_by_reference %}
if (!xdrgen_encode_{{ type }}(xdr, &value->{{ name }}))
{% else %}
if (!xdrgen_encode_{{ type }}(xdr, value->{{ name }}))
{% endif %}
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
return true;
};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (fixed-length array) */
{% endif %}
for (u32 i = 0; i < {{ size }}; i++) {
{% if type in pass_by_reference %}
if (xdrgen_encode_{{ type }}(xdr, &value->items[i]) < 0)
{% else %}
if (xdrgen_encode_{{ type }}(xdr, value->items[i]) < 0)
{% endif %}
return false;
}
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (fixed-length opaque) */
{% endif %}
if (xdr_stream_encode_opaque_fixed(xdr, value->{{ name }}, {{ size }}) < 0)
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* pointer {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const struct {{ name }} *value)
{
{% if annotate %}
/* opted */
{% endif %}
if (!xdrgen_encode_bool(xdr, value != NULL))
return false;
if (!value)
return true;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (optional data) */
{% endif %}
if (!xdrgen_encode_{{ type }}(xdr, value->{{ name }}))
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (variable-length array) */
{% endif %}
if (value->{{ name }}.count > {{ maxsize }})
return false;
if (xdr_stream_encode_u32(xdr, value->{{ name }}.count) != XDR_UNIT)
return false;
for (u32 i = 0; i < value->{{ name }}.count; i++)
{% if type in pass_by_reference %}
if (!xdrgen_encode_{{ type }}(xdr, &value->{{ name }}.element[i]))
{% else %}
if (!xdrgen_encode_{{ type }}(xdr, value->{{ name }}.element[i]))
{% endif %}
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (variable-length opaque) */
{% endif %}
if (value->{{ name }}.len > {{ maxsize }})
return false;
if (xdr_stream_encode_opaque(xdr, value->{{ name }}.data, value->{{ name }}.len) < 0)
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (variable-length string) */
{% endif %}
if (value->{{ name }}.len > {{ maxsize }})
return false;
if (xdr_stream_encode_opaque(xdr, value->{{ name }}.data, value->{{ name }}.len) < 0)
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
bool {{ program }}_svc_decode_{{ argument }}(struct svc_rqst *rqstp, struct xdr_stream *xdr);
{# SPDX-License-Identifier: GPL-2.0 #}
bool {{ program }}_svc_encode_{{ result }}(struct svc_rqst *rqstp, struct xdr_stream *xdr);
{# SPDX-License-Identifier: GPL-2.0 #}
/**
* {{ program }}_svc_decode_{{ argument }} - Decode a {{ argument }} argument
* @rqstp: RPC transaction context
* @xdr: source XDR data stream
*
* Return values:
* %true: procedure arguments decoded successfully
* %false: decode failed
*/
bool {{ program }}_svc_decode_{{ argument }}(struct svc_rqst *rqstp, struct xdr_stream *xdr)
{
{% if argument == 'void' %}
return xdrgen_decode_void(xdr);
{% else %}
struct {{ argument }} *argp = rqstp->rq_argp;
return xdrgen_decode_{{ argument }}(xdr, argp);
{% endif %}
}
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* Decode {{ result }} results */
{% endif %}
static int {{ program }}_xdr_dec_{{ result }}(struct rpc_rqst *req,
struct xdr_stream *xdr, void *data)
{
{% if result == 'void' %}
xdrgen_decode_void(xdr);
{% else %}
struct {{ result }} *result = data;
if (!xdrgen_decode_{{ result }}(xdr, result))
return -EIO;
if (result->stat != nfs_ok) {
trace_nfs_xdr_status(xdr, (int)result->stat);
return {{ program }}_stat_to_errno(result->stat);
}
{% endif %}
return 0;
}
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* procedure numbers for {{ program }} */
{% endif %}
enum {
{# SPDX-License-Identifier: GPL-2.0 #}
{{ name }} = {{ value }},
{# SPDX-License-Identifier: GPL-2.0 #}
{%if annotate %}
/* Encode {{ argument }} arguments */
{% endif %}
static void {{ program }}_xdr_enc_{{ argument }}(struct rpc_rqst *req,
struct xdr_stream *xdr, const void *data)
{
{% if argument == 'void' %}
xdrgen_encode_void(xdr);
{% else %}
const struct {{ argument }} *args = data;
xdrgen_encode_{{ argument }}(xdr, args);
{% endif %}
}
{# SPDX-License-Identifier: GPL-2.0 #}
/**
* {{ program }}_svc_encode_{{ result }} - Encode a {{ result }} result
* @rqstp: RPC transaction context
* @xdr: target XDR data stream
*
* Return values:
* %true: procedure results encoded successfully
* %false: encode failed
*/
bool {{ program }}_svc_encode_{{ result }}(struct svc_rqst *rqstp, struct xdr_stream *xdr)
{
{% if result == 'void' %}
return xdrgen_encode_void(xdr);
{% else %}
struct {{ result }} *resp = rqstp->rq_resp;
return xdrgen_encode_{{ result }}(xdr, resp);
{% endif %}
}
// SPDX-License-Identifier: GPL-2.0
// Generated by xdrgen. Manual edits will be lost.
// XDR specification file: {{ filename }}
// XDR specification modification time: {{ mtime }}
#include <linux/sunrpc/xprt.h>
#include "{{ program }}xdr_gen.h"
// SPDX-License-Identifier: GPL-2.0
// Generated by xdrgen. Manual edits will be lost.
// XDR specification file: {{ filename }}
// XDR specification modification time: {{ mtime }}
#include <linux/sunrpc/svc.h>
#include "{{ program }}xdr_gen.h"
{# SPDX-License-Identifier: GPL-2.0 #}
bool xdrgen_decode_{{ name }}(struct xdr_stream *xdr, struct {{ name }} *ptr);
bool xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const struct {{ name }} *value);
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (basic) */
{% endif %}
if (!xdrgen_decode_{{ type }}(xdr, &ptr->{{ name }}))
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
return true;
};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (fixed-length array) */
{% endif %}
for (u32 i = 0; i < {{ size }}; i++) {
if (xdrgen_decode_{{ type }}(xdr, &ptr->{{ name }}.items[i]) < 0)
return false;
}
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (fixed-length opaque) */
{% endif %}
if (xdr_stream_decode_opaque_fixed(xdr, ptr->{{ name }}, {{ size }}) < 0)
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* struct {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_decode_{{ name }}(struct xdr_stream *xdr, struct {{ name }} *ptr)
{
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (optional data) */
{% endif %}
if (!xdrgen_decode_{{ type }}(xdr, ptr->{{ name }}))
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (variable-length array) */
{% endif %}
if (xdr_stream_decode_u32(xdr, &ptr->{{ name }}.count) != XDR_UNIT)
return false;
{% if maxsize != "0" %}
if (ptr->{{ name }}.count > {{ maxsize }})
return false;
{% endif %}
for (u32 i = 0; i < ptr->{{ name }}.count; i++)
if (!xdrgen_decode_{{ type }}(xdr, &ptr->{{ name }}.element[i]))
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (variable-length opaque) */
{% endif %}
if (!xdrgen_decode_opaque(xdr, (opaque *)ptr, {{ maxsize }}))
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (variable-length string) */
{% endif %}
if (!xdrgen_decode_string(xdr, (string *)ptr, {{ maxsize }}))
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* (basic) */
{% endif %}
{{ classifier }}{{ type }} {{ name }};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* (fixed-length array) */
{% endif %}
{{ type }} {{ name }}[{{ size }}];
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* (fixed-length opaque) */
{% endif %}
u8 {{ name }}[{{ size }}];
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* struct {{ name }} */
{% endif %}
struct {{ name }} {
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* (optional data) */
{% endif %}
{{ classifier }}{{ type }} *{{ name }};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* (variable-length array) */
{% endif %}
struct {
u32 count;
{{ classifier }}{{ type }} *element;
} {{ name }};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* (variable-length opaque) */
{% endif %}
opaque {{ name }};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* (variable-length string) */
{% endif %}
string {{ name }};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (basic) */
{% endif %}
{% if type in pass_by_reference %}
if (!xdrgen_encode_{{ type }}(xdr, &value->{{ name }}))
{% else %}
if (!xdrgen_encode_{{ type }}(xdr, value->{{ name }}))
{% endif %}
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
return true;
};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (fixed-length array) */
{% endif %}
for (u32 i = 0; i < {{ size }}; i++) {
{% if type in pass_by_reference %}
if (xdrgen_encode_{{ type }}(xdr, &value->items[i]) < 0)
{% else %}
if (xdrgen_encode_{{ type }}(xdr, value->items[i]) < 0)
{% endif %}
return false;
}
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (fixed-length opaque) */
{% endif %}
if (xdr_stream_encode_opaque_fixed(xdr, value->{{ name }}, {{ size }}) < 0)
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* struct {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const struct {{ name }} *value)
{
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (optional data) */
{% endif %}
if (!xdrgen_encode_{{ type }}(xdr, value->{{ name }}))
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (variable-length array) */
{% endif %}
if (value->{{ name }}.count > {{ maxsize }})
return false;
if (xdr_stream_encode_u32(xdr, value->{{ name }}.count) != XDR_UNIT)
return false;
for (u32 i = 0; i < value->{{ name }}.count; i++)
{% if type in pass_by_reference %}
if (!xdrgen_encode_{{ type }}(xdr, &value->{{ name }}.element[i]))
{% else %}
if (!xdrgen_encode_{{ type }}(xdr, value->{{ name }}.element[i]))
{% endif %}
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (variable-length opaque) */
{% endif %}
if (value->{{ name }}.len > {{ maxsize }})
return false;
if (xdr_stream_encode_opaque(xdr, value->{{ name }}.data, value->{{ name }}.len) < 0)
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (variable-length string) */
{% endif %}
if (value->{{ name }}.len > {{ maxsize }})
return false;
if (xdr_stream_encode_opaque(xdr, value->{{ name }}.data, value->{{ name }}.len) < 0)
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
bool xdrgen_decode_{{ name }}(struct xdr_stream *xdr, {{ name }} *ptr);
{% if name in pass_by_reference %}
bool xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const {{ name }} *value);
{%- else -%}
bool xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const {{ name }} value);
{% endif %}
{# SPDX-License-Identifier: GPL-2.0 #}
bool xdrgen_decode_{{ name }}(struct xdr_stream *xdr, {{ classifier }}{{ name }} *ptr);
bool xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const {{ classifier }}{{ name }} value);
{# SPDX-License-Identifier: GPL-2.0 #}
bool xdrgen_decode_{{ name }}(struct xdr_stream *xdr, {{ classifier }}{{ name }} *ptr);
bool xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const {{ classifier }}{{ name }} value);
{# SPDX-License-Identifier: GPL-2.0 #}
bool xdrgen_decode_{{ name }}(struct xdr_stream *xdr, {{ classifier }}{{ name }} *ptr);
bool xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const {{ classifier }}{{ name }} value);
{# SPDX-License-Identifier: GPL-2.0 #}
bool xdrgen_decode_{{ name }}(struct xdr_stream *xdr, {{ classifier }}{{ name }} *ptr);
bool xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const {{ classifier }}{{ name }} value);
{# SPDX-License-Identifier: GPL-2.0 #}
bool xdrgen_decode_{{ name }}(struct xdr_stream *xdr, {{ classifier }}{{ name }} *ptr);
bool xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const {{ classifier }}{{ name }} value);
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_decode_{{ name }}(struct xdr_stream *xdr, {{ name }} *ptr)
{
{% if annotate %}
/* (basic) */
{% endif %}
return xdrgen_decode_{{ type }}(xdr, ptr);
};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_decode_{{ name }}(struct xdr_stream *xdr, {{ classifier }}{{ name }} *ptr)
{
{% if annotate %}
/* (fixed-length array) */
{% endif %}
for (u32 i = 0; i < {{ size }}; i++) {
{%- if classifier == '' %}
if (xdrgen_decode_{{ type }}(xdr, ptr->items[i]) < 0)
{% else %}
if (xdrgen_decode_{{ type }}(xdr, &ptr->items[i]) < 0)
{% endif %}
return false;
}
return true;
};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_decode_{{ name }}(struct xdr_stream *xdr, {{ classifier }}{{ name }} *ptr)
{
{% if annotate %}
/* (fixed-length opaque) */
{% endif %}
return xdr_stream_decode_opaque_fixed(xdr, ptr, {{ size }}) >= 0;
};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_decode_{{ name }}(struct xdr_stream *xdr, {{ classifier }}{{ name }} *ptr)
{
{% if annotate %}
/* (variable-length array) */
{% endif %}
if (xdr_stream_decode_u32(xdr, &ptr->count) < 0)
return false;
{% if maxsize != "0" %}
if (ptr->count > {{ maxsize }})
return false;
{% endif %}
for (u32 i = 0; i < ptr->count; i++)
if (!xdrgen_decode_{{ type }}(xdr, &ptr->element[i]))
return false;
return true;
};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_decode_{{ name }}(struct xdr_stream *xdr, {{ classifier }}{{ name }} *ptr)
{
{% if annotate %}
/* (variable-length opaque) */
{% endif %}
return xdr_stream_decode_opaque(xdr, ptr->data, ptr->len) >= 0;
};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_decode_{{ name }}(struct xdr_stream *xdr, {{ classifier }}{{ name }} *ptr)
{
{% if annotate %}
/* (variable-length string) */
{% endif %}
return xdr_stream_decode_opaque(xdr, ptr->data, ptr->len) >= 0;
};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} (basic) */
{% endif %}
typedef {{ classifier }}{{ type }} {{ name }};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} (fixed-length array) */
{% endif %}
typedef {{ type }}{{ name }}[{{ size }}];
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} (fixed-length opaque) */
{% endif %}
typedef u8 {{ name }}[{{ size }}];
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} (variable-length array) */
{% endif %}
typedef struct {
u32 count;
{{ classifier }}{{ type }} *element;
} {{ name }};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} (variable-length opaque) */
{% endif %}
typedef opaque {{ name }};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} (variable-length string) */
{% endif %}
typedef string {{ name }};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
{% if name in pass_by_reference %}
xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const {{ classifier }}{{ name }} *value)
{% else %}
xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const {{ classifier }}{{ name }} value)
{% endif %}
{
{% if annotate %}
/* (basic) */
{% endif %}
return xdrgen_encode_{{ type }}(xdr, value);
};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const {{ classifier }}{{ name }} value)
{
{% if annotate %}
/* (fixed-length array) */
{% endif %}
for (u32 i = 0; i < {{ size }}; i++) {
{% if type in pass_by_reference %}
if (xdrgen_encode_{{ type }}(xdr, &value->items[i]) < 0)
{% else %}
if (xdrgen_encode_{{ type }}(xdr, value->items[i]) < 0)
{% endif %}
return false;
}
return true;
};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const {{ classifier }}{{ name }} value)
{
{% if annotate %}
/* (fixed-length opaque) */
{% endif %}
return xdr_stream_encode_opaque_fixed(xdr, value, {{ size }}) >= 0;
};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const {{ classifier }}{{ name }} value)
{
{% if annotate %}
/* (variable-length array) */
{% endif %}
{% if maxsize != "0" %}
if (unlikely(value.count > {{ maxsize }}))
return false;
{% endif %}
if (xdr_stream_encode_u32(xdr, value.count) != XDR_UNIT)
return false;
for (u32 i = 0; i < value.count; i++)
{% if type in pass_by_reference %}
if (!xdrgen_encode_{{ type }}(xdr, &value.element[i]))
{% else %}
if (!xdrgen_encode_{{ type }}(xdr, value.element[i]))
{% endif %}
return false;
return true;
};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const {{ classifier }}{{ name }} value)
{
{% if annotate %}
/* (variable-length opaque) */
{% endif %}
return xdr_stream_encode_opaque(xdr, value.data, value.len) >= 0;
};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* typedef {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const {{ classifier }}{{ name }} value)
{
{% if annotate %}
/* (variable-length string) */
{% endif %}
return xdr_stream_encode_opaque(xdr, value.data, value.len) >= 0;
};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (basic) */
{% endif %}
if (!xdrgen_decode_{{ type }}(xdr, &ptr->u.{{ name }}))
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
break;
{# SPDX-License-Identifier: GPL-2.0 #}
case {{ case }}:
{# SPDX-License-Identifier: GPL-2.0 #}
}
return true;
};
{# SPDX-License-Identifier: GPL-2.0 #}
default:
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* union {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_decode_{{ name }}(struct xdr_stream *xdr, struct {{ name }} *ptr)
{
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (optional data) */
{% endif %}
if (!xdrgen_decode_{{ type }}(xdr, &ptr->u.{{ name }}))
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* discriminant {{ name }} */
{% endif %}
if (!xdrgen_decode_{{ type }}(xdr, &ptr->{{ name }}))
return false;
switch (ptr->{{ name }}) {
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (variable-length array) */
{% endif %}
if (xdr_stream_decode_u32(xdr, &count) != XDR_UNIT)
return false;
if (count > {{ maxsize }})
return false;
for (u32 i = 0; i < count; i++) {
if (xdrgen_decode_{{ type }}(xdr, &ptr->{{ name }}.items[i]) < 0)
return false;
}
ptr->{{ name }}.len = count;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (variable-length opaque) */
{% endif %}
if (!xdrgen_decode_opaque(xdr, (struct opaque *)ptr->u.{{ name }}, {{ maxsize }}))
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (variable-length string) */
{% endif %}
if (!xdrgen_decode_string(xdr, (struct string *)ptr->u.{{ name }}, {{ maxsize }}))
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
if (!xdrgen_decode_void(xdr))
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
{{ classifier }}{{ type }} {{ name }};
{# SPDX-License-Identifier: GPL-2.0 #}
} u;
};
{%- if name in public_apis %}
bool xdrgen_decode_{{ name }}(struct xdr_stream *xdr, struct {{ name }} *ptr);
bool xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const struct {{ name }} *ptr);
{%- endif -%}
{# SPDX-License-Identifier: GPL-2.0 #}
{{ classifier }}{{ type }} {{ name }};
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* union {{ name }} */
{% endif %}
struct {{ name }} {
{# SPDX-License-Identifier: GPL-2.0 #}
{{ classifier }}{{ type }} {{ name }};
union {
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* member {{ name }} (basic) */
{% endif %}
{% if type in pass_by_reference %}
if (!xdrgen_encode_{{ type }}(xdr, &ptr->u.{{ name }}))
{% else %}
if (!xdrgen_encode_{{ type }}(xdr, ptr->u.{{ name }}))
{% endif %}
return false;
{# SPDX-License-Identifier: GPL-2.0 #}
break;
{# SPDX-License-Identifier: GPL-2.0 #}
case {{ case }}:
{# SPDX-License-Identifier: GPL-2.0 #}
}
return true;
};
{# SPDX-License-Identifier: GPL-2.0 #}
default:
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* union {{ name }} */
{% endif %}
{% if name in public_apis %}
bool
{% else %}
static bool __maybe_unused
{% endif %}
xdrgen_encode_{{ name }}(struct xdr_stream *xdr, const struct {{ name }} *ptr)
{
{# SPDX-License-Identifier: GPL-2.0 #}
{% if annotate %}
/* discriminant {{ name }} */
{% endif %}
if (!xdrgen_encode_{{ type }}(xdr, ptr->{{ name }}))
return false;
switch (ptr->{{ name }}) {
{# SPDX-License-Identifier: GPL-2.0 #}
if (!xdrgen_encode_void(xdr))
return false;
/* Sample XDR specification from RFC 1832 Section 5.5 */
const MAXUSERNAME = 32; /* max length of a user name */
const MAXFILELEN = 65535; /* max length of a file */
const MAXNAMELEN = 255; /* max length of a file name */
/*
* Types of files:
*/
enum filekind {
TEXT = 0, /* ascii data */
DATA = 1, /* raw data */
EXEC = 2 /* executable */
};
/*
* File information, per kind of file:
*/
union filetype switch (filekind kind) {
case TEXT:
void; /* no extra information */
case DATA:
string creator<MAXNAMELEN>; /* data creator */
case EXEC:
string interpretor<MAXNAMELEN>; /* program interpretor */
};
/*
* A complete file:
*/
struct file {
string filename<MAXNAMELEN>; /* name of file */
filetype type; /* info about file */
string owner<MAXUSERNAME>; /* owner of file */
opaque data<MAXFILELEN>; /* file data */
};
#!/usr/bin/env python3
# ex: set filetype=python:
"""Define and implement the Abstract Syntax Tree for the XDR language."""
import sys
from typing import List
from dataclasses import dataclass
from lark import ast_utils, Transformer
from lark.tree import Meta
this_module = sys.modules[__name__]
excluded_apis = []
header_name = "none"
public_apis = []
enums = set()
structs = set()
pass_by_reference = set()
@dataclass
class _XdrAst(ast_utils.Ast):
"""Base class for the XDR abstract syntax tree"""
@dataclass
class _XdrIdentifier(_XdrAst):
"""Corresponds to 'identifier' in the XDR language grammar"""
symbol: str
@dataclass
class _XdrValue(_XdrAst):
"""Corresponds to 'value' in the XDR language grammar"""
value: str
@dataclass
class _XdrConstantValue(_XdrAst):
"""Corresponds to 'constant' in the XDR language grammar"""
value: int
@dataclass
class _XdrTypeSpecifier(_XdrAst):
"""Corresponds to 'type_specifier' in the XDR language grammar"""
type_name: str
c_classifier: str
@dataclass
class _XdrDefinedType(_XdrTypeSpecifier):
"""Corresponds to a type defined by the input specification"""
@dataclass
class _XdrBuiltInType(_XdrTypeSpecifier):
"""Corresponds to a built-in XDR type"""
@dataclass
class _XdrDeclaration(_XdrAst):
"""Base class of XDR type declarations"""
@dataclass
class _XdrFixedLengthOpaque(_XdrDeclaration):
"""A fixed-length opaque declaration"""
name: str
size: str
template: str = "fixed_length_opaque"
@dataclass
class _XdrVariableLengthOpaque(_XdrDeclaration):
"""A variable-length opaque declaration"""
name: str
maxsize: str
template: str = "variable_length_opaque"
@dataclass
class _XdrVariableLengthString(_XdrDeclaration):
"""A (NUL-terminated) variable-length string declaration"""
name: str
maxsize: str
template: str = "variable_length_string"
@dataclass
class _XdrFixedLengthArray(_XdrDeclaration):
"""A fixed-length array declaration"""
name: str
spec: _XdrTypeSpecifier
size: str
template: str = "fixed_length_array"
@dataclass
class _XdrVariableLengthArray(_XdrDeclaration):
"""A variable-length array declaration"""
name: str
spec: _XdrTypeSpecifier
maxsize: str
template: str = "variable_length_array"
@dataclass
class _XdrOptionalData(_XdrDeclaration):
"""An 'optional_data' declaration"""
name: str
spec: _XdrTypeSpecifier
template: str = "optional_data"
@dataclass
class _XdrBasic(_XdrDeclaration):
"""A 'basic' declaration"""
name: str
spec: _XdrTypeSpecifier
template: str = "basic"
@dataclass
class _XdrVoid(_XdrDeclaration):
"""A void declaration"""
template: str = "void"
@dataclass
class _XdrConstant(_XdrAst):
"""Corresponds to 'constant_def' in the grammar"""
name: str
value: str
@dataclass
class _XdrEnumerator(_XdrAst):
"""An 'identifier = value' enumerator"""
name: str
value: str
@dataclass
class _XdrEnum(_XdrAst):
"""An XDR enum definition"""
name: str
minimum: int
maximum: int
enumerators: List[_XdrEnumerator]
@dataclass
class _XdrStruct(_XdrAst):
"""An XDR struct definition"""
name: str
fields: List[_XdrDeclaration]
@dataclass
class _XdrPointer(_XdrAst):
"""An XDR pointer definition"""
name: str
fields: List[_XdrDeclaration]
@dataclass
class _XdrTypedef(_XdrAst):
"""An XDR typedef"""
declaration: _XdrDeclaration
@dataclass
class _XdrCaseSpec(_XdrAst):
"""One case in an XDR union"""
values: List[str]
arm: _XdrDeclaration
template: str = "case_spec"
@dataclass
class _XdrDefaultSpec(_XdrAst):
"""Default case in an XDR union"""
arm: _XdrDeclaration
template: str = "default_spec"
@dataclass
class _XdrUnion(_XdrAst):
"""An XDR union"""
name: str
discriminant: _XdrDeclaration
cases: List[_XdrCaseSpec]
default: _XdrDeclaration
@dataclass
class _RpcProcedure(_XdrAst):
"""RPC procedure definition"""
name: str
number: str
argument: _XdrTypeSpecifier
result: _XdrTypeSpecifier
@dataclass
class _RpcVersion(_XdrAst):
"""RPC version definition"""
name: str
number: str
procedures: List[_RpcProcedure]
@dataclass
class _RpcProgram(_XdrAst):
"""RPC program definition"""
name: str
number: str
versions: List[_RpcVersion]
@dataclass
class _Pragma(_XdrAst):
"""Empty class for pragma directives"""
@dataclass
class Definition(_XdrAst, ast_utils.WithMeta):
"""Corresponds to 'definition' in the grammar"""
meta: Meta
value: _XdrAst
@dataclass
class Specification(_XdrAst, ast_utils.AsList):
"""Corresponds to 'specification' in the grammar"""
definitions: List[Definition]
class ParseToAst(Transformer):
"""Functions that transform productions into AST nodes"""
def identifier(self, children):
"""Instantiate one _XdrIdentifier object"""
return _XdrIdentifier(children[0].value)
def value(self, children):
"""Instantiate one _XdrValue object"""
if isinstance(children[0], _XdrIdentifier):
return _XdrValue(children[0].symbol)
return _XdrValue(children[0].children[0].value)
def constant(self, children):
"""Instantiate one _XdrConstantValue object"""
match children[0].data:
case "decimal_constant":
value = int(children[0].children[0].value, base=10)
case "hexadecimal_constant":
value = int(children[0].children[0].value, base=16)
case "octal_constant":
value = int(children[0].children[0].value, base=8)
return _XdrConstantValue(value)
def type_specifier(self, children):
"""Instantiate one type_specifier object"""
c_classifier = ""
if isinstance(children[0], _XdrIdentifier):
name = children[0].symbol
if name in enums:
c_classifier = "enum "
if name in structs:
c_classifier = "struct "
return _XdrDefinedType(
type_name=name,
c_classifier=c_classifier,
)
token = children[0].data
return _XdrBuiltInType(
type_name=token.value,
c_classifier=c_classifier,
)
def constant_def(self, children):
"""Instantiate one _XdrConstant object"""
name = children[0].symbol
value = children[1].value
return _XdrConstant(name, value)
# cel: Python can compute a min() and max() for the enumerator values
# so that the generated code can perform proper range checking.
def enum(self, children):
"""Instantiate one _XdrEnum object"""
enum_name = children[0].symbol
enums.add(enum_name)
i = 0
enumerators = []
body = children[1]
while i < len(body.children):
name = body.children[i].symbol
value = body.children[i + 1].value
enumerators.append(_XdrEnumerator(name, value))
i = i + 2
return _XdrEnum(enum_name, 0, 0, enumerators)
def fixed_length_opaque(self, children):
"""Instantiate one _XdrFixedLengthOpaque declaration object"""
name = children[0].symbol
size = children[1].value
return _XdrFixedLengthOpaque(name, size)
def variable_length_opaque(self, children):
"""Instantiate one _XdrVariableLengthOpaque declaration object"""
name = children[0].symbol
if children[1] is not None:
maxsize = children[1].value
else:
maxsize = "0"
return _XdrVariableLengthOpaque(name, maxsize)
def variable_length_string(self, children):
"""Instantiate one _XdrVariableLengthString declaration object"""
name = children[0].symbol
if children[1] is not None:
maxsize = children[1].value
else:
maxsize = "0"
return _XdrVariableLengthString(name, maxsize)
def fixed_length_array(self, children):
"""Instantiate one _XdrFixedLengthArray declaration object"""
spec = children[0]
name = children[1].symbol
size = children[2].value
return _XdrFixedLengthArray(name, spec, size)
def variable_length_array(self, children):
"""Instantiate one _XdrVariableLengthArray declaration object"""
spec = children[0]
name = children[1].symbol
if children[2] is not None:
maxsize = children[2].value
else:
maxsize = "0"
return _XdrVariableLengthArray(name, spec, maxsize)
def optional_data(self, children):
"""Instantiate one _XdrOptionalData declaration object"""
spec = children[0]
name = children[1].symbol
structs.add(name)
pass_by_reference.add(name)
return _XdrOptionalData(name, spec)
def basic(self, children):
"""Instantiate one _XdrBasic object"""
spec = children[0]
name = children[1].symbol
return _XdrBasic(name, spec)
def void(self, children):
"""Instantiate one _XdrVoid declaration object"""
return _XdrVoid()
def struct(self, children):
"""Instantiate one _XdrStruct object"""
name = children[0].symbol
structs.add(name)
pass_by_reference.add(name)
fields = children[1].children
last_field = fields[-1]
if (
isinstance(last_field, _XdrOptionalData)
and name == last_field.spec.type_name
):
return _XdrPointer(name, fields)
return _XdrStruct(name, fields)
def typedef(self, children):
"""Instantiate one _XdrTypedef object"""
new_type = children[0]
if isinstance(new_type, _XdrBasic) and isinstance(
new_type.spec, _XdrDefinedType
):
if new_type.spec.type_name in pass_by_reference:
pass_by_reference.add(new_type.name)
return _XdrTypedef(new_type)
def case_spec(self, children):
"""Instantiate one _XdrCaseSpec object"""
values = []
for item in children[0:-1]:
values.append(item.value)
arm = children[-1]
return _XdrCaseSpec(values, arm)
def default_spec(self, children):
"""Instantiate one _XdrDefaultSpec object"""
arm = children[0]
return _XdrDefaultSpec(arm)
def union(self, children):
"""Instantiate one _XdrUnion object"""
name = children[0].symbol
structs.add(name)
pass_by_reference.add(name)
body = children[1]
discriminant = body.children[0].children[0]
cases = body.children[1:-1]
default = body.children[-1]
return _XdrUnion(name, discriminant, cases, default)
def procedure_def(self, children):
"""Instantiate one _RpcProcedure object"""
result = children[0]
name = children[1].symbol
argument = children[2]
number = children[3].value
return _RpcProcedure(name, number, argument, result)
def version_def(self, children):
"""Instantiate one _RpcVersion object"""
name = children[0].symbol
number = children[-1].value
procedures = children[1:-1]
return _RpcVersion(name, number, procedures)
def program_def(self, children):
"""Instantiate one _RpcProgram object"""
name = children[0].symbol
number = children[-1].value
versions = children[1:-1]
return _RpcProgram(name, number, versions)
def pragma_def(self, children):
"""Instantiate one _Pragma object"""
directive = children[0].children[0].data
match directive:
case "exclude_directive":
excluded_apis.append(children[1].symbol)
case "header_directive":
global header_name
header_name = children[1].symbol
case "public_directive":
public_apis.append(children[1].symbol)
case _:
raise NotImplementedError("Directive not supported")
return _Pragma()
transformer = ast_utils.create_transformer(this_module, ParseToAst())
def transform_parse_tree(parse_tree):
"""Transform productions into an abstract syntax tree"""
return transformer.transform(parse_tree)
def get_header_name() -> str:
"""Return header name set by pragma header directive"""
return header_name
#!/usr/bin/env python3
# ex: set filetype=python:
"""Common parsing code for xdrgen"""
from lark import Lark
# Set to True to emit annotation comments in generated source
annotate = False
def set_xdr_annotate(set_it: bool) -> None:
"""Set 'annotate' if --annotate was specified on the command line"""
global annotate
annotate = set_it
def get_xdr_annotate() -> bool:
"""Return True if --annotate was specified on the command line"""
return annotate
def xdr_parser() -> Lark:
"""Return a Lark parser instance configured with the XDR language grammar"""
return Lark.open(
"grammars/xdr.lark",
rel_to=__file__,
start="specification",
debug=True,
strict=True,
propagate_positions=True,
parser="lalr",
lexer="contextual",
)
#!/usr/bin/env python3
# ex: set filetype=python:
"""Translate an XDR specification into executable code that
can be compiled for the Linux kernel."""
__author__ = "Chuck Lever"
__copyright__ = "Copyright (c) 2024 Oracle and/or its affiliates."
__license__ = "GPL-2.0 only"
__version__ = "0.2"
import sys
import argparse
from subcmds import definitions
from subcmds import declarations
from subcmds import lint
from subcmds import source
sys.path.insert(1, "@pythondir@")
def main() -> int:
"""Parse command-line options"""
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="Convert an XDR specification to Linux kernel source code",
epilog="""\
Copyright (c) 2024 Oracle and/or its affiliates.
License GPLv2: <http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt>
This is free software. You are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.""",
)
parser.add_argument(
"--version",
help="Display the version of this tool",
action="version",
version=__version__,
)
subcommands = parser.add_subparsers(title="Subcommands", required=True)
definitions_parser = subcommands.add_parser(
"definitions", help="Generate XDR definitions"
)
definitions_parser.add_argument(
"--annotate",
action="store_true",
default=False,
help="Add annotation comments",
)
definitions_parser.add_argument(
"--language",
action="store_true",
default="C",
help="Output language",
)
definitions_parser.add_argument(
"--peer",
choices=["server", "client",],
default="server",
help="Generate header code for client or server side",
type=str,
)
definitions_parser.add_argument("filename", help="File containing an XDR specification")
definitions_parser.set_defaults(func=definitions.subcmd)
declarations_parser = subcommands.add_parser(
"declarations", help="Generate function declarations"
)
declarations_parser.add_argument(
"--annotate",
action="store_true",
default=False,
help="Add annotation comments",
)
declarations_parser.add_argument(
"--language",
action="store_true",
default="C",
help="Output language",
)
declarations_parser.add_argument(
"--peer",
choices=["server", "client",],
default="server",
help="Generate code for client or server side",
type=str,
)
declarations_parser.add_argument("filename", help="File containing an XDR specification")
declarations_parser.set_defaults(func=declarations.subcmd)
linter_parser = subcommands.add_parser("lint", help="Check an XDR specification")
linter_parser.add_argument("filename", help="File containing an XDR specification")
linter_parser.set_defaults(func=lint.subcmd)
source_parser = subcommands.add_parser(
"source", help="Generate XDR encoder and decoder source code"
)
source_parser.add_argument(
"--annotate",
action="store_true",
default=False,
help="Add annotation comments",
)
source_parser.add_argument(
"--language",
action="store_true",
default="C",
help="Output language",
)
source_parser.add_argument(
"--peer",
choices=["server", "client",],
default="server",
help="Generate code for client or server side",
type=str,
)
source_parser.add_argument("filename", help="File containing an XDR specification")
source_parser.set_defaults(func=source.subcmd)
args = parser.parse_args()
return args.func(args)
try:
if __name__ == "__main__":
sys.exit(main())
except (SystemExit, KeyboardInterrupt, BrokenPipeError):
sys.exit(1)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment