#!/usr/bin/env python3
"""Sequence Compiler - DAG-based timeline compiler with ping-pong optimization.

Converts timeline syntax into optimized C++ Sequence subclasses.
Performs DAG validation, topological sorting, and lifetime analysis.
"""

import argparse
import heapq
import os
import re
import sys
from typing import Dict, List, NoReturn, Optional, Set, Tuple

# Node type enum mapping (timeline keyword -> C++ enumerator)
NODE_TYPES = {
    'u8x4_norm': 'NodeType::U8X4_NORM',
    'f32x4': 'NodeType::F32X4',
    'f16x8': 'NodeType::F16X8',
    'depth24': 'NodeType::DEPTH24',
    'compute_f32': 'NodeType::COMPUTE_F32',
}


class NodeDecl:
    """A named node of a sequence; ``type`` is a key of ``NODE_TYPES``."""

    def __init__(self, name: str, node_type: str):
        self.name = name
        self.type = node_type


class EffectDecl:
    """One EFFECT line: class name, routed nodes, time span and priority."""

    def __init__(self, class_name: str, inputs: List[str], outputs: List[str],
                 start: float, end: float, priority: int, params: str):
        self.class_name = class_name
        self.inputs = inputs
        self.outputs = outputs
        self.start = start
        self.end = end
        self.priority = priority
        self.params = params
        # Assigned by topological_sort(); -1 means "not ordered yet".
        self.execution_order = -1


class SequenceDecl:
    """One SEQUENCE block together with its nodes, assets and effects."""

    def __init__(self, name: str, start_time: float, priority: int):
        self.name = name
        self.start_time = start_time
        self.priority = priority
        self.nodes: Dict[str, NodeDecl] = {}
        self.assets: Set[str] = set()
        self.effects: List[EffectDecl] = []


def _fail(message: str) -> NoReturn:
    """Print ``Error: <message>`` to stderr and exit with status 1."""
    print(f"Error: {message}", file=sys.stderr)
    sys.exit(1)


def _is_number(token: str) -> bool:
    """Return True when ``float(token)`` succeeds."""
    try:
        float(token)
    except ValueError:
        return False
    return True


def _parse_sequence_line(line: str, where: str) -> SequenceDecl:
    """Parse ``SEQUENCE <start> <priority> ["name"]`` into a SequenceDecl."""
    parts = line.split()
    if len(parts) < 3:
        _fail(f"{where}: SEQUENCE requires start time and priority")
    try:
        start_time = float(parts[1])
        priority = int(parts[2])
    except ValueError:
        _fail(f"{where}: SEQUENCE start time and priority must be numeric")
    if len(parts) > 3:
        name = ' '.join(parts[3:]).strip('"')
    else:
        name = f"seq_{start_time}"
    return SequenceDecl(name, start_time, priority)


def _parse_effect_line(line: str, seq: SequenceDecl, where: str) -> EffectDecl:
    """Parse ``EFFECT +/=/- ClassName inputs... -> outputs... start end [params...]``.

    The priority is computed relative to ``seq`` and to the number of effects
    already attached to it; the caller appends the returned effect.
    """
    match = re.match(r'EFFECT\s+([+\-=])\s+(\w+)\s+(.+)', line)
    if not match:
        _fail(f"{where}: Invalid EFFECT syntax")
    priority_mod, class_name, rest = match.groups()

    # Parse routing: inputs... -> outputs... start end [params]
    if '->' not in rest:
        _fail(f"{where}: EFFECT missing '->' routing")
    before_arrow, after_arrow = rest.split('->', 1)
    inputs = before_arrow.split()
    tokens = after_arrow.split()

    # Outputs run up to the first token that parses as a number (start time).
    idx = 0
    while idx < len(tokens) and not _is_number(tokens[idx]):
        idx += 1
    outputs = tokens[:idx]

    if idx + 2 > len(tokens):
        _fail(f"{where}: EFFECT missing start/end times")
    if not _is_number(tokens[idx + 1]):
        _fail(f"{where}: EFFECT end time must be numeric")
    start_time = float(tokens[idx])
    end_time = float(tokens[idx + 1])
    params = ' '.join(tokens[idx + 2:])

    # Priority calculation (relative to sequence priority)
    declared = len(seq.effects)
    if priority_mod == '+':
        effect_priority = seq.priority + declared
    elif priority_mod == '=':
        effect_priority = seq.priority + declared - 1 if declared else seq.priority
    else:  # '-'
        effect_priority = seq.priority - 1

    return EffectDecl(class_name, inputs, outputs, start_time, end_time,
                      effect_priority, params)


def parse_timeline(filename: str) -> List[SequenceDecl]:
    """Parse a timeline file into a list of SequenceDecl objects.

    Blank lines and lines starting with ``#`` (which includes ``# BPM``
    directives) are skipped. Recognized statements are SEQUENCE, NODE, ASSET
    and EFFECT; anything else produces a warning on stderr. Any syntax error,
    or an unreadable file, prints an error to stderr and exits with status 1.
    """
    sequences: List[SequenceDecl] = []
    current_seq: Optional[SequenceDecl] = None

    try:
        handle = open(filename, 'r', encoding='utf-8')
    except OSError as err:
        _fail(f"Cannot read '{filename}': {err}")

    with handle:
        for line_num, raw_line in enumerate(handle, 1):
            line = raw_line.strip()

            # Skip comments (including '# BPM' directives) and empty lines
            if not line or line.startswith('#'):
                continue

            where = f"{filename}:{line_num}"

            # SEQUENCE start
            if line.startswith('SEQUENCE'):
                current_seq = _parse_sequence_line(line, where)
                sequences.append(current_seq)
                continue

            if current_seq is None:
                _fail(f"{where}: Effect/Node outside SEQUENCE block")

            # NODE declaration
            if line.startswith('NODE'):
                parts = line.split()
                if len(parts) < 3:
                    _fail(f"{where}: NODE requires name and type")
                node_name, node_type = parts[1], parts[2]
                if node_type not in NODE_TYPES:
                    _fail(f"{where}: Unknown node type '{node_type}'")
                current_seq.nodes[node_name] = NodeDecl(node_name, node_type)
                continue

            # ASSET declaration
            if line.startswith('ASSET'):
                parts = line.split()
                if len(parts) < 2:
                    _fail(f"{where}: ASSET requires name")
                current_seq.assets.add(parts[1])
                continue

            # EFFECT with routing
            if line.startswith('EFFECT'):
                current_seq.effects.append(
                    _parse_effect_line(line, current_seq, where))
                continue

            print(f"Warning: {where}: Unrecognized line: {line}", file=sys.stderr)

    return sequences


def _effect_dependencies(effects: List[EffectDecl]) -> List[List[int]]:
    """Return, for each effect index, the sorted indices it must run after.

    For every input node other than 'source' the producer is the LAST effect
    declared before the reader that writes the node. When no earlier effect
    writes it, the first later writer is used instead (forward reference).
    An effect is never its own dependency.
    """
    writers: Dict[str, List[int]] = {}
    for idx, effect in enumerate(effects):
        for node in effect.outputs:
            writers.setdefault(node, []).append(idx)  # ascending by construction

    dependencies: List[List[int]] = []
    for idx, effect in enumerate(effects):
        found: Set[int] = set()
        for node in effect.inputs:
            if node == 'source':
                continue
            earlier: Optional[int] = None
            later: Optional[int] = None
            for writer in writers.get(node, ()):
                if writer < idx:
                    earlier = writer
                elif writer > idx:
                    later = writer
                    break
            producer = earlier if earlier is not None else later
            if producer is not None:
                found.add(producer)
        dependencies.append(sorted(found))
    return dependencies


def _find_cycle_member(dependencies: List[List[int]]) -> Optional[int]:
    """Return the index of an effect that lies on a dependency cycle, or None.

    Iterative three-state depth-first search, so long effect chains cannot hit
    Python's recursion limit.
    """
    unvisited, active, done = 0, 1, 2
    state = [unvisited] * len(dependencies)
    for root in range(len(dependencies)):
        if state[root] != unvisited:
            continue
        state[root] = active
        stack = [(root, iter(dependencies[root]))]
        while stack:
            node, pending = stack[-1]
            for dep in pending:
                if state[dep] == active:
                    return dep  # Back edge = cycle
                if state[dep] == unvisited:
                    state[dep] = active
                    stack.append((dep, iter(dependencies[dep])))
                    break
            else:
                state[node] = done
                stack.pop()
    return None


def validate_dag(seq: SequenceDecl) -> None:
    """Validate the effect DAG of ``seq``: cycles, connectivity, lifespans.

    Side effect: every node referenced by an effect but not declared with NODE
    is added to ``seq.nodes`` as 'u8x4_norm' ('source' and 'sink' are implicit
    and are not added). On any violation an error is printed to stderr and the
    process exits with status 1.
    """
    # 1. Auto-infer nodes from effects. After this step every referenced node
    #    is known, so no separate "undefined node" check is needed.
    known = set(seq.nodes) | {'source', 'sink'}
    for effect in seq.effects:
        for node in effect.inputs + effect.outputs:
            if node not in known:
                seq.nodes[node] = NodeDecl(node, 'u8x4_norm')
                known.add(node)

    # 2. Check for cycles on the effect graph, using the same dependency
    #    relation as topological_sort().
    culprit = _find_cycle_member(_effect_dependencies(seq.effects))
    if culprit is not None:
        _fail("Cycle detected in effect DAG involving effect "
              f"'{seq.effects[culprit].class_name}'")

    # 3. Check connectivity (source must reach sink) by fixed-point iteration.
    reachable = {'source'}
    changed = True
    while changed:
        changed = False
        for effect in seq.effects:
            if any(inp in reachable for inp in effect.inputs):
                for out in effect.outputs:
                    if out not in reachable:
                        reachable.add(out)
                        changed = True
    if 'sink' not in reachable:
        _fail("No path from 'source' to 'sink' in DAG")

    # 4. Check producer/consumer lifespan constraints.
    #    Effects that are not 1:1 (can't auto-passthrough) must span every
    #    consumer of their outputs.
    for producer in seq.effects:
        if len(producer.inputs) == 1 and len(producer.outputs) == 1:
            continue  # Can auto-passthrough, no constraint
        for output_node in producer.outputs:
            for consumer in seq.effects:
                if output_node not in consumer.inputs:
                    continue
                # Verify: producer.[start, end] contains consumer.[start, end]
                if producer.start <= consumer.start and consumer.end <= producer.end:
                    continue
                print(f"Error: Producer '{producer.class_name}' [{producer.start}, {producer.end}] "
                      f"has lifespan shorter than consumer '{consumer.class_name}' [{consumer.start}, {consumer.end}] "
                      f"for node '{output_node}'", file=sys.stderr)
                print("  Multi-output effects cannot auto-passthrough and must span consumer lifespans",
                      file=sys.stderr)
                sys.exit(1)


def topological_sort(seq: SequenceDecl) -> List[EffectDecl]:
    """Sort effects in execution order using Kahn's algorithm.

    Dependencies come from ``_effect_dependencies`` (the relation that
    ``validate_dag`` checks for cycles). Among effects whose dependencies are
    satisfied, the one declared first runs first, so effects keep their
    declaration order unless a dependency forces a reordering. This keeps a
    reader of a node ahead of a later effect that overwrites the same node.

    Sets ``execution_order`` on every effect and returns the ordered list.
    Exits with status 1 if the effects cannot be ordered (a cycle).
    """
    effects = seq.effects
    dependencies = _effect_dependencies(effects)

    remaining = [len(producers) for producers in dependencies]
    dependents: List[List[int]] = [[] for _ in effects]
    for idx, producers in enumerate(dependencies):
        for producer in producers:
            dependents[producer].append(idx)

    # Min-heap keyed on declaration index.
    ready = [idx for idx, count in enumerate(remaining) if count == 0]
    heapq.heapify(ready)

    sorted_effects: List[EffectDecl] = []
    while ready:
        current = heapq.heappop(ready)
        sorted_effects.append(effects[current])
        for consumer in dependents[current]:
            remaining[consumer] -= 1
            if remaining[consumer] == 0:
                heapq.heappush(ready, consumer)

    if len(sorted_effects) != len(effects):
        # Kahn's algorithm leaves effects unprocessed only when they sit on,
        # or depend on, a cycle.
        _fail("Cycle detected in effect DAG; effects cannot be ordered")

    # Assign execution order
    for order, effect in enumerate(sorted_effects):
        effect.execution_order = order

    return sorted_effects


def analyze_lifetimes(seq: SequenceDecl,
                      sorted_effects: List[EffectDecl]) -> Dict[str, Tuple[int, int]]:
    """Analyze node lifetimes: (first_use, last_use) execution order indices.

    'source' used as an input and 'sink' used as an output are not tracked.
    ``seq`` is not read by this function.
    """
    lifetimes: Dict[str, Tuple[int, int]] = {}

    for effect in sorted_effects:
        order = effect.execution_order

        for node in effect.inputs:
            if node == 'source':
                continue
            if node in lifetimes:
                lifetimes[node] = (lifetimes[node][0], order)
            else:
                lifetimes[node] = (order, order)

        for node in effect.outputs:
            if node == 'sink':
                continue
            if node in lifetimes:
                first, last = lifetimes[node]
                lifetimes[node] = (min(first, order), max(last, order))
            else:
                lifetimes[node] = (order, order)

    return lifetimes


def _same_node_type(seq: SequenceDecl, first: str, second: str) -> bool:
    """Return True when both nodes have the same declared type in ``seq``."""
    first_decl = seq.nodes.get(first)
    second_decl = seq.nodes.get(second)
    first_type = first_decl.type if first_decl else None
    second_type = second_decl.type if second_decl else None
    return first_type == second_type


def detect_ping_pong(seq: SequenceDecl,
                     sorted_effects: List[EffectDecl]) -> Dict[str, str]:
    """Detect ping-pong patterns and return alias map.

    Pattern: Effect i writes A, reads B; Effect i+1 writes B, reads A
    Optimization: Alias B -> A (reuse same texture)

    A pair is aliased only when no other effect touches either node and both
    nodes have the same declared type. Each node takes part in at most one
    alias.
    """
    aliases: Dict[str, str] = {}
    used_nodes: Set[str] = set()
    implicit = ('source', 'sink')

    # Look for adjacent alternating read/write patterns
    for i in range(len(sorted_effects) - 1):
        eff1 = sorted_effects[i]
        eff2 = sorted_effects[i + 1]

        for out1 in eff1.outputs:
            if out1 in implicit or out1 in used_nodes:
                continue

            for in1 in eff1.inputs:
                if in1 in implicit or in1 in used_nodes:
                    continue

                # eff1 (reads in1, writes out1), eff2 (reads out1, writes in1)
                if not (in1 in eff2.outputs and out1 in eff2.inputs):
                    continue

                # Check no other effects use these nodes
                other_uses = any(
                    out1 in eff.inputs + eff.outputs or in1 in eff.inputs + eff.outputs
                    for j, eff in enumerate(sorted_effects)
                    if j != i and j != i + 1
                )
                if other_uses:
                    continue

                # Nodes of different types cannot be aliased to each other.
                if not _same_node_type(seq, in1, out1):
                    continue

                # Alias in1 -> out1 (in1 uses same texture as out1)
                aliases[in1] = out1
                used_nodes.add(out1)
                used_nodes.add(in1)
                break

    return aliases


def _sequence_class_name(seq: SequenceDecl, seq_index: int) -> str:
    """Return the C++ class name generated for ``seq``.

    Every character outside [0-9A-Za-z_] becomes '_' (so the default name
    'seq_0.0' is usable), and names that are empty or do not start with a
    letter get a 'Seq_' prefix.
    """
    ident = re.sub(r'[^0-9A-Za-z_]', '_', seq.name)
    if not ident or not ident[0].isalpha():
        ident = 'Seq_' + ident
    return f'{ident}_{seq_index}_Sequence'


def generate_cpp(seq: SequenceDecl, sorted_effects: List[EffectDecl],
                 aliases: Dict[str, str], seq_index: int, flatten: bool = False) -> str:
    """Generate C++ Sequence subclass.

    ``aliases`` maps an aliased node name to the node whose storage it reuses.
    ``flatten`` is accepted for interface compatibility and is not used here.
    """
    class_name = _sequence_class_name(seq, seq_index)

    # Generate includes: ClassName -> effects/class_name_effect.h
    includes = set()
    for effect in seq.effects:
        header = re.sub('([A-Z])', r'_\1', effect.class_name).lower().lstrip('_')
        if header.endswith('_effect'):
            header = header[:-7]  # Remove _effect suffix
        includes.add(f'#include "effects/{header}_effect.h"')

    parts = [f'''// Generated by seq_compiler.py
// Sequence: {seq.name}
#include "gpu/sequence.h"
#include "gpu/effect.h"
''']
    parts.extend(inc + '\n' for inc in sorted(includes))
    parts.append(f'''
class {class_name} : public Sequence {{
 public:
  {class_name}(const GpuContext& ctx, int width, int height)
      : Sequence(ctx, width, height) {{
''')

    # Node declarations. Concrete nodes are emitted before aliases so that an
    # alias never names a node declared later in the generated constructor.
    parts.append('    // Node declarations\n')
    aliased_lines = []
    for node_name, node_decl in sorted(seq.nodes.items()):
        if node_name in aliases:
            aliased_lines.append(
                f'    nodes_.declare_aliased_node("{node_name}", "{aliases[node_name]}");\n')
        else:
            node_type = NODE_TYPES[node_decl.type]
            parts.append(
                f'    nodes_.declare_node("{node_name}", {node_type}, width_, height_);\n')
    parts.extend(aliased_lines)

    parts.append('\n    // Effect DAG construction\n')

    # Effect instantiation
    for effect in sorted_effects:
        inputs_str = ', '.join(f'"{inp}"' for inp in effect.inputs)
        outputs_str = ', '.join(f'"{out}"' for out in effect.outputs)
        parts.append(f'''    effect_dag_.push_back({{
      .effect = std::make_shared<{effect.class_name}>(ctx,
        std::vector<std::string>{{{inputs_str}}},
        std::vector<std::string>{{{outputs_str}}},
        {effect.start}f, {effect.end}f),
      .input_nodes = {{{inputs_str}}},
      .output_nodes = {{{outputs_str}}},
      .execution_order = {effect.execution_order}
    }});
''')

    parts.append('''
    init_effect_nodes();
  }
};
''')

    return ''.join(parts)


def _generate_registry(sequences: List[SequenceDecl], demo_duration: float) -> str:
    """Return the C++ sequence registry, accessors and surface renderer.

    ``sequences`` must be in the same order that was used for the
    ``seq_index`` values passed to generate_cpp().
    """
    # NOTE(review): `static UniformBuffer g_blit_uniforms;` below is emitted
    # without template arguments; confirm against the UniformBuffer declaration.
    registry = '''
// Sequence Registry
#include <memory>
#include <vector>

struct SequenceEntry {
  float start_time;
  int priority;
  std::unique_ptr<Sequence> sequence;
};

static std::vector<SequenceEntry> g_sequences;
static bool g_initialized = false;

void InitializeSequences(const GpuContext& ctx, int width, int height) {
  if (g_initialized) return;
  g_initialized = true;

'''

    # Instantiate each sequence
    for seq_idx, seq in enumerate(sequences):
        class_name = _sequence_class_name(seq, seq_idx)
        registry += (f'  g_sequences.push_back({{{seq.start_time}f, {seq.priority}, '
                     f'std::make_unique<{class_name}>(ctx, width, height)}});\n')

    registry += '''}

Sequence* GetActiveSequence(float time) {
  // Find active sequence (latest start_time <= current time)
  Sequence* active = nullptr;
  for (auto& entry : g_sequences) {
    if (entry.start_time <= time) {
      active = entry.sequence.get();
    }
  }
  return active;
}

void RenderTimeline(WGPUCommandEncoder encoder, float time, int width, int height,
                    float beat_time, float audio_intensity) {
  Sequence* seq = GetActiveSequence(time);
  if (seq) {
    float beat_phase = fmodf(beat_time, 1.0f);
    seq->preprocess(time, beat_time, beat_phase, audio_intensity);
    seq->render_effects(encoder);
  }
}

float GetDemoDuration() {
  return ''' + f'{demo_duration:.1f}f' + ''';
}

// Surface-based rendering with framebuffers
#include "gpu/post_process_helper.h"
#include "effects/shaders.h"

static WGPUTexture g_source_texture = nullptr;
static WGPUTextureView g_source_view = nullptr;
static WGPUTexture g_sink_texture = nullptr;
static WGPUTextureView g_sink_view = nullptr;
static int g_fb_width = 0;
static int g_fb_height = 0;
static UniformBuffer g_blit_uniforms;

static void ensure_framebuffers(WGPUDevice device, int width, int height) {
  if (g_source_texture && g_fb_width == width && g_fb_height == height) {
    return;
  }

  // Release old
  if (g_source_view) wgpuTextureViewRelease(g_source_view);
  if (g_source_texture) wgpuTextureRelease(g_source_texture);
  if (g_sink_view) wgpuTextureViewRelease(g_sink_view);
  if (g_sink_texture) wgpuTextureRelease(g_sink_texture);

  // Create new
  WGPUTextureDescriptor tex_desc = {};
  tex_desc.size = {(uint32_t)width, (uint32_t)height, 1};
  tex_desc.format = WGPUTextureFormat_RGBA8Unorm;
  tex_desc.usage = WGPUTextureUsage_RenderAttachment | WGPUTextureUsage_TextureBinding;
  tex_desc.dimension = WGPUTextureDimension_2D;
  tex_desc.mipLevelCount = 1;
  tex_desc.sampleCount = 1;

  g_source_texture = wgpuDeviceCreateTexture(device, &tex_desc);
  g_source_view = wgpuTextureCreateView(g_source_texture, nullptr);
  g_sink_texture = wgpuDeviceCreateTexture(device, &tex_desc);
  g_sink_view = wgpuTextureCreateView(g_sink_texture, nullptr);

  g_fb_width = width;
  g_fb_height = height;
}

void RenderTimeline(WGPUSurface surface, float time, int width, int height,
                    float beat_time, float audio_intensity) {
  Sequence* seq = GetActiveSequence(time);
  if (!seq) return;

  const GpuContext* ctx = gpu_get_context();
  ensure_framebuffers(ctx->device, width, height);

  // Initialize blit uniforms buffer if needed
  if (!g_blit_uniforms.get().buffer) {
    g_blit_uniforms.init(ctx->device);
  }

  // Bind source/sink views to sequence
  seq->set_source_view(g_source_view);
  seq->set_sink_view(g_sink_view);

  // Update uniforms via preprocess
  float beat_phase = fmodf(beat_time, 1.0f);
  seq->preprocess(time, beat_time, beat_phase, audio_intensity);

  WGPUCommandEncoder encoder = wgpuDeviceCreateCommandEncoder(ctx->device, nullptr);

  // Clear source
  WGPURenderPassColorAttachment clear_attach = {};
  clear_attach.view = g_source_view;
#if !defined(DEMO_CROSS_COMPILE_WIN32)
  clear_attach.depthSlice = WGPU_DEPTH_SLICE_UNDEFINED;
#endif
  clear_attach.loadOp = WGPULoadOp_Clear;
  clear_attach.storeOp = WGPUStoreOp_Store;
  clear_attach.clearValue = {0.0, 0.0, 0.0, 1.0};

  WGPURenderPassDescriptor clear_desc = {};
  clear_desc.colorAttachmentCount = 1;
  clear_desc.colorAttachments = &clear_attach;

  WGPURenderPassEncoder clear_pass = wgpuCommandEncoderBeginRenderPass(encoder, &clear_desc);
  wgpuRenderPassEncoderEnd(clear_pass);
  wgpuRenderPassEncoderRelease(clear_pass);

  // Render effects
  seq->render_effects(encoder);

  // Blit sink to surface
  WGPUSurfaceTexture surface_texture;
  wgpuSurfaceGetCurrentTexture(surface, &surface_texture);

  if (surface_texture.status == WGPUSurfaceGetCurrentTextureStatus_SuccessOptimal) {
    WGPURenderPassColorAttachment blit_attach = {};
    blit_attach.view = surface_texture.texture
        ? wgpuTextureCreateView(surface_texture.texture, nullptr)
        : nullptr;
#if !defined(DEMO_CROSS_COMPILE_WIN32)
    blit_attach.depthSlice = WGPU_DEPTH_SLICE_UNDEFINED;
#endif
    blit_attach.loadOp = WGPULoadOp_Clear;
    blit_attach.storeOp = WGPUStoreOp_Store;
    blit_attach.clearValue = {0.0, 0.0, 0.0, 1.0};

    WGPURenderPassDescriptor blit_desc = {};
    blit_desc.colorAttachmentCount = 1;
    blit_desc.colorAttachments = &blit_attach;

    static WGPURenderPipeline blit_pipeline = nullptr;
    static WGPUBindGroup blit_bind_group = nullptr;

    if (!blit_pipeline) {
      blit_pipeline = create_post_process_pipeline(ctx->device, ctx->format,
                                                   passthrough_shader_wgsl);
    }

    // Update blit uniforms
    UniformsSequenceParams blit_params = {};
    blit_params.resolution = {(float)width, (float)height};
    blit_params.aspect_ratio = (float)width / (float)height;
    blit_params.time = time;
    blit_params.beat_time = beat_time;
    blit_params.beat_phase = 0.0f;
    blit_params.audio_intensity = audio_intensity;
    g_blit_uniforms.update(ctx->queue, blit_params);

    pp_update_bind_group(ctx->device, blit_pipeline, &blit_bind_group,
                         g_sink_view, g_blit_uniforms.get(), {nullptr, 0});

    WGPURenderPassEncoder blit_pass = wgpuCommandEncoderBeginRenderPass(encoder, &blit_desc);
    wgpuRenderPassEncoderSetPipeline(blit_pass, blit_pipeline);
    wgpuRenderPassEncoderSetBindGroup(blit_pass, 0, blit_bind_group, 0, nullptr);
    wgpuRenderPassEncoderDraw(blit_pass, 3, 1, 0, 0);
    wgpuRenderPassEncoderEnd(blit_pass);
    wgpuRenderPassEncoderRelease(blit_pass);

    if (blit_attach.view) wgpuTextureViewRelease(blit_attach.view);
  }

  WGPUCommandBuffer commands = wgpuCommandEncoderFinish(encoder, nullptr);
  wgpuQueueSubmit(ctx->queue, 1, &commands);
  wgpuCommandBufferRelease(commands);
  wgpuCommandEncoderRelease(encoder);

  wgpuSurfacePresent(surface);
  if (surface_texture.texture) {
    wgpuTextureRelease(surface_texture.texture);
  }
}
'''
    return registry


def main():
    """Command-line entry point: parse, validate and optionally generate C++."""
    parser = argparse.ArgumentParser(description='Sequence compiler with DAG optimization')
    parser.add_argument('input', help='Input .seq file')
    parser.add_argument('--output', '-o', help='Output .cc file')
    parser.add_argument('--flatten', action='store_true',
                        help='Generate flattened code (FINAL_STRIP mode)')
    parser.add_argument('--validate', action='store_true',
                        help='Validate DAG only (no code generation)')
    args = parser.parse_args()

    # Parse timeline
    sequences = parse_timeline(args.input)
    if not sequences:
        _fail("No sequences found in input file")

    # Sort sequences by start time
    sequences.sort(key=lambda s: s.start_time)

    # Validate all sequences (always)
    for seq in sequences:
        validate_dag(seq)

    # If --validate flag, exit after validation
    if args.validate:
        print(f"Validation passed: {len(sequences)} sequence(s) validated")
        sys.exit(0)

    # Require --output for code generation
    if not args.output:
        _fail("--output is required for code generation")

    # Calculate demo duration from max effect end time (absolute time).
    # Effect times are relative to sequence start.
    demo_duration = 0.0
    for seq in sequences:
        for effect in seq.effects:
            demo_duration = max(demo_duration, seq.start_time + effect.end)

    # Process each sequence (validate_dag() already called above)
    all_cpp = '''// Generated by seq_compiler.py
// DO NOT EDIT
#include "gpu/sequence.h"
#include "gpu/effect.h"

'''
    for seq_idx, seq in enumerate(sequences):
        sorted_effects = topological_sort(seq)
        # analyze_lifetimes() is not called here: code generation does not
        # consume node lifetimes.
        aliases = detect_ping_pong(seq, sorted_effects)
        all_cpp += generate_cpp(seq, sorted_effects, aliases, seq_idx, args.flatten) + '\n'

    # Generate sequence registry and accessors
    all_cpp += _generate_registry(sequences, demo_duration)

    # Write output
    with open(args.output, 'w', encoding='utf-8') as f:
        f.write(all_cpp)

    print(f"Generated {len(sequences)} sequence(s) -> {args.output}")


if __name__ == '__main__':
    main()