:py:mod:`flow.record.base`
==========================

.. py:module:: flow.record.base


Module Contents
---------------

Classes
~~~~~~~

.. autoapisummary::

   flow.record.base.FieldType
   flow.record.base.Record
   flow.record.base.GroupedRecord
   flow.record.base.RecordField
   flow.record.base.RecordFieldSet
   flow.record.base.RecordDescriptor
   flow.record.base.DynamicFieldtypeModule


Functions
~~~~~~~~~

.. autoapisummary::
   :nosignatures:

   flow.record.base.set_ignored_fields_for_comparison
   flow.record.base.ignore_fields_for_comparison
   flow.record.base.is_valid_field_name
   flow.record.base.parse_def
   flow.record.base.DynamicDescriptor
   flow.record.base.open_stream
   flow.record.base.find_adapter_for_stream
   flow.record.base.open_path_or_stream
   flow.record.base.open_path
   flow.record.base.RecordAdapter
   flow.record.base.RecordReader
   flow.record.base.RecordWriter
   flow.record.base.stream
   flow.record.base.fieldtype
   flow.record.base.merge_record_descriptors
   flow.record.base.extend_record
   flow.record.base.normalize_fieldname
   flow.record.base.iter_timestamped_records


Attributes
~~~~~~~~~~

.. autoapisummary::

   flow.record.base.HAS_LZ4
   flow.record.base.HAS_BZ2
   flow.record.base.HAS_ZSTD
   flow.record.base.HAS_AVRO
   flow.record.base.log
   flow.record.base.RECORD_VERSION
   flow.record.base.RESERVED_FIELDS
   flow.record.base.GZIP_MAGIC
   flow.record.base.BZ2_MAGIC
   flow.record.base.LZ4_MAGIC
   flow.record.base.ZSTD_MAGIC
   flow.record.base.AVRO_MAGIC
   flow.record.base.RECORDSTREAM_MAGIC
   flow.record.base.RECORDSTREAM_MAGIC_DEPTH
   flow.record.base.RE_VALID_FIELD_NAME
   flow.record.base.RE_VALID_RECORD_TYPE_NAME
   flow.record.base.RECORD_CLASS_TEMPLATE
   flow.record.base.IGNORE_FIELDS_FOR_COMPARISON
   flow.record.base.net
   flow.record.base.dynamic_fieldtype
   flow.record.base.TimestampRecord


.. py:data:: HAS_LZ4
   :value: True

.. py:data:: HAS_BZ2
   :value: True

.. py:data:: HAS_ZSTD
   :value: True

.. py:data:: HAS_AVRO
   :value: True

.. py:data:: log

.. py:data:: RECORD_VERSION
   :value: 1

.. py:data:: RESERVED_FIELDS

.. py:data:: GZIP_MAGIC
   :value: b'\x1f\x8b'

.. py:data:: BZ2_MAGIC
   :value: b'BZh'

.. py:data:: LZ4_MAGIC
   :value: b'\x04"M\x18'

.. py:data:: ZSTD_MAGIC
   :value: b'(\xb5/\xfd'

.. py:data:: AVRO_MAGIC
   :value: b'Obj'

.. py:data:: RECORDSTREAM_MAGIC
   :value: b'RECORDSTREAM\n'

.. py:data:: RECORDSTREAM_MAGIC_DEPTH

.. py:data:: RE_VALID_FIELD_NAME

.. py:data:: RE_VALID_RECORD_TYPE_NAME

.. py:data:: RECORD_CLASS_TEMPLATE
   :value: Multiline-String

   .. code-block:: python

      """
      class {name}(Record):
          _desc = None
          _field_types = {field_types}

          __slots__ = {slots_tuple}

          def __init__(__self, {args}):
              {init_code}

          @classmethod
          def _unpack(__cls, {args}):
              {unpack_code}
      """
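
The placeholders in ``RECORD_CLASS_TEMPLATE`` suggest that it is filled in with ``str.format`` to generate the source code of a Record subclass. A minimal sketch of that code-generation technique follows. It is an illustration only: the ``Record`` stand-in, the ``build_record_class`` helper, the template indentation and the way ``args``, ``init_code`` and ``unpack_code`` are built are all hypothetical assumptions, not flow.record's actual generator.

.. code-block:: python

   from typing import Sequence

   # Hypothetical copy of the documented template (indentation assumed).
   RECORD_CLASS_TEMPLATE = """
   class {name}(Record):
       _desc = None
       _field_types = {field_types}

       __slots__ = {slots_tuple}

       def __init__(__self, {args}):
           {init_code}

       @classmethod
       def _unpack(__cls, {args}):
           {unpack_code}
   """

   # Separator between statements of a method body: newline plus 8 spaces.
   BODY_SEP = "\n        "


   class Record:
       """Hypothetical stand-in for flow.record's Record base class."""

       __slots__ = ()


   def build_record_class(name: str, fields: Sequence[tuple[str, str]]) -> type:
       """Render the template for (typename, fieldname) pairs and exec the result."""
       names = [field_name for _, field_name in fields]
       source = RECORD_CLASS_TEMPLATE.format(
           name=name,
           field_types=repr({field_name: typename for typename, field_name in fields}),
           slots_tuple=repr(tuple(names)),
           args=", ".join(f"{n}=None" for n in names),
           # Plain attribute assignments; this sketch does no type coercion.
           init_code=BODY_SEP.join(f"__self.{n} = {n}" for n in names) or "pass",
           unpack_code=f"return __cls({', '.join(names)})",
       )
       namespace = {"Record": Record}
       exec(source, namespace)  # defines the new class inside `namespace`
       return namespace[name]

For example, ``build_record_class("Person", [("string", "name"), ("varint", "age")])`` returns a slotted class whose instances reject attributes that are not fields. The ``__self``/``__cls`` parameter names presumably avoid clashing with a field called ``self`` or ``cls``; Python name-mangles them consistently inside the class body, so the generated methods still work.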
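
The ``*_MAGIC`` attributes documented above are the leading bytes that identify each compression or container format, which is how a stream's format can be recognised from its header. A minimal sketch of such header sniffing follows. The ``sniff_format`` helper and its return labels are hypothetical, and so is the assumption that ``RECORDSTREAM_MAGIC`` may appear a few bytes into the header (searched within a ``depth`` window that only loosely mirrors ``RECORDSTREAM_MAGIC_DEPTH``, whose value is not documented here); this is not the logic of :py:func:`find_adapter_for_stream`.

.. code-block:: python

   import bz2
   import gzip
   from typing import Optional

   # Values copied from the attribute documentation.
   GZIP_MAGIC = b"\x1f\x8b"
   BZ2_MAGIC = b"BZh"
   LZ4_MAGIC = b'\x04"M\x18'
   ZSTD_MAGIC = b"(\xb5/\xfd"
   AVRO_MAGIC = b"Obj"
   RECORDSTREAM_MAGIC = b"RECORDSTREAM\n"

   # Compression and container formats are matched as a prefix at offset 0.
   PREFIXES = [
       (GZIP_MAGIC, "gzip"),
       (BZ2_MAGIC, "bz2"),
       (LZ4_MAGIC, "lz4"),
       (ZSTD_MAGIC, "zstd"),
       (AVRO_MAGIC, "avro"),
   ]


   def sniff_format(header: bytes, depth: int = 32) -> Optional[str]:
       """Guess a stream's format from its first bytes (hypothetical helper)."""
       for magic, label in PREFIXES:
           if header.startswith(magic):
               return label
       # Assumption: the record stream magic may follow a short prefix,
       # so search for it anywhere within the first `depth` bytes.
       if RECORDSTREAM_MAGIC in header[:depth]:
           return "recordstream"
       return None

A caller could then wrap the stream with the matching decompressor, for example ``gzip.GzipFile(fileobj=fp)`` when ``sniff_format`` returns ``"gzip"``.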

.. py:data:: IGNORE_FIELDS_FOR_COMPARISON


.. py:function:: set_ignored_fields_for_comparison(ignored_fields: Iterable[str]) -> None

   Update ``IGNORE_FIELDS_FOR_COMPARISON`` from outside the ``flow.record`` package scope.


.. py:function:: ignore_fields_for_comparison(ignored_fields: Iterable[str])

   Context manager to temporarily ignore fields for comparison.


.. py:class:: FieldType

   .. py:method:: default()
      :classmethod:

      Return the default value for the field in the Record template.


.. py:class:: Record

   .. py:attribute:: __slots__
      :value: ()

   .. py:method:: __eq__(other)

      Return self==value.

   .. py:method:: __setattr__(k, v)

      Enforce setting the fields to their respective types.

   .. py:method:: __hash__() -> int

      Return hash(self).

   .. py:method:: __repr__()

      Return repr(self).


.. py:class:: GroupedRecord(name, records)

   Bases: :py:obj:`Record`

   GroupedRecord acts like a normal Record, but can contain multiple records.

   Think of it as a flat Record view on top of multiple Records.
   If two Records have a field with the same name, the value from the first Record prevails.

   .. py:method:: get_record_by_type(type_name)

      Get a record in a GroupedRecord by ``type_name``.

      :param type_name: The record type name (for example wq/meta).
      :type type_name: str
      :returns: The matching record, or None

   .. py:method:: __repr__()

      Return repr(self).

   .. py:method:: __setattr__(attr, val)

      Enforce setting the fields to their respective types.

   .. py:method:: __getattr__(attr)


.. py:function:: is_valid_field_name(name, check_reserved=True)


.. py:function:: parse_def(definition)


.. py:class:: RecordField(name: str, typename: str)

   .. py:attribute:: name

   .. py:attribute:: typename

   .. py:attribute:: type

   .. py:method:: __repr__()

      Return repr(self).


.. py:class:: RecordFieldSet

   Bases: :py:obj:`list`

   Built-in mutable sequence.

   If no argument is given, the constructor creates a new empty list.
   The argument must be an iterable if specified.


.. py:class:: RecordDescriptor(name: str, fields: Optional[Sequence[tuple[str, str]]] = None)

   Record Descriptor class for defining a Record type and its fields.

   .. py:property:: fields
      :type: Mapping[str, RecordField]

      Get the fields mapping (without the required fields), e.g.::

         {
             "foo": RecordField("foo", "string"),
             "bar": RecordField("bar", "varint"),
         }

      :returns: Mapping of Record fields

   .. py:property:: descriptor_hash
      :type: int

      Returns the (cached) descriptor hash.

   .. py:property:: identifier
      :type: tuple[str, int]

      Returns a tuple containing the descriptor name and hash.

   .. py:attribute:: name
      :type: str

   .. py:attribute:: recordType
      :type: type

   .. py:method:: get_required_fields() -> Mapping[str, RecordField]
      :staticmethod:

      Get the required fields mapping, e.g.::

         {
             "_source": RecordField("_source", "string"),
             "_classification": RecordField("_classification", "string"),
             "_generated": RecordField("_generated", "datetime"),
             "_version": RecordField("_version", "varint"),
         }

      :returns: Mapping of required fields

   .. py:method:: get_all_fields() -> Mapping[str, RecordField]

      Get all fields, including the required meta fields, e.g.::

         {
             "ts": RecordField("ts", "datetime"),
             "foo": RecordField("foo", "string"),
             "bar": RecordField("bar", "varint"),
             "_source": RecordField("_source", "string"),
             "_classification": RecordField("_classification", "string"),
             "_generated": RecordField("_generated", "datetime"),
             "_version": RecordField("_version", "varint"),
         }

      :returns: Mapping of all Record fields

   .. py:method:: getfields(typename: str) -> RecordFieldSet

      Get fields of a given type.

      :param typename: The typename of the fields to return, e.g. ``"string"`` or ``"datetime"``
      :returns: RecordFieldSet of fields with the given typename

   .. py:method:: __call__(*args, **kwargs) -> Record

      Create a new Record initialized with `args` and `kwargs`.

   .. py:method:: init_from_dict(rdict: dict[str, Any], raise_unknown=False) -> Record

      Create a new Record initialized with key, value pairs from `rdict`.

      If `raise_unknown=True`, keys in `rdict` that are unknown to this RecordDescriptor
      raise a TypeError, because the Record would be initialized with unknown keyword
      arguments. (default: False)

      :returns: Record with data from `rdict`

   .. py:method:: init_from_record(record: Record, raise_unknown=False) -> Record

      Create a new Record initialized with data from another `record`.

      If `raise_unknown=True`, fields on `record` that are unknown to this RecordDescriptor
      raise a TypeError, because the new Record would be initialized with unknown keyword
      arguments. (default: False)

      :returns: Record with data from `record`

   .. py:method:: extend(fields: Sequence[tuple[str, str]]) -> RecordDescriptor

      Returns a new RecordDescriptor with the extended fields.

      :returns: RecordDescriptor with extended fields

   .. py:method:: get_field_tuples() -> tuple[tuple[str, str]]

      Returns a tuple containing the (typename, name) tuples, e.g.::

         (('boolean', 'foo'), ('string', 'bar'))

      :returns: Tuple of (typename, name) tuples

   .. py:method:: calc_descriptor_hash(name, fields: Sequence[tuple[str, str]]) -> int
      :staticmethod:

      Calculate and return the (cached) descriptor hash as a 32-bit integer.

      The descriptor hash is the first 4 bytes of the SHA-256 digest of the descriptor name
      and the field names and types.

   .. py:method:: __hash__() -> int

      Return hash(self).

   .. py:method:: __eq__(other: RecordDescriptor) -> bool

      Return self==value.

   .. py:method:: __repr__() -> str

      Return repr(self).

   .. py:method:: definition(reserved: bool = True) -> str

      Return the RecordDescriptor as a Python definition string.

      If `reserved` is True, the reserved fields are included as well.

      :returns: Descriptor definition string

   .. py:method:: base(**kwargs_sink)


.. py:function:: DynamicDescriptor(name, fields)


.. py:function:: open_stream(fp: BinaryIO, mode: str) -> BinaryIO


.. py:function:: find_adapter_for_stream(fp: BinaryIO) -> tuple[BinaryIO, Optional[str]]


.. py:function:: open_path_or_stream(path: Union[str, pathlib.Path, BinaryIO], mode: str, clobber: bool = True) -> IO


.. py:function:: open_path(path: str, mode: str, clobber: bool = True) -> IO

   Open ``path`` using ``mode`` and return a file object.

   It handles the special case where ``path`` refers to stdin or stdout, and it supports
   compression detected from the file extension or from the file header of the stream.

   :param path: Filename or path to the file to open
   :param mode: ``"r"`` or ``"rb"`` to open the file for reading, ``"w"`` or ``"wb"`` for writing
   :param clobber: If ``True``, overwrite the file if it already exists; otherwise raise IOError.


.. py:function:: RecordAdapter(url: Optional[str] = None, out: bool = False, selector: Optional[str] = None, clobber: bool = True, fileobj: Optional[BinaryIO] = None, **kwargs) -> Union[flow.record.adapter.AbstractWriter, flow.record.adapter.AbstractReader]


.. py:function:: RecordReader(url: Optional[str] = None, selector: Optional[str] = None, fileobj: Optional[BinaryIO] = None, **kwargs) -> flow.record.adapter.AbstractReader


.. py:function:: RecordWriter(url: Optional[str] = None, clobber: bool = True, **kwargs) -> flow.record.adapter.AbstractWriter


.. py:function:: stream(src, dst)


.. py:function:: fieldtype(clspath: str) -> FieldType

   Return the FieldType class for the given field type class path.

   :param clspath: Class path of the field type, e.g. ``uint32``, ``net.ipaddress`` or ``string[]``
   :returns: The FieldType class.


.. py:function:: merge_record_descriptors(descriptors: tuple[RecordDescriptor], replace: bool = False, name: Optional[str] = None) -> RecordDescriptor

   Create a newly merged RecordDescriptor from multiple RecordDescriptors.
   This function uses a cache to avoid creating the same descriptor multiple times.

   Duplicate fields in ``descriptors`` are ignored unless ``replace=True``.

   :param descriptors: Tuple of RecordDescriptors to merge.
   :param replace: If ``True``, fields with an already existing name are replaced; the last descriptor always wins.
   :param name: Name for the merged RecordDescriptor. If omitted, the name of the first descriptor is used.
   :returns: Merged RecordDescriptor


.. py:function:: extend_record(record: Record, other_records: list[Record], replace: bool = False, name: Optional[str] = None) -> Record

   Extend ``record`` with fields and values from ``other_records``.

   Duplicate fields in ``other_records`` are ignored unless ``replace=True``.

   :param record: Initial Record to extend.
   :param other_records: List of Records to use for extending/replacing.
   :param replace: If ``True``, existing fields and values in ``record`` are replaced by the fields and values from ``other_records``; the last record always wins.
   :param name: Name for the RecordDescriptor of the extended Record. If omitted, the name of the initial ``record`` is used.
   :returns: Extended Record


.. py:function:: normalize_fieldname(field_name: str) -> str

   Returns a normalized version of ``field_name``.

   Some (field) names are not allowed in flow.record, while they can be allowed in other formats.
   This normalizes the name so it can still be used in flow.record.
   Reserved field names are returned unchanged.

   >>> normalize_fieldname("my-variable-name-with-dashes")
   'my_variable_name_with_dashes'
   >>> normalize_fieldname("_my_name_starting_with_underscore")
   'x__my_name_starting_with_underscore'
   >>> normalize_fieldname("1337")
   'x_1337'
   >>> normalize_fieldname("my name with spaces")
   'my_name_with_spaces'
   >>> normalize_fieldname("my name (with) parentheses")
   'my_name__with__parentheses'
   >>> normalize_fieldname("_generated")
   '_generated'


.. py:class:: DynamicFieldtypeModule(path='')

   .. py:method:: __getattr__(path)

   .. py:method:: gettypename()

   .. py:method:: __call__(*args, **kwargs)


.. py:data:: net


.. py:data:: dynamic_fieldtype


.. py:data:: TimestampRecord


.. py:function:: iter_timestamped_records(record: Record) -> Iterator[Record]

   Yields timestamped annotated records for each `datetime` fieldtype in `record`.
   If `record` does not have any `datetime` fields, the original record is yielded as-is.

   :param record: Record to add timestamp fields for.
   :Yields: Record annotated with `ts` and `ts_description` fields for each `datetime` fieldtype.
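
The behaviour described for :py:func:`iter_timestamped_records` can be sketched with plain dictionaries standing in for Records. This is an illustration only: ``iter_timestamped`` is a hypothetical name, and a field is treated as a ``datetime`` field when its value is a ``datetime.datetime`` instance, rather than by the descriptor's field types that the documented function works from.

.. code-block:: python

   from datetime import datetime, timezone
   from typing import Any, Iterator


   def iter_timestamped(record: dict[str, Any]) -> Iterator[dict[str, Any]]:
       """Yield one annotated copy of `record` per datetime value it holds."""
       dt_fields = [name for name, value in record.items() if isinstance(value, datetime)]
       if not dt_fields:
           # No datetime fields: hand back the original record unchanged.
           yield record
           return
       for name in dt_fields:
           annotated = dict(record)
           annotated["ts"] = record[name]  # the timestamp being described
           annotated["ts_description"] = name  # the field it came from
           yield annotated

A record with ``created`` and ``modified`` timestamps therefore yields two annotated copies, one per timestamp, which makes it straightforward to place every timestamp of a record on a single timeline.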
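
A minimal sketch that reproduces the :py:func:`normalize_fieldname` doctest results, assuming that every character outside ``[A-Za-z0-9_]`` becomes an underscore, that names not starting with a letter get an ``x_`` prefix, and that the reserved names are the four meta fields listed under :py:meth:`RecordDescriptor.get_required_fields`. These rules and the ``RESERVED`` set are assumptions; ``sketch_normalize_fieldname`` is a hypothetical name and may behave differently from flow.record on inputs the doctests do not cover.

.. code-block:: python

   import re

   # Assumed reserved meta fields (see RecordDescriptor.get_required_fields).
   RESERVED = {"_source", "_classification", "_generated", "_version"}

   INVALID_CHARS = re.compile(r"[^A-Za-z0-9_]")
   STARTS_WITH_LETTER = re.compile(r"[A-Za-z]")


   def sketch_normalize_fieldname(field_name: str) -> str:
       """Make `field_name` usable as a field name (hypothetical rules)."""
       if field_name in RESERVED:
           return field_name  # reserved names pass through untouched
       name = INVALID_CHARS.sub("_", field_name)
       if not STARTS_WITH_LETTER.match(name):
           name = "x_" + name  # e.g. a leading digit or underscore
       return name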
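
:py:meth:`RecordDescriptor.calc_descriptor_hash` is documented as the first 4 bytes of a SHA-256 digest over the descriptor name and the field names and types, cached across calls. The sketch below follows that description, but the feeding order (name first, then each field name followed by its typename), the UTF-8 encoding and the big-endian reading of the 4 bytes are assumptions, so ``descriptor_hash`` (a hypothetical name) is not guaranteed to produce the same values as flow.record.

.. code-block:: python

   import hashlib
   import struct
   from functools import lru_cache


   @lru_cache(maxsize=256)  # `fields` must be a tuple so the arguments are hashable
   def descriptor_hash(name: str, fields: tuple[tuple[str, str], ...]) -> int:
       """Return a 32-bit hash over a descriptor name and its (typename, name) fields."""
       digest = hashlib.sha256(name.encode("utf-8"))
       for typename, field_name in fields:
           digest.update(field_name.encode("utf-8"))
           digest.update(typename.encode("utf-8"))
       # First 4 bytes of the digest, read as an unsigned big-endian integer.
       (value,) = struct.unpack(">I", digest.digest()[:4])
       return value

Because a field's type is part of the hashed input, changing ``("varint", "bar")`` to ``("string", "bar")`` yields a different hash, which is what lets a descriptor's identity change whenever its schema changes.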
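
A temporary override like :py:func:`ignore_fields_for_comparison` can be built with ``contextlib.contextmanager`` by swapping the contents of a module-level set and restoring it on exit, even if the body raises. The sketch below is hypothetical: ``IGNORED``, ``ignore_fields`` and ``records_equal`` are illustrative names operating on dicts, and replacing (rather than extending) the ignored set is an assumption, not the documented implementation.

.. code-block:: python

   from contextlib import contextmanager
   from typing import Any, Iterable, Iterator

   IGNORED: set[str] = set()  # hypothetical stand-in for IGNORE_FIELDS_FOR_COMPARISON
   _MISSING = object()  # sentinel so a missing key never equals a real value


   def records_equal(a: dict[str, Any], b: dict[str, Any]) -> bool:
       """Compare two dict 'records', skipping the currently ignored fields."""
       keys = (a.keys() | b.keys()) - IGNORED
       return all(a.get(k, _MISSING) == b.get(k, _MISSING) for k in keys)


   @contextmanager
   def ignore_fields(fields: Iterable[str]) -> Iterator[None]:
       """Temporarily replace the ignored set, restoring the old one afterwards."""
       previous = set(IGNORED)
       IGNORED.clear()
       IGNORED.update(fields)
       try:
           yield
       finally:
           IGNORED.clear()
           IGNORED.update(previous)

Mutating the set in place (instead of rebinding the name) means every function that already holds a reference to ``IGNORED`` sees the change.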
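
The first-one-wins lookup that :py:class:`GroupedRecord` describes matches the semantics of ``collections.ChainMap``, which searches its mappings in order. A sketch with dicts standing in for Records follows; ``FlatView`` is a hypothetical class that only models attribute reads, not GroupedRecord's type enforcement in ``__setattr__`` or its ``get_record_by_type`` method.

.. code-block:: python

   from collections import ChainMap
   from typing import Any


   class FlatView:
       """Hypothetical read-only flat view over several dict 'records'."""

       def __init__(self, name: str, records: list[dict[str, Any]]) -> None:
           self.name = name
           self.records = records
           # ChainMap searches its mappings in order, so the first record wins.
           self._fields = ChainMap(*records)

       def __getattr__(self, attr: str) -> Any:
           # Only called when normal attribute lookup fails.
           fields = self.__dict__.get("_fields")
           if fields is not None and attr in fields:
               return fields[attr]
           raise AttributeError(attr)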
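
The duplicate handling described for :py:func:`merge_record_descriptors` and :py:func:`extend_record` (duplicates ignored by default, last one wins with ``replace=True``) can be sketched on plain ``(typename, name)`` tuples like those returned by :py:meth:`RecordDescriptor.get_field_tuples`. ``merge_fields`` is hypothetical, and keeping a replaced field at the position where it first appeared is an assumption.

.. code-block:: python

   from typing import Sequence

   FieldTuples = Sequence[tuple[str, str]]  # (typename, name) pairs


   def merge_fields(field_lists: Sequence[FieldTuples], replace: bool = False) -> list[tuple[str, str]]:
       """Merge field lists by name; later duplicates win only if `replace` is True."""
       merged: dict[str, str] = {}  # name -> typename, kept in insertion order
       for fields in field_lists:
           for typename, name in fields:
               if name not in merged or replace:
                   merged[name] = typename  # re-assigning keeps the original position
       return [(typename, name) for name, typename in merged.items()]

With ``[("string", "path"), ("varint", "size")]`` followed by ``[("uint32", "size"), ("datetime", "mtime")]``, the default keeps ``size`` as ``varint``, while ``replace=True`` turns it into ``uint32``; ``mtime`` is appended in both cases.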