Skip to content
Snippets Groups Projects
Commit 2b729ce7 authored by Martin Mareš's avatar Martin Mareš
Browse files

The first bits

parents
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/python3
from dbus_next.aio import MessageBus, ProxyInterface
import dbus_next.introspection as intr
from dbus_next.message import Message
from dbus_next.service import ServiceInterface, method, dbus_property, signal
from dbus_next import Variant, DBusError, BusType, MessageType, RequestNameReply
import asyncio
import os
from typing import Union, Optional, Callable, Dict, List, Set
async def main():
bus = await MessageBus(negotiate_unix_fd=True).connect()
ins = await bus.introspect('cz.ucw.test', '/cz/ucw/Machine')
pxy = bus.get_proxy_object('cz.ucw.test', '/cz/ucw/Machine', ins)
pif = pxy.get_interface('cz.ucw.TestInterface')
# fd = os.open('/dev/null', os.O_RDWR)
xx = await pif.call_shell(0, 1, 2)
# xx = await pif.call_frobate(42, 'brum')
print(xx)
asyncio.get_event_loop().run_until_complete(main())
server.py 0 → 100755
#!/usr/bin/python3
from dbus_next.aio import MessageBus, ProxyInterface
import dbus_next.introspection as intr
from dbus_next.message import Message
from dbus_next.service import ServiceInterface, method, dbus_property, signal
from dbus_next import Variant, DBusError, BusType, MessageType, RequestNameReply
import asyncio
import os
import sys
from typing import Union, Optional, Callable, Dict, List, Set, Any
from shipcat.config import ContainerConfig, ConfigError
class BusLogic:
bus: MessageBus
msg_tasks: Set[asyncio.Task]
dbus_iface: ProxyInterface
object_tree: Any
method_list: List[intr.Method]
method_dict: Dict[str, intr.Method]
interface_intro: intr.Interface
def __init__(self) -> None:
self.msg_tasks = set()
self.object_tree = {
'cz': {
'ucw': {
'ShipCat': True,
}
}
}
self.method_list = [
intr.Method(
name='Frobate',
in_args=[
intr.Arg('i', intr.ArgDirection.IN, 'foo'),
intr.Arg('s', intr.ArgDirection.IN, 'bar'),
],
out_args=[
intr.Arg('a{us}', intr.ArgDirection.OUT),
],
),
intr.Method(
name='Shell',
in_args=[
intr.Arg('h', intr.ArgDirection.IN, 'stdin_fd'),
intr.Arg('h', intr.ArgDirection.IN, 'stdout_fd'),
intr.Arg('h', intr.ArgDirection.IN, 'stderr_fd'),
],
out_args=[
intr.Arg('i', intr.ArgDirection.OUT),
],
),
]
self.method_dict = {m.name: m for m in self.method_list}
self.interface_intro = intr.Interface(
name='cz.ucw.ShipCat',
methods=self.method_list,
)
async def loop(self) -> None:
self.bus = await MessageBus(negotiate_unix_fd=True).connect()
# Obtain proxy interface for org.freedesktop.DBus
dbus_intro = await self.bus.introspect(bus_name='org.freedesktop.DBus', path='/org/freedesktop/DBus')
dbus_proxy = self.bus.get_proxy_object(bus_name='org.freedesktop.DBus', path='/org/freedesktop/DBus', introspection=dbus_intro)
self.dbus_iface = dbus_proxy.get_interface('org.freedesktop.DBus')
self.bus.add_message_handler(self.handle_message)
rn = await self.bus.request_name('cz.ucw.shipcat')
assert rn == RequestNameReply.PRIMARY_OWNER
await self.bus.wait_for_disconnect()
def handle_message(self, msg: Message) -> Optional[Union[Message, bool]]:
print('=====================')
print('type', msg.message_type)
print('dest', msg.destination)
print('path', msg.path)
print('iface', msg.interface)
print('member', msg.member)
print('sender', msg.sender)
print('fds', msg.unix_fds)
print('serials', msg.serial, msg.reply_serial)
print('body', msg.body)
if (msg.destination == 'cz.ucw.shipcat'
and msg.interface == 'org.freedesktop.DBus.Introspectable'
and msg.member == 'Introspect'):
self.close_fds(msg)
result = self.handle_introspect(msg)
elif (msg.destination == 'cz.ucw.shipcat'
and msg.interface == 'cz.ucw.ShipCat'
and msg.message_type == MessageType.METHOD_CALL
and msg.member in self.method_dict
and msg.signature == self.method_dict[msg.member].in_signature):
result = self.handle_method_call(msg)
else:
self.close_fds(msg)
result = None
return result
def close_fds(self, msg: Message) -> None:
# XXX: The dbus-next library leaks file descriptors. Work around it.
# (see https://github.com/altdesktop/python-dbus-next/issues/162)
for fd in msg.unix_fds:
os.close(fd)
msg.unix_fds = []
def handle_method_call(self, msg: Message) -> bool:
print('Scheduling method call:', msg.member)
async def async_method_call(msg):
try:
reply = await getattr(self, 'handle_' + msg.member)(msg)
except DBusError as e:
print(f'DBusError: {e}')
reply = Message.new_error(msg, e.type, e.text)
except Exception as e:
print(f'Internal error: {e}')
reply = Message.new_error(msg, 'cz.ucw.Test.Error.Internal', 'Internal error')
self.bus.send(reply)
self.close_fds(msg)
task = asyncio.create_task(async_method_call(msg))
self.msg_tasks.add(task)
task.add_done_callback(self.msg_tasks.discard)
return True
def handle_introspect(self, msg: Message) -> Message:
intro = intr.Node.default(msg.path)
# We do not want to speak ObjectManager-ish
intro.interfaces = [i for i in intro.interfaces if i.name != 'org.freedesktop.DBus.ObjectManager']
if msg.path == '/':
path = [""]
else:
path = msg.path.split('/')
assert path[0] == ""
i = 1
node = self.object_tree
while i < len(path) and isinstance(node, dict) and path[i] in node:
node = node[path[i]]
i += 1
print(path, i, node)
if i == len(path):
if isinstance(node, dict):
intro.nodes = [intr.Node(n) for n in sorted(node.keys())]
else:
intro.interfaces.append(self.interface_intro)
print('INTRO', intro.tostring())
return Message.new_method_return(msg, signature='s', body=[intro.tostring()])
async def handle_Frobate(self, msg: Message) -> Message:
print('Inside handle_Frobate:', msg)
cred = await self.dbus_iface.call_get_connection_credentials(msg.sender)
print('Creds:', cred)
return Message.new_method_return(
msg,
signature='a{us}',
body=[{1: 'one', 2: 'two'}],
)
async def handle_Shell(self, msg: Message) -> Message:
stdin_fd = msg.unix_fds[msg.body[0]]
stdout_fd = msg.unix_fds[msg.body[1]]
stderr_fd = msg.unix_fds[msg.body[2]]
print('FDs:', stdin_fd, stdout_fd, stderr_fd)
cred = await self.dbus_iface.call_get_connection_credentials(msg.sender)
print('Creds:', cred)
proc = await asyncio.create_subprocess_exec('/bin/cat', '/etc/profile', stdin=stdin_fd, stdout=stdout_fd, stderr=stderr_fd)
await proc.wait()
return Message.new_method_return(msg, signature='i', body=[proc.returncode])
bl = BusLogic()
asyncio.get_event_loop().run_until_complete(bl.loop())
pass
# The Ship's Cat -- Container Configuration
# (c) 2024 Martin Mareš <mj@ucw.cz>
from pwd import getpwnam
from grp import getgrnam
import tomllib
from typing import List
from .json_walker import Walker, WalkerError
class ConfigError(RuntimeError):
pass
class ContainerConfig:
root_dir: str
image: str
allowed_users: List[int]
allowed_groups: List[int]
@classmethod
def load(cls, filename: str) -> 'ContainerConfig':
try:
with open(filename, 'rb') as f:
root = tomllib.load(f)
except FileNotFoundError:
raise ConfigError('Cannot open {filename}')
except tomllib.TOMLDecodeError as e:
raise ConfigError(f'Cannot parse {filename}: {e}')
cc = ContainerConfig()
try:
cc.parse(Walker(root))
except WalkerError as e:
raise ConfigError(f'Cannot parse {filename}: {e}')
return cc
def parse(self, walker: Walker) -> None:
with walker.enter_object() as w:
self.root_dir = w['root_dir'].as_str()
self.image = w['image'].as_str()
self.allowed_users = []
for wu in w['allowed_users'].default_to([]).array_values():
name = wu.as_str()
try:
pwd = getpwnam(name)
except KeyError:
wu.raise_error(f'Unknown user {name}')
self.allowed_users.append(pwd.pw_uid)
self.allowed_groups = []
for wu in w['allowed_groups'].default_to([]).array_values():
name = wu.as_str()
try:
grp = getgrnam(name)
except KeyError:
wu.raise_error(f'Unknown group {name}')
self.allowed_groups.append(grp.gr_gid)
# A simple module for walking through a parsed JSON file
# (c) 2023 Martin Mareš <mj@ucw.cz>
from collections.abc import Iterator
from enum import Enum
import re
from typing import Any, Optional, NoReturn, Tuple, Set, Type, TypeVar
T = TypeVar('T')
E = TypeVar('E', bound=Enum)
class MissingValue:
pass
class Walker:
obj: Any
parent: Optional['Walker'] = None
custom_context: str = ""
def __init__(self, root: Any) -> None:
self.obj = root
def context(self) -> str:
return 'root'
def raise_error(self, msg) -> NoReturn:
raise WalkerError(self, msg)
def is_null(self) -> bool:
return self.obj is None
def is_str(self) -> bool:
return isinstance(self.obj, str)
def is_int(self) -> bool:
return isinstance(self.obj, int)
def is_missing(self) -> bool:
return isinstance(self.obj, MissingValue)
def is_present(self) -> bool:
return not isinstance(self.obj, MissingValue)
def is_bool(self) -> bool:
return isinstance(self.obj, bool)
def is_array(self) -> bool:
return isinstance(self.obj, list)
def is_object(self) -> bool:
return isinstance(self.obj, dict)
def expect_present(self):
if self.is_missing():
self.raise_error('Mandatory key is missing')
def as_type(self, typ: Type[T], msg: str, default: Optional[T] = None) -> T:
if isinstance(self.obj, typ):
return self.obj
elif self.is_missing():
if default is None:
self.raise_error('Mandatory key is missing')
else:
return default
else:
self.raise_error(msg)
def as_optional_type(self, typ: Type[T], msg: str) -> Optional[T]:
if isinstance(self.obj, typ):
return self.obj
elif self.is_missing():
return None
else:
self.raise_error(msg)
def as_str(self, default: Optional[str] = None) -> str:
return self.as_type(str, 'Expected a string', default)
def as_int(self, default: Optional[int] = None) -> int:
return self.as_type(int, 'Expected an integer', default)
def as_bool(self, default: Optional[bool] = None) -> bool:
return self.as_type(bool, 'Expected a Boolean value', default)
def as_enum(self, enum: Type[E], default: Optional[E] = None) -> E:
if self.is_missing() and default is not None:
return default
try:
return enum(self.as_str())
except ValueError:
self.raise_error('Must be one of ' + '/'.join(sorted(enum.__members__.values()))) # FIXME: type
def as_optional_str(self) -> Optional[str]:
return self.as_optional_type(str, 'Expected a string')
def as_optional_int(self) -> Optional[int]:
return self.as_optional_type(int, 'Expected an integer')
def as_optional_bool(self) -> Optional[bool]:
return self.as_optional_type(bool, 'Expected a Boolean value')
def array_values(self) -> Iterator['WalkerInArray']:
ary = self.as_type(list, 'Expected an array')
for i, obj in enumerate(ary):
yield WalkerInArray(obj, self, i)
def object_values(self) -> Iterator['WalkerInObject']:
dct = self.as_type(dict, 'Expected an object')
for key, obj in dct.items():
yield WalkerInObject(obj, self, key)
def object_items(self) -> Iterator[Tuple[str, 'WalkerInObject']]:
dct = self.as_type(dict, 'Expected an object')
for key, obj in dct.items():
yield key, WalkerInObject(obj, self, key)
def enter_object(self) -> 'ObjectWalker':
dct = self.as_type(dict, 'Expected an object')
return ObjectWalker(dct, self)
def default_to(self, default) -> 'Walker': # XXX: Use Self when available
if self.is_missing():
self.obj = default
return self
def set_custom_context(self, ctx: str) -> None:
self.custom_context = ctx
class WalkerInArray(Walker):
index: int
def __init__(self, obj: Any, parent: Walker, index: int) -> None:
super().__init__(obj)
self.parent = parent
self.index = index
def context(self) -> str:
return f'[{self.index}]'
class WalkerInObject(Walker):
key: str
def __init__(self, obj: Any, parent: Walker, key: str) -> None:
super().__init__(obj)
self.parent = parent
self.key = key
def context(self) -> str:
if re.fullmatch(r'\w+', self.key):
return f'.{self.key}'
else:
quoted_key = re.sub(r'(\\|")', r'\\\1', self.key)
return f'."{quoted_key}"'
def unexpected(self) -> NoReturn:
self.raise_error('Unexpected key')
class ObjectWalker(Walker):
referenced_keys: Set[str]
def __init__(self, obj: Any, parent: Walker) -> None:
super().__init__(obj)
assert isinstance(obj, dict)
self.parent = parent
self.referenced_keys = set()
def __enter__(self) -> 'ObjectWalker':
return self
def __exit__(self, exc_type, exc_value, traceback) -> None:
if exc_type is None:
self.assert_no_other_keys()
def context(self) -> str:
return ""
def __contains__(self, key: str) -> bool:
return key in self.obj
def __getitem__(self, key: str) -> WalkerInObject:
if key in self.obj:
self.referenced_keys.add(key)
return WalkerInObject(self.obj[key], self, key)
else:
return WalkerInObject(MissingValue(), self, key)
def assert_no_other_keys(self) -> None:
for key, val in self.obj.items():
if key not in self.referenced_keys:
WalkerInObject(val, self, key).unexpected()
class WalkerError(Exception):
walker: Walker
msg: str
def __init__(self, walker: Walker, msg: str) -> None:
self.walker = walker
self.msg = msg
def __str__(self) -> str:
contexts = []
w: Optional[Walker] = self.walker
while w is not None:
contexts.append(w.context())
contexts.append(w.custom_context)
w = w.parent
return "".join(reversed(contexts)) + ": " + self.msg
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment