Skip to content

Commit

Permalink
Rename test client to more specific names
Browse files Browse the repository at this point in the history
  • Loading branch information
fbeutin-ledger committed Jul 3, 2023
1 parent 1edf9cd commit ed60354
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 35 deletions.
143 changes: 143 additions & 0 deletions tests/ragger/app/eth_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from enum import IntEnum, auto
from typing import Optional
from ragger.backend import BackendInterface
from ragger.utils import RAPDU
from .eth_command_builder import EthCommandBuilder
from .eth_eip712 import EIP712FieldType
from .eth_tlv import format_tlv
from pathlib import Path
import keychain
import rlp


ROOT_SCREENSHOT_PATH = Path(__file__).parent.parent
WEI_IN_ETH = 1e+18


class StatusWord(IntEnum):
OK = 0x9000
ERROR_NO_INFO = 0x6a00
INVALID_DATA = 0x6a80
INSUFFICIENT_MEMORY = 0x6a84
INVALID_INS = 0x6d00
INVALID_P1_P2 = 0x6b00
CONDITION_NOT_SATISFIED = 0x6985
REF_DATA_NOT_FOUND = 0x6a88

class DOMAIN_NAME_TAG(IntEnum):
STRUCTURE_TYPE = 0x01
STRUCTURE_VERSION = 0x02
CHALLENGE = 0x12
SIGNER_KEY_ID = 0x13
SIGNER_ALGO = 0x14
SIGNATURE = 0x15
DOMAIN_NAME = 0x20
COIN_TYPE = 0x21
ADDRESS = 0x22


class EthClient:
def __init__(self, backend: BackendInterface):
self._client = backend
self._apdu_builder = EthCommandBuilder()

def _send(self, payload: bytearray):
return self._client.exchange_async_raw(payload)

def response(self) -> RAPDU:
return self._client._last_async_response

def eip712_send_struct_def_struct_name(self, name: str):
return self._send(self._apdu_builder.eip712_send_struct_def_struct_name(name))

def eip712_send_struct_def_struct_field(self,
field_type: EIP712FieldType,
type_name: str,
type_size: int,
array_levels: [],
key_name: str):
return self._send(self._apdu_builder.eip712_send_struct_def_struct_field(
field_type,
type_name,
type_size,
array_levels,
key_name))

def eip712_send_struct_impl_root_struct(self, name: str):
return self._send(self._apdu_builder.eip712_send_struct_impl_root_struct(name))

def eip712_send_struct_impl_array(self, size: int):
return self._send(self._apdu_builder.eip712_send_struct_impl_array(size))

def eip712_send_struct_impl_struct_field(self, raw_value: bytes):
chunks = self._apdu_builder.eip712_send_struct_impl_struct_field(raw_value)
for chunk in chunks[:-1]:
with self._send(chunk):
pass
return self._send(chunks[-1])

def eip712_sign_new(self, bip32_path: str, verbose: bool):
return self._send(self._apdu_builder.eip712_sign_new(bip32_path))

def eip712_sign_legacy(self,
bip32_path: str,
domain_hash: bytes,
message_hash: bytes):
return self._send(self._apdu_builder.eip712_sign_legacy(bip32_path,
domain_hash,
message_hash))

def eip712_filtering_activate(self):
return self._send(self._apdu_builder.eip712_filtering_activate())

def eip712_filtering_message_info(self, name: str, filters_count: int, sig: bytes):
return self._send(self._apdu_builder.eip712_filtering_message_info(name, filters_count, sig))

def eip712_filtering_show_field(self, name: str, sig: bytes):
return self._send(self._apdu_builder.eip712_filtering_show_field(name, sig))

def send_fund(self,
bip32_path: str,
nonce: int,
gas_price: int,
gas_limit: int,
to: bytes,
amount: float,
chain_id: int):
data = list()
data.append(nonce)
data.append(gas_price)
data.append(gas_limit)
data.append(to)
data.append(int(amount * WEI_IN_ETH))
data.append(bytes())
data.append(chain_id)
data.append(bytes())
data.append(bytes())

chunks = self._apdu_builder.sign(bip32_path, rlp.encode(data))
for chunk in chunks[:-1]:
with self._send(chunk):
pass
return self._send(chunks[-1])

def get_challenge(self):
return self._send(self._apdu_builder.get_challenge())

def provide_domain_name(self, challenge: int, name: str, addr: bytes):
payload = format_tlv(DOMAIN_NAME_TAG.STRUCTURE_TYPE, 3) # TrustedDomainName
payload += format_tlv(DOMAIN_NAME_TAG.STRUCTURE_VERSION, 1)
payload += format_tlv(DOMAIN_NAME_TAG.SIGNER_KEY_ID, 0) # test key
payload += format_tlv(DOMAIN_NAME_TAG.SIGNER_ALGO, 1) # secp256k1
payload += format_tlv(DOMAIN_NAME_TAG.CHALLENGE, challenge)
payload += format_tlv(DOMAIN_NAME_TAG.COIN_TYPE, 0x3c) # ETH in slip-44
payload += format_tlv(DOMAIN_NAME_TAG.DOMAIN_NAME, name)
payload += format_tlv(DOMAIN_NAME_TAG.ADDRESS, addr)
payload += format_tlv(DOMAIN_NAME_TAG.SIGNATURE,
keychain.sign_data(keychain.Key.DOMAIN_NAME, payload))

chunks = self._apdu_builder.provide_domain_name(payload)
for chunk in chunks[:-1]:
with self._send(chunk):
pass
return self._send(chunks[-1])
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from enum import IntEnum, auto
from typing import Iterator, Optional
from .eip712 import EIP712FieldType
from .eth_eip712 import EIP712FieldType
from ragger.bip import pack_derivation_path
import struct

Expand Down Expand Up @@ -29,7 +29,7 @@ class P2Type(IntEnum):
FILTERING_CONTRACT_NAME = 0x0f
FILTERING_FIELD_NAME = 0xff

class CommandBuilder:
class EthCommandBuilder:
_CLA: int = 0xE0

def _serialize(self,
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
30 changes: 15 additions & 15 deletions tests/ragger/eip712/InputData.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
import sys
import re
import hashlib
from app.client import EthAppClient, EIP712FieldType
from app.eth_client import EthClient, EIP712FieldType
import keychain
from typing import Callable
import signal

# global variables
app_client: EthAppClient = None
eth_client: EthClient = None
filtering_paths = None
current_path = list()
sig_ctx = {}
Expand Down Expand Up @@ -101,7 +101,7 @@ def send_struct_def_field(typename, keyname):
type_enum = EIP712FieldType.CUSTOM
typesize = None

with app_client.eip712_send_struct_def_struct_field(type_enum,
with eth_client.eip712_send_struct_def_struct_field(type_enum,
typename,
typesize,
array_lvls,
Expand Down Expand Up @@ -196,7 +196,7 @@ def send_struct_impl_field(value, field):
if path in filtering_paths.keys():
send_filtering_show_field(filtering_paths[path])

with app_client.eip712_send_struct_impl_struct_field(data):
with eth_client.eip712_send_struct_impl_struct_field(data):
enable_autonext()
disable_autonext()

Expand All @@ -208,7 +208,7 @@ def evaluate_field(structs, data, field, lvls_left, new_level = True):
if new_level:
current_path.append(field["name"])
if len(array_lvls) > 0 and lvls_left > 0:
with app_client.eip712_send_struct_impl_array(len(data)):
with eth_client.eip712_send_struct_impl_array(len(data)):
pass
idx = 0
for subdata in data:
Expand Down Expand Up @@ -260,7 +260,7 @@ def send_filtering_message_info(display_name: str, filters_count: int):
to_sign.append(ord(char))

sig = keychain.sign_data(keychain.Key.CAL, to_sign)
with app_client.eip712_filtering_message_info(display_name, filters_count, sig):
with eth_client.eip712_filtering_message_info(display_name, filters_count, sig):
enable_autonext()
disable_autonext()

Expand All @@ -280,7 +280,7 @@ def send_filtering_show_field(display_name):
for char in display_name:
to_sign.append(ord(char))
sig = keychain.sign_data(keychain.Key.CAL, to_sign)
with app_client.eip712_filtering_show_field(display_name, sig):
with eth_client.eip712_filtering_show_field(display_name, sig):
pass

def read_filtering_file(domain, message, filtering_file_path):
Expand Down Expand Up @@ -326,7 +326,7 @@ def next_timeout(_signum: int, _frame):

def enable_autonext():
seconds = 1/4
if app_client._client.firmware.device == 'stax': # Stax Speculos is slow
if eth_client._client.firmware.device == 'stax': # Stax Speculos is slow
interval = seconds * 3
else:
interval = seconds
Expand All @@ -336,15 +336,15 @@ def disable_autonext():
signal.setitimer(signal.ITIMER_REAL, 0, 0)


def process_file(aclient: EthAppClient,
def process_file(aclient: EthClient,
input_file_path: str,
filtering_file_path = None,
autonext: Callable = None) -> bool:
global sig_ctx
global app_client
global eth_client
global autonext_handler

app_client = aclient
eth_client = aclient
with open(input_file_path, "r") as data:
data_json = json.load(data)
domain_typename = "EIP712Domain"
Expand All @@ -363,19 +363,19 @@ def process_file(aclient: EthAppClient,

# send types definition
for key in types.keys():
with app_client.eip712_send_struct_def_struct_name(key):
with eth_client.eip712_send_struct_def_struct_name(key):
pass
for f in types[key]:
(f["type"], f["enum"], f["typesize"], f["array_lvls"]) = \
send_struct_def_field(f["type"], f["name"])

if filtering_file_path:
with app_client.eip712_filtering_activate():
with eth_client.eip712_filtering_activate():
pass
prepare_filtering(filtr, message)

# send domain implementation
with app_client.eip712_send_struct_impl_root_struct(domain_typename):
with eth_client.eip712_send_struct_impl_root_struct(domain_typename):
enable_autonext()
disable_autonext()
if not send_struct_impl(types, domain, domain_typename):
Expand All @@ -388,7 +388,7 @@ def process_file(aclient: EthAppClient,
send_filtering_message_info(domain["name"], len(filtering_paths))

# send message implementation
with app_client.eip712_send_struct_impl_root_struct(message_typename):
with eth_client.eip712_send_struct_impl_root_struct(message_typename):
enable_autonext()
disable_autonext()
if not send_struct_impl(types, message, message_typename):
Expand Down
27 changes: 14 additions & 13 deletions tests/ragger/test_domain_name.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from ragger.firmware import Firmware
from ragger.backend import BackendInterface
from ragger.navigator import Navigator, NavInsID
from app.client import EthAppClient, StatusWord, ROOT_SCREENSHOT_PATH
from app.settings import SettingID, settings_toggle
import app.response_parser as ResponseParser

from app.eth_client import EthClient, StatusWord, ROOT_SCREENSHOT_PATH
from app.eth_settings import SettingID, settings_toggle
import app.eth_response_parser as EthResponseParser
import struct

# Values used across all tests
Expand All @@ -24,20 +25,20 @@
def verbose(request) -> bool:
return request.param

def common(app_client: EthAppClient) -> int:
def common(app_client: EthClient) -> int:
if app_client._client.firmware.device == "nanos":
pytest.skip("Not supported on LNS")
with app_client.get_challenge():
pass
return ResponseParser.challenge(app_client.response().data)
return EthResponseParser.challenge(app_client.response().data)


def test_send_fund(firmware: Firmware,
backend: BackendInterface,
navigator: Navigator,
test_name: str,
verbose: bool):
app_client = EthAppClient(backend)
app_client = EthClient(backend)
challenge = common(app_client)

if verbose:
Expand Down Expand Up @@ -72,7 +73,7 @@ def test_send_fund(firmware: Firmware,
def test_send_fund_wrong_challenge(firmware: Firmware,
backend: BackendInterface,
navigator: Navigator):
app_client = EthAppClient(backend)
app_client = EthClient(backend)
caught = False
challenge = common(app_client)

Expand All @@ -89,7 +90,7 @@ def test_send_fund_wrong_addr(firmware: Firmware,
backend: BackendInterface,
navigator: Navigator,
test_name: str):
app_client = EthAppClient(backend)
app_client = EthClient(backend)
challenge = common(app_client)

with app_client.provide_domain_name(challenge, NAME, ADDR):
Expand Down Expand Up @@ -121,7 +122,7 @@ def test_send_fund_non_mainnet(firmware: Firmware,
backend: BackendInterface,
navigator: Navigator,
test_name: str):
app_client = EthAppClient(backend)
app_client = EthClient(backend)
challenge = common(app_client)

with app_client.provide_domain_name(challenge, NAME, ADDR):
Expand Down Expand Up @@ -149,7 +150,7 @@ def test_send_fund_non_mainnet(firmware: Firmware,
def test_send_fund_domain_too_long(firmware: Firmware,
backend: BackendInterface,
navigator: Navigator):
app_client = EthAppClient(backend)
app_client = EthClient(backend)
challenge = common(app_client)

try:
Expand All @@ -164,7 +165,7 @@ def test_send_fund_domain_too_long(firmware: Firmware,
def test_send_fund_domain_invalid_character(firmware: Firmware,
backend: BackendInterface,
navigator: Navigator):
app_client = EthAppClient(backend)
app_client = EthClient(backend)
challenge = common(app_client)

try:
Expand All @@ -179,7 +180,7 @@ def test_send_fund_domain_invalid_character(firmware: Firmware,
def test_send_fund_uppercase(firmware: Firmware,
backend: BackendInterface,
navigator: Navigator):
app_client = EthAppClient(backend)
app_client = EthClient(backend)
challenge = common(app_client)

try:
Expand All @@ -194,7 +195,7 @@ def test_send_fund_uppercase(firmware: Firmware,
def test_send_fund_domain_non_ens(firmware: Firmware,
backend: BackendInterface,
navigator: Navigator):
app_client = EthAppClient(backend)
app_client = EthClient(backend)
challenge = common(app_client)

try:
Expand Down

0 comments on commit ed60354

Please sign in to comment.