|
1 |
| -import fcntl |
2 | 1 | import os
|
3 |
| -import pty |
4 | 2 | import re
|
5 | 3 | import select
|
6 | 4 | import struct
|
7 | 5 | import subprocess
|
8 | 6 | import sys
|
9 |
| -import termios |
10 | 7 | import time
|
11 | 8 | import typing as t
|
12 | 9 | from pathlib import Path
|
13 | 10 | import pytest
|
14 | 11 | from functools import cached_property
|
15 | 12 | import re
|
16 | 13 | import subprocess
|
| 14 | +import platform |
17 | 15 |
|
18 | 16 | from shellingham import detect_shell
|
19 | 17 |
|
@@ -67,11 +65,9 @@ class _DefaultCompleteTestCase(with_typehint(TestCase)):
|
67 | 65 | manage_script = "manage.py"
|
68 | 66 | launch_script = "./manage.py"
|
69 | 67 |
|
70 |
| - @property |
71 |
| - def interactive_opt(self): |
72 |
| - # currently all supported shells support -i for interactive mode |
73 |
| - # this includes zsh, bash, fish and powershell |
74 |
| - return "-i" |
| 68 | + interactive_opt: t.Optional[str] = None |
| 69 | + |
| 70 | + environment: t.List[str] = [] |
75 | 71 |
|
76 | 72 | @cached_property
|
77 | 73 | def command(self) -> ShellCompletion:
|
@@ -116,75 +112,109 @@ def remove(self, script=None):
|
116 | 112 | self.get_completions("ping") # just to reinit shell
|
117 | 113 | self.verify_remove(script=script)
|
118 | 114 |
|
119 |
| - def set_environment(self, fd): |
120 |
| - os.write(fd, f"PATH={Path(sys.executable).parent}:$PATH\n".encode()) |
121 |
| - os.write( |
122 |
| - fd, |
123 |
| - f"DJANGO_SETTINGS_MODULE=tests.settings.completion\n".encode(), |
124 |
| - ) |
| 115 | + if platform.system() == "Windows": |
125 | 116 |
|
126 |
| - def get_completions(self, *cmds: str, scrub_output=True) -> str: |
127 |
| - def read(fd): |
128 |
| - """Function to read from a file descriptor.""" |
129 |
| - return os.read(fd, 1024 * 1024).decode() |
130 |
| - |
131 |
| - # Create a pseudo-terminal |
132 |
| - master_fd, slave_fd = pty.openpty() |
133 |
| - |
134 |
| - # Define window size - width and height |
135 |
| - os.set_blocking(slave_fd, False) |
136 |
| - win_size = struct.pack("HHHH", 24, 80, 0, 0) # 24 rows, 80 columns |
137 |
| - fcntl.ioctl(slave_fd, termios.TIOCSWINSZ, win_size) |
138 |
| - |
139 |
| - # env = os.environ.copy() |
140 |
| - # env["TERM"] = "xterm-256color" |
141 |
| - |
142 |
| - # Spawn a new shell process |
143 |
| - shell = self.shell or detect_shell()[0] |
144 |
| - process = subprocess.Popen( |
145 |
| - [shell, *([self.interactive_opt] if self.interactive_opt else [])], |
146 |
| - stdin=slave_fd, |
147 |
| - stdout=slave_fd, |
148 |
| - stderr=slave_fd, |
149 |
| - text=True, |
150 |
| - # env=env, |
151 |
| - # preexec_fn=os.setsid, |
152 |
| - ) |
153 |
| - # Wait for the shell to start and get to the prompt |
154 |
| - print(read(master_fd)) |
| 117 | + def get_completions(self, *cmds: str, scrub_output=True) -> str: |
| 118 | + import winpty |
| 119 | + |
| 120 | + assert self.shell |
| 121 | + |
| 122 | + pty = winpty.PTY(24, 80) |
| 123 | + |
| 124 | + def read_all() -> str: |
| 125 | + output = "" |
| 126 | + while data := pty.read(): |
| 127 | + output += data |
| 128 | + time.sleep(0.1) |
| 129 | + return output |
| 130 | + |
| 131 | + # Start the subprocess |
| 132 | + pty.spawn( |
| 133 | + self.shell, *([self.interactive_opt] if self.interactive_opt else []) |
| 134 | + ) |
| 135 | + |
| 136 | + # Wait for the shell to start and get to the prompt |
| 137 | + time.sleep(3) |
| 138 | + read_all() |
| 139 | + |
| 140 | + for line in self.environment: |
| 141 | + pty.write(line) |
| 142 | + |
| 143 | + time.sleep(2) |
| 144 | + output = read_all() + read_all() |
| 145 | + |
| 146 | + pty.write(" ".join(cmds)) |
| 147 | + time.sleep(0.1) |
| 148 | + pty.write("\t") |
| 149 | + |
| 150 | + time.sleep(2) |
| 151 | + completion = read_all() + read_all() |
155 | 152 |
|
156 |
| - self.set_environment(master_fd) |
| 153 | + return scrub(completion) if scrub_output else completion |
157 | 154 |
|
158 |
| - print(read(master_fd)) |
159 |
| - # Send a command with a tab character for completion |
| 155 | + else: |
160 | 156 |
|
161 |
| - cmd = " ".join(cmds) |
162 |
| - os.write(master_fd, cmd.encode()) |
163 |
| - time.sleep(0.25) |
| 157 | + def get_completions(self, *cmds: str, scrub_output=True) -> str: |
| 158 | + import fcntl |
| 159 | + import termios |
| 160 | + import pty |
164 | 161 |
|
165 |
| - print(f'"{cmd}"') |
166 |
| - os.write(master_fd, b"\t\t\t") |
| 162 | + def read(fd): |
| 163 | + """Function to read from a file descriptor.""" |
| 164 | + return os.read(fd, 1024 * 1024).decode() |
167 | 165 |
|
168 |
| - time.sleep(0.25) |
| 166 | + # Create a pseudo-terminal |
| 167 | + master_fd, slave_fd = pty.openpty() |
169 | 168 |
|
170 |
| - # Read the output |
171 |
| - output = read_all_from_fd_with_timeout(master_fd) |
| 169 | + # Define window size - width and height |
| 170 | + os.set_blocking(slave_fd, False) |
| 171 | + win_size = struct.pack("HHHH", 24, 80, 0, 0) # 24 rows, 80 columns |
| 172 | + fcntl.ioctl(slave_fd, termios.TIOCSWINSZ, win_size) |
172 | 173 |
|
173 |
| - # todo - avoid large output because this can mess things up |
174 |
| - if "do you wish" in output or "Display all" in output: |
175 |
| - os.write(master_fd, b"y\n") |
| 174 | + # env = os.environ.copy() |
| 175 | + # env["TERM"] = "xterm-256color" |
| 176 | + |
| 177 | + # Spawn a new shell process |
| 178 | + shell = self.shell or detect_shell()[0] |
| 179 | + process = subprocess.Popen( |
| 180 | + [shell, *([self.interactive_opt] if self.interactive_opt else [])], |
| 181 | + stdin=slave_fd, |
| 182 | + stdout=slave_fd, |
| 183 | + stderr=slave_fd, |
| 184 | + text=True, |
| 185 | + # env=env, |
| 186 | + # preexec_fn=os.setsid, |
| 187 | + ) |
| 188 | + # Wait for the shell to start and get to the prompt |
| 189 | + print(read(master_fd)) |
| 190 | + |
| 191 | + for line in self.environment: |
| 192 | + os.write(master_fd, line.encode()) |
| 193 | + |
| 194 | + print(read(master_fd)) |
| 195 | + # Send a command with a tab character for completion |
| 196 | + |
| 197 | + cmd = " ".join(cmds) |
| 198 | + os.write(master_fd, cmd.encode()) |
176 | 199 | time.sleep(0.25)
|
| 200 | + |
| 201 | + print(f'"{cmd}"') |
| 202 | + os.write(master_fd, b"\t\t\t") |
| 203 | + |
| 204 | + time.sleep(0.25) |
| 205 | + |
| 206 | + # Read the output |
177 | 207 | output = read_all_from_fd_with_timeout(master_fd)
|
178 | 208 |
|
179 |
| - # Clean up |
180 |
| - os.close(slave_fd) |
181 |
| - os.close(master_fd) |
182 |
| - process.terminate() |
183 |
| - process.wait() |
184 |
| - # remove bell character which can show up in some terminals where we hit tab |
185 |
| - if scrub_output: |
186 |
| - return scrub(output) |
187 |
| - return output |
| 209 | + # Clean up |
| 210 | + os.close(slave_fd) |
| 211 | + os.close(master_fd) |
| 212 | + process.terminate() |
| 213 | + process.wait() |
| 214 | + # remove bell character which can show up in some terminals where we hit tab |
| 215 | + if scrub_output: |
| 216 | + return scrub(output) |
| 217 | + return output |
188 | 218 |
|
189 | 219 | def run_app_completion(self):
|
190 | 220 | completions = self.get_completions(self.launch_script, "completion", " ")
|
@@ -323,18 +353,24 @@ class _InstalledScriptTestCase(_DefaultCompleteTestCase):
|
323 | 353 | manage_script = "django_manage"
|
324 | 354 | launch_script = "django_manage"
|
325 | 355 |
|
326 |
| - def setUp(self): |
| 356 | + @classmethod |
| 357 | + def setUpClass(cls): |
327 | 358 | lines = []
|
328 |
| - with open(self.MANAGE_SCRIPT_TMPL, "r") as f: |
| 359 | + with open(cls.MANAGE_SCRIPT_TMPL, "r") as f: |
329 | 360 | for line in f.readlines():
|
330 | 361 | if line.startswith("#!{{shebang}}"):
|
331 | 362 | line = f"#!{sys.executable}\n"
|
332 | 363 | lines.append(line)
|
333 |
| - exe = Path(sys.executable).parent / self.manage_script |
| 364 | + exe = Path(sys.executable).parent / cls.manage_script |
334 | 365 | with open(exe, "w") as f:
|
335 | 366 | for line in lines:
|
336 | 367 | f.write(line)
|
337 | 368 |
|
338 | 369 | # make the script executable
|
339 | 370 | os.chmod(exe, os.stat(exe).st_mode | 0o111)
|
340 |
| - super().setUp() |
| 371 | + |
| 372 | + if platform.system() == "Windows": |
| 373 | + with open(exe.with_suffix(".cmd"), "w") as f: |
| 374 | + f.write(f'@echo off{os.linesep}"{sys.executable}" "%~dp0{exe.name}" %*') |
| 375 | + os.chmod(exe, os.stat(exe.with_suffix(".cmd")).st_mode | 0o111) |
| 376 | + super().setUpClass() |
0 commit comments