Skip to content
Snippets Groups Projects
Commit 723fd1e3 authored by Jiří Kalvoda's avatar Jiří Kalvoda
Browse files

vm run and much more

parent 7968cbf0
No related branches found
No related tags found
No related merge requests found
#!/bin/python3
from subprocess import run, PIPE
import subprocess
import sys, os, pathlib
import argparse
......@@ -56,13 +55,13 @@ def r(*arg, check=None, stdin=None):
check = not force
if verbose: print(">", " ".join(arg))
if stdin is None:
run(arg, check=check)
subprocess.run(arg, check=check)
else:
run(arg, check=check, input=stdin)
subprocess.run(arg, check=check, input=stdin)
def nft(rules):
if verbose: print("\n".join("@ "+i for i in rules.split("\n")))
run(["nft", rules], check=not force)
subprocess.run(["nft", rules], check=not force)
parser = argparse.ArgumentParser()
......@@ -81,22 +80,28 @@ def get_spec(f):
f.spec = inspect.getfullargspec(f)
return f.spec
def cmd(f, ):
def cmd(f):
if f is None: return f
import inspect
spec = get_spec(f)
subcommands[f.__name__] = f
f.parser = subparsers.add_parser(f.__name__)
for i, arg in enumerate(spec.args):
annotation = spec.annotations.get(arg, None)
has_default = spec.defaults is not None and i >= len(spec.args) - len(spec.defaults)
if has_default:
default = spec.defaults[i - len(spec.args) + len(spec.defaults)]
# print()
# print(f)
#fprint(spec)
def process_arg(name, has_default, default):
annotation = spec.annotations.get(name, None)
if annotation in [str, int, float]:
f.parser.add_argument(
("--" if has_default else "")+arg,
type=annotation,
)
if annotation in [list[str], list[int], list[float]]:
f.parser.add_argument(
("--" if has_default else "")+arg,
type=annotation.__args__[0],
action="append",
)
if annotation in [bool]:
if has_default and default is True:
f.parser.add_argument(
......@@ -110,10 +115,21 @@ def cmd(f, ):
"--"+arg,
action="store_true",
)
for i, arg in enumerate(spec.args):
has_default = spec.defaults is not None and i >= len(spec.args) - len(spec.defaults)
default = None
if has_default:
default = spec.defaults[i - len(spec.args) + len(spec.defaults)]
process_arg(arg, has_default, default)
for i, arg in enumerate(spec.kwonlyargs):
default = spec.kwonlydefaults[arg]
process_arg(arg, True, default)
if spec.varargs is not None:
arg = spec.varargs
annotation = spec.annotations.get(arg, None)
if annotation == tuple[str, ...]:
f.parser.add_argument(
arg,
type=str, nargs=argparse.REMAINDER,
......@@ -275,7 +291,7 @@ def give_to_user(vm: str, uid: int, gid: Optional[int] = None):
def state(ucred, vm: str):
vm = name_to_id(vm)
assert has_read_acces(ucred, vm)
p = run(["VBoxManage", "showvminfo", vm], capture_output=True, encoding='utf8')
p = subprocess.run(["VBoxManage", "showvminfo", vm], capture_output=True, encoding='utf8')
if "Could not find a registered machine named " in p.stderr:
return state_not_registered
for l in p.stdout.split('\n'):
......@@ -302,7 +318,7 @@ def create_from_img(ucred: Ucred, target: str, new_ssh: bool = True, target_name
os.mkdir(mount_dir)
r('mount', '-o', 'loop,offset=210763776', '--type', 'ext4', target_dir+'img', mount_dir)
try:
with open(mount_dir+"/etc/hostname", "w") as f: f.write(target_name)
with open(mount_dir+"/etc/hostname", "w") as f: f.write(target_name+"\n")
if new_ssh:
r("ssh-keygen", "-t", "ed25519", "-C", f"virtual for root,u@vm_{target}", "-f", target_dir+"id_ed25519", "-P", "")
for place in ["/root/.ssh/", "/home/u/.ssh/"]:
......@@ -405,7 +421,7 @@ def ssh_args(vm: str, *arg: tuple[str, ...], user: str = "u"):
@cmd
def ssh(vm: str, *arg: tuple[str, ...], user: str = "u"):
run(ssh_args(vm, *arg, user=user))
subprocess.run(ssh_args(vm, *arg, user=user))
sshfs_root = lambda: os.environ["HOME"]+f"/m/vm/"
......@@ -424,7 +440,7 @@ def sshfs(vm: str, user: str = None):
if os.path.isdir(mount_dir) and len(os.listdir(mount_dir)) != 0:
return
r("mkdir", "-p", mount_dir)
r("sshfs", f"{user}@{get_ip(vm)}:/", mount_dir, "-o", f"ssh_command=ssh -i {vm_dir(vm)}/id_ed25519 -o UserKnownHostsFile={vm_dir(vm)}/known_hosts -o HostKeyAlgorithms=ssh-ed25519 -o HostKeyAlias=vm_{vm}")
r("sshfs", "-o", "follow_symlinks", f"{user}@{get_ip(vm)}:/", mount_dir, "-o", f"ssh_command=ssh -i {vm_dir(vm)}/id_ed25519 -o UserKnownHostsFile={vm_dir(vm)}/known_hosts -o HostKeyAlgorithms=ssh-ed25519 -o HostKeyAlias=vm_{vm}")
if not os.path.islink(mount_dir+'~'):
home_dir = "/root" if user == "root" else f"/home/{user}"
r("ln", "-sr", mount_dir+home_dir, mount_dir+"~")
......@@ -440,25 +456,25 @@ def sshfs_clean():
r("rm", root+f)
def escape_sh(s):
return "'" + s.replace("'", "'\"'\"'") + "'"
def escape_sh(*arg):
return " ".join("'" + s.replace("'", "'\"'\"'") + "'" for s in arg)
@cmd
def vncapp(vm: str, cmd: str):
def vncapp(vm: str, cmd: str, user: str = "u"):
import random
import psutil
unit_id = random.randint(100000, 999999)
vm, user = extended_name(vm)
vm, user = extended_name(vm, user=user)
vm = name_to_id(vm)
display_id=random.randint(10, 50)
vnc_server = subprocess.Popen(ssh_args(vm, f"systemd-run --unit vncapp-vnc-{display_id}-{unit_id} --user -P bash -c '(cat /vnc_passwd;echo; cat /vnc_passwd; echo;echo n) | vncpasswd; vncserver :{display_id}'", user=user))
time.sleep(1)
vnc_client_env = os.environ.copy()
vnc_client_env["VNC_PASSWORD"] = open(vm_dir(vm)+"vnc_passwd", "r").read().strip()
app = subprocess.Popen(ssh_args(vm, f"systemd-run --unit vncapp-app-{display_id}-{unit_id} --user -P -E DISPLAY=:{display_id} bash -c {escape_sh(cmd)}", user=user));
time.sleep(1)
vnc_client = subprocess.Popen(["vncviewer", get_ip(vm)+f":{display_id}"], env=vnc_client_env)
def on_terminate(proc):
if verbose: print("KILLING ALL APPS")
if verbose: print(f"KILLING ALL APPS because {proc} terminated")
vnc_server.send_signal(15)
vnc_client.send_signal(15)
app.send_signal(15)
......@@ -467,11 +483,11 @@ def vncapp(vm: str, cmd: str):
ssh(vm, f"systemctl --user stop vncapp-vnc-{display_id}-{unit_id} vncapp-app-{display_id}-{unit_id}")
@cmd
def vncsession(vm: str, display_id: int =0):
def vncsession(vm: str, display_id: int =0, user: str = "u"):
import random
import psutil
unit_id = random.randint(100000, 999999)
vm, user = extended_name(vm)
vm, user = extended_name(vm, user=user)
vm = name_to_id(vm)
vnc_server = subprocess.Popen(ssh_args(vm, f"systemd-run --unit vncsession-{display_id}-{unit_id} --user -P bash -c '(cat /vnc_passwd;echo; cat /vnc_passwd; echo;echo n) | vncpasswd; vncserver :{display_id}'", user=user))
vnc_client_env = os.environ.copy()
......@@ -575,7 +591,7 @@ def create_net(ucred, vm: str):
if os.path.isfile(network_dir+"interface"):
interface = open(network_dir+"interface").read().strip()
else:
p = run(["VBoxManage", "hostonlyif", "create"], capture_output=True, encoding='utf8')
p = subprocess.run(["VBoxManage", "hostonlyif", "create"], capture_output=True, encoding='utf8')
if p.returncode:
print(p.stderr, file=sys.stderr)
raise RuntimeError()
......@@ -841,6 +857,45 @@ def exit_server(ucred):
remove_net(ucred, vm)
exit(0)
@cmd
def run(vm: str, prog: str, *arg: tuple[str, ...], gui: bool = False, out_file: list[str] = []):
arg = [prog, *arg]
import shutil
vm, user = extended_name(vm)
sshfs(vm, user=user)
mountdir = sshfs_mountdir(vm, user=user)
import tempfile
tmp_dir = tempfile.mkdtemp(prefix="run-", dir=mountdir+'~')
orig_file_to_copy = {}
used_files = set()
for i, it in enumerate(arg):
if len(it) and it[0] == '@':
if len(it) > 1 and it[1] == '@':
arg[i] = it[2:]
else:
orig_file = it[1:]
if orig_file not in orig_file_to_copy:
vm_file = orig_file.split("/")[-1]
assert vm_file not in used_files
used_files.add(vm_file)
orig_file_to_copy[orig_file] = vm_file
shutil.copy(orig_file, tmp_dir+"/"+vm_file)
arg[i] = vm_file
tmp_dir_name = tmp_dir.split('/')[-1]
if gui:
vncapp(vm, f"cd {tmp_dir_name}; {escape_sh(*arg)}", user=user)
else:
ssh(vm, "-t", f"cd {tmp_dir_name}; {escape_sh(*arg)}", user=user)
for it in out_file:
if ':' in it:
vm_file = it.split(':')[0]
host_file = it.split(':')[1]
else:
vm_file = it
host_file = it
shutil.copy(tmp_dir+"/"+vm_file, host_file)
##########################################################
......@@ -917,13 +972,25 @@ def main():
spec = get_spec(f)
f_kvarg = {}
f_arg = []
def process_arg(name, has_default, default):
if has_default:
if args.__dict__[name] is not None:
f_kvarg[name] = args.__dict__[name]
else:
f_arg.append(args.__dict__[name])
for i, arg in enumerate(spec.args):
has_default = spec.defaults is not None and i >= len(spec.args) - len(spec.defaults)
default = None
if has_default:
if args.__dict__[arg] is not None:
f_kvarg[arg] = args.__dict__[arg]
else:
f_arg.append(args.__dict__[arg])
default = spec.defaults[i - len(spec.args) + len(spec.defaults)]
process_arg(arg, has_default, default)
for i, arg in enumerate(spec.kwonlyargs):
default = spec.kwonlydefaults[arg]
process_arg(arg, True, default)
if spec.varargs is not None:
arg = spec.varargs
annotation = spec.annotations.get(arg, None)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment