Commit 5f7339cd authored by Roman Alifanov's avatar Roman Alifanov

Fix coproc fd race condition on 32-bit systems

Duplicate coproc fds with exec instead of copying fd numbers. Bash auto-closes coproc fds when the process exits, causing race conditions on slower systems. Save PID before coproc cleanup and close original write fd after dup to prevent hangs.
parent 486b06d0
...@@ -799,8 +799,10 @@ class DispatchMixin: ...@@ -799,8 +799,10 @@ class DispatchMixin:
cmd_str = self.generate_expr(inner) cmd_str = self.generate_expr(inner)
self.emit(f'coproc {cp_name} {{ {cmd_str}; }}') self.emit(f'coproc {cp_name} {{ {cmd_str}; }}')
self.emit(f'{cp_name}_wr=${{{cp_name}[1]}}') self.emit(f'exec {{{cp_name}_wr}}>&${{{cp_name}[1]}}')
self.emit(f'{cp_name}_rd=${{{cp_name}[0]}}') self.emit(f'exec {{{cp_name}_rd}}<&${{{cp_name}[0]}}')
self.emit(f'{cp_name}_pid=${cp_name}_PID')
self.emit(f'eval "exec ${{{cp_name}[1]}}>&-"')
self.process_handle_vars.add(target) self.process_handle_vars.add(target)
self.process_handle_map[target] = cp_name self.process_handle_map[target] = cp_name
...@@ -818,9 +820,9 @@ class DispatchMixin: ...@@ -818,9 +820,9 @@ class DispatchMixin:
elif method == "close": elif method == "close":
self.emit(f'exec {{{cp}_wr}}>&-') self.emit(f'exec {{{cp}_wr}}>&-')
elif method == "kill": elif method == "kill":
self.emit(f'kill ${cp}_PID 2>/dev/null || true') self.emit(f'kill ${cp}_pid 2>/dev/null || true')
elif method == "wait": elif method == "wait":
self.emit(f'wait ${cp}_PID 2>/dev/null || true') self.emit(f'wait ${cp}_pid 2>/dev/null || true')
def _generate_pipe_assignment(self, stmt: Assignment, target: str): def _generate_pipe_assignment(self, stmt: Assignment, target: str):
"""Generate pipe expression assignment.""" """Generate pipe expression assignment."""
...@@ -1298,9 +1300,9 @@ class DispatchMixin: ...@@ -1298,9 +1300,9 @@ class DispatchMixin:
elif method == "close": elif method == "close":
return f'exec {{{cp}_wr}}>&-' return f'exec {{{cp}_wr}}>&-'
elif method == "kill": elif method == "kill":
return f'kill ${cp}_PID 2>/dev/null || true' return f'kill ${cp}_pid 2>/dev/null || true'
elif method == "wait": elif method == "wait":
return f'wait ${cp}_PID 2>/dev/null || true' return f'wait ${cp}_pid 2>/dev/null || true'
else: else:
self._validate_type_method("process_handle", method, location) self._validate_type_method("process_handle", method, location)
......
...@@ -117,7 +117,7 @@ class ExprMixin: ...@@ -117,7 +117,7 @@ class ExprMixin:
ph_map = getattr(self, 'process_handle_map', {}) ph_map = getattr(self, 'process_handle_map', {})
if parts[0] in ph_vars and parts[1] == 'pid': if parts[0] in ph_vars and parts[1] == 'pid':
cp = ph_map[parts[0]] cp = ph_map[parts[0]]
return f'\\$${cp}_PID' return f'\\$${cp}_pid'
return f'\\$${{__CT_OBJ["${{{parts[0]}}}.{parts[1]}"]:-}}' return f'\\$${{__CT_OBJ["${{{parts[0]}}}.{parts[1]}"]:-}}'
return f'\\$${{{content}}}' return f'\\$${{{content}}}'
...@@ -140,7 +140,7 @@ class ExprMixin: ...@@ -140,7 +140,7 @@ class ExprMixin:
ph_map = getattr(self, 'process_handle_map', {}) ph_map = getattr(self, 'process_handle_map', {})
if parts[0] in ph_vars and parts[1] == 'pid': if parts[0] in ph_vars and parts[1] == 'pid':
cp = ph_map[parts[0]] cp = ph_map[parts[0]]
return f'${cp}_PID' return f'${cp}_pid'
return f'${{__CT_OBJ["${{{parts[0]}}}.{parts[1]}"]:-}}' return f'${{__CT_OBJ["${{{parts[0]}}}.{parts[1]}"]:-}}'
return f'${{{content}}}' return f'${{{content}}}'
...@@ -468,8 +468,10 @@ class ExprMixin: ...@@ -468,8 +468,10 @@ class ExprMixin:
else: else:
cmd_str = self.generate_expr(inner) cmd_str = self.generate_expr(inner)
self.emit(f'coproc {cp_name} {{ {cmd_str}; }}') self.emit(f'coproc {cp_name} {{ {cmd_str}; }}')
self.emit(f'{cp_name}_wr=${{{cp_name}[1]}}') self.emit(f'exec {{{cp_name}_wr}}>&${{{cp_name}[1]}}')
self.emit(f'{cp_name}_rd=${{{cp_name}[0]}}') self.emit(f'exec {{{cp_name}_rd}}<&${{{cp_name}[0]}}')
self.emit(f'{cp_name}_pid=${cp_name}_PID')
self.emit(f'eval "exec ${{{cp_name}[1]}}>&-"')
return "" return ""
def _generate_member_access(self, expr: MemberAccess) -> str: def _generate_member_access(self, expr: MemberAccess) -> str:
...@@ -480,7 +482,7 @@ class ExprMixin: ...@@ -480,7 +482,7 @@ class ExprMixin:
obj_name = expr.object.name obj_name = expr.object.name
if obj_name in getattr(self, 'process_handle_vars', set()) and expr.member == "pid": if obj_name in getattr(self, 'process_handle_vars', set()) and expr.member == "pid":
cp = self.process_handle_map[obj_name] cp = self.process_handle_map[obj_name]
return f'${cp}_PID' return f'${cp}_pid'
if isinstance(expr.object, Identifier): if isinstance(expr.object, Identifier):
obj_name = expr.object.name obj_name = expr.object.name
......
...@@ -359,8 +359,10 @@ class StmtMixin: ...@@ -359,8 +359,10 @@ class StmtMixin:
else: else:
cmd_str = self.generate_expr(inner) cmd_str = self.generate_expr(inner)
self.emit(f'coproc {cp_name} {{ {cmd_str}; }}') self.emit(f'coproc {cp_name} {{ {cmd_str}; }}')
self.emit(f'{cp_name}_wr=${{{cp_name}[1]}}') self.emit(f'exec {{{cp_name}_wr}}>&${{{cp_name}[1]}}')
self.emit(f'{cp_name}_rd=${{{cp_name}[0]}}') self.emit(f'exec {{{cp_name}_rd}}<&${{{cp_name}[0]}}')
self.emit(f'{cp_name}_pid=${cp_name}_PID')
self.emit(f'eval "exec ${{{cp_name}[1]}}>&-"')
self.process_handle_vars.add(var) self.process_handle_vars.add(var)
self.process_handle_map[var] = cp_name self.process_handle_map[var] = cp_name
with_async[i] = cp_name with_async[i] = cp_name
...@@ -388,7 +390,7 @@ class StmtMixin: ...@@ -388,7 +390,7 @@ class StmtMixin:
if i in with_async: if i in with_async:
cp = with_async[i] cp = with_async[i]
self.emit(f'exec {{{cp}_wr}}>&- 2>/dev/null || true') self.emit(f'exec {{{cp}_wr}}>&- 2>/dev/null || true')
self.emit(f'wait ${cp}_PID 2>/dev/null || true') self.emit(f'wait ${cp}_pid 2>/dev/null || true')
else: else:
self.emit(f'__ct_fh___exit__ "$__ct_with_{i}"') self.emit(f'__ct_fh___exit__ "$__ct_with_{i}"')
...@@ -470,7 +472,7 @@ class StmtMixin: ...@@ -470,7 +472,7 @@ class StmtMixin:
var = stmt.expression.name var = stmt.expression.name
if var in self.process_handle_vars: if var in self.process_handle_vars:
cp = self.process_handle_map[var] cp = self.process_handle_map[var]
self.emit(f'wait ${cp}_PID 2>/dev/null || true') self.emit(f'wait ${cp}_pid 2>/dev/null || true')
return return
expr = self.generate_expr(stmt.expression) expr = self.generate_expr(stmt.expression)
self.emit(f'wait "{expr}" 2>/dev/null || true') self.emit(f'wait "{expr}" 2>/dev/null || true')
......
...@@ -125,11 +125,11 @@ await proc ...@@ -125,11 +125,11 @@ await proc
''') ''')
assert code == 0 assert code == 0
assert "coproc __ct_cp1" in output assert "coproc __ct_cp1" in output
assert "__ct_cp1_wr=${__ct_cp1[1]}" in output assert "exec {__ct_cp1_wr}>&${__ct_cp1[1]}" in output
assert "__ct_cp1_rd=${__ct_cp1[0]}" in output assert "exec {__ct_cp1_rd}<&${__ct_cp1[0]}" in output
assert 'echo "data" >&$__ct_cp1_wr' in output assert 'echo "data" >&$__ct_cp1_wr' in output
assert "exec {__ct_cp1_wr}>&-" in output assert "exec {__ct_cp1_wr}>&-" in output
assert "wait $__ct_cp1_PID" in output assert "wait $__ct_cp1_pid" in output
def test_compile_on_signal(self): def test_compile_on_signal(self):
code, output, _ = compile_ct(''' code, output, _ = compile_ct('''
...@@ -161,8 +161,8 @@ await p2 ...@@ -161,8 +161,8 @@ await p2
assert code == 0 assert code == 0
assert "coproc __ct_cp1" in output assert "coproc __ct_cp1" in output
assert "coproc __ct_cp2" in output assert "coproc __ct_cp2" in output
assert "wait $__ct_cp1_PID" in output assert "wait $__ct_cp1_pid" in output
assert "wait $__ct_cp2_PID" in output assert "wait $__ct_cp2_pid" in output
class TestKeywordAsIdentifier: class TestKeywordAsIdentifier:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment