Source code for senzing_grpc.szabstractfactory

#! /usr/bin/env python3

"""
``senzing_grpc.szabstractfactory.SzAbstractFactoryGrpc`` is a `gRPC`_ implementation
of the `senzing.szabstractfactory.SzAbstractFactory`_ interface.

.. _gRPC: https://grpc.io
.. _senzing.szabstractfactory.SzAbstractFactory: https://garage.senzing.com/sz-sdk-python/senzing.html#module-senzing.szabstractfactory
"""

# pylint: disable=E1101

from types import TracebackType
from typing import Any, Type, TypedDict, Union

import grpc
from senzing import (
    SzAbstractFactory,
    SzConfigManager,
    SzDiagnostic,
    SzEngine,
    SzProduct,
)

from .szconfigmanager import SzConfigManagerGrpc
from .szdiagnostic import SzDiagnosticGrpc
from .szengine import SzEngineGrpc
from .szproduct import SzProductGrpc

# Metadata

__all__ = ["SzAbstractFactoryGrpc", "SzAbstractFactoryParametersGrpc"]
__version__ = "0.0.1"  # See https://www.python.org/dev/peps/pep-0396/
__date__ = "2025-01-10"
__updated__ = "2025-01-16"


# -----------------------------------------------------------------------------
# SzAbstractFactoryParametersGrpc class
# -----------------------------------------------------------------------------


class SzAbstractFactoryParametersGrpc(TypedDict, total=False):
    """
    Typed dictionary of the keyword arguments accepted by the
    ``SzAbstractFactoryGrpc`` constructor.

    An instance can be unpacked (``**parameters``) when creating an
    ``SzAbstractFactoryGrpc``.  Because ``total=False``, every key is
    optional as far as type checkers are concerned.
    """

    # gRPC channel over which the factory's objects communicate.
    grpc_channel: grpc.Channel
# -----------------------------------------------------------------------------
# SzAbstractFactoryGrpc class
# -----------------------------------------------------------------------------
class SzAbstractFactoryGrpc(SzAbstractFactory):
    """
    gRPC-backed implementation of ``SzAbstractFactory``.

    Every object produced by this factory is constructed with the single
    ``grpc.Channel`` that was handed to the factory's constructor.
    """

    # -------------------------------------------------------------------------
    # Python dunder/magic methods
    # -------------------------------------------------------------------------

    def __init__(self, grpc_channel: grpc.Channel) -> None:
        """
        Store the gRPC channel used for every object this factory creates.

        Args:
            grpc_channel: Channel passed on to each created ``Sz*Grpc`` object.

        For return value of -> None, see https://peps.python.org/pep-0484/#the-meaning-of-annotations
        """
        self.channel = grpc_channel

    def __enter__(
        self,
    ) -> Any:  # TODO: Replace "Any" with "Self" once python 3.11 is lowest supported python version.
        """Enter the context manager, yielding this factory itself."""
        return self

    def __exit__(
        self,
        exc_type: Union[Type[BaseException], None],
        exc_val: Union[BaseException, None],
        exc_tb: Union[TracebackType, None],
    ) -> None:
        """
        Leave the context manager.

        Performs no work: the channel is left untouched, and because ``None``
        is returned any in-flight exception continues to propagate.
        """

    # -------------------------------------------------------------------------
    # SzAbstractFactory methods
    # -------------------------------------------------------------------------

    def create_configmanager(self) -> SzConfigManager:
        """Return a new ``SzConfigManagerGrpc`` bound to this factory's channel."""
        return SzConfigManagerGrpc(grpc_channel=self.channel)

    def create_diagnostic(self) -> SzDiagnostic:
        """Return a new ``SzDiagnosticGrpc`` bound to this factory's channel."""
        return SzDiagnosticGrpc(grpc_channel=self.channel)

    def create_engine(self) -> SzEngine:
        """Return a new ``SzEngineGrpc`` bound to this factory's channel."""
        return SzEngineGrpc(grpc_channel=self.channel)

    def create_product(self) -> SzProduct:
        """Return a new ``SzProductGrpc`` bound to this factory's channel."""
        return SzProductGrpc(grpc_channel=self.channel)

    def reinitialize(self, config_id: int) -> None:
        """
        Ask the diagnostic and engine components to reinitialize.

        A fresh ``SzDiagnosticGrpc`` and then a fresh ``SzEngineGrpc`` are
        built on this factory's channel, and ``reinitialize`` is called on
        each in that order.

        Args:
            config_id: Configuration identifier forwarded to both calls.
        """
        # Diagnostic first, then engine; each object is only needed for the call.
        SzDiagnosticGrpc(grpc_channel=self.channel).reinitialize(config_id=config_id)  # pylint: disable=W0212
        SzEngineGrpc(grpc_channel=self.channel).reinitialize(config_id=config_id)  # pylint: disable=W0212