Giter VIP home page Giter VIP logo

Comments (21)

yang-xiaodong avatar yang-xiaodong commented on July 21, 2024

CAP发现ProjectA没有订阅这个消息的方法就会报异常

CAP启动的时候会检查订阅的方法然后才订阅相关的消息的,如果你没有任何订阅方法的话,应该是什么消息都收不到的。你现在出现的这个异常是由于收到了消息但是没有找到要执行的方法。

你需要给我提供更多信息,我可能才能定位到问题。

Project A 和 Project B 是一个web项目还是两个?

from cap.

ajdwfnhaps avatar ajdwfnhaps commented on July 21, 2024

两个独立的项目来的,
Project A 只做了发布,没有任何订阅方法:

public Task<bool> SendAccountPeriodMQMessageAsync()
        {
            for (var i = 0; i < 10; i++)
            {
                //_capBus.PublishAsync("sample.finance.mac.acc.detail", DateTime.Now);
                _capBus.Publish("finance.mac_account.detail", i);
            }

            return Task.FromResult(true);
        }

Project B是另外一个项目,写了个订阅的方法:

public class SubscriberService : ISubscriberService, ICapSubscribe
    {
        [CapSubscribe("finance.mac_account.detail", Group = "group1")]
        public void CheckReceivedMessage(int time)
        {
            Console.WriteLine($"1.Subscribe , sent time: {time.ToString()}");
        }
    }

然后同时启动了Project A和Project B窗口,不知道是否描述清楚了?

from cap.

yang-xiaodong avatar yang-xiaodong commented on July 21, 2024

Project A 和 Project B 是连的同一个数据库吗?

from cap.

ajdwfnhaps avatar ajdwfnhaps commented on July 21, 2024

是同一个数据库

from cap.

yang-xiaodong avatar yang-xiaodong commented on July 21, 2024

多实例连接同一个数据库目前还没有经过很多测试,不过理论上是可能会出现你的这个问题的,这个可能是由于两个实例并发读相同的表引起的,稍后我会尝试修复一下。

from cap.

yang-xiaodong avatar yang-xiaodong commented on July 21, 2024

@ajdwfnhaps 你是用的MySql吧? MySql 对于对于队列表支持很差,你可以尝试一下其他数据库试试,比如Sql Server 或者 PostgreSql 。CAP 2.1 的 MySql扩展 还有一个问题bug,稍后我会修复这个问题提交一个 2.1.1-preview 版本,你也可以等修复过后再次尝试一下。

等发布 2.1.1 预览版,我会在这个 issue 通知你。

谢谢。

from cap.

ajdwfnhaps avatar ajdwfnhaps commented on July 21, 2024

@yuleyule66 好的,谢谢,我尝试使用每个项目一个cap数据库的方式试一下,如果我Project A和Project B都是独立的数据库,但是连同一个RabbitMQ实例,是否可以在Project A只写发布消息的方法,全部订阅方法都写在Project B里,这时候Project A不知道还会不会报这个问题“can not be found subscriber method.”?

from cap.

yang-xiaodong avatar yang-xiaodong commented on July 21, 2024

这个时候肯定不会的。

from cap.

ajdwfnhaps avatar ajdwfnhaps commented on July 21, 2024

非常感谢,现在分开了两个数据库就一切正常了!!

from cap.

ajdwfnhaps avatar ajdwfnhaps commented on July 21, 2024

还有一个问题想请教一下,现在是Project A和Project B是独立的cap数据库,我这个Project B的项目会部署在不同的服务器上,分布式消费Project A发布的消息,但同时Project B也会发布消息到MQ,订阅的方法会写在Project A上,如果是这样的架构,Project B在不同服务器上的部署实例都使用同一个cap数据库可行吗?会不会有什么冲突呢?

from cap.

yang-xiaodong avatar yang-xiaodong commented on July 21, 2024

CAP 是要解决这种多个实例对应一个数据库的冲突问题的,只是我最近有点忙,目前在出差,还没有太多时间来测试

from cap.

ajdwfnhaps avatar ajdwfnhaps commented on July 21, 2024

太好了,坐等不断完善!!!

from cap.

zanpen2000 avatar zanpen2000 commented on July 21, 2024

@ajdwfnhaps 遇到的问题我也遇到了(因为我也用的同一数据库),不同的是我用的MsSql。后来按照讨论内容,B项目使用另外的数据库(同一主机),却并未解决 can not be found subscriber method 的问题。
现象是,刚刚分别启动A,B项目时,A项目发布的第一条消息B项目能够成功消费,分别检查对应的数据库也一切正常,但,A项目发布的第二条消息,报错信息如下:

fail: DotNetCore.CAP.Internal.DefaultSubscriberExecutor[5]
      Consumer method 'Group:cap.default.group, Topic:CapDemo' failed to execute.
System.ArgumentOutOfRangeException: Index was out of range. Must be non-negative and less than the size of the collection.
Parameter name: index
   at System.ThrowHelper.ThrowArgumentOutOfRange_IndexException()
   at System.Collections.Generic.List`1.get_Item(Int32 index)
   at DotNetCore.CAP.Internal.DefaultSubscriberExecutor.<ExecuteAsync>d__7.MoveNext() in G:\codes\CAP\src\DotNetCore.CAP\Internal\ISubscriberExecutor.Default.cs:line 45
warn: DotNetCore.CAP.SubscribeQueueExecutor[2]
      Job failed to execute. Will retry.
System.ArgumentOutOfRangeException: Index was out of range. Must be non-negative and less than the size of the collection.
Parameter name: index
   at System.ThrowHelper.ThrowArgumentOutOfRange_IndexException()
   at System.Collections.Generic.List`1.get_Item(Int32 index)
   at DotNetCore.CAP.Internal.DefaultSubscriberExecutor.<ExecuteAsync>d__7.MoveNext() in G:\codes\CAP\src\DotNetCore.CAP\Internal\ISubscriberExecutor.Default.cs:line 45

继续发布消息,发现其报错是隔一条消息报一次,项目B也是隔一条消息成功一次(未成功的在项目A报错了),无论先启动项目A还是项目B,都是间隔一条消息报错。

数据库是SqlServer,且项目A和项目B分别用的不同的数据库(同一主机的不同数据库)


源码部分正在看

  • 是不是需要判断一下有没有routing的消费者(目前是按照Queue.Name来判断的<cap.default.group>,且没有cap.default.group消费者会抛异常),现在看来是因为没有CapDemo的消费者导致的异常
  • 是不是应该允许服务实例不去订阅消息(另外的服务实例去订阅)?

我现在正在看代码,想找到“没有CapDemo的订阅为什么消息会存入到项目A的Received表中”的原因

谢谢!

from cap.

zanpen2000 avatar zanpen2000 commented on July 21, 2024

补充:
发布消息时,使用如下形式:

  capPublisher.Publish("", inserted_person, sqlTrans);

接收消息时,使用:

        [CapSubscribe(name: "", Group = RabbitMQOptions.DefaultExchangeName)]
        public void ReceiveMessage(Person person)
        {

        }

不再报错了

from cap.

yang-xiaodong avatar yang-xiaodong commented on July 21, 2024

@zanpen2000 根据你所提供的描述信息,应该不会出现间隔消费以及数组越界的这种问题,请额外提供以下信息
1、使用的CAP 版本?
2、Project A 和 Project B 的 Startup.cs 的配置信息
3、Project A 和 Project B 的方法订阅列表,我需要确实是否有订阅相同Topic。
4、发送示例代码

from cap.

zanpen2000 avatar zanpen2000 commented on July 21, 2024
  1. CAP,CAP.RabbitMQ, 以及CAP.SqlServer的版本均为2.1.0
  2. 配置信息如下:
namespace DemoA
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddCap(x =>
            {
                x.UseRabbitMQ(opt =>
                {
                    opt.HostName = "192.168.0.89";
                    opt.UserName = "admin";
                    opt.Password = "admin";
                });
                x.UseSqlServer("Data Source=192.168.0.250;Initial Catalog=CapDemo;User ID=sa;Password=123123;");
            });

            services.AddMvc();
        }

        public void Configure(IApplicationBuilder app, IHostingEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            app.UseCap();
            app.UseMvc();
        }
    }
}
namespace DemoB
{
    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddCap(x=> {
                x.UseRabbitMQ(opt=> {
                    opt.HostName = "192.168.0.89";
                    opt.UserName = "admin";
                    opt.Password = "admin";
                });
                x.UseSqlServer("Data Source=192.168.0.250;Initial Catalog=cap_trans_test;User ID=sa;Password=123123;");
            });
            services.AddMvc();
        }

        public void Configure(IApplicationBuilder app, IHostingEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseMvc();
            app.UseCap();
        }
    }
}
  1. ProjectA和ProjectB的方法订阅列表

ProjectA的Controller

namespace DemoA.Controllers
{
    [Route("api/demoa/[controller]")]
    public class PersonController : Controller
    {
        private ICapPublisher capPublisher;

        public PersonController(ICapPublisher _publisher)
        {
            this.capPublisher = _publisher;
        }

        [HttpPost]
        public JsonResult Post()
        {
            var sqlOptions = (SqlServerOptions)HttpContext.RequestServices.GetService(typeof(SqlServerOptions));
            var cstr = sqlOptions.ConnectionString;
            using (var sqlconnection = new SqlConnection(cstr))
            {
                sqlconnection.Open();
                using (var sqlTrans = sqlconnection.BeginTransaction())
                {
                    try
                    {
                        List<Person> persons = new List<Person>();
                        for (int i = 0; i < 10000; i++)
                        {
                            var person = new Person { Name = $"Name_{i.ToString()}", Age = i };
                            var inserted_person = PersonInsert(sqlTrans, person);
                            capPublisher.Publish("", inserted_person, sqlTrans);
                            persons.Add(inserted_person);
                            Console.WriteLine(inserted_person);
                        }
                        sqlTrans.Commit();
                        return Json(persons);
                    }
                    catch (Exception ex)
                    {
                        sqlTrans.Rollback();
                        return Json(ex.Message);
                    }
                }
            }
        }

        /// <summary>
        /// 本地业务逻辑
        /// </summary>
        /// <param name="trans"></param>
        /// <param name="person"></param>
        /// <returns></returns>
        [NonAction]
        private Person PersonInsert(SqlTransaction trans, Person person)
        {
            var sqlconnection = trans.Connection;
            var sql = $"insert into person (Name, Age) output inserted.id, inserted.name, inserted.age values ('{person.Name}', {person.Age})";
            var result = sqlconnection.QueryFirstOrDefault<Person>(sql: sql, transaction: trans);
            return result;
        }

        //没有用到
        [NonAction]
        [CapSubscribe("RemovePerson")]
        public void RemovePerson(Person person)
        {
            var cstr = "Data Source=192.168.0.250;Initial Catalog=CapDemo;User ID=sa;Password=123123;";
            using (var sqlconnection = new SqlConnection(cstr))
            {
                sqlconnection.Open();
                using (var trans = sqlconnection.BeginTransaction())
                {
                    try
                    {
                        int row_count = sqlconnection.Execute($"delete from Person where id={person.id}", null, trans);
                        if (row_count == 1)
                        {
                            Console.WriteLine("删除记录成功, {0}", person.ToString());
                        }
                        else
                            Console.WriteLine("删除记录失败, {0}", person.ToString());
                        trans.Commit();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                        trans.Rollback();
                    }
                }
            }
        }
    }
}

ProjectB的Controller

namespace DemoB.Controers
{
    [Route("api/demob/[controller]")]
    public class GradeController : Controller
    {
        [NonAction]
        [CapSubscribe(name: "", Group = RabbitMQOptions.DefaultExchangeName)]
        public void ReceiveMessage(Person person)
        {
            Console.WriteLine("this.HttpContext =>  {0}", this.HttpContext);

            Console.WriteLine("Grade service 接收:" + person);
            var cstr = "Data Source=192.168.0.250;Initial Catalog=cap_trans_test;User ID=sa;Password=123123;";

            using (var sqlconnection = new SqlConnection(cstr))
            {
                sqlconnection.Open();
                using (var trans = sqlconnection.BeginTransaction())
                {
                    try
                    {
                        var grade = new Grade { personId = person.id, name = "新增班级" };
                        var inserted = sqlconnection.QueryFirstOrDefault<Grade>("insert into Grade(personId,Name) output inserted.id, inserted.personId,inserted.Name values(@personId,@Name)", grade, trans);
                        Console.WriteLine("Grade Inserted: {0}", grade);
                        trans.Commit();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                        trans.Rollback();
                    }
                }
            }
        }
    }
}
  1. 示例代码在 https://github.com/zanpen2000/CapDemo

为了方便理解,直接引用的源码,没有从Nuget上引用包

from cap.

yang-xiaodong avatar yang-xiaodong commented on July 21, 2024

@zanpen2000 我已经复现到了你的这个情况,你的这个情况是由于在使用RabbitMQ的时候以前 Queue上绑定过 Routing Key,因为 Queue是持久化的,所以之前绑定的RoutingKey不会丢失。当你减少RoutingKey的订阅以后,实际上在RabbitMQ端以前绑定的RoutingKey还存在,所以会收到以前的订阅的消息。

你可以在管理页面看到当前的Queue绑定的RoutingKey有哪些

image

解决方案 :

我尝试通过修改代码的方式来删除所有绑定的RoutingKey然后重新绑定,但是RabbitMQ没有提供相关API。
另外一种方式解绑某个具体的RoutingKey,但是我无法获取到已经绑定的RoutingKey有哪些,这个只能够通过客户端工具来获取,所以此种方案不可行。

通过仔细思考后,我认为不应该由CAP程序来做这个事情,假如有以下场景。Project A 和Project B都订阅了同一Group组的Topic,对应到RabbitMQ也就是他们监听的是同一个队列,当ProjectA由于某种原因减少Topic的订阅了,那么如果解绑了RoutingKey会导致ProejctB也无法收到相关订阅的消息,这显然是错误的行为。

所以我决定让使用者来做这个事情,也是就说如果你减少了订阅的topic,那么你需要手动去rabbitmq的控制台的queue中删除(解绑)旧的topic,那么使用者就必须要清楚消息队列的预期行为。

另外,程序报错的这个地方我会处理为一种更加优雅的提示方式,因为这对使用者来说也是一个警告!。

PS: 如果使用 Kafka , 不存在以上问题。

from cap.

zanpen2000 avatar zanpen2000 commented on July 21, 2024

辛苦了,一直忙到半夜。

这是我刚刚复原的错误信息

System.ArgumentOutOfRangeException: Index was out of range. Must be non-negative and less than the size of the collection.
Parameter name: index
   at System.ThrowHelper.ThrowArgumentOutOfRange_IndexException()
   at System.Collections.Generic.List`1.get_Item(Int32 index)
   at DotNetCore.CAP.Internal.DefaultSubscriberExecutor.<ExecuteAsync>d__7.MoveNext() in G:\codes\CAP\src\DotNetCore.CAP\Internal\ISubscriberExecutor.Default.cs:line 45
fail: DotNetCore.CAP.Internal.DefaultSubscriberExecutor[5]
      Consumer method 'Group:cap.default.group, Topic:CapPerson' failed to execute.
System.ArgumentOutOfRangeException: Index was out of range. Must be non-negative and less than the size of the collection.

原因是我把ProjectA中的PersonController中的发消息的routingkey改成了 CapPerson

  capPublisher.Publish("CapPerson", inserted_person, sqlTrans);

然后在ProjectB中的GradeController中的接收Attribute 修改为

        [NonAction]
        [CapSubscribe(name: "CapPerson")]
        public void ReceiveMessage(Person person){}

注意这个routingkey 是第一次使用,不是RabbitMQ中持久化的QueueName,运行本次Demo之前,在RabbitMQ控制台删除了Queue和Exchanges

然后就复原了以上错误。

源码中可能出现问题的地方应该是在DotNetCore.Cap的DefaultSubscriberExecutor类中的ExecuteAsync方法,

var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.Name); //这里的Name是CapPerson
if (!executeDescriptorGroup.ContainsKey(receivedMessage.Group))// Group是cap.default.group
 {
         var error = $"Topic:{receivedMessage.Name}, can not be found subscriber method.";
         throw new SubscriberNotFoundException(error);
}

// If there are multiple consumers in the same group, we will take the first
var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0]; //这里没有获取到

// ...

谢谢!周末愉快!

from cap.

zanpen2000 avatar zanpen2000 commented on July 21, 2024

我通过如下修改勉强修复了这个问题:

RabbitMQConsumerClient.cs

 public IEnumerable<string> Topics { get; private set; }
...
        public void Subscribe(IEnumerable<string> topics)
        {
            if (topics == null) throw new ArgumentNullException(nameof(topics));

            this.Topics = topics; //赋值

            foreach (var topic in topics)
                _channel.QueueBind(_queueName, _exchageName, topic);
        }

IConsumerHandler.Default.cs的RegisterMessageProcessor方法中增加判断topic,未订阅的重新入队

        private void RegisterMessageProcessor(IConsumerClient client)
        {
            client.OnMessageReceived += (sender, message) =>
            {
                _logger.EnqueuingReceivedMessage(message.Name, message.Content);

                //这里重新入队
                if (!client.Topics.Contains(message.Name))
                {
                    client.Reject();
                }
                else
                {
                    using (var scope = _serviceProvider.CreateScope())
                    {
                        try
                        {
                            StoreMessage(scope, message);
                            client.Commit();
                        }
                        catch (Exception e)
                        {
                            _logger.LogError(e, "An exception occurred when storage received message. Message:'{0}'.", message);
                            client.Reject();
                        }
                    }
                }
                Pulse();
            };

            client.OnError += (sender, reason) => { _logger.MessageQueueError(reason); };
        }

Demo代码已更新到https://github.com/zanpen2000/CapDemo

另 SubscribeQueuer,PublishQueuer,DefaultDispatcher的 _pollingDelay 我改成了 TimeSpan.FromMilliseconds();,感觉消息相应的时间太长了,所以,是不是可以考虑一下我这种急性子的人呢?

from cap.

yang-xiaodong avatar yang-xiaodong commented on July 21, 2024

@zanpen2000 有关Consumer NotFoundSubscriber的这个问题在 2.1.1版本已经改进。

消息相应的时间太长

我这边测试没有出现这个问题呀,可能第一次的时候会建立连接稍微慢一点,后面再发消息都是秒级以内相应的。

from cap.

zanpen2000 avatar zanpen2000 commented on July 21, 2024

from cap.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.