# Copyright (c) 2016 Uber Technologies, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from opentracing_instrumentation.interceptors import ClientInterceptors
from tornado import gen
from tchannel.tracing import (
ClientTracer, TChannelOpenTracingClientInterceptor)
from ..event import EventType
from . import THRIFT
[docs]class ThriftArgScheme(object):
"""Handler registration and serialization for Thrift.
Use :py:func:`tchannel.thrift.load` to parse your Thrift IDL and compile
it into a module dynamically.
.. code:: python
from tchannel import thrift
keyvalue = thrift.load('keyvalue.thrift', service='keyvalue')
To register a Thrift handler, use the ``register()`` decorator, providing
a reference to the compiled service as an argument. The name of the
service method should match the name of the decorated function.
.. code:: python
tchannel = TChannel(...)
@tchannel.thrift.register(keyvalue.KeyValue)
def setValue(request):
data[request.body.key] = request.body.value
Use methods on the compiled service to generate requests to remote
services and execute them via ``TChannel.thrift()``.
.. code:: python
response = yield tchannel.thrift(
keyvalue.KeyValue.setValue(key='foo', value='bar')
)
"""
NAME = THRIFT
def __init__(self, tchannel):
self._tchannel = tchannel
self.tracer = ClientTracer(channel=tchannel)
@gen.coroutine
[docs] def __call__(
self,
request,
headers=None,
timeout=None,
retry_on=None,
retry_limit=None,
shard_key=None,
trace=None,
hostport=None,
routing_delegate=None,
caller_name=None,
):
"""Make a Thrift TChannel request.
Returns a ``Response`` containing the return value of the Thrift
call (if any). If the remote server responded with a Thrift exception,
that exception is raised.
:param string request:
Request obtained by calling a method on service objects generated
by :py:func:`tchannel.thrift.load`.
:param dict headers:
Dictionary of header key-value pairs.
:param float timeout:
How long to wait (in seconds) before raising a ``TimeoutError`` -
this defaults to ``tchannel.glossary.DEFAULT_TIMEOUT``.
:param string retry_on:
What events to retry on - valid values can be found in
``tchannel.retry``.
:param int retry_limit:
How many attempts should be made (in addition to the initial
attempt) to re-send this request when retryable error conditions
(specified by ``retry_on``) are encountered.
Defaults to ``tchannel.retry.DEFAULT_RETRY_LIMIT`` (4).
Note that the maximum possible time elapsed for a request is thus
``(retry_limit + 1) * timeout``.
:param string shard_key:
Set the ``sk`` transport header for Ringpop request routing.
:param int trace:
Flags for tracing.
:param string hostport:
A 'host:port' value to use when making a request directly to a
TChannel service, bypassing Hyperbahn. This value takes precedence
over the ``hostport`` specified to
:py:func:`tchannel.thrift.load`.
:param routing_delegate:
Name of a service to which the request router should forward the
request instead of the service specified in the call req.
:param caller_name:
Name of the service making the request. Defaults to the name
provided when the TChannel was instantiated.
:rtype: Response
"""
if not headers:
headers = {}
span, headers = self.tracer.start_span(
service=request.service, endpoint=request.endpoint,
headers=headers, hostport=hostport, encoding='thrift'
)
yield self._tchannel._dep_tchannel.event_emitter.fire(
EventType.before_serialize_request_headers,
headers,
request.service,
)
# fire interceptors
for interceptor in ClientInterceptors.get_interceptors():
if isinstance(interceptor, TChannelOpenTracingClientInterceptor):
interceptor.process(span=span, headers=headers,
service=request.service, encoding='thrift')
serializer = request.get_serializer()
# serialize
try:
headers = serializer.serialize_header(headers=headers)
except (AttributeError, TypeError):
raise ValueError(
'headers must be a map[string]string (a shallow dict'
' where keys and values are strings)'
)
body = serializer.serialize_body(request.call_args)
# TODO There's only one yield. Drop in favor of future+callback.
response = yield self._tchannel.call(
scheme=self.NAME,
service=request.service,
arg1=request.endpoint,
arg2=headers,
arg3=body,
timeout=timeout,
retry_on=retry_on,
retry_limit=retry_limit,
hostport=hostport or request.hostport,
shard_key=shard_key,
trace=trace,
tracing_span=span, # span is finished in PeerClientOperation.send
routing_delegate=routing_delegate,
caller_name=caller_name,
)
response.headers = serializer.deserialize_header(
headers=response.headers
)
body = serializer.deserialize_body(body=response.body)
response.body = request.read_body(body)
raise gen.Return(response)
def register(self, thrift_module, **kwargs):
# dat circular import
from tchannel.thrift import rw as thriftrw
if isinstance(thrift_module, thriftrw.Service):
# Dirty hack to support thriftrw and old API
return thriftrw.register(
# TODO drop deprecated tchannel
self._tchannel._dep_tchannel._handler,
thrift_module,
**kwargs
)
else:
return self._tchannel.register(
scheme=self.NAME,
endpoint=thrift_module,
**kwargs
)