MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
InputManager.cpp
Go to the documentation of this file.
1#include "InputManager.hpp"
2
3#include "OscParser.hpp"
4
8
11
12namespace MayaFlux::Core {
13
15#ifdef MAYAFLUX_PLATFORM_MACOS
16 : m_registrations(new RegistrationList())
17#else
18 : m_registrations(std::make_shared<const RegistrationList>())
19#endif
20{
21#ifdef MAYAFLUX_PLATFORM_MACOS
22 for (auto& hp : m_hazard_ptrs) {
23 hp.store(nullptr);
24 }
25#endif
26
28 "InputManager created");
29}
30
32{
33 if (m_running.load()) {
34 stop();
35 }
36
37#ifdef MAYAFLUX_PLATFORM_MACOS
38 delete m_registrations.load();
39#endif
40}
41
42// ─────────────────────────────────────────────────────────────────────────────
43// Lifecycle
44// ─────────────────────────────────────────────────────────────────────────────
45
47{
48 if (m_running.load()) {
50 "InputManager already running");
51 return;
52 }
53
54 if (!m_input_service) {
57
58 if (!m_input_service) {
60 "InputManager requires InputService but service not registered");
61 return;
62 }
63 }
64
65 m_stop_requested.store(false);
66 m_running.store(true);
68
70 "InputManager started");
71}
72
74{
75 if (!m_running.load()) {
76 return;
77 }
78
79 m_stop_requested.store(true);
80
81 m_queue_notify.store(true);
82 m_queue_notify.notify_one();
83
84 if (m_processing_thread.joinable()) {
86 }
87
88 m_running.store(false);
89
91 "InputManager stopped (processed {} events)", m_events_processed.load());
92}
93
94// ─────────────────────────────────────────────────────────────────────────────
95// Input Enqueueing
96// ─────────────────────────────────────────────────────────────────────────────
97
99{
100
101 if (!m_queue.push(value)) {
103 "Input queue full, dropping oldest event");
104 }
105
106 m_queue_notify.store(true);
107 m_queue_notify.notify_one();
108}
109
110void InputManager::enqueue_batch(const std::vector<InputValue>& values)
111{
112 if (values.empty())
113 return;
114
115 bool any_pushed = false;
116 for (const auto& value : values) {
117 if (m_queue.push(value)) {
118 any_pushed = true;
119 } else {
121 "Input queue full during batch, dropping oldest events");
122 }
123 }
124
125 if (any_pushed) {
126 m_queue_notify.store(true);
127 m_queue_notify.notify_one();
128 }
129}
130
131// ─────────────────────────────────────────────────────────────────────────────
132// Node Registration
133// ─────────────────────────────────────────────────────────────────────────────
134
136 const std::shared_ptr<Nodes::Input::InputNode>& node,
137 InputBinding binding)
138{
139 if (!node)
140 return;
141
142 if (binding.hid_vendor_id || binding.hid_product_id) {
143 if (!m_input_service) {
145 "VID/PID binding requires InputService but service not registered");
146 return;
147 }
148
149 auto devices = m_input_service->get_all_devices();
150 auto resolved = resolve_vid_pid(binding, devices);
151 if (resolved) {
152 binding = *resolved;
153 } else {
155 "No device found for VID/PID");
156 return;
157 }
158 }
159
160 if (binding.device_id != 0) {
161 if (!m_input_service) {
163 "Device ID binding requires InputService but service not registered");
164 return;
165 }
166
167 m_input_service->open_device(binding.backend, binding.device_id);
168 }
169
170 {
171 std::lock_guard lock(m_registry_mutex);
172#ifdef MAYAFLUX_PLATFORM_MACOS
173 auto* old_list = m_registrations.load();
174 auto* new_list = new RegistrationList(*old_list);
175 new_list->push_back({ .node = node, .binding = binding });
176 m_registrations.store(new_list);
177 retire_list(old_list);
178#else
179 auto current_list = m_registrations.load();
180 auto new_list = std::make_shared<RegistrationList>(*current_list);
181 new_list->push_back({ .node = node, .binding = binding });
182 m_registrations.store(new_list);
183#endif
184 m_tracked_nodes.push_back(node);
185 }
186
188 "Registered InputNode for backend {} device {}",
189 static_cast<int>(binding.backend), binding.device_id);
190}
191
192std::optional<InputBinding> InputManager::resolve_vid_pid(
193 const InputBinding& binding,
194 const std::vector<InputDeviceInfo>& devices) const
195{
196 for (const auto& dev : devices) {
197 if (dev.backend_type != binding.backend)
198 continue;
199
200 bool vid_match = !binding.hid_vendor_id || (*binding.hid_vendor_id == dev.vendor_id);
201 bool pid_match = !binding.hid_product_id || (*binding.hid_product_id == dev.product_id);
202
203 if (vid_match && pid_match) {
204 InputBinding resolved = binding;
205 resolved.device_id = dev.id;
206 resolved.hid_vendor_id.reset();
207 resolved.hid_product_id.reset();
208 return resolved;
209 }
210 }
211
212 return std::nullopt;
213}
214
215void InputManager::unregister_node(const std::shared_ptr<Nodes::Input::InputNode>& node)
216{
217 if (!node)
218 return;
219
220 {
221 std::lock_guard lock(m_registry_mutex);
222
223#ifdef MAYAFLUX_PLATFORM_MACOS
224 auto* old_list = m_registrations.load();
225 auto* new_list = new RegistrationList(*old_list);
226
227 new_list->erase(
228 std::remove_if(new_list->begin(), new_list->end(),
229 [&node](const NodeRegistration& reg) {
230 auto locked = reg.node.lock();
231 return !locked || locked == node;
232 }),
233 new_list->end());
234
235 m_registrations.store(new_list);
236 retire_list(old_list);
237#else
238 auto current_list = m_registrations.load();
239 auto new_list = std::make_shared<RegistrationList>(*current_list);
240
241 new_list->erase(
242 std::remove_if(new_list->begin(), new_list->end(),
243 [&node](const NodeRegistration& reg) {
244 auto locked = reg.node.lock();
245 return !locked || locked == node;
246 }),
247 new_list->end());
248
249 m_registrations.store(new_list);
250#endif
251 m_tracked_nodes.erase(
252 std::remove(m_tracked_nodes.begin(), m_tracked_nodes.end(), node),
253 m_tracked_nodes.end());
254 }
255
257 "Unregistered InputNode");
258}
259
261{
262 std::lock_guard lock(m_registry_mutex);
263
264#ifdef MAYAFLUX_PLATFORM_MACOS
265 auto* old_list = m_registrations.load();
267 retire_list(old_list);
268#else
269 m_registrations.store(std::make_shared<const RegistrationList>());
270#endif
271
272 m_tracked_nodes.clear();
273
275 "Unregistered all InputNodes (Registry swapped to empty)");
276}
277
279{
280#ifdef MAYAFLUX_PLATFORM_MACOS
281 // Brief hazard pointer acquisition for count
282 size_t slot = m_hazard_counter.fetch_add(1) % MAX_READERS;
283 const RegistrationList* current;
284 do {
285 current = m_registrations.load();
286 m_hazard_ptrs[slot].store(current);
287 } while (current != m_registrations.load());
288
289 size_t count = current->size();
290 m_hazard_ptrs[slot].store(nullptr);
291 return count;
292#else
293 return m_registrations.load()->size();
294#endif
295}
296
298{
299 return m_queue.snapshot().size();
300}
301
302// ─────────────────────────────────────────────────────────────────────────────
303// Processing Thread
304// ─────────────────────────────────────────────────────────────────────────────
305
307{
309 "Processing thread started");
310
311 while (true) {
312 InputValue value;
313
314 while (auto value = m_queue.pop()) {
315 dispatch_to_nodes(*value);
316 m_events_processed.fetch_add(1);
317 }
318
319 if (m_stop_requested.load()) {
320 break;
321 }
322
323 m_queue_notify.wait(false);
324 m_queue_notify.store(false);
325 }
326
328 "Processing thread exiting");
329}
330
332{
333#ifdef MAYAFLUX_PLATFORM_MACOS
334 // Acquire hazard pointer slot
335 size_t slot = m_hazard_counter.fetch_add(1) % MAX_READERS;
336
337 const RegistrationList* current_regs;
338 do {
339 current_regs = m_registrations.load();
340 m_hazard_ptrs[slot].store(current_regs);
341 } while (current_regs != m_registrations.load());
342
343 // Safe to use current_regs now
344 for (const auto& reg : *current_regs) {
345 auto node = reg.node.lock();
346 if (!node)
347 continue;
348
349 if (matches_binding(value, reg.binding)) {
350 node->process_input(value);
351 }
352 }
353
354 // Release hazard pointer
355 m_hazard_ptrs[slot].store(nullptr);
356#else
357 auto current_regs = m_registrations.load();
358
359 for (const auto& reg : *current_regs) {
360 auto node = reg.node.lock();
361 if (!node)
362 continue;
363
364 if (matches_binding(value, reg.binding)) {
365 node->process_input(value);
366 }
367 }
368#endif
369}
370
371bool InputManager::matches_binding(const InputValue& value, const InputBinding& binding) const
372{
373 if (binding.backend != value.source_type) {
374 return false;
375 }
376
377 if (binding.device_id != 0 && binding.device_id != value.device_id) {
378 return false;
379 }
380
381 switch (binding.backend) {
382 case InputType::MIDI:
383 if (value.type == InputValue::Type::MIDI) {
384 const auto& midi = value.as_midi();
385
386 if (binding.midi_channel && *binding.midi_channel != midi.channel()) {
387 return false;
388 }
389
390 if (binding.midi_message_type && *binding.midi_message_type != midi.type()) {
391 return false;
392 }
393 if (binding.midi_cc_number && midi.type() == 0xB0) {
394 if (*binding.midi_cc_number != midi.data1) {
395 return false;
396 }
397 }
398 }
399 break;
400
401 case InputType::OSC:
402 if (value.type == InputValue::Type::OSC && binding.osc_address_pattern) {
403 const auto& osc = value.as_osc();
404 if (!osc.address.starts_with(*binding.osc_address_pattern)) {
405 return false;
406 }
407 }
408 break;
409
410 default:
411 // HID, Serial: no additional filters beyond device_id
412 break;
413 }
414
415 return true;
416}
417
418#ifdef MAYAFLUX_PLATFORM_MACOS
419void InputManager::retire_list(const RegistrationList* list)
420{
421 // Check if any reader is using this list
422 for (const auto& hp : m_hazard_ptrs) {
423 if (hp.load() == list) {
424 // Still in use - in production you'd add to retirement queue
425 // For now, leak it (registrations are infrequent)
426 // TODO: Implement proper deferred reclamation queue
427 return;
428 }
429 }
430 delete list;
431}
432#endif
433
435{
436 if (!osc_config.enabled) {
437 return;
438 }
439
440 if (!m_network_service) {
443 }
444
445 if (!m_network_service) {
447 "OSC bridge requested but NetworkService not available");
448 return;
449 }
450
451 EndpointInfo ep;
454 ep.local_port = osc_config.receive_port;
455 ep.remote_address = osc_config.send_address;
456 ep.remote_port = osc_config.send_port;
457 ep.label = "osc_input";
458
460
461 if (m_osc_endpoint_id == 0) {
463 "Failed to open UDP endpoint for OSC on port {}",
464 osc_config.receive_port);
465 return;
466 }
467
469 [this](uint64_t, const uint8_t* data, size_t size, std::string_view) {
470 auto parsed = OscParser::parse(data, size);
471 if (parsed) {
472 enqueue(*parsed);
473 }
474 });
475
477 "OSC bridge active on port {}", osc_config.receive_port);
478}
479
497
498} // namespace MayaFlux::Core
#define MF_INFO(comp, ctx,...)
#define MF_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
#define MF_DEBUG(comp, ctx,...)
Eigen::Index count
Range size
void register_node(const std::shared_ptr< Nodes::Input::InputNode > &node, InputBinding binding)
Register a node to receive input.
size_t get_queue_depth() const
Get current queue depth.
size_t get_registered_node_count() const
Get count of registered nodes.
void setup_osc_bridge(const OSCConfigInfo &osc_config)
Setup OSC bridge if enabled in config.
std::vector< NodeRegistration > RegistrationList
std::atomic< std::shared_ptr< const RegistrationList > > m_registrations
std::atomic< bool > m_stop_requested
Registry::Service::NetworkService * m_network_service
std::atomic< bool > m_queue_notify
Registry::Service::InputService * m_input_service
std::optional< InputBinding > resolve_vid_pid(const InputBinding &binding, const std::vector< InputDeviceInfo > &devices) const
void start()
Start the processing thread.
void unregister_node(const std::shared_ptr< Nodes::Input::InputNode > &node)
Unregister a node.
void enqueue(const InputValue &value)
Enqueue an input value for processing.
Memory::LockFreeQueue< InputValue, MAX_QUEUE_SIZE > m_queue
void dispatch_to_nodes(const InputValue &value)
void stop()
Stop the processing thread.
std::atomic< bool > m_running
std::atomic< uint64_t > m_events_processed
void enqueue_batch(const std::vector< InputValue > &values)
Enqueue multiple input values.
std::vector< std::shared_ptr< Nodes::Input::InputNode > > m_tracked_nodes
To keep nodes alive.
void unregister_all_nodes()
Unregister all nodes.
bool matches_binding(const InputValue &value, const InputBinding &binding) const
static std::optional< InputValue > parse(const uint8_t *data, size_t size, uint32_t device_id=0)
Parse a single OSC message from raw bytes.
Definition OscParser.cpp:7
Interface * get_service()
Query for a backend service.
static BackendRegistry & instance()
Get the global registry instance.
@ OSC
Open Sound Control (network)
@ MIDI
MIDI controllers and instruments.
@ InputManagement
Input management (Core::InputManager)
@ Init
Engine/subsystem initialization.
@ AsyncIO
Async I/O operations (network, streaming)
@ Core
Core engine, backend, subsystems.
Describes one logical send/receive endpoint managed by a backend.
std::optional< uint8_t > midi_cc_number
Match specific CC number.
InputType backend
Which backend type.
std::optional< std::string > osc_address_pattern
Match OSC address prefix.
std::optional< uint16_t > hid_vendor_id
Match HID vendor ID.
std::optional< uint8_t > midi_channel
Match specific MIDI channel (1-16)
std::optional< uint8_t > midi_message_type
Match message type (0xB0=CC, 0x90=NoteOn, etc.)
std::optional< uint16_t > hid_product_id
Match HID product ID.
uint32_t device_id
Specific device (0 = any device)
Specifies what input an InputNode wants to receive.
uint32_t device_id
Source device identifier.
const OSCMessage & as_osc() const
@ OSC
Structured OSC message.
@ MIDI
Structured MIDI message.
InputType source_type
Backend that generated this value.
const MIDIMessage & as_midi() const
Generic input value container.
uint16_t receive_port
UDP port to listen on.
std::string send_address
Default send address.
bool enabled
Enable OSC backend.
uint16_t send_port
Default UDP port to send to.
OSC backend configuration.
std::function< std::vector< Core::InputDeviceInfo >()> get_all_devices
Query all available input devices across all backends.
std::function< bool(Core::InputType, uint32_t)> open_device
Open a specific input device.
Backend input device service interface.
std::function< uint64_t(const Core::EndpointInfo &info)> open_endpoint
Open a network endpoint on the appropriate backend.
std::function< void(uint64_t endpoint_id, std::function< void(uint64_t, const uint8_t *, size_t, std::string_view)> callback)> set_endpoint_receive_callback
Register a per-endpoint receive callback.
std::function< void(uint64_t endpoint_id)> close_endpoint
Close an endpoint.
Backend network transport service interface.