Knative源码阅读:eventing的mt-broker-ingress源码走读 2021-07-04 Serverless 暂无评论 2702 次阅读 > 本文基于knative/eventing [v.0.23.0](https://github.com/knative/eventing/tree/v0.23.3) `mt-broker-ingress`是eventing请求的入口,主要功能是监听端口,获取事件,将事件转发给Channel。主要方法在`pkg/broker/ingress/ingress_handler.go:` ```golang func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) ``` - 首先是请求的校验: ```golang // validate request method if request.Method != http.MethodPost { h.Logger.Warn("unexpected request method", zap.String("method", request.Method)) writer.WriteHeader(http.StatusMethodNotAllowed) return } // validate request URI if request.RequestURI == "/" { writer.WriteHeader(http.StatusNotFound) return } ``` - 从请求的url中获取Broker的信息 ```golang nsBrokerName := strings.Split(request.RequestURI, "/") if len(nsBrokerName) != 3 { h.Logger.Info("Malformed uri", zap.String("URI", request.RequestURI)) writer.WriteHeader(http.StatusBadRequest) return } ``` - 将req转为message,再将message转为Cloud Event格式的event并校验 ```golang ctx := request.Context() message := cehttp.NewMessageFromHttpRequest(request) defer message.Finish(nil) event, err := binding.ToEvent(ctx, message) if err != nil { h.Logger.Warn("failed to extract event from request", zap.Error(err)) writer.WriteHeader(http.StatusBadRequest) return } // run validation for the extracted event validationErr := event.Validate() if validationErr != nil { h.Logger.Warn("failed to validate extractd event", zap.Error(validationErr)) writer.WriteHeader(http.StatusBadRequest) return } ``` - 打印trace信息 ```golang ctx, span := trace.StartSpan(ctx, tracing.BrokerMessagingDestination(brokerNamespacedName)) defer span.End() if span.IsRecordingEvents() { span.AddAttributes( tracing.MessagingSystemAttribute, tracing.MessagingProtocolHTTP, tracing.BrokerMessagingDestinationAttribute(brokerNamespacedName), tracing.MessagingMessageIDAttribute(event.ID()), ) span.AddAttributes(opencensusclient.EventTraceAttributes(event)...) } ``` - 从Broker的`status.annotations.knative.dev/channelAddress`中获取Channel地址,将event发送给Channel ```golang statusCode, dispatchTime := h.receive(ctx, request.Header, event, brokerNamespace, brokerName) ``` 打赏: 微信, 支付宝 标签: Knative 本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。本站myrat.top所有文章均为原创,转载请注明出处。