基与python的GRPC SSL/TLS加密及Token鉴权

SSL/TLS加密

1 // 创建grpc_ssl_key.pem和grpc_ssl_cert.pem
2 // 其中务必事先指定, 后续需要用到
3 openssl req -subj "/CN=black-ip.yazx.com" -x509 -newkey rsa:4096 -days 7200 \
4     -keyout grpc-ssl-key.pem \
5     -out grpc-ssl-cert.pem
6 // 创建grpc_ssl_server.pem
7 openssl rsa -in grpc-ssl-key.pem -out grpc-ssl-server.pem
server.py

from grpc.experimental import aio
from typing import AnyStr
from dist import hello_world_pb2
from dist import hello_world_pb2_grpc

实现具体的grpc函数
class TestServicer(hello_world_pb2_grpc.TestServicer):

    async def Hello(self, request, context) -> hello_world_pb2.Result:
        return hello_world_pb2.Result(reply=f"hello {request.name}")

读取密钥二进制数据
def read_file(path: str, binary: bool) -> AnyStr:
    with open(path, 'rb' if binary else 'r') as f:
        return f.read()

创建grpc服务端证书
def create_ssl_server_credentials() -> aio.grpc.ChannelCredentials:
    return aio.grpc.ssl_server_credentials(
        private_key_certificate_chain_pairs=(
            (
                read_file('grpc_ssl_server.pem', True),
                read_file('grpc_ssl_cert.pem', True),
            ),
        )
    )

运行grpc服务
async def run(host: str, port: int) -> None:
    server = aio.server()
    server_credetials = create_ssl_server_credentials()
    server.add_secure_port(f'[::]:{port}', server_credetials)
    hello_world_pb2_grpc.add_TestServicer_to_server(TestServicer(), server)
    await server.start()
    await server.wait_for_termination()

客户端加载密钥

client.py

from grpc.experimental import aio
from typing import AnyStr
from dist import hello_world_pb2
from dist import hello_world_pb2_grpc

创建grpc客户端证书
def create_ssl_channel_credentials() -> aio.grpc.ChannelCredentials:
    return aio.grpc.ssl_channel_credentials(
        root_certificates=read_file('grpc_ssl_cert.pem')
    )

运行grpc客户端
async def run(host: str, port: int) -> None:
    options = (('grpc.ssl_target_name_override', ''),)
    kwargs = {
        'target': f"{host}:{port}",
        'options': (
            ("grpc.lb_policy_name", "round_robin"),  # 自动根据dns域名解析服务列表
            *options
        )
    }
    channel_credentials = create_ssl_channel_credentials()
    creds = aio.grpc.composite_channel_credentials(channel_credentials)
    channel = aio.secure_channel(**kwargs, credentials=creds)
    await channel.channel_ready()
    stub = hello_world_pb2_grpc.TestStub(channel)
    result = await stub.Hello(hello_world_pb2.User(name='world'))

Token鉴权

grpc强制Token鉴权必须使用SSL/TLS加密

实现Token校验

token.py

from typing import Callable, List, Any
from grpc.experimental import aio

class BearerToken(object):
    code: aio.grpc.StatusCode = aio.grpc.StatusCode.UNAUTHENTICATED
    details: str = 'bad bearer token'

    def __init__(self, token: str) -> None:
        self.token = token

    def __call__(self, func) -> Callable:
        async def wrapper(inner_self, request, context: aio.ServicerContext) -> Any:
            metadata = context.invocation_metadata()
            for item in metadata:
                if item[0] == 'authorization' and item[1] == f'Bearer {self.token}':
                    return await func(inner_self, request, context)
            await context.abort(
                code=self.code,
                details=self.details,
                trailing_metadata=metadata
            )

        return wrapper

服务端校验Token

server.py

from grpc.experimental import aio
from typing import AnyStr
from dist import hello_world_pb2
from dist import hello_world_pb2_grpc
from token import BearerToken

实现具体的grpc函数
class TestServicer(hello_world_pb2_grpc.TestServicer):

    @BearerToken(token='xxx')
    async def Hello(self, request, context) -> hello_world_pb2.Result:
        return hello_world_pb2.Result(reply=f"hello {request.name}")

客户端注入Token

client.py

from grpc.experimental import aio

token = 'xxx'

创建Token认证
def create_access_token_credentials(cls) -> aio.grpc.CallCredentials:
    return aio.grpc.access_token_call_credentials(token)

运行grpc客户端
async def run(host: str, port: int) -> None:
    options = (('grpc.ssl_target_name_override', ''),)
    kwargs = {
        'target': f"{host}:{port}",
        'options': (
            ("grpc.lb_policy_name", "round_robin"),  # 自动根据dns域名解析服务列表
            *options
        )
    }
    channel_credentials = create_ssl_channel_credentials()
    token_credentials = self.create_access_token_credentials()
    creds = aio.grpc.composite_channel_credentials(
        channel_credentials,
        token_credentials
    )
    channel = aio.secure_channel(**kwargs, credentials=creds)
    await channel.channel_ready()
    stub = hello_world_pb2_grpc.TestStub(channel)
    result = await stub.Hello(hello_world_pb2.User(name='world'))

Original: https://www.cnblogs.com/pungchur/p/16186686.html
Author: Nolinked
Title: 基与python的GRPC SSL/TLS加密及Token鉴权

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/499746/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球