Skip to content

Commit

Permalink
feat: remote dependency
Browse files Browse the repository at this point in the history
Signed-off-by: Frost Ming <[email protected]>
  • Loading branch information
frostming committed Sep 13, 2024
1 parent f163dac commit 8991f7f
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 53 deletions.
31 changes: 22 additions & 9 deletions src/_bentoml_impl/client/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,39 @@ def __init__(
url: str,
*,
service: Service[T] | None = None,
media_type: str = "application/vnd.bentoml+pickle",
) -> None:
from bentoml.container import BentoMLContainer

svc_config: dict[str, ServiceConfig] = BentoMLContainer.config.services.get()
assert service is not None, "service must be provided"
timeout = (
svc_config.get(service.name, {}).get("traffic", {}).get("timeout") or 60
) * 1.01 # get the service timeout add 1% margin for the client
if service is not None:
svc_config: dict[str, ServiceConfig] = (
BentoMLContainer.config.services.get()
)
timeout = (
svc_config.get(service.name, {}).get("traffic", {}).get("timeout") or 60
) * 1.01 # get the service timeout add 1% margin for the client
else:
timeout = 60
self._sync = SyncHTTPClient(
url,
media_type="application/vnd.bentoml+pickle",
media_type=media_type,
service=service,
timeout=timeout,
server_ready_timeout=0,
)
self._async = AsyncHTTPClient(
url,
media_type="application/vnd.bentoml+pickle",
media_type=media_type,
service=service,
timeout=timeout,
server_ready_timeout=0,
)
self._inner = service.inner
self.endpoints = self._async.endpoints
if service is not None:
self._inner = service.inner
self.endpoints = self._async.endpoints
else:
self.endpoints = {}
self._inner = None
super().__init__()

@property
Expand All @@ -79,6 +88,10 @@ def as_service(self) -> T:
return t.cast(T, self)

def call(self, __name: str, /, *args: t.Any, **kwargs: t.Any) -> t.Any:
if self._inner is None:
raise BentoMLException(
"The proxy is not callable when the service is not provided. Please use `.to_async` or `.to_sync` property."
)
original_func = getattr(self._inner, __name)
if not hasattr(original_func, "func"):
raise BentoMLException(f"calling non-api method {__name} is not allowed")
Expand Down
151 changes: 119 additions & 32 deletions src/_bentoml_sdk/service/dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,68 +7,155 @@
from simple_di import Provide
from simple_di import inject

from bentoml._internal.cloud.client import RestApiClient
from bentoml._internal.configuration.containers import BentoMLContainer
from bentoml.exceptions import BentoMLException

from .factory import Service

if t.TYPE_CHECKING:
from _bentoml_impl.client.proxy import RemoteProxy

T = t.TypeVar("T")


_dependent_cache: dict[str, t.Any] = {}
_dependencies: list[Dependency[t.Any]] = []


async def cleanup() -> None:
from _bentoml_impl.client.proxy import RemoteProxy
tasks = [dep.close() for dep in _dependencies]
await asyncio.gather(*tasks)
_dependencies.clear()

coros: list[t.Coroutine[t.Any, t.Any, None]] = []
for svc in _dependent_cache.values():
if isinstance(svc, RemoteProxy):
coros.append(svc.close())
await asyncio.gather(*coros)
_dependent_cache.clear()


@attrs.frozen
@attrs.define
class Dependency(t.Generic[T]):
on: Service[T]
on: Service[T] | None = None
deployment: str | None = None
cluster: str | None = None
url: str | None = None
_resolved: t.Any = attrs.field(default=None, init=False)

@t.overload
def get(self: Dependency[None]) -> RemoteProxy[t.Any]: ...

def cache_key(self) -> str:
return self.on.name
@t.overload
def get(self: Dependency[T]) -> T: ...

@inject
def get(
self,
self: Dependency[T],
*,
runner_mapping: dict[str, str] = Provide[
BentoMLContainer.remote_runner_mapping
],
) -> T:
client: RestApiClient = Provide[BentoMLContainer.rest_api_client],
) -> T | RemoteProxy[t.Any]:
from _bentoml_impl.client.proxy import RemoteProxy

key = self.on.name
if key not in _dependent_cache:
if key in runner_mapping:
inst = RemoteProxy(runner_mapping[key], service=self.on).as_service()
media_type = "application/json"
if self.deployment and self.url:
raise BentoMLException("Cannot specify both deployment and url")
if self.deployment:
deployment = client.v2.get_deployment(self.deployment, self.cluster)
try:
self.url = deployment.urls[0]
except IndexError:
raise BentoMLException(
f"Deployment {self.deployment} does not have any URLs"
)
elif not self.url:
if self.on is None:
raise BentoMLException("Must specify one of on, deployment or url")
if (key := self.on.name) in runner_mapping:
self.url = runner_mapping[key]
media_type = "application/vnd.bentoml+pickle"
else:
inst = self.on()
_dependent_cache[key] = inst
return _dependent_cache[key]
return self.on()

return RemoteProxy(
self.url, service=self.on, media_type=media_type
).as_service()

@t.overload
def __get__(self, instance: None, owner: t.Any) -> t.Self: ...

@t.overload
def __get__(self, instance: None, owner: t.Any) -> Dependency[T]: ...
def __get__(
self: Dependency[None], instance: t.Any, owner: t.Any
) -> RemoteProxy[t.Any]: ...

@t.overload
def __get__(self, instance: t.Any, owner: t.Any) -> T: ...
def __get__(self: Dependency[T], instance: t.Any, owner: t.Any) -> T: ...

def __get__(self, instance: t.Any, owner: t.Any) -> Dependency[T] | T:
def __get__(
self: Dependency[T], instance: t.Any, owner: t.Any
) -> Dependency[T] | RemoteProxy[t.Any] | T:
if instance is None:
return self
return self.get()
if self._resolved is None:
self._resolved = self.get()
_dependencies.append(self)
return self._resolved

def __getattr__(self, name: str) -> t.Any:
raise RuntimeError("Dependancy must be accessed as a class attribute")


def depends(on: Service[T]) -> Dependency[T]:
if not isinstance(on, Service):
raise AttributeError("Dependency must be accessed as a class attribute")

async def close(self) -> None:
if self._resolved is None:
return
await t.cast("RemoteProxy[t.Any]", self._resolved).close()


@t.overload
def depends(
*,
url: str | None = ...,
deployment: str | None = ...,
cluster: str | None = ...,
) -> Dependency[None]: ...


@t.overload
def depends(
on: Service[T],
*,
url: str | None = ...,
deployment: str | None = ...,
cluster: str | None = ...,
) -> Dependency[T]: ...


def depends(
on: Service[T] | None = None,
*,
url: str | None = None,
deployment: str | None = None,
cluster: str | None = None,
) -> Dependency[T]:
"""Create a dependency on other service or deployment
Args:
on: Service[T] | None: The service to depend on.
url: str | None: The URL of the service to depend on.
deployment: str | None: The deployment of the service to depend on.
cluster: str | None: The cluster of the service to depend on.
Examples:
.. code-block:: python
@bentoml.service
class MyService:
# depends on a service
svc_a = bentoml.depends(SVC_A)
# depends on a deployment
svc_b = bentoml.depends(deployment="ci-iris")
# depends on a remote service with url
svc_c = bentoml.depends(url="http://192.168.1.1:3000")
# For the latter two cases, the service can be given to provide more accurate types:
svc_d = bentoml.depends(url="http://192.168.1.1:3000", on=SVC_D)
"""
if on is not None and not isinstance(on, Service):
raise TypeError("depends() expects a class decorated with @bentoml.service()")
return Dependency(on)
return Dependency(on, url=url, deployment=deployment, cluster=cluster)
13 changes: 9 additions & 4 deletions src/_bentoml_sdk/service/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ def __attrs_post_init__(self) -> None:
for field in dir(self.inner):
value = getattr(self.inner, field)
if isinstance(value, Dependency):
self.dependencies[field] = t.cast(Dependency[t.Any], value)
self.dependencies[field] = value
elif isinstance(value, StoredModel):
logger.warning(
"`bentoml.models.get()` as the class attribute is not recommended because it requires the model"
f" to exist at import time. Use `{value._name} = BentoModel({str(value.tag)!r})` instead."
f" to exist at import time. Use `{value._attr} = BentoModel({str(value.tag)!r})` instead."
)
self.models.append(BentoModel(value.tag))
elif isinstance(value, Model):
Expand Down Expand Up @@ -141,9 +141,12 @@ def find_dependent(self, name_or_path: str) -> Service[t.Any]:
return self.all_services()[attr_name]
else:
raise ValueError(f"Service {attr_name} not found")
dependent = self.dependencies[attr_name]
if dependent.on is None:
raise ValueError(f"Service {attr_name} not found")
if path:
return self.dependencies[attr_name].on.find_dependent(path)
return self.dependencies[attr_name].on
return dependent.on.find_dependent(path)
return dependent

@property
def url(self) -> str | None:
Expand All @@ -157,6 +160,8 @@ def all_services(self) -> dict[str, Service[t.Any]]:
"""Get a map of the service and all recursive dependencies"""
services: dict[str, Service[t.Any]] = {self.name: self}
for dependency in self.dependencies.values():
if dependency.on is None:
continue
dependents = dependency.on.all_services()
conflict = next(
(
Expand Down
50 changes: 48 additions & 2 deletions src/bentoml/_internal/bento/bento.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

from _bentoml_sdk import Service as NewService
from _bentoml_sdk.service import ServiceConfig
from _bentoml_sdk.service.dependency import Dependency

from ..cloud.schemas.modelschemas import BentoManifestSchema
from ..models import Model as StoredModel
Expand Down Expand Up @@ -605,12 +606,29 @@ def from_bento_model(
)


@attr.frozen
class BentoDependencyInfo:
service: t.Optional[str] = None
deployment: t.Optional[str] = None
cluster: t.Optional[str] = None
url: t.Optional[str] = None

@classmethod
def from_dependency(cls, d: Dependency[t.Any]) -> BentoDependencyInfo:
return cls(
service=d.on.name if d.on is not None else None,
deployment=d.deployment,
cluster=d.cluster,
url=d.url,
)


@attr.frozen
class BentoServiceInfo:
name: str
service: str
models: t.List[BentoModelInfo] = attr.field(factory=list, eq=False)
dependencies: t.List[str] = attr.field(factory=list, eq=False)
dependencies: t.List[BentoDependencyInfo] = attr.field(factory=list, eq=False)
config: ServiceConfig = attr.field(factory=dict, eq=False)

@classmethod
Expand All @@ -619,7 +637,10 @@ def from_service(cls, svc: NewService[t.Any]) -> BentoServiceInfo:
name=svc.name,
service="",
models=[m.to_info() for m in svc.models],
dependencies=[d.on.name for d in svc.dependencies.values()],
dependencies=[
BentoDependencyInfo.from_dependency(d)
for d in svc.dependencies.values()
],
config=svc.config,
)

Expand Down Expand Up @@ -738,6 +759,31 @@ def validate(self):
...


bentoml_cattr.register_unstructure_hook(
BentoDependencyInfo,
make_dict_unstructure_fn(
BentoDependencyInfo,
bentoml_cattr,
service=override(omit_if_default=True),
deployment=override(omit_if_default=True),
cluster=override(omit_if_default=True),
url=override(omit_if_default=True),
),
)


def _convert_bento_dependency_info(
data: str | dict[str, t.Any], typ: type[BentoDependencyInfo]
) -> BentoDependencyInfo:
if isinstance(data, str):
return BentoDependencyInfo(service=data)
return BentoDependencyInfo(**data)


bentoml_cattr.register_structure_hook(
BentoDependencyInfo, _convert_bento_dependency_info
)

bentoml_cattr.register_structure_hook_func(
lambda cls: inspect.isclass(cls) and issubclass(cls, BentoInfo),
make_dict_structure_fn(
Expand Down
4 changes: 3 additions & 1 deletion src/bentoml/_internal/bento/build_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,9 @@ class BentoPathSpec:
recurse_exclude_spec: list[tuple[str, PathSpec]] = attr.field(init=False)
# we want to ignore .git and venv folders in cases they are very large.
extra: PathSpec = attr.field(
default=PathSpec.from_lines("gitwildmatch", [".git/", ".venv/", "venv/"]),
default=PathSpec.from_lines(
"gitwildmatch", [".git/", ".venv/", "venv/", "__pycache__/"]
),
init=False,
)

Expand Down
5 changes: 4 additions & 1 deletion src/bentoml/_internal/configuration/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,10 @@ def enabled_features() -> list[str]:
def new_index(self) -> bool:
return "new_index" in self.enabled_features.get()

cloud_context = providers.Static[t.Optional[str]](None)
@providers.SingletonFactory
@staticmethod
def cloud_context() -> str | None:
return os.getenv("BENTOML_CLOUD_CONTEXT")

@providers.SingletonFactory
@staticmethod
Expand Down
Loading

0 comments on commit 8991f7f

Please sign in to comment.