Author: CarGurus Revved

  • Actor Pattern Controlling Connection Pool in Kotlin to Honor API Rate Limit Exceptions

    Originally written by Don Mitchell, CarGurus Technical Fellow (ret.).

    Computers have up to a dozen CPU cores, which can all work in parallel. It’s hard to write code that takes advantage of that concurrency. Additionally, every time code makes a request of an external system, it has to wait a long time, relative to computational speed, for a response. The best systems set that request aside and begin working on the next request or an unprocessed response.

    It’s not unfathomable for a well-written program to have around 60 outstanding requests at a time. Sometimes this volume overwhelms the systems on the other side, and they ask your system to stop sending requests for a few seconds, a period during which your system would typically make hundreds of new requests. If you’ve written your code concurrently, it’s hard to have all the separate request processors coordinate. Naive coordination approaches will often cut concurrency by an order of magnitude, defeating all your hard work coding the concurrency. Failure to coordinate may provoke the remote server to refuse requests or to slow down request processing to such an extent that, once again, all your concurrency coding is for naught.

    As we’ve made our system highly concurrent using Kotlin coroutines, we’ve occasionally stressed third-party APIs thus causing rate limit exceptions. Kotlin is great at concurrent processing with context switching on every API request, database query, or other network call. Kotlin’s dispatch mechanism allows an order of magnitude more processing stacks than there are CPUs.

    Our online advertisement generation and optimization system has billions of ads grouped into thousands of accounts distributed over various advertising channels. Our calls to these channels

    • request fine-grained spending and performance data for each account every hour,
    • adjust bids, budgets, and other limits for every campaign (over 30,000) several times daily,
    • check and update content for every ad every day.

    This activity adds up to a lot of remote requests and sometimes causes the advertising channel to send a rate-limiting exception.

    Our pre-actor implementation caught these exceptions per coroutine (processing context, call stack). It handled them only within that call stack by delaying slightly longer than the error requested. Sibling call stacks, unaware of the delay, kept hitting the same API endpoint, which provoked many remote servers to escalate the required wait. For example, GoogleAds will initially request a 15-second wait but exponentially increase that up to 15 minutes as requests come within that cessation period. Obviously, waiting 15 minutes in this high-paced environment is unacceptable.
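
    To make the escalation concrete, here is a small arithmetic sketch. It assumes the server doubles the wait after every violation; the text above only says the increase is exponential, so the factor of two and the function name `escalation` are illustrative assumptions.

```kotlin
/**
 * Lists the waits (in seconds) a server would request if it starts at [initialSec]
 * and doubles the wait after every violation until it reaches [capSec].
 * Doubling is an assumption made for illustration only.
 */
fun escalation(initialSec: Long, capSec: Long): List<Long> =
    generateSequence(initialSec) { prev ->
        // stop once the cap is reached; otherwise double, but never exceed the cap
        if (prev < capSec) minOf(prev * 2, capSec) else null
    }.toList()
```

    Under that assumption, `escalation(15, 900)` yields seven steps, so only six stray requests inside the cessation period are enough to go from a 15-second wait to the 15-minute cap.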

    By implementing the actor pattern to govern each remote call, we’ve been able to reduce rate limit escalations to almost none without impeding concurrent processing.

    Algorithm sketch

    Control flow

    The basic architecture is a fan-in channel, a single very simple and quick dispatcher, and a connection pool (fan-out analog) per external service. “Per external service” means not the fine-grained service but whatever granularity the remote endpoint applies its rate limit to. So, for example, in our system it’s each advertising host: Google, Facebook, MSN, etc.

    Calling methods must all be suspendable (details later). The calling methods send an ApiRequest object (defined below) to the fan-in channel owned by the actor. The actor iterates the channel, delays if there’s been a rate limit exception, requests a connection from the owned connection pool or delays if one is not available, and lastly dispatches the request to the connection within a try/catch block that catches any rate-limiting errors but lets other errors through.

    Actor code

    The actor uses the obsolete but not yet replaced Kotlin CoroutineScope.actor. It’s defined in an abstract class for which there should be an implementing class per advertising host type. Implementing classes must define a delayError(e: Exception): Long? method, which takes any exception and returns null if the exception is not a rate limit error, or otherwise returns how many milliseconds to delay before allowing the next request.
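
    As a rough illustration of that contract, a delayError implementation might look like the sketch below. `RateLimitException`, its `retryAfterSeconds` field, and the `PADDING_MSEC` value are hypothetical stand-ins; real vendor client libraries report rate limits in their own ways.

```kotlin
// Hypothetical exception type standing in for a vendor's rate-limit error.
class RateLimitException(val retryAfterSeconds: Long) : Exception("rate limited")

// Assumed padding so that we wait slightly longer than the server asked for.
val PADDING_MSEC = 500L

/**
 * Returns null when [e] is not a rate-limit error; otherwise returns how many
 * milliseconds to pause before allowing the next request.
 */
fun delayError(e: Exception): Long? = when (e) {
    is RateLimitException -> e.retryAfterSeconds * 1000 + PADDING_MSEC
    else -> null
}
```

    Returning null for everything else is what lets ordinary failures flow back to the caller untouched while only rate-limit errors pause the queue.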

    The code must be safe for concurrent access even though the process loop never runs simultaneously with itself, because the exception handlers in the child coroutines access the volatile variables. It also needs to segregate the coroutine dispatch pools of its callers from the various actors’ processing and their connection pools. If it tries to share (say, just using Dispatchers.IO) and the code above the callers creates more than 64 coroutine children, there will be no threads left for the actors themselves nor for their async invocations on the connection pools.
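
    The shared rate-limit delay is one such variable: several child coroutines may try to raise it at once while the loop reads and resets it. One possible way to make those updates atomic on the JVM, shown here as a sketch rather than as the approach used in this system, is an AtomicLong. The class name `SharedDelay` and its methods are hypothetical.

```kotlin
import java.util.concurrent.atomic.AtomicLong

// Shared pause, in milliseconds, that the dispatcher loop honors before its next request.
class SharedDelay {
    private val msec = AtomicLong(0)

    // Raise the pause to at least [candidate]; a concurrent caller can never lower it.
    fun raiseTo(candidate: Long): Long =
        msec.accumulateAndGet(candidate) { current, wanted -> maxOf(current, wanted) }

    // Read the pending pause and reset it to zero in one atomic step.
    fun take(): Long = msec.getAndSet(0)
}
```

    The design point is that "keep the maximum" and "read then reset" are each a single atomic operation, so an update arriving between a read and a write cannot be lost.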

    The main actor loop is:

        /**
         * Call `queue.send(ApiRequest(poolSelector) { vendorClient -> ... result })`, and then
         * you can call `ApiRequest.result()` on the request to have it wait for the actor to allocate
         * a pool object, call the body you passed, and assign the result to [ApiRequest.result].
         */
        // Note: the channel has no opinion about the return type of ApiRequest but the sender does; however, there
        // will be many senders with different opinions for the same channel.
        @OptIn(ObsoleteCoroutinesApi::class)
        val queue: SendChannel<ApiRequest<Any>> = actor(
            // actor needs to not compete w its users otherwise it may stall if they consume all dispatchers
            queueDispatcher,
            onCompletion = QueueCompletionHandler()
        ) {
            for (request in channel) {
                if (delayMsec > 0) { // did we catch a rate limit? if so, swap for duration
                    logger.debug { "Delay $delayMsec" }
                    delay(delayMsec)
                    delayMsec = 0 // reset after delay is done
                    logger.debug { "Delay done" }
                }
                // a bit race condition vulnerable if anything can possibly claim a connection between this loop
                // and the `availableConnection` call; however, actor itself is not concurrent; so, that should be impossible.
                while (!isConnectionAvailable()) {
                    logger.debug { "Waiting for connection to become available" }
                    delay(WAIT_DELAY)
                }
    
                logger.debug { "Connection available" }
    
                when (val eitherConnection = availableConnection(request.poolSelector)) {
                    is Either.Left -> {
                        logger.warn { "Could not get a connection for ${request.poolSelector}" }
                        request.recordException(eitherConnection.value)
                    }
    
                    is Either.Right -> {
                        val connection = eitherConnection.value
                        launch(poolDispatcher) {
                            // note this needs separate dispatcher pool from callers so that it can do work even if
                            // callers consume all threads
                            execBlockOnConnection(catchDelay(), request, connection)
                        }
                    }
                }
            }
        }

    This loop

    1. gets the next request,
    2. checks whether it needs to delay due to rate limit,
    3. gets or waits (process swaps) for a connection from the pool,
    4. launches a child coroutine, and
    5. has that child execute the request on the given connection.

    The code for most of the referenced symbols is shown below.

        companion object {
          // how many children connection processes can there be (really the connection pool capacity)
          private val poolDispatcher = Dispatchers.IO.limitedParallelism(CONNECTION_POOL_LIMIT)
        
          // how many actors can run simultaneously (more can run via process swap)
          private val queueDispatcher = Dispatchers.Default.limitedParallelism(3)
        }

        // @Volatile guarantees visibility across threads, not atomic read-modify-write
        @Volatile
        protected var delayMsec = 0L // Long, to match delayError() and delay()
        
        private suspend fun execBlockOnConnection(
            catchDelay: RetryPolicy<Throwable>,
            request: ApiRequest<Any>,
            connection: C
        ) {
            try {
                retry(limitAttempts(RETRY_LIMIT) + catchDelay) {
                    request.saveResult(tryRequest(connection, request.block))
                }
            } catch (e: CancellationException) {
                throw e
            } catch (e: Exception) {
                if (delayError(e) != null) {
                    logger.warn(e) { "Gave up delaying" }
                    request.recordException(DelayErrorHandlerFailed(e))
                } else {
                    request.recordException(e)
                }
            } finally {
                logger.debug { "Returning connection to pool" }
                returnConnection(request, connection)
            }
        }
    
        private fun catchDelay(): suspend RetryFailure<Throwable>.() -> RetryInstruction =
        {
            if (reason is Exception && reason !is CancellationException) {
                // catch delayable error: delay whole channel and this stack. Retry this stack request
                // after delay. (would really like to requeue request but even if that was possible it 
                // could lead to infinite loops)
                delayError(reason as Exception)?.let { msec ->
                    // max to protect against race conditions
                    if (msec > delayMsec) delayMsec = msec
                    delay(msec)
                    ContinueRetrying
                } ?: StopRetrying
            } else {
                StopRetrying
            }
        }
        
        inner class QueueCompletionHandler : CompletionHandler {
            override fun invoke(cause: Throwable?) {
                closeAll() // close all children coroutines
            }
        }

    Hopefully you can map the retry DSL to your retry mechanism.

    I’m not going to go into how the pool selector works other than to mention that the model allows for pool connection CRUD via any arbitrary key. Some APIs won’t have various connections. Some will have distinct connections per service; in that case, the service would be the key. Another example would be a read-only versus read-write connection pool. The pool manager checks whether a connection of that type is available. If none is available but we haven’t exhausted capacity, it allocates a new connection. If we’ve exhausted capacity but some connections are idle, it destroys an arbitrary idle connection and allocates a new one for the given selector.
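
    The three allocation rules can be sketched roughly as follows. This is not the pool manager from our code base: `Conn`, `KeyedPool`, and the use of a String selector are hypothetical simplifications, and the sketch assumes a single caller (the actor loop), so it does no locking.

```kotlin
// Hypothetical connection type; a real pool would wrap a vendor client.
class Conn(val selector: String) {
    var closed = false
        private set
    fun close() { closed = true }
}

class KeyedPool(private val capacity: Int) {
    private val idle = mutableListOf<Conn>()
    private var alive = 0 // connections that exist right now (idle + checked out)

    /** Returns a connection for [selector], or null if every connection is busy. */
    fun acquire(selector: String): Conn? {
        // 1. Reuse an idle connection of the requested type.
        val match = idle.firstOrNull { it.selector == selector }
        if (match != null) { idle.remove(match); return match }
        // 2. Below capacity: allocate a new connection.
        if (alive < capacity) { alive++; return Conn(selector) }
        // 3. At capacity with an idle connection of another type: destroy it and allocate anew.
        if (idle.isNotEmpty()) {
            idle.removeAt(0).close()
            return Conn(selector) // one destroyed, one created: `alive` is unchanged
        }
        return null
    }

    fun release(conn: Conn) { idle.add(conn) }
}
```

    A null result corresponds to the case where the actor loop has to wait and try again later.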

    Calling the actor and getting results

    The methods calling the VendorQueue should be running in their own coroutines so that they suspend and swap upon a call to the VendorQueue. We usually wrap the calling method in runBlocking rather than define it as a suspend function, to minimize the colored functions requirement. The code sends the request to the queue (actor) and then returns the request’s result.

    ApiRequest and sample method for calling a specific GoogleAds service:

        /**
         * The object API which callers put into the actor's queue. Call [result] to suspend and wait for
         * the call to propagate through. If the call got an uncaught error, [result] will rethrow the error.
         * @param poolSelector if the actor should maintain different pools for various access, then this should
         *    have a non-null value. Not all actors have a poolSelector, but MSN uses read-only vs read-write. Google
         *    uses service class as pool selectors.
         * @param block what to call when the actor provides a connection.
         */
        inner class ApiRequest<R>(
            val poolSelector: T,
            val block: suspend (C) -> R,
        ) {
            private val resultHolder: CompletableDeferred<R> = CompletableDeferred()
    
            /**
             * Suspend waiting for the result. If there was an exception, this re-throws the exception.
             */
            suspend fun result(): R = resultHolder.await()
            fun saveResult(value: R): Boolean = resultHolder.complete(value)
            fun recordException(exception: Exception): Boolean = resultHolder.completeExceptionally(exception)
        }
    
        /**
         * Pass the given query to the Google search API (read-only).
         */
        fun readFromGoogle(
            externalAccountId: Long,
            query: String,
            transform: (GoogleAdsRow) -> SyncedContent
        ): List<SyncedContent> = runBlocking {
            // inside runBlocking, `this` is the CoroutineScope, so refer to the queue's members directly
            val request = ApiRequest<SearchPagedResponse>(GoogleAdsServiceClient::class) { client ->
                check(client is GoogleAdsServiceClient)
                client.search(
                    SearchGoogleAdsRequest.newBuilder()
                        .setCustomerId(externalAccountId.toString())
                        .setQuery(query)
                        .build()
                )
            }
            queue.send(
                request as VendorQueue<KClass<out BackgroundResource>, BackgroundResource>.ApiRequest<Any>
            )
            // the lambda's last expression is the value runBlocking returns
            request.result().iterateAll().map(transform)
        }

    In the previous actor code, you’ll see that queue is the actor. The actor calls saveResult or recordException. The ApiRequest really only contains the connection selector. The payload is in the block which you see in the readFromGoogle example where query is the payload.

    Note that readFromGoogle will rethrow any caught error other than a rate limit error. The caller should handle all standard exceptions. It’s the call to request.result() which rethrows the error which recordException saved into the result.

    Extensions

    An obvious extension is converting to whatever replacement Kotlin provides for actor. It also wouldn’t be too hard to replace actor with a SendChannel whose listener is the loop and which pushes to a Channel.
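
    A minimal sketch of that channel-based replacement, assuming kotlinx.coroutines is on the classpath, might look like this. `Request` and `requestQueue` are hypothetical, simplified names, and the sketch leaves out the rate-limit delay, the connection pool, and the separate dispatchers described earlier.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch

// Hypothetical, simplified request: a block to run plus a holder for its result.
class Request<R>(private val block: suspend () -> R) {
    private val holder = CompletableDeferred<R>()

    suspend fun result(): R = holder.await()

    suspend fun run() {
        try {
            holder.complete(block())
        } catch (e: Exception) {
            holder.completeExceptionally(e)         // surface the failure to whoever awaits result()
            if (e is CancellationException) throw e // keep cooperative cancellation intact
        }
    }
}

// Builds a queue whose single consumer coroutine plays the role of the actor loop.
fun CoroutineScope.requestQueue(): SendChannel<Request<*>> {
    val channel = Channel<Request<*>>(Channel.UNLIMITED)
    launch {
        for (request in channel) {   // one consumer: requests are picked up in order
            launch { request.run() } // each request executes in its own child coroutine
        }
    }
    return channel
}
```

    A caller would send a `Request` to the returned channel, await `result()`, and close the channel when finished so that the consumer loop ends.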

    Since GoogleAds uses streamable protobuf, we implement streamSearch over this pattern which returns a Flow<GoogleAdsRow>.

  • How CarGurus is Supercharging Our Microservice Developer Experience

    Originally written by Jahvon Dockery, Principal Software Development Engineer.

    As you may expect, maintaining a continuously growing distributed system architecture does come with developer experience challenges. For instance, running the services in development may require additional services or you may need multiple backing data stores with realistic data. Also, building and deploying the microservices across environments may become more challenging as the expected configuration or underlying platform may be different. As Frank Fodera described in the last Revved blog post, Decomposition Journey at CarGurus, over the last couple of years CarGurus has invested significantly into decomposing our monolithic services into many smaller microservices. You must be wondering how we were able to make our development team way more effective given this shift. Let me walk you through how we are supercharging the developer experience at CarGurus!

    There are many tools that seek to solve some of these challenges and in some organizations, a large number of shared shell scripts fill the gaps that the tools don’t fill – that’s exactly what we were doing at CarGurus for a large part of our decomposition journey and we still leverage some of those scripts and tools today. However, that still leaves a lot of undesired complexity for software engineers who may need to use many tools and many scripts during the development process.

    Enter Mach5 – an internal tool which serves to simplify a lot of the complexity involved with developing, configuring, and releasing microservices that are part of CarGurus’ distributed systems.

    Introducing Mach5

    Over the last couple of years, the Engineering Platform team at CarGurus has worked with the CarGurus product engineers to understand their typical development workflows and pains. Given what we learned, we developed Mach5. Named after the Mach Five from the 1960s manga and animated TV series “Speed Racer”, its main goal is to simplify and supercharge the developer experience for our software engineers. Much like the supercharged car from “Speed Racer,” Mach5 has many features designed to help the user overcome challenges – in our case, development process challenges.

    From the developer’s perspective, Mach5 is a command line interface that acts against a “workspace” of configuration files that coexist with the microservice’s source code. Under the hood, the command line interface runs various processes locally, resolves service dependencies on demand, and triggers infrastructure operations based on the service’s Mach5 configuration through a backing Mach5 registry service.

    Understanding Mach5 Environments

    A key component of Mach5 is the “Environment” concept. In Mach5, microservices are deployed to environments; therefore, most Mach5 operations are done within the environment scope. Each Mach5 environment serves a specific purpose. For instance, each of our development teams has its own dedicated testing, staging, and production environments (for service deployments within those stages of the release cycle). In addition, each engineer also has their own dedicated development environments.

    Mach5 is able to map its own internal environments to CarGurus’ infrastructure environments when acting on users’ requests. Given this, Mach5 can also provide guardrails around infrastructure environments that the typical user should not be modifying themselves (e.g., production). The diagram below illustrates this at a high level.

    Overview of mach5 environments

    We automate the creation of every environment based on an internal registry of development teams and software engineers. By automating this, we can guarantee that every new engineer onboarding into CarGurus and every new team that is created will have their Mach5 environments ready without any additional work on their part.

    Example use case – Mach5 deploy

    By far, the most used Mach5 CLI command by developers is mach5 deploy. This command simplifies many of the largest challenges around developing microservices in a distributed system.

    Overview of mach5 deployments

    As shown in the above diagram, this single command triggers a multi-step workflow handled by the Mach5 CLI client and the backing registry service. We’ve designed Mach5 and the deploy command in a way that allows for extending and customizing the microservice deployment process without requiring the user to know too many specifics about the underlying platform that these services are running on. Now, let’s dive in deeper to understand some of the key parts of this workflow.

    Artifact build and publish

    At CarGurus, we have a variety of applications – including Java, Node, and Golang applications. We mostly leverage Bazel to build and publish these applications’ images. Often the first thing engineers will need to do if they wish to deploy a service for testing is set the correct Bazel tags or run the correct target. However, if you are working on multiple microservices, having to remember the various command syntaxes can increase an engineer’s cognitive load when context switching.

    This becomes even more challenging to do when teams need their own custom build logic or if they are using Maven instead. Mach5 is agnostic about the build and artifact publishing systems it uses. The underlying logic of those steps can be configured through scripts by our product engineers as part of the deployment’s preconditions. When an engineer runs a mach5 deploy, the client will automatically run those preconditions against the current code that the user has locally. The published artifacts are then used for the next step of the deploy, the workload deployment.

    Following a deployment, Mach5 will also run any configured postconditions for that service. A common use case for postconditions is syncing local assets to an external file system for the deployed service to use. Preconditions and postconditions lower the barrier to entry for engineers across teams who may want to get a service that they don’t actively work on running without having to follow a series of manual steps for bootstrapping the service.

    Workload deployment

    A core principle we have for Mach5 is that its deployment is meant to be infrastructure-agnostic. Currently, CarGurus uses Kubernetes to run most of our application workloads and has clusters for our production, staging, and development environments. Within each of those clusters, we associate namespaces with the various Mach5 “environments” described above. Our main use case currently is deploying within these Kubernetes clusters, but we plan to extend Mach5 using our internal provider interface to enable users to deploy different types of workloads, such as AWS Lambda functions.

    Kubernetes can be a complex system to work with for many product engineers. There are many tools for deploying a workload to Kubernetes, including using kubectl or Helm directly. However, given the complexity of Kubernetes, it may not be safe to allow every software engineer to apply changes to a Kubernetes cluster directly. With the provider interface I mentioned, Mach5 can be configured to deploy a raw Kubernetes manifest, or it can take in a Helm chart and values files for the deployment.

    This gives us a great chance to do some pre-deployment processing and validation and to limit the permissions down to our single Mach5 backing registry. We are also planning on supercharging this process even more by enabling Mach5 to integrate with Kubernetes operators that respond to specific Custom Resource Definitions.

    Service Dependencies and Delegates

    One of the biggest challenges with developing a microservice is understanding service dependencies. At CarGurus our product engineers maintain over 80 microservices and that number continues to grow. In addition, each service may need a backing data store or they may need to leverage external systems like Kafka for messages. This is where Mach5 really drives a more seamless developer experience! Before we get to that, let’s revisit the “environment” concept.

    At a high level, each environment ends up being a collection of deployed services. With what we call “delegation”, that collection of services is conceptually expanded into a larger collection. Here is a simplified YAML representation of how a Mach5 environment is typically composed:

    name: engineerA
    selectors: [development]
    data:
      delegatedEnvironments:
      - name: global
        selectors: [staging]
      - name: teamA
        selectors: [staging, internal]
      deployments:
      - name: serviceA
      - name: serviceA
        selectors: [testingChange1]
      - name: serviceB
        selectors: [testingChange1]
      kubernetesData:
        cluster: dev-cluster-na
        namespace: engineerA-user-namespace
      owner: engineerA

    As part of an individual Mach5 service’s configuration, you can specify the dependencies on other services. The Mach5 registry that backs the CLI operations knows about all environments and all currently existing deployments. If it detects that service X depends on service Y, it will first check the current Mach5 environment for service Y. If it does not exist there, then it will check the delegated environments noted above to find the next closest match. All development Mach5 user environments are delegated to our staging environment.
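
    The lookup order described above can be sketched as follows. This is only an illustration of the idea, not the Mach5 registry’s code: the `Environment` model and the `resolve` function are hypothetical, and the sketch assumes delegated environments are searched in the order they are listed.

```kotlin
// Hypothetical, simplified model of a Mach5 environment: the services deployed
// in it plus the ordered list of environments it delegates to.
data class Environment(
    val name: String,
    val deployments: Set<String>,
    val delegates: List<Environment> = emptyList(),
)

/** Returns the name of the environment that will serve [service], or null if none does. */
fun resolve(service: String, env: Environment): String? {
    // The current environment always wins.
    if (service in env.deployments) return env.name
    // Otherwise fall back to the first delegated environment that has the service.
    return env.delegates.firstOrNull { service in it.deployments }?.name
}
```

    With an engineer environment that deploys only serviceA and delegates to staging, a dependency on serviceA resolves locally while every other dependency resolves to staging.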

    We represent the data stores and external systems as services that can be added to specific environments so the same searching will be applied to those dependencies. Thanks to Mach5, once the workload for service X has been deployed, it routes to the identified dependent services without requiring the engineer to separately deploy all of its dependencies.

    Overview of mach5 service resolution

    As you’d imagine, this provides some great benefits. If an engineer would like to test changes to two services together, all they have to do is deploy both of those services to their environment. They can even deploy their own instance of the backing data store if they do not want to use the delegated staging data store.

    At any point, they can use a mach5 undeploy command against their services when they are done testing or if they want to fall back to using the delegated service.

    Selectors

    Many software engineers may run into cases where they would like to do testing against one git branch, pass it off for feedback or testing, and continue additional development while they wait. That’s where our selector capability comes in. Selectors allow different variants of a single service to be deployed and linked to other similar variants.

    A user can specify an optional selector with any Mach5 deployment. That will follow the same flow as described above but it will keep any existing deployment without the specified selector untouched. As shown in the diagram above, the selector is also used as part of the service dependency search; so you can deploy multiple versions of a service within your environment (with differing selectors) but Mach5 will prefer the dependent services with a matching selector over deployments without one.
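
    The selector preference can be illustrated with a short sketch. The `Deployment` model and the `pick` function are hypothetical names, and the fallback to a deployment without a selector is an assumption based on the description above.

```kotlin
// Hypothetical model: one deployed variant of a service, optionally tagged with a selector.
data class Deployment(val service: String, val selector: String? = null)

/**
 * Picks the deployment of [service] to route to: a variant whose selector matches
 * [wanted] wins; otherwise the deployment without a selector is used.
 */
fun pick(service: String, wanted: String?, deployed: List<Deployment>): Deployment? {
    val candidates = deployed.filter { it.service == service }
    return candidates.firstOrNull { wanted != null && it.selector == wanted }
        ?: candidates.firstOrNull { it.selector == null }
}
```

    Applied to the YAML example earlier, a serviceB deployed with the testingChange1 selector would be routed to the serviceA variant carrying the same selector rather than to the plain serviceA deployment.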

    For Mach5 environments, we use selectors to describe the various purposes of those environments. For instance, teamA will likely have 3 environments as noted above. The name is always teamA, but the various selectors would be testing, staging, or production to represent those stages of the release cycle.

    Additional Debugging Tools

    The Mach5 CLI and the backing registry service are continuing to grow based on feedback from our internal product engineers. This has shaped Mach5 into a much more robust tool for all software engineers in the organization. In addition to managing microservice deployments, the CLI can be used to get information like available ingress hosts and logs for existing deployments.

    We are continuously learning, iterating, and improving Mach5 as a means to improve our overall developer experience at CarGurus. If this is something you’re interested in then I recommend checking out our open roles!