#!/usr/bin/env python3
"""Sequence v2 Compiler - DAG-based timeline compiler with ping-pong optimization.

Converts v2 timeline syntax into optimized C++ SequenceV2 subclasses.
Performs DAG validation, topological sorting, and lifetime analysis.
"""

import argparse
import re
import sys
from collections import deque
from typing import Dict, List, Optional, Set, Tuple

# Node type enum mapping
NODE_TYPES = {
    'u8x4_norm': 'NodeType::U8X4_NORM',
    'f32x4': 'NodeType::F32X4',
    'f16x8': 'NodeType::F16X8',
    'depth24': 'NodeType::DEPTH24',
    'compute_f32': 'NodeType::COMPUTE_F32',
}


class NodeDecl:
    def __init__(self, name: str, node_type: str):
        self.name = name
        self.type = node_type


class EffectDecl:
    def __init__(self, class_name: str, inputs: List[str], outputs: List[str],
                 start: float, end: float, priority: int, params: str):
        self.class_name = class_name
        self.inputs = inputs
        self.outputs = outputs
        self.start = start
        self.end = end
        self.priority = priority
        self.params = params
        self.execution_order = -1


class SequenceDecl:
    def __init__(self, name: str, start_time: float, priority: int):
        self.name = name
        self.start_time = start_time
        self.priority = priority
        self.nodes: Dict[str, NodeDecl] = {}
        self.assets: Set[str] = set()
        self.effects: List[EffectDecl] = []
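

# Illustrative timeline snippet (directive grammar inferred from
# parse_timeline below; all names and values are hypothetical):
#
#   SEQUENCE 0.0 100 "intro"
#   NODE ping f32x4
#   NODE pong f32x4
#   EFFECT + ClearV2 source -> ping 0.0 8.0
#   EFFECT + BlurV2 ping -> pong 0.0 8.0 radius=4
#   EFFECT + BlurV2 pong -> ping 0.0 8.0 radius=4
#   EFFECT + CompositeV2 ping -> sink 0.0 8.0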

def parse_timeline(filename: str) -> List[SequenceDecl]:
    """Parse v2 timeline file."""
    sequences = []
    current_seq: Optional[SequenceDecl] = None
    with open(filename, 'r') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()

            # BPM directive (ignored for now); checked before the generic
            # comment skip, which would otherwise swallow it unseen
            if line.startswith('# BPM'):
                continue

            # Skip comments and empty lines
            if not line or line.startswith('#'):
                continue

            # SEQUENCE start
            if line.startswith('SEQUENCE'):
                parts = line.split()
                if len(parts) < 3:
                    print(f"Error: {filename}:{line_num}: SEQUENCE requires start time and priority", file=sys.stderr)
                    sys.exit(1)
                start_time = float(parts[1])
                priority = int(parts[2])
                name = ' '.join(parts[3:]).strip('"') if len(parts) > 3 else f"seq_{start_time}"
                current_seq = SequenceDecl(name, start_time, priority)
                sequences.append(current_seq)
                continue

            if not current_seq:
                print(f"Error: {filename}:{line_num}: Effect/Node outside SEQUENCE block", file=sys.stderr)
                sys.exit(1)

            # NODE declaration
            if line.startswith('NODE'):
                parts = line.split()
                if len(parts) < 3:
                    print(f"Error: {filename}:{line_num}: NODE requires name and type", file=sys.stderr)
                    sys.exit(1)
                node_name = parts[1]
                node_type = parts[2]
                if node_type not in NODE_TYPES:
                    print(f"Error: {filename}:{line_num}: Unknown node type '{node_type}'", file=sys.stderr)
                    sys.exit(1)
                current_seq.nodes[node_name] = NodeDecl(node_name, node_type)
                continue

            # ASSET declaration
            if line.startswith('ASSET'):
                parts = line.split()
                if len(parts) < 2:
                    print(f"Error: {filename}:{line_num}: ASSET requires name", file=sys.stderr)
                    sys.exit(1)
                current_seq.assets.add(parts[1])
                continue

            # EFFECT with routing
            if line.startswith('EFFECT'):
                # Parse: EFFECT +/=/- ClassName inputs... -> outputs... start end [params...]
                match = re.match(r'EFFECT\s+([+\-=])\s+(\w+)\s+(.+)', line)
                if not match:
                    print(f"Error: {filename}:{line_num}: Invalid EFFECT syntax", file=sys.stderr)
                    sys.exit(1)
                priority_mod = match.group(1)
                class_name = match.group(2)
                rest = match.group(3)

                # Parse routing: inputs... -> outputs... start end [params]
                if '->' not in rest:
                    print(f"Error: {filename}:{line_num}: EFFECT missing '->' routing", file=sys.stderr)
                    sys.exit(1)
                before_arrow, after_arrow = rest.split('->', 1)
                inputs = before_arrow.strip().split()
                after_parts = after_arrow.strip().split()

                # Find where outputs end (look for numeric start time)
                outputs = []
                idx = 0
                while idx < len(after_parts):
                    try:
                        float(after_parts[idx])
                        break
                    except ValueError:
                        outputs.append(after_parts[idx])
                        idx += 1
                if idx + 2 > len(after_parts):
                    print(f"Error: {filename}:{line_num}: EFFECT missing start/end times", file=sys.stderr)
                    sys.exit(1)
                start_time = float(after_parts[idx])
                end_time = float(after_parts[idx + 1])
                params = ' '.join(after_parts[idx + 2:]) if idx + 2 < len(after_parts) else ''

                # Priority calculation (relative to sequence priority):
                # '+' stacks above all previous effects, '=' reuses the slot
                # of the previous effect, '-' sits just below the sequence base.
                if priority_mod == '+':
                    effect_priority = current_seq.priority + len(current_seq.effects)
                elif priority_mod == '=':
                    effect_priority = (current_seq.priority + len(current_seq.effects) - 1
                                       if current_seq.effects else current_seq.priority)
                else:  # '-'
                    effect_priority = current_seq.priority - 1

                effect = EffectDecl(class_name, inputs, outputs, start_time, end_time,
                                    effect_priority, params)
                current_seq.effects.append(effect)
                continue

            print(f"Warning: {filename}:{line_num}: Unrecognized line: {line}", file=sys.stderr)
    return sequences


def validate_dag(seq: SequenceDecl) -> None:
    """Validate DAG: check for cycles, missing nodes, connectivity."""
    # 1. Auto-infer nodes from effects
    all_nodes = set(seq.nodes.keys())
    all_nodes.add('source')  # Implicit
    all_nodes.add('sink')    # Implicit
    for effect in seq.effects:
        for node in effect.inputs + effect.outputs:
            if node not in all_nodes:
                # Auto-infer as u8x4_norm
                seq.nodes[node] = NodeDecl(node, 'u8x4_norm')
                all_nodes.add(node)

    # 2. Check all referenced nodes exist
    # (defensive: step 1 already inferred every referenced node)
    for effect in seq.effects:
        for node in effect.inputs:
            if node not in all_nodes:
                print(f"Error: Effect {effect.class_name} references undefined input node '{node}'", file=sys.stderr)
                sys.exit(1)
        for node in effect.outputs:
            if node not in all_nodes:
                print(f"Error: Effect {effect.class_name} references undefined output node '{node}'", file=sys.stderr)
                sys.exit(1)

    # 3. Check for cycles (DFS on effect graph, not node graph)
    effect_visited = {}
    for effect in seq.effects:
        effect_visited[id(effect)] = 0  # 0=unvisited, 1=visiting, 2=visited

    # Build effect dependency graph
    def get_effect_dependencies(effect: EffectDecl) -> List[EffectDecl]:
        """Get effects that must execute before this one."""
        deps = []
        effect_idx = seq.effects.index(effect)
        for input_node in effect.inputs:
            if input_node == 'source':
                continue
            # Find LAST effect before this one that produces this input
            producer = None
            for i in range(effect_idx - 1, -1, -1):
                other = seq.effects[i]
                if input_node in other.outputs:
                    producer = other
                    break
            if producer:
                deps.append(producer)
        return deps

    def dfs_cycle(effect: EffectDecl) -> bool:
        eff_id = id(effect)
        if effect_visited[eff_id] == 1:
            return True  # Back edge = cycle
        if effect_visited[eff_id] == 2:
            return False
        effect_visited[eff_id] = 1
        for dep in get_effect_dependencies(effect):
            if dfs_cycle(dep):
                return True
        effect_visited[eff_id] = 2
        return False

    for effect in seq.effects:
        if dfs_cycle(effect):
            print(f"Error: Cycle detected in effect DAG involving effect '{effect.class_name}'", file=sys.stderr)
            sys.exit(1)

    # 4. Check connectivity (source must reach sink)
    reachable = {'source'}
    changed = True
    while changed:
        changed = False
        for effect in seq.effects:
            if any(inp in reachable for inp in effect.inputs):
                for out in effect.outputs:
                    if out not in reachable:
                        reachable.add(out)
                        changed = True
    if 'sink' not in reachable:
        print("Error: No path from 'source' to 'sink' in DAG", file=sys.stderr)
        sys.exit(1)
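

# Worked example (hypothetical snippet above): the second BlurV2's 'pong'
# input resolves to the first BlurV2, because dependency lookup walks
# backwards to the LAST prior effect writing that node. Dependencies thus
# always point to earlier file positions, which keeps ping-pong style node
# reuse well-defined.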

def topological_sort(seq: SequenceDecl) -> List[EffectDecl]:
    """Sort effects in execution order using Kahn's algorithm.

    Dependencies are resolved the same way as in validate_dag: each input
    node binds to the LAST prior effect (in file order) that writes it, so
    sequences that reuse node names across passes stay correctly ordered.
    """
    # Build dependency edges: dependents[id(producer)] -> consuming effects
    in_degree = {id(effect): 0 for effect in seq.effects}
    dependents: Dict[int, List[EffectDecl]] = {id(effect): [] for effect in seq.effects}
    for idx, effect in enumerate(seq.effects):
        for input_node in effect.inputs:
            if input_node == 'source':
                continue
            # Bind to the last prior producer of this input, if any
            for i in range(idx - 1, -1, -1):
                other = seq.effects[i]
                if input_node in other.outputs:
                    dependents[id(other)].append(effect)
                    in_degree[id(effect)] += 1
                    break

    # Start with effects that have no dependencies
    queue = deque(eff for eff in seq.effects if in_degree[id(eff)] == 0)
    sorted_effects = []
    while queue:
        current = queue.popleft()
        sorted_effects.append(current)
        # Mark this producer as done, decrement downstream dependencies
        for other in dependents[id(current)]:
            in_degree[id(other)] -= 1
            if in_degree[id(other)] == 0:
                queue.append(other)

    if len(sorted_effects) != len(seq.effects):
        print("Error: Could not order all effects (cyclic or unsatisfiable dependencies)", file=sys.stderr)
        sys.exit(1)

    # Assign execution order
    for idx, effect in enumerate(sorted_effects):
        effect.execution_order = idx
    return sorted_effects


def analyze_lifetimes(seq: SequenceDecl, sorted_effects: List[EffectDecl]) -> Dict[str, Tuple[int, int]]:
    """Analyze node lifetimes: (first_use, last_use) execution order indices."""
    lifetimes = {}
    for effect in sorted_effects:
        order = effect.execution_order
        for node in effect.inputs:
            if node == 'source':
                continue
            if node not in lifetimes:
                lifetimes[node] = (order, order)
            else:
                lifetimes[node] = (min(lifetimes[node][0], order), max(lifetimes[node][1], order))
        for node in effect.outputs:
            if node == 'sink':
                continue
            if node not in lifetimes:
                lifetimes[node] = (order, order)
            else:
                lifetimes[node] = (min(lifetimes[node][0], order), max(lifetimes[node][1], order))
    return lifetimes
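

# Illustrative lifetimes for the hypothetical snippet above, after
# topological sort assigns execution orders 0..3: 'ping' is written at 0,
# read at 1, rewritten at 2, and read at 3 -> (0, 3); 'pong' is written at 1
# and read at 2 -> (1, 2). main() currently computes these for information
# only; codegen does not yet consume them.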

def detect_ping_pong(seq: SequenceDecl, sorted_effects: List[EffectDecl]) -> Dict[str, str]:
    """Detect ping-pong patterns and return alias map.

    Pattern: Effect i writes A, reads B; Effect i+1 writes B, reads A.
    Optimization: Alias B -> A (reuse the same texture).
    """
    aliases = {}
    used_nodes = set()

    # Look for adjacent alternating read/write patterns
    for i in range(len(sorted_effects) - 1):
        eff1 = sorted_effects[i]
        eff2 = sorted_effects[i + 1]

        # Find nodes that alternate
        for out1 in eff1.outputs:
            if out1 in ('source', 'sink') or out1 in used_nodes:
                continue
            for in1 in eff1.inputs:
                if in1 in ('source', 'sink') or in1 in used_nodes:
                    continue
                # Check if eff2 writes in1 and reads out1 (alternating)
                if in1 in eff2.outputs and out1 in eff2.inputs:
                    # Classic ping-pong: eff1 (reads in1, writes out1),
                    # eff2 (reads out1, writes in1).
                    # Check that no other effects use these nodes.
                    other_uses = False
                    for j, eff in enumerate(sorted_effects):
                        if j == i or j == i + 1:
                            continue
                        if out1 in eff.inputs + eff.outputs or in1 in eff.inputs + eff.outputs:
                            other_uses = True
                            break
                    if not other_uses:
                        # Alias in1 -> out1 (in1 uses the same texture as out1)
                        aliases[in1] = out1
                        used_nodes.add(out1)
                        used_nodes.add(in1)
                        break
    return aliases


def generate_cpp(seq: SequenceDecl, sorted_effects: List[EffectDecl],
                 aliases: Dict[str, str], flatten: bool = False) -> str:
    """Generate C++ SequenceV2 subclass."""
    # NOTE: 'flatten' (FINAL_STRIP mode) is accepted but not implemented yet.
    class_name = seq.name.replace(' ', '_').replace('-', '_')
    if not class_name[0].isalpha():
        class_name = 'Seq_' + class_name
    class_name += 'Sequence'

    # Generate includes
    includes = set()
    for effect in seq.effects:
        # Convert ClassName to snake_case header.
        # Remove V2 suffix first if present.
        base_name = effect.class_name
        if base_name.endswith('V2'):
            base_name = base_name[:-2]
        header = re.sub('([A-Z])', r'_\1', base_name).lower().lstrip('_')
        if header.endswith('_effect'):
            header = header[:-7]  # Remove _effect suffix
        includes.add(f'#include "effects/{header}_effect_v2.h"')

    cpp = f'''// Generated by seq_compiler_v2.py
// Sequence: {seq.name}

#include "gpu/sequence_v2.h"
#include "gpu/effect_v2.h"
'''
    for inc in sorted(includes):
        cpp += inc + '\n'

    cpp += f'''
class {class_name} : public SequenceV2 {{
 public:
  {class_name}(const GpuContext& ctx, int width, int height)
      : SequenceV2(ctx, width, height) {{
'''

    # Node declarations
    cpp += '    // Node declarations\n'
    for node_name, node_decl in sorted(seq.nodes.items()):
        if node_name in aliases:
            # Aliased node
            cpp += f'    nodes_.declare_aliased_node("{node_name}", "{aliases[node_name]}");\n'
        else:
            node_type = NODE_TYPES[node_decl.type]
            cpp += f'    nodes_.declare_node("{node_name}", {node_type}, width_, height_);\n'

    cpp += '\n    // Effect DAG construction\n'

    # Effect instantiation
    for effect in sorted_effects:
        inputs_str = ', '.join(f'"{inp}"' for inp in effect.inputs)
        outputs_str = ', '.join(f'"{out}"' for out in effect.outputs)
        # Ensure class name has V2 suffix (add if not present)
        effect_class = effect.class_name if effect.class_name.endswith('V2') else effect.class_name + 'V2'
        cpp += f'''    effect_dag_.push_back({{
        .effect = std::make_shared<{effect_class}>(ctx,
            std::vector{{{inputs_str}}}, std::vector{{{outputs_str}}}),
        .input_nodes = {{{inputs_str}}},
        .output_nodes = {{{outputs_str}}},
        .execution_order = {effect.execution_order}
    }});
'''

    cpp += '''  }
};
'''
    return cpp
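

# Shape of the generated C++ for the hypothetical snippet above (abridged;
# exact layout comes from the templates in generate_cpp above):
#
#   #include "effects/blur_effect_v2.h"
#   ...
#   class introSequence : public SequenceV2 {
#    public:
#     introSequence(const GpuContext& ctx, int width, int height)
#         : SequenceV2(ctx, width, height) {
#       nodes_.declare_node("ping", NodeType::F32X4, width_, height_);
#       nodes_.declare_node("pong", NodeType::F32X4, width_, height_);
#       effect_dag_.push_back({ .effect = std::make_shared<ClearV2>(...), ... });
#       ...
#     }
#   };
#
# Had 'ping' and 'pong' been touched only by the two BlurV2 passes,
# detect_ping_pong would have collapsed one of them into a
# declare_aliased_node() call referencing the other.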

def main():
    parser = argparse.ArgumentParser(description='Sequence v2 compiler with DAG optimization')
    parser.add_argument('input', help='Input .seq file')
    parser.add_argument('--output', '-o', help='Output .cc file', required=True)
    parser.add_argument('--flatten', action='store_true',
                        help='Generate flattened code (FINAL_STRIP mode)')
    args = parser.parse_args()

    # Parse timeline
    sequences = parse_timeline(args.input)
    if not sequences:
        print("Error: No sequences found in input file", file=sys.stderr)
        sys.exit(1)

    # Process each sequence
    all_cpp = '''// Generated by seq_compiler_v2.py
// DO NOT EDIT

#include "gpu/sequence_v2.h"
#include "gpu/effect_v2.h"
'''
    for seq in sequences:
        # Validate DAG
        validate_dag(seq)
        # Topological sort
        sorted_effects = topological_sort(seq)
        # Lifetime analysis (currently informational; codegen does not yet
        # use it for allocation planning)
        lifetimes = analyze_lifetimes(seq, sorted_effects)
        # Ping-pong detection
        aliases = detect_ping_pong(seq, sorted_effects)
        # Generate C++
        cpp = generate_cpp(seq, sorted_effects, aliases, args.flatten)
        all_cpp += cpp + '\n'

    # Generate compatibility stubs for v1 API
    all_cpp += '''
// V1 compatibility stubs (TODO: Replace with proper v2 integration)
#include "gpu/effect.h"

void LoadTimeline(MainSequence& main_seq, const GpuContext& ctx) {
  // TODO: Integrate v2 sequences with MainSequence
  // For now, this is a no-op to allow linking
  (void)main_seq;
  (void)ctx;
}

float GetDemoDuration() {
  // TODO: Calculate from v2 sequences
  return 40.0f;
}
'''

    # Write output
    with open(args.output, 'w') as f:
        f.write(all_cpp)

    print(f"Generated {len(sequences)} sequence(s) -> {args.output}")


if __name__ == '__main__':
    main()
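
# Example invocation (hypothetical paths):
#   ./seq_compiler_v2.py demo.seq -o build/sequences_gen.cc
#   ./seq_compiler_v2.py demo.seq -o build/sequences_gen.cc --flatten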