Source code for dhcpkit_vpp.protocols.layer2

"""
Classes and constants for layer 2 frames
"""
from struct import pack, unpack_from

from dhcpkit.protocol_element import ElementDataRepresentation, ProtocolElement, UnknownProtocolElement
from dhcpkit.utils import normalise_hex

from dhcpkit_vpp.protocols import Layer2Frame


class Ethernet(Layer2Frame):
    """
    The class for ethernet frames.

    :param destination: The destination hardware address, 6 bytes
    :param source: The source hardware address, 6 bytes
    :param ethertype: Unsigned 16 bit ethertype, used to pick the class that parses the payload
    :param payload: The protocol element carried inside this frame
    """

    def __init__(self, destination: bytes = b'\x00\x00\x00\x00\x00\x00', source: bytes = b'\x00\x00\x00\x00\x00\x00',
                 ethertype: int = 0, payload: ProtocolElement = None):
        super().__init__()
        self.destination = destination
        self.source = source
        self.ethertype = ethertype
        self.payload = payload

    @classmethod
    def determine_class(cls, buffer: bytes, offset: int = 0) -> type:
        """
        Return the appropriate class to parse this element with.

        :param buffer: The buffer to read data from
        :param offset: The offset in the buffer where to start reading
        :return: The best known class for this data
        """
        # The buffer is not inspected: every frame is parsed as Ethernet
        return Ethernet

    def display_source(self) -> ElementDataRepresentation:
        """
        Nicer representation of source

        :return: Representation of source
        """
        return ElementDataRepresentation(normalise_hex(self.source, include_colons=True))

    def display_destination(self) -> ElementDataRepresentation:
        """
        Nicer representation of destination

        :return: Representation of destination
        """
        return ElementDataRepresentation(normalise_hex(self.destination, include_colons=True))

    def display_ethertype(self) -> ElementDataRepresentation:
        """
        Nicer representation of ethertype

        :return: Representation of ethertype, as hexadecimal followed by decimal in parentheses
        """
        return ElementDataRepresentation("{:x} ({})".format(self.ethertype, self.ethertype))

    def validate(self):
        """
        Validate that the contents of this object conform to protocol specs.

        :raises ValueError: If an address is not 6 bytes, the ethertype is not an unsigned 16 bit integer or the
                            payload is not a protocol element
        """
        # Check if the source and destination are 6 bytes
        if not isinstance(self.source, bytes) or len(self.source) != 6:
            raise ValueError("Source must be a sequence of 6 bytes")

        if not isinstance(self.destination, bytes) or len(self.destination) != 6:
            raise ValueError("Destination must be a sequence of 6 bytes")

        # Check if the ethertype is two bytes
        if not isinstance(self.ethertype, int) or not (0 <= self.ethertype < 2 ** 16):
            raise ValueError("Ethertype must be an unsigned 16 bit integer")

        # Check if the payload is a protocol element
        if not isinstance(self.payload, ProtocolElement):
            raise ValueError("Payload must be a protocol element")

        # Check if all options are allowed
        self.validate_contains([self.payload])
        self.payload.validate()

    def load_from(self, buffer: bytes, offset: int = 0, length: int = None) -> int:
        """
        Load the internal state of this object from the given buffer. The buffer may contain more data after the
        structured element is parsed. This data is ignored.

        :param buffer: The buffer to read data from
        :param offset: The offset in the buffer where to start reading
        :param length: The amount of data we are allowed to read from the buffer
        :return: The number of bytes used from the buffer
        :raises ValueError: If fewer than 14 bytes are available for the ethernet header
        """
        my_offset = 0
        available = len(buffer) - offset

        # Compare against None explicitly: "length or ..." would treat an explicit length of 0 as "no limit given"
        # and silently parse the whole buffer instead of rejecting the frame.
        max_length = available if length is None else length

        # Also check what is really in the buffer, so that a too optimistic length gives the documented ValueError
        # instead of a struct.error while unpacking the ethertype.
        if max_length < 14 or available < 14:
            raise ValueError("Ethernet frames must be at least 14 bytes long")

        self.destination = buffer[offset + my_offset:offset + my_offset + 6]
        my_offset += 6

        self.source = buffer[offset + my_offset:offset + my_offset + 6]
        my_offset += 6

        # Read in place; no need to copy a slice first
        self.ethertype = unpack_from('!H', buffer, offset + my_offset)[0]
        my_offset += 2

        # Determine the layer 3 type based on the ethertype
        # NOTE(review): these imports are function-local in the original, presumably to avoid a circular import
        from dhcpkit_vpp.protocols.layer3 import UnknownLayer3Packet
        from dhcpkit_vpp.protocols.layer3_registry import protocol_layer3_registry

        layer3_class = protocol_layer3_registry.get(self.ethertype, UnknownLayer3Packet)

        # Everything after the 14 byte header belongs to the payload
        max_payload_len = max_length - my_offset
        payload_len, self.payload = layer3_class.parse(buffer, offset=offset + my_offset, length=max_payload_len)
        my_offset += payload_len

        return my_offset

    def save(self) -> bytes:
        """
        Save the internal state of this object as a buffer.

        :return: The buffer with the data from this element
        """
        # Header layout: destination, source, ethertype in network byte order, then the payload
        return self.destination + self.source + pack('!H', self.ethertype) + self.payload.save()
# Register UnknownProtocolElement as an element that a Layer2Frame may contain.
# NOTE(review): presumably this is what validate_contains() in Ethernet.validate() checks against, so that frames
# with an unrecognised payload still validate - confirm against dhcpkit's ProtocolElement.
Layer2Frame.add_may_contain(UnknownProtocolElement)