Linx's Solution

.net-grpc-android.md

Android 连接 .Net 的gRPC服务

.Net开启ssl gRPC

// Expose the gRPC service over TLS: Kestrel listens on IPAddress.Any, port 23333, HTTP/2 only.
webBuilder.UseKestrel().ConfigureKestrel(kestrelOptions =>
{
    kestrelOptions.Listen(IPAddress.Any, 23333, endpoint =>
    {
        // Server certificate is read from 1.pfx, unlocked with its password.
        var serverCertificate = new X509Certificate2("1.pfx", "123456");
        endpoint.UseHttps(serverCertificate);
        endpoint.Protocols = HttpProtocols.Http2;
    });
});

Android 使用ssl

// Trust store built from the PKCS#12 file bundled in the app's assets.
// `use` closes the asset stream as soon as the key store has been loaded.
// NOTE(review): a .pfx usually also contains the server's private key; shipping only the public
// certificate (e.g. via setCertificateEntry("server", serverCertificate)) would avoid putting it
// in the APK — confirm what 1.pfx actually holds.
val caKeyStore: KeyStore = KeyStore.getInstance("PKCS12", "BC").apply {
    assets.open("1.pfx").use { pfxStream ->
        load(pfxStream, "123456".toCharArray())
    }
}

// Trust managers that accept only what is in the key store above.
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()).apply {
    init(caKeyStore)
}

// TLS context without client key material, using the custom trust managers.
val sslContext = SSLContext.getInstance("TLS").apply {
    init(null, trustManagerFactory.trustManagers, null)
}

managedChannel =
    OkHttpChannelBuilder.forAddress("192.168.3.233", 23333)
        // Accepts every hostname, i.e. hostname verification is switched off.
        // NOTE(review): presumably needed because the server is addressed by IP — do not keep
        // this in production builds.
        .hostnameVerifier { _, _ -> true }
        .sslSocketFactory(sslContext.socketFactory)
        .keepAliveTime(10, TimeUnit.SECONDS)
        .useTransportSecurity()
        .keepAliveWithoutCalls(true)
        .build()

安卓项目配置

// Versions shared by the build scripts.
ext.kotlin_version = '1.5.30'
ext.grpc_version = '1.40.0'


// Repository entries (presumably placed inside a repositories { } block).
maven { url 'https://maven.aliyun.com/repository/public/' }
maven { url "https://jitpack.io" }
// HTTPS instead of plain HTTP: Gradle 7+ refuses insecure repository URLs unless
// allowInsecureProtocol is set, and plain HTTP lets artifacts be tampered with in transit.
maven { url 'https://maven.aliyun.com/nexus/content/repositories/releases/' }
google()
mavenCentral()

// Protobuf Gradle plugin: classpath entry for the build script, then applied to the module.
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.17'
apply plugin: 'com.google.protobuf'

android {
    lintOptions {
        // GradleDependency is disabled so javax.annotation-api can stay on the same version as
        // other projects in this repo; a standalone project may simply upgrade the dependency.
        // UnusedResources is disabled because the Android linter does not correctly detect
        // resources used in Kotlin. See:
        //   - https://youtrack.jetbrains.com/issue/KT-7729
        //   - https://youtrack.jetbrains.com/issue/KT-12499
        disable 'GradleDependency', 'UnusedResources'
        textReport true
        textOutput 'stdout'
    }
    sourceSets {
        main {
            proto {
                // Directory that holds the .proto files
                srcDir 'src/main/java/com/example/caller/proto'
                include '**/*.proto'
            }
            java {
                srcDir 'src/main/java'
            }
        }
    }
}
protobuf {
    // Compiler used to generate the message classes.
    protoc { artifact = 'com.google.protobuf:protoc:3.17.3' }
    plugins {
        // Only plugins that generateProtoTasks actually references are declared here.
        // Lite message classes come from the built-in java generator with the 'lite' option,
        // so no separate javalite plugin artifact is needed.
        grpc { artifact = "io.grpc:protoc-gen-grpc-java:$grpc_version" // CURRENT_GRPC_VERSION
        }
    }
    generateProtoTasks {
        all().each { task ->
            // Messages: built-in java generator in lite mode.
            task.builtins {
                java { option 'lite' }
            }
            // Service stubs: grpc plugin, also in lite mode.
            task.plugins {
                grpc { option 'lite' }
            }
        }
    }
}
dependencies {
    // gRPC runtime for Android: OkHttp transport, lite protobuf marshalling and the stub API,
    // all pinned to grpc_version.
    implementation group: 'io.grpc', name: 'grpc-okhttp', version: grpc_version // CURRENT_GRPC_VERSION
    implementation group: 'io.grpc', name: 'grpc-protobuf-lite', version: grpc_version // CURRENT_GRPC_VERSION
    implementation group: 'io.grpc', name: 'grpc-stub', version: grpc_version // CURRENT_GRPC_VERSION
    // javax.annotation classes referenced by the generated code.
    implementation group: 'javax.annotation', name: 'javax.annotation-api', version: '1.3.2'
}

proto文件放置位置

proto 文件放在 srcDir 所指定的位置,即 src/main/java/com/example/caller/proto(与上面 build.gradle 中的配置一致)。我这里是放在当前包名下的 proto 目录下。

proto生成的Stub文件的包名

在proto文件中增加一行option java_package="com.example.caller";即可。com.example.caller 为包名.

生成

点击Android Studio顶部的build即可生成,即使有报错(依赖或代码错误),stub文件也是可以生成的,之后可以到generated目录下查看文件

AndroidChannelBuilder

Android上建议使用这个来作为channel,可以检测到网络连接的变化,代码如下

package com.example.caller.grpc;

import android.annotation.TargetApi;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;
import android.content.IntentFilter;
import android.net.ConnectivityManager;
import android.net.Network;
import android.os.Build;
import android.util.Log;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ConnectivityState;
import io.grpc.ExperimentalApi;
import io.grpc.ForwardingChannelBuilder;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.internal.GrpcUtil;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/**
 * Builds a {@link ManagedChannel} that, when provided with a {@link Context}, will automatically
 * monitor the Android device's network state to smoothly handle intermittent network failures.
 *
 * <p>Currently only compatible with gRPC's OkHttp transport, which must be available at runtime.
 *
 * <p>Requires the Android ACCESS_NETWORK_STATE permission.
 *
 * @since 1.12.0
 */
public final class AndroidChannelBuilder extends ForwardingChannelBuilder<AndroidChannelBuilder> {

    private static final String LOG_TAG = "AndroidChannelBuilder";

    /** The OkHttp channel builder class, or {@code null} when it is not on the classpath. */
    @Nullable
    private static final Class<?> OKHTTP_CHANNEL_BUILDER_CLASS = findOkHttp();

    /**
     * Looks up {@code io.grpc.okhttp.OkHttpChannelBuilder} by name.
     *
     * @return the class, or {@code null} if it cannot be found
     */
    private static Class<?> findOkHttp() {
        try {
            return Class.forName("io.grpc.okhttp.OkHttpChannelBuilder");
        } catch (ClassNotFoundException e) {
            return null;
        }
    }

    /** Builder that receives every configuration call and creates the underlying channel. */
    private final ManagedChannelBuilder<?> delegateBuilder;

    /** Context used for network monitoring; {@code null} means monitoring is not set up. */
    @Nullable private Context context;

    /**
     * Creates a new builder with the given target string that will be resolved by
     * {@link io.grpc.NameResolver}.
     */
    public static AndroidChannelBuilder forTarget(String target) {
        return new AndroidChannelBuilder(target);
    }

    /**
     * Creates a new builder with the given host and port.
     */
    public static AndroidChannelBuilder forAddress(String name, int port) {
        return forTarget(GrpcUtil.authorityFromHostAndPort(name, port));
    }

    /**
     * Creates a new builder, which delegates to the given ManagedChannelBuilder.
     *
     * @deprecated Use {@link #usingBuilder(ManagedChannelBuilder)} instead.
     */
    @ExperimentalApi("https://github.com/grpc/grpc-java/issues/6043")
    @Deprecated
    public static AndroidChannelBuilder fromBuilder(ManagedChannelBuilder<?> builder) {
        return usingBuilder(builder);
    }

    /**
     * Creates a new builder, which delegates to the given ManagedChannelBuilder.
     *
     * <p>The provided {@code builder} becomes "owned" by AndroidChannelBuilder. The caller should
     * not modify the provided builder and AndroidChannelBuilder may modify it. That implies reusing
     * the provided builder to build another channel may result with unexpected configurations. That
     * usage should be discouraged.
     *
     * @since 1.24.0
     */
    public static AndroidChannelBuilder usingBuilder(ManagedChannelBuilder<?> builder) {
        return new AndroidChannelBuilder(builder);
    }

    /**
     * Creates the delegate by reflectively calling the static {@code forTarget(String)} method of
     * the OkHttp channel builder, so this class carries no compile-time reference to it.
     *
     * @throws UnsupportedOperationException if the OkHttp builder class was not found
     * @throws RuntimeException if the reflective call fails (the cause is preserved)
     */
    private AndroidChannelBuilder(String target) {
        if (OKHTTP_CHANNEL_BUILDER_CLASS == null) {
            throw new UnsupportedOperationException("No ManagedChannelBuilder found on the classpath");
        }
        try {
            // Wildcard-parameterized cast instead of a raw type: no rawtypes warning, same check.
            delegateBuilder =
                    (ManagedChannelBuilder<?>)
                            OKHTTP_CHANNEL_BUILDER_CLASS
                                    .getMethod("forTarget", String.class)
                                    .invoke(null, target);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create ManagedChannelBuilder", e);
        }
    }

    /**
     * Wraps an existing builder.
     *
     * @throws NullPointerException if {@code delegateBuilder} is {@code null}
     */
    private AndroidChannelBuilder(ManagedChannelBuilder<?> delegateBuilder) {
        this.delegateBuilder = Preconditions.checkNotNull(delegateBuilder, "delegateBuilder");
    }

    /**
     * Enables automatic monitoring of the device's network state.
     */
    public AndroidChannelBuilder context(Context context) {
        this.context = context;
        return this;
    }

    @Override
    protected ManagedChannelBuilder<?> delegate() {
        return delegateBuilder;
    }

    /**
     * Builds a channel with current configurations.
     */
    @Override
    public ManagedChannel build() {
        return new AndroidChannel(delegateBuilder.build(), context);
    }

    /**
     * Wraps an OkHttp channel and handles invoking the appropriate methods (e.g., {@link
     * ManagedChannel#enterIdle}) when the device network state changes.
     */
    @VisibleForTesting
    static final class AndroidChannel extends ManagedChannel {

        private final ManagedChannel delegate;

        @Nullable private final Context context;
        @Nullable private final ConnectivityManager connectivityManager;

        private final Object lock = new Object();

        /** Undoes the listener registration; {@code null} when nothing is registered. */
        @GuardedBy("lock")
        private Runnable unregisterRunnable;

        /**
         * Wraps {@code delegate}; when {@code context} is non-null, also registers a network
         * listener. A {@link SecurityException} during registration is logged and otherwise
         * ignored, leaving the channel usable without monitoring.
         */
        @VisibleForTesting
        AndroidChannel(final ManagedChannel delegate, @Nullable Context context) {
            this.delegate = delegate;
            this.context = context;

            if (context != null) {
                connectivityManager =
                        (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
                try {
                    // Called without holding `lock`.
                    // NOTE(review): presumably fine because the instance has not been handed to
                    // callers yet — confirm.
                    configureNetworkMonitoring();
                } catch (SecurityException e) {
                    Log.w(
                            LOG_TAG,
                            "Failed to configure network monitoring. Does app have ACCESS_NETWORK_STATE"
                                    + " permission?",
                            e);
                }
            } else {
                connectivityManager = null;
            }
        }

        /**
         * Registers the network listener that fits the running API level and stores the matching
         * un-registration action in {@link #unregisterRunnable}.
         */
        @GuardedBy("lock")
        private void configureNetworkMonitoring() {
            // Android N added the registerDefaultNetworkCallback API to listen to changes in the device's
            // default network. For earlier Android API levels, use the BroadcastReceiver API.
            if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N && connectivityManager != null) {
                final DefaultNetworkCallback defaultNetworkCallback = new DefaultNetworkCallback();
                connectivityManager.registerDefaultNetworkCallback(defaultNetworkCallback);
                unregisterRunnable =
                        new Runnable() {
                            @TargetApi(Build.VERSION_CODES.LOLLIPOP)
                            @Override
                            public void run() {
                                connectivityManager.unregisterNetworkCallback(defaultNetworkCallback);
                            }
                        };
            } else {
                final NetworkReceiver networkReceiver = new NetworkReceiver();
                @SuppressWarnings("deprecation")
                IntentFilter networkIntentFilter =
                        new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
                context.registerReceiver(networkReceiver, networkIntentFilter);
                unregisterRunnable =
                        new Runnable() {
                            @TargetApi(Build.VERSION_CODES.LOLLIPOP)
                            @Override
                            public void run() {
                                context.unregisterReceiver(networkReceiver);
                            }
                        };
            }
        }

        /**
         * Runs the stored un-registration action at most once: it is cleared after running, so a
         * later call (e.g. shutdown followed by shutdownNow) does nothing.
         */
        private void unregisterNetworkListener() {
            synchronized (lock) {
                if (unregisterRunnable != null) {
                    unregisterRunnable.run();
                    unregisterRunnable = null;
                }
            }
        }

        /** Removes the network listener, then shuts the wrapped channel down. */
        @Override
        public ManagedChannel shutdown() {
            unregisterNetworkListener();
            return delegate.shutdown();
        }

        @Override
        public boolean isShutdown() {
            return delegate.isShutdown();
        }

        @Override
        public boolean isTerminated() {
            return delegate.isTerminated();
        }

        /** Removes the network listener, then calls {@code shutdownNow()} on the wrapped channel. */
        @Override
        public ManagedChannel shutdownNow() {
            unregisterNetworkListener();
            return delegate.shutdownNow();
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return delegate.awaitTermination(timeout, unit);
        }

        @Override
        public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
                MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
            return delegate.newCall(methodDescriptor, callOptions);
        }

        @Override
        public String authority() {
            return delegate.authority();
        }

        @Override
        public ConnectivityState getState(boolean requestConnection) {
            return delegate.getState(requestConnection);
        }

        @Override
        public void notifyWhenStateChanged(ConnectivityState source, Runnable callback) {
            delegate.notifyWhenStateChanged(source, callback);
        }

        @Override
        public void resetConnectBackoff() {
            delegate.resetConnectBackoff();
        }

        @Override
        public void enterIdle() {
            delegate.enterIdle();
        }

        /** Respond to changes in the default network. Only used on API levels 24+. */
        @TargetApi(Build.VERSION_CODES.N)
        private class DefaultNetworkCallback extends ConnectivityManager.NetworkCallback {
            @Override
            public void onAvailable(Network network) {
                delegate.enterIdle();
            }
        }

        /** Respond to network changes. Only used on API levels < 24. */
        private class NetworkReceiver extends BroadcastReceiver {
            /** Connectivity seen on the previous broadcast; starts as "not connected". */
            private boolean isConnected = false;

            @SuppressWarnings("deprecation")
            @Override
            public void onReceive(Context context, Intent intent) {
                ConnectivityManager conn =
                        (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
                android.net.NetworkInfo networkInfo = conn.getActiveNetworkInfo();
                boolean wasConnected = isConnected;
                isConnected = networkInfo != null && networkInfo.isConnected();
                // Only the transition from disconnected to connected moves the channel to idle.
                if (isConnected && !wasConnected) {
                    delegate.enterIdle();
                }
            }
        }
    }
}

管理Channel

这里整了个单例

// AndroidChannelBuilder is imported from the package in which this document declares it.
import com.example.caller.grpc.AndroidChannelBuilder
import io.grpc.ManagedChannel
import java.util.concurrent.TimeUnit

/**
 * Process-wide holder of the single gRPC channel.
 *
 * Call [makeChannel] once with the server address, then obtain the channel via [getMyChannel].
 */
class GrpcService private constructor() {
    companion object {
        /** Lazily created, thread-safe singleton instance. */
        val instance: GrpcService by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { GrpcService() }
    }

    /** Current channel, or null before the first creation / after a release. */
    var channel: ManagedChannel? = null

    /** Server address as passed to [makeChannel]: "host" or "host:port". */
    var _serverIP: String = ""

    /**
     * Remembers [serverIP] and (re)creates the channel for it. A previously created channel is
     * shut down first instead of just being dropped.
     *
     * @param serverIP "host" (default port 23333) or "host:port"
     */
    fun makeChannel(serverIP: String) {
        _serverIP = serverIP
        releaseChannel()
        createChannel()
    }

    /** Shuts down the current channel, if any, and forgets it. */
    private fun releaseChannel() {
        // shutdown() stops new calls; an abandoned channel would otherwise keep its resources.
        channel?.shutdown()
        channel = null
    }

    /**
     * Builds a plaintext channel to the address stored in [_serverIP].
     *
     * @throws NumberFormatException if the part after ':' is not an integer
     */
    private fun createChannel() {
        // Split once; element 0 is the host, element 1 (when present) the port.
        val parts = _serverIP.split(":")
        val host = parts[0]
        val port = if (parts.size > 1) parts[1].toInt() else 23333
        channel = AndroidChannelBuilder.forAddress(host, port)
            .usePlaintext()
            .context(App.instance)
            .keepAliveTimeout(5, TimeUnit.SECONDS)
            .keepAliveTime(1, TimeUnit.SECONDS)
            .keepAliveWithoutCalls(true)
            .build()
    }

    /**
     * Returns the channel, creating it on demand when a server address is known.
     *
     * @param forceRecreate when true, the current channel is shut down and a new one is built
     * @throws NullPointerException if no channel exists and no server address has been set
     */
    fun getMyChannel(forceRecreate: Boolean = false): ManagedChannel {
        if (forceRecreate) {
            releaseChannel()
        }
        if (channel == null && _serverIP != "") {
            createChannel()
        }
        return channel!!
    }
}

只要在开始时makeChannel传入服务器ip和端口即可,只传ip则使用默认端口

getMyChannel 的 forceRecreate 参数用于在连接断开后的重新创建连接, 这样比较暴力,不过可以达到目的,关于此,详细看下面的内容

调用示例

异步调用

PS:

1. StreamObserver 的三个方法都不是在 UI 线程中回调的,因此在方法内执行 UI 相关的代码时需要使用 runOnUiThread。
2. onNext 中发生的异常会在 onError 中抛出;而正常情况下,我们只希望在 onError 中捕获接口返回的异常,因此请在 onNext 中妥善处理异常情况,否则 onNext 的异常会导致 channel 的连接诡异断开(原因在 streams 部分解释)。

// Asynchronous unary call through the shared channel.
val client = EvaluateDeviceGrpc.newStub(GrpcService.instance.getMyChannel())
val req = EvaluateDeviceOuterClass.GetUserInfoRequest.newBuilder()
    .setEveluteDeviceID(evaluateId)
    .setUserNo(et_no.text.toString())
    .build()

client.getUserInfo(req, object : StreamObserver<EvaluateDeviceOuterClass.UserResponse> {
    override fun onNext(value: EvaluateDeviceOuterClass.UserResponse?) {
        // Handle the response here. Per the notes in this document the callbacks do not run on
        // the UI thread, and exceptions thrown here resurface in onError.
    }

    override fun onError(t: Throwable?) {
        // Handle a failed call here.
    }

    override fun onCompleted() {
        // Called once the call has finished.
    }
}) // closing parenthesis of getUserInfo(...) — it was missing in the original snippet

streams订阅连接

/**
 * Subscribes to the server-streaming deviceConfig call and re-subscribes whenever it fails.
 *
 * @param force when true, the shared channel is recreated before subscribing
 */
private fun startConfigSub(force: Boolean = false) {
    val stub = DeviceInfoServiceGrpc.newStub(GrpcService.instance.getMyChannel(force))
    val request = DeviceInfo.DeviceConfigRequest.newBuilder()
        .setDeviceType("P")
        .setMac(DeviceUtils.getMacAddress())
        .build()

    val configObserver = object : StreamObserver<DeviceInfo.DeviceConfigResponse> {
        override fun onNext(value: DeviceInfo.DeviceConfigResponse?) {
        }

        override fun onError(t: Throwable?) {
            Log.d("WorkFragment", "报错重连")
            // Errors whose message mentions "Keepalive" also rebuild the channel; all other
            // errors re-subscribe on the existing one.
            // NOTE(review): the retry happens immediately, without any back-off.
            val keepaliveFailure = t?.message?.contains("Keepalive") == true
            startConfigSub(keepaliveFailure)
        }

        override fun onCompleted() {
        }
    }

    stub.deviceConfig(request, configObserver)
}

streams接口在连接不正常时不会通过keepalive而重新进行调用, 因此需要在发生特定异常时,重新调用方法, 并且需要使channel重新创建,即force传true。 如果异常不是连接类异常(业务异常),则无需重新创建channel直接重新调用即可。

同步调用

/**
 * Registers this device with the server through a blocking call.
 *
 * @return true when the response code equals "SUCCESS"; false otherwise or when any exception
 *         is thrown (the stack trace is printed)
 */
fun deviceReg(): Boolean {
    return try {
        val blockingStub = DeviceInfoServiceGrpc.newBlockingStub(GrpcService.instance.getMyChannel())
        val builder = DeviceInfo.DeviceInfoRegistRequest.newBuilder()
        builder.setDeviceType("P")
        builder.setMac(DeviceUtils.getMacAddress())
        builder.setDeviceIP(NetworkUtils.getIPAddress(true))
        builder.setHostName(DeviceUtils.getAndroidID())
        builder.setVersion(App.instance.getAppVersionName())
        builder.setDeviceID(DeviceUtils.getAndroidID())
        builder.setDesc("请修改此字段描述设备位置")
        builder.setDeviceName(DeviceUtils.getManufacturer() + "-" + DeviceUtils.getModel())
        val response = blockingStub.deviceRegist(builder.build())
        Log.d("WorkFragment", response.toString())
        response.code == "SUCCESS"
    } catch (e: java.lang.Exception) {
        e.printStackTrace()
        false
    }
}

.Net 维护streams长连接

// Open device streams, keyed by the request message that opened them.
private static readonly ConcurrentDictionary<DeviceConfigRequest, IServerStreamWriter<DeviceConfigResponse>> _subscriptions =
            new ConcurrentDictionary<DeviceConfigRequest, IServerStreamWriter<DeviceConfigResponse>>();

/// <summary>
/// Server-streaming call that keeps one long-lived stream per device. It sends the configuration
/// on the first connection, then periodic time ticks, and refreshes heartbeat data, until the
/// call is cancelled.
/// </summary>
/// <param name="request">Identifies the device; also used as the key in <c>_subscriptions</c>.</param>
/// <param name="responseStream">Stream the responses are written to.</param>
/// <param name="context">Call context; its cancellation token ends the loop.</param>
public override async Task DeviceConfig(DeviceConfigRequest request, IServerStreamWriter<DeviceConfigResponse> responseStream, ServerCallContext context)
{
    _logger.LogWarning($"{context.Peer} DeviceConfig");
    var isFirstConnection = _subscriptions.TryAdd(request, responseStream);
    if (!isFirstConnection)
    {
        _logger.LogWarning("加入连接管理失败, 连接已存在");
        // An entry already exists, so this device has connected again. The newest stream always
        // wins; the indexer replaces the old one in a single step (no remove/add gap).
        _logger.LogWarning($"{context.Peer} 设备[{request.DeviceType}]({request.Mac}) 清理旧连接");
        _subscriptions[request] = responseStream;
    }

    try
    {
        if (isFirstConnection)
        {
            _logger.LogWarning($"{context.Peer} 设备[{request.DeviceType}]({request.Mac}) 已连接, 首次下发配置");
            var config = getDeviceConfig(request.DeviceType, request.Mac);
            await responseStream.WriteAsync(config);
        }

        var count = 0;
        // Keep the stream open so we can continue writing new Messages as they are pushed
        while (!context.CancellationToken.IsCancellationRequested)
        {
            count++;
            // Every 10th iteration (about once per second with the 100 ms delay below):
            // send the current time.
            if (count % 10 == 0)
            {
                await responseStream.WriteAsync(new DeviceConfigResponse()
                {
                    Time = DateTime.Now.ToUnix2(),
                    Config = "",
                });
            }
            // Every 100th iteration (about every 10 seconds): refresh heartbeat data and
            // restart the counter.
            if (count % 100 == 0)
            {
                count = 0;
                UpdateHeartData(request.DeviceType, request.Mac);
            }

            // Avoid pegging CPU; a short delay also lets a cancelled call be noticed sooner.
            await Task.Delay(100);
        }
    }
    finally
    {
        // Runs on cancellation and also when a write throws, so a dead stream never stays
        // registered.
        _logger.LogWarning($"{context.Peer} 设备[{request.DeviceType}]({request.Mac}) 连接断开, 正在清理");
        _subscriptions.TryGetValue(request, out IServerStreamWriter<DeviceConfigResponse> value);
        if (value != null && value.Equals(responseStream))
        {
            // The registered stream is still ours: remove it.
            _logger.LogWarning($"{context.Peer} 设备[{request.DeviceType}]({request.Mac}) 原始连接, 正在清理");
            _subscriptions.TryRemove(request, out IServerStreamWriter<DeviceConfigResponse> _);
        }
        else
        {
            // A newer connection has already replaced ours: leave its entry alone.
            _logger.LogWarning($"{context.Peer} 设备[{request.DeviceType}]({request.Mac}) 已是新连接, 不清理 {_subscriptions.Count}");
        }
    }
}

这里实现了一个设备只有一个连接的逻辑,DeviceConfigRequest 作为_subscriptions的key,只有mac地址和设备类型,也就是一个mac地址只能有一个连接在维持

  • 当设备主动断开连接(正常退出等),则跳出 while 循环,进入“原始连接, 正在清理”部分。
  • 当设备异常断开(诸如网络问题或强制关闭),旧连接可能还没有被清理;新连接进来后会进入“加入连接管理失败, 连接已存在”分支,此时将 _subscriptions 里的 responseStream 更新为新连接。“已是新连接, 不清理”分支就是为了处理新连接已建立、而旧连接才刚退出循环的情况:此时不能把 _subscriptions 里唯一的连接清理掉。
  • 服务端重启,则只要保证客户端能重新连接即可

WinForm

using Grpc.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace PrintNumber
{
    /// <summary>
    /// Process-wide holder of the single gRPC channel used by the WinForms client.
    /// </summary>
    public class GrpcService
    {
        private GrpcService() { }

        // Created by the static initializer, which the runtime executes once, so two threads
        // can no longer race to create separate instances.
        private static readonly GrpcService _instance = new GrpcService();

        /// <summary>The singleton instance.</summary>
        public static GrpcService Instance
        {
            get { return _instance; }
        }


        private Channel channel;
        private string _serverIP = "";

        /// <summary>
        /// Remembers the server address and creates the channel if none exists yet.
        /// </summary>
        /// <param name="serverIP">Server host or IP; port 23333 is always used.</param>
        public void makeChannel(string serverIP)
        {
            _serverIP = serverIP;
            // NOTE(review): an existing channel is kept even when serverIP differs from the one
            // it was built for; call getMyChannel(true) to switch — confirm this is intended.
            if (channel == null)
            {
                createChannel();
            }
        }

        /// <summary>Builds an insecure channel to the stored server on port 23333.</summary>
        private void createChannel()
        {
            var opts = new List<ChannelOption>
            {
                // Keepalive ping every second, 5 s timeout, pings allowed without active calls.
                new ChannelOption("grpc.keepalive_time_ms", 1000),
                new ChannelOption("grpc.keepalive_timeout_ms", 5000),
                new ChannelOption("grpc.keepalive_permit_without_calls", 1)
            };
            channel = new Channel($"{_serverIP}:23333", ChannelCredentials.Insecure, opts);
        }

        /// <summary>
        /// Returns the channel, creating it on demand when a server address is known.
        /// </summary>
        /// <param name="forceRecreate">When true, the current channel is shut down and rebuilt.</param>
        /// <returns>The channel, or null when no server address has been set.</returns>
        public Channel getMyChannel(bool forceRecreate = false)
        {
            if (forceRecreate && channel != null)
            {
                // Shut the old channel down instead of just dropping the reference; the
                // returned task is intentionally not awaited.
                var staleChannel = channel;
                channel = null;
                staleChannel.ShutdownAsync();
            }
            if (channel == null && _serverIP != "")
            {
                createChannel();
            }
            return channel;
        }
    }
}
// Cancelled on application exit to end the streaming subscription.
static CancellationTokenSource cancelTokenSource = new CancellationTokenSource();

/// <summary>
/// Subscribes to the DeviceConfig server stream and keeps re-subscribing after failures until
/// <paramref name="token"/> is cancelled.
/// </summary>
/// <param name="token">Cancellation source that ends the subscription.</param>
/// <param name="force">When true, the shared channel is recreated before the first attempt.</param>
private async void startConfigSubAsync(CancellationTokenSource token, bool force= false)
{
    // A loop instead of recursive calls: each failed call is disposed before the next attempt,
    // and the passed-in token (not the static field) is honoured throughout.
    // NOTE(review): retries happen immediately, without any back-off delay.
    while (!token.IsCancellationRequested && isMain)
    {
        var aClient = new DeviceInfoServiceClient(GrpcService.Instance.getMyChannel(force));
        var req = new DeviceConfigRequest();
        req.DeviceType = "Q";
        req.Mac = NetHelper.GetMacByNetworkInterface();
        using (var client = aClient.DeviceConfig(req, null, null, token.Token))
        {
            try
            {
                while (!token.IsCancellationRequested && await client.ResponseStream.MoveNext())
                {
                    var result = client.ResponseStream.Current;
                    // ... handle result ...
                }
                // Stream finished or cancellation was observed: stop without reconnecting.
                return;
            }
            catch (RpcException e)
            {
                System.Diagnostics.Debug.WriteLine("异常:" + e.Message);
                if (e.StatusCode == StatusCode.Cancelled)
                {
                    // Cancellation was requested: leave, reconnecting here tends to raise errors.
                    return;
                }
                // Any other RPC failure: reconnect.
                System.Diagnostics.Debug.WriteLine("重连接:" + e.Message);
            }
            catch (Exception e)
            {
                // Non-RPC failure: reconnect as well.
                System.Diagnostics.Debug.WriteLine("异常:" + e.Message);
            }
        }

        // Every retry rebuilds the channel.
        force = true;
    }
}

// On close, the streaming subscription has to be cleaned up, otherwise exceptions are thrown.
private void Form1_FormClosing(object sender, FormClosingEventArgs e)
{
    if (!isMain)
    {
        return;
    }

    // End the running stream first, then release gRPC resources.
    cancelTokenSource.Cancel();
    // KillServersAsync only affects servers; the client channels are shut down explicitly.
    // Neither task is awaited because the form is closing.
    GrpcEnvironment.ShutdownChannelsAsync();
    GrpcEnvironment.KillServersAsync();
}