Browse Source

Pretty functional debugger, still trying to figure out wh

- V-blank can cause two distinct interrupts: the STAT interrupt, and the VBLANK interrupt. The former is enabled by writing to the STATE register, and the latter is enabled by writing to the IE register. Previously, we were only emitting the STAT interrupt.
- Something's still up with the joypad; A seems to act like the right directional.
- Implemented stepping a little differently, with the CPU now having a `trace` and `step` property (I think this is simpler than introducing a fake CPU state).
master
Zack Marvel 9 months ago
parent
commit
e629758fe4
  1. 4
      README.md
  2. 38
      scripts/reconstruct_screen.py
  3. 69
      slowboy/debug/debug_thread.py
  4. 360
      slowboy/debug/debugger.py
  5. 1
      slowboy/debug/message_handler.py
  6. 4
      slowboy/debug/message_protocol.py
  7. 136
      slowboy/debug/messages.py
  8. 2
      slowboy/gfx.py
  9. 38
      slowboy/gpu.py
  10. 2
      slowboy/interrupts.py
  11. 150
      slowboy/mmu.py
  12. 38
      slowboy/ui.py
  13. 94
      slowboy/z80.py
  14. 2
      test_roms/Makefile
  15. 2
      test_roms/scripts/image2tilemap.py
  16. 3
      tests/test_z80.py

4
README.md

@ -57,7 +57,7 @@ I develop on Linux---these Linux distributions package at least libsdl2 version
After cloning the repository, you can install the project for development with
```
$ pip install --editable .
$ pip install --editable .\[dev\]
```
You can run the tests from the root of the project with
@ -82,4 +82,4 @@ reason. The workaround is just running the tests with `unittest`:
```
$ python -m unittest discover tests
```
```

38
scripts/reconstruct_screen.py

@ -0,0 +1,38 @@
# +
import argparse as ap
from pathlib import Path
from PIL import Image
import numpy as np
# +
# parser = ap.ArgumentParser()
# parser.add_argument('tileset_image')
# parser.add_argument('tilemap')
# args = parser.parse_args()
base_path = Path('/home/zack/src/slowboy')
tileset_image = base_path / 'tiledata.bmp'
tilemap = base_path / 'tilemap.bin'
# -
tile_image = np.array(Image.open(tileset_image).getdata())
# tile_image.shape
tile_image.dtype
24576 // 8**2
_ // 8
sheet_width = 8
sheet_height = tile_image.shape[0] // sheet_width
# +
tiles = []
for row in range(sheet_height):
for col in range(sheet_width):
tiles.append(tile_image[row*8:(row+1)*8,col*8:(col+1)*8,:])
tiles

69
slowboy/debug/debug_thread.py

@ -12,7 +12,8 @@ from slowboy.debug.messages import (Command, ShutdownCommand, StepCommand,
ReadRegisterCommand, ReadMemoryCommand,
DumpTilesCommand, UpdateTilesCommand,
Response, ReadRegisterResponse,
commands)
commands, ReadMemoryResponse, SetWatchpointCommand,
HitWatchpointResponse)
from slowboy.debug.exceptions import UnrecognizedCommandException
@ -61,38 +62,48 @@ class DebugMessageReceiver(MessageHandler):
pass
elif msg.code == StepCommand.code:
# TODO this is probably not atomic
self.cpu.step = True
self.cpu.trace = True
elif msg.code == ContinueCommand.code:
self.cpu.step = False
self.cpu.trace = False
elif msg.code == SetBreakpointCommand.code:
elif isinstance(msg, SetBreakpointCommand):
self.cpu.set_breakpoint(msg.address)
elif msg.code == ReadRegisterCommand.code:
value = self.cpu.read_register(ReadRegisterCommand.decode_register(
msg.register))
elif isinstance(msg, ReadRegisterCommand):
value = self.cpu.read_register(msg.register)
self.resp_queue.put_nowait(
ReadRegisterResponse(msg.register, value))
elif msg.code == ReadMemoryCommand.code:
elif isinstance(msg, ReadMemoryCommand):
addr = msg.address
length = msg.length
for i in range(addr, length):
print(self.cpu.mmu.get_addr(addr))
buf = bytearray(length)
for i in range(length):
buf[i] = self.cpu.mmu.get_addr(addr+i)
self.resp_queue.put_nowait(
ReadMemoryResponse(addr, bytes(buf)))
elif msg.code == DumpTilesCommand.code:
print('Dumping GPU tiles')
self.cpu.gpu.dump_tileset('tileset.bmp')
self.cpu.gpu.dump_background('background.bmp')
self.cpu.gpu.dump_foreground('foreground.bmp')
buf = bytearray(0x1800)
self.cpu.gpu.dump_tile_memory(buf)
self.dump_mem(buf, 0x8000)
# self.cpu.gpu.dump_tileset('tileset.bmp')
# self.cpu.gpu.dump_background('background.bmp')
# self.cpu.gpu.dump_foreground('foreground.bmp')
# buf = bytearray(0x1800)
# self.cpu.gpu.dump_tile_memory(buf)
# self.dump_mem(buf, 0x8000)
self.cpu.gpu.dump_regs()
elif msg.code == UpdateTilesCommand.code:
for i in range(128):
self.cpu.gpu._update_tile(i)
elif isinstance(msg, SetWatchpointCommand):
self.cpu.mmu.add_watchpoint(msg.addr, msg.read, partial(self.hit_watchpoint, msg.addr))
else:
raise UnrecognizedCommandException()
def hit_watchpoint(self, addr, value, read=False):
# Called from the "main" (UI) thread's context, so we can't just touch resp_queue directly
self.loop.call_soon_threadsafe(
lambda: self.resp_queue.put_nowait(HitWatchpointResponse(addr, value, read)))
# Now put the CPU in trace mode
self.cpu.trace = True
class DebugMessageSender(MessageHandler):
@ -101,7 +112,6 @@ class DebugMessageSender(MessageHandler):
self.protocol_list = protocol_list
def handle_message(self, msg: Response):
print('DebugMessageSender got', msg)
for protocol in self.protocol_list:
protocol.send_message(msg)
@ -127,6 +137,8 @@ class DebugThread(threading.Thread):
def stop(self):
print('DebugThread begins shutdown')
self.loop.call_soon_threadsafe(self.sender.stop)
self.loop.call_soon_threadsafe(self.receiver.stop)
self.loop.call_soon_threadsafe(self.server.close)
def run(self):
print('DebugThread started')
@ -139,26 +151,17 @@ class DebugThread(threading.Thread):
lambda p: protocols.append(p)),
host=host, port=port,
reuse_address=True)
server = loop.run_until_complete(coro)
self.server = server
receiver = DebugMessageReceiver(loop, self.cmd_q, self.resp_q, self.cpu)
receiver_task = receiver.start()
#receiver_task = self.start()
#register_shutdown_callback(lambda: self.server.close())
#register_shutdown_callback(lambda: self.loop.stop())
sender = DebugMessageSender(loop, self.resp_q, protocols)
sender_task = sender.start()
sender.register_shutdown_callback(lambda: server.close())
sender.register_shutdown_callback(lambda: loop.stop())
self.sender = sender
self.server = loop.run_until_complete(coro)
self.receiver = DebugMessageReceiver(loop, self.cmd_q, self.resp_q, self.cpu)
receiver_task = self.receiver.start()
self.sender = DebugMessageSender(loop, self.resp_q, protocols)
sender_task = self.sender.start()
print('DebugServer started on {}'.format(self.debug_address))
#loop.run_forever()
asyncio.gather(sender_task, receiver_task, loop=loop)
loop.run_until_complete(server.wait_closed())
loop.run_until_complete(self.server.wait_closed())
print('DebugThread finished')

360
slowboy/debug/debugger.py

@ -1,12 +1,24 @@
import asyncio
import bisect
import dataclasses
import functools
import sys
from functools import partial
import io
from functools import partial
from typing import Callable, List, Dict, Type, Optional
import numpy as np
from PIL import Image
from slowboy.debug.exceptions import UnrecognizedCommandException
from slowboy.debug.message_handler import MessageHandler
from slowboy.debug.message_protocol import MessageProtocol
from slowboy.debug.messages import (Response, responses, ShutdownCommand, StepCommand,
ContinueCommand, SetBreakpointCommand, ReadRegisterCommand,
ReadMemoryCommand, REGISTERS, HitBreakpointResponse,
ReadRegisterResponse, ReadMemoryResponse, SetWatchpointCommand,
HitWatchpointResponse)
from messages import *
from exceptions import *
from message_protocol import MessageProtocol
from message_handler import MessageHandler
# ------------------------------------------------------------------------------
class ClientProtocol(MessageProtocol):
@ -25,35 +37,194 @@ def split_command(cmd: str):
# ------------------------------------------------------------------------------
def validate_register(reg: str):
return reg.lower() in registers
return reg.lower() in REGISTERS
def get_command(cmd: str):
"""Turns input from stdin into Command objects
"""
if cmd.startswith('q'):
return ShutdownCommand()
elif cmd.startswith('s'):
return StepCommand()
elif cmd.startswith('c'):
return ContinueCommand()
elif cmd.startswith('b'):
cmd, addr = split_command(cmd)
return SetBreakpointCommand(int(addr))
elif cmd.startswith('reg'):
cmd, reg = split_command(cmd)
if not validate_register(reg):
print("Unrecognized register: {}".format(reg))
return ReadRegisterCommand(reg)
elif cmd.startswith('x') or cmd.startswith('ex'):
cmd, addr, length = split_command(cmd)
return ReadMemoryCommand(int(addr), int(length))
elif cmd == 'gpu_dump':
return DumpTilesCommand()
elif cmd == 'update_tiles':
return UpdateTilesCommand()
def int_or_hex(s: str) -> int:
return int(s, 16 if s.startswith('0x') else 10)
# Keep these in an sorted list for now. Since things are only inserted when
# this module is initially loaded, the cost of insertion sort probably doesn't
# matter. If needed, we could use a trie or something (since I'd like to support
# e.g. `c` and `cont` as shorthand for `continue`.
_cli_commands = [] # List[str]
_cli_command_attrs = [] # List[CLICommandAttributes]
def cli_command_search(name, *args):
# TODO this only handles subcommand depth of at most 1
name = tuple(p for p in name.split(' ') if p != '')
i = bisect.bisect_left(_cli_commands, name[0])
if _cli_commands[i] != name[0]:
raise KeyError(f'Command {name} not found')
attrs = _cli_command_attrs[i]
if len(name) > 1 and name[1] in attrs.subcommands:
return attrs.subcommands[name[1]].func(*(name[2:]))
else:
raise UnrecognizedCommandException(cmd)
return attrs.func(*(name[1:] + args))
@dataclasses.dataclass
class CLICommandAttributes:
func: Callable
nargs: int
arg_types: List[Type]
description: str
help: str
subcommands: Dict[str, 'CLICommandAttributes'] = dataclasses.field(
default_factory=dict)
def cli_command(command: str, nargs: int = 0, arg_types: Optional[List] = None,
description: str = '', help: str = ''):
"""Decorator for registering CLI commands.
Args:
command: Command name.
nargs: Number of arguments.
arg_types: List of argument types.
description: Summary of command.
help: Detailed help.
"""
if arg_types is None:
arg_types = []
# Register the command
i = bisect.bisect_left(_cli_commands, command)
if i < len(_cli_commands) and _cli_commands[i] == command:
raise UnrecognizedCommandException(
f'{command} already exists in command dictionary')
_cli_commands.insert(i, command)
if help == '':
help = description
def decorator(func: Callable):
@functools.wraps(func)
def wrapper(*args):
# noinspection PyShadowingNames
converted_args = tuple(arg_types[i](arg) for i, arg in enumerate(args))
return func(*converted_args)
_cli_command_attrs.insert(i, CLICommandAttributes(
func=wrapper,
nargs=nargs,
arg_types=arg_types,
description=description,
help=help
))
return wrapper
return decorator
def cli_subcommand(command: str, parent: str, nargs: int = 0,
arg_types: Optional[List] = None, description: str = '', help: str = ''):
"""Decorator for registering subcommands under existing CLI commands.
Args:
command: Subcommand name.
parent: Name of parent command.
nargs: Number of arguments.
arg_types: List of argument types.
description: Command brief.
help: Detailed description.
"""
if arg_types is None:
arg_types = []
i = bisect.bisect_left(_cli_commands, parent)
if help == '':
help = description
def decorator(func: Callable):
@functools.wraps(func)
def wrapper(*args):
# noinspection PyShadowingNames
converted_args = tuple(arg_types[i](arg) for i, arg in enumerate(args))
return func(*converted_args)
_cli_command_attrs[i].subcommands[command] = CLICommandAttributes(
func=wrapper,
nargs=nargs,
arg_types=arg_types,
description=description,
help=help
)
return wrapper
return decorator
def cli_help():
for cmd, attrs in zip(_cli_commands, _cli_command_attrs):
print(f'{cmd} - {attrs.description}')
if attrs.help:
print(f' {attrs.help}')
for subcmd, subattrs in attrs.subcommands.items():
print(f' {subcmd} - {subattrs.description}')
if subattrs.help:
print(f' {subattrs.help}')
@cli_command('help')
def command_help():
cli_help()
@cli_command('quit', description='Quit the debugger.')
def command_quit():
return [ShutdownCommand()]
@cli_command('step', description='In trace mode, steps forward one instruction.')
def command_step():
return [StepCommand()]
@cli_command('continue', description=('In trace mode, continue until a breakpoint is hit or the '
'system halts on its own.'))
def command_continue():
return [ContinueCommand()]
@cli_command('breakpoint', nargs=1, arg_types=[int_or_hex], description='Add a breakpoint.')
def command_breakpoint(addr: int):
return [SetBreakpointCommand(addr)]
@cli_command('reg', nargs=1, arg_types=[str], description='Get the value of a register')
def command_reg(reg: str):
if not validate_register(reg):
print("Unrecognized register: {}".format(reg))
return [ReadRegisterCommand(reg)]
@cli_command('examine', nargs=2, arg_types=[int_or_hex, int],
description='Examine a region of memory',
help='examine <addr> <length>')
def command_examine(addr, length):
return [ReadMemoryCommand(int(addr), int(length))]
@cli_command('gpu')
def command_gpu():
pass
@cli_subcommand('dump', 'gpu', description='Dump GPU tiles and memory')
def command_gpu_dump():
return [ReadMemoryCommand(0x8000, 0x2000)]
@cli_command('watchw', nargs=1, arg_types=[int_or_hex],
description='Watch a byte in memory for writes')
def command_watchw(addr):
return [SetWatchpointCommand(addr, False)]
@cli_command('watchrw', nargs=1, arg_types=[int_or_hex],
description='Watch a byte in memory for reads or writes')
def command_watchrw(addr):
return [SetWatchpointCommand(addr, True)]
# ------------------------------------------------------------------------------
@ -61,10 +232,11 @@ def stdin_handler(readable: io.IOBase, transport: asyncio.Transport,
protocol: asyncio.Protocol, close_cb: Callable[[], None]):
line = readable.readline().rstrip()
cmd = get_command(line) if line else ShutdownCommand()
protocol.send_message(cmd)
cmds = cli_command_search(line) if line else [ShutdownCommand()]
for cmd in cmds:
protocol.send_message(cmd)
if cmd.code == ShutdownCommand.code:
if any(cmd.code == ShutdownCommand.code for cmd in cmds):
transport.write_eof()
close_cb()
@ -77,46 +249,106 @@ class ResponseReceiver(MessageHandler):
def handle_message(self, response: Response):
"""Overridden method called by message handler
"""
if response.code == HitBreakpointResponse.code:
if isinstance(response, HitBreakpointResponse):
print('Hit breakpoint at {:#0x}'.format(response.address))
elif response.code == ReadRegisterResponse.code:
elif isinstance(response, ReadRegisterResponse):
print('Register {}: {:#0x}'.format(response.register, response.value))
elif response.code == ReadMemoryResponse.code:
formatted_values = ' '.join('{:#0x}'.format(value) for value in response.values)
print('Address {:#0x}: {}'.format(response.address,
formatted_values))
elif isinstance(response, ReadMemoryResponse):
mem = response.values
addr = response.address
if addr == 0x8000 and len(mem) == 0x2000:
# Dump GPU memory
tile_data = mem[:0x1800]
tile_map = mem[0x1800:]
self._save_tile_data(tile_data, 'tiledata.bmp')
print('Saved tiles to tiledata.bmp')
self._save_tile_map(tile_map, 'tilemap.bin')
print('Saved tile map to tilemap.bin')
else:
print(f'Address {addr:#0x}: {mem.hex()}')
elif isinstance(response, HitWatchpointResponse):
if response.read:
print(f'Read watchpoint {response.addr:x}: {response.value:x}')
else:
print(f'Wrote watchpoint {response.addr:x}: {response.value:x}')
else:
print(f'Unrecognized response: {response}')
_DEFAULT_PALETTE = [0, 0xff // 4, (0xff // 4) * 2, 0xff]
@staticmethod
def _decode_tile_data(tile_data: bytes) -> np.ndarray:
from slowboy.gfx import decode_tile
tiles = np.frombuffer(tile_data, dtype=np.uint8) \
.reshape((-1, 16))
decoded_tiles = np.empty((tiles.shape[0], 8, 8), dtype=np.uint32)
for i in range(tiles.shape[0]):
encoded_tile = tiles[i, :]
tile = decode_tile(encoded_tile, ResponseReceiver._DEFAULT_PALETTE)
# "convert" to RGBA
tile = np.repeat(tile, 4)
# set alpha=0xff
tile[3::4] = 0xff
tile = np.frombuffer(tile, dtype=np.uint32).reshape((8, 8))
decoded_tiles[i, :, :] = tile[:, :]
return decoded_tiles
_SHEET_WIDTH = 16
@staticmethod
def _save_tile_data(tile_data: bytes, filename: str):
decoded_tiles = ResponseReceiver._decode_tile_data(tile_data)
# Make the tiles into an image
# Number of rows of tiles in the tile sheet
sheet_cols = ResponseReceiver._SHEET_WIDTH
sheet_rows = decoded_tiles.shape[0] // sheet_cols
out_image = np.empty(0, dtype=np.uint32)
for row in range(sheet_rows):
for tile_row in range(decoded_tiles[0].shape[1]):
row_of_tiles = decoded_tiles[row*sheet_cols:(row+1)*sheet_cols, tile_row, :]
out_image = np.append(out_image, row_of_tiles.flatten())
print(set(out_image))
img = Image.frombytes('RGBA', (sheet_cols * 8, sheet_rows * 8), out_image.tobytes())
img.save(filename)
@staticmethod
def _save_tile_map(tile_map: bytes, filename: str):
with open(filename, 'wb') as f:
f.write(tile_map)
# ------------------------------------------------------------------------------
def main():
loop = asyncio.get_event_loop()
# Connect to the server
resp_queue = asyncio.Queue()
receiver = ResponseReceiver(loop, resp_queue)
loop = asyncio.get_event_loop()
# Connect to the server
resp_queue = asyncio.Queue()
receiver = ResponseReceiver(loop, resp_queue)
transport, protocol = loop.run_until_complete(
loop.create_connection(partial(ClientProtocol, resp_queue),
host='127.0.0.1', port=9099))
transport, protocol = loop.run_until_complete(
loop.create_connection(partial(ClientProtocol, resp_queue),
host='127.0.0.1', port=9099))
receiver_task = receiver.start()
receiver_task = receiver.start()
def shutdown_callback():
receiver.stop()
loop.stop()
print('event loop shutdown')
def shutdown_callback():
receiver.stop()
loop.stop()
print('event loop shutdown')
loop.add_reader(sys.stdin, stdin_handler, sys.stdin,
transport, protocol, shutdown_callback)
try:
loop.run_forever()
except KeyboardInterrupt:
print('Shutdown signal received')
receiver.stop()
finally:
tasks = asyncio.Task.all_tasks(loop=loop)
loop.run_until_complete(*tasks)
loop.stop()
loop.close()
loop.add_reader(sys.stdin, stdin_handler, sys.stdin,
transport, protocol, shutdown_callback)
try:
loop.run_forever()
except KeyboardInterrupt:
print('Shutdown signal received')
receiver.stop()
finally:
tasks = asyncio.Task.all_tasks(loop=loop)
loop.run_until_complete(*tasks)
loop.stop()
loop.close()
if __name__ == '__main__':
main()

1
slowboy/debug/message_handler.py

@ -50,7 +50,6 @@ class MessageHandler(metaclass=abc.ABCMeta):
print('Started {}'.format(self))
while True:
msg = await self.queue.get()
print('handle_messages: dequeued {}'.format(msg))
self.handle_message(msg)
except asyncio.CancelledError:
print('Shutting down {}'.format(self))

4
slowboy/debug/message_protocol.py

@ -47,7 +47,6 @@ class MessageProtocol(asyncio.Protocol):
a message. Calls decode_message to get an object from the serialized
data, and adds the object to the queue.
"""
print('received {} bytes: {}'.format(len(data), data))
if self.rx_state == self.RxState.WAITING and len(data) > 0:
# The message could be segmented
buffered_len = len(self._rx_buffer)
@ -56,7 +55,6 @@ class MessageProtocol(asyncio.Protocol):
data = data[len(self._rx_buffer):]
if len(self._rx_buffer) == 4:
self._rx_code, = struct.unpack('!L', self._rx_buffer)
print('got code {}'.format(self._rx_code))
self.rx_state = self.RxState.GOT_CODE
self._rx_buffer = bytes()
if self.rx_state == self.RxState.GOT_CODE and len(data) > 0:
@ -66,7 +64,6 @@ class MessageProtocol(asyncio.Protocol):
data = data[len(self._rx_buffer):]
if len(self._rx_buffer) == 4:
self._rx_size, = struct.unpack('!L', self._rx_buffer)
print('got size', self._rx_size)
self.rx_state = self.RxState.GOT_SIZE
self._rx_buffer = bytes()
if self.rx_state == self.RxState.GOT_SIZE:
@ -82,7 +79,6 @@ class MessageProtocol(asyncio.Protocol):
if self.rx_state == self.RxState.GOT_PAYLOAD:
resp = self.decode_message(self._rx_code, self._rx_size,
self._rx_payload)
print('got payload {}'.format(resp))
self.rx_queue.put_nowait(resp)
self.rx_state = self.RxState.WAITING
self._rx_code = -1

136
slowboy/debug/messages.py

@ -1,16 +1,11 @@
import abc
from typing import Callable
import asyncio
import struct
import enum
from collections import deque
from slowboy.debug.exceptions import *
class Message(metaclass=abc.ABCMeta):
code = 0x00
def __init__(self, payload=bytes()):
self.payload = payload
@ -21,7 +16,7 @@ class Message(metaclass=abc.ABCMeta):
return struct.pack('!LL', self.code, len(self.payload))
@classmethod
def deserialize(cls, payload=bytes()):
def deserialize(cls, payload):
"""Default implementation. May be overridden for messages with non-empty
payloads.
"""
@ -30,10 +25,8 @@ class Message(metaclass=abc.ABCMeta):
# ------------------------------------------------------------------------------
class Command(Message):
pass
class Commands(enum.Enum):
INVALID_COMMAND = 0x00
SHUTDOWN_COMMAND = 0x01
STEP_COMMAND = 0x02
CONTINUE_COMMAND = 0x03
@ -42,6 +35,11 @@ class Commands(enum.Enum):
READ_MEMORY_COMMAND = 0x06
DUMP_TILES_COMMAND = 0x07
UPDATE_TILES_COMMAND = 0x08
SET_WATCHPOINT_COMMAND = 0x09
class Command(Message):
code = Commands.INVALID_COMMAND.value
class ShutdownCommand(Command):
code = Commands.SHUTDOWN_COMMAND.value
@ -59,7 +57,7 @@ class SetBreakpointCommand(Command):
code = Commands.SET_BREAKPOINT_COMMAND.value
def __init__(self, addr: int):
self.payload = struct.pack('!H', addr)
super().__init__(struct.pack('!H', addr))
@classmethod
def deserialize(cls, payload):
@ -70,38 +68,47 @@ class SetBreakpointCommand(Command):
return struct.unpack('!H', self.payload)[0]
registers = [
'a', 'b', 'c', 'd', 'e', 'f', 'h', 'l', # 8-bit registers
'bc', 'de', 'hl', 'sp', 'pc', # 16-bit registers
]
SINGLE_REGISTERS = ('a', 'b', 'c', 'd', 'e', 'f', 'h', 'l')
DOUBLE_REGISTERS = ('bc', 'de', 'hl', 'sp', 'pc')
REGISTERS = SINGLE_REGISTERS + DOUBLE_REGISTERS
class ReadRegisterCommand(Command):
code = Commands.READ_REGISTER_COMMAND.value
def __init__(self, reg: str):
self.payload = struct.pack('!B', ReadRegisterCommand.encode_register(reg))
super(Command, self).__init__(
ReadRegisterCommand.encode_register(reg))
self.register = reg
@classmethod
def deserialize(cls, payload):
regid = struct.unpack('!B', payload)[0]
return cls(ReadRegisterCommand.decode_register(regid))
return cls(ReadRegisterCommand.decode_register(payload))
@staticmethod
def encode_register(reg):
return registers.index(reg)
def encode_register(reg: str) -> bytes:
if reg in SINGLE_REGISTERS:
return reg.encode('ascii') + b'\x00'
elif reg in DOUBLE_REGISTERS:
return reg.encode('ascii')
else:
raise ValueError(f'Invalid register {reg}')
@staticmethod
def decode_register(regid):
return registers[regid]
@property
def register(self):
return struct.unpack('!B', self.payload)[0]
def decode_register(breg: bytes) -> str:
if breg[1] == b'\x00':
breg = breg[:1]
decoded = breg.decode('ascii')
if decoded not in REGISTERS:
raise ValueError(f'Invalid register {decoded}')
else:
return decoded
class ReadMemoryCommand(Command):
code = Commands.READ_MEMORY_COMMAND.value
def __init__(self, addr: int, length: int):
self.payload = struct.pack('!HH', addr, length)
super(Command, self).__init__(struct.pack('!HH', addr, length))
@classmethod
def deserialize(cls, payload):
@ -116,13 +123,35 @@ class ReadMemoryCommand(Command):
def length(self):
return struct.unpack('!HH', self.payload)[1]
class DumpTilesCommand(Command):
code = Commands.DUMP_TILES_COMMAND.value
class UpdateTilesCommand(Command):
code = Commands.UPDATE_TILES_COMMAND.value
class SetWatchpointCommand(Command):
code = Commands.SET_WATCHPOINT_COMMAND.value
def __init__(self, addr: int, read=False):
"""Watch a region of memory. By default, only watch writes.
Args:
addr: Address to watch.
read: Also watch for reads.
"""
super().__init__(struct.pack('!HH', addr, 1 if read else 0))
self.addr = addr
self.read = read
@classmethod
def deserialize(cls, payload):
addr, read = struct.unpack('!HH', payload)
return cls(addr, read == 1)
commands = [
ShutdownCommand,
StepCommand,
@ -132,6 +161,7 @@ commands = [
ReadMemoryCommand,
DumpTilesCommand,
UpdateTilesCommand,
SetWatchpointCommand,
]
@ -144,7 +174,7 @@ class HitBreakpointResponse(Response):
code = 0x01
def __init__(self, addr: int):
self.payload = struct.pack('!H', addr)
super(Response, self).__init__(struct.pack('!H', addr))
@classmethod
def deserialize(cls, payload):
@ -159,26 +189,34 @@ class ReadRegisterResponse(Response):
code = 0x02
def __init__(self, reg: str, value: int):
self.payload = struct.pack('!BB', reg, value)
super().__init__(self.encode_register(reg) + struct.pack('!H', value))
self.register = reg
self.value = value
@classmethod
def deserialize(cls, payload):
reg, value = struct.unpack('!BB', payload)
reg = payload[:2].decode('ascii').rstrip('\x00')
value, = struct.unpack('!H', payload[2:])
if reg not in REGISTERS:
raise ValueError(f'Invalid register {reg}')
return cls(reg, value)
@property
def register(self):
return struct.unpack('!BB', self.payload)[0]
@staticmethod
def encode_register(reg: str) -> bytes:
if reg in SINGLE_REGISTERS:
return reg.encode('ascii') + b'\x00'
elif reg in DOUBLE_REGISTERS:
return reg.encode('ascii')
else:
raise ValueError(f'Invalid register {reg}')
@property
def value(self):
return struct.unpack('!BB', self.payload)[1]
class ReadMemoryResponse(Response):
code = 0x03
def __init__(self, addr: int, values: bytes):
self.payload = struct.pack('!H{}B'.format(len(values)), addr, *values)
super(Response, self).__init__(
struct.pack('!H{}B'.format(len(values)), addr, *values))
@classmethod
def deserialize(cls, payload):
@ -190,17 +228,31 @@ class ReadMemoryResponse(Response):
@property
def address(self):
nvalues = len(self.payload) - 2
return struct.unpack('!H{}B'.format(nvalues), self.payload)[0]
return struct.unpack_from('!H', self.payload)[0]
@property
def values(self):
nvalues = len(self.payload) - 2
return struct.unpack('!H{}B'.format(nvalues), self.payload)[1:]
return self.payload[2:]
class HitWatchpointResponse(Response):
code = 0x04
def __init__(self, addr: int, value: int, read=False):
super().__init__(struct.pack('!HHH', addr, value, 1 if read else 0))
self.addr = addr
self.value = value
self.read = read
@classmethod
def deserialize(cls, payload):
addr, value, read = struct.unpack('!HHH', payload)
return cls(addr, value, read == 1)
responses = [
HitBreakpointResponse,
ReadRegisterResponse,
ReadMemoryResponse,
HitWatchpointResponse,
]

2
slowboy/gfx.py

@ -92,7 +92,7 @@ def decode_2bit(iterable: Iterable[int], palette: Sequence[Color]) \
raise StopIteration()
def decode_tile(tile: ByteString, palette: Sequence[Color]) -> ByteString:
def decode_tile(tile: ByteString, palette: Sequence[int]) -> ByteString:
"""Decode a 2-bit tile to 8-bit grayscale.
:param tile: 16-byte encoded 2-bit tile data.

38
slowboy/gpu.py

@ -158,7 +158,7 @@ class GPU(ClockListener):
self.obp1 = self._obp1
self.mode = Mode.OAM_READ
self.stat |= 0x03
self._stat |= 0x03
self.mode_clock = 0
self.last_time = time()
@ -311,8 +311,7 @@ class GPU(ClockListener):
if value == self.ly:
# LYC interrupt
self._stat |= 1 << STAT_LYC_FLAG_OFFSET
if self.interrupt_controller is not None\
and self.stat & STAT_LYC_IE_MASK:
if self.interrupt_controller is not None and self.stat & STAT_LYC_IE_MASK:
self.interrupt_controller.notify_interrupt(InterruptType.stat)
else:
self._stat &= ~STAT_LYC_FLAG_MASK
@ -358,7 +357,7 @@ class GPU(ClockListener):
| (old_stat & (STAT_LYC_FLAG_MASK | STAT_MODE_MASK))
# ly and mode setters will check this register for their interrupt
# status and notify the interrupt controller if necessary
self.logger.debug('set STAT to %#x', self._stat)
self.logger.info('set STAT to %#x (%#x)', self._stat, new_stat)
@property
def mode(self):
@ -371,16 +370,16 @@ class GPU(ClockListener):
register, causing the stat getter to be called a lot.
"""
stat = self.stat & ~STAT_MODE_MASK
if (new_mode == Mode.OAM_READ or new_mode == Mode.OAM_VRAM_READ)\
and stat & STAT_OAM_IE_MASK\
and self.interrupt_controller is not None:
self.interrupt_controller.notify_interrupt(InterruptType.stat)
elif new_mode == Mode.V_BLANK and stat & STAT_VBLANK_IE_MASK and \
self.interrupt_controller is not None:
self.interrupt_controller.notify_interrupt(InterruptType.stat)
elif new_mode == Mode.H_BLANK and stat & STAT_HBLANK_IE_MASK and \
self.interrupt_controller is not None:
self.interrupt_controller.notify_interrupt(InterruptType.stat)
if self.interrupt_controller is not None:
if (new_mode == Mode.OAM_READ or new_mode == Mode.OAM_VRAM_READ)\
and stat & STAT_OAM_IE_MASK:
self.interrupt_controller.notify_interrupt(InterruptType.stat)
elif new_mode == Mode.V_BLANK:
if stat & STAT_VBLANK_IE_MASK:
self.interrupt_controller.notify_interrupt(InterruptType.stat)
self.interrupt_controller.notify_interrupt(InterruptType.vblank)
elif new_mode == Mode.H_BLANK and stat & STAT_HBLANK_IE_MASK:
self.interrupt_controller.notify_interrupt(InterruptType.stat)
self._mode = new_mode
# We have to "cheat" here to update the STAT mode flag--the stat setter
# considers the mode flag read-only
@ -613,7 +612,7 @@ class GPU(ClockListener):
('SCX', self.scx),
('LY', self.ly),
('LYC', self.lyc),
('MODE', self.mode),
('MODE', self.mode.value),
('WY', self.wy),
('WX', self.wx),
]
@ -691,10 +690,11 @@ class GPU(ClockListener):
rgba_data[4*i+1] = (c >> 16) & 0xff
rgba_data[4*i+2] = (c >> 8) & 0xff
rgba_data[4*i+3] = c & 0xff
tile_surface = sdl2.SDL_CreateRGBSurfaceWithFormatFrom(bytes(rgba_data),
TWIDTH, THEIGHT,
32, TWIDTH*4,
sdl2.SDL_PIXELFORMAT_RGBA32)
tile_surface = sdl2.SDL_CreateRGBSurfaceWithFormatFrom(
bytes(rgba_data),
TWIDTH, THEIGHT,
32, TWIDTH*4,
sdl2.SDL_PIXELFORMAT_RGBA32)
if not tile_surface:
print(sdl2.SDL_GetError())
raise Exception

2
slowboy/interrupts.py

@ -75,7 +75,7 @@ class InterruptController(InterruptListener):
@property
def has_interrupt(self) -> bool:
return self.enabled and (self.if_ & 0x1f) > 0
return self.enabled and (self.if_ & 0x1f) != 0
def get_interrupts(self) -> Sequence[InterruptType]:
for i in range(5):

150
slowboy/mmu.py

@ -2,9 +2,10 @@
import logging
from slowboy.gpu import GPU, VRAM_START, OAM_START
from slowboy.interrupts import InterruptController
from slowboy.interrupts import InterruptController, InterruptType
from slowboy.timer import Timer
JOYP_SELECT_BUTTON_MASK = 0x20
JOYP_SELECT_DIRECTION_MASK = 0x10
@ -30,6 +31,7 @@ class MMU():
self.wram = bytearray(4*1024 + 4*1024)
self.sprite_table = bytearray(160)
self.hram = bytearray(127)
self._sound_mem = bytearray(0x40)
self._joyp = 0
self._buttons = {
@ -45,8 +47,9 @@ class MMU():
self._dma = 0
# Mapping of address to callback
self.watchpoints = {}
# Read watchpoints. Mapping of address to callback.
self._watchpoints_r = {}
self._watchpoints_w = {}
def load_rom(self, romdata):
self.rom = romdata
@ -130,92 +133,95 @@ class MMU():
def load_interrupt_controller(self, interrupt_controller: InterruptController):
self.interrupt_controller = interrupt_controller
def get_addr(self, addr):
#if addr in self.watchpoints:
# self.watchpoints[addr](addr, None)
def add_watchpoint(self, addr, read, cb):
self._watchpoints_w[addr] = cb
if read:
self._watchpoints_r[addr] = lambda value: cb(value, read=False)
def get_addr(self, addr):
if addr < 0:
# invalid
raise ValueError('invalid address {:#04x}'.format(addr))
elif addr < 0x4000:
# ROM Bank 0 (16 KB)
return self.rom[addr]
val = self.rom[addr]
elif addr < 0x8000:
# ROM Bank 1+ (16 KB)
return self.rom[addr]
val = self.rom[addr]
elif addr < 0xa000:
# VRAM (8 KB)
return self.gpu.get_vram(addr - VRAM_START)
val = self.gpu.get_vram(addr - VRAM_START)
elif addr < 0xc000:
# cartridge RAM (8 KB)
return self.cartridge_ram[addr - 0xa000]
val = self.cartridge_ram[addr - 0xa000]
elif addr < 0xd000:
# WRAM 0 (4 KB)
return self.wram[addr - 0xc000]
val = self.wram[addr - 0xc000]
elif addr < 0xe000:
# WRAM 1 (4 KB)
return self.wram[addr - 0xc000]
val = self.wram[addr - 0xc000]
elif addr < 0xfe00:
# echo RAM 0xc000–ddff
return self.get_addr(addr - 0x2000)
val = self.get_addr(addr - 0x2000)
elif addr < 0xfea0:
# sprite table (OAM)
return self.gpu.get_oam(addr - OAM_START)
val = self.gpu.get_oam(addr - OAM_START)
elif addr < 0xff00:
# invalid
self.logger.debug('read from invalid address %#04x', addr)
return 0
val = 0
#raise ValueError('invalid address {}'.format(addr))
elif addr < 0xff80:
# IO
if addr == 0xff00:
return self.joyp
elif addr == 0xff01 | addr == 0xff02:
# print(f'Read joypad {self.joyp:x}')
val = self.joyp
elif addr == 0xff01 or addr == 0xff02:
raise NotImplementedError('Serial transfer registers')
elif addr == 0xff04:
return self.timer.div
val = self.timer.div
elif addr == 0xff05:
return self.timer.tima
val = self.timer.tima
elif addr == 0xff06:
return self.timer.tma
val = self.timer.tma
elif addr == 0xff07:
return self.timer.tac
val = self.timer.tac
elif addr == 0xff0f:
# IF
return self.interrupt_controller.if_
val = self.interrupt_controller.if_
elif addr == 0xff10:
raise NotImplementedError('IF register')
elif addr < 0xff40:
raise NotImplementedError('sound registers')
elif addr == 0xff40:
return self.gpu.lcdc
val = self.gpu.lcdc
elif addr == 0xff41:
return self.gpu.stat
val = self.gpu.stat
elif addr == 0xff42:
return self.gpu.scy
val = self.gpu.scy
elif addr == 0xff43:
return self.gpu.scx
val = self.gpu.scx
elif addr == 0xff44:
return self.gpu.ly
val = self.gpu.ly
elif addr == 0xff45:
return self.gpu.lyc
val = self.gpu.lyc
elif addr == 0xff46:
return self.dma
val = self.dma
elif addr == 0xff47:
return self.gpu.bgp
val = self.gpu.bgp
elif addr == 0xff48:
return self.gpu.obp0
val = self.gpu.obp0
elif addr == 0xff49:
return self.gpu.obp1
val = self.gpu.obp1
elif addr == 0xff4a:
return self.gpu.wy
val = self.gpu.wy
elif addr == 0xff4b:
return self.gpu.wx
val = self.gpu.wx
else:
raise NotImplementedError('memory-mapped IO addr {}'.format(hex(addr)))
elif addr < 0xffff:
# HRAM
return self.hram[addr - 0xff80]
val = self.hram[addr - 0xff80]
elif addr == 0xffff:
# interrupt enable register
# bit 0: v-blank interrupt
@ -224,14 +230,18 @@ class MMU():
# bit 3: serial interrupt
# bit 4: joypad interrupt
if self.interrupt_controller is not None:
return self.interrupt_controller.ie
val = self.interrupt_controller.ie
else:
self.logger.warning('read from interrupt controller when there is '
'not one loaded')
return 0
self.logger.warning('read from interrupt controller when there is not one loaded')
val = 0
else:
raise ValueError('invalid address {:#04x}'.format(addr))
if addr in self._watchpoints_r:
self._watchpoints_r[addr](val)
return val
def set_addr(self, addr, value):
value = value & 0xff
@ -269,6 +279,7 @@ class MMU():
elif addr < 0xff80:
# IO 0xff00-0xff7f
if addr == 0xff00:
# print(f'Write joypad {value:x}')
self.joyp = value
elif addr == 0xff01 | addr == 0xff02:
raise NotImplementedError('Serial transfer registers')
@ -284,7 +295,9 @@ class MMU():
# IF
self.interrupt_controller.if_ = value
elif addr < 0xff40:
self.logger.warn('not implemented: sound registers')
self._sound_mem[addr-0xff10] = value
# TODO
# self.logger.warn('not implemented: sound registers %#04x, %#04x', addr, value)
elif addr == 0xff40:
self.gpu.lcdc = value
elif addr == 0xff41:
@ -324,47 +337,52 @@ class MMU():
if self.interrupt_controller is not None:
self.interrupt_controller.ie = value
else:
self.logger.warning('write to interrupt controller when there '
'is not one loaded')
self.logger.warning('write to interrupt controller when there is not one loaded')
else:
raise ValueError('invalid address {:#04x}'.format(hex(addr)))
if addr in self._watchpoints_w:
self._watchpoints_w[addr](value)
@property
def joyp(self):
joyp = self._joyp & 0xf0
if joyp & JOYP_SELECT_BUTTON_MASK:
if not self._buttons['down']:
joyp |= 0x08
if not self._buttons['up']:
joyp |= 0x04
if not self._buttons['left']:
joyp |= 0x02
if not self._buttons['right']:
joyp |= 0x01
if joyp & JOYP_SELECT_DIRECTION_MASK:
if not self._buttons['start']:
joyp |= 0x08
else:
pass
if not self._buttons['select']:
joyp |= 0x04
if not self._buttons['b']:
joyp |= 0x02
if not self._buttons['a']:
joyp |= 0x01
return joyp
joyp = (self._joyp & 0x30) | 0x0f
buttons = 0
if joyp & JOYP_SELECT_BUTTON_MASK == 0:
if self._buttons['down']:
buttons |= 0x08
if self._buttons['up']:
buttons |= 0x04
if self._buttons['left']:
buttons |= 0x02
if self._buttons['right']:
buttons |= 0x01
elif joyp & JOYP_SELECT_DIRECTION_MASK == 0:
if self._buttons['start']:
buttons |= 0x08
if self._buttons['select']:
buttons |= 0x04
if self._buttons['b']:
buttons |= 0x02
if self._buttons['a']:
buttons |= 0x01
return joyp & ~buttons
@joyp.setter
def joyp(self, value):
# Program can only write bits 4 and 5. Bit 4 (active low) selects start/select/B/A, while
# bit 5 (active low) selects down/up/left/right.
self._joyp = value & 0x30
def press_button(self, button: str):
if not self._buttons[button]:
self.interrupt_controller.notify_interrupt(InterruptType.joypad)
self._buttons[button] = True
print(button, 'DOWN', hex(self.joyp))
self._buttons[button] = True
def unpress_button(self, button: str):
print(button, 'UP', hex(self.joyp))
self._buttons[button] = False
print(button, 'UP', hex(self.joyp))
@property
def dma(self):

38
slowboy/ui.py

@ -2,8 +2,6 @@
import logging
import sdl2
import sdl2.ext
import select
import sys
import argparse as ap
import threading
from collections import deque
@ -61,7 +59,6 @@ class SDLUI():
rom = bytearray(0x8000)
with open(romfile, 'rb') as f:
#f.readinto(rom)
rom_read = f.read()
print('Read {} B from ROM file'.format(len(rom_read)))
rom[0:len(rom_read)] = rom_read
@ -80,7 +77,6 @@ class SDLUI():
self.debug_cmd_q = self.debug_thread.command_queue
self.debug_resp_q = self.debug_thread.response_queue
self.debug_thread.start()
#self.cpu.set_debug_queues(self.cmd_q, self.resp_q)
self.cmd_q = deque()
self.resp_q = deque()
self.emulator_thread = EmulatorThread(self.cpu, self.cmd_q, self.resp_q)
@ -95,17 +91,10 @@ class SDLUI():
print('SDLUI.stop finished')
def start(self):
# self.cpu.go()
self.cpu.state = State.RUN
self.emulator_thread.start()
def step(self):
#try:
# self.cpu.step()
#except Exception as e:
# self.cpu.log_op(log=ui.logger.error)
# self.cpu.log_regs(log=ui.logger.error)
# raise e
if self.cpu.gpu.draw(self.surface):
self.window.refresh()
@ -147,11 +136,8 @@ def command(ui, state):
raise ValueError('Expected "map," "vram," or "oam."')
elif subc == 'display':
src = line[2]
import PIL
from PIL import Image
if src == 'data':
#img = Image.frombytes("L", BACKGROUND_SIZE, bytes(ui.cpu.gpu._bgtileset.to_rgb().data))
#img.show()
from slowboy.gpu import GBTileset
vram = ui.cpu.gpu.vram
start = 0x8800-VRAM_START
@ -240,9 +226,9 @@ if __name__ == '__main__':
parser.add_argument('-d', '--debug', action='store_true',
help='Start the emulator in debug mode.')
parser.add_argument('--debug-port', type=int, default=9099,
help='Debugger listening port')
help='Debugger listening port (default=9099)')
parser.add_argument('--debug-address', type=str, default='127.0.0.1',
help='Debugger listening address')
help='Debugger listening address (default=127.0.0.1)')
parser.add_argument('--profile', action='store_true',
help='Print profiling info on exit.')
parser.add_argument('-v', '--verbose', action='store_true',
@ -253,7 +239,6 @@ if __name__ == '__main__':
import yappi
yappi.start()
# ui = SDLUI(sys.argv[1], logger=root_logger, log_level=logging.DEBUG)
ui = SDLUI(args.rom, debug=args.debug,
debug_address=(args.debug_address, args.debug_port),
log_level=root_logger.level)
@ -268,6 +253,9 @@ if __name__ == '__main__':
root_logger.setLevel(logging.DEBUG)
ui.cpu.logger.setLevel(logging.DEBUG)
if args.debug:
ui.cpu.trace = True
button_map = {
sdl2.SDLK_DOWN: 'down',
sdl2.SDLK_UP: 'up',
@ -292,22 +280,26 @@ if __name__ == '__main__':
if event.type == sdl2.SDL_KEYDOWN:
if event.key.keysym.sym == sdl2.SDLK_s:
ui.cpu.trace = True
ui.cpu.step = True
ui.step()
elif event.key.keysym.sym == sdl2.SDLK_c:
ui.cpu.step = False
ui.cpu.trace = False
elif event.key.keysym.sym == sdl2.SDLK_q:
ui.stop()
ui.cpu.log_regs(log=ui.logger.info)
ui.cpu.log_op(log=ui.logger.info)
#for a in sorted(ui.cpu._calls.keys()):
# for a in sorted(ui.cpu._calls.keys()):
# print(hex(a), ui.cpu._calls[a])
#for branch in sorted(ui.cpu._branches.keys(), key=lambda k: ui.cpu._branches[k]):
# for branch in sorted(ui.cpu._branches.keys(), key=lambda k: ui.cpu._branches[k]):
# src, dst = branch
# print("{:#04x} → {:#04x}: {}".format(src, dst, ui.cpu._branches[branch]))
state['running'] = False
break
elif event.key.keysym.sym == sdl2.SDLK_d:
ui.cpu.logger.setLevel(logging.DEBUG)
ui.cpu.mmu.logger.setLevel(logging.DEBUG)
ui.cpu.gpu.logger.setLevel(logging.DEBUG)
elif event.key.keysym.sym == sdl2.SDLK_i:
ui.cpu.gpu.logger.setLevel(logging.INFO)
@ -325,15 +317,11 @@ if __name__ == '__main__':
if not ui.cpu.trace:
ui.step()
#rlist, _, _ = select.select((sys.stdin,), (), (), 0)
#if rlist:
# command(ui, state)
if ui.cpu.trace:
sdl2.SDL_Delay(100)
# sdl2.SDL_UpdateWindowSurface(ui.window.window)
except KeyboardInterrupt: