Source code for senzing_grpc.szproduct

#! /usr/bin/env python3

"""
``senzing_grpc.szproduct.SzProductGrpc`` is a `gRPC`_ implementation
of the `senzing.szproduct.SzProduct`_ interface.

.. _gRPC: https://grpc.io
.. _senzing.szproduct.SzProduct: https://garage.senzing.com/sz-sdk-python/senzing.html#module-senzing.szproduct
"""

# pylint: disable=E1101

from types import TracebackType
from typing import Any, Dict, Type, Union

import grpc
from senzing import SzProduct

from .pb2_grpc import szproduct_pb2, szproduct_pb2_grpc
from .szhelpers import new_exception

# Metadata

__all__ = ["SzProductGrpc"]
__version__ = "0.0.1"  # See https://www.python.org/dev/peps/pep-0396/
__date__ = "2025-01-10"
__updated__ = "2025-01-16"

SENZING_PRODUCT_ID = (
    "5056"  # See https://github.com/senzing-garage/knowledge-base/blob/main/lists/senzing-component-ids.md
)

# -----------------------------------------------------------------------------
# SzProductGrpc class
# -----------------------------------------------------------------------------


[docs] class SzProductGrpc(SzProduct): """ SzProduct module access library over gRPC. """ # ------------------------------------------------------------------------- # Python dunder/magic methods # ------------------------------------------------------------------------- def __init__( self, grpc_channel: grpc.Channel, ) -> None: """ Constructor For return value of -> None, see https://peps.python.org/pep-0484/#the-meaning-of-annotations """ # pylint: disable=W0613 self.channel = grpc_channel self.stub = szproduct_pb2_grpc.SzProductStub(self.channel) def __enter__( self, ) -> Any: # TODO: Replace "Any" with "Self" once python 3.11 is lowest supported python version. """Context Manager method.""" return self def __exit__( self, exc_type: Union[Type[BaseException], None], exc_val: Union[BaseException, None], exc_tb: Union[TracebackType, None], ) -> None: """Context Manager method.""" # ------------------------------------------------------------------------- # SzProduct methods # ------------------------------------------------------------------------- def _destroy(self) -> None: """Null function in the sz-sdk-python-grpc implementation.""" def _initialize( self, instance_name: str, settings: Union[str, Dict[Any, Any]], verbose_logging: int = 0, ) -> None: """Null function in the sz-sdk-python-grpc implementation.""" _ = instance_name _ = settings _ = verbose_logging
[docs] def get_license(self) -> str: try: request = szproduct_pb2.GetLicenseRequest() # type: ignore[unused-ignore] response = self.stub.GetLicense(request) return str(response.result) except Exception as err: raise new_exception(err) from err
[docs] def get_version(self) -> str: try: request = szproduct_pb2.GetVersionRequest() # type: ignore[unused-ignore] response = self.stub.GetVersion(request) return str(response.result) except Exception as err: raise new_exception(err) from err