Skip to content

Devices API

ADB Device

pymordialblue.devices.adb_device

Controller for ADB interactions.

PymordialAdbDevice

Bases: PymordialBridgeDevice

Handles Android device communication using adb-shell.

Configuration is loaded from default settings but can be overridden per instance by modifying the instance attributes after initialization.

Attributes:

Name Type Description
name str

The plugin name ("adb").

version str

The plugin version.

host str

The address of the device; may be an IP address or a host name.

port int

The device port to which we are connecting

device AdbDeviceTcp

The adb_shell.AdbDeviceTcp instance.

config AdbConfig

The configuration dictionary.

Source code in src/pymordialblue/devices/adb_device.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
class PymordialAdbDevice(PymordialBridgeDevice):
    """Handles Android device communication using adb-shell.

    Configuration is loaded from default settings but can be overridden
    per instance by modifying the instance attributes after initialization.

    Attributes:
        name (str): The plugin name ("adb").
        version (str): The plugin version.
        host (str): The address of the device; may be an IP address or a host name.
        port (int): The device port to which we are connecting
        device (AdbDeviceTcp): The adb_shell.AdbDeviceTcp instance.
        config (AdbConfig): The configuration dictionary.
    """

    name: str = "adb"
    version: str = "0.1.0"

    # DEFAULT_CONFIG will be fetched in __init__ if not provided
    # to allow easier testing and dynamic configuration updates.

    def __init__(
        self,
        host: str | None = None,
        port: int | None = None,
        config: AdbConfig | None = None,
        adbshell_log_level: int = WARNING,
    ):
        """Initalizes PymordialAdbDevice.

        Args:
            host: The address of the device; may be an IP address or a host name.
            port: The device port to which we are connecting.
            config: A TypedDict containing ADB configuration. Defaults to package defaults.
            adbshell_log_level: The log level for adb-shell (e.g. logging.WARNING).
        """
        self.logger = getLogger("PymordialAdbDevice")
        basicConfig(
            level=DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
        self.logger.debug("Initalizing PymordialAdbDevice...")
        self.config = copy.deepcopy(config or get_config()["adb"])
        self.host: str = host or self.config["default_host"]
        self.port: int = port or self.config["default_port"]

        getLogger("adb_shell").setLevel(
            adbshell_log_level
        )  # Silence adb_shell debug logs

        self._device: AdbDeviceTcp | None = None

        # Streaming attributes
        self._stream_thread: threading.Thread | None = None
        self._latest_frame: np.ndarray | None = None
        self._is_streaming = threading.Event()
        self.logger.debug("PymordialAdbDevice initalized.")

    def initialize(self, config: "PymordialBlueConfig") -> None:
        """Initializes the ADB device plugin.

        Args:
            config: Global Pymordial configuration dictionary.
        """
        # In a full comprehensive implementation, we might reload config here.
        # For now, we rely on __init__ logic or manual property updates.
        pass

    def shutdown(self) -> None:
        """Disconnects and cleans up resources."""
        self.disconnect()

    @property
    def is_streaming(self) -> bool:
        """Check if streaming is currently active.

        Returns:
            True if streaming is active, False otherwise.
        """
        return self._is_streaming.is_set()

    def find_package_by_keyword(self, keyword) -> str | None:
        self.logger.debug("Finding package by keyword...")
        # 'pm list packages' returns 'package:com.name.app'
        # We strip 'package:' and filter for your keyword
        output = self.run_command("pm list packages", decode=True)
        if output:
            packages = [
                line.replace("package:", "").strip() for line in output.splitlines()
            ]
            # 1. Try exact match
            if keyword in packages:
                self.logger.debug(f"Exact package match found: {keyword}")
                return keyword
            # 2. Try case-insensitive partial match
            matches = [pkg for pkg in packages if keyword.lower() in pkg.lower()]
            if matches:
                # Return the shortest match (best fit for keyword)
                best_match = min(matches, key=len)
                self.logger.debug(
                    f"Partial package match found: {best_match} (best of {len(matches)} matches)"
                )
                return best_match
        self.logger.debug(f"No package found matching '{keyword}'")
        return None

    def get_launch_activity(self, package_name) -> str | None:
        self.logger.debug("Getting launch activity...")
        # This command queries the system for the EXACT entry point
        cmd = f"cmd package resolve-activity --brief {package_name}"
        # MUST decode to string to avoid b'...' in f-string later
        output = self.run_command(cmd, decode=True)

        if not output:
            return None

        # Standard output is usually the package/activity on the last line
        lines = output.strip().splitlines()
        if lines:
            activity = lines[-1].strip()
            if "/" not in activity or "No activity found" in activity:
                self.logger.debug(f"No valid activity found for {package_name}")
                return None

            self.logger.debug(f"Launch activity found: {activity}")
            return activity

        self.logger.debug("Launch activity not found.")
        return None

    def connect(self) -> bool:
        self.logger.debug(f"Connecting to ADB at {self.host}:{self.port}...")

        if self._device is None:
            self.logger.debug(
                "PymordialAdbDevice device not initialized. Attempting to initialize..."
            )
            self._device = self._create_adb_device()

        if self._device.available:
            self.logger.debug("PymordialAdbDevice device already connected.")
            return True

        self.logger.debug(
            "PymordialAdbDevice device not connected. Attempting to connect..."
        )
        try:
            self._device.connect(rsa_keys=self._get_adb_signer())
            self.logger.debug("PymordialAdbDevice device connection successful.")
            return True
        except Exception as e:
            self.logger.warning(f"Error connecting to PymordialAdbDevice device: {e}")
            self._device = None
            return False

    def is_connected(self) -> bool:
        """Checks if PymordialAdbDevice device is connected.

        Returns:
            True if the device is connected, False otherwise.
        """
        self.logger.debug("Checking if PymordialAdbDevice device is connected...")
        if self._device is None:
            self.logger.debug(
                "PymordialAdbDevice device not initialized. Use connect() method to initialize."
            )
            return False
        if not self._device.available:
            self.logger.debug("PymordialAdbDevice device not connected.")
            return False
        self.logger.debug("PymordialAdbDevice device connected.")
        return True

    def disconnect(self) -> bool:
        """Disconnects the PymordialAdbDevice device.

        Returns:
            True if disconnected (or already disconnected), False on error.
        """
        self.logger.debug("Disconnecting from PymordialAdbDevice device...")
        self.stop_stream()  # Stop streaming if active

        if self._device is None or not self._device.available:
            return True

        try:
            self._device.close()
            if not self._device.available:
                self.logger.debug("Disconnected from PymordialAdbDevice device.")
                return True
            self.logger.debug("Failed to disconnect from PymordialAdbDevice device.")
            return False
        except Exception as e:
            self.logger.error(
                f"Error disconnecting from PymordialAdbDevice device: {e}"
            )
            return False

    def run_command(self, command: str, decode: bool = False) -> bytes | None:
        """Executes a shell command and returns the output.

        Args:
            command: The command to execute.

        Returns:
            The command output as bytes, or None if not connected.
        """
        self.logger.debug(f"Executing shell command: {command}...")
        try:
            output = self._device.shell(
                command,
                timeout_s=self.config["commands"]["timeout"],
                read_timeout_s=self.config["commands"]["read_timeout"],
                transport_timeout_s=self.config["commands"]["transport_timeout"],
                decode=decode,
            )
        except Exception as e:
            self.logger.error(f"Failed to execute shell command {command}: {e}")
            return None
        if output and decode and isinstance(output, str):
            output = output.strip()
        self.logger.debug(f"Shell command {command} executed successfully.")
        return output

    def get_focused_app(self) -> dict[str, str] | None:
        """Gets information about the currently focused app.

        Returns:
            A dictionary with 'package' and 'activity', or None if failed.
        """
        self.logger.debug("Getting focused app info...")
        # 'dumpsys window windows' is more reliable than just 'dumpsys window' on some Android versions
        # Look for mCurrentFocus or mFocusedApp lines
        output = self.run_command(
            "dumpsys window | grep -E 'mCurrentFocus|mFocusedApp'", decode=True
        )
        if not output:
            return None

        # Format: mCurrentFocus=Window{... u0 com.android.settings/com.android.settings.Settings}
        # Format: mFocusedApp=AppWindowToken{... token=Token{... u0 com.example/com.example.MainActivity}}
        import re

        match = re.search(r"([a-zA-Z0-9._]+)/([a-zA-Z0-9._$]+)", output)
        if match:
            pkg, activity = match.groups()
            self.logger.debug(f"Focused app: package={pkg}, activity={activity}")
            return {"package": pkg, "activity": activity}

        self.logger.debug("Could not parse focused app info from dumpsys")
        return None

    def open_app(
        self,
        app_name: str,
        package_name: str | None = None,
        timeout: float = 10.0,
        wait_time: float = 1.0,
    ) -> bool:
        """Opens an app by name or package, with optional verification.
        Args:
            app_name: Name/keyword to search for the package if package_name not provided.
            package_name: Optional exact package name.
            timeout: Max seconds to wait for app to start (if verify=True).
            wait_time: Seconds between verification retries.
        Returns:
            True if launched, False otherwise.
        """
        pkg = package_name or self.find_package_by_keyword(app_name)
        if not pkg:
            self.logger.error(f"Could not find package for '{app_name}'")
            return False
        # Try activity-based launch first (faster, more reliable)
        activity = self.get_launch_activity(pkg)
        if activity:
            self.run_command(f"am start -n {activity}")
            self.logger.debug(f"Launched {app_name} via Activity: {activity}")
        else:
            # Fallback to monkey
            self.logger.debug(
                f"Activity not found, using Monkey fallback for {app_name}"
            )
            self.run_command(f"monkey -p {pkg} -c android.intent.category.LAUNCHER 1")
            self.logger.debug(f"Launched {app_name} via Monkey")

        start_time = time()
        while time() - start_time < timeout:
            if self.is_app_running(pkg, max_retries=1, wait_time=0):
                self.logger.debug(f"Verified {app_name} is running")
                return True
            sleep(wait_time)
        self.logger.warning(f"{app_name} did not start within {timeout}s")
        return False

    # max_retries and wait_time default need
    # to be made into class variables
    def is_app_running(
        self,
        package_name: str | None = None,
        pymordial_app: "PymordialApp" | None = None,
        app_name: str | None = None,
        max_retries: int = 2,
        wait_time: int = 1,
    ) -> bool:
        """Checks if an app is running using pidof command.

        Args:
            package_name: Direct package name (e.g., 'com.android.settings').
            pymordial_app: PymordialApp instance to extract package from.
            app_name: App name keyword to search for.
            max_retries: Number of retries.
            wait_time: Time to wait between retries.

        Returns:
            True if running, False otherwise.

        Priority: package_name > pymordial_app > app_name.
        """
        # Handle case where PymordialApp might be passed positionally as package_name
        if package_name and not isinstance(package_name, str):
            if hasattr(package_name, "package_name"):
                pymordial_app = package_name
                package_name = None

        pkg = (
            package_name
            or (pymordial_app.package_name if pymordial_app else None)
            or (self.find_package_by_keyword(app_name) if app_name else None)
        )

        if not pkg:
            self.logger.warning(
                f"Could not resolve package for is_app_running (package_name={package_name}, "
                f"pymordial_app={pymordial_app}, app_name={app_name})"
            )
            return False

        if not self._device.available:
            self.logger.debug(
                "PymordialAdbDevice device not connected. Attempting to reconnect..."
            )
            if not self.connect():
                raise ConnectionError(
                    "PymordialAdbDevice device not connected and reconnection failed."
                )
            self.logger.debug("PymordialAdbDevice device reconnected.")

        for attempt in range(max_retries):
            try:
                output = self.run_command(f"pidof {pkg}", decode=True)
                if output and output.strip():
                    self.logger.debug(f"Found {pkg} running with PID: {output.strip()}")
                    return True
            except Exception as e:
                self.logger.debug(f"pidof check failed: {e}")

            if attempt < max_retries - 1:
                self.logger.debug(
                    f"{pkg} not found. Retrying ({attempt + 1}/{max_retries})..."
                )
                sleep(wait_time)

        self.logger.debug(f"{pkg} not found after {max_retries} attempts")
        return False

    def show_recent_apps(self) -> bool:
        """Shows the recent apps drawer.

        Returns:
            True if successful, False otherwise.
        """
        self.logger.debug("Showing recent apps...")
        if not self._device.available:
            self.logger.debug(
                "PymordialAdbDevice device not connected. Skipping 'show_recent_apps' method call."
            )
            return False
        self.run_command(f"input keyevent {self.config['keyevents']['app_switch']}")
        self.logger.debug("Recent apps drawer successfully opened")
        return True

    def close_app(
        self,
        package_name: str | None = None,
        app_name: str | None = None,
        timeout: float = 5.0,
        wait_time: float = 0.5,
    ) -> bool:
        """Closes an app and verifies it stopped.

        Provide either package_name (exact) or app_name (keyword search).
        If both provided, package_name takes priority.

        Args:
            package_name: Exact package name (e.g., 'com.revomon.vr').
            app_name: Keyword to search for package (e.g., 'revomon').
            timeout: Max seconds to wait for app to close.
            wait_time: Seconds between verification retries.

        Returns:
            True if app is confirmed closed, False otherwise.

        Raises:
            ValueError: If neither package_name nor app_name is provided.
        """
        if package_name:
            pkg = package_name
        elif app_name:
            pkg = self.find_package_by_keyword(app_name)
            if not pkg:
                self.logger.error(f"Could not find package for '{app_name}'")
                return False
        else:
            raise ValueError("Must provide either package_name or app_name")

        self.logger.debug(f"Closing app: {pkg}")

        if not self._device.available:
            self.logger.warning("ADB not connected. Cannot close app.")
            return False

        self.run_command(f"am force-stop {pkg}")

        # Poll to confirm app closed
        start_time = time()
        while time() - start_time < timeout:
            if not self.is_app_running(pkg, max_retries=1, wait_time=0):
                self.logger.debug(f"Verified {pkg} is closed")
                return True
            sleep(wait_time)

        self.logger.warning(f"{pkg} may still be running after {timeout}s")
        return False

    def close_all_apps(self, exclude: list[str] | None = None) -> None:
        """Force stops all packages to clear the device state.

        Args:
            exclude: Optional list of package names to exclude from closing.
        """
        self.logger.debug("Closing all apps...")

        # Get list of all packages
        output = self.run_command("pm list packages", decode=True)
        if not output:
            self.logger.warning("No packages found.")
            return

        # Output format: package:com.example.app
        # run_command with decode=True returns str
        packages = [
            line.replace("package:", "").strip()
            for line in output.splitlines()
            if line.strip()
        ]
        exclude = exclude or []

        count = 0
        for pkg in packages:
            if pkg in exclude:
                continue
            self.run_command(f"am force-stop {pkg}")
            count += 1

        self.logger.debug(f"Closed {count} apps.")

    def tap(self, x: int, y: int) -> bool:
        """Performs a simple tap at (x, y).

        Args:
            x: X coordinate.
            y: Y coordinate.
        """
        self.logger.debug(f"Tapping at ({x}, {y})")
        output = self.run_command(f"input tap {x} {y}")
        if output:
            self.logger.debug(f"Tap at ({x}, {y}) successful")
            return True
        else:
            self.logger.debug(f"Tap at ({x}, {y}) failed")
            return False

    def type_text(self, text: str, enter: bool = False) -> bool:
        """Types text on the device.
        Args:
            text: The text to type.
            enter: Whether to press enter after typing.
        Returns:
            True if successful, False otherwise.
        """
        self.logger.debug(f"Typing text: {text} ...")
        if not self._device.available:
            self.logger.debug(
                "PymordialAdbDevice device not connected. Skipping 'type_text' method call."
            )
            return False

        # Escape special characters for ADB input text
        # Spaces become %s, other special chars need escaping
        escaped_text = text.replace("\\", "\\\\")  # Escape backslashes first
        escaped_text = escaped_text.replace(" ", "%s")
        escaped_text = escaped_text.replace("'", "\\'")
        escaped_text = escaped_text.replace('"', '\\"')
        escaped_text = escaped_text.replace("&", "\\&")
        escaped_text = escaped_text.replace("<", "\\<")
        escaped_text = escaped_text.replace(">", "\\>")
        escaped_text = escaped_text.replace(";", "\\;")
        escaped_text = escaped_text.replace("|", "\\|")

        self.run_command(f"input text '{escaped_text}'")
        self.logger.debug(f"Text '{text}' sent via ADB")
        if enter:
            self.press_enter()
        return True

    def go_home(self) -> bool:
        """Navigates to the home screen.

        Returns:
            True if successful, False otherwise.
        """
        self.logger.debug("PymordialAdbDevice navigating to home screen...")
        if not self._device.available:
            self.logger.debug(
                "PymordialAdbDevice device not connected. Skipping 'go_home' method call."
            )
            return False
        # Go to home screen
        self.run_command(f"input keyevent {self.config['keyevents']['home']}")
        sleep(self.config["default_wait_time"])
        self.logger.debug("PymordialAdbDevice successfully navigated to home screen.")
        return True

    def capture_screenshot(self) -> bytes | None:
        """Captures a screenshot of the device.

        Returns:
            The screenshot as bytes, or None if failed.
        """
        self.logger.debug("Capturing screenshot...")
        if not self._device.available:
            self.logger.warning("ADB not connected. Skipping screenshot.")
            return None

        try:
            screenshot_bytes: bytes | None = self.run_command(
                self.config["commands"]["screencap"]
            )
            if screenshot_bytes:
                self.logger.debug("Screenshot captured successfully")
                return screenshot_bytes
        except Exception as e:
            self.logger.error(f"Error capturing screenshot: {e}")
        return None

    # width and height defaults need
    # to be made into class variables
    def start_stream(self) -> bool:
        """Starts screen streaming using adb-shell's streaming_shell with PyAV decoding.

        Automatically detects resolution from a screenshot.

        Returns:
            True if stream started successfully, False otherwise.
        """
        self.logger.debug("Starting PymordialAdbDevice stream...")
        if self._is_streaming.is_set():
            self.logger.debug("PymordialAdbDevice stream already running")
            return True

        if not self._device.available:
            self.logger.error("Cannot start stream: not connected")
            return False

        # Auto-detect resolution
        screenshot_bytes = self.capture_screenshot()
        if not screenshot_bytes:
            self.logger.error(
                "Cannot start stream: failed to capture screenshot for resolution detection"
            )
            return False

        try:
            img = Image.open(BytesIO(screenshot_bytes))
            width, height = img.size
            size_arg = f"{width}x{height}"
            self.logger.debug(f"Detected resolution: {width}x{height}")
        except Exception as e:
            self.logger.error(f"Error detecting resolution: {e}")
            return False

        self._is_streaming.set()
        command = (
            f"screenrecord --output-format=h264 "
            # f"--verbose "  <--- DELETE THIS LINE
            f"--size {size_arg} "
            f"--bit-rate {self.config['stream']['bitrate']} "
            f"--time-limit {self.config['stream']['time_limit']} "
            f"-"
        )

        stream_reader = PymordialStreamReader(
            queue_size=self.config["stream"]["queue_size"],
            read_timeout=self.config["stream"]["read_timeout"],
        )

        self._stream_thread = threading.Thread(
            target=self._stream_worker,
            args=(command, stream_reader),
            daemon=True,
        )
        self._stream_thread.start()

        # Wait for first frame
        for _ in range(self.config["stream"]["start_timeout_iterations"]):
            if self._latest_frame is not None:
                self.logger.info("PymordialAdbDevice stream started successfully")
                return True
            sleep(self.config["stream"]["start_wait"])

        self.logger.error("PymordialAdbDevice stream timeout: no frames")
        self.stop_stream()
        return False

    def stop_stream(self) -> None:
        """Stops the screen stream."""
        self.logger.debug("Stopping PymordialAdbDevice stream...")
        self._is_streaming.clear()
        if self._stream_thread and self._stream_thread.is_alive():
            self._stream_thread.join(timeout=self.config["stream"]["stop_timeout"])

        # Cleanup: kill background screenrecord and remove temp file
        try:
            self.run_command("pkill -9 screenrecord")
        except Exception:
            pass  # Best effort cleanup

        self._latest_frame = None
        self.logger.debug("PymordialAdbDevice stream stopped")

    def get_latest_frame(self) -> np.ndarray | None:
        """Gets the latest decoded frame from the stream.

        Returns:
            The latest frame as a numpy array (RGB), or None if no frame available.
        """
        # No lock needed - reference read is atomic in Python (GIL)
        # Copy to prevent caller from modifying the frame
        frame = self._latest_frame
        return frame.copy() if frame is not None else None

    def capture_screen(self) -> "bytes | np.ndarray | None":
        """Captures the current BlueStacks screen using the appropriate capture strategy.

        Returns:
            The screenshot as bytes or numpy array, or None if failed.
        """

        if not self.is_connected():
            self.connect()
            if not self.is_connected():
                self.logger.warning(
                    "Cannot capture screen - ADB controller is not initialized"
                )
                return None

        # Always use streaming - start if not active
        if not self.is_streaming:
            self.logger.info("Starting stream for capture_screen...")
            if not self.start_stream():
                self.logger.error(
                    "Failed to start stream, falling back to capture_screenshot."
                )
                return self.capture_screenshot()

        frame = self.get_latest_frame()
        if frame is not None:
            # Validate frame isn't corrupted (all same color)
            if frame.std() < 1.0:  # Nearly uniform = likely corrupted
                self.logger.warning(
                    "Frame appears corrupted (uniform color), restarting stream..."
                )
                self.stop_stream()
                if self.start_stream():
                    frame = self.get_latest_frame()
                    if frame is not None and frame.std() >= 1.0:
                        self.logger.debug("Returning fresh frame after restart.")
                        return frame
                self.logger.error("Failed to get valid frame after restart")
                return self.capture_screenshot()
            self.logger.debug("Returning latest frame from stream.")
            return frame

        self.logger.warning(
            "Stream active but no frame available. Falling back to screenshot."
        )
        return self.capture_screenshot()

    def press_enter(self) -> bool:
        """Presses the Enter key.

        Returns:
            True if successful, False otherwise.
        """
        self.logger.debug("Pressing enter key...")
        if not self._device.available:
            self.logger.debug(
                "PymordialAdbDevice device not connected. Skipping 'press_enter' method call."
            )
            return False
        self.run_command(f"input keyevent {self.config['keyevents']['enter']}")
        self.logger.debug("Enter key sent via ADB")
        return True

    def press_esc(self) -> bool:
        """Presses the Esc key.

        Returns:
            True if successful, False otherwise.
        """
        self.logger.debug("Pressing esc key...")
        if not self._device.available:
            self.logger.debug(
                "PymordialAdbDevice device not connected. Skipping 'press_esc' method call."
            )
            return False
        # Send the esc key using ADB
        self.run_command(f"input keyevent {self.config['keyevents']['esc']}")
        self.logger.debug("Esc key sent via ADB")
        return True

    def _get_adb_signer(
        self, adbkey_path: str | None = None
    ) -> list[PythonRSASigner] | None:
        """
        Gets the ADB signer.

        Args:
            adbkey_path: Path to the ADB key. If None, uses the default location of ~/.android/adbkey.

        Returns:
            List of PythonRSASigner objects if the adb key is found, None otherwise.
        """
        try:
            self.logger.debug("Getting signer...")
            # Standard location for ADB keys on Windows
            if adbkey_path is None:
                adbkey_path: str = os.path.expanduser("~/.android/adbkey")
            if os.path.exists(adbkey_path):
                with open(adbkey_path) as f:
                    priv = f.read()
                self.logger.debug("Signer found.")
                return [PythonRSASigner("", priv)]
            self.logger.debug("Signer not found.")
            return None
        except Exception as e:
            self.logger.error(f"Error getting signer: {e}")
            return None

    def _create_adb_device(self) -> AdbDeviceTcp | None:
        self.logger.debug("Creating PymordialAdbDevice device...")
        device: AdbDeviceTcp | None = None
        try:
            device = AdbDeviceTcp(host=self.host, port=self.port)
        except Exception as e:
            self.logger.error(f"Error creating PymordialAdbDevice device: {e}")

        if device:
            self.logger.debug("PymordialAdbDevice device created.")
        else:
            self.logger.error("PymordialAdbDevice device not created.")
        return device

    def _stream_worker(
        self, command: str, stream_reader: PymordialStreamReader
    ) -> None:
        """Worker thread that reads H264 stream and decodes with PyAV."""
        self.logger.debug(f"Starting stream worker with command: {command}")
        stream_device = None
        try:
            # Create dedicated connection for streaming to allow concurrent commands
            stream_device = self._create_adb_device()
            if not stream_device:
                self.logger.error("Failed to create dedicated stream device")
                return

            try:
                stream_device.connect(rsa_keys=self._get_adb_signer())
            except Exception as e:
                self.logger.error(f"Failed to connect dedicated stream device: {e}")
                return

            # Start streaming shell
            stream_gen = stream_device.streaming_shell(command, decode=False)

            # Feed chunks to reader in a separate thread
            def feeder():
                try:
                    for chunk in stream_gen:
                        if not self._is_streaming.is_set():
                            break
                        stream_reader.queue.put(chunk)
                except Exception as e:
                    self.logger.error(f"Feeder error: {e}")
                finally:
                    stream_reader.queue.put(None)  # End signal

            feeder_thread = threading.Thread(target=feeder, daemon=True)
            feeder_thread.start()

            # Decode with PyAV
            with av.open(stream_reader, mode="r", format="h264") as container:
                for frame in container.decode(video=0):
                    if not self._is_streaming.is_set():
                        break
                    rgb_frame = frame.to_ndarray(format="rgb24")
                    # No lock needed - assignment is atomic in Python (GIL)
                    self._latest_frame = rgb_frame

        except Exception as e:
            if self._is_streaming.is_set():
                self.logger.error(f"Stream error: {e}")
        finally:
            stream_reader.close()
            self._is_streaming.clear()
            if stream_device:
                try:
                    stream_device.close()
                except Exception as e:
                    self.logger.error(f"Error closing stream device: {e}")
            self.logger.debug("Stream ended")

is_streaming property

Check if streaming is currently active.

Returns:

Type Description
bool

True if streaming is active, False otherwise.

__init__(host=None, port=None, config=None, adbshell_log_level=WARNING)

Initalizes PymordialAdbDevice.

Parameters:

Name Type Description Default
host str | None

The address of the device; may be an IP address or a host name.

None
port int | None

The device port to which we are connecting.

None
config AdbConfig | None

A TypedDict containing ADB configuration. Defaults to package defaults.

None
adbshell_log_level int

The log level for adb-shell (e.g. logging.WARNING).

WARNING
Source code in src/pymordialblue/devices/adb_device.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def __init__(
    self,
    host: str | None = None,
    port: int | None = None,
    config: AdbConfig | None = None,
    adbshell_log_level: int = WARNING,
):
    """Initalizes PymordialAdbDevice.

    Args:
        host: The address of the device; may be an IP address or a host name.
        port: The device port to which we are connecting.
        config: A TypedDict containing ADB configuration. Defaults to package defaults.
        adbshell_log_level: The log level for adb-shell (e.g. logging.WARNING).
    """
    self.logger = getLogger("PymordialAdbDevice")
    basicConfig(
        level=DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    self.logger.debug("Initalizing PymordialAdbDevice...")
    self.config = copy.deepcopy(config or get_config()["adb"])
    self.host: str = host or self.config["default_host"]
    self.port: int = port or self.config["default_port"]

    getLogger("adb_shell").setLevel(
        adbshell_log_level
    )  # Silence adb_shell debug logs

    self._device: AdbDeviceTcp | None = None

    # Streaming attributes
    self._stream_thread: threading.Thread | None = None
    self._latest_frame: np.ndarray | None = None
    self._is_streaming = threading.Event()
    self.logger.debug("PymordialAdbDevice initalized.")

capture_screen()

Captures the current BlueStacks screen using the appropriate capture strategy.

Returns:

Type Description
'bytes | np.ndarray | None'

The screenshot as bytes or numpy array, or None if failed.

Source code in src/pymordialblue/devices/adb_device.py
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
def capture_screen(self) -> "bytes | np.ndarray | None":
    """Captures the current BlueStacks screen using the appropriate capture strategy.

    Returns:
        The screenshot as bytes or numpy array, or None if failed.
    """

    if not self.is_connected():
        self.connect()
        if not self.is_connected():
            self.logger.warning(
                "Cannot capture screen - ADB controller is not initialized"
            )
            return None

    # Always use streaming - start if not active
    if not self.is_streaming:
        self.logger.info("Starting stream for capture_screen...")
        if not self.start_stream():
            self.logger.error(
                "Failed to start stream, falling back to capture_screenshot."
            )
            return self.capture_screenshot()

    frame = self.get_latest_frame()
    if frame is not None:
        # Validate frame isn't corrupted (all same color)
        if frame.std() < 1.0:  # Nearly uniform = likely corrupted
            self.logger.warning(
                "Frame appears corrupted (uniform color), restarting stream..."
            )
            self.stop_stream()
            if self.start_stream():
                frame = self.get_latest_frame()
                if frame is not None and frame.std() >= 1.0:
                    self.logger.debug("Returning fresh frame after restart.")
                    return frame
            self.logger.error("Failed to get valid frame after restart")
            return self.capture_screenshot()
        self.logger.debug("Returning latest frame from stream.")
        return frame

    self.logger.warning(
        "Stream active but no frame available. Falling back to screenshot."
    )
    return self.capture_screenshot()

capture_screenshot()

Captures a screenshot of the device.

Returns:

Type Description
bytes | None

The screenshot as bytes, or None if failed.

Source code in src/pymordialblue/devices/adb_device.py
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
def capture_screenshot(self) -> bytes | None:
    """Captures a screenshot of the device.

    Returns:
        The screenshot as bytes, or None if failed.
    """
    self.logger.debug("Capturing screenshot...")
    if not self._device.available:
        self.logger.warning("ADB not connected. Skipping screenshot.")
        return None

    try:
        screenshot_bytes: bytes | None = self.run_command(
            self.config["commands"]["screencap"]
        )
        if screenshot_bytes:
            self.logger.debug("Screenshot captured successfully")
            return screenshot_bytes
    except Exception as e:
        self.logger.error(f"Error capturing screenshot: {e}")
    return None

close_all_apps(exclude=None)

Force stops all packages to clear the device state.

Parameters:

Name Type Description Default
exclude list[str] | None

Optional list of package names to exclude from closing.

None
Source code in src/pymordialblue/devices/adb_device.py
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
def close_all_apps(self, exclude: list[str] | None = None) -> None:
    """Force stops all packages to clear the device state.

    Args:
        exclude: Optional list of package names to exclude from closing.
    """
    self.logger.debug("Closing all apps...")

    # Get list of all packages
    output = self.run_command("pm list packages", decode=True)
    if not output:
        self.logger.warning("No packages found.")
        return

    # Output format: package:com.example.app
    # run_command with decode=True returns str
    packages = [
        line.replace("package:", "").strip()
        for line in output.splitlines()
        if line.strip()
    ]
    exclude = exclude or []

    count = 0
    for pkg in packages:
        if pkg in exclude:
            continue
        self.run_command(f"am force-stop {pkg}")
        count += 1

    self.logger.debug(f"Closed {count} apps.")

close_app(package_name=None, app_name=None, timeout=5.0, wait_time=0.5)

Closes an app and verifies it stopped.

Provide either package_name (exact) or app_name (keyword search). If both provided, package_name takes priority.

Parameters:

Name Type Description Default
package_name str | None

Exact package name (e.g., 'com.revomon.vr').

None
app_name str | None

Keyword to search for package (e.g., 'revomon').

None
timeout float

Max seconds to wait for app to close.

5.0
wait_time float

Seconds between verification retries.

0.5

Returns:

Type Description
bool

True if app is confirmed closed, False otherwise.

Raises:

Type Description
ValueError

If neither package_name nor app_name is provided.

Source code in src/pymordialblue/devices/adb_device.py
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
def close_app(
    self,
    package_name: str | None = None,
    app_name: str | None = None,
    timeout: float = 5.0,
    wait_time: float = 0.5,
) -> bool:
    """Closes an app and verifies it stopped.

    Provide either package_name (exact) or app_name (keyword search).
    If both provided, package_name takes priority.

    Args:
        package_name: Exact package name (e.g., 'com.revomon.vr').
        app_name: Keyword to search for package (e.g., 'revomon').
        timeout: Max seconds to wait for app to close.
        wait_time: Seconds between verification retries.

    Returns:
        True if app is confirmed closed, False otherwise.

    Raises:
        ValueError: If neither package_name nor app_name is provided.
    """
    if package_name:
        pkg = package_name
    elif app_name:
        pkg = self.find_package_by_keyword(app_name)
        if not pkg:
            self.logger.error(f"Could not find package for '{app_name}'")
            return False
    else:
        raise ValueError("Must provide either package_name or app_name")

    self.logger.debug(f"Closing app: {pkg}")

    if not self._device.available:
        self.logger.warning("ADB not connected. Cannot close app.")
        return False

    self.run_command(f"am force-stop {pkg}")

    # Poll to confirm app closed
    start_time = time()
    while time() - start_time < timeout:
        if not self.is_app_running(pkg, max_retries=1, wait_time=0):
            self.logger.debug(f"Verified {pkg} is closed")
            return True
        sleep(wait_time)

    self.logger.warning(f"{pkg} may still be running after {timeout}s")
    return False

disconnect()

Disconnects the PymordialAdbDevice device.

Returns:

Type Description
bool

True if disconnected (or already disconnected), False on error.

Source code in src/pymordialblue/devices/adb_device.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def disconnect(self) -> bool:
    """Disconnects the PymordialAdbDevice device.

    Returns:
        True if disconnected (or already disconnected), False on error.
    """
    self.logger.debug("Disconnecting from PymordialAdbDevice device...")
    self.stop_stream()  # Stop streaming if active

    if self._device is None or not self._device.available:
        return True

    try:
        self._device.close()
        if not self._device.available:
            self.logger.debug("Disconnected from PymordialAdbDevice device.")
            return True
        self.logger.debug("Failed to disconnect from PymordialAdbDevice device.")
        return False
    except Exception as e:
        self.logger.error(
            f"Error disconnecting from PymordialAdbDevice device: {e}"
        )
        return False

get_focused_app()

Gets information about the currently focused app.

Returns:

Type Description
dict[str, str] | None

A dictionary with 'package' and 'activity', or None if failed.

Source code in src/pymordialblue/devices/adb_device.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def get_focused_app(self) -> dict[str, str] | None:
    """Gets information about the currently focused app.

    Returns:
        A dictionary with 'package' and 'activity', or None if failed.
    """
    self.logger.debug("Getting focused app info...")
    # 'dumpsys window windows' is more reliable than just 'dumpsys window' on some Android versions
    # Look for mCurrentFocus or mFocusedApp lines
    output = self.run_command(
        "dumpsys window | grep -E 'mCurrentFocus|mFocusedApp'", decode=True
    )
    if not output:
        return None

    # Format: mCurrentFocus=Window{... u0 com.android.settings/com.android.settings.Settings}
    # Format: mFocusedApp=AppWindowToken{... token=Token{... u0 com.example/com.example.MainActivity}}
    import re

    match = re.search(r"([a-zA-Z0-9._]+)/([a-zA-Z0-9._$]+)", output)
    if match:
        pkg, activity = match.groups()
        self.logger.debug(f"Focused app: package={pkg}, activity={activity}")
        return {"package": pkg, "activity": activity}

    self.logger.debug("Could not parse focused app info from dumpsys")
    return None

get_latest_frame()

Gets the latest decoded frame from the stream.

Returns:

Type Description
ndarray | None

The latest frame as a numpy array (RGB), or None if no frame available.

Source code in src/pymordialblue/devices/adb_device.py
667
668
669
670
671
672
673
674
675
676
def get_latest_frame(self) -> np.ndarray | None:
    """Gets the latest decoded frame from the stream.

    Returns:
        The latest frame as a numpy array (RGB), or None if no frame available.
    """
    # No lock needed - reference read is atomic in Python (GIL)
    # Copy to prevent caller from modifying the frame
    frame = self._latest_frame
    return frame.copy() if frame is not None else None

go_home()

Navigates to the home screen.

Returns:

Type Description
bool

True if successful, False otherwise.

Source code in src/pymordialblue/devices/adb_device.py
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
def go_home(self) -> bool:
    """Navigates to the home screen.

    Returns:
        True if successful, False otherwise.
    """
    self.logger.debug("PymordialAdbDevice navigating to home screen...")
    if not self._device.available:
        self.logger.debug(
            "PymordialAdbDevice device not connected. Skipping 'go_home' method call."
        )
        return False
    # Go to home screen
    self.run_command(f"input keyevent {self.config['keyevents']['home']}")
    sleep(self.config["default_wait_time"])
    self.logger.debug("PymordialAdbDevice successfully navigated to home screen.")
    return True

initialize(config)

Initializes the ADB device plugin.

Parameters:

Name Type Description Default
config 'PymordialBlueConfig'

Global Pymordial configuration dictionary.

required
Source code in src/pymordialblue/devices/adb_device.py
85
86
87
88
89
90
91
92
93
def initialize(self, config: "PymordialBlueConfig") -> None:
    """Initializes the ADB device plugin.

    Args:
        config: Global Pymordial configuration dictionary.
    """
    # In a full comprehensive implementation, we might reload config here.
    # For now, we rely on __init__ logic or manual property updates.
    pass

is_app_running(package_name=None, pymordial_app=None, app_name=None, max_retries=2, wait_time=1)

Checks if an app is running using pidof command.

Parameters:

Name Type Description Default
package_name str | None

Direct package name (e.g., 'com.android.settings').

None
pymordial_app 'PymordialApp' | None

PymordialApp instance to extract package from.

None
app_name str | None

App name keyword to search for.

None
max_retries int

Number of retries.

2
wait_time int

Time to wait between retries.

1

Returns:

Type Description
bool

True if running, False otherwise.

Priority: package_name > pymordial_app > app_name.

Source code in src/pymordialblue/devices/adb_device.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
def is_app_running(
    self,
    package_name: str | None = None,
    pymordial_app: "PymordialApp" | None = None,
    app_name: str | None = None,
    max_retries: int = 2,
    wait_time: int = 1,
) -> bool:
    """Checks if an app is running using pidof command.

    Args:
        package_name: Direct package name (e.g., 'com.android.settings').
        pymordial_app: PymordialApp instance to extract package from.
        app_name: App name keyword to search for.
        max_retries: Number of retries.
        wait_time: Time to wait between retries.

    Returns:
        True if running, False otherwise.

    Priority: package_name > pymordial_app > app_name.
    """
    # Handle case where PymordialApp might be passed positionally as package_name
    if package_name and not isinstance(package_name, str):
        if hasattr(package_name, "package_name"):
            pymordial_app = package_name
            package_name = None

    pkg = (
        package_name
        or (pymordial_app.package_name if pymordial_app else None)
        or (self.find_package_by_keyword(app_name) if app_name else None)
    )

    if not pkg:
        self.logger.warning(
            f"Could not resolve package for is_app_running (package_name={package_name}, "
            f"pymordial_app={pymordial_app}, app_name={app_name})"
        )
        return False

    if not self._device.available:
        self.logger.debug(
            "PymordialAdbDevice device not connected. Attempting to reconnect..."
        )
        if not self.connect():
            raise ConnectionError(
                "PymordialAdbDevice device not connected and reconnection failed."
            )
        self.logger.debug("PymordialAdbDevice device reconnected.")

    for attempt in range(max_retries):
        try:
            output = self.run_command(f"pidof {pkg}", decode=True)
            if output and output.strip():
                self.logger.debug(f"Found {pkg} running with PID: {output.strip()}")
                return True
        except Exception as e:
            self.logger.debug(f"pidof check failed: {e}")

        if attempt < max_retries - 1:
            self.logger.debug(
                f"{pkg} not found. Retrying ({attempt + 1}/{max_retries})..."
            )
            sleep(wait_time)

    self.logger.debug(f"{pkg} not found after {max_retries} attempts")
    return False

is_connected()

Checks if PymordialAdbDevice device is connected.

Returns:

Type Description
bool

True if the device is connected, False otherwise.

Source code in src/pymordialblue/devices/adb_device.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def is_connected(self) -> bool:
    """Checks if PymordialAdbDevice device is connected.

    Returns:
        True if the device is connected, False otherwise.
    """
    self.logger.debug("Checking if PymordialAdbDevice device is connected...")
    if self._device is None:
        self.logger.debug(
            "PymordialAdbDevice device not initialized. Use connect() method to initialize."
        )
        return False
    if not self._device.available:
        self.logger.debug("PymordialAdbDevice device not connected.")
        return False
    self.logger.debug("PymordialAdbDevice device connected.")
    return True

open_app(app_name, package_name=None, timeout=10.0, wait_time=1.0)

Opens an app by name or package, with optional verification. Args: app_name: Name/keyword to search for the package if package_name not provided. package_name: Optional exact package name. timeout: Max seconds to wait for app to start (if verify=True). wait_time: Seconds between verification retries. Returns: True if launched, False otherwise.

Source code in src/pymordialblue/devices/adb_device.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def open_app(
    self,
    app_name: str,
    package_name: str | None = None,
    timeout: float = 10.0,
    wait_time: float = 1.0,
) -> bool:
    """Opens an app by name or package, with optional verification.
    Args:
        app_name: Name/keyword to search for the package if package_name not provided.
        package_name: Optional exact package name.
        timeout: Max seconds to wait for app to start (if verify=True).
        wait_time: Seconds between verification retries.
    Returns:
        True if launched, False otherwise.
    """
    pkg = package_name or self.find_package_by_keyword(app_name)
    if not pkg:
        self.logger.error(f"Could not find package for '{app_name}'")
        return False
    # Try activity-based launch first (faster, more reliable)
    activity = self.get_launch_activity(pkg)
    if activity:
        self.run_command(f"am start -n {activity}")
        self.logger.debug(f"Launched {app_name} via Activity: {activity}")
    else:
        # Fallback to monkey
        self.logger.debug(
            f"Activity not found, using Monkey fallback for {app_name}"
        )
        self.run_command(f"monkey -p {pkg} -c android.intent.category.LAUNCHER 1")
        self.logger.debug(f"Launched {app_name} via Monkey")

    start_time = time()
    while time() - start_time < timeout:
        if self.is_app_running(pkg, max_retries=1, wait_time=0):
            self.logger.debug(f"Verified {app_name} is running")
            return True
        sleep(wait_time)
    self.logger.warning(f"{app_name} did not start within {timeout}s")
    return False

press_enter()

Presses the Enter key.

Returns:

Type Description
bool

True if successful, False otherwise.

Source code in src/pymordialblue/devices/adb_device.py
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
def press_enter(self) -> bool:
    """Presses the Enter key.

    Returns:
        True if successful, False otherwise.
    """
    self.logger.debug("Pressing enter key...")
    if not self._device.available:
        self.logger.debug(
            "PymordialAdbDevice device not connected. Skipping 'press_enter' method call."
        )
        return False
    self.run_command(f"input keyevent {self.config['keyevents']['enter']}")
    self.logger.debug("Enter key sent via ADB")
    return True

press_esc()

Presses the Esc key.

Returns:

Type Description
bool

True if successful, False otherwise.

Source code in src/pymordialblue/devices/adb_device.py
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
def press_esc(self) -> bool:
    """Presses the Esc key.

    Returns:
        True if successful, False otherwise.
    """
    self.logger.debug("Pressing esc key...")
    if not self._device.available:
        self.logger.debug(
            "PymordialAdbDevice device not connected. Skipping 'press_esc' method call."
        )
        return False
    # Send the esc key using ADB
    self.run_command(f"input keyevent {self.config['keyevents']['esc']}")
    self.logger.debug("Esc key sent via ADB")
    return True

run_command(command, decode=False)

Executes a shell command and returns the output.

Parameters:

Name Type Description Default
command str

The command to execute.

required

Returns:

Type Description
bytes | None

The command output as bytes, or None if not connected.

Source code in src/pymordialblue/devices/adb_device.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def run_command(self, command: str, decode: bool = False) -> bytes | None:
    """Executes a shell command and returns the output.

    Args:
        command: The command to execute.

    Returns:
        The command output as bytes, or None if not connected.
    """
    self.logger.debug(f"Executing shell command: {command}...")
    try:
        output = self._device.shell(
            command,
            timeout_s=self.config["commands"]["timeout"],
            read_timeout_s=self.config["commands"]["read_timeout"],
            transport_timeout_s=self.config["commands"]["transport_timeout"],
            decode=decode,
        )
    except Exception as e:
        self.logger.error(f"Failed to execute shell command {command}: {e}")
        return None
    if output and decode and isinstance(output, str):
        output = output.strip()
    self.logger.debug(f"Shell command {command} executed successfully.")
    return output

show_recent_apps()

Shows the recent apps drawer.

Returns:

Type Description
bool

True if successful, False otherwise.

Source code in src/pymordialblue/devices/adb_device.py
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
def show_recent_apps(self) -> bool:
    """Shows the recent apps drawer.

    Returns:
        True if successful, False otherwise.
    """
    self.logger.debug("Showing recent apps...")
    if not self._device.available:
        self.logger.debug(
            "PymordialAdbDevice device not connected. Skipping 'show_recent_apps' method call."
        )
        return False
    self.run_command(f"input keyevent {self.config['keyevents']['app_switch']}")
    self.logger.debug("Recent apps drawer successfully opened")
    return True

shutdown()

Disconnects and cleans up resources.

Source code in src/pymordialblue/devices/adb_device.py
95
96
97
def shutdown(self) -> None:
    """Disconnects and cleans up resources."""
    self.disconnect()

start_stream()

Starts screen streaming using adb-shell's streaming_shell with PyAV decoding.

Automatically detects resolution from a screenshot.

Returns:

Type Description
bool

True if stream started successfully, False otherwise.

Source code in src/pymordialblue/devices/adb_device.py
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
def start_stream(self) -> bool:
    """Starts screen streaming using adb-shell's streaming_shell with PyAV decoding.

    Automatically detects resolution from a screenshot.

    Returns:
        True if stream started successfully, False otherwise.
    """
    self.logger.debug("Starting PymordialAdbDevice stream...")
    if self._is_streaming.is_set():
        self.logger.debug("PymordialAdbDevice stream already running")
        return True

    if not self._device.available:
        self.logger.error("Cannot start stream: not connected")
        return False

    # Auto-detect resolution
    screenshot_bytes = self.capture_screenshot()
    if not screenshot_bytes:
        self.logger.error(
            "Cannot start stream: failed to capture screenshot for resolution detection"
        )
        return False

    try:
        img = Image.open(BytesIO(screenshot_bytes))
        width, height = img.size
        size_arg = f"{width}x{height}"
        self.logger.debug(f"Detected resolution: {width}x{height}")
    except Exception as e:
        self.logger.error(f"Error detecting resolution: {e}")
        return False

    self._is_streaming.set()
    command = (
        f"screenrecord --output-format=h264 "
        # f"--verbose "  <--- DELETE THIS LINE
        f"--size {size_arg} "
        f"--bit-rate {self.config['stream']['bitrate']} "
        f"--time-limit {self.config['stream']['time_limit']} "
        f"-"
    )

    stream_reader = PymordialStreamReader(
        queue_size=self.config["stream"]["queue_size"],
        read_timeout=self.config["stream"]["read_timeout"],
    )

    self._stream_thread = threading.Thread(
        target=self._stream_worker,
        args=(command, stream_reader),
        daemon=True,
    )
    self._stream_thread.start()

    # Wait for first frame
    for _ in range(self.config["stream"]["start_timeout_iterations"]):
        if self._latest_frame is not None:
            self.logger.info("PymordialAdbDevice stream started successfully")
            return True
        sleep(self.config["stream"]["start_wait"])

    self.logger.error("PymordialAdbDevice stream timeout: no frames")
    self.stop_stream()
    return False

stop_stream()

Stops the screen stream.

Source code in src/pymordialblue/devices/adb_device.py
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
def stop_stream(self) -> None:
    """Stops the screen stream."""
    self.logger.debug("Stopping PymordialAdbDevice stream...")
    self._is_streaming.clear()
    if self._stream_thread and self._stream_thread.is_alive():
        self._stream_thread.join(timeout=self.config["stream"]["stop_timeout"])

    # Cleanup: kill background screenrecord and remove temp file
    try:
        self.run_command("pkill -9 screenrecord")
    except Exception:
        pass  # Best effort cleanup

    self._latest_frame = None
    self.logger.debug("PymordialAdbDevice stream stopped")

tap(x, y)

Performs a simple tap at (x, y).

Parameters:

Name Type Description Default
x int

X coordinate.

required
y int

Y coordinate.

required
Source code in src/pymordialblue/devices/adb_device.py
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
def tap(self, x: int, y: int) -> bool:
    """Performs a simple tap at (x, y).

    Args:
        x: X coordinate.
        y: Y coordinate.
    """
    self.logger.debug(f"Tapping at ({x}, {y})")
    output = self.run_command(f"input tap {x} {y}")
    if output:
        self.logger.debug(f"Tap at ({x}, {y}) successful")
        return True
    else:
        self.logger.debug(f"Tap at ({x}, {y}) failed")
        return False

type_text(text, enter=False)

Types text on the device. Args: text: The text to type. enter: Whether to press enter after typing. Returns: True if successful, False otherwise.

Source code in src/pymordialblue/devices/adb_device.py
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
def type_text(self, text: str, enter: bool = False) -> bool:
    """Types text on the device.
    Args:
        text: The text to type.
        enter: Whether to press enter after typing.
    Returns:
        True if successful, False otherwise.
    """
    self.logger.debug(f"Typing text: {text} ...")
    if not self._device.available:
        self.logger.debug(
            "PymordialAdbDevice device not connected. Skipping 'type_text' method call."
        )
        return False

    # Escape special characters for ADB input text
    # Spaces become %s, other special chars need escaping
    escaped_text = text.replace("\\", "\\\\")  # Escape backslashes first
    escaped_text = escaped_text.replace(" ", "%s")
    escaped_text = escaped_text.replace("'", "\\'")
    escaped_text = escaped_text.replace('"', '\\"')
    escaped_text = escaped_text.replace("&", "\\&")
    escaped_text = escaped_text.replace("<", "\\<")
    escaped_text = escaped_text.replace(">", "\\>")
    escaped_text = escaped_text.replace(";", "\\;")
    escaped_text = escaped_text.replace("|", "\\|")

    self.run_command(f"input text '{escaped_text}'")
    self.logger.debug(f"Text '{text}' sent via ADB")
    if enter:
        self.press_enter()
    return True

BlueStacks Device

pymordialblue.devices.bluestacks_device

Controller for managing the BlueStacks emulator.

PymordialBluestacksDevice

Bases: PymordialEmulatorDevice

Controls the BlueStacks emulator.

Attributes:

Name Type Description
running_apps list[PymordialApp] | list

A list of currently running PymordialApp instances.

state list[PymordialApp] | list

The state machine managing the BlueStacks lifecycle state.

elements list[PymordialApp] | list

A container for BlueStacks UI elements.

elements list[PymordialApp] | list

A container for BlueStacks UI elements.

config

The configuration dictionary for BlueStacks.

Source code in src/pymordialblue/devices/bluestacks_device.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
class PymordialBluestacksDevice(PymordialEmulatorDevice):
    """Controls the BlueStacks emulator.

    Attributes:
        running_apps: A list of currently running PymordialApp instances.
        state: The state machine managing the BlueStacks lifecycle state.
        elements: A container for BlueStacks UI elements.
        elements: A container for BlueStacks UI elements.
        config: The configuration dictionary for BlueStacks.
    """

    name: str = "bluestacks"
    version: str = "0.1.0"

    def __init__(
        self,
        adb_bridge_device: PymordialAdbDevice | None = None,
        vision_device: PymordialVisionDevice | None = None,
        config: BluestacksConfig | None = None,
    ) -> None:
        """Initializes the PymordialBluestacksDevice.

        Args:
            adb_bridge_device: The bridge device (e.g. PymordialAdbDevice) used for
                low-level ADB interactions.
            vision_device: The vision device used for screen analysis.
            config: A TypedDict containing BlueStacks configuration options.
                Defaults to package defaults if None.
        """
        self.logger = getLogger("PymordialBluestacksDevice")
        basicConfig(
            level=DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
        self.logger.info("Initializing PymordialBluestacksDevice...")
        super().__init__()
        self.config = copy.deepcopy(config or get_config()["bluestacks"])
        self.running_apps: list[PymordialApp] | list = list()

        self._adb_bridge_device: PymordialAdbDevice | None = adb_bridge_device
        self._vision_device: PymordialVisionDevice | None = vision_device
        self._ref_window_size: tuple[int, int] = tuple(
            self.config["default_resolution"]
        )

        self._filepath: str | None = None
        self._hd_player_exe: str = self.config["hd_player_exe"]

        self.state.register_handler(EmulatorState.LOADING, self.wait_for_load, None)
        self.state.register_handler(EmulatorState.READY, self._connect_adb, None)

        self._autoset_filepath()

        self.logger.debug(
            f"PymordialBluestacksDevice initialized with the following state:\n{self.state}\n"
        )

    def _connect_adb(self) -> None:
        """Connects the ADB bridge device if available."""
        if self._adb_bridge_device:
            self._adb_bridge_device.connect()
        else:
            self.logger.warning(
                "ADB bridge device not set, cannot connect during READY transition."
            )

    def initialize(self, config: "PymordialBlueConfig") -> None:
        """Initializes the BlueStacks device plugin with configuration.

        Args:
            config: Global Pymordial configuration dictionary.
        """
        pass

    def set_dependencies(
        self,
        adb_bridge_device: PymordialAdbDevice,
        vision_device: PymordialVisionDevice,
    ) -> None:
        """Sets external dependencies (dependency injection).

        Args:
            adb_bridge_device: The ADB bridge device.
            vision_device: The vision device.
        """
        self._adb_bridge_device = adb_bridge_device
        self._vision_device = vision_device

    def shutdown(self) -> None:
        """Kills the emulator process."""
        self.close()

    @property
    def ref_window_size(self) -> tuple[int, int] | None:
        """Gets the reference window size.

        Returns:
            A tuple containing (width, height) in pixels, or None if not set.
        """
        return self._ref_window_size

    @ref_window_size.setter
    @log_property_setter
    def ref_window_size(self, width_height: tuple[int | str, int | str]) -> None:
        """Sets the reference window size.

        Args:
            width_height: A tuple containing (width, height) in pixels. Values
                can be integers or string representations of integers.

        Raises:
            ValueError: If the provided width or height are not integers (or
                strings representing integers), or if they are not positive.
        """
        width = width_height[0]
        height = width_height[1]
        if not isinstance(width, int):
            if isinstance(width, str) and width.isdigit():
                width: int = int(width)
                if width <= 0:
                    self.logger.warning(
                        "ValueError while trying to set PymordialBluestacksDevice 'ref_window_size': Provided width must be positive integers!"
                    )
                    raise ValueError("Provided width must be positive integers")
            else:
                self.logger.warning(
                    "ValueError while trying to set PymordialBluestacksDevice 'ref_window_size': Provided width must be an integer or the string representation of an integer!"
                )
                raise ValueError(
                    "Provided width must be integer or the string representation of an integer!"
                )

        if not isinstance(height, int):
            if isinstance(height, str) and height.isdigit():
                height: int = int(height)
                if height <= 0:
                    self.logger.warning(
                        "ValueError while trying to set PymordialBluestacksDevice 'ref_window_size': Provided height must be positive integers!"
                    )
                    raise ValueError("Provided height must be positive integers")
            else:
                self.logger.warning(
                    "ValueError while trying to set PymordialBluestacksDevice 'ref_window_size': Provided height must be an integer or the string representation of an integer!"
                )
                raise ValueError(
                    "Provided height must be integer or the string representation of an integer!"
                )

        self._ref_window_size = (width, height)

    @property
    def filepath(self) -> str | None:
        """Gets the BlueStacks executable filepath.

        Returns:
            The absolute path to the HD-Player.exe file as a string, or None
            if it has not been determined.
        """
        return self._filepath

    @filepath.setter
    @log_property_setter
    def filepath(self, filepath: str) -> None:
        """Sets the BlueStacks executable filepath.

        Args:
            filepath: The absolute path to HD-Player.exe.

        Raises:
            ValueError: If the provided filepath is not a string or if the path
                does not exist on the filesystem.
        """
        if not isinstance(filepath, str):
            self.logger.warning(
                "ValueError while trying to set PymordialBluestacksDevice 'filepath': Provided filepath must be a string!"
            )
            raise ValueError("Provided filepath must be a string")

        if not os.path.exists(filepath):
            self.logger.warning(
                "ValueError while trying to set PymordialBluestacksDevice 'filepath': Provided filepath does not exist!"
            )
            raise ValueError("Provided filepath does not exist")

        self._filepath: str = filepath

    def open(
        self,
        max_retries: int | None = None,
        wait_time: int | None = None,
        timeout_s: int | None = None,
    ) -> None:
        """Opens the BlueStacks emulator application.

        Args:
            max_retries: The maximum number of attempts to detect the process
                after launching. Defaults to the configuration value.
            wait_time: The time in seconds to wait between detection attempts.
                Defaults to the configuration value.
            timeout_s: The maximum total time in seconds to wait for the process
                to appear before timing out. Defaults to the configuration value.

        Raises:
            ValueError: If BlueStacks fails to start due to an OS error.
            Exception: If the BlueStacks process window is not found after the
                specified retries or timeout period.
        """
        max_retries: int = validate_and_convert_int(
            max_retries or self.config["default_open_app_max_retries"], "max_retries"
        )
        wait_time: int = validate_and_convert_int(
            wait_time or self.config["default_open_app_wait_time"], "wait_time"
        )
        timeout_s: int = validate_and_convert_int(
            timeout_s or self.config["default_open_app_timeout"], "timeout_s"
        )
        match self.state.current_state:
            case EmulatorState.CLOSED:
                self.logger.info("Opening Bluestacks controller...")
                if not self._filepath:
                    self._autoset_filepath()
                try:
                    os.startfile(self._filepath)
                except Exception as e:
                    self.logger.error(f"Failed to start Bluestacks: {e}")
                    raise ValueError(f"Failed to start Bluestacks: {e}")

                start_time: float = time.time()

                for attempt in range(max_retries):
                    is_open: bool = any(
                        p.name().lower() == self._hd_player_exe.lower()
                        for p in psutil.process_iter(["name"])
                    )
                    if is_open:
                        self.logger.info("Bluestacks controller opened successfully.")
                        # Transition to LOADING - state handler will automatically call wait_for_load()
                        self.state.transition_to(EmulatorState.LOADING)
                        return

                    if time.time() - start_time > timeout_s:
                        self.logger.error(
                            "Timeout waiting for Bluestacks window to appear"
                        )
                        raise Exception(
                            "Timeout waiting for Bluestacks window to appear"
                        )

                    self.logger.warning(
                        f"Attempt {attempt + 1}/{max_retries}: Could not find Bluestacks window."
                    )
                    time.sleep(wait_time)

                self.logger.error(
                    f"Failed to find Bluestacks window after all attempts {attempt + 1}/{max_retries}"
                )
                raise Exception(
                    f"Failed to find Bluestacks window after all attempts {attempt + 1}/{max_retries}"
                )
            case EmulatorState.LOADING:
                self.logger.info(
                    "Bluestacks controller is already open and currently loading."
                )
                return
            case EmulatorState.READY:
                self.logger.info("Bluestacks controller is already open and ready.")
                return

    def open_settings(self) -> bool:
        """Opens the Settings app using a verified activity name.

        Returns:
            True if opened successfully, False otherwise.
        """
        # Based on manual verification:
        # package: com.bluestacks.settings
        # activity: .SettingsActivity
        self.logger.info("Opening Settings...")
        return self._adb_bridge_device.open_app(
            "settings", package_name="com.bluestacks.settings"
        )

    def wait_for_load(self, timeout_s: int | None = None) -> None:
        """Waits for Bluestacks to finish loading by polling the ADB connection.

        This method blocks until the emulator is responsive via ADB or the
        timeout is reached.

        Args:
            timeout_s: The maximum number of seconds to wait for the emulator
                to load. Defaults to the configuration value.
        """
        self.logger.debug("Waiting for Bluestacks to load (ADB check)...")
        start_time = time.time()
        timeout_s = timeout_s or self.config["default_load_timeout"]

        while self.state.current_state == EmulatorState.LOADING:
            # Try to connect to ADB
            if self._adb_bridge_device.connect():
                self._adb_bridge_device.disconnect()
                default_ui_load_wait_time: int = self.config[
                    "default_ui_load_wait_time"
                ]
                self.logger.debug(
                    f"Waiting {default_ui_load_wait_time} seconds for UI to stabilize..."
                )
                time.sleep(default_ui_load_wait_time)
                self._adb_bridge_device.connect()
                self.logger.info("Bluestacks is loaded & ready.")
                self.state.transition_to(EmulatorState.READY)
                return

            # Check timeout
            if time.time() - start_time > timeout_s:
                self.logger.error(
                    f"Timeout waiting for Bluestacks to load after {timeout_s} seconds."
                )
                # We transition to READY anyway to allow retry logic elsewhere if needed,
                # or maybe we should raise? For now, mimicking previous behavior.
                self.state.transition_to(EmulatorState.READY)
                return

            time.sleep(self.config["default_load_wait_time"])

    def is_ready(self) -> bool:
        """Checks if BlueStacks is in the READY state.

        Returns:
            True if the current state is EmulatorState.READY, False otherwise.
        """
        return self.state.current_state == EmulatorState.READY

    def close(self) -> bool:
        """Kills the Bluestacks controller process.

        This will also disconnect the ADB bridge device.

        Returns:
            True if the Bluestacks process was found and killed, or if no
            process was found running. False if the process was found but
            could not be killed.

        Raises:
            ValueError: If an unexpected error occurs during the process
                killing routine.
        """
        self.logger.info("Killing Bluestacks controller...")

        try:
            process_found = False
            for proc in psutil.process_iter(["pid", "name"]):
                if proc.info["name"] == self._hd_player_exe:
                    process_found = True
                    try:
                        self._adb_bridge_device.disconnect()
                    except Exception as e:
                        self.logger.warning(
                            f"Error in close method while trying to disconnect adb bridge: {e}\nContinuing to close the Bluestacks process..."
                        )
                    try:
                        proc.kill()
                        proc.wait(timeout=self.config["default_process_kill_timeout"])
                    except (
                        psutil.NoSuchProcess,
                        psutil.AccessDenied,
                        psutil.ZombieProcess,
                    ):
                        return False

            if not process_found:
                self.logger.debug("Bluestacks process was not found.")
                return False

            if self.state.current_state != EmulatorState.CLOSED:
                self.state.transition_to(EmulatorState.CLOSED)

            self.logger.info("Bluestacks process killed.")
            return True

        except Exception as e:
            self.logger.error(f"Error in close: {e}")
            raise ValueError(f"Failed to kill Bluestacks: {e}")

    def _autoset_filepath(self) -> None:
        """Automatically detects and sets the BlueStacks executable path.

        This method attempts to locate `HD-Player.exe` by searching:
        1. Standard "Program Files" locations.
        2. Common custom installation paths.
        3. The current working directory.
        4. A broad walk of the C: drive (if initial checks fail).

        Raises:
            FileNotFoundError: If `HD-Player.exe` cannot be located automatically
                in any of the searched locations.
        """
        self.logger.debug("Setting filepath...")

        # Common installation paths for BlueStacks
        search_paths = [
            # Standard Program Files locations
            os.path.join(
                os.environ.get("ProgramFiles", ""),
                "BlueStacks_nxt",
                self._hd_player_exe,
            ),
            os.path.join(
                os.environ.get("ProgramFiles(x86)", ""),
                "BlueStacks_nxt",
                self._hd_player_exe,
            ),
            # Alternative BlueStacks versions
            os.path.join(
                os.environ.get("ProgramFiles", ""), "BlueStacks", self._hd_player_exe
            ),
            os.path.join(
                os.environ.get("ProgramFiles(x86)", ""),
                "BlueStacks",
                self._hd_player_exe,
            ),
            # Common custom installation paths
            f"C:\\Program Files\\BlueStacks_nxt\\{self._hd_player_exe}",
            f"C:\\Program Files (x86)\\BlueStacks_nxt\\{self._hd_player_exe}",
            f"C:\\BlueStacks\\{self._hd_player_exe}",
            f"C:\\BlueStacks_nxt\\{self._hd_player_exe}",
            # Check if file exists in current directory or subdirectories
            self._hd_player_exe,
        ]

        # Remove empty paths from environment variables
        search_paths = [
            path for path in search_paths if path and path != self._hd_player_exe
        ]

        # Add current working directory relative paths
        cwd = os.getcwd()
        search_paths.extend(
            [
                os.path.join(cwd, "BlueStacks_nxt", self._hd_player_exe),
                os.path.join(cwd, "BlueStacks", self._hd_player_exe),
            ]
        )

        self.logger.debug(
            f"Searching for HD-Player.exe in {len(search_paths)} locations"
        )

        for potential_path in search_paths:
            if os.path.exists(potential_path) and os.path.isfile(potential_path):
                self._filepath = potential_path
                self.logger.debug(f"HD-Player.exe filepath set to {self._filepath}.")
                return
            else:
                self.logger.debug(f"Checked path (does not exist): {potential_path}")

        # If we still haven't found it, try a broader search
        self.logger.debug("Performing broader search for HD-Player.exe...")
        try:
            for root, dirs, files in os.walk("C:\\"):
                if self._hd_player_exe in files:
                    potential_path = os.path.join(root, self._hd_player_exe)
                    if "bluestacks" in root.lower():
                        self._filepath = potential_path
                        self.logger.debug(
                            f"HD-Player.exe found via broad search: {self._filepath}"
                        )
                        return
        except Exception as e:
            self.logger.debug(f"Broad search failed: {e}")

        self.logger.error(
            "Could not find HD-Player.exe. Please ensure BlueStacks is installed or manually specify the filepath."
        )
        self.logger.error(f"Searched paths: {search_paths}")
        self.logger.error(f"Current working directory: {os.getcwd()}")
        self.logger.error(f"ProgramFiles: {os.environ.get('ProgramFiles')}")
        self.logger.error(f"ProgramFiles(x86): {os.environ.get('ProgramFiles(x86)')}")
        raise FileNotFoundError(
            "Could not find HD-Player.exe. Please ensure BlueStacks is installed or manually specify the filepath."
        )

filepath property writable

Gets the BlueStacks executable filepath.

Returns:

Type Description
str | None

The absolute path to the HD-Player.exe file as a string, or None

str | None

if it has not been determined.

ref_window_size property writable

Gets the reference window size.

Returns:

Type Description
tuple[int, int] | None

A tuple containing (width, height) in pixels, or None if not set.

__init__(adb_bridge_device=None, vision_device=None, config=None)

Initializes the PymordialBluestacksDevice.

Parameters:

Name Type Description Default
adb_bridge_device PymordialAdbDevice | None

The bridge device (e.g. PymordialAdbDevice) used for low-level ADB interactions.

None
vision_device PymordialVisionDevice | None

The vision device used for screen analysis.

None
config BluestacksConfig | None

A TypedDict containing BlueStacks configuration options. Defaults to package defaults if None.

None
Source code in src/pymordialblue/devices/bluestacks_device.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def __init__(
    self,
    adb_bridge_device: PymordialAdbDevice | None = None,
    vision_device: PymordialVisionDevice | None = None,
    config: BluestacksConfig | None = None,
) -> None:
    """Initializes the PymordialBluestacksDevice.

    Args:
        adb_bridge_device: The bridge device (e.g. PymordialAdbDevice) used for
            low-level ADB interactions.
        vision_device: The vision device used for screen analysis.
        config: A TypedDict containing BlueStacks configuration options.
            Defaults to package defaults if None.
    """
    self.logger = getLogger("PymordialBluestacksDevice")
    basicConfig(
        level=DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    self.logger.info("Initializing PymordialBluestacksDevice...")
    super().__init__()
    self.config = copy.deepcopy(config or get_config()["bluestacks"])
    self.running_apps: list[PymordialApp] | list = list()

    self._adb_bridge_device: PymordialAdbDevice | None = adb_bridge_device
    self._vision_device: PymordialVisionDevice | None = vision_device
    self._ref_window_size: tuple[int, int] = tuple(
        self.config["default_resolution"]
    )

    self._filepath: str | None = None
    self._hd_player_exe: str = self.config["hd_player_exe"]

    self.state.register_handler(EmulatorState.LOADING, self.wait_for_load, None)
    self.state.register_handler(EmulatorState.READY, self._connect_adb, None)

    self._autoset_filepath()

    self.logger.debug(
        f"PymordialBluestacksDevice initialized with the following state:\n{self.state}\n"
    )

close()

Kills the Bluestacks controller process.

This will also disconnect the ADB bridge device.

Returns:

Type Description
bool

True if the Bluestacks process was found and killed, or if no

bool

process was found running. False if the process was found but

bool

could not be killed.

Raises:

Type Description
ValueError

If an unexpected error occurs during the process killing routine.

Source code in src/pymordialblue/devices/bluestacks_device.py
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
def close(self) -> bool:
    """Kills the Bluestacks controller process.

    This will also disconnect the ADB bridge device.

    Returns:
        True if the Bluestacks process was found and killed, or if no
        process was found running. False if the process was found but
        could not be killed.

    Raises:
        ValueError: If an unexpected error occurs during the process
            killing routine.
    """
    self.logger.info("Killing Bluestacks controller...")

    try:
        process_found = False
        for proc in psutil.process_iter(["pid", "name"]):
            if proc.info["name"] == self._hd_player_exe:
                process_found = True
                try:
                    self._adb_bridge_device.disconnect()
                except Exception as e:
                    self.logger.warning(
                        f"Error in close method while trying to disconnect adb bridge: {e}\nContinuing to close the Bluestacks process..."
                    )
                try:
                    proc.kill()
                    proc.wait(timeout=self.config["default_process_kill_timeout"])
                except (
                    psutil.NoSuchProcess,
                    psutil.AccessDenied,
                    psutil.ZombieProcess,
                ):
                    return False

        if not process_found:
            self.logger.debug("Bluestacks process was not found.")
            return False

        if self.state.current_state != EmulatorState.CLOSED:
            self.state.transition_to(EmulatorState.CLOSED)

        self.logger.info("Bluestacks process killed.")
        return True

    except Exception as e:
        self.logger.error(f"Error in close: {e}")
        raise ValueError(f"Failed to kill Bluestacks: {e}")

initialize(config)

Initializes the BlueStacks device plugin with configuration.

Parameters:

Name Type Description Default
config PymordialBlueConfig

Global Pymordial configuration dictionary.

required
Source code in src/pymordialblue/devices/bluestacks_device.py
90
91
92
93
94
95
96
def initialize(self, config: "PymordialBlueConfig") -> None:
    """Initializes the BlueStacks device plugin with configuration.

    Args:
        config: Global Pymordial configuration dictionary.
    """
    pass

is_ready()

Checks if BlueStacks is in the READY state.

Returns:

Type Description
bool

True if the current state is EmulatorState.READY, False otherwise.

Source code in src/pymordialblue/devices/bluestacks_device.py
348
349
350
351
352
353
354
def is_ready(self) -> bool:
    """Checks if BlueStacks is in the READY state.

    Returns:
        True if the current state is EmulatorState.READY, False otherwise.
    """
    return self.state.current_state == EmulatorState.READY

open(max_retries=None, wait_time=None, timeout_s=None)

Opens the BlueStacks emulator application.

Parameters:

Name Type Description Default
max_retries int | None

The maximum number of attempts to detect the process after launching. Defaults to the configuration value.

None
wait_time int | None

The time in seconds to wait between detection attempts. Defaults to the configuration value.

None
timeout_s int | None

The maximum total time in seconds to wait for the process to appear before timing out. Defaults to the configuration value.

None

Raises:

Type Description
ValueError

If BlueStacks fails to start due to an OS error.

Exception

If the BlueStacks process window is not found after the specified retries or timeout period.

Source code in src/pymordialblue/devices/bluestacks_device.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
def open(
    self,
    max_retries: int | None = None,
    wait_time: int | None = None,
    timeout_s: int | None = None,
) -> None:
    """Opens the BlueStacks emulator application.

    Args:
        max_retries: The maximum number of attempts to detect the process
            after launching. Defaults to the configuration value.
        wait_time: The time in seconds to wait between detection attempts.
            Defaults to the configuration value.
        timeout_s: The maximum total time in seconds to wait for the process
            to appear before timing out. Defaults to the configuration value.

    Raises:
        ValueError: If BlueStacks fails to start due to an OS error.
        Exception: If the BlueStacks process window is not found after the
            specified retries or timeout period.
    """
    max_retries: int = validate_and_convert_int(
        max_retries or self.config["default_open_app_max_retries"], "max_retries"
    )
    wait_time: int = validate_and_convert_int(
        wait_time or self.config["default_open_app_wait_time"], "wait_time"
    )
    timeout_s: int = validate_and_convert_int(
        timeout_s or self.config["default_open_app_timeout"], "timeout_s"
    )
    match self.state.current_state:
        case EmulatorState.CLOSED:
            self.logger.info("Opening Bluestacks controller...")
            if not self._filepath:
                self._autoset_filepath()
            try:
                os.startfile(self._filepath)
            except Exception as e:
                self.logger.error(f"Failed to start Bluestacks: {e}")
                raise ValueError(f"Failed to start Bluestacks: {e}")

            start_time: float = time.time()

            for attempt in range(max_retries):
                is_open: bool = any(
                    p.name().lower() == self._hd_player_exe.lower()
                    for p in psutil.process_iter(["name"])
                )
                if is_open:
                    self.logger.info("Bluestacks controller opened successfully.")
                    # Transition to LOADING - state handler will automatically call wait_for_load()
                    self.state.transition_to(EmulatorState.LOADING)
                    return

                if time.time() - start_time > timeout_s:
                    self.logger.error(
                        "Timeout waiting for Bluestacks window to appear"
                    )
                    raise Exception(
                        "Timeout waiting for Bluestacks window to appear"
                    )

                self.logger.warning(
                    f"Attempt {attempt + 1}/{max_retries}: Could not find Bluestacks window."
                )
                time.sleep(wait_time)

            self.logger.error(
                f"Failed to find Bluestacks window after all attempts {attempt + 1}/{max_retries}"
            )
            raise Exception(
                f"Failed to find Bluestacks window after all attempts {attempt + 1}/{max_retries}"
            )
        case EmulatorState.LOADING:
            self.logger.info(
                "Bluestacks controller is already open and currently loading."
            )
            return
        case EmulatorState.READY:
            self.logger.info("Bluestacks controller is already open and ready.")
            return

open_settings()

Opens the Settings app using a verified activity name.

Returns:

Type Description
bool

True if opened successfully, False otherwise.

Source code in src/pymordialblue/devices/bluestacks_device.py
292
293
294
295
296
297
298
299
300
301
302
303
304
def open_settings(self) -> bool:
    """Opens the Settings app using a verified activity name.

    Returns:
        True if opened successfully, False otherwise.
    """
    # Based on manual verification:
    # package: com.bluestacks.settings
    # activity: .SettingsActivity
    self.logger.info("Opening Settings...")
    return self._adb_bridge_device.open_app(
        "settings", package_name="com.bluestacks.settings"
    )

set_dependencies(adb_bridge_device, vision_device)

Sets external dependencies (dependency injection).

Parameters:

Name Type Description Default
adb_bridge_device PymordialAdbDevice

The ADB bridge device.

required
vision_device PymordialVisionDevice

The vision device.

required
Source code in src/pymordialblue/devices/bluestacks_device.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def set_dependencies(
    self,
    adb_bridge_device: PymordialAdbDevice,
    vision_device: PymordialVisionDevice,
) -> None:
    """Sets external dependencies (dependency injection).

    Args:
        adb_bridge_device: The ADB bridge device.
        vision_device: The vision device.
    """
    self._adb_bridge_device = adb_bridge_device
    self._vision_device = vision_device

shutdown()

Kills the emulator process.

Source code in src/pymordialblue/devices/bluestacks_device.py
112
113
114
def shutdown(self) -> None:
    """Kills the emulator process."""
    self.close()

wait_for_load(timeout_s=None)

Waits for Bluestacks to finish loading by polling the ADB connection.

This method blocks until the emulator is responsive via ADB or the timeout is reached.

Parameters:

Name Type Description Default
timeout_s int | None

The maximum number of seconds to wait for the emulator to load. Defaults to the configuration value.

None
Source code in src/pymordialblue/devices/bluestacks_device.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
def wait_for_load(self, timeout_s: int | None = None) -> None:
    """Waits for Bluestacks to finish loading by polling the ADB connection.

    This method blocks until the emulator is responsive via ADB or the
    timeout is reached.

    Args:
        timeout_s: The maximum number of seconds to wait for the emulator
            to load. Defaults to the configuration value.
    """
    self.logger.debug("Waiting for Bluestacks to load (ADB check)...")
    start_time = time.time()
    timeout_s = timeout_s or self.config["default_load_timeout"]

    while self.state.current_state == EmulatorState.LOADING:
        # Try to connect to ADB
        if self._adb_bridge_device.connect():
            self._adb_bridge_device.disconnect()
            default_ui_load_wait_time: int = self.config[
                "default_ui_load_wait_time"
            ]
            self.logger.debug(
                f"Waiting {default_ui_load_wait_time} seconds for UI to stabilize..."
            )
            time.sleep(default_ui_load_wait_time)
            self._adb_bridge_device.connect()
            self.logger.info("Bluestacks is loaded & ready.")
            self.state.transition_to(EmulatorState.READY)
            return

        # Check timeout
        if time.time() - start_time > timeout_s:
            self.logger.error(
                f"Timeout waiting for Bluestacks to load after {timeout_s} seconds."
            )
            # We transition to READY anyway to allow retry logic elsewhere if needed,
            # or maybe we should raise? For now, mimicking previous behavior.
            self.state.transition_to(EmulatorState.READY)
            return

        time.sleep(self.config["default_load_wait_time"])

UI Device

pymordialblue.devices.ui_device

Controller for image processing and element detection.

PymordialUiDevice

Bases: PymordialVisionDevice

Handles all visual recognition tasks.

This class consolidates image recognition (template matching), pixel color detection, and Optical Character Recognition (OCR) into a single interface. It routes element finding requests to the appropriate backend based on the element type (PymordialImage, PymordialPixel, or PymordialText).

Attributes:

Name Type Description
bridge_device

Helper for underlying device operations (e.g., ADB).

config VisionConfig

Configuration dictionary for vision settings (timeouts, retries).

ocr_engine VisionConfig

The backend engine used for text extraction (e.g., Tesseract).

Source code in src/pymordialblue/devices/ui_device.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
class PymordialUiDevice(PymordialVisionDevice):
    """Handles all visual recognition tasks.

    This class consolidates image recognition (template matching), pixel color detection,
    and Optical Character Recognition (OCR) into a single interface. It routes
    element finding requests to the appropriate backend based on the element type
    (PymordialImage, PymordialPixel, or PymordialText).

    Attributes:
        bridge_device: Helper for underlying device operations (e.g., ADB).
        config: Configuration dictionary for vision settings (timeouts, retries).
        ocr_engine: The backend engine used for text extraction (e.g., Tesseract).
    """

    name: str = "ui"
    version: str = "0.1.0"

    def __init__(
        self,
        bridge_device: PymordialBridgeDevice | None = None,
        config: VisionConfig | None = None,
    ):
        """Initializes the PymordialUiDevice.

        Args:
            bridge_device: Optional PymordialBridgeDevice for device interactions.
                If None, a new PymordialAdbDevice will be created.
            config: Optional configuration dictionary. If None, loads defaults
                from the global configuration.
        """
        self.logger = getLogger("PymordialUiDevice")
        basicConfig(
            level=DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
        self._bridge_device: PymordialBridgeDevice = (
            bridge_device or PymordialAdbDevice()
        )
        self._ocr_device: PymordialOCRDevice = PymordialTesseractDevice()
        self.config: VisionConfig = copy.deepcopy(config or get_config()["ui"])

    def initialize(self, config: "PymordialBlueConfig") -> None:
        """Initializes the UI device plugin with configuration.

        Args:
            config: Global Pymordial configuration dictionary.
        """
        # TODO: Use config to set OCR strategy/timeouts if provided
        pass

    def set_bridge_device(self, bridge_device: PymordialBridgeDevice) -> None:
        """Sets the bridge device (dependency injection).

        Args:
             bridge_device: The bridge device instance to use.
        """
        self._bridge_device = bridge_device

    def shutdown(self) -> None:
        """Cleans up resources."""
        pass

    def scale_img_to_screen(
        self,
        image_path: str,
        screen_image: "str | Image.Image | bytes | np.ndarray",
        bluestacks_resolution: tuple[int, int],
    ) -> Image.Image:
        """Scales an image to match the current screen resolution.

        Args:
            image_path: The file path to the reference image (needle) to scale.
            screen_image: The current screen content. Can be a file path,
                bytes, numpy array, or a PIL Image object.
            bluestacks_resolution: The reference resolution (width, height)
                that the image_path was originally captured at.

        Returns:
            A PIL Image object of the needle image, scaled to match the
            dimensions of the provided screen_image.
        """
        # If screen_image is bytes, convert to PIL Image
        if isinstance(screen_image, bytes):
            screen_image = Image.open(BytesIO(screen_image))
        # If screen_image is numpy array, convert to PIL Image
        elif isinstance(screen_image, np.ndarray):
            screen_image = Image.fromarray(screen_image)
        # If screen_image is a string (file path), open it
        elif isinstance(screen_image, str):
            screen_image = Image.open(screen_image)

        # At this point, screen_image should be a PIL Image
        game_screen_width, game_screen_height = screen_image.size

        needle_img: Image.Image = Image.open(image_path)

        needle_img_size: tuple[int, int] = needle_img.size

        original_window_size: tuple[int, int] = bluestacks_resolution

        ratio_width: float = game_screen_width / original_window_size[0]
        ratio_height: float = game_screen_height / original_window_size[1]

        scaled_image_size: tuple[int, int] = (
            int(needle_img_size[0] * ratio_width),
            int(needle_img_size[1] * ratio_height),
        )
        scaled_image: Image.Image = needle_img.resize(scaled_image_size)
        return scaled_image

    def check_pixel_color(
        self,
        pymordial_pixel: PymordialPixel,
        pymordial_screenshot: "bytes | np.ndarray | None" = None,
    ) -> bool | None:
        """Checks if a specific pixel matches a target color within a given tolerance.

        If a screenshot is not provided, one will be captured automatically from
        the connected device.

        Args:
            pymordial_pixel: The pixel element definition containing coordinates,
                target color, and tolerance.
            pymordial_screenshot: Optional current screen content. Can be bytes
                or a numpy array. If None, a fresh screenshot is captured.

        Returns:
            True if the pixel at the specified coordinates matches the target
            color within the tolerance. Returns None if the pixel element has
            no position defined.

        Raises:
            ValueError: If the screenshot cannot be captured, processed, or if
                the pixel definition is invalid (e.g., missing coordinates).
        """

        def check_color_with_tolerance(
            color1: tuple[int, int, int], color2: tuple[int, int, int], tolerance: int
        ) -> bool:
            """Check if two colors are within a certain tolerance."""
            return all(abs(c1 - c2) <= tolerance for c1, c2 in zip(color1, color2))

        # Capture screen if we don't have an image to check
        if pymordial_screenshot is None:
            # Ensures ADB is connected
            if not self._bridge_device.is_connected():
                self._bridge_device.connect()
                if not self._bridge_device.is_connected():
                    raise ValueError("ADB is not connected")

            try:
                pymordial_screenshot = self._bridge_device.capture_screen()
                if pymordial_screenshot is None:
                    self.logger.warning("Failed to capture screen.")
            # except TcpTimeoutException:
            #    raise TcpTimeoutException(
            #        f"TCP timeout while finding element {pymordial_element.label}"
            #    )
            except Exception as e:
                self.logger.error(f"Error capturing screen: {e}")

        try:
            if pymordial_pixel.position is None:
                self.logger.warning(
                    f"PymordialPixel {pymordial_pixel.label} has no position defined. Cannot find."
                )
                return None

            # Ensure coordinates are integers
            target_coords = (
                int(pymordial_pixel.position[0]),
                int(pymordial_pixel.position[1]),
            )

            # Scale coordinates if og_resolution is defined
            if pymordial_pixel.og_resolution is not None:
                # Get the actual screenshot dimensions
                if isinstance(pymordial_screenshot, bytes):
                    with Image.open(BytesIO(pymordial_screenshot)) as img:
                        actual_width, actual_height = img.size
                elif isinstance(pymordial_screenshot, np.ndarray):
                    actual_height, actual_width = pymordial_screenshot.shape[:2]
                else:
                    actual_width, actual_height = pymordial_pixel.og_resolution

                og_width, og_height = pymordial_pixel.og_resolution
                scale_x = actual_width / og_width
                scale_y = actual_height / og_height
                original_coords = target_coords
                target_coords = (
                    int(target_coords[0] * scale_x),
                    int(target_coords[1] * scale_y),
                )
                self.logger.debug(
                    f"Resolution scaling for '{pymordial_pixel.label}': "
                    f"og={og_width}x{og_height}, actual={actual_width}x{actual_height}, "
                    f"scale=({scale_x:.2f}, {scale_y:.2f}), "
                    f"coords {original_coords} -> {target_coords}"
                )

            if len(target_coords) != 2:
                raise ValueError(
                    f"Coords for {pymordial_pixel.label} must be a tuple of two values, not {target_coords}"
                )
            if len(pymordial_pixel.pixel_color) != 3:
                raise ValueError(
                    f"Pixel color for {pymordial_pixel.label} must be a tuple of three values, not {pymordial_pixel.pixel_color}"
                )
            if pymordial_pixel.tolerance < 0:
                raise ValueError(
                    f"Tolerance for {pymordial_pixel.label} must be a non-negative integer, not {pymordial_pixel.tolerance}"
                )

            if pymordial_screenshot is None:
                raise ValueError(
                    f"Failed to capture screenshot for {pymordial_pixel.label}"
                )

            if isinstance(pymordial_screenshot, bytes):
                with Image.open(BytesIO(pymordial_screenshot)) as image:
                    pixel_color = image.getpixel(target_coords)
                    is_match = check_color_with_tolerance(
                        pixel_color,
                        pymordial_pixel.pixel_color,
                        pymordial_pixel.tolerance,
                    )
                    self.logger.debug(
                        f"Pixel check '{pymordial_pixel.label}' at {target_coords}: "
                        f"actual={pixel_color}, expected={pymordial_pixel.pixel_color}, "
                        f"tolerance={pymordial_pixel.tolerance}, match={is_match}"
                    )
                    return is_match
            elif isinstance(pymordial_screenshot, np.ndarray):
                image = Image.fromarray(pymordial_screenshot)
                pixel_color = image.getpixel(target_coords)
                is_match = check_color_with_tolerance(
                    pixel_color,
                    pymordial_pixel.pixel_color,
                    pymordial_pixel.tolerance,
                )
                self.logger.debug(
                    f"Pixel check '{pymordial_pixel.label}' at {target_coords}: "
                    f"actual={pixel_color}, expected={pymordial_pixel.pixel_color}, "
                    f"tolerance={pymordial_pixel.tolerance}, match={is_match}"
                )
                return is_match
            else:
                raise ValueError(
                    f"Image must be a bytes or numpy array, not {type(pymordial_screenshot)}"
                )

        except ValueError as e:
            self.logger.error(f"ValueError in check_pixel_color: {e}")
            raise
        except Exception as e:
            self.logger.error(f"Error in check_pixel_color: {e}")
            raise ValueError(f"Error checking pixel color: {e}") from e

    def where_element(
        self,
        pymordial_element: PymordialElement,
        pymordial_screenshot: "bytes | np.ndarray | None" = None,
        max_tries: int | None = None,
        set_position: bool = False,
        set_size: bool = False,
    ) -> tuple[int, int] | None:
        """Finds the coordinates of a PymordialElement on the screen.

        Args:
            pymordial_element: The PymordialElement instance to locate.
            pymordial_screenshot: Optional pre-captured screenshot (bytes or
                numpy array) to search within. If None, a new screenshot will
                be captured.
            max_tries: Maximum number of attempts to find the element. If None,
                it will retry indefinitely (useful for waiting on loading screens).
            set_position: If True, updates the element's `position` attribute
                with the found coordinates.
            set_size: If True, updates the element's `size` attribute with the
                found dimensions.

        Returns:
            A tuple containing (x, y) coordinates of the element's center if found,
            or None if the element was not found after the specified attempts.

        Note:
            If max_tries is None, this method loops indefinitely until the element
            is found.
        """
        self.logger.debug(f"Looking for PymordialElement: {pymordial_element.label}...")
        if max_tries is None:
            max_tries = self.config["default_find_ui_retries"]

        find_ui_retries: int = 0
        pymordial_screenshot = pymordial_screenshot

        while (find_ui_retries < max_tries) if max_tries is not None else True:
            # Capture screen if we don't have an image to check
            if pymordial_screenshot is None:
                # Ensures ADB is connected
                if not self._bridge_device.is_connected():
                    self._bridge_device.connect()
                    if not self._bridge_device.is_connected():
                        raise ValueError("ADB is not connected")

                try:
                    pymordial_screenshot = self._bridge_device.capture_screen()
                    if pymordial_screenshot is None:
                        self.logger.warning("Failed to capture screen.")
                except TcpTimeoutException:
                    raise TcpTimeoutException(
                        f"TCP timeout while finding element {pymordial_element.label}"
                    )
                except Exception as e:
                    self.logger.error(f"Error capturing screen: {e}")

            if pymordial_screenshot is not None:
                if isinstance(pymordial_element, PymordialImage):
                    ui_location = None
                    try:
                        if isinstance(pymordial_screenshot, bytes):
                            haystack_img = Image.open(BytesIO(pymordial_screenshot))
                        elif isinstance(pymordial_screenshot, np.ndarray):
                            haystack_img = Image.fromarray(pymordial_screenshot)
                        elif isinstance(pymordial_screenshot, Image.Image):
                            haystack_img = pymordial_screenshot
                        else:
                            # Should not happen if capture_screen returns correct types
                            self.logger.warning(
                                f"Unsupported image type: {type(pymordial_screenshot)}. Attempting to open as file path if string."
                            )
                            if isinstance(pymordial_screenshot, str):
                                haystack_img = Image.open(pymordial_screenshot)
                            else:
                                raise ValueError(
                                    f"Unsupported image type: {type(pymordial_screenshot)}"
                                )

                        # Scale the needle image to match current resolution
                        scaled_img = self.scale_img_to_screen(
                            image_path=pymordial_element.filepath,
                            screen_image=haystack_img,
                            bluestacks_resolution=pymordial_element.og_resolution,
                        )

                        # Prepare OpenCV images
                        haystack_cv = cv2.cvtColor(
                            np.array(haystack_img), cv2.COLOR_RGB2BGR
                        )
                        needle_cv = cv2.cvtColor(
                            np.array(scaled_img), cv2.COLOR_RGB2BGR
                        )

                        if pymordial_element.region:
                            left, top, width, height = pymordial_element.region
                            haystack_cv = haystack_cv[
                                top : top + height, left : left + width
                            ]
                        else:
                            left, top = 0, 0

                        result = cv2.matchTemplate(
                            haystack_cv, needle_cv, cv2.TM_CCOEFF_NORMED
                        )
                        _, max_val, _, max_loc = cv2.minMaxLoc(result)

                        if max_val >= pymordial_element.confidence:
                            # max_loc is (x, y) relative to the region
                            match_x = max_loc[0] + left
                            match_y = max_loc[1] + top
                            match_w, match_h = scaled_img.size

                            ui_location = (match_x, match_y, match_w, match_h)
                        else:
                            ui_location = None

                    except Exception as e:
                        self.logger.error(
                            f"Error finding element {pymordial_element.label}: {e}"
                        )

                    if ui_location:
                        # coords = center(ui_location) -> (x + w//2, y + h//2)
                        coords = (
                            ui_location[0] + ui_location[2] // 2,
                            ui_location[1] + ui_location[3] // 2,
                        )
                        self.logger.debug(
                            f"PymordialImage {pymordial_element.label} found at: {coords}"
                        )

                        if set_position:
                            # ui_location is (left, top, width, height)
                            pymordial_element.position = (
                                ui_location[0],
                                ui_location[1],
                            )
                            self.logger.debug(
                                f"Updated position for {pymordial_element.label} to {pymordial_element.position}"
                            )

                        if set_size:
                            # ui_location is (left, top, width, height)
                            pymordial_element.size = (ui_location[2], ui_location[3])
                            self.logger.debug(
                                f"Updated size for {pymordial_element.label} to {pymordial_element.size}"
                            )

                        return coords
                else:
                    raise NotImplementedError(
                        f"Element type: {type(pymordial_element)} is not supported."
                    )

            # Prepare for next retry
            find_ui_retries += 1
            pymordial_screenshot = None  # Force capture on next iteration

            if max_tries is not None and find_ui_retries >= max_tries:
                break

            self.logger.debug(
                f"PymordialImage {pymordial_element.label} not found. Retrying... ({find_ui_retries}/{max_tries})"
            )
            sleep(self.config["default_wait_time"])

        self.logger.info(
            f"Wasn't able to find PymordialImage within {max_tries} retries: {pymordial_element.label}"
        )
        return None

    def where_elements(
        self,
        pymordial_elements: list[PymordialElement],
        pymordial_screenshot: "bytes | np.ndarray | None" = None,
        max_tries: int | None = None,
    ) -> tuple[int, int] | None:
        """Finds the coordinates of the first found element from a list.

        Args:
            pymordial_elements: A list of PymordialElement objects to search for.
            pymordial_screenshot: Optional pre-captured screenshot (bytes or
                numpy array) to avoid capturing a new one.
            max_tries: Maximum number of retries for each element in the list.

        Returns:
            A tuple containing (x, y) coordinates of the first element successfully
            located, or None if no elements from the list were found.
        """
        if max_tries is None:
            max_tries = self.config["default_find_ui_retries"]
        for pymordial_element in pymordial_elements:
            coord: tuple[int, int] | None = self.where_element(
                pymordial_element=pymordial_element,
                pymordial_screenshot=pymordial_screenshot,
                max_tries=max_tries,
            )
            if coord is not None:
                return coord
        return None

    def find_text(
        self,
        text_to_find: str,
        pymordial_screenshot: "Path | bytes | str | np.ndarray",
        strategy: PymordialExtractStrategy | None = None,
    ) -> tuple[int, int] | None:
        """Finds the coordinates of specific text in the image.

        Args:
            text_to_find: The text string to search for.
            pymordial_screenshot: The source image to search within. Can be a
                file path (str or Path) or raw image bytes.
            strategy: Optional preprocessing strategy to apply before OCR.

        Returns:
            A tuple containing (x, y) coordinates of the found text center, or
            None if the text was not found.
        """
        try:
            # Check if the OCR engine supports find_text (it should as per PymordialOCRDevice)
            if hasattr(self._ocr_device, "find_text"):
                # Pass strategy if it's PymordialTesseractDevice, otherwise just the required args
                if isinstance(self._ocr_device, PymordialTesseractDevice):
                    return self._ocr_device.find_text(
                        text_to_find, pymordial_screenshot, strategy=strategy
                    )
                return self._ocr_device.find_text(text_to_find, pymordial_screenshot)
            else:
                self.logger.warning(
                    f"OCR engine {type(self._ocr_device).__name__} does not support find_text"
                )
                return None
        except Exception as e:
            self.logger.error(f"Error finding text in image: {e}")
            return None

    def check_text(
        self,
        text_to_find: str,
        pymordial_screenshot: "Path | bytes | str | np.ndarray",
        case_sensitive: bool = False,
        strategy: PymordialExtractStrategy | None = None,
    ) -> bool:
        """Checks if specific text is present in the image.

        Args:
            text_to_find: The exact text string to search for.
            pymordial_screenshot: The source image to search within. Can be a
                file path (str or Path), raw image bytes, or a numpy array.
            case_sensitive: If True, performs a case-sensitive search.
                Defaults to False.
            strategy: Optional preprocessing strategy to apply before OCR.
                This is currently only supported by PymordialTesseractDevice.

        Returns:
            True if the text is found in the image, False otherwise.

        Raises:
            ValueError: If the image cannot be loaded or processed.
        """

        # Capture screen if we don't have an image to check
        if pymordial_screenshot is None:
            # Ensures ADB is connected
            if not self._bridge_device.is_connected():
                self._bridge_device.connect()
                if not self._bridge_device.is_connected():
                    raise ValueError("ADB is not connected")

            try:
                pymordial_screenshot = self._bridge_device.capture_screen()
                if pymordial_screenshot is None:
                    self.logger.warning("Failed to capture screen.")
            # except TcpTimeoutException:
            #    raise TcpTimeoutException(
            #        f"TCP timeout while finding element {pymordial_element.label}"
            #    )
            except Exception as e:
                self.logger.error(f"Error capturing screen: {e}")

        try:
            # Extract text with optional strategy (if supported)
            if strategy is not None and isinstance(
                self._ocr_device, PymordialTesseractDevice
            ):
                extracted = self._ocr_device.extract_text(
                    pymordial_screenshot, strategy=strategy
                )
            else:
                extracted = self._ocr_device.extract_text(pymordial_screenshot)

            if case_sensitive:
                return text_to_find in extracted
            return text_to_find.lower() in extracted.lower()
        except Exception as e:
            self.logger.error(f"Error checking text in image: {e}")
            raise ValueError(f"Error checking text in image: {e}") from e

    def read_text(
        self,
        pymordial_screenshot: "Path | bytes | str | np.ndarray",
        case_sensitive: bool = False,
        strategy: PymordialExtractStrategy | None = None,
    ) -> list[str]:
        """Reads and extracts text lines from an image.

        Args:
            pymordial_screenshot: The source image to read text from. Can be a
                file path (str or Path), raw image bytes, or a numpy array.
            case_sensitive: If True, preserves the original case of the text.
                If False, converts all text to lowercase. Defaults to False.
            strategy: Optional preprocessing strategy to apply before OCR.
                This is currently only supported by PymordialTesseractDevice.

        Returns:
            A list of strings, where each string corresponds to a line of text
            detected in the image. Empty lines and whitespace-only lines are
            excluded.

        Raises:
            ValueError: If the image cannot be loaded or processed by the OCR engine.
        """

        try:
            # Extract text with optional strategy (if supported)
            if strategy is not None and isinstance(
                self._ocr_device, PymordialTesseractDevice
            ):
                text = self._ocr_device.extract_text(
                    pymordial_screenshot, strategy=strategy
                )
            else:
                text = self._ocr_device.extract_text(pymordial_screenshot)
            if case_sensitive:
                return [line.strip() for line in text.split("\n") if line.strip()]
            return [
                line.strip().lower()
                for line in text.split("\n")
                if line.strip().lower()
            ]
        except Exception as e:
            self.logger.error(f"Error reading text from image: {e}")
            raise ValueError(f"Error reading text from image: {e}") from e

__init__(bridge_device=None, config=None)

Initializes the PymordialUiDevice.

Parameters:

Name Type Description Default
bridge_device PymordialBridgeDevice | None

Optional PymordialBridgeDevice for device interactions. If None, a new PymordialAdbDevice will be created.

None
config VisionConfig | None

Optional configuration dictionary. If None, loads defaults from the global configuration.

None
Source code in src/pymordialblue/devices/ui_device.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def __init__(
    self,
    bridge_device: PymordialBridgeDevice | None = None,
    config: VisionConfig | None = None,
):
    """Initializes the PymordialUiDevice.

    Args:
        bridge_device: Optional PymordialBridgeDevice for device interactions.
            If None, a new PymordialAdbDevice will be created.
        config: Optional configuration dictionary. If None, loads defaults
            from the global configuration.
    """
    self.logger = getLogger("PymordialUiDevice")
    basicConfig(
        level=DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    self._bridge_device: PymordialBridgeDevice = (
        bridge_device or PymordialAdbDevice()
    )
    self._ocr_device: PymordialOCRDevice = PymordialTesseractDevice()
    self.config: VisionConfig = copy.deepcopy(config or get_config()["ui"])

check_pixel_color(pymordial_pixel, pymordial_screenshot=None)

Checks if a specific pixel matches a target color within a given tolerance.

If a screenshot is not provided, one will be captured automatically from the connected device.

Parameters:

Name Type Description Default
pymordial_pixel PymordialPixel

The pixel element definition containing coordinates, target color, and tolerance.

required
pymordial_screenshot bytes | ndarray | None

Optional current screen content. Can be bytes or a numpy array. If None, a fresh screenshot is captured.

None

Returns:

Type Description
bool | None

True if the pixel at the specified coordinates matches the target

bool | None

color within the tolerance. Returns None if the pixel element has

bool | None

no position defined.

Raises:

Type Description
ValueError

If the screenshot cannot be captured, processed, or if the pixel definition is invalid (e.g., missing coordinates).

Source code in src/pymordialblue/devices/ui_device.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def check_pixel_color(
    self,
    pymordial_pixel: PymordialPixel,
    pymordial_screenshot: "bytes | np.ndarray | None" = None,
) -> bool | None:
    """Checks if a specific pixel matches a target color within a given tolerance.

    If a screenshot is not provided, one will be captured automatically from
    the connected device.

    Args:
        pymordial_pixel: The pixel element definition containing coordinates,
            target color, and tolerance.
        pymordial_screenshot: Optional current screen content. Can be bytes
            or a numpy array. If None, a fresh screenshot is captured.

    Returns:
        True if the pixel at the specified coordinates matches the target
        color within the tolerance. Returns None if the pixel element has
        no position defined.

    Raises:
        ValueError: If the screenshot cannot be captured, processed, or if
            the pixel definition is invalid (e.g., missing coordinates).
    """

    def check_color_with_tolerance(
        color1: tuple[int, int, int], color2: tuple[int, int, int], tolerance: int
    ) -> bool:
        """Check if two colors are within a certain tolerance."""
        return all(abs(c1 - c2) <= tolerance for c1, c2 in zip(color1, color2))

    # Capture screen if we don't have an image to check
    if pymordial_screenshot is None:
        # Ensures ADB is connected
        if not self._bridge_device.is_connected():
            self._bridge_device.connect()
            if not self._bridge_device.is_connected():
                raise ValueError("ADB is not connected")

        try:
            pymordial_screenshot = self._bridge_device.capture_screen()
            if pymordial_screenshot is None:
                self.logger.warning("Failed to capture screen.")
        # except TcpTimeoutException:
        #    raise TcpTimeoutException(
        #        f"TCP timeout while finding element {pymordial_element.label}"
        #    )
        except Exception as e:
            self.logger.error(f"Error capturing screen: {e}")

    try:
        if pymordial_pixel.position is None:
            self.logger.warning(
                f"PymordialPixel {pymordial_pixel.label} has no position defined. Cannot find."
            )
            return None

        # Ensure coordinates are integers
        target_coords = (
            int(pymordial_pixel.position[0]),
            int(pymordial_pixel.position[1]),
        )

        # Scale coordinates if og_resolution is defined
        if pymordial_pixel.og_resolution is not None:
            # Get the actual screenshot dimensions
            if isinstance(pymordial_screenshot, bytes):
                with Image.open(BytesIO(pymordial_screenshot)) as img:
                    actual_width, actual_height = img.size
            elif isinstance(pymordial_screenshot, np.ndarray):
                actual_height, actual_width = pymordial_screenshot.shape[:2]
            else:
                actual_width, actual_height = pymordial_pixel.og_resolution

            og_width, og_height = pymordial_pixel.og_resolution
            scale_x = actual_width / og_width
            scale_y = actual_height / og_height
            original_coords = target_coords
            target_coords = (
                int(target_coords[0] * scale_x),
                int(target_coords[1] * scale_y),
            )
            self.logger.debug(
                f"Resolution scaling for '{pymordial_pixel.label}': "
                f"og={og_width}x{og_height}, actual={actual_width}x{actual_height}, "
                f"scale=({scale_x:.2f}, {scale_y:.2f}), "
                f"coords {original_coords} -> {target_coords}"
            )

        if len(target_coords) != 2:
            raise ValueError(
                f"Coords for {pymordial_pixel.label} must be a tuple of two values, not {target_coords}"
            )
        if len(pymordial_pixel.pixel_color) != 3:
            raise ValueError(
                f"Pixel color for {pymordial_pixel.label} must be a tuple of three values, not {pymordial_pixel.pixel_color}"
            )
        if pymordial_pixel.tolerance < 0:
            raise ValueError(
                f"Tolerance for {pymordial_pixel.label} must be a non-negative integer, not {pymordial_pixel.tolerance}"
            )

        if pymordial_screenshot is None:
            raise ValueError(
                f"Failed to capture screenshot for {pymordial_pixel.label}"
            )

        if isinstance(pymordial_screenshot, bytes):
            with Image.open(BytesIO(pymordial_screenshot)) as image:
                pixel_color = image.getpixel(target_coords)
                is_match = check_color_with_tolerance(
                    pixel_color,
                    pymordial_pixel.pixel_color,
                    pymordial_pixel.tolerance,
                )
                self.logger.debug(
                    f"Pixel check '{pymordial_pixel.label}' at {target_coords}: "
                    f"actual={pixel_color}, expected={pymordial_pixel.pixel_color}, "
                    f"tolerance={pymordial_pixel.tolerance}, match={is_match}"
                )
                return is_match
        elif isinstance(pymordial_screenshot, np.ndarray):
            image = Image.fromarray(pymordial_screenshot)
            pixel_color = image.getpixel(target_coords)
            is_match = check_color_with_tolerance(
                pixel_color,
                pymordial_pixel.pixel_color,
                pymordial_pixel.tolerance,
            )
            self.logger.debug(
                f"Pixel check '{pymordial_pixel.label}' at {target_coords}: "
                f"actual={pixel_color}, expected={pymordial_pixel.pixel_color}, "
                f"tolerance={pymordial_pixel.tolerance}, match={is_match}"
            )
            return is_match
        else:
            raise ValueError(
                f"Image must be a bytes or numpy array, not {type(pymordial_screenshot)}"
            )

    except ValueError as e:
        self.logger.error(f"ValueError in check_pixel_color: {e}")
        raise
    except Exception as e:
        self.logger.error(f"Error in check_pixel_color: {e}")
        raise ValueError(f"Error checking pixel color: {e}") from e

check_text(text_to_find, pymordial_screenshot, case_sensitive=False, strategy=None)

Checks if specific text is present in the image.

Parameters:

Name Type Description Default
text_to_find str

The exact text string to search for.

required
pymordial_screenshot Path | bytes | str | ndarray

The source image to search within. Can be a file path (str or Path), raw image bytes, or a numpy array.

required
case_sensitive bool

If True, performs a case-sensitive search. Defaults to False.

False
strategy PymordialExtractStrategy | None

Optional preprocessing strategy to apply before OCR. This is currently only supported by PymordialTesseractDevice.

None

Returns:

Type Description
bool

True if the text is found in the image, False otherwise.

Raises:

Type Description
ValueError

If the image cannot be loaded or processed.

Source code in src/pymordialblue/devices/ui_device.py
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
def check_text(
    self,
    text_to_find: str,
    pymordial_screenshot: "Path | bytes | str | np.ndarray",
    case_sensitive: bool = False,
    strategy: PymordialExtractStrategy | None = None,
) -> bool:
    """Checks if specific text is present in the image.

    Args:
        text_to_find: The exact text string to search for.
        pymordial_screenshot: The source image to search within. Can be a
            file path (str or Path), raw image bytes, or a numpy array.
        case_sensitive: If True, performs a case-sensitive search.
            Defaults to False.
        strategy: Optional preprocessing strategy to apply before OCR.
            This is currently only supported by PymordialTesseractDevice.

    Returns:
        True if the text is found in the image, False otherwise.

    Raises:
        ValueError: If the image cannot be loaded or processed.
    """

    # Capture screen if we don't have an image to check
    if pymordial_screenshot is None:
        # Ensures ADB is connected
        if not self._bridge_device.is_connected():
            self._bridge_device.connect()
            if not self._bridge_device.is_connected():
                raise ValueError("ADB is not connected")

        try:
            pymordial_screenshot = self._bridge_device.capture_screen()
            if pymordial_screenshot is None:
                self.logger.warning("Failed to capture screen.")
        # except TcpTimeoutException:
        #    raise TcpTimeoutException(
        #        f"TCP timeout while finding element {pymordial_element.label}"
        #    )
        except Exception as e:
            self.logger.error(f"Error capturing screen: {e}")

    try:
        # Extract text with optional strategy (if supported)
        if strategy is not None and isinstance(
            self._ocr_device, PymordialTesseractDevice
        ):
            extracted = self._ocr_device.extract_text(
                pymordial_screenshot, strategy=strategy
            )
        else:
            extracted = self._ocr_device.extract_text(pymordial_screenshot)

        if case_sensitive:
            return text_to_find in extracted
        return text_to_find.lower() in extracted.lower()
    except Exception as e:
        self.logger.error(f"Error checking text in image: {e}")
        raise ValueError(f"Error checking text in image: {e}") from e

find_text(text_to_find, pymordial_screenshot, strategy=None)

Finds the coordinates of specific text in the image.

Parameters:

Name Type Description Default
text_to_find str

The text string to search for.

required
pymordial_screenshot Path | bytes | str | ndarray

The source image to search within. Can be a file path (str or Path) or raw image bytes.

required
strategy PymordialExtractStrategy | None

Optional preprocessing strategy to apply before OCR.

None

Returns:

Type Description
tuple[int, int] | None

A tuple containing (x, y) coordinates of the found text center, or

tuple[int, int] | None

None if the text was not found.

Source code in src/pymordialblue/devices/ui_device.py
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
def find_text(
    self,
    text_to_find: str,
    pymordial_screenshot: "Path | bytes | str | np.ndarray",
    strategy: PymordialExtractStrategy | None = None,
) -> tuple[int, int] | None:
    """Finds the coordinates of specific text in the image.

    Args:
        text_to_find: The text string to search for.
        pymordial_screenshot: The source image to search within. Can be a
            file path (str or Path) or raw image bytes.
        strategy: Optional preprocessing strategy to apply before OCR.

    Returns:
        A tuple containing (x, y) coordinates of the found text center, or
        None if the text was not found.
    """
    try:
        # Check if the OCR engine supports find_text (it should as per PymordialOCRDevice)
        if hasattr(self._ocr_device, "find_text"):
            # Pass strategy if it's PymordialTesseractDevice, otherwise just the required args
            if isinstance(self._ocr_device, PymordialTesseractDevice):
                return self._ocr_device.find_text(
                    text_to_find, pymordial_screenshot, strategy=strategy
                )
            return self._ocr_device.find_text(text_to_find, pymordial_screenshot)
        else:
            self.logger.warning(
                f"OCR engine {type(self._ocr_device).__name__} does not support find_text"
            )
            return None
    except Exception as e:
        self.logger.error(f"Error finding text in image: {e}")
        return None

initialize(config)

Initializes the UI device plugin with configuration.

Parameters:

Name Type Description Default
config PymordialBlueConfig

Global Pymordial configuration dictionary.

required
Source code in src/pymordialblue/devices/ui_device.py
70
71
72
73
74
75
76
77
def initialize(self, config: "PymordialBlueConfig") -> None:
    """Initializes the UI device plugin with configuration.

    Args:
        config: Global Pymordial configuration dictionary.
    """
    # TODO: Use config to set OCR strategy/timeouts if provided
    pass

read_text(pymordial_screenshot, case_sensitive=False, strategy=None)

Reads and extracts text lines from an image.

Parameters:

Name Type Description Default
pymordial_screenshot Path | bytes | str | ndarray

The source image to read text from. Can be a file path (str or Path), raw image bytes, or a numpy array.

required
case_sensitive bool

If True, preserves the original case of the text. If False, converts all text to lowercase. Defaults to False.

False
strategy PymordialExtractStrategy | None

Optional preprocessing strategy to apply before OCR. This is currently only supported by PymordialTesseractDevice.

None

Returns:

Type Description
list[str]

A list of strings, where each string corresponds to a line of text

list[str]

detected in the image. Empty lines and whitespace-only lines are

list[str]

excluded.

Raises:

Type Description
ValueError

If the image cannot be loaded or processed by the OCR engine.

Source code in src/pymordialblue/devices/ui_device.py
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
def read_text(
    self,
    pymordial_screenshot: "Path | bytes | str | np.ndarray",
    case_sensitive: bool = False,
    strategy: PymordialExtractStrategy | None = None,
) -> list[str]:
    """Reads and extracts text lines from an image.

    Args:
        pymordial_screenshot: The source image to read text from. Can be a
            file path (str or Path), raw image bytes, or a numpy array.
        case_sensitive: If True, preserves the original case of the text.
            If False, converts all text to lowercase. Defaults to False.
        strategy: Optional preprocessing strategy to apply before OCR.
            This is currently only supported by PymordialTesseractDevice.

    Returns:
        A list of strings, where each string corresponds to a line of text
        detected in the image. Empty lines and whitespace-only lines are
        excluded.

    Raises:
        ValueError: If the image cannot be loaded or processed by the OCR engine.
    """

    try:
        # Extract text with optional strategy (if supported)
        if strategy is not None and isinstance(
            self._ocr_device, PymordialTesseractDevice
        ):
            text = self._ocr_device.extract_text(
                pymordial_screenshot, strategy=strategy
            )
        else:
            text = self._ocr_device.extract_text(pymordial_screenshot)
        if case_sensitive:
            return [line.strip() for line in text.split("\n") if line.strip()]
        return [
            line.strip().lower()
            for line in text.split("\n")
            if line.strip().lower()
        ]
    except Exception as e:
        self.logger.error(f"Error reading text from image: {e}")
        raise ValueError(f"Error reading text from image: {e}") from e

scale_img_to_screen(image_path, screen_image, bluestacks_resolution)

Scales an image to match the current screen resolution.

Parameters:

Name Type Description Default
image_path str

The file path to the reference image (needle) to scale.

required
screen_image str | Image | bytes | ndarray

The current screen content. Can be a file path, bytes, numpy array, or a PIL Image object.

required
bluestacks_resolution tuple[int, int]

The reference resolution (width, height) that the image_path was originally captured at.

required

Returns:

Type Description
Image

A PIL Image object of the needle image, scaled to match the

Image

dimensions of the provided screen_image.

Source code in src/pymordialblue/devices/ui_device.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def scale_img_to_screen(
    self,
    image_path: str,
    screen_image: "str | Image.Image | bytes | np.ndarray",
    bluestacks_resolution: tuple[int, int],
) -> Image.Image:
    """Scales an image to match the current screen resolution.

    Args:
        image_path: The file path to the reference image (needle) to scale.
        screen_image: The current screen content. Can be a file path,
            bytes, numpy array, or a PIL Image object.
        bluestacks_resolution: The reference resolution (width, height)
            that the image_path was originally captured at.

    Returns:
        A PIL Image object of the needle image, scaled to match the
        dimensions of the provided screen_image.
    """
    # If screen_image is bytes, convert to PIL Image
    if isinstance(screen_image, bytes):
        screen_image = Image.open(BytesIO(screen_image))
    # If screen_image is numpy array, convert to PIL Image
    elif isinstance(screen_image, np.ndarray):
        screen_image = Image.fromarray(screen_image)
    # If screen_image is a string (file path), open it
    elif isinstance(screen_image, str):
        screen_image = Image.open(screen_image)

    # At this point, screen_image should be a PIL Image
    game_screen_width, game_screen_height = screen_image.size

    needle_img: Image.Image = Image.open(image_path)

    needle_img_size: tuple[int, int] = needle_img.size

    original_window_size: tuple[int, int] = bluestacks_resolution

    ratio_width: float = game_screen_width / original_window_size[0]
    ratio_height: float = game_screen_height / original_window_size[1]

    scaled_image_size: tuple[int, int] = (
        int(needle_img_size[0] * ratio_width),
        int(needle_img_size[1] * ratio_height),
    )
    scaled_image: Image.Image = needle_img.resize(scaled_image_size)
    return scaled_image

set_bridge_device(bridge_device)

Sets the bridge device (dependency injection).

Parameters:

Name Type Description Default
bridge_device PymordialBridgeDevice

The bridge device instance to use.

required
Source code in src/pymordialblue/devices/ui_device.py
79
80
81
82
83
84
85
def set_bridge_device(self, bridge_device: PymordialBridgeDevice) -> None:
    """Sets the bridge device (dependency injection).

    Args:
         bridge_device: The bridge device instance to use.
    """
    self._bridge_device = bridge_device

shutdown()

Cleans up resources.

Source code in src/pymordialblue/devices/ui_device.py
87
88
89
def shutdown(self) -> None:
    """Cleans up resources."""
    pass

where_element(pymordial_element, pymordial_screenshot=None, max_tries=None, set_position=False, set_size=False)

Finds the coordinates of a PymordialElement on the screen.

Parameters:

Name Type Description Default
pymordial_element PymordialElement

The PymordialElement instance to locate.

required
pymordial_screenshot bytes | ndarray | None

Optional pre-captured screenshot (bytes or numpy array) to search within. If None, a new screenshot will be captured.

None
max_tries int | None

Maximum number of attempts to find the element. If None, it will retry indefinitely (useful for waiting on loading screens).

None
set_position bool

If True, updates the element's position attribute with the found coordinates.

False
set_size bool

If True, updates the element's size attribute with the found dimensions.

False

Returns:

Type Description
tuple[int, int] | None

A tuple containing (x, y) coordinates of the element's center if found,

tuple[int, int] | None

or None if the element was not found after the specified attempts.

Note

If max_tries is None, this method loops indefinitely until the element is found.

Source code in src/pymordialblue/devices/ui_device.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
def where_element(
    self,
    pymordial_element: PymordialElement,
    pymordial_screenshot: "bytes | np.ndarray | None" = None,
    max_tries: int | None = None,
    set_position: bool = False,
    set_size: bool = False,
) -> tuple[int, int] | None:
    """Finds the coordinates of a PymordialElement on the screen.

    Args:
        pymordial_element: The PymordialElement instance to locate.
        pymordial_screenshot: Optional pre-captured screenshot (bytes or
            numpy array) to search within. If None, a new screenshot will
            be captured.
        max_tries: Maximum number of attempts to find the element. If None,
            it will retry indefinitely (useful for waiting on loading screens).
        set_position: If True, updates the element's `position` attribute
            with the found coordinates.
        set_size: If True, updates the element's `size` attribute with the
            found dimensions.

    Returns:
        A tuple containing (x, y) coordinates of the element's center if found,
        or None if the element was not found after the specified attempts.

    Note:
        If max_tries is None, this method loops indefinitely until the element
        is found.
    """
    self.logger.debug(f"Looking for PymordialElement: {pymordial_element.label}...")
    if max_tries is None:
        max_tries = self.config["default_find_ui_retries"]

    find_ui_retries: int = 0
    pymordial_screenshot = pymordial_screenshot

    while (find_ui_retries < max_tries) if max_tries is not None else True:
        # Capture screen if we don't have an image to check
        if pymordial_screenshot is None:
            # Ensures ADB is connected
            if not self._bridge_device.is_connected():
                self._bridge_device.connect()
                if not self._bridge_device.is_connected():
                    raise ValueError("ADB is not connected")

            try:
                pymordial_screenshot = self._bridge_device.capture_screen()
                if pymordial_screenshot is None:
                    self.logger.warning("Failed to capture screen.")
            except TcpTimeoutException:
                raise TcpTimeoutException(
                    f"TCP timeout while finding element {pymordial_element.label}"
                )
            except Exception as e:
                self.logger.error(f"Error capturing screen: {e}")

        if pymordial_screenshot is not None:
            if isinstance(pymordial_element, PymordialImage):
                ui_location = None
                try:
                    if isinstance(pymordial_screenshot, bytes):
                        haystack_img = Image.open(BytesIO(pymordial_screenshot))
                    elif isinstance(pymordial_screenshot, np.ndarray):
                        haystack_img = Image.fromarray(pymordial_screenshot)
                    elif isinstance(pymordial_screenshot, Image.Image):
                        haystack_img = pymordial_screenshot
                    else:
                        # Should not happen if capture_screen returns correct types
                        self.logger.warning(
                            f"Unsupported image type: {type(pymordial_screenshot)}. Attempting to open as file path if string."
                        )
                        if isinstance(pymordial_screenshot, str):
                            haystack_img = Image.open(pymordial_screenshot)
                        else:
                            raise ValueError(
                                f"Unsupported image type: {type(pymordial_screenshot)}"
                            )

                    # Scale the needle image to match current resolution
                    scaled_img = self.scale_img_to_screen(
                        image_path=pymordial_element.filepath,
                        screen_image=haystack_img,
                        bluestacks_resolution=pymordial_element.og_resolution,
                    )

                    # Prepare OpenCV images
                    haystack_cv = cv2.cvtColor(
                        np.array(haystack_img), cv2.COLOR_RGB2BGR
                    )
                    needle_cv = cv2.cvtColor(
                        np.array(scaled_img), cv2.COLOR_RGB2BGR
                    )

                    if pymordial_element.region:
                        left, top, width, height = pymordial_element.region
                        haystack_cv = haystack_cv[
                            top : top + height, left : left + width
                        ]
                    else:
                        left, top = 0, 0

                    result = cv2.matchTemplate(
                        haystack_cv, needle_cv, cv2.TM_CCOEFF_NORMED
                    )
                    _, max_val, _, max_loc = cv2.minMaxLoc(result)

                    if max_val >= pymordial_element.confidence:
                        # max_loc is (x, y) relative to the region
                        match_x = max_loc[0] + left
                        match_y = max_loc[1] + top
                        match_w, match_h = scaled_img.size

                        ui_location = (match_x, match_y, match_w, match_h)
                    else:
                        ui_location = None

                except Exception as e:
                    self.logger.error(
                        f"Error finding element {pymordial_element.label}: {e}"
                    )

                if ui_location:
                    # coords = center(ui_location) -> (x + w//2, y + h//2)
                    coords = (
                        ui_location[0] + ui_location[2] // 2,
                        ui_location[1] + ui_location[3] // 2,
                    )
                    self.logger.debug(
                        f"PymordialImage {pymordial_element.label} found at: {coords}"
                    )

                    if set_position:
                        # ui_location is (left, top, width, height)
                        pymordial_element.position = (
                            ui_location[0],
                            ui_location[1],
                        )
                        self.logger.debug(
                            f"Updated position for {pymordial_element.label} to {pymordial_element.position}"
                        )

                    if set_size:
                        # ui_location is (left, top, width, height)
                        pymordial_element.size = (ui_location[2], ui_location[3])
                        self.logger.debug(
                            f"Updated size for {pymordial_element.label} to {pymordial_element.size}"
                        )

                    return coords
            else:
                raise NotImplementedError(
                    f"Element type: {type(pymordial_element)} is not supported."
                )

        # Prepare for next retry
        find_ui_retries += 1
        pymordial_screenshot = None  # Force capture on next iteration

        if max_tries is not None and find_ui_retries >= max_tries:
            break

        self.logger.debug(
            f"PymordialImage {pymordial_element.label} not found. Retrying... ({find_ui_retries}/{max_tries})"
        )
        sleep(self.config["default_wait_time"])

    self.logger.info(
        f"Wasn't able to find PymordialImage within {max_tries} retries: {pymordial_element.label}"
    )
    return None

where_elements(pymordial_elements, pymordial_screenshot=None, max_tries=None)

Finds the coordinates of the first found element from a list.

Parameters:

Name Type Description Default
pymordial_elements list[PymordialElement]

A list of PymordialElement objects to search for.

required
pymordial_screenshot bytes | ndarray | None

Optional pre-captured screenshot (bytes or numpy array) to avoid capturing a new one.

None
max_tries int | None

Maximum number of retries for each element in the list.

None

Returns:

Type Description
tuple[int, int] | None

A tuple containing (x, y) coordinates of the first element successfully

tuple[int, int] | None

located, or None if no elements from the list were found.

Source code in src/pymordialblue/devices/ui_device.py
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def where_elements(
    self,
    pymordial_elements: list[PymordialElement],
    pymordial_screenshot: "bytes | np.ndarray | None" = None,
    max_tries: int | None = None,
) -> tuple[int, int] | None:
    """Finds the coordinates of the first found element from a list.

    Args:
        pymordial_elements: A list of PymordialElement objects to search for.
        pymordial_screenshot: Optional pre-captured screenshot (bytes or
            numpy array) to avoid capturing a new one.
        max_tries: Maximum number of retries for each element in the list.

    Returns:
        A tuple containing (x, y) coordinates of the first element successfully
        located, or None if no elements from the list were found.
    """
    if max_tries is None:
        max_tries = self.config["default_find_ui_retries"]
    for pymordial_element in pymordial_elements:
        coord: tuple[int, int] | None = self.where_element(
            pymordial_element=pymordial_element,
            pymordial_screenshot=pymordial_screenshot,
            max_tries=max_tries,
        )
        if coord is not None:
            return coord
    return None

Tesseract Device

pymordialblue.devices.tesseract_device

Tesseract OCR implementation (requires Tesseract installation).

PymordialTesseractDevice

Bases: PymordialOCRDevice

Tesseract OCR implementation.

Advantages: - Lightweight (~50MB) - Fast CPU-only inference - Good accuracy for clean text - Cross-platform

Attributes:

Name Type Description
config

Tesseract configuration string.

Source code in src/pymordialblue/devices/tesseract_device.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class PymordialTesseractDevice(PymordialOCRDevice):
    """Tesseract OCR implementation.

    Advantages:
    - Lightweight (~50MB)
    - Fast CPU-only inference
    - Good accuracy for clean text
    - Cross-platform

    Attributes:
        config: Tesseract configuration string.
    """

    def __init__(self, config: str = DEFAULT_CONFIG):
        """Initializes Tesseract OCR.

        Args:
            config: Tesseract configuration string.
        """
        self.config = config

        # Check for configured Tesseract path
        tesseract_cmd = _CONFIG["extract_strategy"]["tesseract"].get("tesseract_cmd")
        if tesseract_cmd and Path(tesseract_cmd).exists():
            pytesseract.pytesseract.tesseract_cmd = str(tesseract_cmd)
            logger.info(f"Using configured Tesseract: {tesseract_cmd}")
            return

        # Check for bundled Tesseract (fallback)
        bundled_tess = (
            Path(__file__).parent.parent / "bin" / "tesseract" / "tesseract.exe"
        )
        if bundled_tess.exists():
            pytesseract.pytesseract.tesseract_cmd = str(bundled_tess)
            logger.info(f"Using bundled Tesseract: {bundled_tess}")

    def extract_text(
        self,
        image_path: "Path | bytes | str | np.ndarray",
        strategy: PymordialExtractStrategy | None = None,
    ) -> str:
        """Extracts text from an image using Tesseract with optional preprocessing.

        Args:
            image_path: Path to image file, image bytes, numpy array, or a string path.
            strategy: Optional PymordialExtractStrategy instance. If None, a
                DefaultExtractStrategy is used, providing generic preprocessing
                suitable for any image.

        Returns:
            The extracted text.

        Raises:
            ValueError: If the image cannot be processed.
        """
        try:
            # Load image
            image = self._load_image(image_path)
            # Choose strategy
            if strategy is None:
                strategy = DefaultExtractStrategy()
            # Preprocess image using the strategy
            processed = strategy.preprocess(image)
            # Use strategy-provided Tesseract config (fallback to self.config)
            config = strategy.tesseract_config() or self.config
            # Extract text using Tesseract
            text = pytesseract.image_to_string(processed, config=config)
            return text.strip()
        except Exception as e:
            logger.error(f"Error extracting text with Tesseract: {e}")
            raise ValueError(f"Failed to extract text: {e}")

    def find_text(
        self,
        search_text: str,
        image_path: "Path | bytes | str | np.ndarray",
        strategy: PymordialExtractStrategy | None = None,
    ) -> tuple[int, int] | None:
        """Finds the coordinates (center) of the specified text in the image.

        Args:
            search_text: Text to search for.
            image_path: Path to image file, image bytes, or numpy array.
            strategy: Optional preprocessing strategy.

        Returns:
            (x, y) coordinates of the center of the found text, or None if not found.
        """
        try:
            image = self._load_image(image_path)
            if strategy is None:
                strategy = DefaultExtractStrategy()
            processed = strategy.preprocess(image)
            config = strategy.tesseract_config() or self.config

            data = pytesseract.image_to_data(
                processed, config=config, output_type=pytesseract.Output.DICT
            )

            search_text_lower = search_text.lower()
            n_boxes = len(data["text"])
            for i in range(n_boxes):
                # Check if confidence is sufficient (e.g. > 0) to avoid noise
                if int(data["conf"][i]) > 0:
                    text = data["text"][i].strip().lower()
                    if search_text_lower in text:
                        x, y, w, h = (
                            data["left"][i],
                            data["top"][i],
                            data["width"][i],
                            data["height"][i],
                        )
                        center_x = x + w // 2
                        center_y = y + h // 2
                        return (center_x, center_y)
            return None
        except Exception as e:
            logger.error(f"Error finding text with Tesseract: {e}")
            return None

    def _load_image(self, image_path: "Path | bytes | str | np.ndarray") -> np.ndarray:
        """Loads image from path, bytes, or numpy array.

        Args:
            image_path: Path to image file, image bytes, numpy array, or a string path.

        Returns:
            The loaded image as a numpy array.

        Raises:
            ValueError: If the image cannot be read.
        """
        if isinstance(image_path, np.ndarray):
            return image_path
        if isinstance(image_path, bytes):
            nparr = np.frombuffer(image_path, np.uint8)
            image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        else:
            image = cv2.imread(str(image_path))

        if image is None:
            raise ValueError(f"Could not read image from {image_path}")

        return image

__init__(config=DEFAULT_CONFIG)

Initializes Tesseract OCR.

Parameters:

Name Type Description Default
config str

Tesseract configuration string.

DEFAULT_CONFIG
Source code in src/pymordialblue/devices/tesseract_device.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def __init__(self, config: str = DEFAULT_CONFIG):
    """Initializes Tesseract OCR.

    Args:
        config: Tesseract configuration string.
    """
    self.config = config

    # Check for configured Tesseract path
    tesseract_cmd = _CONFIG["extract_strategy"]["tesseract"].get("tesseract_cmd")
    if tesseract_cmd and Path(tesseract_cmd).exists():
        pytesseract.pytesseract.tesseract_cmd = str(tesseract_cmd)
        logger.info(f"Using configured Tesseract: {tesseract_cmd}")
        return

    # Check for bundled Tesseract (fallback)
    bundled_tess = (
        Path(__file__).parent.parent / "bin" / "tesseract" / "tesseract.exe"
    )
    if bundled_tess.exists():
        pytesseract.pytesseract.tesseract_cmd = str(bundled_tess)
        logger.info(f"Using bundled Tesseract: {bundled_tess}")

extract_text(image_path, strategy=None)

Extracts text from an image using Tesseract with optional preprocessing.

Parameters:

Name Type Description Default
image_path Path | bytes | str | ndarray

Path to image file, image bytes, numpy array, or a string path.

required
strategy PymordialExtractStrategy | None

Optional PymordialExtractStrategy instance. If None, a DefaultExtractStrategy is used, providing generic preprocessing suitable for any image.

None

Returns:

Type Description
str

The extracted text.

Raises:

Type Description
ValueError

If the image cannot be processed.

Source code in src/pymordialblue/devices/tesseract_device.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def extract_text(
    self,
    image_path: "Path | bytes | str | np.ndarray",
    strategy: PymordialExtractStrategy | None = None,
) -> str:
    """Extracts text from an image using Tesseract with optional preprocessing.

    Args:
        image_path: Path to image file, image bytes, numpy array, or a string path.
        strategy: Optional PymordialExtractStrategy instance. If None, a
            DefaultExtractStrategy is used, providing generic preprocessing
            suitable for any image.

    Returns:
        The extracted text.

    Raises:
        ValueError: If the image cannot be processed.
    """
    try:
        # Load image
        image = self._load_image(image_path)
        # Choose strategy
        if strategy is None:
            strategy = DefaultExtractStrategy()
        # Preprocess image using the strategy
        processed = strategy.preprocess(image)
        # Use strategy-provided Tesseract config (fallback to self.config)
        config = strategy.tesseract_config() or self.config
        # Extract text using Tesseract
        text = pytesseract.image_to_string(processed, config=config)
        return text.strip()
    except Exception as e:
        logger.error(f"Error extracting text with Tesseract: {e}")
        raise ValueError(f"Failed to extract text: {e}")

find_text(search_text, image_path, strategy=None)

Finds the coordinates (center) of the specified text in the image.

Parameters:

Name Type Description Default
search_text str

Text to search for.

required
image_path Path | bytes | str | ndarray

Path to image file, image bytes, or numpy array.

required
strategy PymordialExtractStrategy | None

Optional preprocessing strategy.

None

Returns:

Type Description
tuple[int, int] | None

(x, y) coordinates of the center of the found text, or None if not found.

Source code in src/pymordialblue/devices/tesseract_device.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def find_text(
    self,
    search_text: str,
    image_path: "Path | bytes | str | np.ndarray",
    strategy: PymordialExtractStrategy | None = None,
) -> tuple[int, int] | None:
    """Finds the coordinates (center) of the specified text in the image.

    Args:
        search_text: Text to search for.
        image_path: Path to image file, image bytes, or numpy array.
        strategy: Optional preprocessing strategy.

    Returns:
        (x, y) coordinates of the center of the found text, or None if not found.
    """
    try:
        image = self._load_image(image_path)
        if strategy is None:
            strategy = DefaultExtractStrategy()
        processed = strategy.preprocess(image)
        config = strategy.tesseract_config() or self.config

        data = pytesseract.image_to_data(
            processed, config=config, output_type=pytesseract.Output.DICT
        )

        search_text_lower = search_text.lower()
        n_boxes = len(data["text"])
        for i in range(n_boxes):
            # Check if confidence is sufficient (e.g. > 0) to avoid noise
            if int(data["conf"][i]) > 0:
                text = data["text"][i].strip().lower()
                if search_text_lower in text:
                    x, y, w, h = (
                        data["left"][i],
                        data["top"][i],
                        data["width"][i],
                        data["height"][i],
                    )
                    center_x = x + w // 2
                    center_y = y + h // 2
                    return (center_x, center_y)
        return None
    except Exception as e:
        logger.error(f"Error finding text with Tesseract: {e}")
        return None