"""
Module containing the definition of known / supported "blocks" of the
pcap-ng format.
Each block is a struct-like object with some fields and possibly
a variable amount of "items" (usually options).
They can optionally expose some other properties, used e.g. to provide
better access to decoded information.
"""
import io
import itertools
import struct
from pcapng.structs import (
struct_decode, RawBytes, IntField, OptionsField, PacketDataField,
ListField, NameResolutionRecordField, SimplePacketDataField)
from pcapng.constants import link_types
# Registry of supported block classes, keyed by their ``magic_number``;
# entries are added by the ``register_block`` decorator.
KNOWN_BLOCKS = {}
class Block(object):
    """Base class for blocks.

    A block keeps the raw bytes it was built from and decodes them lazily,
    according to ``schema``, the first time an attribute that is not a
    regular instance/class attribute is requested.

    ``schema`` is a list of tuples whose first element is the field name;
    decoded values are exposed as attributes of the same name.
    """

    schema = []

    def __init__(self, raw):
        """Store the raw block body; decoding is deferred until needed."""
        self._raw = raw
        self._decoded = None

    @classmethod
    def from_context(cls, raw, ctx):
        """Build a block from ``raw``; the base class ignores ``ctx``."""
        return cls(raw)

    def _decode(self):
        """Decode the raw bytes according to ``schema``.

        Relies on ``self.section.endianness``, so it only works for blocks
        that have a ``section`` attribute.
        """
        return struct_decode(self.schema, io.BytesIO(self._raw),
                             endianness=self.section.endianness)

    def __getattr__(self, name):
        """Resolve ``name`` from the lazily decoded fields.

        Only invoked when normal attribute lookup fails.

        Raises:
            AttributeError: if ``name`` is not a decoded field, or if the
                block cannot be decoded because an attribute needed by
                ``_decode`` is itself missing.
        """
        # Read our own state through __dict__: going through normal
        # attribute access would re-enter __getattr__ and recurse forever
        # when the attribute is absent (instance created without running
        # Block.__init__, e.g. via __new__ / copy / pickle).
        state = self.__dict__
        if '_decoded' not in state:
            raise AttributeError(name)

        decoded = state['_decoded']
        if decoded is None:
            # Re-entrancy guard: while decoding, expose an empty mapping so
            # that a missing attribute needed by _decode() (for instance
            # ``section`` on a plain Block) fails with AttributeError
            # instead of triggering another decode and infinite recursion.
            state['_decoded'] = {}
            try:
                decoded = self._decode()
            finally:
                # On failure ``decoded`` is still None, so the next access
                # will retry; on success the result is cached.
                state['_decoded'] = decoded

        try:
            return decoded[name]
        except KeyError:
            raise AttributeError(name)

    def __repr__(self):
        """Return ``ClassName(field=value, ...)`` for every schema field."""
        args = []
        for item in self.schema:
            name = item[0]
            value = getattr(self, name)
            try:
                value = repr(value)
            except Exception:
                # Best effort: a broken __repr__ on a field value must not
                # break the repr of the whole block.  Only ``Exception`` is
                # caught so KeyboardInterrupt / SystemExit still propagate.
                value = '<{0} (repr failed)>'.format(type(value).__name__)
            args.append('{0}={1}'.format(name, value))
        return '{0}({1})'.format(self.__class__.__name__, ', '.join(args))
class SectionMemberBlock(Block):
    """Block that keeps a reference to the section it belongs to."""

    def __init__(self, raw, section):
        """Store ``raw`` via the base class and remember ``section``."""
        super(SectionMemberBlock, self).__init__(raw)
        self.section = section

    @classmethod
    def from_context(cls, raw, ctx):
        """Build the block, taking the section from ``ctx.current_section``."""
        owning_section = ctx.current_section
        return cls(raw, section=owning_section)
def register_block(block):
    """Class decorator recording ``block`` as a known block type.

    The class is stored in ``KNOWN_BLOCKS`` under its ``magic_number`` and
    returned unchanged, so the decorated name still refers to the class.
    """
    magic = block.magic_number
    KNOWN_BLOCKS[magic] = block
    return block
@register_block
class InterfaceDescription(SectionMemberBlock):
    """Interface Description block (registered under ``0x00000001``).

    Besides the decoded schema fields it exposes a few convenience
    properties: ``timestamp_resolution``, ``statistics`` and
    ``link_type_description``.
    """

    magic_number = 0x00000001
    schema = [
        ('link_type', IntField(16, False)),  # todo: enc/decode
        ('reserved', RawBytes(2)),
        ('snaplen', IntField(32, False)),
        ('options', OptionsField([
            (2, 'if_name', 'string'),
            (3, 'if_description', 'string'),
            (4, 'if_IPv4addr', 'ipv4+mask'),
            (5, 'if_IPv6addr', 'ipv6+prefix'),
            (6, 'if_MACaddr', 'macaddr'),
            (7, 'if_EUIaddr', 'euiaddr'),
            (8, 'if_speed', 'u64'),
            (9, 'if_tsresol', 'u8'),
            (10, 'if_tzone', 'u32'),
            (11, 'if_filter', 'string'),
            (12, 'if_os', 'string'),
            (13, 'if_fcslen', 'u8'),
            (14, 'if_tsoffset', 'i64'),
        ]))]

    @property  # todo: cache this property
    def timestamp_resolution(self):
        """Power of ten of the timestamp multiplier for this interface.

        The value is meant to be used as ``10 ** timestamp_resolution``.
        It is derived from the ``if_tsresol`` option; when the option is
        absent, -6 (microseconds) is assumed.

        Per the pcap-ng specification the option is one byte: with the most
        significant bit clear, the remaining bits are a *negative* power of
        10; with it set, they are a negative power of 2.  The latter case is
        returned as the equivalent (non-integer) power of ten.
        """
        # The option is declared in the schema above as 'if_tsresol'; the
        # previous lookup of 'ts_resol' could never match.
        if 'if_tsresol' not in self.options:
            return -6

        resol = self.options['if_tsresol']
        if not isinstance(resol, int):
            # NOTE(review): the option is declared as 'u8', so it is
            # presumably already an int; raw single-byte payloads are still
            # accepted for safety -- confirm against OptionsField.
            resol = struct.unpack('B', resol)[0]

        magnitude = resol & 0x7F
        if resol & 0x80:
            # Base-2 resolution: 2 ** -n == 10 ** (-n * log10(2)).
            import math
            return -magnitude * math.log10(2)
        return -magnitude

    @property
    def statistics(self):
        """Statistics recorded in the section for this interface, if any.

        Looks ``self.interface_id`` up in ``self.section.interface_stats``
        using ``.get()``.
        """
        # todo: ensure we always have an interface id -> how??
        # NOTE(review): ``interface_id`` is not part of this block's schema;
        # presumably it is assigned from outside -- confirm with the caller.
        return self.section.interface_stats.get(self.interface_id)

    @property
    def link_type_description(self):
        """Human-readable description of ``link_type``.

        Falls back to an ``Unknown link type`` message when the value is not
        listed in ``link_types.LINKTYPE_DESCRIPTIONS``.
        """
        try:
            return link_types.LINKTYPE_DESCRIPTIONS[self.link_type]
        except KeyError:
            return 'Unknown link type: 0x{0:04X}'.format(self.link_type)
class BlockWithTimestampMixin(object):
    """
    Mixin giving convenient access to the timestamp of blocks that
    carry ``timestamp_high`` / ``timestamp_low`` fields.
    """

    @property
    def timestamp(self):
        """Timestamp scaled by ``10 ** timestamp_resolution``.

        The two 32-bit halves are combined into one integer (high half
        shifted left by 32 bits, plus the low half) before scaling.
        """
        ticks = (self.timestamp_high << 32) + self.timestamp_low
        multiplier = 10 ** self.timestamp_resolution
        return ticks * multiplier

    @property
    def timestamp_resolution(self):
        """Resolution exponent, taken from the block's ``interface``."""
        iface = self.interface
        return iface.timestamp_resolution

    # todo: add some property returning a datetime() with timezone..
class BlockWithInterfaceMixin(object):
    """Mixin resolving the interface a block refers to."""

    @property
    def interface(self):
        """Interface found in ``self.section.interfaces`` at ``interface_id``."""
        # The block only stores an id; the section holds the interfaces.
        known_interfaces = self.section.interfaces
        return known_interfaces[self.interface_id]
class BasePacketBlock(SectionMemberBlock,
                      BlockWithInterfaceMixin,
                      BlockWithTimestampMixin):
    """Common base of the "EnhancedPacket" and "Packet" blocks.

    Adds nothing of its own: it only combines the section-member block with
    the interface and timestamp mixins.
    """
@register_block
class EnhancedPacket(BasePacketBlock):
    """Enhanced Packet block (registered under ``0x00000006``)."""

    magic_number = 0x00000006
    schema = [
        ('interface_id', IntField(32, False)),
        ('timestamp_high', IntField(32, False)),
        ('timestamp_low', IntField(32, False)),
        ('packet_payload_info', PacketDataField()),
        ('options', OptionsField([
            (2, 'epb_flags'),  # todo: is this endianness dependent?
            (3, 'epb_hash'),  # todo: process the hash value
            (4, 'epb_dropcount', 'u64'),
        ])),
    ]

    @property
    def captured_len(self):
        """Element 0 of the decoded ``packet_payload_info`` field."""
        payload_info = self.packet_payload_info
        return payload_info[0]

    @property
    def packet_len(self):
        """Element 1 of the decoded ``packet_payload_info`` field."""
        payload_info = self.packet_payload_info
        return payload_info[1]

    @property
    def packet_data(self):
        """Element 2 of the decoded ``packet_payload_info`` field."""
        payload_info = self.packet_payload_info
        return payload_info[2]
@register_block
class SimplePacket(SectionMemberBlock):
    """Simple Packet block (registered under ``0x00000003``)."""

    magic_number = 0x00000003
    schema = [
        ('packet_simple_payload_info', SimplePacketDataField()),
    ]

    @property
    def packet_len(self):
        """Element 1 of the decoded ``packet_simple_payload_info`` field."""
        payload_info = self.packet_simple_payload_info
        return payload_info[1]

    @property
    def packet_data(self):
        """Element 2 of the decoded ``packet_simple_payload_info`` field."""
        payload_info = self.packet_simple_payload_info
        return payload_info[2]
@register_block
class Packet(BasePacketBlock):
    """Packet block (registered under ``0x00000002``)."""

    magic_number = 0x00000002
    schema = [
        ('interface_id', IntField(16, False)),
        ('drops_count', IntField(16, False)),
        ('timestamp_high', IntField(32, False)),
        ('timestamp_low', IntField(32, False)),
        ('packet_payload_info', PacketDataField()),
        ('options', OptionsField([
            (2, 'epb_flags', 'u32'),  # A flag!
            (3, 'epb_hash'),  # Variable size!
        ])),
    ]

    @property
    def captured_len(self):
        """Element 0 of the decoded ``packet_payload_info`` field."""
        payload_info = self.packet_payload_info
        return payload_info[0]

    @property
    def packet_len(self):
        """Element 1 of the decoded ``packet_payload_info`` field."""
        payload_info = self.packet_payload_info
        return payload_info[1]

    @property
    def packet_data(self):
        """Element 2 of the decoded ``packet_payload_info`` field."""
        payload_info = self.packet_payload_info
        return payload_info[2]
@register_block
class NameResolution(SectionMemberBlock):
    """Name Resolution block (registered under ``0x00000004``).

    Decodes to a list of name-resolution records followed by options.
    """

    magic_number = 0x00000004
    schema = [
        ('records', ListField(NameResolutionRecordField())),
        ('options', OptionsField([
            (2, 'ns_dnsname', 'string'),
            (3, 'ns_dnsIP4addr', 'ipv4'),
            (4, 'ns_dnsIP6addr', 'ipv6'),
        ])),
    ]
@register_block
class InterfaceStatistics(SectionMemberBlock,
                          BlockWithTimestampMixin,
                          BlockWithInterfaceMixin):
    """Interface Statistics block (registered under ``0x00000005``).

    Carries an interface id, a split 64-bit timestamp and a set of
    counter options.
    """

    magic_number = 0x00000005
    schema = [
        ('interface_id', IntField(32, False)),
        ('timestamp_high', IntField(32, False)),
        ('timestamp_low', IntField(32, False)),
        ('options', OptionsField([
            (2, 'isb_starttime', 'u64'),  # todo: consider resolution
            (3, 'isb_endtime', 'u64'),
            (4, 'isb_ifrecv', 'u64'),
            (5, 'isb_ifdrop', 'u64'),
            (6, 'isb_filteraccept', 'u64'),
            (7, 'isb_osdrop', 'u64'),
            (8, 'isb_usrdeliv', 'u64'),
        ])),
    ]
class UnknownBlock(Block):
    """
    Class used to represent an unknown block.

    Its block type and raw data will be stored directly with no further
    processing.
    """

    def __init__(self, block_type, data):
        """Store ``block_type`` and the undecoded ``data``."""
        # Initialise the base-class state as well: Block.__getattr__ reads
        # ``self._decoded``, and if that attribute is missing every lookup
        # of an unknown attribute (hasattr, copy, pickle, ...) recurses
        # until RecursionError instead of raising AttributeError.
        super(UnknownBlock, self).__init__(data)
        # Nothing can be decoded for an unknown block type; an empty mapping
        # makes lookups of missing attributes fail cleanly without ever
        # calling _decode() (which needs a section this block doesn't have).
        self._decoded = {}
        self.block_type = block_type
        self.data = data

    def __repr__(self):
        """Return ``UnknownBlock(0x<type as 8 hex digits>, <data repr>)``."""
        return ('UnknownBlock(0x{0:08X}, {1!r})'
                .format(self.block_type, self.data))