Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • master
1 result

Target

Select target project
  • jirikalvoda/vm
1 result
Select Git revision
  • master
1 result
Show changes
Commits on Source (2)
...@@ -239,13 +239,15 @@ class Identification: ...@@ -239,13 +239,15 @@ class Identification:
user: Optional[str] user: Optional[str]
display: Optional[str] display: Optional[str]
wayland_display: Optional[str] wayland_display: Optional[str]
sway_socket: Optional[str]
vnc_port: Optional[int] vnc_port: Optional[int]
def __init__(self, vm: str, user: Optional[str] = None, display: Optional[str] = None, wayland_display: Optional[str] = None, vnc_port: Optional[int] = None): def __init__(self, vm: str, user: Optional[str] = None, display: Optional[str] = None, wayland_display: Optional[str] = None, sway_socket: Optional[str] = None, vnc_port: Optional[int] = None):
self.vm = vm self.vm = vm
self.user = user self.user = user
self.display = display self.display = display
self.wayland_display = wayland_display self.wayland_display = wayland_display
self.sway_socket = sway_socket
self.vnc_port = vnc_port self.vnc_port = vnc_port
def __getitem__(self, key): def __getitem__(self, key):
...@@ -255,6 +257,14 @@ class Identification: ...@@ -255,6 +257,14 @@ class Identification:
out = self.vm out = self.vm
if self.user is not None: if self.user is not None:
out = f"{self.user}@{out}" out = f"{self.user}@{out}"
if self.vnc_port:
out += f":{self.vnc_port}"
if self.display:
out += f":[{self.display}]"
if self.wayland_display:
out += f":wayland[{self.wayland_display}]"
if self.sway_socket:
out += f":sway[{self.sway_socket}]"
return out return out
def set_default_user(self, user: str = "u"): def set_default_user(self, user: str = "u"):
...@@ -346,9 +356,12 @@ def has_write_acces(ucred, vm: str): ...@@ -346,9 +356,12 @@ def has_write_acces(ucred, vm: str):
######################## ########################
@internal_cmd @internal_cmd
def get_ip(vm: str) -> str: def get_ip(vm: str) -> Optional[str]:
network_dir = vm_dir(vm)+"network/" network_dir = vm_dir(vm)+"network/"
try:
net_id = open(network_dir+"net_id").read().strip() net_id = open(network_dir+"net_id").read().strip()
except FileNotFoundError:
return None
return f'{net_prefix}.{net_id}.150' return f'{net_prefix}.{net_id}.150'
@internal_cmd @internal_cmd
...@@ -493,8 +506,8 @@ def kill(ucred, vm: str): ...@@ -493,8 +506,8 @@ def kill(ucred, vm: str):
@cmd @cmd
@daemon() @daemon()
def pause(ucred, vm: str): def pause(ucred, vm: Identification):
vm = name_to_id(vm) vm, user = ident
assert has_write_acces(ucred, vm) assert has_write_acces(ucred, vm)
if backend == backend_vbox: if backend == backend_vbox:
...@@ -505,8 +518,8 @@ def pause(ucred, vm: str): ...@@ -505,8 +518,8 @@ def pause(ucred, vm: str):
raise NotImplementedError() raise NotImplementedError()
@cmd @cmd
@daemon() @daemon()
def resume(ucred, vm: str): def resume(ucred, ident: Identification):
vm = name_to_id(vm) vm, user = ident
assert has_write_acces(ucred, vm) assert has_write_acces(ucred, vm)
if backend == backend_vbox: if backend == backend_vbox:
...@@ -729,10 +742,11 @@ def create_net(ucred, vm: str): ...@@ -729,10 +742,11 @@ def create_net(ucred, vm: str):
add chain inet filter input_from_{interface} add chain inet filter input_from_{interface}
add chain inet filter forward_from_{interface} add chain inet filter forward_from_{interface}
add chain inet filter forward_to_{interface} add chain inet filter forward_to_{interface}
add chain inet filter forward_from_{interface}
insert rule inet filter input iifname {interface} jump input_from_{interface} insert rule inet filter input_from_vm iifname {interface} jump input_from_{interface}
insert rule inet filter forward iifname {interface} jump forward_from_{interface} insert rule inet filter forward_from_vm iifname {interface} jump forward_from_{interface}
insert rule inet filter forward oifname {interface} jump forward_to_{interface} insert rule inet filter forward_to_vm oifname {interface} jump forward_to_{interface}
""") """)
modify_net(ucred, vm) modify_net(ucred, vm)
#nft("add rule inet filter forward iifname wlp1s0 accept") #nft("add rule inet filter forward iifname wlp1s0 accept")
...@@ -759,6 +773,7 @@ def create_net(ucred, vm: str): ...@@ -759,6 +773,7 @@ def create_net(ucred, vm: str):
f.write(S-f""" f.write(S-f"""
#!/bin/sh #!/bin/sh
ifconfig {interface} {net_prefix}.{net_id}.1 netmask 255.255.255.0 up ifconfig {interface} {net_prefix}.{net_id}.1 netmask 255.255.255.0 up
ip route add {net_prefix}.{net_id}.0/24 dev {interface} table 38
dhcpd -4 -cf network/dhcp.config -pf network/dhcp.pid -lf network/dhcp.lp {interface} dhcpd -4 -cf network/dhcp.config -pf network/dhcp.pid -lf network/dhcp.lp {interface}
""") """)
with open(network_dir+"down.sh", "w") as f: with open(network_dir+"down.sh", "w") as f:
...@@ -769,7 +784,7 @@ def create_net(ucred, vm: str): ...@@ -769,7 +784,7 @@ def create_net(ucred, vm: str):
@internal_cmd @internal_cmd
@daemon() @daemon()
def modify_net(ucred, vm: str, wan: bool = False, lan: bool = False, pc: bool = False, pc_all: bool = False): def modify_net(ucred, vm: str, wan: bool = False, lan: bool = False, pc: bool = False, pc_all: bool = False, route_table: int = 6, route_blackhole: bool = True):
vm = name_to_id(vm) vm = name_to_id(vm)
assert has_write_acces(ucred, vm) assert has_write_acces(ucred, vm)
assert not (pc_all and not pc) assert not (pc_all and not pc)
...@@ -783,7 +798,9 @@ def modify_net(ucred, vm: str, wan: bool = False, lan: bool = False, pc: bool = ...@@ -783,7 +798,9 @@ def modify_net(ucred, vm: str, wan: bool = False, lan: bool = False, pc: bool =
pass pass
todo = [f"flush chain inet filter input_from_{interface}", todo = [f"flush chain inet filter input_from_{interface}",
f"flush chain inet filter forward_from_{interface}", f"flush chain inet filter forward_from_{interface}",
f"flush chain inet filter forward_to_{interface}"] f"flush chain inet filter forward_to_{interface}",
# f"flush chain inet route forward_from_{interface}",
]
todo.append(f"add rule inet filter input_from_{interface} ct state {{ established, related }} accept") todo.append(f"add rule inet filter input_from_{interface} ct state {{ established, related }} accept")
if not pc: if not pc:
todo.append(f"add rule inet filter input_from_{interface} drop") todo.append(f"add rule inet filter input_from_{interface} drop")
...@@ -806,21 +823,47 @@ def modify_net(ucred, vm: str, wan: bool = False, lan: bool = False, pc: bool = ...@@ -806,21 +823,47 @@ def modify_net(ucred, vm: str, wan: bool = False, lan: bool = False, pc: bool =
todo.append(f"add rule inet filter forward_from_{interface} drop") todo.append(f"add rule inet filter forward_from_{interface} drop")
todo.append(f"add rule inet filter forward_to_{interface} drop") todo.append(f"add rule inet filter forward_to_{interface} drop")
# todo.append(f"add rule inet route forward_from_{interface} meta mark set {fwmark}")
nft("\n".join(todo)) nft("\n".join(todo))
r("ip-man", "replace_rule", "--iif", interface, *[ "--blackhole" for _ in [1] if route_blackhole], "--priority_base", "100", str(route_table))
######################## ########################
# Using vm # # Using vm #
######################## ########################
def ssh_args(ident: Identification, *arg: tuple[str, ...]): def ssh_args(ident: Identification, *arg_in: tuple[str, ...]):
ident.set_default_user() ident.set_default_user()
vm, user = ident vm, user = ident
vm = name_to_id(vm) vm = name_to_id(vm)
set_env = []
if ident.display:
set_env.append(f"export DISPLAY={ident.display};")
if ident.wayland_display:
set_env.append(f"export WAYLAND_DISPLAY={ident.wayland_display};")
if ident.sway_socket:
set_env.append(f"export SWAYSOCK={ident.sway_socket};")
arg = ['ssh', "-i", vm_dir(vm)+"id_ed25519", arg = ['ssh', "-i", vm_dir(vm)+"id_ed25519",
"-o", f"UserKnownHostsFile={vm_dir(vm)}/known_hosts", "-o", "HostKeyAlgorithms=ssh-ed25519", "-o", f"HostKeyAlias=vm_{vm}", "-o", f"UserKnownHostsFile={vm_dir(vm)}/known_hosts", "-o", "HostKeyAlgorithms=ssh-ed25519", "-o", f"HostKeyAlias=vm_{vm}",
f"{user}@{get_ip(vm)}", *arg] f"{user}@{get_ip(vm)}"]
while len(arg_in) and arg_in[0].startswith("-"):
tmp, *arg_in = arg_in
if tmp == '--':
break
arg.append(tmp)
if len(arg_in):
arg += ["--"] + set_env + list(arg_in)
else:
if set_env:
arg += ["-t", "--"] + set_env + ["bash"]
if verbose: print(">>", " ".join(arg)) if verbose: print(">>", " ".join(arg))
return arg return arg
...@@ -828,20 +871,24 @@ def ssh_args(ident: Identification, *arg: tuple[str, ...]): ...@@ -828,20 +871,24 @@ def ssh_args(ident: Identification, *arg: tuple[str, ...]):
def ssh(ident: Identification, *arg: tuple[str, ...]): def ssh(ident: Identification, *arg: tuple[str, ...]):
subprocess.run(ssh_args(ident, *arg)) subprocess.run(ssh_args(ident, *arg))
@cmd
def terminal_ssh(ident: Identification, *arg: tuple[str, ...], terminal: str = "alacritty"):
subprocess.run([terminal, "-e"] + ssh_args(ident, *arg))
sshfs_root = lambda: os.environ["HOME"]+f"/m/vm/" sshfs_root = lambda: os.environ["HOME"]+f"/m/vm/"
@internal_cmd @internal_cmd
def sshfs_mountdir(ident: Identification): def sshfs_mountdir(ident: Identification):
return sshfs_root()+f"/{ident.user}@{name(ident.vm)}" return sshfs_root()+f"/{ident.user or 'u'}@{name(ident.vm)}"
@cmd @cmd
def sshfs(vm: str): def sshfs(ident: Identification):
vm, user = ident vm, user = ident
if user is None: if user is None:
sshfs(Identification(vm, "root")) sshfs(Identification(vm, "root"))
sshfs(Identification(vm, "u")) sshfs(Identification(vm, "u"))
return return
mount_dir = sshfs_mountdir(vm, user) mount_dir = sshfs_mountdir(ident)
if os.path.isdir(mount_dir) and len(os.listdir(mount_dir)) != 0: if os.path.isdir(mount_dir) and len(os.listdir(mount_dir)) != 0:
return return
r("mkdir", "-p", mount_dir) r("mkdir", "-p", mount_dir)
...@@ -866,6 +913,7 @@ def get_vnc_client_env(ident): ...@@ -866,6 +913,7 @@ def get_vnc_client_env(ident):
vnc_client_env["VNC_PASSWORD"] = open(vm_dir(ident.vm)+"vnc_passwd", "r").read().strip() vnc_client_env["VNC_PASSWORD"] = open(vm_dir(ident.vm)+"vnc_passwd", "r").read().strip()
vnc_client_env["VNC_USERNAME"] = ident.user vnc_client_env["VNC_USERNAME"] = ident.user
vnc_client_env["VM_IDENT"] = str(ident) vnc_client_env["VM_IDENT"] = str(ident)
print(vnc_client_env)
return vnc_client_env return vnc_client_env
vncviewer_args = ["-FullscreenSystemKeys=0", "-AcceptClipboard=0", "-SendClipboard=0"] vncviewer_args = ["-FullscreenSystemKeys=0", "-AcceptClipboard=0", "-SendClipboard=0"]
...@@ -901,7 +949,7 @@ def vncapp(ident: Identification, cmd: str, wayland: bool = False): ...@@ -901,7 +949,7 @@ def vncapp(ident: Identification, cmd: str, wayland: bool = False):
unit_id = random.randint(100000, 999999) unit_id = random.randint(100000, 999999)
vnc_server, ident = start_vnc_server(ident, f"vncapp-vnc-{unit_id}", wayland=wayland) vnc_server, ident = start_vnc_server(ident, f"vncapp-vnc-{unit_id}", wayland=wayland)
time.sleep(1) time.sleep(1)
app = subprocess.Popen(ssh_args(ident, f"systemd-run --unit vncapp-app-{unit_id} --user -P -E DISPLAY={ident.display} -E WAYLAND_DISPLAY={ident.wayland_display} bash -c {escape_sh(cmd)}")); app = subprocess.Popen(ssh_args(ident, f"{'swaymsg' if wayland else 'i3-msg'} bar mode invisible; systemd-run --unit vncapp-app-{unit_id} --user -P -E DISPLAY={ident.display} -E WAYLAND_DISPLAY={ident.wayland_display} bash -c {escape_sh(cmd)}"));
vnc_client = start_vnc_client(ident) vnc_client = start_vnc_client(ident)
def on_terminate(proc): def on_terminate(proc):
...@@ -917,11 +965,12 @@ def vncapp(ident: Identification, cmd: str, wayland: bool = False): ...@@ -917,11 +965,12 @@ def vncapp(ident: Identification, cmd: str, wayland: bool = False):
# WAYLAND_DISPLAY=wayland-1 SWAYSOCK=/run/user/1000/sway-ipc.1000.6544.sock swaymsg output HEADLESS-1 pos 0 0 res 1920x1080 # WAYLAND_DISPLAY=wayland-1 SWAYSOCK=/run/user/1000/sway-ipc.1000.6544.sock swaymsg output HEADLESS-1 pos 0 0 res 1920x1080
@cmd @cmd
def waydroid(vm: str, *apps: tuple[str, ...]): def waydroid(ident: Identification, *apps: tuple[str, ...]):
if len(apps): if len(apps):
return vncapp(vm, f"(sleep 14; waydroid app launch {apps[0]}) & waydroid show-full-ui", wayland=True) return vncapp(ident, f"bin/waydroid-run {apps[0]}", wayland=True)
# return vncapp(vm, f"(sleep 14; waydroid app launch {apps[0]}) & waydroid show-full-ui", wayland=True)
else: else:
return vncapp(vm, f"waydroid show-full-ui", wayland=True) return vncapp(ident, f"bin/waydroid-run", wayland=True)
@cmd @cmd
def vncsession(ident: Identification, wayland: bool = False): def vncsession(ident: Identification, wayland: bool = False):
...@@ -940,6 +989,14 @@ def vncsession(ident: Identification, wayland: bool = False): ...@@ -940,6 +989,14 @@ def vncsession(ident: Identification, wayland: bool = False):
psutil.wait_procs([vnc_client, vnc_server], callback=on_terminate) psutil.wait_procs([vnc_client, vnc_server], callback=on_terminate)
ssh(ident, f"systemctl --user stop vncsession-{unit_id}") ssh(ident, f"systemctl --user stop vncsession-{unit_id}")
@cmd
def resize_wayland(ident: Identification, x: int = None, y: int = None):
if x is None or y is None:
win_place = dict(v.split('=') for v in subprocess.check_output(["xdotool", "getactivewindow", "getwindowgeometry", "--shell"]).decode("utf-8").split('\n') if v)
if x is None: x = int(win_place["WIDTH"]) - 2
if y is None: y = int(win_place["HEIGHT"]) - 2
ssh(ident, f"swaymsg output HEADLESS-1 pos 0 0 res {x}x{y}; pgrep waydroid-run | while read p; do echo reload > /proc/$p/fd/0; done");
@daemon() @daemon()
def chown_qemu_vnc_sock(ucred, vm): def chown_qemu_vnc_sock(ucred, vm):
vm = name_to_id(vm) vm = name_to_id(vm)
...@@ -957,7 +1014,7 @@ def str_remove_prefix(s, prefix): ...@@ -957,7 +1014,7 @@ def str_remove_prefix(s, prefix):
return s[len(prefix):] return s[len(prefix):]
@internal_cmd @internal_cmd
def get_vm_by_window(win_id: int = None): def get_ident_by_window(win_id: int = None):
import psutil import psutil
if win_id is None: if win_id is None:
win_id = int(subprocess.check_output(["xdotool", "getactivewindow"]).decode("utf-8")) win_id = int(subprocess.check_output(["xdotool", "getactivewindow"]).decode("utf-8"))
...@@ -966,10 +1023,9 @@ def get_vm_by_window(win_id: int = None): ...@@ -966,10 +1023,9 @@ def get_vm_by_window(win_id: int = None):
if '"Alacritty"' in win_class: if '"Alacritty"' in win_class:
pass pass
else: else:
process = psutil.Process(pid=os.getpid()) process = psutil.Process(pid=pid)
process_env: Dict = process.environ() process_env: Dict = process.environ()
print(process_env) return extended_name(process_env["VM_IDENT"])
return win_class
######################## ########################
...@@ -977,7 +1033,6 @@ def get_vm_by_window(win_id: int = None): ...@@ -977,7 +1033,6 @@ def get_vm_by_window(win_id: int = None):
######################## ########################
def terminal_len(val: str) -> int: def terminal_len(val: str) -> int:
# TODO
return len(val) return len(val)
def format_table(table): def format_table(table):
...@@ -1002,7 +1057,7 @@ def index(color: bool = True): ...@@ -1002,7 +1057,7 @@ def index(color: bool = True):
else: else:
out_state += " (NO PING)" out_state += " (NO PING)"
out_rw = ('w' if has_write_acces(vm) else 'r') if has_read_acces(vm) else '-' out_rw = ('w' if has_write_acces(vm) else 'r') if has_read_acces(vm) else '-'
out.append([vm, out_rw, name(vm), out_state, get_permanency(vm)]) out.append([vm, out_rw, name(vm), out_state, get_permanency(vm), get_ip(vm) or "None"])
return format_table(out) return format_table(out)
...@@ -1065,7 +1120,7 @@ if is_daemon: ...@@ -1065,7 +1120,7 @@ if is_daemon:
@internal_cmd @internal_cmd
@daemon() @daemon()
def get_prepared_fork(ucred, base: str = "base") -> Optional[str]: def get_prepared_fork(ucred, base: str = "base") -> Optional[str]:
name_to_id(base) base = name_to_id(base)
assert has_read_acces(ucred, base) assert has_read_acces(ucred, base)
if base in prepared_forks and len(prepared_forks[base]): if base in prepared_forks and len(prepared_forks[base]):
pf = prepared_forks[base][0] pf = prepared_forks[base][0]
...@@ -1079,6 +1134,7 @@ def get_prepared_fork(ucred, base: str = "base") -> Optional[str]: ...@@ -1079,6 +1134,7 @@ def get_prepared_fork(ucred, base: str = "base") -> Optional[str]:
@cmd @cmd
@daemon(root_only=True) @daemon(root_only=True)
def prepare_forks(ucred, base: str = "base", count: int =1) -> Optional[str]: def prepare_forks(ucred, base: str = "base", count: int =1) -> Optional[str]:
base = name_to_id(base)
if not base in prepared_forks: if not base in prepared_forks:
prepared_forks[base] = [] prepared_forks[base] = []
if len(prepared_forks[base]) < count: if len(prepared_forks[base]) < count:
...@@ -1093,6 +1149,7 @@ def prepare_forks(ucred, base: str = "base", count: int =1) -> Optional[str]: ...@@ -1093,6 +1149,7 @@ def prepare_forks(ucred, base: str = "base", count: int =1) -> Optional[str]:
@cmd @cmd
@daemon(root_only=True) @daemon(root_only=True)
def pause_prepared_forks(ucred, base: str = "base", runtime: int = 30): def pause_prepared_forks(ucred, base: str = "base", runtime: int = 30):
base = name_to_id(base)
for pf in prepared_forks[base]: for pf in prepared_forks[base]:
if not pf.is_paused and pf.start_monotonic_time + runtime <= time.monotonic(): if not pf.is_paused and pf.start_monotonic_time + runtime <= time.monotonic():
pause(ucred, pf.vm) pause(ucred, pf.vm)
...@@ -1107,45 +1164,101 @@ def get_tmp_vm(base: str = "base"): ...@@ -1107,45 +1164,101 @@ def get_tmp_vm(base: str = "base"):
start(target) start(target)
return target return target
def multisplit_toplevel(s, *separators, brackets={'[':']'}):
out = [(None, None)]
stack = []
i = 0
begin = 0
while i < len(s):
if s[i] in brackets:
stack.append(s[i])
if s[i] in brackets.values():
assert s[i] == brackets[stack[-1]]
stack.pop()
elif not stack:
for separator in separators:
if s[i:].startswith(separator):
out[-1] = (out[-1][0], s[begin: i])
out.append((separator, None))
begin = i+1
i += 1
out[-1] = (out[-1][0], s[begin: i])
assert not stack
return out
def split_toplevel(s, separator):
return [v for sep, v in multisplit_toplevel(s, separator)]
def valid_bracket(s, brackets={'[':']'}):
stack = []
for c in s:
if c in brackets:
stack.append(c)
if c in brackets.values():
if c != brackets[stack[-1]]: return False
stack.pop()
return not stack
def is_in_bracket(s, left='[', right=']'):
return s[0] == left and s[-1] == right and valid_bracket(s[1:-1], {left: right})
@cmd @cmd
def extended_name(name: str) -> tuple[str, str]: def extended_name(name: str) -> tuple[str, str]:
assert not is_daemon assert not is_daemon
vm = name vm = name
user = None user = None
if len(vm.split("@"))==2: if len(split_toplevel(vm, "@"))==2:
user, vm = vm.split("@") user, vm = vm.split("@")
do_power_on = False do_power_on = False
do_power_on_display= False do_power_on_display= False
net_options = None net_options = None
permanency = None permanency = None
if len(vm.split("$"))==2: if len(multisplit_toplevel(vm, "!", "$"))==2:
vm, tmp = vm.split("$") (_, vm), (mark, tmp) = multisplit_toplevel(vm, "!", "$")
do_power_on = True
do_power_on_display = True
assert tmp == "" assert tmp == ""
if len(vm.split("!"))==2:
vm, tmp = vm.split("!")
do_power_on = True do_power_on = True
assert tmp == "" do_power_on_display = mark = '$'
if len(vm.split("~"))==2:
vm, net_options = vm.split("~")
if len(vm.split("^"))==2: (_, vm), *modifires = multisplit_toplevel(vm, "~", "^", ":")
vm, permanency = vm.split("^")
if len(vm.split("+"))==2: if is_in_bracket(vm, '[', ']'):
base, vm = vm.split("+") ident = get_ident_by_window(int(vm[1:-1]) if len(vm) >= 3 else None)
vm = ident.vm
if user is None: user = ident.user
elif len(split_toplevel(vm, "+"))==2:
base, vm = split_toplevel(vm, "+")
base_ident = extended_name(base or "base")
if not vm: if not vm:
vm = get_tmp_vm(base or "base") vm = get_tmp_vm(base_ident.vm)
else: else:
vm = clone(vm, base or "base") vm = clone(vm, base_ident.vm)
start(vm) start(vm)
wait_started(vm) wait_started(vm)
ident = Identification(vm)
else:
vm = name_to_id(vm) vm = name_to_id(vm)
ident = Identification(vm)
for key, val in modifires:
if key == '~':
assert net_options is None
net_options = val
if key == '^':
assert permanency is None
permanency = val
if key == ':':
if val.isnumeric():
ident.vnc_port = int(val)
elif val.startswith("[") and val.endswith("]"):
ident.display = val[len("["):-len("]")]
elif val.startswith("wayland[") and val.endswith("]"):
ident.wayland_display = val[len("wayland["):-len("]")]
elif val.startswith("sway[") and val.endswith("]"):
ident.sway_socket = val[len("sway["):-len("]")]
else: assert False
if do_power_on: if do_power_on:
if state(vm) != state_running: if state(vm) != state_running:
...@@ -1154,10 +1267,19 @@ def extended_name(name: str) -> tuple[str, str]: ...@@ -1154,10 +1267,19 @@ def extended_name(name: str) -> tuple[str, str]:
else: else:
resume(vm) resume(vm)
if net_options is not None: if net_options is not None:
modify_net(vm, wan="w" in net_options, lan="l" in net_options, pc="p" in net_options or "P" in net_options, pc_all="P" in net_options) if any(ch.isnumeric() for ch in net_options):
route_table_ind = min(i for i, ch in enumerate(net_options) if ch.isnumeric())
route_table = int(net_options[route_table_ind:])
net_options = net_options[:route_table_ind]
else:
route_table = 6
modify_net(vm, wan="w" in net_options, lan="l" in net_options, pc="p" in net_options or "P" in net_options, pc_all="P" in net_options, route_table=route_table, route_blackhole="B" not in net_options)
if permanency is not None: if permanency is not None:
set_permanency(vm, permanency or "stable") set_permanency(vm, permanency or "stable")
return Identification(vm, user)
ident.vm = vm
ident.user = user
return ident
@cmd @cmd
def eval(ident: Identification): def eval(ident: Identification):
......