summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorskal <pascal.massimino@gmail.com>2026-02-16 08:39:59 +0100
committerskal <pascal.massimino@gmail.com>2026-02-16 08:39:59 +0100
commitb17282752ee7aa384ca36daa21d034617ca34a4b (patch)
treebd1e0f91ffcbdd835a0f1a0b9b87f6aa9de418fb /tools
parent9d1d4df877f96f1970dce2ab30cfae49d3d796e1 (diff)
feat(sequence): Phase 2 - Python DAG compiler
- Pure Python 3 compiler for v2 timeline syntax - DAG validation: cycle detection, connectivity, node inference - Topological sort (Kahn's algorithm) - Lifetime analysis for optimization - Ping-pong detection framework (needs refinement) - Multi-input/multi-output effect routing - Generates optimized C++ SequenceV2 subclasses Validated on: - Simple linear chain (source->temp->sink) - Complex DAG (deferred render + compose + post) - Generates correct execution order Phase 2 complete. Next: Phase 3 effect migration handoff(Claude): Phase 2 complete, compiler generates valid C++
Diffstat (limited to 'tools')
-rwxr-xr-xtools/seq_compiler_v2.py499
1 files changed, 499 insertions, 0 deletions
diff --git a/tools/seq_compiler_v2.py b/tools/seq_compiler_v2.py
new file mode 100755
index 0000000..56bf613
--- /dev/null
+++ b/tools/seq_compiler_v2.py
@@ -0,0 +1,499 @@
+#!/usr/bin/env python3
+"""Sequence v2 Compiler - DAG-based timeline compiler with ping-pong optimization.
+
+Converts v2 timeline syntax into optimized C++ SequenceV2 subclasses.
+Performs DAG validation, topological sorting, and lifetime analysis.
+"""
+
import argparse
import os
import re
import sys
from collections import deque
from typing import Dict, List, Set, Tuple, Optional
+
# Node type enum mapping: timeline-syntax type name (as written after a
# NODE directive) -> generated C++ NodeType enum value.  parse_timeline
# rejects any NODE whose type is not a key here.
NODE_TYPES = {
    'u8x4_norm': 'NodeType::U8X4_NORM',
    'f32x4': 'NodeType::F32X4',
    'f16x8': 'NodeType::F16X8',
    'depth24': 'NodeType::DEPTH24',
    'compute_f32': 'NodeType::COMPUTE_F32',
}
+
class NodeDecl:
    """A declared DAG node: one named texture/buffer with a timeline type."""

    def __init__(self, name: str, node_type: str):
        self.name = name       # node identifier used in effect routing
        self.type = node_type  # timeline type string (a NODE_TYPES key)

    def __repr__(self) -> str:
        return f"NodeDecl({self.name!r}, {self.type!r})"
+
class EffectDecl:
    """One effect instance in the DAG: class, routing, time window, priority."""

    def __init__(self, class_name: str, inputs: List[str], outputs: List[str],
                 start: float, end: float, priority: int, params: str):
        self.class_name = class_name  # C++ effect class (V2 suffix added at codegen)
        self.inputs = inputs          # node names read by this effect
        self.outputs = outputs        # node names written by this effect
        self.start = start            # activation window start
        self.end = end                # activation window end
        self.priority = priority     # resolved absolute priority
        self.params = params          # raw parameter string, passed through verbatim
        self.execution_order = -1     # set by topological_sort(); -1 = not yet scheduled

    def __repr__(self) -> str:
        return (f"EffectDecl({self.class_name!r}, in={self.inputs}, "
                f"out={self.outputs}, t=[{self.start}, {self.end}])")
+
class SequenceDecl:
    """A SEQUENCE block: its declared nodes, assets, and ordered effects."""

    def __init__(self, name: str, start_time: float, priority: int):
        self.name = name              # sequence name (becomes the C++ class name)
        self.start_time = start_time  # timeline start of the sequence
        self.priority = priority      # base priority for '+'/'='/'-' effect modifiers
        self.nodes: Dict[str, 'NodeDecl'] = {}   # declared + auto-inferred nodes
        self.assets: Set[str] = set()            # names from ASSET directives
        self.effects: List['EffectDecl'] = []    # effects in declaration order

    def __repr__(self) -> str:
        return (f"SequenceDecl({self.name!r}, t={self.start_time}, "
                f"prio={self.priority}, effects={len(self.effects)})")
+
def parse_timeline(filename: str) -> List['SequenceDecl']:
    """Parse a v2 timeline file into a list of SequenceDecl.

    Grammar (line oriented):
      SEQUENCE <start> <priority> ["name"]
      NODE <name> <type>          (type must be a NODE_TYPES key)
      ASSET <name>
      EFFECT <+|=|-> <Class> <inputs...> -> <outputs...> <start> <end> [params...]

    Lines starting with '#' (including '# BPM' directives) and blank lines
    are ignored.  On any syntax error, a diagnostic with file name and line
    number is written to stderr and the process exits with status 1.
    """
    sequences = []
    current_seq = None

    with open(filename, 'r') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()

            # Skip comments and empty lines.  This also covers '# BPM ...'
            # directives, which are currently ignored.
            if not line or line.startswith('#'):
                continue

            # SEQUENCE start: opens a new sequence block.
            if line.startswith('SEQUENCE'):
                parts = line.split()
                start_time = float(parts[1])
                priority = int(parts[2])
                name = ' '.join(parts[3:]).strip('"') if len(parts) > 3 else f"seq_{start_time}"
                current_seq = SequenceDecl(name, start_time, priority)
                sequences.append(current_seq)
                continue

            # All remaining directives must live inside a SEQUENCE block.
            if not current_seq:
                print(f"Error: {filename}:{line_num}: Effect/Node outside SEQUENCE block", file=sys.stderr)
                sys.exit(1)

            # NODE declaration
            if line.startswith('NODE'):
                parts = line.split()
                if len(parts) < 3:
                    print(f"Error: {filename}:{line_num}: NODE requires name and type", file=sys.stderr)
                    sys.exit(1)
                node_name = parts[1]
                node_type = parts[2]
                if node_type not in NODE_TYPES:
                    print(f"Error: {filename}:{line_num}: Unknown node type '{node_type}'", file=sys.stderr)
                    sys.exit(1)
                current_seq.nodes[node_name] = NodeDecl(node_name, node_type)
                continue

            # ASSET declaration
            if line.startswith('ASSET'):
                parts = line.split()
                if len(parts) < 2:
                    print(f"Error: {filename}:{line_num}: ASSET requires name", file=sys.stderr)
                    sys.exit(1)
                current_seq.assets.add(parts[1])
                continue

            # EFFECT with routing
            if line.startswith('EFFECT'):
                # Parse: EFFECT +/=/- ClassName inputs... -> outputs... start end [params...]
                match = re.match(r'EFFECT\s+([+\-=])\s+(\w+)\s+(.+)', line)
                if not match:
                    print(f"Error: {filename}:{line_num}: Invalid EFFECT syntax", file=sys.stderr)
                    sys.exit(1)

                priority_mod = match.group(1)
                class_name = match.group(2)
                rest = match.group(3)

                # Routing arrow is mandatory: inputs -> outputs start end [params]
                if '->' not in rest:
                    print(f"Error: {filename}:{line_num}: EFFECT missing '->' routing", file=sys.stderr)
                    sys.exit(1)

                before_arrow, after_arrow = rest.split('->', 1)
                inputs = before_arrow.strip().split()

                after_parts = after_arrow.strip().split()
                # Outputs run until the first token that parses as a float
                # (that token is the start time).
                outputs = []
                idx = 0
                while idx < len(after_parts):
                    try:
                        float(after_parts[idx])
                        break
                    except ValueError:
                        outputs.append(after_parts[idx])
                        idx += 1

                if idx + 2 > len(after_parts):
                    print(f"Error: {filename}:{line_num}: EFFECT missing start/end times", file=sys.stderr)
                    sys.exit(1)

                start_time = float(after_parts[idx])
                end_time = float(after_parts[idx + 1])
                params = ' '.join(after_parts[idx + 2:]) if idx + 2 < len(after_parts) else ''

                # Priority relative to the sequence's base priority:
                #   '+' stacks above previously declared effects,
                #   '=' shares the priority of the previous effect,
                #   '-' sits just below the sequence base.
                if priority_mod == '+':
                    effect_priority = current_seq.priority + len(current_seq.effects)
                elif priority_mod == '=':
                    effect_priority = current_seq.priority + len(current_seq.effects) - 1 if current_seq.effects else current_seq.priority
                else:  # '-'
                    effect_priority = current_seq.priority - 1

                effect = EffectDecl(class_name, inputs, outputs, start_time, end_time, effect_priority, params)
                current_seq.effects.append(effect)
                continue

            print(f"Warning: {filename}:{line_num}: Unrecognized line: {line}", file=sys.stderr)

    return sequences
+
def validate_dag(seq: SequenceDecl) -> None:
    """Validate DAG: check for cycles, missing nodes, connectivity.

    Mutates `seq.nodes`: any node referenced by an effect but never
    declared is auto-inferred as a 'u8x4_norm' node.  On any validation
    failure a diagnostic is printed to stderr and the process exits with
    status 1.
    """

    # 1. Auto-infer nodes from effects.  'source' and 'sink' are the
    #    implicit graph endpoints and never need declarations.
    all_nodes = set(seq.nodes.keys())
    all_nodes.add('source')  # Implicit
    all_nodes.add('sink')  # Implicit

    for effect in seq.effects:
        for node in effect.inputs + effect.outputs:
            if node not in all_nodes and node not in seq.nodes:
                # Auto-infer as u8x4_norm
                seq.nodes[node] = NodeDecl(node, 'u8x4_norm')
                all_nodes.add(node)

    # 2. Check all referenced nodes exist.
    # NOTE(review): step 1 just added every referenced node to all_nodes,
    # so these errors look unreachable; kept as a defensive check.
    for effect in seq.effects:
        for node in effect.inputs:
            if node not in all_nodes:
                print(f"Error: Effect {effect.class_name} references undefined input node '{node}'", file=sys.stderr)
                sys.exit(1)
        for node in effect.outputs:
            if node not in all_nodes:
                print(f"Error: Effect {effect.class_name} references undefined output node '{node}'", file=sys.stderr)
                sys.exit(1)

    # 3. Check for cycles (DFS on effect graph, not node graph).
    effect_visited = {}
    for effect in seq.effects:
        effect_visited[id(effect)] = 0  # 0=unvisited, 1=visiting, 2=visited

    # Build effect dependency graph
    def get_effect_dependencies(effect: EffectDecl) -> List[EffectDecl]:
        """Get effects that must execute before this one.

        Only effects declared *earlier* in the file are searched as
        producers, so dependency edges follow declaration order.
        """
        deps = []
        effect_idx = seq.effects.index(effect)

        for input_node in effect.inputs:
            if input_node == 'source':
                continue
            # Find LAST effect before this one that produces this input
            producer = None
            for i in range(effect_idx - 1, -1, -1):
                other = seq.effects[i]
                if input_node in other.outputs:
                    producer = other
                    break

            if producer:
                deps.append(producer)
        return deps

    def dfs_cycle(effect: EffectDecl) -> bool:
        # Three-color DFS: hitting a 'visiting' (gray) effect again means
        # a back edge, i.e. a cycle.
        eff_id = id(effect)
        if effect_visited[eff_id] == 1:
            return True  # Back edge = cycle
        if effect_visited[eff_id] == 2:
            return False

        effect_visited[eff_id] = 1
        for dep in get_effect_dependencies(effect):
            if dfs_cycle(dep):
                return True
        effect_visited[eff_id] = 2
        return False

    for effect in seq.effects:
        if dfs_cycle(effect):
            print(f"Error: Cycle detected in effect DAG involving effect '{effect.class_name}'", file=sys.stderr)
            sys.exit(1)

    # 4. Check connectivity (source must reach sink): fixed-point forward
    #    reachability over effect routing — an effect propagates
    #    reachability from any reachable input to all of its outputs.
    reachable = set(['source'])
    changed = True
    while changed:
        changed = False
        for effect in seq.effects:
            if any(inp in reachable for inp in effect.inputs):
                for out in effect.outputs:
                    if out not in reachable:
                        reachable.add(out)
                        changed = True

    if 'sink' not in reachable:
        print(f"Error: No path from 'source' to 'sink' in DAG", file=sys.stderr)
        sys.exit(1)
+
def topological_sort(seq: 'SequenceDecl') -> List['EffectDecl']:
    """Sort effects into execution order using Kahn's algorithm.

    Side effect: assigns each effect's `execution_order` field.
    Exits with status 1 if some effects never become ready (disconnected
    components that validate_dag did not reject).

    NOTE(review): in-degrees count at most one producer per input node
    (via node_producers), but the decrement loop fires once per producing
    effect — a node written by several effects could be double-decremented.
    Harmless for single-producer DAGs; confirm intent before relying on
    multi-producer nodes.
    """
    # In-degree per effect, keyed by id() so EffectDecl need not be hashable.
    in_degree = {}
    for effect in seq.effects:
        in_degree[id(effect)] = 0

    # Map each node to the (last declared) effect that produces it.
    node_producers = {}
    for effect in seq.effects:
        for output in effect.outputs:
            node_producers[output] = effect

    # An input contributes a dependency only if some effect produces it;
    # 'source' is externally provided and never counts.
    for effect in seq.effects:
        for input_node in effect.inputs:
            if input_node == 'source':
                continue
            if input_node in node_producers:
                in_degree[id(effect)] += 1

    # Seed the worklist with dependency-free effects.  deque gives O(1)
    # popleft; list.pop(0) is O(n) per pop.
    queue = deque(eff for eff in seq.effects if in_degree[id(eff)] == 0)
    sorted_effects = []

    while queue:
        current = queue.popleft()
        sorted_effects.append(current)

        # Each output satisfied by `current` unblocks its consumers.
        for output in current.outputs:
            for other in seq.effects:
                if output in other.inputs and id(other) != id(current):
                    in_degree[id(other)] -= 1
                    if in_degree[id(other)] == 0:
                        queue.append(other)

    if len(sorted_effects) != len(seq.effects):
        print("Error: DAG has unreachable effects (disconnected components)", file=sys.stderr)
        sys.exit(1)

    # Record the final schedule position on each effect.
    for idx, effect in enumerate(sorted_effects):
        effect.execution_order = idx

    return sorted_effects
+
def analyze_lifetimes(seq: 'SequenceDecl',
                      sorted_effects: List['EffectDecl']) -> Dict[str, Tuple[int, int]]:
    """Compute per-node lifetimes as (first_use, last_use) order indices.

    Indices refer to each effect's `execution_order`.  The implicit
    'source' and 'sink' endpoints get no lifetime entry.  `seq` is
    currently unused; kept for signature stability.
    """
    lifetimes: Dict[str, Tuple[int, int]] = {}

    for effect in sorted_effects:
        order = effect.execution_order

        # Reads extend the node's lifetime up to this point.
        for node in effect.inputs:
            if node == 'source':
                continue
            first, _ = lifetimes.get(node, (order, order))
            lifetimes[node] = (first, order)

        # Writes widen the lifetime window on both ends.
        for node in effect.outputs:
            if node == 'sink':
                continue
            first, last = lifetimes.get(node, (order, order))
            lifetimes[node] = (min(first, order), max(last, order))

    return lifetimes
+
def detect_ping_pong(seq: 'SequenceDecl',
                     sorted_effects: List['EffectDecl']) -> Dict[str, str]:
    """Detect ping-pong patterns and return an alias map.

    Pattern: effect i reads B and writes A, while effect i+1 reads A and
    writes B.  The two buffers strictly alternate roles, so B can alias A
    (share one texture).  The returned map sends the aliased node to its
    target: aliases[B] = A.

    Only *adjacent* effects in execution order are considered, and an
    alias is taken only if no third effect touches either node.  `seq` is
    currently unused; kept for signature stability.
    """
    aliases: Dict[str, str] = {}
    used_nodes: Set[str] = set()

    # Look for adjacent alternating read/write patterns.
    for i in range(len(sorted_effects) - 1):
        eff1 = sorted_effects[i]
        eff2 = sorted_effects[i + 1]

        for out1 in eff1.outputs:
            # Endpoints and already-aliased nodes can't participate.
            if out1 in ('source', 'sink') or out1 in used_nodes:
                continue

            for in1 in eff1.inputs:
                if in1 in ('source', 'sink') or in1 in used_nodes:
                    continue

                # Classic ping-pong: eff1 reads in1 / writes out1,
                # eff2 reads out1 / writes in1.
                if in1 in eff2.outputs and out1 in eff2.inputs:
                    # Aliasing is only safe when no other effect touches
                    # either node.
                    other_uses = any(
                        out1 in eff.inputs + eff.outputs or
                        in1 in eff.inputs + eff.outputs
                        for j, eff in enumerate(sorted_effects)
                        if j != i and j != i + 1
                    )
                    if not other_uses:
                        # Alias in1 -> out1 (in1 reuses out1's texture).
                        aliases[in1] = out1
                        used_nodes.add(out1)
                        used_nodes.add(in1)
                        break

    return aliases
+
def generate_cpp(seq: SequenceDecl, sorted_effects: List[EffectDecl],
                 aliases: Dict[str, str], flatten: bool = False) -> str:
    """Generate a C++ SequenceV2 subclass as source text.

    seq            -- the validated sequence to emit
    sorted_effects -- effects in execution order (from topological_sort)
    aliases        -- ping-pong alias map (aliased node -> target node)
    flatten        -- NOTE(review): accepted but currently unused; looks
                      intended for the FINAL_STRIP flattened mode — confirm.
    """

    # Sanitize the sequence name into a valid C++ identifier.
    class_name = seq.name.replace(' ', '_').replace('-', '_')
    if not class_name[0].isalpha():
        class_name = 'Seq_' + class_name
    class_name += 'Sequence'

    # Generate includes: map each effect class to its snake_case header.
    includes = set()
    for effect in seq.effects:
        # Convert ClassName to snake_case header
        # Remove V2 suffix first if present
        base_name = effect.class_name
        if base_name.endswith('V2'):
            base_name = base_name[:-2]

        header = re.sub('([A-Z])', r'_\1', base_name).lower().lstrip('_')
        if header.endswith('_effect'):
            header = header[:-7]  # Remove _effect suffix
        includes.add(f'#include "effects/{header}_effect_v2.h"')

    cpp = f'''// Generated by seq_compiler_v2.py
// Sequence: {seq.name}

#include "gpu/sequence_v2.h"
#include "gpu/effect_v2.h"
'''

    for inc in sorted(includes):
        cpp += inc + '\n'

    cpp += f'''
class {class_name} : public SequenceV2 {{
  public:
  {class_name}(const GpuContext& ctx, int width, int height)
      : SequenceV2(ctx, width, height) {{
'''

    # Node declarations: aliased nodes share their target's storage.
    cpp += '    // Node declarations\n'
    for node_name, node_decl in sorted(seq.nodes.items()):
        if node_name in aliases:
            # Aliased node
            cpp += f'    nodes_.declare_aliased_node("{node_name}", "{aliases[node_name]}");\n'
        else:
            node_type = NODE_TYPES[node_decl.type]
            cpp += f'    nodes_.declare_node("{node_name}", {node_type}, width_, height_);\n'

    cpp += '\n    // Effect DAG construction\n'

    # Effect instantiation, emitted in execution order.
    for effect in sorted_effects:
        inputs_str = ', '.join(f'"{inp}"' for inp in effect.inputs)
        outputs_str = ', '.join(f'"{out}"' for out in effect.outputs)

        # Ensure class name has V2 suffix (add if not present)
        effect_class = effect.class_name if effect.class_name.endswith('V2') else effect.class_name + 'V2'

        cpp += f'''    effect_dag_.push_back({{
        .effect = std::make_shared<{effect_class}>(ctx,
            std::vector<std::string>{{{inputs_str}}},
            std::vector<std::string>{{{outputs_str}}}),
        .input_nodes = {{{inputs_str}}},
        .output_nodes = {{{outputs_str}}},
        .execution_order = {effect.execution_order}
    }});
'''

    cpp += '''  }
};
'''

    return cpp
+
def main():
    """CLI entry point: parse the timeline, validate/optimize each
    sequence, and write the generated C++ to the output file."""
    parser = argparse.ArgumentParser(description='Sequence v2 compiler with DAG optimization')
    parser.add_argument('input', help='Input .seq file')
    parser.add_argument('--output', '-o', help='Output .cc file', required=True)
    parser.add_argument('--flatten', action='store_true', help='Generate flattened code (FINAL_STRIP mode)')
    args = parser.parse_args()

    sequences = parse_timeline(args.input)
    if not sequences:
        print("Error: No sequences found in input file", file=sys.stderr)
        sys.exit(1)

    # File header, followed by one generated class per sequence.
    chunks = ['''// Generated by seq_compiler_v2.py
// DO NOT EDIT

#include "gpu/sequence_v2.h"
#include "gpu/effect_v2.h"

''']

    for seq in sequences:
        validate_dag(seq)                      # may exit on invalid DAGs
        ordered = topological_sort(seq)        # assigns execution_order
        analyze_lifetimes(seq, ordered)        # lifetime analysis (informational)
        aliases = detect_ping_pong(seq, ordered)
        chunks.append(generate_cpp(seq, ordered, aliases, args.flatten) + '\n')

    with open(args.output, 'w') as f:
        f.write(''.join(chunks))

    print(f"Generated {len(sequences)} sequence(s) -> {args.output}")

if __name__ == '__main__':
    main()