Source code for senzing_grpc.szengine

#! /usr/bin/env python3

"""
``senzing_grpc.szengine.SzEngineGrpc`` is a `gRPC`_ implementation
of the `senzing.szengine.SzEngine`_ interface.

.. _gRPC: https://grpc.io
.. _senzing.szengine.SzEngine: https://garage.senzing.com/sz-sdk-python/senzing.html#module-senzing.szengine
"""

# pylint: disable=E1101,C0302

import json
from types import TracebackType
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union

import grpc
from senzing import SzEngine, SzEngineFlags

from .pb2_grpc import szengine_pb2, szengine_pb2_grpc
from .szhelpers import as_str, new_exception

# Metadata

__all__ = ["SzEngineGrpc"]
__version__ = "0.0.1"  # See https://www.python.org/dev/peps/pep-0396/
__date__ = "2025-01-10"
__updated__ = "2025-01-16"

SENZING_PRODUCT_ID = (
    "5053"  # See https://github.com/senzing-garage/knowledge-base/blob/main/lists/senzing-component-ids.md
)

# -----------------------------------------------------------------------------
# SzEngineGrpc class
# -----------------------------------------------------------------------------


class SzEngineGrpc(SzEngine):
    """
    gRPC-backed implementation of the ``SzEngine`` interface.

    Each RPC-backed method builds the matching ``szengine_pb2`` request
    message, sends it through an ``SzEngineStub`` bound to the supplied
    channel, and returns the ``result`` field of the reply.  Any
    ``Exception`` raised while doing so is handed to ``new_exception`` and
    the value it returns is raised, chained to the original error.
    """

    # -------------------------------------------------------------------------
    # Python dunder/magic methods
    # -------------------------------------------------------------------------

    def __init__(
        self,
        grpc_channel: grpc.Channel,
    ) -> None:
        """
        Bind the engine to a gRPC channel.

        Args:
            grpc_channel: Channel stored on the instance and used to create
                the ``SzEngineStub``.

        For the ``-> None`` return annotation, see
        https://peps.python.org/pep-0484/#the-meaning-of-annotations
        """
        self.channel = grpc_channel
        self.stub = szengine_pb2_grpc.SzEngineStub(self.channel)
        self.noop = ""

    def __enter__(
        self,
    ) -> Any:  # TODO: Replace "Any" with "Self" once python 3.11 is lowest supported python version.
        """Enter the context manager; the engine itself is the managed object."""
        return self

    def __exit__(
        self,
        exc_type: Union[Type[BaseException], None],
        exc_val: Union[BaseException, None],
        exc_tb: Union[TracebackType, None],
    ) -> None:
        """Leave the context manager; nothing is released and exceptions propagate."""

    # -------------------------------------------------------------------------
    # SzEngine methods
    # -------------------------------------------------------------------------

    def add_record(
        self,
        data_source_code: str,
        record_id: str,
        record_definition: str,
        flags: int = 0,
    ) -> str:
        """Send an ``AddRecord`` request and return the reply's result as ``str``."""
        try:
            message = szengine_pb2.AddRecordRequest(  # type: ignore[unused-ignore]
                data_source_code=as_str(data_source_code),
                record_id=as_str(record_id),
                record_definition=as_str(record_definition),
                flags=flags,
            )
            return str(self.stub.AddRecord(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def close_export(self, export_handle: int) -> None:
        """Send a ``CloseExport`` request for ``export_handle``; the reply is discarded."""
        try:
            message = szengine_pb2.CloseExportRequest(  # type: ignore[unused-ignore]
                export_handle=export_handle,
            )
            self.stub.CloseExport(message)
        except Exception as err:
            raise new_exception(err) from err

    def count_redo_records(self) -> int:
        """Send a ``CountRedoRecords`` request and return the reply's result as ``int``."""
        try:
            message = szengine_pb2.CountRedoRecordsRequest()  # type: ignore[unused-ignore]
            return int(self.stub.CountRedoRecords(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def delete_record(
        self,
        data_source_code: str,
        record_id: str,
        flags: int = 0,
    ) -> str:
        """Send a ``DeleteRecord`` request and return the reply's result as ``str``."""
        try:
            message = szengine_pb2.DeleteRecordRequest(  # type: ignore[unused-ignore]
                data_source_code=as_str(data_source_code),
                record_id=as_str(record_id),
                flags=flags,
            )
            return str(self.stub.DeleteRecord(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def _destroy(self) -> None:
        """Null function in the sz-sdk-python-grpc implementation."""

    def export_csv_entity_report(
        self,
        csv_column_list: str,
        flags: int = SzEngineFlags.SZ_EXPORT_DEFAULT_FLAGS,
    ) -> int:
        """Send an ``ExportCsvEntityReport`` request and return the reply's result as ``int``."""
        try:
            message = szengine_pb2.ExportCsvEntityReportRequest(  # type: ignore[unused-ignore]
                csv_column_list=as_str(csv_column_list),
                flags=flags,
            )
            return int(self.stub.ExportCsvEntityReport(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def export_csv_entity_report_iterator(
        self,
        csv_column_list: str,
        flags: int = SzEngineFlags.SZ_EXPORT_DEFAULT_FLAGS,
    ) -> Iterable[str]:
        """
        Iterate over the ``StreamExportCsvEntityReport`` server stream.

        This is a generator: the request is built and sent only once iteration
        starts.  Streamed items whose ``result`` is falsy are skipped.

        Yields:
            The ``result`` of each streamed item.
        """
        try:
            message = szengine_pb2.StreamExportCsvEntityReportRequest(  # type: ignore[unused-ignore]
                csv_column_list=as_str(csv_column_list), flags=flags
            )
            for chunk in self.stub.StreamExportCsvEntityReport(message):
                if chunk.result:
                    yield chunk.result
        except Exception as err:
            raise new_exception(err) from err

    def export_json_entity_report(self, flags: int = SzEngineFlags.SZ_EXPORT_DEFAULT_FLAGS) -> int:
        """Send an ``ExportJsonEntityReport`` request and return the reply's result as ``int``."""
        try:
            message = szengine_pb2.ExportJsonEntityReportRequest(  # type: ignore[unused-ignore]
                flags=flags,
            )
            return int(self.stub.ExportJsonEntityReport(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def export_json_entity_report_iterator(
        self,
        flags: int = SzEngineFlags.SZ_EXPORT_DEFAULT_FLAGS,
    ) -> Iterable[str]:
        """
        Iterate over the ``StreamExportJsonEntityReport`` server stream.

        This is a generator: the request is built and sent only once iteration
        starts.  Streamed items whose ``result`` is falsy are skipped.

        Yields:
            The ``result`` of each streamed item.
        """
        try:
            message = szengine_pb2.StreamExportJsonEntityReportRequest(flags=flags)  # type: ignore[unused-ignore]
            for chunk in self.stub.StreamExportJsonEntityReport(message):
                if chunk.result:
                    yield chunk.result
        except Exception as err:
            raise new_exception(err) from err

    def fetch_next(self, export_handle: int) -> str:
        """Send a ``FetchNext`` request for ``export_handle`` and return the result as ``str``."""
        try:
            message = szengine_pb2.FetchNextRequest(  # type: ignore[unused-ignore]
                export_handle=export_handle,
            )
            return str(self.stub.FetchNext(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def find_interesting_entities_by_entity_id(self, entity_id: int, flags: int = 0) -> str:
        """Send a ``FindInterestingEntitiesByEntityId`` request and return the result as ``str``."""
        try:
            message = szengine_pb2.FindInterestingEntitiesByEntityIdRequest(  # type: ignore[unused-ignore]
                entity_id=entity_id,
                flags=flags,
            )
            return str(self.stub.FindInterestingEntitiesByEntityId(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def find_interesting_entities_by_record_id(
        self,
        data_source_code: str,
        record_id: str,
        flags: int = 0,
    ) -> str:
        """Send a ``FindInterestingEntitiesByRecordId`` request and return the result as ``str``."""
        try:
            message = szengine_pb2.FindInterestingEntitiesByRecordIdRequest(  # type: ignore[unused-ignore]
                data_source_code=as_str(data_source_code),
                record_id=as_str(record_id),
                flags=flags,
            )
            return str(self.stub.FindInterestingEntitiesByRecordId(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def find_network_by_entity_id(
        self,
        entity_ids: List[int],
        max_degrees: int,
        build_out_degrees: int,
        build_out_max_entities: int,
        flags: int = SzEngineFlags.SZ_FIND_NETWORK_DEFAULT_FLAGS,
    ) -> str:
        """
        Send a ``FindNetworkByEntityId`` request and return the result as ``str``.

        ``entity_ids`` is serialized with ``entity_ids_json`` before being
        placed in the request.
        """
        try:
            message = szengine_pb2.FindNetworkByEntityIdRequest(  # type: ignore[unused-ignore]
                entity_ids=entity_ids_json(entity_ids),
                max_degrees=max_degrees,
                build_out_degrees=build_out_degrees,
                build_out_max_entities=build_out_max_entities,
                flags=flags,
            )
            return str(self.stub.FindNetworkByEntityId(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def find_network_by_record_id(
        self,
        record_keys: List[Tuple[str, str]],
        max_degrees: int,
        build_out_degrees: int,
        build_out_max_entities: int,
        flags: int = SzEngineFlags.SZ_FIND_NETWORK_DEFAULT_FLAGS,
    ) -> str:
        """
        Send a ``FindNetworkByRecordId`` request and return the result as ``str``.

        ``record_keys`` is serialized with ``record_keys_json`` before being
        placed in the request.
        """
        try:
            message = szengine_pb2.FindNetworkByRecordIdRequest(  # type: ignore[unused-ignore]
                record_keys=record_keys_json(record_keys),
                max_degrees=max_degrees,
                build_out_degrees=build_out_degrees,
                build_out_max_entities=build_out_max_entities,
                flags=flags,
            )
            return str(self.stub.FindNetworkByRecordId(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def find_path_by_entity_id(
        self,
        start_entity_id: int,
        end_entity_id: int,
        max_degrees: int,
        # TODO Should accept both entity and record IDs in V4, test
        avoid_entity_ids: Optional[List[int]] = None,
        required_data_sources: Optional[List[str]] = None,
        flags: int = SzEngineFlags.SZ_FIND_PATH_DEFAULT_FLAGS,
    ) -> str:
        """
        Send a ``FindPathByEntityId`` request and return the result as ``str``.

        ``avoid_entity_ids`` and ``required_data_sources`` are serialized with
        ``avoid_entity_ids_json`` and ``required_data_sources_json``.
        """
        try:
            message = szengine_pb2.FindPathByEntityIdRequest(  # type: ignore[unused-ignore]
                start_entity_id=start_entity_id,
                end_entity_id=end_entity_id,
                max_degrees=max_degrees,
                avoid_entity_ids=avoid_entity_ids_json(avoid_entity_ids),
                required_data_sources=required_data_sources_json(required_data_sources),
                flags=flags,
            )
            return str(self.stub.FindPathByEntityId(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def find_path_by_record_id(
        self,
        start_data_source_code: str,
        start_record_id: str,
        end_data_source_code: str,
        end_record_id: str,
        max_degrees: int,
        avoid_record_keys: Optional[List[Tuple[str, str]]] = None,
        required_data_sources: Optional[List[str]] = None,
        flags: int = SzEngineFlags.SZ_FIND_PATH_DEFAULT_FLAGS,
    ) -> str:
        """
        Send a ``FindPathByRecordId`` request and return the result as ``str``.

        ``avoid_record_keys`` and ``required_data_sources`` are serialized with
        ``avoid_record_keys_json`` and ``required_data_sources_json``.
        """
        try:
            message = szengine_pb2.FindPathByRecordIdRequest(  # type: ignore[unused-ignore]
                start_data_source_code=as_str(start_data_source_code),
                start_record_id=as_str(start_record_id),
                end_data_source_code=as_str(end_data_source_code),
                end_record_id=as_str(end_record_id),
                max_degrees=max_degrees,
                avoid_record_keys=avoid_record_keys_json(avoid_record_keys),
                required_data_sources=required_data_sources_json(required_data_sources),
                flags=flags,
            )
            return str(self.stub.FindPathByRecordId(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def get_active_config_id(self) -> int:
        """Send a ``GetActiveConfigId`` request and return the reply's result as ``int``."""
        try:
            message = szengine_pb2.GetActiveConfigIdRequest()  # type: ignore[unused-ignore]
            return int(self.stub.GetActiveConfigId(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def get_entity_by_entity_id(
        self,
        entity_id: int,
        flags: int = SzEngineFlags.SZ_ENTITY_DEFAULT_FLAGS,
    ) -> str:
        """Send a ``GetEntityByEntityId`` request and return the reply's result as ``str``."""
        try:
            message = szengine_pb2.GetEntityByEntityIdRequest(  # type: ignore[unused-ignore]
                entity_id=entity_id,
                flags=flags,
            )
            return str(self.stub.GetEntityByEntityId(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def get_entity_by_record_id(
        self,
        data_source_code: str,
        record_id: str,
        flags: int = SzEngineFlags.SZ_ENTITY_DEFAULT_FLAGS,
    ) -> str:
        """Send a ``GetEntityByRecordId`` request and return the reply's result as ``str``."""
        try:
            message = szengine_pb2.GetEntityByRecordIdRequest(  # type: ignore[unused-ignore]
                data_source_code=as_str(data_source_code),
                record_id=as_str(record_id),
                flags=flags,
            )
            return str(self.stub.GetEntityByRecordId(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def get_record(
        self,
        data_source_code: str,
        record_id: str,
        flags: int = SzEngineFlags.SZ_RECORD_DEFAULT_FLAGS,
    ) -> str:
        """Send a ``GetRecord`` request and return the reply's result as ``str``."""
        try:
            message = szengine_pb2.GetRecordRequest(  # type: ignore[unused-ignore]
                data_source_code=as_str(data_source_code),
                record_id=as_str(record_id),
                flags=flags,
            )
            return str(self.stub.GetRecord(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def get_redo_record(self) -> str:
        """Send a ``GetRedoRecord`` request and return the reply's result as ``str``."""
        try:
            message = szengine_pb2.GetRedoRecordRequest()  # type: ignore[unused-ignore]
            return str(self.stub.GetRedoRecord(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def get_stats(self) -> str:
        """Send a ``GetStats`` request and return the reply's result as ``str``."""
        try:
            message = szengine_pb2.GetStatsRequest()  # type: ignore[unused-ignore]
            return str(self.stub.GetStats(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def get_virtual_entity_by_record_id(
        self,
        record_keys: List[Tuple[str, str]],
        flags: int = SzEngineFlags.SZ_VIRTUAL_ENTITY_DEFAULT_FLAGS,
    ) -> str:
        """
        Send a ``GetVirtualEntityByRecordId`` request and return the result as ``str``.

        ``record_keys`` is serialized with ``record_keys_json`` before being
        placed in the request.
        """
        try:
            message = szengine_pb2.GetVirtualEntityByRecordIdRequest(  # type: ignore[unused-ignore]
                record_keys=record_keys_json(record_keys),
                flags=flags,
            )
            return str(self.stub.GetVirtualEntityByRecordId(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def how_entity_by_entity_id(
        self,
        entity_id: int,
        flags: int = SzEngineFlags.SZ_HOW_ENTITY_DEFAULT_FLAGS,
    ) -> str:
        """Send a ``HowEntityByEntityId`` request and return the reply's result as ``str``."""
        try:
            message = szengine_pb2.HowEntityByEntityIdRequest(  # type: ignore[unused-ignore]
                entity_id=entity_id,
                flags=flags,
            )
            return str(self.stub.HowEntityByEntityId(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def _initialize(
        self,
        instance_name: str,
        settings: Union[str, Dict[Any, Any]],
        config_id: Optional[int] = None,
        verbose_logging: int = 0,
    ) -> None:
        """Null function in the sz-sdk-python-grpc implementation; all arguments are ignored."""
        # Reference every parameter once so linters do not flag them as unused.
        _ = (instance_name, settings, config_id, verbose_logging)

    def preprocess_record(
        self,
        record_definition: str,
        flags: int = SzEngineFlags.SZ_RECORD_DEFAULT_FLAGS,
    ) -> str:
        """Send a ``PreprocessRecord`` request and return the reply's result as ``str``."""
        try:
            message = szengine_pb2.PreprocessRecordRequest(  # type: ignore[unused-ignore]
                record_definition=as_str(record_definition),
                flags=flags,
            )
            return str(self.stub.PreprocessRecord(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def prime_engine(self) -> None:
        """Null function in the sz-sdk-python-grpc implementation."""

    def process_redo_record(self, redo_record: str, flags: int = 0) -> str:
        """Send a ``ProcessRedoRecord`` request and return the reply's result as ``str``."""
        try:
            message = szengine_pb2.ProcessRedoRecordRequest(  # type: ignore[unused-ignore]
                redo_record=as_str(redo_record),
                flags=flags,
            )
            return str(self.stub.ProcessRedoRecord(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def reevaluate_entity(self, entity_id: int, flags: int = 0) -> str:
        """Send a ``ReevaluateEntity`` request and return the reply's result as ``str``."""
        try:
            message = szengine_pb2.ReevaluateEntityRequest(  # type: ignore[unused-ignore]
                entity_id=entity_id,
                flags=flags,
            )
            return str(self.stub.ReevaluateEntity(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def reevaluate_record(self, data_source_code: str, record_id: str, flags: int = 0) -> str:
        """Send a ``ReevaluateRecord`` request and return the reply's result as ``str``."""
        try:
            message = szengine_pb2.ReevaluateRecordRequest(  # type: ignore[unused-ignore]
                data_source_code=as_str(data_source_code),
                record_id=as_str(record_id),
                flags=flags,
            )
            return str(self.stub.ReevaluateRecord(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def _reinitialize(self, config_id: int) -> None:
        """Send a ``Reinitialize`` request carrying ``config_id``; the reply is discarded."""
        try:
            message = szengine_pb2.ReinitializeRequest(config_id=config_id)  # type: ignore[unused-ignore]
            self.stub.Reinitialize(message)
        except Exception as err:
            raise new_exception(err) from err

    def search_by_attributes(
        self,
        attributes: str,
        flags: int = SzEngineFlags.SZ_SEARCH_BY_ATTRIBUTES_DEFAULT_FLAGS,
        search_profile: str = "",
    ) -> str:
        """Send a ``SearchByAttributes`` request and return the reply's result as ``str``."""
        try:
            message = szengine_pb2.SearchByAttributesRequest(  # type: ignore[unused-ignore]
                attributes=as_str(attributes),
                search_profile=as_str(search_profile),
                flags=flags,
            )
            return str(self.stub.SearchByAttributes(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def why_entities(
        self,
        entity_id_1: int,
        entity_id_2: int,
        flags: int = SzEngineFlags.SZ_WHY_ENTITIES_DEFAULT_FLAGS,
    ) -> str:
        """Send a ``WhyEntities`` request and return the reply's result as ``str``."""
        try:
            message = szengine_pb2.WhyEntitiesRequest(  # type: ignore[unused-ignore]
                entity_id_1=entity_id_1,
                entity_id_2=entity_id_2,
                flags=flags,
            )
            return str(self.stub.WhyEntities(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def why_record_in_entity(
        self,
        data_source_code: str,
        record_id: str,
        # NOTE(review): this default reuses SZ_WHY_RECORDS_DEFAULT_FLAGS; confirm
        # whether the SzEngine interface declares a dedicated default for this call.
        flags: int = SzEngineFlags.SZ_WHY_RECORDS_DEFAULT_FLAGS,
    ) -> str:
        """Send a ``WhyRecordInEntity`` request and return the reply's result as ``str``."""
        try:
            message = szengine_pb2.WhyRecordInEntityRequest(  # type: ignore[unused-ignore]
                data_source_code=as_str(data_source_code),
                record_id=as_str(record_id),
                flags=flags,
            )
            return str(self.stub.WhyRecordInEntity(message).result)
        except Exception as err:
            raise new_exception(err) from err

    def why_records(
        self,
        data_source_code_1: str,
        record_id_1: str,
        data_source_code_2: str,
        record_id_2: str,
        flags: int = SzEngineFlags.SZ_WHY_RECORDS_DEFAULT_FLAGS,
    ) -> str:
        """Send a ``WhyRecords`` request and return the reply's result as ``str``."""
        try:
            message = szengine_pb2.WhyRecordsRequest(  # type: ignore[unused-ignore]
                data_source_code_1=as_str(data_source_code_1),
                record_id_1=as_str(record_id_1),
                data_source_code_2=as_str(data_source_code_2),
                record_id_2=as_str(record_id_2),
                flags=flags,
            )
            return str(self.stub.WhyRecords(message).result)
        except Exception as err:
            raise new_exception(err) from err
# -----------------------------------------------------------------------------
# Helper functions
# -----------------------------------------------------------------------------


def entity_ids_json(entity_ids: List[int]) -> str:
    """
    Serialize entity ids into the ``{"ENTITIES": [{"ENTITY_ID": ...}, ...]}`` JSON form.

    Args:
        entity_ids: Entity ids to wrap; an empty list yields ``{"ENTITIES": []}``.

    Returns:
        The JSON document as a string.
    """
    return json.dumps({"ENTITIES": [{"ENTITY_ID": entity_id} for entity_id in entity_ids]})


def record_keys_json(record_keys: List[Tuple[str, str]]) -> str:
    """
    Serialize record keys into the ``{"RECORDS": [{"DATA_SOURCE": ..., "RECORD_ID": ...}]}`` JSON form.

    Args:
        record_keys: Sequences whose item 0 is the data source and item 1 the
            record id.

    Returns:
        The JSON document as a string.
    """
    return json.dumps(
        {
            "RECORDS": [
                {"DATA_SOURCE": record_key[0], "RECORD_ID": record_key[1]}
                for record_key in record_keys
            ]
        }
    )


def avoid_entity_ids_json(avoid_entity_ids: Optional[List[int]] = None) -> str:
    """
    Serialize entity ids to avoid, or return ``""`` when none are given.

    Args:
        avoid_entity_ids: Entity ids to wrap; ``None`` or an empty list means
            "nothing to avoid".

    Returns:
        The same JSON document ``entity_ids_json`` produces, or the empty
        string when ``avoid_entity_ids`` is falsy.
    """
    if not avoid_entity_ids:
        return ""
    # Same payload shape as entity_ids_json, so reuse it rather than duplicate it.
    return entity_ids_json(avoid_entity_ids)


def avoid_record_keys_json(avoid_record_keys: Optional[List[Tuple[str, str]]] = None) -> str:
    """
    Serialize record keys to avoid, or return ``""`` when none are given.

    Args:
        avoid_record_keys: Record keys to wrap; ``None`` or an empty list means
            "nothing to avoid".

    Returns:
        The same JSON document ``record_keys_json`` produces, or the empty
        string when ``avoid_record_keys`` is falsy.
    """
    if not avoid_record_keys:
        return ""
    # Same payload shape as record_keys_json, so reuse it rather than duplicate it.
    return record_keys_json(avoid_record_keys)


def required_data_sources_json(
    required_data_sources: Optional[List[str]] = None,
) -> str:
    """
    Serialize required data sources into ``{"DATA_SOURCES": [...]}``, or return ``""``.

    Args:
        required_data_sources: Data source names; ``None`` or an empty list
            means "no requirement".

    Returns:
        The JSON document as a string, or the empty string when
        ``required_data_sources`` is falsy.
    """
    if not required_data_sources:
        return ""
    return json.dumps({"DATA_SOURCES": list(required_data_sources)})