Select Git revision
-
Martin Mareš authored
Od nynějška je vždy inicializováno mo.now a je nastavené české locale.
Martin Mareš authoredOd nynějška je vždy inicializováno mo.now a je nastavené české locale.
vm.py 48.65 KiB
#!/bin/python3
import subprocess
import sys, os, pathlib
import argparse
import time
import json
from dataclasses import dataclass
import functools
from typing import Optional, Any
import traceback
########################
# Global configuration #
########################
# Name of the daemon's UNIX socket; ask_server() appends it to root_folder.
socket_path = ".socket"

# True only when this file is run as a script with "server" as first argument.
is_daemon = (
    __name__ == "__main__"
    and len(sys.argv) >= 2
    and sys.argv[1] == "server"
)

root_folder = "/mnt/virtual/"

# Supported virtualisation backends; chosen through the VM_BACKEND variable.
backend_qemu = "qemu"
backend_vbox = "vbox"
backend = os.environ.get("VM_BACKEND", backend_qemu)
assert backend in [backend_qemu, backend_vbox]

# Each backend uses its own "10.x" prefix for VM networks.
net_prefix = "10.37" if backend == backend_vbox else "10.38"

# Kernel boot id suffixed with the backend name.
boot_id = (
    pathlib.Path("/proc/sys/kernel/random/boot_id").read_text().strip()
    + "-"
    + backend
)

force = False
no_daemon = False
verbose = 1  # daemon is verbose
########################
# Utils #
########################
class S():
    '''
    Helper for nicely formatted multi-line text literals.

    Use it as S-"""
        Text
    """

    All leading and trailing blank lines are removed. Then the largest
    number of leading spaces shared by every non-blank line is removed
    from each line. Input with no non-blank line yields "".
    '''
    def __sub__(_, a):
        lines = a.split("\n")

        # Trim blank lines on both ends by moving indices instead of
        # repeatedly popping from the front of the list.
        first, last = 0, len(lines)
        while first < last and lines[first].strip() == "":
            first += 1
        while last > first and lines[last - 1].strip() == "":
            last -= 1
        lines = lines[first:last]

        def space_count(s):
            return len(s) - len(s.lstrip(' '))

        # default=0 keeps empty / all-blank input from raising ValueError.
        to_remove = min(
            (space_count(l) for l in lines if l.strip() != ""),
            default=0,
        )
        # Whitespace-only lines shorter than the common indent become empty.
        return "\n".join("" if len(l) < to_remove else l[to_remove:] for l in lines)


# The class name is rebound to its single instance so that S-"..." works.
S = S()
def escape_sh(*arg):
    """Single-quote every argument for a shell and join them with spaces.

    An embedded single quote is written as '"'"' (close the quote, emit a
    double-quoted single quote, reopen the quote).
    """
    def quote(word):
        return "'" + word.replace("'", "'\"'\"'") + "'"

    return " ".join(map(quote, arg))
def r(*arg, check=None, stdin=None):
    """Run an external command given as separate argv items.

    check -- forwarded to subprocess.run; None means ``not force``.
    stdin -- optional data passed to the process as its input.

    The command is echoed first when the global ``verbose`` is truthy.
    """
    must_succeed = (not force) if check is None else check
    if verbose:
        print(">", " ".join(arg))
    # input=None is subprocess.run's default, so a single call covers both
    # the "no stdin" and the "with stdin" case.
    subprocess.run(arg, check=must_succeed, input=stdin)
def nft(rules):
    """Hand ``rules`` to the ``nft`` binary as one command-line argument.

    Each rule line is echoed with an "@ " prefix when ``verbose`` is set.
    A failing exit status raises unless the global ``force`` is set.
    """
    if verbose:
        for line in rules.split("\n"):
            print("@ " + line)
    subprocess.run(["nft", rules], check=not force)
def get_spec(f):
    """Return inspect.getfullargspec(f), cached on the function as ``f.spec``."""
    import inspect
    if 'spec' in f.__dict__:
        return f.spec
    f.spec = inspect.getfullargspec(f)
    return f.spec
def internal_cmd(f):
    """Decorator: register ``f`` through cmd() under the "internal" sub-parser."""
    registered = cmd(f, internal=True)
    return registered
def cmd(f, internal=False):
    """Decorator: register ``f`` as a sub-command and build its argparse parser.

    ``f`` is stored in ``subcommands`` (or ``subcommands_internal`` when
    ``internal`` is true) under its ``__name__`` and gets a matching
    sub-parser in ``f.parser``. Command-line arguments are derived from the
    signature returned by get_spec(f):

    * str/int/float/Identification parameters become a positional argument,
      or a ``--name`` option when the parameter has a default;
    * list[str]/list[int]/list[float] parameters use ``action="append"``;
    * bool parameters become ``--name`` flags, or ``--no_name`` when the
      default is True;
    * ``*varargs`` collects the remaining command line.

    Parameters with any other (or no) annotation get no argument.
    Returns ``f`` unchanged; ``None`` is passed through untouched (the
    daemon() decorator returns None for root-only functions when the
    caller is not root).
    """
    if f is None:
        return f
    spec = get_spec(f)
    registry = subcommands_internal if internal else subcommands
    parsers = subparsers_internal if internal else subparsers
    registry[f.__name__] = f
    f.parser = parsers.add_parser(f.__name__)

    def process_arg(name, has_default, default):
        # Uses its own ``name`` parameter rather than a loop variable from
        # the enclosing scope.
        annotation = spec.annotations.get(name, None)
        flag = ("--" if has_default else "") + name
        if annotation in [str, int, float]:
            f.parser.add_argument(flag, type=annotation)
        if annotation in [list[str], list[int], list[float]]:
            f.parser.add_argument(
                flag,
                type=annotation.__args__[0],
                action="append",
            )
        if annotation in [bool]:
            if has_default and default is True:
                f.parser.add_argument(
                    "--no_" + name,
                    action="store_false",
                    dest=name,
                    default=True,
                )
            else:
                f.parser.add_argument("--" + name, action="store_true")
        if annotation in [Identification]:
            f.parser.add_argument(flag, type=str)

    # Positional defaults align with the END of spec.args.
    defaults = spec.defaults or ()
    first_with_default = len(spec.args) - len(defaults)
    for i, arg in enumerate(spec.args):
        has_default = i >= first_with_default
        default = defaults[i - first_with_default] if has_default else None
        process_arg(arg, has_default, default)

    # kwonlydefaults is None when no keyword-only parameter has a default.
    kwonly_defaults = spec.kwonlydefaults or {}
    for arg in spec.kwonlyargs:
        process_arg(arg, True, kwonly_defaults.get(arg))

    if spec.varargs is not None:
        f.parser.add_argument(
            spec.varargs,
            type=str, nargs=argparse.REMAINDER,
        )
    return f
def random_passwd():
    """Return a random 50-character password.

    Characters are picked with secrets.choice(), which avoids the modulo
    bias of mapping raw random bytes onto the alphabet with ``%``.
    """
    import secrets
    passwd_chars = "0123456789qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM,.;?!/\\<>(){}[]'\"`|~@#$%^&*-_+-="
    return "".join(secrets.choice(passwd_chars) for _ in range(50))
@dataclass
class Ucred:
    """Credentials of a process: process id, user id and group id."""
    pid: int  # process id
    uid: int  # user id; root-only daemon functions assert that it is 0
    gid: int  # group id
def my_ucred():
    """Return the Ucred (pid, uid, gid) of the current process."""
    return Ucred(pid=os.getpid(), uid=os.getuid(), gid=os.getgid())
def recvall(sock):
    """Read from ``sock`` until recv() returns no data; return a bytearray."""
    chunk_size = 4096
    received = bytearray()
    # An empty packet means the peer has finished sending.
    while packet := sock.recv(chunk_size):
        received += packet
    return received
# Registry filled by the @daemon decorator: function name -> callable
# (for root-only functions, a wrapper asserting ucred.uid == 0).
daemon_funcs = {}
def ask_server(in_struct):
    """Send one JSON request to the daemon socket and return the decoded reply.

    The request is written in full, the write side is shut down, and the
    reply is read until the peer closes. The socket is closed on every
    exit path, including errors.
    """
    import socket
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as connection:
        connection.connect(root_folder+socket_path)
        if verbose: print("ASK", in_struct)
        in_data = json.dumps(in_struct).encode('utf-8')
        connection.sendall(in_data)
        # Half-close so the server sees the end of the request.
        connection.shutdown(socket.SHUT_WR)
        out_data = recvall(connection)
    out_struct = json.loads(out_data)
    if verbose: print("->", out_struct)
    return out_struct
def daemon(root_only=False):
    """Decorator factory for functions that execute inside the daemon.

    The decorated function must take ``ucred`` as its first parameter.
    It is recorded in ``daemon_funcs`` under its name; with ``root_only``
    the recorded callable first asserts ``ucred.uid == 0``.

    What the decorator returns depends on the process:

    * in the daemon (``is_daemon``) the original function is returned;
    * in a client, ``None`` is returned for root-only functions when the
      caller is not root;
    * otherwise a proxy without the ``ucred`` parameter is returned. The
      proxy forwards the call through ask_server(), or calls the function
      directly with my_ucred() when ``no_daemon`` is set. If the reply
      contains "exception" it is printed to stderr and the process exits
      with status 1.
    """
    def ll(f):
        spec = get_spec(f)
        assert spec.args[0] == 'ucred'
        # Signature as seen by clients: without the leading ucred.
        spec = spec._replace(args=spec.args[1:])
        if root_only:
            @functools.wraps(f)
            def l(ucred, *arg, **kvarg):
                assert ucred.uid == 0
                # Propagate the result so root-only calls can return values.
                return f(ucred, *arg, **kvarg)
            l.spec = get_spec(f)
            daemon_funcs[f.__name__] = l
        else:
            daemon_funcs[f.__name__] = f
        if is_daemon:
            return f
        # TODO validate types
        if root_only and my_ucred().uid != 0:
            return None

        @functools.wraps(f)
        def l(*arg, **kvarg):
            if no_daemon:
                # Run locally and stop here; do not contact the server too.
                return f(my_ucred(), *arg, **kvarg)
            r = ask_server({"fname":f.__name__, "arg": arg, "kvarg": kvarg, "backend": backend})
            if "exception" in r:
                print(r["exception"], file=sys.stderr)
                sys.exit(1)
            return r["return"]
        l.spec = spec
        return l
    return ll
class Identification:
    """A VM plus optional user, display and VNC details.

    Indexing gives ``vm`` (0) and ``user`` (1), which allows
    ``vm, user = ident``.
    """
    vm: str
    user: Optional[str]
    display: Optional[str]
    wayland_display: Optional[str]
    sway_socket: Optional[str]
    vnc_port: Optional[int]

    def __init__(self, vm: str, user: Optional[str] = None, display: Optional[str] = None, wayland_display: Optional[str] = None, sway_socket: Optional[str] = None, vnc_port: Optional[int] = None):
        self.vm, self.user = vm, user
        self.display, self.wayland_display = display, wayland_display
        self.sway_socket, self.vnc_port = sway_socket, vnc_port

    def __getitem__(self, key):
        pair = [self.vm, self.user]
        return pair[key]

    def __str__(self):
        """Render as ``[user@]vm[:port][:[display]][:wayland[...]][:sway[...]]``."""
        pieces = [self.vm if self.user is None else f"{self.user}@{self.vm}"]
        if self.vnc_port:
            pieces.append(f":{self.vnc_port}")
        if self.display:
            pieces.append(f":[{self.display}]")
        if self.wayland_display:
            pieces.append(f":wayland[{self.wayland_display}]")
        if self.sway_socket:
            pieces.append(f":sway[{self.sway_socket}]")
        return "".join(pieces)

    def set_default_user(self, user: str = "u"):
        """Fill in ``user`` when none is set; return self for chaining."""
        if self.user is None:
            self.user = user
        return self
########################
# Argparser #
########################
# Top-level parser; public commands are registered by the @cmd decorator.
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(help="commands", dest="subcommand")
subcommands = {}

# "internal" sub-command with its own nested commands (@internal_cmd).
parser_internal = subparsers.add_parser("internal")
subparsers_internal = parser_internal.add_subparsers(
    help="internal_commands",
    dest="subcommand_internal",
)
subcommands_internal = {}

# Global options.
parser.add_argument("-f", "--force", action="store_true")
parser.add_argument("-r", "--root_folder", type=str)
parser.add_argument("-v", "--verbose", action="count")

# State names compared against the value returned by state().
state_not_registered = "not registered"
state_powered_off = "powered off"
state_aborted = "aborted"
state_saved = "saved"
state_running = "running"
state_paused = "paused"
#########################
# Name, id, dir … tools #
#########################
def vm_dir(vm: str):
    """Return the directory path of ``vm``: "<root_folder>/<vm>.vm/"."""
    return "{}/{}.vm/".format(root_folder, vm)
@internal_cmd
def name_to_id(name: str) -> str:
    """Resolve a VM name or id to its id.

    A valid name is resolved by reading the symlink
    "<root_folder><name>.vm", whose target must end in ".vm". The result
    must be a valid id, and "<id>.vm" must not be a symlink.
    """
    assert is_valid_id(name) or is_valid_name(name)
    if is_valid_name(name):
        link_target = str(pathlib.Path(root_folder+name+".vm").readlink())
        assert link_target.endswith(".vm")
        name = link_target[:-3]
    assert is_valid_id(name)
    # NOTE(review): this path is relative to the current working directory,
    # not to root_folder - confirm the process runs inside root_folder.
    assert not os.path.islink(name+".vm")
    return name
@internal_cmd
def name(vm: str) -> str:
    """Return the stripped contents of the "name" file in the VM directory."""
    vm = name_to_id(vm)
    # Context manager so the file is closed deterministically.
    with open(vm_dir(vm)+"name") as f:
        return f.read().strip()
def is_valid_name(name):
    """Tell whether ``name`` is a valid VM name.

    Every character must be alphabetic, numeric, "-" or "_", and at least
    one character must be alphabetic.
    """
    def allowed(ch):
        return ch.isalpha() or ch.isnumeric() or ch in "-_"

    if not all(allowed(ch) for ch in name):
        return False
    return any(ch.isalpha() for ch in name)
def is_valid_id(id):
    """Tell whether ``id`` is a VM id: a non-empty string of ASCII digits.

    The empty string is rejected (it would otherwise resolve to the path
    "<root_folder>/.vm/"), and so are non-ASCII numeric characters.
    """
    return id.isascii() and id.isdigit()
def all_virtuals():
    """Yield the id of every "<id>.vm" entry in root_folder with a valid id."""
    suffix = ".vm"
    for entry in os.listdir(root_folder):
        if not entry.endswith(suffix):
            continue
        candidate = entry[:-len(suffix)]
        if is_valid_id(candidate):
            yield candidate
@internal_cmd
def is_android(vm: str):
    """Tell whether the VM directory contains an "is_android" marker file."""
    marker = vm_dir(vm) + "is_android"
    return os.path.exists(marker)
@daemon()
def has_read_acces(ucred, vm: str):
    """Tell whether ``ucred`` may read ``vm``. Not implemented: always True."""
    # TODO!
    return True
@daemon()
def has_write_acces(ucred, vm: str):
    """Tell whether ``ucred`` may modify ``vm``. Not implemented: always True."""
    # TODO!
    return True
########################
# Low-level API #
########################
@internal_cmd
def get_ip(vm: str) -> str:
    """Return "<net_prefix>.<net_id>.150" for the VM.

    ``net_id`` is read from the "network/net_id" file in the VM directory;
    .150 is the first address of the DHCP range written by create_net().
    """
    network_dir = vm_dir(vm)+"network/"
    # Context manager so the file is closed deterministically.
    with open(network_dir+"net_id") as f:
        net_id = f.read().strip()
    return f'{net_prefix}.{net_id}.150'
@internal_cmd
def get_permanency(vm: str):
    """Return the stripped contents of the VM's "permanency" file.

    Returns "undef" when the file does not exist.
    """
    try:
        # Context manager so the file is closed deterministically.
        with open(vm_dir(vm)+"permanency") as f:
            return f.read().strip()
    except FileNotFoundError:
        return "undef"
@internal_cmd
@daemon()
def set_permanency(ucred, vm: str, permanency: str):
    """Write ``permanency`` into the VM's "permanency" file.

    Accepted values are "tmp" and "stable"; "prepared" is accepted only
    when ``ucred.uid`` is 0. Anything else fails an assertion.
    """
    vm = name_to_id(vm)
    assert has_write_acces(ucred, vm)
    allowed = ["tmp", "stable"]
    if ucred.uid == 0:
        allowed.append("prepared")
    assert permanency in allowed
    with open(vm_dir(vm)+"permanency", "w") as f:
        f.write(permanency)
@cmd
def give_to_user(vm: str, uid: int, gid: Optional[int] = None):
    """Change the owner of the VM's "id_ed25519" key file to ``uid``/``gid``.

    When ``gid`` is None the primary group of ``uid`` from the password
    database is used. Nothing is changed for Android VMs.
    """
    if gid is None:
        import pwd
        gid = pwd.getpwuid(uid).pw_gid
    vm = name_to_id(vm)
    if is_android(vm):
        return
    os.chown(vm_dir(vm)+"id_ed25519", uid, gid)
@internal_cmd
@daemon()
def start(ucred, vm: str, display: bool = None):
    """Start a VM on the configured backend.

    * vbox: registers the VM first when its state is "not registered",
      then runs ``VBoxManage startvm`` headless.
    * qemu: copies OVMF.fd into the VM directory when missing, writes the
      QEMU command line into the "run-qemu" file and starts the systemd
      unit ``vm-qemu@<id>``. ``display`` adds a VNC UNIX socket; Android
      VMs get ``-vga vmware``.

    The network is (re)created when "network/boot_id" is missing or does
    not match the current ``boot_id``. A permanency starting with "init "
    is switched to "tmp" afterwards.
    """
    vm = name_to_id(vm)
    assert has_write_acces(ucred, vm)
    home = vm_dir(vm)

    if backend == backend_vbox and state(ucred, vm) == state_not_registered:
        register_vm(ucred, vm)

    boot_id_file = home + "network/boot_id"
    if not os.path.exists(boot_id_file) or open(boot_id_file, "r").read().strip() != boot_id:
        create_net(ucred, vm)

    if backend == backend_vbox:
        r("VBoxManage", "startvm", vm, "--type=headless")
    elif backend == backend_qemu:
        if not os.path.exists(home + "OVMF.fd"):
            r("cp", "/usr/share/edk2-ovmf/x64/OVMF.fd", home + "OVMF.fd")
        with open(home + "run-qemu", "w") as f:
            # Every line ends with a backslash continuation, so the file
            # forms one long shell command.
            script = S-f"""
                qemu-system-x86_64 -enable-kvm \\
                -device usb-ehci \\
                -m 5000M -cpu host -smp 4 -cpu host -smp 4 \\
                -monitor unix:qemu-monitor,server,nowait \\
                -drive if=pflash,format=raw,file=OVMF.fd \\
                -drive file=img,index=0,media=disk,format=raw \\
                -nographic \\
            """+'\n'
            script += f'-device virtio-net,netdev=network0 -netdev tap,id=network0,ifname={open(home+"network/interface").read().strip()},script=network/up.sh,downscript=network/down.sh \\\n'
            if display:
                script += "-vnc unix:qemu-vnc,power-control=on \\\n"
            if is_android(vm):
                script += f'-vga vmware\\\n'
            f.write(script+"\n")
        # The script file is closed before the unit is started.
        r("systemctl", "start", f"vm-qemu@{vm}")
    else:
        raise NotImplementedError()

    if get_permanency(vm).startswith("init "):
        set_permanency(ucred, vm, "tmp")
########################
# Backend control #
########################
@internal_cmd
@daemon()
def state(ucred, vm: str):
    """Return the backend's state string for the VM.

    * vbox: the text after "State:" in ``VBoxManage showvminfo`` output,
      up to the first "(", or ``state_not_registered`` when VirtualBox
      does not know the machine. Raises RuntimeError with the captured
      stderr when no "State:" line is found.
    * qemu: the ``ActiveState`` property of the unit
      ``vm-qemu@<id>.service``.
    """
    vm = name_to_id(vm)
    assert has_read_acces(ucred, vm)
    if backend == backend_vbox:
        p = subprocess.run(["VBoxManage", "showvminfo", vm], capture_output=True, encoding='utf8')
        if "Could not find a registered machine named " in p.stderr:
            return state_not_registered
        state_line = next(
            (line for line in p.stdout.split('\n') if line.startswith("State:")),
            None,
        )
        if state_line is None:
            raise RuntimeError(p.stderr)
        return state_line[10:].split("(")[0].strip()
    if backend == backend_qemu:
        p = subprocess.run(["systemctl", "show", f"vm-qemu@{vm}.service", "--property=ActiveState"], capture_output=True, encoding='utf8', check=True)
        # Output has the form "ActiveState=<value>".
        return p.stdout.split("=")[1].strip()
    raise NotImplementedError()
@internal_cmd
@daemon()
def register_vm(ucred: Ucred, vm: str):
    """Register the VM with VirtualBox (no-op on other backends).

    Assigns a UUID to the VM's "disk.vmdk", creates and registers the
    machine with EFI firmware and 4096 MB of memory, and attaches the
    disk to a new SATA controller.
    """
    vm = name_to_id(vm)
    assert has_read_acces(ucred, vm)
    if backend == backend_vbox:
        # Everything is derived from ``vm``; the previous body referred to
        # names that are not defined in this function.
        home = vm_dir(vm)
        r('VBoxManage', 'internalcommands', 'sethduuid', home+"disk.vmdk")
        r('VBoxManage', 'createvm', f'--name={vm}', f"--basefolder={home}", "--register")
        r('VBoxManage', 'modifyvm', vm, '--firmware=efi')
        r('VBoxManage', 'modifyvm', vm, '--memory=4096')
        r("VBoxManage", "storagectl", vm, '--name', "SATA Controller", '--add', 'sata', '--bootable', 'on')
        r("VBoxManage", "storageattach", vm, "--storagectl", "SATA Controller", "--port", "0", "--device", "0", "--type", "hdd", "--medium", f"{home}/disk.vmdk")
@internal_cmd
@daemon()
def unregister_vm(ucred: Ucred, vm: str):
    """Unregister the VM from VirtualBox and delete its VirtualBox files.

    No-op on other backends. The VM is resolved and validated through
    name_to_id() and write access is asserted, as in the other functions
    that modify a VM, so arbitrary strings never reach the VBoxManage
    command line.
    """
    vm = name_to_id(vm)
    assert has_write_acces(ucred, vm)
    if backend == backend_vbox:
        r("VBoxManage", "unregistervm", vm, "--delete-all")
@cmd
@daemon()
def poweroff(ucred, vm: str):
    """Ask the guest to power down.

    Sends the ACPI power button on vbox and the monitor command
    "system_powerdown" on qemu.
    """
    vm = name_to_id(vm)
    assert has_write_acces(ucred, vm)
    if backend == backend_vbox:
        r("VBoxManage", "controlvm", vm, "acpipowerbutton")
        return
    if backend == backend_qemu:
        qemu_cmd(vm, "system_powerdown")
        return
    raise NotImplementedError()
@cmd
@daemon()
def kill(ucred, vm: str):
    """Stop the VM's systemd unit ``vm-qemu@<id>``. Implemented for qemu only."""
    vm = name_to_id(vm)
    assert has_write_acces(ucred, vm)
    if backend != backend_qemu:
        raise NotImplementedError()
    r("systemctl", "stop", f"vm-qemu@{vm}")
@cmd
@daemon()
def pause(ucred, vm: str):
    """Pause the VM: ``controlvm pause`` on vbox, monitor command "stop" on qemu."""
    vm = name_to_id(vm)
    assert has_write_acces(ucred, vm)
    if backend == backend_vbox:
        r("VBoxManage", "controlvm", vm, "pause")
        return
    if backend == backend_qemu:
        qemu_cmd(vm, "stop")
        return
    raise NotImplementedError()
@cmd
@daemon()
def resume(ucred, vm: str):
    """Resume a paused VM: ``controlvm resume`` on vbox, monitor command "cont" on qemu."""
    vm = name_to_id(vm)
    assert has_write_acces(ucred, vm)
    if backend == backend_vbox:
        r("VBoxManage", "controlvm", vm, "resume")
        return
    if backend == backend_qemu:
        qemu_cmd(vm, "cont")
        return
    raise NotImplementedError()
@cmd
def poweroff_and_wait(vm: str):
    """Power the VM off and poll until it reports a stopped state.

    Returns at once when the VM is already in a non-running state.
    Otherwise poweroff() is sent and the state is polled every 0.1 s
    (each poll is printed) until it is "powered off" or "inactive"; one
    extra second is waited before returning.
    """
    vm = name_to_id(vm)
    already_down = [state_powered_off, state_not_registered, state_aborted, state_saved, 'inactive', 'failed']
    if state(vm) in already_down:
        return
    poweroff(vm)
    stopped = False
    while not stopped:
        time.sleep(0.1)
        s = state(vm)
        print("state", s)
        stopped = s in [state_powered_off, 'inactive']
    time.sleep(1)
def poweroff_and_wait_daemon(ucred, vm: str):
    """Daemon-side variant of poweroff_and_wait(): power off and wait.

    Returns at once when the VM is already in a non-running state.
    Otherwise poweroff() is sent and the state is polled every 0.1 s
    (each poll is printed) until it is "powered off" (vbox) or "inactive"
    (the systemd unit state reported on qemu); one extra second is waited
    before returning.
    """
    vm = name_to_id(vm)
    already_down = [state_powered_off, state_not_registered, state_aborted, state_saved, 'inactive', 'failed']
    if state(ucred, vm) in already_down:
        return
    poweroff(ucred, vm)
    while True:
        time.sleep(0.1)
        s = state(ucred, vm)
        print("state", s)
        # 'inactive' must end the loop as well, otherwise the qemu backend
        # (which never reports "powered off") would wait forever.
        if s in [state_powered_off, 'inactive']:
            time.sleep(1)
            return
########################
# Init vm #
########################
@internal_cmd
@daemon(root_only=True)
def create_from_img(ucred: Ucred, target: str, new_ssh: bool = True, target_name = None):
    """Initialise a VM directory that already contains a disk image "img".

    Steps:
    * write "permanency" ("init <unix time>") and a fresh "vnc_passwd";
    * for non-Android VMs, loop-mount the ext4 filesystem inside the image
      and set the hostname, install SSH keys (with ``new_ssh``) and copy
      the VNC password into the guest;
    * write the VirtualBox "disk.vmdk" descriptor pointing at the image;
    * register the VM and create its network.

    target      -- VM name or id.
    new_ssh     -- generate a new client key and host key.
    target_name -- hostname written into the guest; defaults to ``target``.

    NOTE(review): indentation of this block was lost; it is assumed that
    all key generation (client key, host key, known_hosts) is guarded by
    ``new_ssh`` - confirm against the original layout.
    """
    target_name = target_name or target
    target = name_to_id(target)
    target_dir = vm_dir(target)
    vnc_passwd = random_passwd()
    with open(target_dir+"permanency", "w") as f: f.write(f"init {int(time.time())}")
    with open(target_dir+"vnc_passwd", "w") as f: f.write(vnc_passwd)

    def read_text(path):
        # Read a whole file and close it deterministically.
        with open(path, "r") as src:
            return src.read()

    if not is_android(target):
        mount_dir = target_dir+"mount/"
        os.mkdir(mount_dir)
        r('mount', '-o', 'nosymfollow,loop,offset=210763776', '--type', 'ext4', target_dir+'img', mount_dir)
        try:
            with open(mount_dir+"/etc/hostname", "w") as f: f.write(target_name+"\n")
            if new_ssh:
                r("ssh-keygen", "-t", "ed25519", "-C", f"virtual for root,u@vm_{target}", "-f", target_dir+"id_ed25519", "-P", "")
                # Only authorized_keys files that already exist are replaced.
                for place in ["/root/.ssh/", "/home/u/.ssh/"]:
                    if os.path.isfile(mount_dir+place+"/authorized_keys"):
                        with open(mount_dir+place+"/authorized_keys", "w") as f:
                            f.write(read_text(target_dir+"id_ed25519.pub"))
                r("ssh-keygen", "-t", "ed25519", "-C", f"hostkey of root,u@vm_{target}", "-f", target_dir+"ssh_host_ed25519_key", "-P", "")
                for suffix in ["", ".pub"]:
                    with open(mount_dir+"/etc/ssh/ssh_host_ed25519_key"+suffix, "w") as f:
                        f.write(read_text(target_dir+"ssh_host_ed25519_key"+suffix))
                # known_hosts entry: alias plus key type and key from the
                # PUBLIC host key (named explicitly instead of relying on
                # the loop variable left over from the loop above).
                host_pub = read_text(target_dir+"ssh_host_ed25519_key.pub")
                with open(target_dir+"known_hosts", "w") as f:
                    f.write(f"vm_{target} "+" ".join(host_pub.split()[:2]))
            with open(mount_dir+"vnc_passwd", "w") as f: f.write(vnc_passwd)
        finally:
            r('umount', mount_dir)
            os.rmdir(mount_dir)

    with open(target_dir+"disk.vmdk", "w") as f:
        f.write(S-f"""
            # Disk DescriptorFile
            version=1
            CID=076a5ce7
            parentCID=ffffffff
            createType="fullDevice"
            # Extent description
            RW 209715200 FLAT "{target_dir}/img" 0
            # The disk Data Base
            #DDB
            ddb.virtualHWVersion = "4"
            ddb.adapterType="ide"
            ddb.geometry.cylinders="16383"
            ddb.geometry.heads="16"
            ddb.geometry.sectors="63"
            ddb.uuid.image="24823fb3-a4b8-4c04-a2f8-ef70fa38e6ab"
            ddb.uuid.parent="00000000-0000-0000-0000-000000000000"
            ddb.uuid.modification="41a03768-211c-4638-b47e-ce1ee3fe7200"
            ddb.uuid.parentmodification="00000000-0000-0000-0000-000000000000"
            ddb.geometry.biosCylinders="1024"
            ddb.geometry.biosHeads="255"
            ddb.geometry.biosSectors="63"
        """)
    register_vm(my_ucred(), target)
    create_net(my_ucred(), target)
@internal_cmd
@daemon()
def clone(ucred, target: str, base: str = "base") -> str:
    """Create VM ``target`` as a copy of ``base`` and hand it to the caller.

    The base image is copied with clone_copy(); for Android bases the
    base's OVMF.fd is copied too. Returns the id of the new VM.
    """
    base = name_to_id(base)
    assert has_read_acces(ucred, base)
    andr = is_android(base)
    base_dir = vm_dir(base)
    ovmf_source = base_dir + "OVMF.fd" if andr else None
    target = clone_copy(ucred, target, base_dir + "img", ovmf_source, andr)
    give_to_user(target, ucred.uid, ucred.gid)
    return target
def create_vm_dir(target: str) -> str:
    """Create a directory for a new VM named ``target`` and return its id.

    A random six-digit id is chosen; "<id>.vm/" is created, a relative
    symlink "<target>.vm" is pointed at it, and ``target`` is stored in
    its "name" file. An assertion fails when either path already exists.
    """
    import random
    target_id = "{:06}".format(random.randint(0, 999999))
    id_entry = target_id + ".vm"
    name_entry = target + ".vm"
    assert not os.path.exists(id_entry)
    assert not os.path.exists(name_entry)
    os.mkdir(id_entry + "/")
    r('ln', '-sr', id_entry, name_entry)
    with open(vm_dir(target)+"name", "w") as f:
        f.write(target)
    return target_id
@internal_cmd
@daemon(root_only=True)
def clone_copy(ucred, target: str, img_path: str, ovmf_path: Optional[str] = None, is_android: bool = False) -> str:
    """Create a VM named ``target`` from an image file and return its id.

    img_path   -- disk image, copied with ``cp --reflink -n`` to "img".
    ovmf_path  -- optional firmware file, copied the same way to "OVMF.fd".
    is_android -- create the "is_android" marker file.

    The new directory is then initialised by create_from_img().
    """
    assert is_valid_name(target)
    target_id = create_vm_dir(target)
    destination = vm_dir(target_id)
    r('cp', '--reflink', "-n", img_path, destination+"img")
    if ovmf_path:
        r('cp', '--reflink', "-n", ovmf_path, destination+"OVMF.fd")
    if is_android:
        r('touch', destination+"is_android")
    create_from_img(ucred, target_id, target_name=target)
    return target_id
########################
# Networking #
########################
@internal_cmd
@daemon()
def remove_net(ucred, vm: str):
    """Tear down the VM's network and delete its "network/" directory.

    Does nothing when the directory does not exist. On vbox, when the
    recorded boot id matches the current one, the DHCP server named in
    "dhcp.pid" is killed (best effort), the host address is removed and
    the host-only interface is deleted. On qemu nothing is torn down.

    NOTE(review): indentation of this block was lost; the whole vbox
    teardown is assumed to be guarded by the boot-id check - confirm.
    """
    vm = name_to_id(vm)
    assert has_write_acces(ucred, vm)
    network_dir = vm_dir(vm)+"network/"
    if not os.path.isdir(network_dir):
        return

    if backend == backend_vbox:
        if os.path.isfile(network_dir+"interface"):
            interface = open(network_dir+"interface").read().strip()
            # The interface only exists if it was created during this boot.
            if open(network_dir+"boot_id", "r").read().strip() == boot_id:
                if os.path.isfile(network_dir+"dhcp.pid"):
                    r("kill", open(network_dir+"dhcp.pid").read().strip(), check=False)
                net_id = int(open(network_dir+"net_id", "r").read())
                r("ip", "addr", "del", f"{net_prefix}.{net_id}.1/24", "dev", interface)
                r("VBoxManage", "hostonlyif", "remove", interface)
    elif backend == backend_qemu:
        ...
    else:
        raise NotImplementedError()

    r('rm', '-r', network_dir)
@internal_cmd
@daemon()
def create_net(ucred, vm: str):
    """(Re)create the host-side network of a VM.

    Any previous network is removed first. Then:
    * vbox: a host-only interface is created and configured; qemu: a
      random ``net_id`` in 0..255 selects interface name "qemu<net_id>";
    * "interface", "boot_id" and "net_id" are written into "network/";
    * IPv4 forwarding is enabled and per-interface nftables chains are
      created and filled by modify_net() with its defaults;
    * DHCP server files are written (range .150-.250, router .1);
    * vbox: the adapter is attached to the VM, the interface is brought
      up and dhcpd is started; qemu: "up.sh"/"down.sh" are written for
      QEMU's tap device and made executable.
    """
    vm = name_to_id(vm)
    assert has_write_acces(ucred, vm)
    remove_net(ucred, vm)
    network_dir = vm_dir(vm)+"network/"
    os.mkdir(network_dir)

    def put(file_name, text):
        # Write ``text`` into a file inside the network directory.
        with open(network_dir+file_name, "w") as f:
            f.write(text)

    if backend == backend_vbox:
        p = subprocess.run(["VBoxManage", "hostonlyif", "create"], capture_output=True, encoding='utf8')
        if p.returncode:
            print(p.stderr, file=sys.stderr)
            raise RuntimeError()
        # The interface name is the first single-quoted word of the output;
        # its numeric tail (from index 7) becomes the network id.
        interface = p.stdout.split("'")[1]
        net_id = int(interface[7:])
        if verbose: print("interface", interface)
        r("VBoxManage", "hostonlyif", "ipconfig", interface, f"--ip={net_prefix}.{net_id}.1", "--netmask=255.255.255.0")
    if backend == backend_qemu:
        import random
        net_id = random.randint(0, 255) # TODO allocation
        interface = f"qemu{net_id}"

    subnet = f"{net_prefix}.{net_id}"
    gateway = f"{subnet}.1"

    put("interface", interface)
    r("sysctl", "net.ipv4.ip_forward=1")
    put("boot_id", boot_id)
    put("net_id", str(net_id))

    # NOTE(review): the call with empty rules is kept from the original;
    # its purpose is not visible here.
    nft("")
    nft(S-f"""
        add chain inet filter input_from_{interface}
        add chain inet filter forward_from_{interface}
        add chain inet filter forward_to_{interface}
        insert rule inet filter input iifname {interface} jump input_from_{interface}
        insert rule inet filter forward iifname {interface} jump forward_from_{interface}
        insert rule inet filter forward oifname {interface} jump forward_to_{interface}
    """)
    modify_net(ucred, vm)

    put("dhcp.lp", "authoring-byte-order little-endian;")
    put("dhcp.pid", "")
    put("dhcp.config", S-f"""
        option domain-name-servers 8.8.8.8, 8.8.4.4;
        option subnet-mask 255.255.255.0;
        option routers {gateway};
        subnet {subnet}.0 netmask 255.255.255.0 {{
        range {subnet}.150 {subnet}.250;
        }}
    """)

    if backend == backend_vbox:
        r("VBoxManage", "modifyvm", vm, "--nic1=hostonly", f"--host-only-adapter1={interface}")
        r("ifconfig", interface, gateway, "netmask", "255.255.255.0", "up")
        r("dhcpd", "-4", "-cf", network_dir+"dhcp.config", "-pf", network_dir+"dhcp.pid", "-lf", network_dir+"dhcp.lp", interface)
    if backend == backend_qemu:
        # Paths inside up.sh are relative; "run-qemu" refers to the scripts
        # as network/up.sh and network/down.sh.
        put("up.sh", S-f"""
            #!/bin/sh
            ifconfig {interface} {gateway} netmask 255.255.255.0 up
            dhcpd -4 -cf network/dhcp.config -pf network/dhcp.pid -lf network/dhcp.lp {interface}
        """)
        put("down.sh", S-f"""
            #!/bin/sh
        """)
        r("chmod", "+x", network_dir+"up.sh", network_dir+"down.sh")
@internal_cmd
@daemon()
def modify_net(ucred, vm: str, wan: bool = False, lan: bool = False, pc: bool = False, pc_all: bool = False):
    """Rewrite the nftables rules of the VM's three per-interface chains.

    wan    -- allow forwarding from the VM to non-local addresses.
    lan    -- allow forwarding from the VM to the private ranges
              10/8, 172.16/12 and 192.168/16.
    pc     -- do not drop traffic from the VM to the host itself.
    pc_all -- accept all traffic from the VM to the host (requires ``pc``).

    Replies belonging to established/related connections are accepted
    towards the host, and towards the VM for every class that is allowed.
    The network must have been created during the current boot.
    """
    vm = name_to_id(vm)
    assert has_write_acces(ucred, vm)
    assert not (pc_all and not pc)
    network_dir = vm_dir(vm)+"network/"
    interface = open(network_dir+"interface").read().strip()
    net_id = open(network_dir+"net_id").read().strip()  # read but not used below
    assert open(network_dir+"boot_id", "r").read().strip() == boot_id

    local_ips = "{10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16}"
    chain_in = f"input_from_{interface}"
    chain_out = f"forward_from_{interface}"
    chain_back = f"forward_to_{interface}"
    replies_ok = "ct state { established, related } accept"

    todo = [f"flush chain inet filter {chain}" for chain in (chain_in, chain_out, chain_back)]

    # Traffic from the VM to the host.
    todo.append(f"add rule inet filter {chain_in} {replies_ok}")
    if not pc:
        todo.append(f"add rule inet filter {chain_in} drop")
    if pc_all:
        todo.append(f"add rule inet filter {chain_in} accept")

    # Private address ranges.
    if lan:
        todo += [
            f"add rule inet filter {chain_out} ip daddr {local_ips} accept",
            f"add rule inet filter {chain_back} ip saddr {local_ips} {replies_ok}",
            f"add rule inet filter {chain_back} ip saddr {local_ips} drop",
        ]
    else:
        todo += [
            f"add rule inet filter {chain_out} ip daddr {local_ips} drop",
            f"add rule inet filter {chain_back} ip saddr {local_ips} drop",
        ]

    # Everything else.
    if wan:
        todo += [
            f"add rule inet filter {chain_out} accept",
            f"add rule inet filter {chain_back} {replies_ok}",
        ]
    else:
        todo += [
            f"add rule inet filter {chain_out} drop",
            f"add rule inet filter {chain_back} drop",
        ]
    nft("\n".join(todo))
########################
# Using vm #
########################
def ssh_args(ident: Identification, *arg_in: tuple[str, ...]):
    """Build the argv list of an ``ssh`` call into the VM.

    Leading items of ``arg_in`` that start with "-" are passed to ssh as
    options (a literal "--" ends them). The remaining items form the
    remote command, prefixed with ``export`` statements for DISPLAY,
    WAYLAND_DISPLAY and SWAYSOCK when set on ``ident``. Without a remote
    command but with such variables, an interactive ``bash`` is run with
    a forced tty.

    ``ident`` is modified: a missing user is replaced by the default.
    """
    ident.set_default_user()
    vm, user = ident
    vm = name_to_id(vm)

    exports = [
        f"export {variable}={value};"
        for variable, value in (
            ("DISPLAY", ident.display),
            ("WAYLAND_DISPLAY", ident.wayland_display),
            ("SWAYSOCK", ident.sway_socket),
        )
        if value
    ]

    arg = ['ssh', "-i", vm_dir(vm)+"id_ed25519",
           "-o", f"UserKnownHostsFile={vm_dir(vm)}/known_hosts", "-o", "HostKeyAlgorithms=ssh-ed25519", "-o", f"HostKeyAlias=vm_{vm}",
           f"{user}@{get_ip(vm)}"]

    remaining = list(arg_in)
    while remaining and remaining[0].startswith("-"):
        option = remaining.pop(0)
        if option == '--':
            break
        arg.append(option)

    if remaining:
        arg += ["--"] + exports + remaining
    elif exports:
        arg += ["-t", "--"] + exports + ["bash"]
    if verbose: print(">>", " ".join(arg))
    return arg
@cmd
def ssh(ident: Identification, *arg: tuple[str, ...]):
    """Run ``ssh`` into the VM with the argv built by ssh_args()."""
    command = ssh_args(ident, *arg)
    subprocess.run(command)
@cmd
def terminal_ssh(ident: Identification, *arg: tuple[str, ...], terminal: str = "alacritty"):
    """Open ``terminal`` (invoked with "-e") running ssh into the VM."""
    command = [terminal, "-e"]
    command += ssh_args(ident, *arg)
    subprocess.run(command)
def sshfs_root():
    """Return the directory holding sshfs mount points: "$HOME/m/vm/"."""
    return os.environ["HOME"] + "/m/vm/"
@internal_cmd
def sshfs_mountdir(ident: Identification):
    """Return the mount point for ``ident``: "<sshfs_root>/<user>@<vm name>".

    A missing user is shown as "u".
    """
    shown_user = ident.user or 'u'
    return sshfs_root() + f"/{shown_user}@{name(ident.vm)}"
@cmd
def sshfs(ident: Identification):
    """Mount the VM's root filesystem over sshfs.

    Without a user, both "root" and "u" are mounted. A mount point that
    already contains entries is left alone. Next to the mount point a
    relative symlink "<mount point>~" to the user's home directory inside
    the mount is created when missing.
    """
    vm, user = ident
    if user is None:
        sshfs(Identification(vm, "root"))
        sshfs(Identification(vm, "u"))
        return
    mount_dir = sshfs_mountdir(ident)
    if os.path.isdir(mount_dir) and len(os.listdir(mount_dir)) != 0:
        return
    # Resolve names to ids, as ssh_args() does: the known_hosts entry is
    # stored under "vm_<id>", so HostKeyAlias has to use the id.
    vm = name_to_id(vm)
    r("mkdir", "-p", mount_dir)
    r("sshfs", "-o", "transform_symlinks", f"{user}@{get_ip(vm)}:/", mount_dir, "-o", f"ssh_command=ssh -i {vm_dir(vm)}/id_ed25519 -o UserKnownHostsFile={vm_dir(vm)}/known_hosts -o HostKeyAlgorithms=ssh-ed25519 -o HostKeyAlias=vm_{vm}")
    if not os.path.islink(mount_dir+'~'):
        home_dir = "/root" if user == "root" else f"/home/{user}"
        r("ln", "-sr", mount_dir+home_dir, mount_dir+"~")
@cmd
def sshfs_clean():
    """Tidy the sshfs root: remove empty directories, then dangling symlinks."""
    root = sshfs_root()

    # Pass 1: empty directories (unused mount points).
    for entry in os.listdir(root):
        path = root + entry
        if os.path.isdir(path) and len(os.listdir(path)) == 0:
            r("rmdir", path)

    # Pass 2: symlinks whose target no longer exists.
    for entry in os.listdir(root):
        path = root + entry
        if os.path.islink(path) and not pathlib.Path(path+"/").absolute().exists():
            r("rm", path)
def get_vnc_client_env(ident):
    """Return a copy of os.environ extended for a VNC client process.

    Adds VNC_PASSWORD (from the VM's "vnc_passwd" file), VNC_USERNAME
    (``ident.user``) and VM_IDENT (``str(ident)``).

    The environment is no longer printed: it contains the VNC password.
    """
    vnc_client_env = os.environ.copy()
    with open(vm_dir(ident.vm)+"vnc_passwd", "r") as f:
        vnc_client_env["VNC_PASSWORD"] = f.read().strip()
    vnc_client_env["VNC_USERNAME"] = ident.user
    vnc_client_env["VM_IDENT"] = str(ident)
    return vnc_client_env
# Options appended to every vncviewer command line in this file.
vncviewer_args = ["-FullscreenSystemKeys=0", "-AcceptClipboard=0", "-SendClipboard=0"]
def start_vnc_server(ident: Identification, unit_name: str, wayland: bool = False) -> tuple[Any, Identification]:
    """Start a VNC server inside the VM as a transient systemd user unit.

    wayland=True runs a headless sway after writing the wayvnc config. The
    first two lines printed by the remote command are read as the Wayland
    display name and the sway socket path, and the VNC port is 5800 plus
    the display number.

    wayland=False sets the VNC password and runs ``vncserver`` on the
    display from ``ident.display`` (with or without a leading ":"), or on
    a random display 10..50; the VNC port is 5900 plus the display number.

    Returns ``(process, ident)``; ``ident`` is updated in place with the
    display and port information.
    """
    import random
    ident.set_default_user()
    if wayland:
        # echo enable_auth=true; echo \"password=$(cat /vnc_passwd)\"; echo username=u
        vnc_server = subprocess.Popen(ssh_args(ident, f"systemd-run --unit {unit_name} --user -P bash -c '(echo enable_auth=true; echo \"password=$(cat /vnc_passwd)\"; echo username=u) > ~/.config/wayvnc/config ; WLR_BACKENDS=headless WLR_LIBINPUT_NO_DEVICES=1 sway -c ~/.config/sway/vnc.config'"), stdout=subprocess.PIPE)
        ident.wayland_display = vnc_server.stdout.readline().decode("utf-8").strip()
        ident.sway_socket = vnc_server.stdout.readline().decode("utf-8").strip()
        # The number follows the 8-character prefix "wayland-".
        display_id = int(ident.wayland_display[8:])
        ident.vnc_port = 5800+display_id
        print(f"WAYLAND_DISPLAY={ident.wayland_display}")
        print(f"SWAYSOCK={ident.sway_socket}")
    else:
        if ident.display:
            # The display is stored as ":<n>" (see below); drop the colon
            # before converting. The old slice [:1] kept only the colon.
            display_id = int(ident.display.removeprefix(":"))
        else:
            display_id = random.randint(10, 50)
        vnc_server = subprocess.Popen(ssh_args(ident, f"systemd-run --unit {unit_name} --user -P bash -c '(cat /vnc_passwd;echo; cat /vnc_passwd; echo;echo n) | vncpasswd; vncserver :{display_id}'"))
        ident.display = f":{display_id}"
        ident.vnc_port = 5900 + display_id
    return vnc_server, ident
def start_vnc_client(ident: Identification, variant="vncviewer"):
    """Launch ``vncviewer`` towards "<vm ip>::<vnc_port>" and return the Popen.

    The client gets the environment from get_vnc_client_env(). ``variant``
    is currently not used.
    """
    #vnc_client = subprocess.Popen(["remmina", "vnc://"+get_ip(vm)+f"::{port}"], env=get_vnc_client_env(vm))
    endpoint = get_ip(ident.vm) + f"::{ident.vnc_port}"
    command = ["vncviewer", endpoint, *vncviewer_args]
    return subprocess.Popen(command, env=get_vnc_client_env(ident))
@cmd
def vncapp(ident: Identification, cmd: str, wayland: bool = False):
    """Run one application inside the VM and show it in a local VNC viewer.

    Starts a VNC server, the application (as a transient systemd user
    unit running ``bash -c <cmd>``) and a local viewer. As soon as any of
    the three processes ends, signal 15 is sent to all of them; finally
    both remote units are stopped.
    """
    import random
    import psutil
    unit_id = random.randint(100000, 999999)
    server_unit = f"vncapp-vnc-{unit_id}"
    vnc_server, ident = start_vnc_server(ident, server_unit, wayland=wayland)
    time.sleep(1)
    app = subprocess.Popen(ssh_args(ident, f"systemd-run --unit vncapp-app-{unit_id} --user -P -E DISPLAY={ident.display} -E WAYLAND_DISPLAY={ident.wayland_display} bash -c {escape_sh(cmd)}"))
    vnc_client = start_vnc_client(ident)

    def on_terminate(proc):
        if verbose: print(f"KILLING ALL APPS because {proc} terminated")
        for process in (vnc_server, vnc_client, app):
            process.send_signal(15)

    psutil.wait_procs([vnc_client, app, vnc_server], callback=on_terminate)
    ssh(ident, f"systemctl --user stop vncapp-vnc-{unit_id} vncapp-app-{unit_id}")
# WAYLAND_DISPLAY=wayland-1 SWAYSOCK=/run/user/1000/sway-ipc.1000.6544.sock swaymsg output HEADLESS-1 pos 0 0 res 1920x1080
@cmd
def waydroid(ident: Identification, *apps: tuple[str, ...]):
    """Run ``bin/waydroid-run`` in the VM through vncapp() on Wayland.

    Only the first item of ``apps`` (if any) is passed to the script.
    """
    command = f"bin/waydroid-run {apps[0]}" if len(apps) else "bin/waydroid-run"
    return vncapp(ident, command, wayland=True)
@cmd
def vncsession(ident: Identification, wayland: bool = False):
    """Start a VNC server in the VM and attach a local viewer to it.

    As soon as either process ends, signal 15 is sent to both; finally
    the remote unit is stopped.
    """
    import random
    import psutil
    unit_id = random.randint(100000, 999999)
    unit_name = f"vncsession-{unit_id}"
    vnc_server, ident = start_vnc_server(ident, unit_name, wayland=wayland)
    time.sleep(1)
    vnc_client = start_vnc_client(ident)

    def on_terminate(proc):
        if verbose: print("KILLING ALL APPS")
        for process in (vnc_server, vnc_client):
            process.send_signal(15)

    psutil.wait_procs([vnc_client, vnc_server], callback=on_terminate)
    ssh(ident, f"systemctl --user stop vncsession-{unit_id}")
@cmd
def resize_wayland(ident: Identification, x: int = None, y: int = None):
    """Set the resolution of sway output HEADLESS-1 inside the VM.

    A missing ``x`` / ``y`` is taken from the active local window
    (queried with xdotool): its WIDTH / HEIGHT minus 2. After resizing,
    "reload" is written to the stdin of every ``waydroid-run`` process.
    """
    if x is None or y is None:
        geometry_text = subprocess.check_output(["xdotool", "getactivewindow", "getwindowgeometry", "--shell"]).decode("utf-8")
        # xdotool --shell prints KEY=VALUE lines.
        win_place = dict(line.split('=') for line in geometry_text.split('\n') if line)
        if x is None:
            x = int(win_place["WIDTH"]) - 2
        if y is None:
            y = int(win_place["HEIGHT"]) - 2
    ssh(ident, f"swaymsg output HEADLESS-1 pos 0 0 res {x}x{y}; pgrep waydroid-run | while read p; do echo reload > /proc/$p/fd/0; done")
@daemon()
def chown_qemu_vnc_sock(ucred, vm):
    """Change the owner of the VM's "qemu-vnc" socket to the calling user."""
    vm = name_to_id(vm)
    assert has_write_acces(ucred, vm)
    new_owner = f"{ucred.uid}"
    r("chown", new_owner, vm_dir(vm)+"qemu-vnc")
@cmd
def vnc(ident: Identification):
    """Open ``vncviewer`` on the VM's "qemu-vnc" socket.

    The socket is first handed to the calling user through
    chown_qemu_vnc_sock(). Only the VM part of ``ident`` is used.
    """
    vm, _unused_user = ident
    chown_qemu_vnc_sock(vm)
    socket_file = f"{vm_dir(vm)}/qemu-vnc"
    r("vncviewer", socket_file, *vncviewer_args)
def str_remove_prefix(s, prefix):
    """Return ``s`` without its leading ``prefix``; assert that it is there."""
    assert s.startswith(prefix)
    cut = len(prefix)
    return s[cut:]
@internal_cmd
def get_ident_by_window(win_id: int = None):
    """Find the VM identification belonging to an X11 window.

    win_id -- window id; defaults to the active window (via xdotool).

    The window's process id is read with xprop and the identification is
    parsed from the VM_IDENT variable in that process's environment.
    Alacritty windows are not handled yet and give None.
    """
    import psutil
    if win_id is None:
        win_id = int(subprocess.check_output(["xdotool", "getactivewindow"]).decode("utf-8"))

    def xprop(prop, expected_prefix):
        # Query one window property and strip the fixed header xprop prints.
        raw = subprocess.check_output(["xprop", "-id", str(win_id), prop]).decode("utf-8")
        return str_remove_prefix(raw, expected_prefix)

    win_class = xprop("WM_CLASS", "WM_CLASS(STRING) = ")
    pid = int(xprop("_NET_WM_PID", "_NET_WM_PID(CARDINAL) = "))
    if '"Alacritty"' in win_class:
        return None
    process = psutil.Process(pid=pid)
    process_env = process.environ()
    return extended_name(process_env["VM_IDENT"])
########################
# High-level api #
########################
def terminal_len(val: str) -> int:
    """Return the display width of ``val``; for now simply its length."""
    # TODO
    width = len(val)
    return width
def format_table(table):
    """Print ``table`` (rows of strings) with space-padded columns.

    Each column is as wide as its widest cell; cells are padded on the
    right and separated by one space.
    """
    widths = []
    for row in table:
        for column, cell in enumerate(row):
            if column >= len(widths):
                widths.append(0)
            widths[column] = max(widths[column], terminal_len(cell))
    for row in table:
        padded = [cell + " " * (width - terminal_len(cell)) for cell, width in zip(row, widths)]
        print(" ".join(padded))
@cmd
def index(color: bool = True):
    """Print a table of all VMs: id, access, name, state and permanency.

    For VMs in state "running" or "active" the state also says whether
    the VM answers a ping. ``color`` is currently not used.
    """
    rows = []
    for vm in all_virtuals():
        out_state = state(vm)
        if out_state in [state_running, "active"]:
            out_state += " (pinging)" if try_ping(vm) else " (NO PING)"
        if has_read_acces(vm):
            out_rw = 'w' if has_write_acces(vm) else 'r'
        else:
            out_rw = '-'
        rows.append([vm, out_rw, name(vm), out_state, get_permanency(vm)])
    return format_table(rows)
@cmd
@daemon()
def clean(ucred):
    """Remove VMs that are no longer needed.

    Among the VMs the caller may write to, this removes:
    * VMs with permanency "tmp" or "prepared" that are not running;
    * VMs still marked "init <timestamp>" more than 100 seconds after
      that timestamp.
    """
    stopped_states = [state_powered_off, state_not_registered, state_aborted, state_saved, "inactive"]
    for vm in all_virtuals():
        if not has_write_acces(ucred, vm):
            continue
        permanency = get_permanency(vm)
        if permanency in ["tmp", "prepared"] and state(ucred, vm) in stopped_states:
            remove_force(ucred, vm)
        if permanency.startswith("init "):
            init_time = int(permanency[5:])
            if time.time() - init_time > 100:
                remove_force(ucred, vm)
@internal_cmd
@daemon()
def try_ping(ucred, vm: str) -> bool:
    """Send one ping (0.1 s timeout) to the VM; True when it is answered."""
    vm = name_to_id(vm)
    assert has_write_acces(ucred, vm)
    from pythonping import ping
    address = get_ip(vm)
    reply = ping(address, verbose=True, count=1, timeout=0.1)
    return reply.packets_lost == 0
@internal_cmd
def wait_started(vm: str):
    """Block until the VM answers a ping, retrying every 0.5 s."""
    while not try_ping(vm):
        time.sleep(0.5)
@internal_cmd
def start_and_wait(vm: str, display: bool = None):
    """Start the VM and block until it answers a ping."""
    vm_id = name_to_id(vm)
    start(vm_id, display=display)
    wait_started(vm_id)
@internal_cmd
@daemon()
def get_tmp_name(ucred) -> str:
    """Return an unused VM name of the form "tmp<N>".

    The counter is kept in the file "last_tmp_id" (relative to the
    current directory) and is advanced until "tmp<N>.vm" does not exist.
    """
    counter_file = "last_tmp_id"
    last_id = (int(open(counter_file).read().strip()) + 1) % 100
    # NOTE(review): the first step wraps at 100 but the probing below wraps
    # at 1000 - confirm which modulus is intended.
    while os.path.exists(f"tmp{last_id}.vm"):
        last_id = (last_id+1) % 1000
    with open(counter_file, "w") as f:
        f.write(str(last_id))
    return f"tmp{last_id}"
if is_daemon:
    class PreparedFork:
        """A pre-started clone waiting to be handed out by get_prepared_fork()."""

        def __init__(self, vm):
            self.vm = vm  # id of the prepared VM
            self.start_monotonic_time = time.monotonic()  # creation time
            self.is_paused = False  # set by pause_prepared_forks()

# Base VM id -> list of PreparedFork, oldest first.
# NOTE(review): indentation of this block was lost; the dict is defined at
# module level here so that the functions using it also work when they run
# outside the daemon - confirm against the original layout.
prepared_forks = {}
@internal_cmd
@daemon()
def get_prepared_fork(ucred, base: str = "base") -> Optional[str]:
    """Hand the oldest prepared fork of ``base`` over to the caller.

    The fork is resumed if it was paused, given to the caller's uid/gid
    and marked with permanency "tmp".  Returns the fork's VM id, or None
    when no prepared fork of ``base`` is available.  The caller must have
    read access to ``base``.
    """
    base = name_to_id(base)
    assert has_read_acces(ucred, base)

    waiting = prepared_forks.get(base)
    if not waiting:
        return None

    # Take the first entry and store the remainder as a new list.
    fork, prepared_forks[base] = waiting[0], waiting[1:]
    if fork.is_paused:
        resume(ucred, fork.vm)
    give_to_user(fork.vm, ucred.uid, ucred.gid)
    set_permanency(ucred, fork.vm, "tmp")
    return fork.vm
@cmd
@daemon(root_only=True)
def prepare_forks(ucred, base: str = "base", count: int = 1) -> Optional[str]:
    """Create one more prepared fork of ``base`` if fewer than ``count`` exist.

    At most one fork is created per call: it is cloned under a fresh
    temporary name, started, recorded in ``prepared_forks`` and marked with
    permanency "prepared".  Returns the new VM id, or None when enough
    forks are already prepared.
    """
    base = name_to_id(base)
    pool = prepared_forks.setdefault(base, [])
    if len(pool) >= count:
        return None

    target = clone(ucred, get_tmp_name(ucred), base=base)
    start(ucred, target)
    pool.append(PreparedFork(target))
    set_permanency(ucred, target, "prepared")
    return target
@cmd
@daemon(root_only=True)
def pause_prepared_forks(ucred, base: str = "base", runtime: int = 30):
    """Pause prepared forks of ``base`` that have existed for ``runtime`` seconds.

    Forks that are already paused are skipped.  A base with no entry in
    ``prepared_forks`` is treated as having no forks rather than raising
    KeyError.
    """
    base = name_to_id(base)
    now = time.monotonic()
    for pf in prepared_forks.get(base, []):
        if not pf.is_paused and pf.start_monotonic_time + runtime <= now:
            pause(ucred, pf.vm)
            pf.is_paused = True
@internal_cmd
def get_tmp_vm(base: str = "base"):
    """Return a temporary VM derived from ``base``.

    A prepared fork is used when get_prepared_fork() returns one;
    otherwise ``base`` is cloned under a fresh temporary name and started.
    """
    prepared = get_prepared_fork(base)
    if prepared:
        return prepared

    fresh = clone(get_tmp_name(), base)
    start(fresh)
    return fresh
def multisplit_toplevel(s, *separators, brackets={'[':']'}):
    """Split ``s`` at separators that are not enclosed in brackets.

    Args:
        s: the string to split.
        *separators: separator strings; the first one matching at a
            position wins.  Empty separators are ignored.
        brackets: mapping of opening to closing bracket characters
            (read only, never modified).

    Returns:
        A list of ``(separator, text)`` pairs.  The first pair has
        separator None; each following pair holds the separator that
        precedes its text.

    Raises:
        AssertionError: if the brackets in ``s`` are not balanced.
    """
    closers = set(brackets.values())
    out = [(None, None)]
    stack = []
    i = 0
    begin = 0
    while i < len(s):
        ch = s[i]
        if ch in brackets:
            stack.append(ch)
        if ch in closers:
            # A closer must match the innermost open bracket.
            assert stack and ch == brackets[stack[-1]]
            stack.pop()
        elif not stack:
            for separator in separators:
                if separator and s.startswith(separator, i):
                    out[-1] = (out[-1][0], s[begin:i])
                    out.append((separator, None))
                    # Continue after the whole separator, not just one character.
                    begin = i + len(separator)
                    i = begin - 1
                    break
        i += 1
    out[-1] = (out[-1][0], s[begin:i])
    assert not stack
    return out
def split_toplevel(s, separator):
    """Split ``s`` at every ``separator`` outside brackets; return the texts only."""
    pieces = multisplit_toplevel(s, separator)
    return [text for _sep, text in pieces]
def valid_bracket(s, brackets={'[':']'}):
    """Return True if the brackets in ``s`` are balanced and properly nested.

    ``brackets`` maps opening to closing bracket characters (read only).
    A closing bracket without a matching opener yields False.
    """
    closers = set(brackets.values())
    stack = []
    for c in s:
        if c in brackets:
            stack.append(c)
        if c in closers:
            # An empty stack means a closer with nothing open, e.g. "a]+[b".
            if not stack or c != brackets[stack[-1]]:
                return False
            stack.pop()
    return not stack
def is_in_bracket(s, left='[', right=']'):
    """Return True if all of ``s`` is enclosed in one ``left``...``right`` pair.

    The text between the outer pair must itself be balanced, so "[a]+[b]"
    does not count.  Strings shorter than two characters are never
    enclosed.
    """
    if len(s) < 2:
        return False
    return s[0] == left and s[-1] == right and valid_bracket(s[1:-1], {left: right})
@cmd
def extended_name(name: str) -> tuple[str, str]:
    """Resolve an extended VM name, apply the actions it encodes, and return its identification.

    Syntax handled here (separators inside ``[...]`` are ignored):
      ``user@...``        user to act as
      ``...!`` / ``...$`` power the VM on (resume it if paused); ``$`` also
                          requests a display
      ``...~opts``        network options: ``w`` wan, ``l`` lan, ``p`` pc,
                          ``P`` pc and pc_all
      ``...^perm``        set permanency (empty means "stable")
      ``...:N``           VNC port; ``:[x]`` display; ``:wayland[x]``
                          wayland display; ``:sway[x]`` sway socket
      ``[N]`` / ``[]``    look the VM up by window id (None when empty)
      ``base+name``       clone ``base`` as ``name`` and start it;
                          ``base+`` takes a temporary VM; an empty base
                          means "base"

    Must not be called inside the daemon.  Returns the Identification
    object with ``vm`` and ``user`` filled in.
    """
    assert not is_daemon
    vm = name
    user = None
    at_parts = split_toplevel(vm, "@")
    if len(at_parts) == 2:
        # Use the bracket-aware split result so it agrees with the test above.
        user, vm = at_parts

    do_power_on = False
    do_power_on_display = False
    net_options = None
    permanency = None

    power_parts = multisplit_toplevel(vm, "!", "$")
    if len(power_parts) == 2:
        (_, vm), (mark, tmp) = power_parts
        # The power-on mark must be the last character.
        assert tmp == ""
        do_power_on = True
        do_power_on_display = mark == '$'

    (_, vm), *modifiers = multisplit_toplevel(vm, "~", "^", ":")

    if is_in_bracket(vm, '[', ']'):
        ident = get_ident_by_window(int(vm[1:-1]) if len(vm) >= 3 else None)
        vm = ident.vm
        if user is None:
            user = ident.user
    elif len(split_toplevel(vm, "+")) == 2:
        base, vm = split_toplevel(vm, "+")
        base_ident = extended_name(base or "base")
        if not vm:
            vm = get_tmp_vm(base_ident.vm)
        else:
            vm = clone(vm, base_ident.vm)
            start(vm)
        # Either kind of fork is awaited before it is used.
        wait_started(vm)
        ident = Identification(vm)
    else:
        vm = name_to_id(vm)
        ident = Identification(vm)

    for key, val in modifiers:
        if key == '~':
            assert net_options is None
            net_options = val
        elif key == '^':
            assert permanency is None
            permanency = val
        elif key == ':':
            if val.isnumeric():
                ident.vnc_port = int(val)
            elif val.startswith("[") and val.endswith("]"):
                ident.display = val[len("["):-len("]")]
            elif val.startswith("wayland[") and val.endswith("]"):
                ident.wayland_display = val[len("wayland["):-len("]")]
            elif val.startswith("sway[") and val.endswith("]"):
                ident.sway_socket = val[len("sway["):-len("]")]
            else:
                raise AssertionError(f"Unknown display specification: {val!r}")

    if do_power_on:
        current_state = state(vm)
        if current_state != state_running:
            if current_state != state_paused:
                start_and_wait(vm, display=do_power_on_display)
            else:
                resume(vm)

    if net_options is not None:
        modify_net(vm, wan="w" in net_options, lan="l" in net_options, pc="p" in net_options or "P" in net_options, pc_all="P" in net_options)
    if permanency is not None:
        set_permanency(vm, permanency or "stable")

    ident.vm = vm
    ident.user = user
    return ident
@cmd
def eval(ident: Identification):
    """Return the string form of an already resolved identification."""
    text = str(ident)
    return text
@daemon()
def remove_force(ucred, vm: str, keep_image: bool = False):
    """Delete the VM ``vm`` without powering it off first.

    The VM is unregistered if it is registered, its network is removed and
    its ``<id>.vm`` directory is deleted.  With ``keep_image`` everything in
    that directory except ``img`` is deleted and the directory itself
    stays.  The caller must have write access.

    Raises:
        RuntimeError: if the VM directory contains a ``no_remove`` file.
    """
    vm = name_to_id(vm)
    if os.path.isfile(f"{vm}.vm/no_remove"):
        raise RuntimeError("Delete foribidden")
    assert has_write_acces(ucred, vm)

    registered = state(ucred, vm) != state_not_registered
    if registered:
        unregister_vm(ucred, vm)
    remove_net(ucred, vm)

    if not keep_image:
        # Remove the path derived from the VM's name, then the directory.
        r("rm", name(vm)+".vm")
        r("rm", "-r", vm+".vm")
        return

    for entry in os.listdir(vm+".vm"):
        if entry == 'img':
            continue
        r("rm", "-r", vm+".vm/"+entry)
@cmd
def remove(vm: str, keep_image: bool = False):
    """Power ``vm`` off, wait for it to stop, then delete it via remove_force()."""
    vm_id = name_to_id(vm)
    poweroff_and_wait(vm_id)
    remove_force(vm_id, keep_image=keep_image)
@cmd
@daemon(root_only=True)
def exit_server(ucred):
    """Shut down every VM, remove its network, unregister it, and stop the daemon.

    Ends by raising SystemExit with status 0.
    """
    for vm in all_virtuals():
        poweroff_and_wait_daemon(ucred, vm)
        remove_net(ucred, vm)
        unregister_vm(ucred, vm)
    # sys.exit() rather than the site-provided exit(), which is meant for
    # interactive use and is absent when the site module is not loaded.
    sys.exit(0)
@cmd
def run(vm: str, prog: str, *arg: tuple[str, ...], gui: bool = False, out_file: list[str] = []):
    """Run ``prog`` with arguments inside ``vm``, copying files in and out.

    A fresh ``run-*`` working directory is created under the VM's sshfs
    mount (``sshfs_mountdir(...) + '~'``) and the command is executed from
    it, via vncapp() when ``gui`` is set and via ``ssh -t`` otherwise.

    Argument rewriting (applies to ``prog`` and every item of ``arg``):
      ``@path``   the host file ``path`` is copied into the working
                  directory and the argument becomes its base name;
                  repeating the same ``@path`` reuses the same copy
      ``@@text``  the two leading characters are dropped
                  NOTE(review): this yields ``text``, not ``@text`` -
                  confirm that is the intended escape.

    Each ``out_file`` item is ``vm_file`` or ``vm_file:host_file``; after
    the command ends the file is copied from the working directory to the
    host (only the first ``:`` separates the two paths).  ``out_file`` is
    only read, never modified.
    """
    import shutil
    import tempfile

    arg = [prog, *arg]
    vm, user = extended_name(vm)
    sshfs(vm, user=user)
    mountdir = sshfs_mountdir(vm, user=user)
    tmp_dir = tempfile.mkdtemp(prefix="run-", dir=mountdir+'~')

    orig_file_to_copy = {}
    used_files = set()
    for i, it in enumerate(arg):
        if not it.startswith('@'):
            continue
        if it.startswith('@@'):
            arg[i] = it[2:]
            continue
        orig_file = it[1:]
        if orig_file not in orig_file_to_copy:
            vm_file = orig_file.split("/")[-1]
            # Two different host files must not collide in the working directory.
            assert vm_file not in used_files
            used_files.add(vm_file)
            orig_file_to_copy[orig_file] = vm_file
            shutil.copy(orig_file, tmp_dir+"/"+vm_file)
        # Look the name up so a repeated @path maps to its own copy.
        arg[i] = orig_file_to_copy[orig_file]

    tmp_dir_name = tmp_dir.split('/')[-1]
    command = f"cd {tmp_dir_name}; {escape_sh(*arg)}"
    if gui:
        vncapp(vm, command, user=user)
    else:
        ssh(vm, "-t", command, user=user)

    for it in out_file:
        vm_file, separator, host_file = it.partition(':')
        if not separator:
            host_file = vm_file
        shutil.copy(tmp_dir+"/"+vm_file, host_file)
@internal_cmd
def qemu_monitor(vm: str):
    """Attach the terminal to the ``qemu-monitor`` socket of ``vm`` using socat."""
    vm, _ = extended_name(vm)
    monitor_address = f"unix-connect:{vm_dir(vm)}qemu-monitor"
    r("socat", "-,echo=0,icanon=0", monitor_address)
@internal_cmd
def qemu_cmd(vm: str, cmd: str):
    """Send one command line to the ``qemu-monitor`` socket of ``vm`` and return the reply.

    The socket path and the raw reply are printed.  After sending, the
    function waits one second and reads up to 1024 bytes once.  The
    returned text is the reply without its first two lines and its last
    line (presumably banner/echo and prompt - verify against the monitor).
    """
    import socket

    monitor_path = f"{vm_dir(vm)}qemu-monitor"
    print(monitor_path)
    # The context manager closes the socket even if connect/send/recv fails.
    with socket.socket(socket.AF_UNIX) as client:
        client.connect(monitor_path)
        # sendall() keeps writing until the whole command has been sent.
        client.sendall((cmd+'\n').encode('utf-8'))
        time.sleep(1)
        reply = client.recv(1024).decode('utf-8')
    print(reply)
    return "\n".join(reply.split("\n")[2:-1])
##########################################################
def run_args(args):
    """Call the subcommand selected by the parsed arguments and print its result.

    Prints help and returns when no subcommand (or no internal subcommand)
    was given.  Otherwise the function's signature, obtained through
    get_spec(), decides how parsed values are passed: parameters without
    a default positionally, parameters with a default as keywords (left
    out when the parsed value is None), and ``*args`` annotated
    ``tuple[str, ...]`` appended positionally.  Values annotated
    ``Identification`` are resolved with extended_name() first.  A tuple
    or list result is printed one item per line; any other non-None
    result is printed as is.
    """
    import inspect
    if verbose:
        print(args)
    if not args.subcommand:
        parser.print_help()
        return

    if args.subcommand != "internal":
        f = subcommands[args.subcommand]
    elif args.subcommand_internal:
        f = subcommands_internal[args.subcommand_internal]
    else:
        parser_internal.print_help()
        return

    spec = get_spec(f)
    parsed = vars(args)
    f_kvarg = {}
    f_arg = []

    def process_arg(name, has_default, default):
        # Route one parsed value to the positional or the keyword arguments.
        val = parsed[name]
        if has_default and val is None:
            return
        if spec.annotations.get(name, None) in (Identification,):
            val = extended_name(val)
        if has_default:
            f_kvarg[name] = val
        else:
            f_arg.append(val)

    # spec.defaults lines up with the tail of spec.args.
    defaults = spec.defaults
    first_optional = len(spec.args) if defaults is None else len(spec.args) - len(defaults)
    for position, arg_name in enumerate(spec.args):
        if position >= first_optional:
            process_arg(arg_name, True, defaults[position - first_optional])
        else:
            process_arg(arg_name, False, None)

    for arg_name in spec.kwonlyargs:
        process_arg(arg_name, True, spec.kwonlydefaults[arg_name])

    if spec.varargs is not None:
        if spec.annotations.get(spec.varargs, None) == tuple[str, ...]:
            f_arg += parsed[spec.varargs]

    if verbose:
        print(f_arg, f_kvarg)
    result = f(*f_arg, **f_kvarg)
    if result is None:
        return
    if isinstance(result, (tuple, list)):
        for item in result:
            print(item)
    else:
        print(result)
@cmd
def run_periodically(delay: int, *argv: tuple[str, ...]):
    """Run the command line ``argv`` every ``delay`` seconds, forever.

    The command line is parsed once.  Each round sleeps first and then
    runs the command; an exception from the command is printed with its
    traceback and the loop goes on.
    """
    print("RUN PERIODICALLY", argv)
    parsed = parser.parse_args(argv)
    while True:
        time.sleep(delay)
        try:
            run_args(parsed)
        except Exception as err:
            traceback.print_exception(err)
def main_daemon():
    """Serve daemon requests on the Unix socket ``socket_path`` until stopped.

    Start-up: a stale socket file is removed, the socket is bound, made
    world accessible and put into listening state, and every command-line
    argument after ``server`` is launched as a helper process running this
    script with that argument split on spaces.

    Each connection carries one JSON request with the keys ``backend``,
    ``fname``, ``arg`` and ``kvarg``.  The peer's credentials are read
    with SO_PEERCRED and passed to the requested function as ``ucred``.
    The reply is ``{"return": value}``, or ``{"exception": "<type>"}`` when
    anything goes wrong while handling the request - including a malformed
    request, a backend mismatch, an unknown function name or a result that
    cannot be serialised - so one bad request cannot stop the daemon.
    SystemExit (raised by exit_server) is not caught and ends the loop.
    The socket file is removed on the way out.
    """
    import socket
    import struct

    # Remove a socket file left by a previous run; a missing file is fine.
    try:
        os.unlink(socket_path)
    except OSError:
        if os.path.exists(socket_path):
            raise

    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(socket_path)
    os.chmod(socket_path, 0o777)
    # Listen before spawning helpers so that they can connect right away.
    server.listen(1)

    # Keep the Popen objects referenced for the lifetime of the daemon.
    helpers = [subprocess.Popen([sys.argv[0], *arg.split(" ")]) for arg in sys.argv[2:]]

    ucred_format = struct.Struct("=iII")
    try:
        while True:
            print('Server is listening for incoming connections...')
            connection, client_address = server.accept()
            try:
                print('Connection from', str(connection), connection, client_address)
                try:
                    pid, uid, gid = ucred_format.unpack(connection.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED, ucred_format.size))
                    ucred = Ucred(pid, uid, gid)
                    in_struct = json.loads(recvall(connection))
                    print("IN", in_struct)
                    if in_struct["backend"] != backend:
                        raise AssertionError("client and daemon use different backends")
                    f = daemon_funcs[in_struct["fname"]]
                    res = f(ucred, *in_struct["arg"], **in_struct["kvarg"])
                    out_struct = {'return': res}
                    out_data = json.dumps(out_struct).encode('utf-8')
                except Exception as e:
                    traceback.print_exception(e)
                    out_struct = {'exception': str(type(e))}
                    out_data = json.dumps(out_struct).encode('utf-8')
                print("OUT", out_struct)
                sys.stdout.flush()
                sys.stderr.flush()
                try:
                    connection.sendall(out_data)
                except Exception as e:
                    # The client may have gone away; keep serving others.
                    traceback.print_exception(e)
            finally:
                try:
                    connection.close()
                except Exception as e:
                    traceback.print_exception(e)
    finally:
        server.close()
        os.unlink(socket_path)
    sys.exit(1)
def main():
    """Client entry point: parse the command line, apply global options, run the command.

    ``--verbose``, ``--force`` and ``--root-folder`` are stored in the
    module-level ``verbose``, ``force`` and ``root_folder`` so that the
    rest of the module (for example ``r()``, which reads ``force``) sees
    them.
    """
    # All three names must be declared global; a plain assignment would
    # only create a local variable and leave the module setting untouched.
    global verbose, force, root_folder
    args = parser.parse_args()
    verbose = args.verbose
    force = args.force
    if args.root_folder is not None:
        root_folder = args.root_folder+"/"
    run_args(args)
if __name__ == "__main__":
    # ``is_daemon`` was set near the top of the file when the first
    # command-line argument is "server".
    entry_point = main_daemon if is_daemon else main
    entry_point()