Files
alrex fbb677a01d use f-strings instead of format (#693)
Co-authored-by: Diego Hurtado <ocelotl@users.noreply.github.com>
2021-09-28 19:12:47 +00:00

92 lines
2.8 KiB
Python

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent import futures
import grpc
from .protobuf import test_server_pb2, test_server_pb2_grpc
# Identifier that TestServer places in the ``server_id`` field of every
# non-error Response it builds.
SERVER_ID = 1
class TestServer(test_server_pb2_grpc.GRPCTestServerServicer):
    """gRPC servicer used by tests, covering all four RPC shapes.

    Every handler treats a request whose ``request_data`` equals ``"error"``
    as the signal to fail the RPC with ``INVALID_ARGUMENT``. The two
    handlers that receive a request stream inspect only the first message
    of that stream. Any other input produces ``Response`` messages carrying
    ``SERVER_ID`` and the payload ``"data"``.
    """

    # pylint: disable=invalid-name
    # pylint: disable=no-self-use

    @staticmethod
    def _data_response():
        """Build the canned successful response shared by all handlers."""
        return test_server_pb2.Response(
            server_id=SERVER_ID, response_data="data"
        )

    def SimpleMethod(self, request, context):
        """Handle a unary-unary call.

        For an ``"error"`` request, set ``INVALID_ARGUMENT`` on ``context``
        and return an empty ``Response``; otherwise return the canned
        successful response.
        """
        if request.request_data == "error":
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            return test_server_pb2.Response()
        return self._data_response()

    def ClientStreamingMethod(self, request_iterator, context):
        """Handle a stream-unary call.

        The whole request stream is consumed before answering. If the first
        request is ``"error"``, set ``INVALID_ARGUMENT`` on ``context`` and
        return an empty ``Response``; otherwise return the canned successful
        response.
        """
        data = list(request_iterator)
        # An empty stream has no first message to inspect; the truthiness
        # guard keeps it on the success path instead of raising IndexError.
        if data and data[0].request_data == "error":
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            return test_server_pb2.Response()
        return self._data_response()

    def ServerStreamingMethod(self, request, context):
        """Handle a unary-stream call.

        For an ``"error"`` request, abort the RPC with ``INVALID_ARGUMENT``
        and the details ``"server stream error"``; otherwise return a lazy
        iterator of five canned successful responses.
        """
        if request.request_data == "error":
            # abort() always raises to terminate the RPC, so nothing placed
            # after this call inside the branch would ever execute.
            context.abort(
                code=grpc.StatusCode.INVALID_ARGUMENT,
                details="server stream error",
            )
        # Responses are produced lazily, one per iteration step.
        return (self._data_response() for _ in range(5))

    def BidirectionalStreamingMethod(self, request_iterator, context):
        """Handle a stream-stream call.

        The whole request stream is consumed before any response is
        produced. If the first request is ``"error"``, abort the RPC with
        ``INVALID_ARGUMENT`` and the details ``"bidirectional error"``;
        otherwise yield five canned successful responses.
        """
        data = list(request_iterator)
        # Guard against an empty stream: without it, data[0] would raise
        # IndexError before any response could be sent.
        if data and data[0].request_data == "error":
            # abort() always raises, which ends this generator immediately.
            context.abort(
                code=grpc.StatusCode.INVALID_ARGUMENT,
                details="bidirectional error",
            )
        for _ in range(5):
            yield self._data_response()
def create_test_server(port):
    """Build a gRPC server hosting ``TestServer`` on ``localhost:<port>``.

    The server uses a single-worker thread pool and an insecure port. It is
    returned without being started; starting and stopping it is left to the
    caller.
    """
    # pylint: disable=consider-using-with
    worker_pool = futures.ThreadPoolExecutor(max_workers=1)
    grpc_server = grpc.server(worker_pool)

    # Register the test servicer before binding the listening address.
    servicer = TestServer()
    test_server_pb2_grpc.add_GRPCTestServerServicer_to_server(
        servicer, grpc_server
    )

    address = f"localhost:{port}"
    grpc_server.add_insecure_port(address)
    return grpc_server