/**
 * The [[RpcEnv]] that this [[RpcEndpoint]] is registered to.
 *
 * Must be non-null by the time `self` is used: `self` checks it with `require` before calling
 * `rpcEnv.endpointRef(this)`.
 */
val rpcEnv: RpcEnv
/**
 * The [[RpcEndpointRef]] of this [[RpcEndpoint]], looked up from the owning [[RpcEnv]].
 *
 * The reference only becomes valid once `onStart` is called, and it becomes `null` again when
 * `onStop` is called. Before `onStart` the endpoint has not been registered yet, so no valid
 * [[RpcEndpointRef]] exists for it — do not call `self` before `onStart`.
 *
 * @throws IllegalArgumentException if `rpcEnv` has not been initialized (is `null`)
 */
final def self: RpcEndpointRef = {
  val env = rpcEnv
  require(env ne null, "rpcEnv has not been initialized")
  env.endpointRef(this)
}
/**
 * Processes messages from `RpcEndpointRef.send` or `RpcCallContext.reply`.
 *
 * The default implementation matches every message and throws a [[SparkException]], which is
 * sent to `onError`. Endpoints that accept one-way messages must override this.
 */
def receive: PartialFunction[Any, Unit] = {
  // String interpolation instead of `self + "..."`, which depends on the deprecated
  // `any2stringadd` implicit (Scala 2.13). Both render a null `self` as "null".
  case _ => throw new SparkException(s"$self does not implement 'receive'")
}
/**
 * Processes messages from `RpcEndpointRef.ask`.
 *
 * The default implementation matches every message and answers it with a failure: a
 * [[SparkException]] is passed to `context.sendFailure`. Endpoints that answer requests must
 * override this.
 *
 * @param context the call context used to send the reply or failure back to the sender
 */
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // String interpolation instead of `self + "..."` (deprecated `any2stringadd` in Scala 2.13).
  case _ => context.sendFailure(new SparkException(s"$self won't reply anything"))
}
/**
 * Invoked when any exception is thrown while handling messages.
 *
 * @param cause the exception raised by the message handler
 */
def onError(cause: Throwable): Unit = {
  // By default, rethrow `cause` and let the RpcEnv handle it.
  throw cause
}
/**
 * Callback fired when `remoteAddress` connects to the current node.
 *
 * The default implementation is a no-op; override it to react to new connections.
 *
 * @param remoteAddress the address of the peer that connected
 */
def onConnected(remoteAddress: RpcAddress): Unit = ()
/**
 * Callback fired when the connection to `remoteAddress` is lost.
 *
 * The default implementation is a no-op; override it to react to disconnections.
 *
 * @param remoteAddress the address of the peer that went away
 */
def onDisconnected(remoteAddress: RpcAddress): Unit = ()
/**
 * Callback fired when a network error occurs on the connection between the current node and
 * `remoteAddress`.
 *
 * The default implementation ignores the error; override it to handle network failures.
 *
 * @param cause the network error that occurred
 * @param remoteAddress the address of the peer on the affected connection
 */
def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = ()
/**
 * Hook run before this [[RpcEndpoint]] handles its first message.
 *
 * The default implementation is a no-op.
 */
def onStart(): Unit = ()
/**
 * Hook run while this [[RpcEndpoint]] is stopping.
 *
 * `self` is `null` inside this method, so it cannot be used to send or ask messages.
 * The default implementation is a no-op.
 */
def onStop(): Unit = ()
// Settings read from `conf` once, when this object is constructed.
// NOTE(review): `maxRetries` and `retryWaitMs` are not referenced by any method visible here —
// confirm they are still needed.
private[this] val maxRetries = RpcUtils.numRetries(conf)
private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
// Timeout used by the `ask` / `askSync` overloads that do not take an explicit timeout.
private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
/**
 * Returns the address for this [[RpcEndpointRef]].
 */
def address: RpcAddress
/**
 * Name of the endpoint this reference points to.
 *
 * NOTE(review): presumably the name the endpoint was registered under — confirm against the
 * implementations.
 */
def name: String
/**
 * Sends a one-way asynchronous message. Fire-and-forget semantics: no reply is expected.
 *
 * @param message the message to send
 */
def send(message: Any): Unit
/**
 * Sends a message to the corresponding [[RpcEndpoint.receiveAndReply]] and returns a [[Future]]
 * that receives the reply within the specified timeout.
 *
 * This method only sends the message once and never retries.
 *
 * @param message the message to send
 * @param timeout how long to wait for the reply
 * @tparam T type of the reply message
 * @return a [[Future]] for the reply
 */
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
/**
 * Sends a message to the corresponding [[RpcEndpoint.receiveAndReply]] and returns a [[Future]]
 * for the reply, using this reference's default ask timeout.
 *
 * Delegates to `ask(message, timeout)`, so the message is sent only once and never retried.
 *
 * @param message the message to send
 * @tparam T type of the reply message
 * @return a [[Future]] for the reply
 */
def ask[T: ClassTag](message: Any): Future[T] = ask[T](message, timeout = defaultAskTimeout)
/**
 * Blocking request/reply using the default ask timeout: sends `message` to the corresponding
 * [[RpcEndpoint.receiveAndReply]] and waits for its result, throwing an exception if this fails.
 *
 * Note: this blocks the calling thread and may take a long time, so don't call it in a message
 * loop of [[RpcEndpoint]].
 *
 * @param message the message to send
 * @tparam T type of the reply message
 * @return the reply message from the corresponding [[RpcEndpoint]]
 */
def askSync[T: ClassTag](message: Any): T = askSync[T](message, timeout = defaultAskTimeout)
/**
 * Blocking request/reply with an explicit timeout: sends `message` to the corresponding
 * [[RpcEndpoint.receiveAndReply]] via `ask`, then waits on the resulting future with
 * `timeout.awaitResult`, throwing an exception if this fails.
 *
 * Note: this blocks the calling thread and may take a long time, so don't call it in a message
 * loop of [[RpcEndpoint]].
 *
 * @param message the message to send
 * @param timeout the timeout duration
 * @tparam T type of the reply message
 * @return the reply message from the corresponding [[RpcEndpoint]]
 */
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T =
  timeout.awaitResult(ask[T](message, timeout))
/**
 * Registers `endpoint` under `name` and returns a [[NettyRpcEndpointRef]] pointing to it.
 *
 * The new endpoint's data is also queued on `receivers` so that its OnStart message gets
 * processed by a message loop.
 *
 * @param name the name to register the endpoint under
 * @param endpoint the endpoint to register
 * @return a reference to the newly registered endpoint
 * @throws IllegalStateException if the RpcEnv has been stopped
 * @throws IllegalArgumentException if an endpoint called `name` is already registered
 */
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    val data = new EndpointData(name, endpoint, endpointRef)
    if (endpoints.putIfAbsent(name, data) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    // Use the instance we just inserted instead of re-reading `endpoints`: this avoids a second
    // lookup and does not depend on nobody removing the entry in between.
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data) // for the OnStart message
  }
  endpointRef
}
/**
 * Builds a [[NettyRpcEnv]] from `config`.
 *
 * In client mode the env is returned without starting a server. Otherwise a server is started
 * through `Utils.startServiceOnPort` using `config.port`; if that fails with a non-fatal error,
 * the env is shut down and the error is rethrown.
 *
 * @param config settings for the new environment
 * @return the created environment
 */
def create(config: RpcEnvConfig): RpcEnv = {
  val sparkConf = config.conf
  // Sharing one JavaSerializerInstance across threads is safe. If KryoSerializer is supported in
  // the future, the SerializerInstance will have to be kept in a ThreadLocal instead.
  val serializer =
    new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
  val env =
    new NettyRpcEnv(sparkConf, serializer, config.advertiseAddress, config.securityManager)
  if (!config.clientMode) {
    // Starts the server on the given port and returns the env with `env.address.port`.
    val bindOnPort: Int => (NettyRpcEnv, Int) = { port =>
      env.startServer(config.bindAddress, port)
      (env, env.address.port)
    }
    try {
      Utils.startServiceOnPort(config.port, bindOnPort, sparkConf, config.name)
    } catch {
      case NonFatal(e) =>
        env.shutdown()
        throw e
    }
  }
  env
}
}
/**
 * Posts a message to a specific endpoint.
 *
 * The message is appended to the inbox of the endpoint registered as `endpointName`, and that
 * endpoint is queued on `receivers` so a message loop will process it. If the dispatcher has
 * been stopped, or no endpoint with that name exists, nothing is posted and
 * `callbackIfStopped` is invoked with the corresponding exception instead.
 *
 * @param endpointName name of the endpoint.
 * @param message the message to post
 * @param callbackIfStopped receives the failure when the message cannot be delivered
 *                          (dispatcher stopped or endpoint not found).
 */
private def postMessage(
    endpointName: String,
    message: InboxMessage,
    callbackIfStopped: (Exception) => Unit): Unit = {
  val failure: Option[Exception] = synchronized {
    val target = endpoints.get(endpointName)
    if (stopped) {
      Some(new RpcEnvStoppedException())
    } else {
      Option(target) match {
        case None =>
          Some(new SparkException(s"Could not find $endpointName."))
        case Some(data) =>
          data.inbox.post(message)
          receivers.offer(data)
          None
      }
    }
  }
  // Invoke the callback after leaving `synchronized`, so it never runs while holding the lock.
  failure.foreach(callbackIfStopped)
}
/**
 * Message loop used for dispatching messages.
 *
 * Repeatedly takes the next entry from `receivers` and lets its inbox process pending messages.
 * Non-fatal errors are logged and the loop keeps going. The loop exits when it takes
 * `PoisonPill` (which it puts back so the other loops see it too) or when its thread is
 * interrupted.
 *
 * If a fatal error escapes, a replacement `MessageLoop` is submitted to `threadpool` before the
 * error is rethrown. The pool replaces a dead worker thread but not the task it was running, so
 * without this the number of active loops would silently shrink if the JVM keeps running.
 */
private class MessageLoop extends Runnable {
  override def run(): Unit = {
    try {
      while (true) {
        try {
          val data = receivers.take()
          if (data == PoisonPill) {
            // Put PoisonPill back so that other MessageLoops can see it.
            receivers.offer(PoisonPill)
            return
          }
          data.inbox.process(Dispatcher.this)
        } catch {
          case NonFatal(e) => logError(e.getMessage, e)
        }
      }
    } catch {
      case _: InterruptedException => // exit
      case t: Throwable =>
        try {
          // Keep the dispatcher working if the uncaught exception handler does not kill the JVM.
          threadpool.execute(new MessageLoop)
        } finally {
          throw t
        }
    }
  }
}
上面的消息循环（MessageLoop）在 Dispatcher 内部的线程池中执行，该线程池的定义如下：
1 2 3 4 5 6 7 8 9
/**
 * Thread pool that runs the dispatcher's [[MessageLoop]]s.
 *
 * Its size comes from `spark.rpc.netty.dispatcher.numThreads`, defaulting to the number of
 * available processors but never less than 2. The pool is created with
 * `ThreadUtils.newDaemonFixedThreadPool` using that size, and one `MessageLoop` is submitted per
 * thread.
 */
private val threadpool: ThreadPoolExecutor = {
  val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
    math.max(2, Runtime.getRuntime.availableProcessors()))
  val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
  // The generator (`<- 0 until numThreads`) was missing, leaving a syntactically broken loop.
  for (_ <- 0 until numThreads) {
    pool.execute(new MessageLoop)
  }
  pool
}
// Request/reply message: route `content` to the endpoint's `receiveAndReply` handler for this
// call's `context`.
case RpcMessage(_sender, content, context) =>
  try {
    endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
      // `receiveAndReply` has no case for `content`, so treat it as unsupported.
      // NOTE(review): this reports `message` (presumably the whole RpcMessage being matched)
      // rather than `msg`/`content` — confirm which is intended.
      throw new SparkException(s"Unsupported message $message from ${_sender}")
    })
  } catch {
    case NonFatal(e) =>
      // Report the failure back through the call context before propagating it.
      context.sendFailure(e)
      // Throw the exception -- this exception will be caught by the safelyCall function.
      // The endpoint's onError function will be called.
      throw e
  }