[Ray[serve] gRPC] Unable to pickle protobuf objects #45351

Open
mgierada opened this issue May 15, 2024 · 8 comments · May be fixed by #45862
Labels
bug Something that is supposed to be working; but isn't P2 Important issue, but not time-critical serve Ray Serve Related Issue

Comments

@mgierada

mgierada commented May 15, 2024

What happened + What you expected to happen

I followed the documentation, using the proto file exactly as given, but I keep getting the following error when running the app.

    Serializing 'Multiplexing' <function GrpcDeployment.Multiplexing at 0x111c63240>...
    !!! FAIL serialization: cannot pickle 'google._upb._message.Descriptor' object
    Detected 2 global variables. Checking serializability...
        Serializing 'serve' <module 'ray.serve' from '/Users/.../...app-v2/.venv/lib/python3.11/site-packages/ray/serve/__init__.py'>...
        Serializing 'UserDefinedResponse2' <class 'src.app.protos.user_defined_protos_pb2.UserDefinedResponse2'>...
        !!! FAIL serialization: cannot pickle 'google._upb._message.Descriptor' object
            Serializing 'RegisterExtension' <function Message.RegisterExtension at 0x101aedda0>...
            Serializing '_SetListener' <function Message._SetListener at 0x101aede40>...
            Serializing '__getstate__' <function Message.__getstate__ at 0x101aedee0>...
            Serializing '__reduce__' <function Message.__reduce__ at 0x101aee020>...
            Serializing '__setstate__' <function Message.__setstate__ at 0x101aedf80>...
            Serializing '__unicode__' <function Message.__unicode__ at 0x101aed120>...
            Serializing 'ByteSize' <method 'ByteSize' of 'google._upb._message.Message' objects>...
            Serializing 'Clear' <method 'Clear' of 'google._upb._message.Message' objects>...
            Serializing 'ClearExtension' <method 'ClearExtension' of 'google._upb._message.Message' objects>...
            Serializing 'ClearField' <method 'ClearField' of 'google._upb._message.Message' objects>...
            Serializing 'CopyFrom' <method 'CopyFrom' of 'google._upb._message.Message' objects>...
            Serializing 'DESCRIPTOR' <google._upb._message.Descriptor object at 0x111c7ae40>...
            !!! FAIL serialization: cannot pickle 'google._upb._message.Descriptor' object
================================================================================
Variable:

        FailTuple(DESCRIPTOR [obj=<google._upb._message.Descriptor object at 0x111c7ae40>, parent=<class 'src.app.protos.user_defined_protos_pb2.UserDefinedResponse2'>])

was found to be non-serializable. There may be multiple other undetected variables that were non-serializable.
Consider either removing the instantiation/imports of these variables or moving the instantiation into the scope of the function/class.
================================================================================
Check https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting for more information.
If you have any suggestions on how to improve this error message, please reach out to the Ray developers on github.com/ray-project/ray/issues/
================================================================================
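The log shows the serializer descending into the generated message class itself instead of recording it as an import. As far as I understand cloudpickle (which Ray uses for functions and classes), a class is pickled by reference only if `sys.modules[cls.__module__]` exists and walking `cls.__qualname__` through that module returns the very same object; otherwise it is pickled by value, which is how the unpicklable `Descriptor` would get reached. The stdlib-only sketch below approximates that check. `pickles_by_reference` is a hypothetical helper, not a Ray or cloudpickle API, and the demo module names are made up to mimic a class whose `__module__` disagrees with the path it was imported under.

```python
import sys
import types


def pickles_by_reference(obj) -> bool:
    """Return True if `obj` can be found again via its __module__ and __qualname__.

    Approximates (as an assumption, not cloudpickle's own code) the lookup a
    pickler performs before deciding to store a class by reference instead of
    serializing it by value.
    """
    module_name = getattr(obj, "__module__", None)
    qualname = getattr(obj, "__qualname__", None)
    if module_name is None or qualname is None or module_name == "__main__":
        return False
    module = sys.modules.get(module_name)
    if module is None:
        # The module name recorded on the class matches no imported module.
        return False
    target = module
    for part in qualname.split("."):
        target = getattr(target, part, None)
        if target is None:
            return False
    return target is obj


# Hypothetical demo: the class is reachable as protos.user_defined_protos_pb2.Response,
# but its __module__ claims a different package path.
fake_module = types.ModuleType("protos.user_defined_protos_pb2")
Response = type("Response", (), {"__module__": "src.app.protos.user_defined_protos_pb2"})
fake_module.Response = Response
sys.modules["protos.user_defined_protos_pb2"] = fake_module

print(pickles_by_reference(Response))  # False: "src.app.protos..." is not in sys.modules
Response.__module__ = "protos.user_defined_protos_pb2"
print(pickles_by_reference(Response))  # True once the names agree
```

Running it in the driver on `UserDefinedResponse2`, imported the same way the deployment imports it, should indicate whether this lookup is the step that fails.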


A similar issue was posted by someone else here: https://discuss.ray.io/t/keep-getting-error-typeerror-cannot-pickle-classmethod-descriptor-object/10153

Versions / Dependencies

grpcio-tools==1.60.0
grpcio==1.60.0
protobuf==4.25.1
ray[serve]==2.22.0

but I tried many versions and combinations, including the latest grpcio-tools, grpcio, and protobuf as well as very old ones. The issue is still the same.

I am running Python 3.11.9 on Apple Silicon.
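Because the rest of the thread keeps comparing environments (protobuf, Python, and pickle versions), a small stdlib-only sketch that prints them in one go may save some round trips. The package list just mirrors the pins above; reading "pickle version" as `pickle.format_version` is an assumption.

```python
import importlib.metadata
import pickle
import platform

# Distribution names mirror the "Versions / Dependencies" pins above.
PACKAGES = ("grpcio-tools", "grpcio", "protobuf", "ray")


def environment_report() -> dict:
    """Collect interpreter, platform, pickle, and package versions."""
    report = {
        "python": platform.python_version(),
        "machine": platform.machine(),  # e.g. "arm64" on Apple Silicon
        "pickle_format_version": pickle.format_version,
        "pickle_highest_protocol": pickle.HIGHEST_PROTOCOL,
    }
    for name in PACKAGES:
        try:
            report[name] = importlib.metadata.version(name)
        except importlib.metadata.PackageNotFoundError:
            report[name] = None  # not installed in this interpreter
    return report


for key, value in environment_report().items():
    print(f"{key}: {value}")
```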

Reproduction script

Please follow the official documentation https://docs.ray.io/en/latest/serve/advanced-guides/grpc-guide.html

Issue Severity

High: It blocks me from completing my task.

@mgierada mgierada added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) labels May 15, 2024
@anyscalesam anyscalesam added the serve Ray Serve Related Issue label May 20, 2024
@shrekris-anyscale shrekris-anyscale added P2 Important issue, but not time-critical and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels May 28, 2024
@GeneDer GeneDer self-assigned this May 28, 2024
@GeneDer
Contributor

GeneDer commented May 28, 2024

@mgierada I just tried it again on Ray master and it works without issues. The only difference in the dependencies is that I'm on protobuf==4.21.12, so I'm not sure whether that's the cause.

Otherwise, you can try the files generated with my versions to see if that solves the issue:

#### user_defined_protos_pb2.py ####

# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler.  DO NOT EDIT!
# source: user_defined_protos.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()




DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x19user_defined_protos.proto\x12\x11userdefinedprotos\"?\n\x12UserDefinedMessage\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0e\n\x06origin\x18\x02 \x01(\t\x12\x0b\n\x03num\x18\x03 \x01(\x03\"4\n\x13UserDefinedResponse\x12\x10\n\x08greeting\x18\x01 \x01(\t\x12\x0b\n\x03num\x18\x02 \x01(\x03\"\x15\n\x13UserDefinedMessage2\"(\n\x14UserDefinedResponse2\x12\x10\n\x08greeting\x18\x01 \x01(\t\"*\n\tImageData\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\x10\n\x08\x66ilename\x18\x02 \x01(\t\"4\n\nImageClass\x12\x0f\n\x07\x63lasses\x18\x01 \x03(\t\x12\x15\n\rprobabilities\x18\x02 \x03(\x02\x32\xae\x02\n\x12UserDefinedService\x12Y\n\x08__call__\x12%.userdefinedprotos.UserDefinedMessage\x1a&.userdefinedprotos.UserDefinedResponse\x12_\n\x0cMultiplexing\x12&.userdefinedprotos.UserDefinedMessage2\x1a\'.userdefinedprotos.UserDefinedResponse2\x12\\\n\tStreaming\x12%.userdefinedprotos.UserDefinedMessage\x1a&.userdefinedprotos.UserDefinedResponse0\x01\x32\x64\n\x1aImageClassificationService\x12\x46\n\x07Predict\x12\x1c.userdefinedprotos.ImageData\x1a\x1d.userdefinedprotos.ImageClassB:\n#io.ray.examples.user_defined_protosB\x11UserDefinedProtosP\x01\x62\x06proto3')

_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'user_defined_protos_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:

  DESCRIPTOR._options = None
  DESCRIPTOR._serialized_options = b'\n#io.ray.examples.user_defined_protosB\021UserDefinedProtosP\001'
  _USERDEFINEDMESSAGE._serialized_start=48
  _USERDEFINEDMESSAGE._serialized_end=111
  _USERDEFINEDRESPONSE._serialized_start=113
  _USERDEFINEDRESPONSE._serialized_end=165
  _USERDEFINEDMESSAGE2._serialized_start=167
  _USERDEFINEDMESSAGE2._serialized_end=188
  _USERDEFINEDRESPONSE2._serialized_start=190
  _USERDEFINEDRESPONSE2._serialized_end=230
  _IMAGEDATA._serialized_start=232
  _IMAGEDATA._serialized_end=274
  _IMAGECLASS._serialized_start=276
  _IMAGECLASS._serialized_end=328
  _USERDEFINEDSERVICE._serialized_start=331
  _USERDEFINEDSERVICE._serialized_end=633
  _IMAGECLASSIFICATIONSERVICE._serialized_start=635
  _IMAGECLASSIFICATIONSERVICE._serialized_end=735
# @@protoc_insertion_point(module_scope)

#### user_defined_protos_pb2_grpc.py ####
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

import user_defined_protos_pb2 as user__defined__protos__pb2


class UserDefinedServiceStub(object):
    """Missing associated documentation comment in .proto file."""

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A grpc.Channel.
        """
        self.__call__ = channel.unary_unary(
                '/userdefinedprotos.UserDefinedService/__call__',
                request_serializer=user__defined__protos__pb2.UserDefinedMessage.SerializeToString,
                response_deserializer=user__defined__protos__pb2.UserDefinedResponse.FromString,
                )
        self.Multiplexing = channel.unary_unary(
                '/userdefinedprotos.UserDefinedService/Multiplexing',
                request_serializer=user__defined__protos__pb2.UserDefinedMessage2.SerializeToString,
                response_deserializer=user__defined__protos__pb2.UserDefinedResponse2.FromString,
                )
        self.Streaming = channel.unary_stream(
                '/userdefinedprotos.UserDefinedService/Streaming',
                request_serializer=user__defined__protos__pb2.UserDefinedMessage.SerializeToString,
                response_deserializer=user__defined__protos__pb2.UserDefinedResponse.FromString,
                )


class UserDefinedServiceServicer(object):
    """Missing associated documentation comment in .proto file."""

    def __call__(self, request, context):
        """Missing associated documentation comment in .proto file."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')

    def Multiplexing(self, request, context):
        """Missing associated documentation comment in .proto file."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')

    def Streaming(self, request, context):
        """Missing associated documentation comment in .proto file."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')


def add_UserDefinedServiceServicer_to_server(servicer, server):
    rpc_method_handlers = {
            '__call__': grpc.unary_unary_rpc_method_handler(
                    servicer.__call__,
                    request_deserializer=user__defined__protos__pb2.UserDefinedMessage.FromString,
                    response_serializer=user__defined__protos__pb2.UserDefinedResponse.SerializeToString,
            ),
            'Multiplexing': grpc.unary_unary_rpc_method_handler(
                    servicer.Multiplexing,
                    request_deserializer=user__defined__protos__pb2.UserDefinedMessage2.FromString,
                    response_serializer=user__defined__protos__pb2.UserDefinedResponse2.SerializeToString,
            ),
            'Streaming': grpc.unary_stream_rpc_method_handler(
                    servicer.Streaming,
                    request_deserializer=user__defined__protos__pb2.UserDefinedMessage.FromString,
                    response_serializer=user__defined__protos__pb2.UserDefinedResponse.SerializeToString,
            ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
            'userdefinedprotos.UserDefinedService', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))


 # This class is part of an EXPERIMENTAL API.
class UserDefinedService(object):
    """Missing associated documentation comment in .proto file."""

    @staticmethod
    def __call__(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        return grpc.experimental.unary_unary(request, target, '/userdefinedprotos.UserDefinedService/__call__',
            user__defined__protos__pb2.UserDefinedMessage.SerializeToString,
            user__defined__protos__pb2.UserDefinedResponse.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

    @staticmethod
    def Multiplexing(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        return grpc.experimental.unary_unary(request, target, '/userdefinedprotos.UserDefinedService/Multiplexing',
            user__defined__protos__pb2.UserDefinedMessage2.SerializeToString,
            user__defined__protos__pb2.UserDefinedResponse2.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

    @staticmethod
    def Streaming(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        return grpc.experimental.unary_stream(request, target, '/userdefinedprotos.UserDefinedService/Streaming',
            user__defined__protos__pb2.UserDefinedMessage.SerializeToString,
            user__defined__protos__pb2.UserDefinedResponse.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)


class ImageClassificationServiceStub(object):
    """Missing associated documentation comment in .proto file."""

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A grpc.Channel.
        """
        self.Predict = channel.unary_unary(
                '/userdefinedprotos.ImageClassificationService/Predict',
                request_serializer=user__defined__protos__pb2.ImageData.SerializeToString,
                response_deserializer=user__defined__protos__pb2.ImageClass.FromString,
                )


class ImageClassificationServiceServicer(object):
    """Missing associated documentation comment in .proto file."""

    def Predict(self, request, context):
        """Missing associated documentation comment in .proto file."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')


def add_ImageClassificationServiceServicer_to_server(servicer, server):
    rpc_method_handlers = {
            'Predict': grpc.unary_unary_rpc_method_handler(
                    servicer.Predict,
                    request_deserializer=user__defined__protos__pb2.ImageData.FromString,
                    response_serializer=user__defined__protos__pb2.ImageClass.SerializeToString,
            ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
            'userdefinedprotos.ImageClassificationService', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))


 # This class is part of an EXPERIMENTAL API.
class ImageClassificationService(object):
    """Missing associated documentation comment in .proto file."""

    @staticmethod
    def Predict(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        return grpc.experimental.unary_unary(request, target, '/userdefinedprotos.ImageClassificationService/Predict',
            user__defined__protos__pb2.ImageData.SerializeToString,
            user__defined__protos__pb2.ImageClass.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
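In the generated `user_defined_protos_pb2.py` above, the module name is passed as a string literal: `_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'user_defined_protos_pb2', globals())`. My understanding, not confirmed in this thread, is that the protobuf builder uses that string as each message class's `__module__`, which would explain why the error log prints `src.app.protos.user_defined_protos_pb2.UserDefinedResponse2`. If the string disagrees with the dotted path the file is actually imported under, the class cannot be found by reference. A stdlib-only sketch to compare the two; `baked_module_name` and `check_pb2_file` are hypothetical helpers, and the regex assumes the generated-code layout shown above (other protoc versions may emit different code).

```python
import re
from pathlib import Path
from typing import Optional

# Matches the builder call emitted in a generated *_pb2.py file, e.g.
# _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'user_defined_protos_pb2', globals())
_BUILD_CALL = re.compile(
    r"BuildTopDescriptorsAndMessages\(\s*DESCRIPTOR\s*,\s*[\"']([\w.]+)[\"']"
)


def baked_module_name(pb2_source: str) -> Optional[str]:
    """Return the module-name string a generated _pb2 file passes to the builder."""
    match = _BUILD_CALL.search(pb2_source)
    return match.group(1) if match else None


def check_pb2_file(path: str, imported_as: str) -> bool:
    """Compare the baked-in module name with the dotted name you import it as."""
    baked = baked_module_name(Path(path).read_text(encoding="utf-8"))
    if baked != imported_as:
        print(f"{path}: generated as {baked!r} but imported as {imported_as!r}")
        return False
    return True
```

For the layout in this issue, a call like `check_pb2_file("src/protos/user_defined_protos_pb2.py", "protos.user_defined_protos_pb2")` would flag a mismatch if protoc was run from the project root.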

@mgierada
Author

mgierada commented May 31, 2024

Hey @GeneDer, thanks for taking a look. This is still not working for me. I made sure I'm running the same protobuf version as you, and I copy-pasted the compiled code you shared, but I'm still facing the serialization error. Is there anything else you do that I might be missing, such as additional config or setup? Apparently Ray is not able to serialize the protobuf objects.

@GeneDer
Contributor

GeneDer commented May 31, 2024

Oh, actually there's another difference: I'm running Python 3.10.12, so maybe try that. Also, my pickle version is 4.0; not sure if that makes a difference.

@mgierada
Author

mgierada commented Jun 4, 2024

@GeneDer, I tried Python 3.10.12 with pickle 4.0, but I still got the same error.

Here's what I'm running; maybe I missed something obvious.

  • I copy-pasted the protos defined in the docs to ./src/protos/user_defined_protos.proto:
// user_defined_protos.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.ray.examples.user_defined_protos";
option java_outer_classname = "UserDefinedProtos";

package userdefinedprotos;

message UserDefinedMessage {
  string name = 1;
  string origin = 2;
  int64 num = 3;
}

message UserDefinedResponse {
  string greeting = 1;
  int64 num = 2;
}

message UserDefinedMessage2 {}

message UserDefinedResponse2 {
  string greeting = 1;
}

message ImageData {
  string url = 1;
  string filename = 2;
}

message ImageClass {
  repeated string classes = 1;
  repeated float probabilities = 2;
}

service UserDefinedService {
  rpc __call__(UserDefinedMessage) returns (UserDefinedResponse);
  rpc Multiplexing(UserDefinedMessage2) returns (UserDefinedResponse2);
  rpc Streaming(UserDefinedMessage) returns (stream UserDefinedResponse);
}

service ImageClassificationService {
  rpc Predict(ImageData) returns (ImageClass);
}
  • Compiling the protos:
python -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. ./src/protos/user_defined_protos.proto
  • Starting the Ray cluster with gRPC enabled:
	ray start --head
	serve start \
  	  --grpc-port 9000 \
  	  --grpc-servicer-functions protos.user_defined_protos_pb2_grpc.add_UserDefinedServiceServicer_to_server

That starts the Ray cluster correctly.

  • Creating a simple deployment script (again, copy-pasted from the docs) in ./src/simple_deploy.py:
import time

from typing import Generator
from protos.user_defined_protos_pb2 import (
    UserDefinedMessage,
    UserDefinedMessage2,
    UserDefinedResponse,
    UserDefinedResponse2,
)

import ray
from ray import serve


@serve.deployment
class GrpcDeployment:
    def __call__(self, user_message: UserDefinedMessage) -> UserDefinedResponse:
        greeting = f"Hello {user_message.name} from {user_message.origin}"
        num = user_message.num * 2
        user_response = UserDefinedResponse(
            greeting=greeting,
            num=num,
        )
        return user_response

    @serve.multiplexed(max_num_models_per_replica=1)
    async def get_model(self, model_id: str) -> str:
        return f"loading model: {model_id}"

    async def Multiplexing(self, user_message: UserDefinedMessage2) -> UserDefinedResponse2:
        model_id = serve.get_multiplexed_model_id()
        model = await self.get_model(model_id)
        user_response = UserDefinedResponse2(
            greeting=f"Method2 called model, {model}",
        )
        return user_response

    def Streaming(
        self, user_message: UserDefinedMessage
    ) -> Generator[UserDefinedResponse, None, None]:
        for i in range(10):
            greeting = f"{i}: Hello {user_message.name} from {user_message.origin}"
            num = user_message.num * 2 + i
            user_response = UserDefinedResponse(
                greeting=greeting,
                num=num,
            )
            yield user_response

            time.sleep(0.1)


g = GrpcDeployment.bind()
app1 = "app1"
serve.run(target=g, name=app1, route_prefix=f"/{app1}")
  • Running python ./src/simple_deploy.py, which fails with the error from the issue description above.
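With the compile command in the steps above (`-I=.` plus `./src/protos/user_defined_protos.proto`), protoc derives the Python module name from the proto's path relative to the include directory, so the generated code would presumably identify itself as `src.protos.user_defined_protos_pb2`, while the `serve start` flag and `simple_deploy.py` import it as `protos.user_defined_protos_pb2`. Whether that mismatch is the root cause here is not confirmed in the thread. The sketch below reproduces what I understand to be protoc's naming rule (strip `.proto`, replace `-` with `_` and `/` with `.`, append `_pb2`) so the two names can be compared; `protoc_module_name` is a hypothetical helper.

```python
from pathlib import PurePosixPath


def protoc_module_name(proto_path: str, include_dir: str = ".") -> str:
    """Dotted module name protoc's Python generator would assign to `proto_path`.

    Assumes `proto_path` lives under `include_dir` (the -I argument), as in
    `python -m grpc_tools.protoc -I=. ... ./src/protos/user_defined_protos.proto`.
    """
    rel = PurePosixPath(proto_path)  # "./src/x.proto" normalizes to "src/x.proto"
    include = PurePosixPath(include_dir)
    if include.parts:  # "." has no parts, so nothing to strip
        rel = rel.relative_to(include)
    stem = rel.with_suffix("").as_posix()
    return stem.replace("-", "_").replace("/", ".") + "_pb2"


print(protoc_module_name("./src/protos/user_defined_protos.proto"))
print(protoc_module_name("src/protos/user_defined_protos.proto", include_dir="src"))
```

Under the same rule, compiling with `-I=src` would give `protos.user_defined_protos_pb2`, which matches the import path used by the script.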

@GeneDer
Contributor

GeneDer commented Jun 4, 2024

What if you don't put those in a subdirectory and instead generate them in the project root?
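Moving the generated files between a subdirectory and the project root changes which dotted names are importable. One quick check is to import the string passed to `--grpc-servicer-functions` yourself from the same working directory. `resolve_dotted_callable` is a hypothetical helper, and whether Serve resolves the string exactly this way is an assumption.

```python
import importlib
from typing import Callable


def resolve_dotted_callable(path: str) -> Callable:
    """Import 'package.module.attr' and return attr, raising if it is not callable."""
    module_name, _, attr = path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.attribute', got {path!r}")
    module = importlib.import_module(module_name)  # uses the current sys.path
    obj = getattr(module, attr)
    if not callable(obj):
        raise TypeError(f"{path} is not callable")
    return obj


# Stdlib demo; for this issue the argument would be
# "protos.user_defined_protos_pb2_grpc.add_UserDefinedServiceServicer_to_server".
print(resolve_dotted_callable("json.dumps").__name__)  # dumps
```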

@mgierada
Author

mgierada commented Jun 7, 2024

Nah, unfortunately that doesn't help.

@mgierada
Author

mgierada commented Jun 11, 2024

Oh, I finally solved the issue. The docs are very misleading on this. The cause was that my proto imports were at module scope; moving them into the methods where they are used made the difference. So instead of importing at the top of the file like this:

import time

from typing import Generator
from protos.user_defined_protos_pb2 import (
    UserDefinedMessage,
    UserDefinedMessage2,
    UserDefinedResponse,
    UserDefinedResponse2,
)

import ray
from ray import serve
# ... ray server logic

do this

# file name ./src/deploy.py
import time
from typing import Generator

from ray import serve


@serve.deployment
class GrpcDeployment:
    def __call__(self, user_message):
        from user_defined_protos_pb2 import UserDefinedMessage, UserDefinedResponse

        greeting = f"Hello {user_message.name} from {user_message.origin}"
        num = user_message.num * 2
        user_response = UserDefinedResponse(
            greeting=greeting,
            num=num,
        )
        return user_response

    @serve.multiplexed(max_num_models_per_replica=1)
    async def get_model(self, model_id: str) -> str:
        return f"loading model: {model_id}"

    async def Multiplexing(self, user_message):
        from user_defined_protos_pb2 import UserDefinedMessage2, UserDefinedResponse2

        model_id = serve.get_multiplexed_model_id()
        model = await self.get_model(model_id)
        user_response = UserDefinedResponse2(
            greeting=f"Method2 called model, {model}",
        )
        return user_response

    def Streaming(self, user_message) -> Generator:
        from user_defined_protos_pb2 import UserDefinedMessage, UserDefinedResponse

        for i in range(10):
            greeting = f"{i}: Hello {user_message.name} from {user_message.origin}"
            num = user_message.num * 2 + i
            user_response = UserDefinedResponse(
                greeting=greeting,
                num=num,
            )
            yield user_response
            time.sleep(0.1)


g = GrpcDeployment.bind()
app1 = "app1"
serve.run(target=g, name=app1, route_prefix=f"/{app1}")

Then, assuming your Ray cluster is running on localhost (or wherever), deploy by running the file:

python src/deploy.py
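Why the workaround helps, as far as I understand it: `GrpcDeployment` is defined in the script being run (`__main__`), so its methods are serialized by value together with the module-level names they read. A method that imports `UserDefinedResponse` inside its body never reads it as a global, so the message class is not captured and is only imported when the method runs on the replica. The stdlib sketch below approximates which globals a function drags along by scanning its bytecode for `LOAD_GLOBAL`; `loaded_globals` is a hypothetical helper, cloudpickle's real logic differs in detail, and `fractions.Fraction` merely stands in for a generated message class.

```python
import dis
import types
from fractions import Fraction  # stands in for a module-level protobuf import


def loaded_globals(func: types.FunctionType) -> set:
    """Names `func` reads via LOAD_GLOBAL that exist in its module globals.

    An approximation of the module-level objects a by-value function pickler
    would need to serialize along with `func`.
    """
    names = set()
    codes = [func.__code__]
    while codes:
        code = codes.pop()
        for instr in dis.get_instructions(code):
            if instr.opname == "LOAD_GLOBAL":
                names.add(instr.argval)
        # Also walk nested code objects (inner functions, comprehensions).
        codes.extend(c for c in code.co_consts if isinstance(c, types.CodeType))
    return {name for name in names if name in func.__globals__}


def respond_module_level(x):
    return Fraction(x, 2)


def respond_local_import(x):
    # Imported only when the function runs (on the replica, in the real deployment).
    from fractions import Fraction

    return Fraction(x, 2)


print(loaded_globals(respond_module_level))  # {'Fraction'}
print(loaded_globals(respond_local_import))  # set()
```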

@mgierada mgierada linked a pull request Jun 11, 2024 that will close this issue
@mgierada
Author

The following PR fixes the docs:
