RabbitPubSub Queue Connection Closes Unexpectedly with FailedPrecondition Error #3388

Open
omerfruk opened this issue Mar 1, 2024 · 7 comments
Labels
needs info Further discussion or clarification is necessary


@omerfruk

omerfruk commented Mar 1, 2024

I've encountered an issue with the gocloud.dev/pubsub/rabbitpubsub package where the queue connection closes unexpectedly after some time. Although the establishChannel function is supposed to reopen a channel that exists but is closed, the issue persists with the following error message:
error=pubsub (code=FailedPrecondition): Exception (504) Reason: "channel/connection is not open"

version: v1.8.1

I've reviewed the documentation and source code but haven't found a clear resolution to this issue. It seems like the connection/channel should be re-established by the establishChannel function if it detects that it's closed, but this doesn't appear to be happening in my case.

I am seeking guidance on how to properly handle this situation or if there's a potential bug in the establishChannel logic that might not be handling certain conditions correctly.

```go
import (
	"github.com/pkg/errors"
	amqp091 "github.com/rabbitmq/amqp091-go"
)

// Connect dials RabbitMQ and, for each name, declares a durable fanout
// exchange and a durable queue with the same name, bound together.
func (s *QueueService) Connect(uri string, queueNames []string) error {
	var err error
	s.rabbitConn, err = amqp091.Dial(uri) // "amqp://guest:guest@localhost:5672/"
	if err != nil {
		return err
	}
	// Short-lived channel used only for the declarations below.
	ch, err := s.rabbitConn.Channel()
	if err != nil {
		return errors.Wrap(err, "rabbitConn.Channel")
	}
	defer ch.Close()

	for _, name := range queueNames {
		err = ch.ExchangeDeclare(name, "fanout", true, false, false, false, nil)
		if err != nil {
			return errors.Wrap(err, "ch.ExchangeDeclare")
		}

		_, err = ch.QueueDeclare(name, true, false, false, false, nil)
		if err != nil {
			return errors.Wrap(err, "ch.QueueDeclare")
		}

		// Bind the queue to the exchange of the same name.
		err = ch.QueueBind(name, "", name, false, nil)
		if err != nil {
			return errors.Wrap(err, "ch.QueueBind")
		}
	}
	return nil
}
```

Thank you for your assistance.

@vangent
Contributor

vangent commented Mar 1, 2024

Can you clarify what you mean by "the issue persists"?

From the code, it looks like we "refresh" the channel before using it, but there are definitely cases where the channel could close after that check, and you'd get an error from that operation (e.g., Send or Receive). But retrying should end up calling establishChannel again, which should reset things unless your actual server is down.

Overall, you're likely going to have to debug this a bit on your own to figure out where the error is being generated. E.g., you could use https://github.com/rogpeppe/gohack to add some logging/printing to the existing code to see when establishChannel is being called, when the error is returned, and what happens when you retry.

HTH!

@vangent vangent added the needs info Further discussion or clarification is necessary label Mar 1, 2024
@omerfruk
Author

omerfruk commented Mar 3, 2024

This is what I mean by "the issue persists": my code works on my server, but when the channel goes unused for a while (about 1-2 weeks) and I then try to send something to it again, I get:

error=pubsub (code=FailedPrecondition): Exception (504) Reason: "channel/connection is not open"

Restarting the app fixes the error, but once the channel sits unused again, the same error comes back.

@peczenyj
Contributor

peczenyj commented Jun 1, 2024

Hello

I had similar issues using the RabbitMQ driver directly. One way to reproduce this is to connect to a remote server and then cut the network.

I found a workaround: force the connection to reopen in such cases.

Some AMQP errors are retriable, but this one is not; we need to reopen the channel.

However, I am not sure that the pubsub driver correctly wraps the original error. That is something to check. Ideally, ErrorAs would return the original error, so you can decide whether to close and reopen the subscription.

Or perhaps you should always try to reopen the subscription in case of an error.

But I am not sure if the driver should detect and reopen the channel. What is the behavior of the other pubsub drivers?

To give some context: we use a library called Cony, and it works nicely, but this issue with closed channels has puzzled us for a while. Cony is also quite old and probably abandoned, which is why I pushed two PRs to this project: to replace Cony.

If the driver could somehow detect this and restart the channel in such cases, that would be wonderful, and maybe I can try to send another PR.

@omerfruk
Author

omerfruk commented Jun 1, 2024

As a workaround, I also reopen the connection when it is closed and then call Send again recursively. But this is not the right approach; it could give us a serious headache.

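```go
import (
	"context"
	"encoding/json"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/rabbitpubsub"
)

// Send publishes data to the exchange named queueName. If the send fails
// because the connection is closed, it reconnects and tries again.
func (s *QueueService) Send(ctx context.Context, queueName string, data any, requestID string) error {
	body, err := json.Marshal(&data)
	if err != nil {
		return err
	}

	// A new topic is opened per call, so shut it down to avoid leaking it.
	t := rabbitpubsub.OpenTopic(s.rabbitConn, queueName, nil)
	defer t.Shutdown(ctx)

	err = t.Send(ctx, &pubsub.Message{
		Body: body,
		Metadata: map[string]string{
			app.RequestIDKey: requestID,
		},
	})
	if err == nil {
		return nil
	}
	// Only reconnect when the connection itself is gone.
	if !s.rabbitConn.IsClosed() {
		return err
	}
	err = s.Connect(s.rabbitUri, []string{queueName})
	if err != nil {
		return err
	}
	// Recursive retry: unbounded if the connection keeps closing.
	return s.Send(ctx, queueName, data, requestID)
}
```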

We need to figure out why this is happening. When we don't write data to the queue for a long time, we can't write anything to it again, and I couldn't find the cause of the error in the logs.

@peczenyj
Contributor

peczenyj commented Jun 1, 2024

Hey @omerfruk

Thanks for your reply

In my experience with RabbitMQ, this kind of error happens in a few cases:

  1. network issues
  2. cluster rebalancing (which can be related to the previous case)
  3. unknown, seemingly random cases caused by something in RabbitMQ itself that we have not understood yet (this seems to be your case).

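In your case, I would try something like this:

```go
func (o *object) Send(ctx context.Context, foo any) error {
	var topic *pubsub.Topic
	// Release whichever topic is still open when we return.
	defer func() {
		if topic != nil {
			topic.Shutdown(ctx)
		}
	}()
	for {
		// Give up once the caller's context is cancelled or times out.
		if err := ctx.Err(); err != nil {
			return err
		}
		if topic == nil {
			topic = rabbitpubsub.OpenTopic(...)
		}

		if err := topic.Send(ctx, ...); err != nil {
			// check error
			// log error
			// Drop the broken topic so the next iteration opens a fresh one.
			topic.Shutdown(ctx)
			topic = nil
			continue
		}

		return nil
	}
}
```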

I wrote a proof of concept, and it works.

One thing you can check is whether the error is temporary:

```go
if err != nil {
	temp := false

	// Does anything in the error chain report itself as temporary?
	var terr interface {
		Temporary() bool
	}
	if errors.As(err, &terr) {
		temp = terr.Temporary()
	}
	if !temp {
		// Shut down the topic and set it to nil so it is reopened.
	}
...
```

@omerfruk
Author

omerfruk commented Jun 1, 2024

Hi @peczenyj

Thank you for your advice; I will try what you suggest.

@vangent
Contributor

vangent commented Jun 2, 2024

Sadly, the amqp package is not very easy to use. Their sample code for how to use the package (https://pkg.go.dev/github.com/rabbitmq/amqp091-go#hdr-Best_practises_for_Connection_and_Channel_notifications_) includes hundreds of lines of code that use goroutines, etc. :-(

I believe that GoCDK handles recreating channels correctly, but given that the constructor takes a *amqp.Connection, it can't recreate that. You can try following their documentation to set up a channel that will be notified when the connection is closed, and then recreate the connection (along with the Go CDK Topic/Subscription at the same time).

3 participants