Numerous existing works have shown that the key to efficient distributed machine learning (ML) is proper system and algorithm co-design: system design should be tailored to the unique mathematical properties of ML algorithms, and algorithms can be re-designed to better exploit the system architecture. While existing research has made attempts along this direction, many algorithmic and system properties that are characteristic of ML problems remain to be explored. Through an exploration of system-algorithm co-design, we build a new decentralized system, Orpheus, to support distributed training of a general class of ML models whose parameters are represented by large matrices. Training such models at scale is challenging: transmitting and checkpointing large matrices incur substantial network traffic and disk I/O, which aggravates the inconsistency among parameter replicas. To cope with these challenges, Orpheus jointly exploits system and algorithm designs that (1) reduce the size and number of network messages for efficient communication, (2) incrementally checkpoint vectors for lightweight and fine-grained fault tolerance without blocking computation, and (3) improve the consistency among parameter copies via periodic centralized synchronization and parameter-replica rotation. As a result of these co-designs, communication and fault-tolerance costs are linear in both the matrix dimension and the number of machines in the network, as opposed to quadratic in existing systems, and the improved parameter consistency accelerates algorithmic convergence. Empirically, we show that our system outperforms several existing baseline systems on training several representative large-scale ML models.
Operating at a large scale, data analytics has become an essential tool for gaining insights from operational data, such as user online activities. With the volume of data growing exponentially, data analytic jobs have expanded from a single datacenter to multiple geographically distributed datacenters. Unfortunately, designed originally for a single datacenter, the software stack that supports data analytics is oblivious to on-the-fly resource variations on inter-datacenter networks, which negatively affects the performance of analytic queries. Existing solutions that optimize query execution plans before their execution are not able to quickly adapt to resource variations at query runtime.
In this paper, we present Turbo, a lightweight and non-intrusive data-driven system that dynamically adjusts query execution plans for geo-distributed analytics in response to runtime resource variations across datacenters. A highlight of Turbo is its ability to use machine learning at runtime to accurately estimate the time cost of query execution plans, so that adjustments can be made when necessary. Turbo is non-intrusive in the sense that it does not require modifications to the existing software stack for data analytics. We have implemented a real-world prototype of Turbo, and evaluated it on a cluster of 33 instances across 8 regions in the Google Cloud platform. Our experimental results have shown that Turbo can achieve a cost estimation accuracy of over 95% and reduce query completion times by 41%.
Many machine learning applications operate in dynamic environments that change over time, in which models must be continually updated to capture recent trends in data. However, most of today's learning frameworks perform training offline, without system support for continual model updating.
In this paper, we design and implement Continuum, a general-purpose platform that streamlines the implementation and deployment of continual model updating across existing learning frameworks. In pursuit of fast data incorporation, we further propose two update policies, cost-aware and best-effort, that judiciously determine when to perform model updating, with and without accounting for the training cost (machine-time), respectively. Theoretical analysis shows that the cost-aware policy is 2-competitive. We implement both policies in Continuum, and evaluate their performance through EC2 deployment and trace-driven simulations. The evaluation shows that Continuum results in reduced data incorporation latency, lower training cost, and improved model quality in a number of popular online learning applications that span multiple application domains, programming languages, and frameworks.
Distributed deep neural network (DDNN) training constitutes an increasingly important workload that frequently runs in the cloud. Larger DNN models and faster compute engines are shifting DDNN training bottlenecks from computation to communication. This paper characterizes DDNN training to precisely pinpoint these bottlenecks. We found that timely training requires high performance parameter servers (PSs) with optimized network stacks and gradient processing pipelines, as well as server and network hardware with balanced computation and communication resources. We therefore propose PHub, a high performance multi-tenant, rack-scale PS design. PHub co-designs the PS software and hardware to accelerate rack-level and hierarchical cross-rack parameter exchange, with an API compatible with many DDNN training frameworks. PHub provides a performance improvement of up to 2.7x compared to state-of-the-art cloud-based distributed training techniques for image classification workloads, with 25% better throughput per dollar.
Web services that enable users in multiple regions to collaborate can increase availability and decrease latency by replicating data across data centers. If such a service spreads its data across multiple cloud providers---for the associated performance, cost, and reliability benefits---it cannot rely on cloud providers to keep the data globally consistent.
Therefore, in this paper, we present an alternate approach to realizing global consistency in the cloud, which relies on cloud providers to only offer a strongly consistent storage service within each data center. A client library then accesses replicas stored in different data stores in a manner that preserves global consistency. To do so, our key contribution lies in rethinking the Paxos replication protocol to account for the limited interface offered by cloud storage. Compared to approaches not tailored for use in the cloud, our system CRIC can either halve median write latency or lower cost by up to 60%.
Existing state machine replication protocols confront two major challenges in geo-replication: (1) limited performance caused by load imbalance, and (2) severe performance degradation in heterogeneous environments or under high-contention workloads. This paper presents a new semi-decentralized approach that addresses both challenges at the same time. Our protocol, SDPaxos, divides the task of a replication protocol into two parts: durably replicating each command across replicas without a global order, and ordering all commands to enforce the consistency guarantee. We decentralize the process of replicating commands, which accounts for the largest share of the load, to provide high performance. In contrast, we centralize the process of ordering commands, which is lightweight but needs a global view, for better performance stability against heterogeneity or contention. The key novelty is that SDPaxos achieves optimal one-round-trip latency under realistic configurations, even though the two separate steps, replicating and ordering, are each based on Paxos. We also design a recovery protocol for rapid failover under failures, and a series of optimizations to boost performance. Using a prototype implementation, we show SDPaxos's significant advantage in both throughput and latency across different environments and workloads.
State machine replication, a classic approach to fault tolerance, requires replicas to execute operations deterministically. Deterministic execution is typically ensured by having replicas execute operations serially in the same total order. Two classes of techniques have extended state machine replication to execute operations concurrently: late scheduling and early scheduling. With late scheduling, operations are scheduled for execution after they are ordered across replicas. With early scheduling, part of the scheduling decisions are made before requests are ordered; after requests are ordered, their scheduling must respect these restrictions. This paper generalizes early scheduling techniques. We propose an automated mechanism to schedule operations on worker threads at replicas, integrate our contributions to a popular state machine replication framework, and experimentally compare the resulting system to late scheduling.
We are seeing an explosion of uncertain data---i.e., data that is more properly represented by probability distributions or estimated values with error bounds rather than exact values---from sensors in IoT, sampling-based approximate computations and machine learning algorithms. In many cases, performing computations on uncertain data as if it were exact leads to incorrect results. Unfortunately, developing applications for processing uncertain data is a major challenge from both the mathematical and performance perspectives. This paper proposes and evaluates an approach for tackling this challenge in DAG-based data processing systems. We present a framework for uncertainty propagation (UP) that allows developers to modify precise implementations of DAG nodes to process uncertain inputs with modest effort. We implement this framework in a system called UP-MapReduce, and use it to modify ten applications, including AI/ML, image processing and trend analysis applications to process uncertain data. Our evaluation shows that UP-MapReduce propagates uncertainties with high accuracy and, in many cases, low performance overheads. For example, a social network trend analysis application that combines data sampling with UP can reduce execution time by 2.3x when the user can tolerate a maximum relative error of 5% in the final answer. These results demonstrate that our UP framework presents a compelling approach for handling uncertain data in DAG processing.
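As a concrete illustration of what such a framework asks of developers, the sketch below lifts a single precise DAG node into one that propagates a (mean, error) pair using first-order error propagation; the function names and the delta-method formulation are our own illustrative assumptions, not the UP-MapReduce API.

```python
# A minimal sketch (not the UP-MapReduce API) of first-order uncertainty
# propagation through one DAG node: an input is an estimate with an error
# bound (mean, std), and the node's precise function f is lifted so it
# propagates both values. Names here are illustrative assumptions.

def lift(f, df):
    """Wrap a precise scalar function f (with derivative df) so it maps
    (mean, std) inputs to (mean, std) outputs via the delta method."""
    def uncertain_f(mean, std):
        out_mean = f(mean)
        out_std = abs(df(mean)) * std     # first-order error propagation
        return out_mean, out_std
    return uncertain_f

# Example: a node that squares its input.
square = lift(lambda x: x * x, lambda x: 2 * x)
print(square(10.0, 0.5))   # -> (100.0, 10.0): a 0.5 input error becomes ~10 in the output
```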
Modern data analytics systems use long-running executors to run an application's entire DAG. Executors exhibit salient time-varying resource requirements. Yet, existing schedulers simply reserve resources for executors statically, and use the peak resource demand to guide executor placement. This leads to low utilization and poor application performance.
We present Elasecutor, a novel executor scheduler for data analytics systems. Elasecutor dynamically allocates and explicitly sizes resources to executors over time according to the predicted time-varying resource demands. Rather than placing executors using their peak demand, Elasecutor strategically assigns them to machines based on a concept called dominant remaining resource to minimize resource fragmentation. Elasecutor further adaptively reprovisions resources in order to tolerate inaccurate demand prediction. Testbed evaluation on a 35-node cluster with our Spark-based prototype implementation shows that Elasecutor reduces makespan by more than 42% on average, reduces median application completion time by up to 40%, and improves cluster utilization by up to 55% compared to existing work.
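To make the placement idea concrete, here is a toy sketch of one plausible reading of dominant-remaining-resource placement: among machines that can fit an executor's predicted peak demand, pick the one whose largest remaining-resource fraction after placement is smallest. The scoring rule and data layout are illustrative assumptions, not Elasecutor's actual code.

```python
# Toy placement by "dominant remaining resource" (illustrative only).

def dominant_remaining(free, demand, capacity):
    remaining = [f - d for f, d in zip(free, demand)]
    if any(r < 0 for r in remaining):
        return None                       # executor does not fit on this machine
    return max(r / c for r, c in zip(remaining, capacity))

def place(machines, demand):
    """machines: {name: (free_vector, capacity_vector)}; demand: resource vector."""
    best, best_score = None, None
    for name, (free, cap) in machines.items():
        score = dominant_remaining(free, demand, cap)
        if score is not None and (best_score is None or score < best_score):
            best, best_score = name, score
    return best

machines = {"m1": ([8, 32], [16, 64]), "m2": ([4, 60], [16, 64])}
print(place(machines, [4, 16]))   # picks m1: its leftover capacity is the most balanced
```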
Stratus is a new cluster scheduler specialized for orchestrating batch job execution on virtual clusters, dynamically allocated collections of virtual machine instances on public IaaS platforms. Unlike schedulers for conventional clusters, Stratus focuses primarily on dollar cost considerations, since public clouds provide effectively unlimited, highly heterogeneous resources allocated on demand. But, since resources are charged-for while allocated, Stratus aggressively packs tasks onto machines, guided by job runtime estimates, trying to make allocated resources be either mostly full (highly utilized) or empty (so they can be released to save money). Simulation experiments based on cluster workload traces from Google and TwoSigma show that Stratus reduces cost by 17-44% compared to state-of-the-art approaches to virtual cluster scheduling.
The vast majority of data center schedulers use task runtime estimates to improve the quality of their scheduling decisions. Knowledge about runtimes allows the schedulers, among other things, to achieve better load balance and to avoid head-of-line blocking. Obtaining accurate runtime estimates is, however, far from trivial, and erroneous estimates lead to sub-optimal scheduling decisions. Techniques to mitigate the effect of inaccurate estimates have shown some success, but the fundamental problem remains.
This paper presents Kairos, a novel data center scheduler that assumes no prior information on task runtimes. Kairos introduces a distributed approximation of the Least Attained Service (LAS) scheduling policy. Kairos consists of a centralized scheduler and per-node schedulers. The per-node schedulers implement LAS for tasks on their node, using preemption as necessary to avoid head-of-line blocking. The centralized scheduler distributes tasks among nodes in a manner that balances the load and imposes on each node a workload in which LAS provides favorable performance.
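The per-node policy is easy to picture with a small model: at each scheduling quantum the node runs the task that has attained the least service so far, preempting the rest. The sketch below is an illustrative simulation of that rule, not Kairos's YARN implementation.

```python
# Minimal model of per-node Least Attained Service (LAS) scheduling.
import heapq

class LASNode:
    def __init__(self):
        self.tasks = []                      # heap of (attained_service, task_id)

    def submit(self, task_id):
        heapq.heappush(self.tasks, (0.0, task_id))

    def run_quantum(self, quantum=0.1):
        if not self.tasks:
            return None
        attained, task_id = heapq.heappop(self.tasks)   # least attained service runs next
        # ... execute task_id for `quantum` seconds, preempting whatever ran before ...
        heapq.heappush(self.tasks, (attained + quantum, task_id))
        return task_id

node = LASNode()
for t in ("short-1", "long-1"):
    node.submit(t)
print([node.run_quantum() for _ in range(4)])   # tasks alternate, each receiving equal service
```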
We have implemented Kairos in YARN. We compare its performance against the YARN FIFO scheduler and Big-C, an open-source state-of-the-art YARN-based scheduler that also uses preemption. Compared to YARN FIFO, Kairos reduces the median job completion time by 73% and the 99th percentile by 30%. Compared to Big-C, the improvements are 37% for the median and 57% for the 99th percentile. We evaluate Kairos at scale by implementing it in the Eagle simulator and comparing its performance against Eagle. Kairos improves the 99th percentile of short job completion times by up to 55% for the Google trace and 85% for the Yahoo trace.
Effective overload control for a large-scale online service system is crucial for protecting the system backend from overload. Conventionally, overload control is designed in an ad-hoc fashion for each individual service. However, service-specific overload control can be detrimental to the overall system due to intricate service dependencies or flawed service implementations. Service developers also find it difficult to accurately estimate the dynamics of the actual workload while a service is being developed. It is therefore essential to decouple overload control from service logic. In this paper, we propose DAGOR, an overload control scheme designed for the account-oriented microservice architecture. DAGOR is service-agnostic and system-centric. It manages overload at the microservice granularity: each microservice monitors its load status in real time and, when overload is detected, triggers load shedding in a collaborative manner among its relevant services. DAGOR has been used in the WeChat backend for five years. Experimental results show that DAGOR sustains a high request success rate even when the system is overloaded, while ensuring fairness in overload control.
Today's cloud database systems are not designed for seamless cost-performance trade-offs for changing SLOs. Database engineers have a limited number of trade-offs due to the limited storage types offered by cloud vendors, and switching to a different storage type requires a time-consuming data migration to a new database. We propose Mutant, a new storage layer for log-structured merge tree (LSM-tree) data stores that dynamically balances database cost and performance by organizing SSTables (files that store a subset of records) into different storage types based on SSTable access frequencies. We implemented Mutant by extending RocksDB and found in our evaluation that Mutant delivers seamless cost-performance trade-offs with the YCSB workload and a real-world workload trace. Moreover, through additional optimizations, Mutant lowers the user-perceived latency significantly compared with the unmodified database.
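The core placement decision can be sketched as a greedy rule: keep the hottest SSTables on fast storage while a cost budget allows and leave the rest on cheap storage. The code below illustrates that rule with made-up prices and table statistics; Mutant's actual optimizer and cost model are more involved.

```python
# Greedy sketch of frequency-based SSTable placement (illustrative only).

def place_sstables(sstables, cost_fast_gb, cost_slow_gb, budget):
    """sstables: list of (name, size_gb, accesses_per_sec); budget is the
    total monthly storage cost the operator is willing to pay."""
    # Everything starts on slow storage; upgrade the hottest tables while the
    # extra cost of fast storage still fits in the budget.
    cost = sum(size * cost_slow_gb for _, size, _ in sstables)
    placement = {name: "slow" for name, _, _ in sstables}
    for name, size, _ in sorted(sstables, key=lambda s: s[2], reverse=True):
        upgrade = size * (cost_fast_gb - cost_slow_gb)
        if cost + upgrade <= budget:
            placement[name] = "fast"
            cost += upgrade
    return placement, cost

tables = [("a", 2, 500.0), ("b", 2, 20.0), ("c", 2, 1.0)]
print(place_sstables(tables, cost_fast_gb=0.5, cost_slow_gb=0.1, budget=1.5))
```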
Container management frameworks, such as Docker, package diverse applications and their complex dependencies in self-contained images, which facilitates application deployment, distribution, and sharing. Currently, Docker employs a shared-nothing storage architecture, i.e. every Docker-enabled host requires its own copy of an image on local storage to create and run containers. This greatly inflates storage utilization, network load, and job completion times in the cluster. In this paper, we investigate the option of storing container images in and serving them from a distributed file system. By sharing images in a distributed storage layer, storage utilization can be reduced and redundant image retrievals from a Docker registry become unnecessary. We introduce Wharf, a middleware to transparently add distributed storage support to Docker. Wharf partitions Docker's runtime state into local and global parts and efficiently synchronizes accesses to the global state. By exploiting the layered structure of Docker images, Wharf minimizes the synchronization overhead. Our experiments show that compared to Docker on local storage, Wharf can speed up image retrievals by up to 12x, has more stable performance, and introduces only a minor overhead when accessing data on distributed storage.
We consider a common setting where storage is disaggregated from the compute in data-parallel systems. Colocating caching tiers with the compute machines can reduce load on the interconnect but doing so leads to new resource management challenges. We design a system Netco, which prefetches data into the cache (based on workload predictability), and appropriately divides the cache space and network bandwidth between the prefetches and serving ongoing jobs. Netco makes various decisions (what content to cache, when to cache and how to apportion bandwidth) to support end-to-end optimization goals such as maximizing the number of jobs that meet their service-level objectives (e.g., deadlines). Our implementation of these ideas is available within the open-source Apache HDFS project. Experiments on a public cloud, with production-trace inspired workloads, show that Netco uses up to 5x less remote I/O compared to existing techniques and increases the number of jobs that meet their deadlines up to 80%.
System virtualization (e.g., the virtual machine abstraction) has been established as the de facto standard form of isolation in multi-tenant clouds. More recently, unikernels have emerged as a way to reuse VM isolation while also being lightweight by eliminating the general purpose OS (e.g., Linux) from the VM. Instead, unikernels directly run the application (linked with a library OS) on the virtual hardware. In this paper, we show that unikernels do not actually require a virtual hardware abstraction, but can achieve similar levels of isolation when running as processes by leveraging existing kernel system call whitelisting mechanisms. Moreover, we show that running unikernels as processes reduces hardware requirements, enables the use of standard process debugging and management tooling, and improves the already impressive performance that unikernels exhibit.
Network Function Virtualization (NFV) aims to reduce costs and increase flexibility of networks by moving functionality traditionally implemented in custom hardware into software packet processing applications, or virtual network functions (VNFs), running on commodity servers in a cloud. This paper describes the design and implementation of libVNF, a library to build high performance, horizontally scalable VNFs. Unlike existing frameworks for VNF development, our library (i) can be used for the development of L2/L3 middleboxes as well as VNFs that are transport layer endpoints; (ii) seamlessly supports multiple network stacks in the backend; and (iii) enables distributed implementation of VNFs via functions for distributed state and replica management. We have implemented a variety of VNFs using our library to demonstrate the expressiveness of our API. Our evaluation shows that building VNFs using libVNF can reduce the number of lines of code in the VNF by up to 50%. Further, optimizations in our library ensure that the performance of VNFs built with our library scales well with increasing number of CPU cores and distributed replicas.
RDMA over Converged Ethernet (RoCE) promises low latency and low CPU utilization over commodity networks, and is attractive for cloud infrastructure services. Current implementations require Priority Flow Control (PFC) that uses backpressure-based congestion control to provide lossless networking to RDMA. Unfortunately, PFC compromises network stability. As a result, RoCE's adoption has been slow and requires complex network management. Recent efforts, such as DCQCN, reduce the risk to the network, but do not completely solve the problem.
We describe RoGUE, a new congestion control and recovery mechanism for RDMA over Ethernet that does not rely on PFC. RoGUE is implemented in software to support backward compatibility and accommodate network evolution, yet allows the use of RDMA for high performance, supporting both the RC and UC RDMA transports. Our experiments show that RoGUE achieves performance and CPU utilization matching or outperforming native RDMA protocols but gracefully tolerates congested networks.
A power management policy aims to improve energy efficiency by choosing an appropriate performance (voltage/frequency) state for a given core. In current virtualized environments, multiple virtual machines (VMs) running on the same core must follow a single power management policy governed by the hypervisor. However, we observe that such a per-core power management policy has two limitations. First, it cannot offer the flexibility of choosing a desirable power management policy for each VM (or client). Second, it often hurts the power efficiency of some or even all VMs, especially when the VMs desire conflicting power management policies. To tackle these limitations, we propose VIP, a per-VM power management mechanism that supports a virtual performance state for each VM. Specifically, for VMs sharing a core, VIP allows each VM's guest OS to deploy its own desired power management policy while preventing such VMs from interfering with each other's policies; VIP can thus also facilitate a pricing model based on the choice of power management policy. Furthermore, identifying some inefficiency in strictly enforcing per-VM power management policies, we propose hypervisor-assisted techniques to further improve power and energy efficiency without compromising the key benefits of per-VM power management. To demonstrate the efficacy of VIP, we take a case in which some VMs run CPU-intensive applications while other VMs run latency-sensitive applications on the same cores. Our evaluation shows that, compared with the default ondemand governor of the Xen hypervisor, VIP reduces overall energy consumption and improves the execution time of CPU-intensive applications by up to 27% and 32%, respectively, without violating the service level agreement (SLA) of the latency-sensitive applications.
We present Henge, a system to support intent-based multi-tenancy in modern distributed stream processing systems. Henge supports multi-tenancy as a first-class citizen: everyone in an organization can now submit their stream processing jobs to a single, shared, consolidated cluster. Secondly, Henge allows each job to specify its own intents (i.e., requirements) as a Service Level Objective (SLO) that captures latency and/or throughput needs. In such an intent-driven multi-tenant cluster, the Henge scheduler adapts continually to meet jobs' respective SLOs in spite of limited cluster resources, and under dynamically varying workloads. SLOs are soft and are based on utility functions. Henge's overall goal is to maximize the total system utility achieved by all jobs in the system. Henge is integrated into Apache Storm and we present experimental results using both production jobs from Yahoo! and real datasets from Twitter.
Sprocket is a highly configurable, stage-based, scalable, serverless video processing framework that exploits intra-video parallelism to achieve low latency. Sprocket enables developers to program a series of operations over video content in a modular, extensible manner. Programmers implement custom operations, ranging from simple video transformations to more complex computer vision tasks, in a simple pipeline specification language to construct custom video processing pipelines. Sprocket then handles the underlying access, encoding and decoding, and processing of video and image content across operations in a highly parallel manner. In this paper we describe the design and implementation of the Sprocket system on the AWS Lambda serverless cloud infrastructure, and evaluate Sprocket under a variety of conditions to show that it delivers its performance goals of high parallelism, low latency, and low cost (10s of seconds to process a 3,600 second video 1000-way parallel for less than $3).
Many Data-Intensive Scalable Computing (DISC) systems do not support sophisticated cost-based query optimizers because they lack the necessary data statistics. Consequently, many crucial optimizations, such as join ordering and plan selection, are not well supported in DISC systems. RIOS is a Runtime Integrated Optimizer for Spark that lazily binds to execution plans at runtime, after collecting the statistics needed to make better decisions. We evaluate the efficacy of our approach and show that better plans can be derived at runtime, achieving more than an order-of-magnitude performance improvement over the compile-time plans produced by the Apache Spark rule-based optimizer.
To cope with today's large scale of data, parallel dataflow engines such as Hadoop, and more recently Spark and Flink, have been proposed. They offer scalability and performance, but require data scientists to develop analysis pipelines in unfamiliar programming languages and abstractions. To overcome this hurdle, dataflow engines have introduced some forms of multi-language integrations, e.g., for Python and R. However, this results in data exchange between the dataflow engine and the integrated language runtime, which requires inter-process communication and causes high runtime overheads. In this paper, we present ScootR, a novel approach to execute R in dataflow systems. ScootR tightly integrates the dataflow and R language runtime by using the Truffle framework and the Graal compiler. As a result, ScootR executes R scripts directly in the Flink data processing engine, without serialization and inter-process communication. Our experimental study reveals that ScootR outperforms state-of-the-art systems by up to an order of magnitude.
Streaming graph analysis extracts timely insights from evolving graphs and has gained increasing popularity. In current streaming graph analytics systems, incoming updates are simply cached in a buffer until they are applied to the existing graph structure to construct a new snapshot. Iterative graph algorithms then work on the new snapshot to produce up-to-date analysis results. Nevertheless, we find that for widely used monotonic graph algorithms, the buffered updates can be effectively preprocessed to achieve fast and accurate analysis on new snapshots.
To this end, we propose GraPU, a streaming graph analytics system for monotonic graph algorithms. Before applying updates, GraPU preprocesses buffered updates in two sequential phases: 1) Components-based Classification first identifies the effective graph data that are actually affected by current updates, by classifying the vertices involved in buffered updates according to the predetermined connected components in the underlying graph; 2) In-buffer Precomputation then generates safe and profitable intermediate values that can later be merged into the underlying graph to facilitate convergence on new snapshots, by precomputing the values of vertices involved in buffered updates. After all updates are applied, GraPU calculates new vertex values in a subgraph-centric manner. GraPU further presents Load-factors Guided Balancing to achieve subgraph-level load balance by efficiently reassigning some vertices and edges among subgraphs beforehand. Evaluation shows that GraPU outperforms the state-of-the-art KineoGraph by up to 19.67x when running four monotonic graph algorithms on real-world graphs.
Cloud server systems such as Hadoop and Cassandra have enabled many real-world data-intensive applications running inside computing clouds. However, those systems present many data-corruption and performance problems which are notoriously difficult to debug due to the lack of diagnosis information. In this paper, we present DScope, a tool that statically detects data-corruption related software hang bugs in cloud server systems. DScope statically analyzes I/O operations and loops in a software package, and identifies loops whose exit conditions can be affected by I/O operations through returned data, returned error code, or I/O exception handling. After identifying those loops which are prone to hang problems under data corruption, DScope conducts loop bound and loop stride analysis to prune out false positives. We have implemented DScope and evaluated it using 9 common cloud server systems. Our results show that DScope can detect 42 real software hang bugs including 29 newly discovered software hang bugs. In contrast, existing bug detection tools miss detecting most of those bugs.
End-to-end tracing has recently emerged as a valuable tool to improve the dependability of distributed systems by performing dynamic verification and diagnosing correctness and performance problems. Contrary to logging, end-to-end traces enable coherent sampling of the entire execution of specific requests, which many deployments exploit to reduce the overhead and storage requirements of tracing. This sampling, however, is usually done uniformly at random, which dedicates a large fraction of the sampling budget to common, 'normal' executions while missing infrequent, but sometimes important, erroneous or anomalous executions. In this paper we define the representative trace sampling problem and present a new approach, based on clustering of execution graphs, that biases the sampling of requests towards infrequent patterns to maximize the diversity of stored execution traces. In preliminary but encouraging work, we show how our approach chooses to persist representative and diverse executions, even when anomalous ones are very infrequent.
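One way to picture the biasing step: represent each request's execution graph as a feature vector, cluster the vectors, and weight each trace by the inverse of its cluster's size so rare patterns are over-sampled. The k-means clustering and the toy features below are illustrative assumptions, not the paper's exact method.

```python
# Biased trace sampling toward infrequent execution patterns (illustrative sketch).
import random
from collections import Counter
from sklearn.cluster import KMeans

def biased_sample(features, trace_ids, budget, k=2, seed=0):
    labels = KMeans(n_clusters=k, random_state=seed, n_init=10).fit_predict(features)
    sizes = Counter(labels)
    # Weight each trace by 1 / |its cluster|, then draw `budget` traces.
    weights = [1.0 / sizes[label] for label in labels]
    rng = random.Random(seed)
    return rng.choices(trace_ids, weights=weights, k=budget)

# Toy features per trace, e.g. (span count, error count); one anomalous trace.
features = [[2, 0], [2, 0], [2, 0], [2, 0], [9, 3], [2, 0]]
print(biased_sample(features, trace_ids=list(range(6)), budget=3))
```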
Systematically reasoning about the fine-grained causes of events in a real-world distributed system is challenging. Causality, from the distributed systems literature, can be used to compute the causal history of an arbitrary event in a distributed system, but the event's causal history is an over-approximation of the true causes. Data provenance, from the database literature, precisely describes why a particular tuple appears in the output of a relational query, but data provenance is limited to the domain of static relational databases. In this paper, we present wat-provenance: a novel form of provenance that provides the benefits of causality and data provenance. Given an arbitrary state machine, wat-provenance describes why the state machine produces a particular output when given a particular input. This enables system developers to reason about the causes of events in real-world distributed systems. We observe that automatically extracting the wat-provenance of a state machine is often infeasible. Fortunately, many distributed systems components have simple interfaces from which a developer can directly specify wat-provenance using a technique we call wat-provenance specifications. Leveraging the theoretical foundations of wat-provenance, we implement a prototype distributed debugging framework called Watermelon.
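To give a flavor of what a wat-provenance specification might look like for a simple component, the sketch below hand-writes one for a key-value store: the provenance of a get(k) is the most recent set(k, v) in the request trace. The function and trace format are illustrative; the formalism in the paper and the Watermelon APIs differ.

```python
# A hand-written provenance specification for a toy key-value store
# (illustrative only; not the wat-provenance formalism itself).

def kv_get_provenance(trace, key):
    """trace: ordered list of requests like ("set", k, v) or ("get", k).
    Returns the index of the write that explains the current value of `key`."""
    last_write = None
    for i, req in enumerate(trace):
        if req[0] == "set" and req[1] == key:
            last_write = i
    return last_write

trace = [("set", "x", 1), ("set", "y", 2), ("set", "x", 3), ("get", "x")]
print(kv_get_provenance(trace, "x"))   # -> 2: only the last write to x matters
```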
Cloud computing with large-scale datacenters provides great convenience and cost-efficiency for end users. However, the resource utilization of cloud datacenters is very low, which wastes a huge amount of infrastructure investment and operating energy. To improve resource utilization, cloud providers usually co-locate workloads of different types on shared resources. However, resource sharing leaves the quality of service (QoS) unguaranteed. In fact, improving resource utilization (IRU) while guaranteeing QoS in the cloud has been a dilemma, which we name the IRU-QoS curse. To tackle this issue, characterizing the workloads of real production cloud computing platforms is extremely important.
In this work, we analyze a recently released 24-hour trace dataset from a production cluster in Alibaba. We reveal three key findings that differ significantly from those of the Google trace. First, each online service runs in a container while batch jobs run on physical servers; the two are concurrently managed by two different schedulers and co-located on the same servers, which we call semi-containerized co-location. Second, batch instances largely use the spare resources that containers have reserved but do not use, which shows the elasticity of resource allocation in the Alibaba cluster; through resource overprovisioning, overbooking, and overcommitment, the cluster's resource allocation achieves high elasticity. Third, as this high elasticity may hurt the performance of co-located online services, the Alibaba cluster bounds the resources used by batch tasks to guarantee steady performance of both online services and batch tasks, which we call plasticity of resource allocation.
Though there has been much study of information leakage channels exploiting shared hardware resources (memory, cache, and disk) in cloud environments, there has been less study of the exploitability of shared software resources. In this paper, we analyze the exploitability of cloud networking services (which are shared among cloud tenants) and introduce a practical method for building information leakage channels by monitoring workloads on the cloud networking services through the virtual firewall. We also demonstrate the practicality of this attack by implementing two different covert channels in OpenStack as well as a new class of side channels that can eavesdrop on infrastructure-level events. By utilizing a Long Short-Term Memory (LSTM) neural network model, our side channel attack could detect infrastructure level VM creation/termination events with 93.3% accuracy.
Public cloud datacenters implement a distributed computing environment built for economy at scale, with hundreds of thousands of compute and storage servers and a large population of predominantly small customers often densely packed to a compute server. Several recent contributions have investigated how equitable sharing and differentiated services can be achieved in this multi-resource environment, using the Extended Dominant Resource Fairness (EDRF) algorithm. However, we find that EDRF requires prohibitive execution time when employed at datacenter scale due to its iterative nature and polynomial time complexity; its closed-form expression does not alter its asymptotic complexity.
In response, we propose Deadline-Constrained DRF, or DC-DRF, an adaptive approximation of EDRF designed to support centralized multi-resource allocation at datacenter scale in bounded time. The approximation introduces error which can be reduced using a high-performance implementation, drawing on parallelization techniques from the field of High-Performance Computing and vector arithmetic instructions available in modern server processors. We evaluate DC-DRF at scales that exceed those previously reported by several orders of magnitude, calculating resource allocations for one million predominantly small tenants and one hundred thousand resources, in seconds. Our parallel implementation preserves the properties of EDRF up to a small error, and empirical results show that the error introduced by approximation is insignificant for practical purposes.
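For background, the dominant-resource-fairness idea that EDRF and DC-DRF build on can be written as a short progressive-filling loop: repeatedly grant one task to the tenant whose dominant share is currently lowest, until no tenant can fit another task. The sketch below shows that textbook loop on the classic two-tenant example; it is not the EDRF closed form or the DC-DRF approximation.

```python
# Textbook DRF progressive filling (background sketch, not EDRF/DC-DRF).

def drf(capacity, demands):
    """capacity: resource vector; demands: {tenant: per-task demand vector}."""
    used = [0.0] * len(capacity)
    alloc = {t: 0 for t in demands}            # tasks granted per tenant
    active = set(demands)
    while active:
        # The tenant with the smallest dominant share goes next.
        tenant = min(active, key=lambda t: max(
            alloc[t] * d / c for d, c in zip(demands[t], capacity)))
        need = demands[tenant]
        if any(u + d > c for u, d, c in zip(used, need, capacity)):
            active.remove(tenant)              # this tenant cannot fit another task
            continue
        used = [u + d for u, d in zip(used, need)]
        alloc[tenant] += 1
    return alloc

# 9 CPUs and 18 GB shared by a CPU-light/memory-heavy tenant A and a
# CPU-heavy/memory-light tenant B; DRF grants A 3 tasks and B 2 tasks.
print(drf([9, 18], {"A": [1, 4], "B": [3, 1]}))
```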
The increasing density of globally distributed datacenters reduces the network latency between neighboring datacenters and allows replicated services deployed across neighboring locations to share workload when necessary, without violating strict Service Level Objectives (SLOs).
We present Kurma, a practical implementation of a fast and accurate load balancer for geo-distributed storage systems. At run-time, Kurma integrates network latency and service time distributions to accurately estimate the rate of SLO violations for requests redirected across geo-distributed datacenters. Using these estimates, Kurma solves a decentralized rate-based performance model enabling fast load balancing (in the order of seconds) while taming global SLO violations. We integrate Kurma with Cassandra, a popular storage system. Using real-world traces along with a geo-distributed deployment across Amazon EC2, we demonstrate Kurma's ability to effectively share load among datacenters while reducing SLO violations by up to a factor of 3 in high load settings or reducing the cost of running the service by up to 17%.
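The quantity Kurma estimates can be illustrated with a back-of-the-envelope Monte Carlo: draw a network-latency sample and a remote service-time sample, add them, and count how often the sum exceeds the SLO. Kurma's actual model is analytical and rate-based; the sample data below are invented.

```python
# Monte Carlo illustration of the SLO-violation rate for redirected requests.
import random

def violation_rate(network_latency_ms, service_time_ms, slo_ms, trials=100_000, seed=1):
    rng = random.Random(seed)
    misses = sum(
        rng.choice(network_latency_ms) + rng.choice(service_time_ms) > slo_ms
        for _ in range(trials))
    return misses / trials

net = [12, 15, 18, 25, 40]          # ms, measured between two datacenters
svc = [5, 8, 10, 20, 60, 90]        # ms, measured at the remote replica
print(violation_rate(net, svc, slo_ms=100))
```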
Current wisdom to run computation-intensive deep neural network (DNN) on resource-constrained mobile devices is allowing the mobile clients to make DNN queries to central cloud servers, where the corresponding DNN models are pre-installed. Unfortunately, this centralized, cloud-based DNN offloading is not appropriate for emerging decentralized cloud infrastructures (e.g., cloudlet, edge/fog servers), where the client may send computation requests to any nearby server located at the edge of the network. To use such a generic edge server for DNN execution, the client should first upload its DNN model to the server, yet it can seriously delay query processing due to long uploading time. This paper proposes IONN (Incremental Offloading of Neural Network), a partitioning-based DNN offloading technique for edge computing. IONN divides a client's DNN model into a few partitions and uploads them to the edge server one by one. The server incrementally builds the DNN model as each DNN partition arrives, allowing the client to start offloading partial DNN execution even before the entire DNN model is uploaded. To decide the best DNN partitions and the uploading order, IONN uses a novel graph-based algorithm. Our experiments show that IONN significantly improves query performance in realistic hardware configurations and network conditions.
Wide-area data analytics has gained much attention in recent years due to the increasing need for analyzing data that are geographically distributed. Many of such queries often require real-time analysis on data streams that are continuously being generated across multiple locations. Yet, analyzing these geo-distributed data streams in a timely manner is very challenging due to the highly heterogeneous and limited bandwidth availability of the wide-area network (WAN). This paper examines the opportunity of applying multi-query optimization in the context of wide-area streaming analytics, with the goal of utilizing WAN bandwidth efficiently while achieving high throughput and low latency execution. Our approach is based on the insight that many streaming analytics queries often exhibit common executions, whether in consuming a common set of input data or performing the same data processing. In this work, we study different types of sharing opportunities and propose a practical online algorithm that allows streaming analytics queries to share their common executions incrementally. We further address the importance of WAN awareness in applying multi-query optimization. Without WAN awareness, sharing executions in a wide-area environment may lead to performance degradation. We have implemented our WAN-aware multi-query optimization in a prototype implementation based on Apache Flink. Experimental evaluation using Twitter traces on a real wide-area system deployment across geo-distributed EC2 data centers shows that our technique is able to achieve 21% higher throughput while saving WAN bandwidth consumption by 33% compared to a WAN-aware, sharing-agnostic system.
A distributed join is a fundamental operation for processing massive datasets in parallel. Unfortunately, computing an equi-join over such datasets is very resource-intensive, even when done in parallel. Given this cost, the equi-join operator becomes a natural candidate for optimization using approximation techniques, which allow users to trade accuracy for latency. Finding the right approximation technique for joins, however, is a challenging task. Sampling, in particular, cannot be directly used in joins; naïvely performing a join over a sample of the dataset will not preserve statistical properties of the query result.
To address this problem, we introduce ApproxJoin. We interweave Bloom filter sketching and stratified sampling with the join computation in a new operator that preserves statistical properties of an aggregation over the join output. ApproxJoin leverages Bloom filters to avoid shuffling non-joinable data items around the network, and then applies stratified sampling to obtain a representative sample of the join output. We implemented ApproxJoin in Apache Spark, and evaluated it using microbenchmarks and real-world workloads. Our evaluation shows that ApproxJoin scales well and significantly reduces data movement, without sacrificing tight error bounds on the accuracy of the final results. ApproxJoin achieves a speedup of up to 9x over unmodified Spark-based joins with the same sampling ratio. Furthermore, the speedup is accompanied by a significant reduction in the shuffled data volume, which is up to 82x less than unmodified Spark-based joins.
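A single-machine sketch of the two ingredients is shown below: a Bloom filter built over one relation's join keys filters non-joinable tuples from the other relation before any shuffle, and a per-key (stratified) sample of the survivors stands in for the full join output. Filter sizes, hash choices, and sample sizes are illustrative assumptions, not the paper's parameters.

```python
# Bloom-filtered, stratified-sampled equi-join (single-machine illustration).
import hashlib, random

class Bloom:
    def __init__(self, bits=1 << 16, hashes=3):
        self.bits, self.hashes, self.array = bits, hashes, bytearray(bits)

    def _positions(self, key):
        for i in range(self.hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.bits

    def add(self, key):
        for p in self._positions(key):
            self.array[p] = 1

    def might_contain(self, key):
        return all(self.array[p] for p in self._positions(key))

def approx_join(left, right, per_key_sample=2, seed=0):
    """left, right: lists of (key, value) tuples."""
    bloom = Bloom()
    for k, _ in left:
        bloom.add(k)
    # Drop right-side tuples whose key cannot join, then stratify by key.
    strata = {}
    for k, v in right:
        if bloom.might_contain(k):
            strata.setdefault(k, []).append(v)
    rng = random.Random(seed)
    sampled = {k: rng.sample(vs, min(per_key_sample, len(vs))) for k, vs in strata.items()}
    return [(k, lv, rv) for k, lv in left for rv in sampled.get(k, [])]

left = [("a", 1), ("b", 2)]
right = [("a", "x"), ("a", "y"), ("a", "z"), ("c", "w")]
print(approx_join(left, right))
```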
Cloud-based data analysis is nowadays common practice because of the lower system management overhead as well as the pay-as-you-go pricing model. The pricing model, however, is not always suitable for query processing, as heavy use results in high costs. For example, in query-as-a-service systems, where users are charged per processed byte, collections of queries accessing the same data frequently can become expensive. The problem is compounded by the limited options for the user to optimize query execution when using declarative interfaces such as SQL. In this paper, we show how, without modifying existing systems and without the involvement of the cloud provider, it is possible to significantly reduce the overhead, and hence the cost, of query-as-a-service systems. Our approach is based on query rewriting, so that multiple concurrent queries are combined into a single query. Our experiments show that the aggregate amount of work done by the shared execution is smaller than in a query-at-a-time approach. Since queries are charged per byte processed, the cost of executing a group of queries is often the same as executing a single one of them. As an example, we demonstrate how shared execution of the TPC-H benchmark is up to 100x cheaper in Amazon Athena and up to 16x cheaper in Google BigQuery than a query-at-a-time approach, while achieving higher throughput.
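The rewriting idea can be illustrated with a toy example in which two aggregation queries over the same table are folded into one statement, so the pay-per-byte service scans the table once. The table and column names below are made up; the actual rewriter handles far more general SQL.

```python
# Two aggregation queries over the same table, folded into one scan.
q1 = "SELECT COUNT(*)       FROM orders WHERE status = 'open'"
q2 = "SELECT SUM(total_usd) FROM orders WHERE region = 'EU'"

combined = """
SELECT
  COUNT(CASE WHEN status = 'open' THEN 1 END)      AS q1_count,
  SUM(CASE WHEN region = 'EU' THEN total_usd END)  AS q2_sum
FROM orders
"""
# Both answers come back from a single scan of `orders`, so the bytes billed
# (and hence the cost) are roughly those of one query instead of two.
print(combined)
```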
Cloud spot markets rent VMs for a variable price that is typically much lower than the price of on-demand VMs, which makes them attractive for a wide range of large-scale applications. However, applications that run on spot VMs suffer from cost uncertainty, since spot prices fluctuate, in part, based on supply, demand, or both. The difficulty in predicting spot prices affects users and applications: the former cannot effectively plan their IT expenditures, while the latter cannot infer the availability and performance of spot VMs, which are a function of their variable price. Prior work attempts to address this uncertainty by modeling and predicting individual spot prices based on historical data. However, a single model likely does not apply to different spot VMs, since they may have different levels of supply and demand. In addition, cloud providers may unilaterally change spot pricing algorithms, as EC2 has done multiple times, which can invalidate existing price models and prediction methods.
To address the problem, we use properties of cloud infrastructure and workloads to show that prices become more stable and predictable as they are aggregated together. We leverage this observation to define an aggregate index price for spot VMs that serves as a reference for what users should expect to pay. We show that, even when the spot prices for individual VMs are volatile, the index price remains stable and predictable. We then introduce cloud index tracking: a migration policy that tracks the index price to ensure applications running on spot VMs incur a predictable cost by migrating to a new spot VM if the current VM's price significantly deviates from the index price. We implement cloud index tracking on EC2, and show that it yields a predictable cost near that of the index price, but with much higher availability compared to prior work, which aggressively migrates to the lowest cost VM.
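The policy itself is compact enough to sketch: maintain an index price as the mean spot price over a pool of comparable VMs, and migrate whenever the current VM's price drifts too far above it. The 10% tolerance and the prices below are illustrative, not the paper's tuned parameters.

```python
# Index-tracking migration check (illustrative parameters).

def should_migrate(current_price, pool_prices, tolerance=0.10):
    index = sum(pool_prices) / len(pool_prices)
    return current_price > index * (1 + tolerance), index

pool = [0.031, 0.029, 0.034, 0.030, 0.033]      # $/hour for comparable spot VMs
migrate, index = should_migrate(current_price=0.045, pool_prices=pool)
print(f"index={index:.4f} $/h, migrate={migrate}")   # 0.045 is ~43% above the index
```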
Flash-based key-value caching plays an important role in Internet services. Compared to in-memory key-value caches, flash-based key-value caches can provide a 10 to 100 times larger cache space, allowing them to accommodate more data for a higher hit ratio. However, the current design relies on a simple hash-based indexing structure, which maintains the entire mapping table in DRAM. As the cache capacity continues to grow, such an "all-in-memory" approach raises concerns about cost, power, and scalability.
To address this critical memory challenge, we propose a hierarchical mapping scheme, called Cascade Mapping. This scheme exploits a widely existing phenomenon in key-value caches---only a small set of key-value items in the cache is frequently requested. Leveraging the strong temporal locality, we create a multi-tier mapping structure, aiming to serve the most popular key-value mappings within a small memory space and organize the majority in a highly optimized indexing structure in flash. We have implemented a prototype, called SlickCache, based on Twitter's Fatcache. Our experimental results show that we can achieve nearly identical performance as the conventional all-in-memory scheme, while using only a fraction (10%) of the required memory. Alternatively, this approach allows us to build a 10 times larger flash cache with the same amount of memory, which in turn increases the hit ratio by up to 8.2 times and the throughput by up to 125 times.
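The tiered-lookup idea can be sketched with two plain dictionaries standing in for the DRAM table and the flash-resident index: hot mappings are served from memory and promoted on access, while the long tail is read from flash. This is only an illustration of the lookup path, not SlickCache's data structures.

```python
# Two-tier key-to-location index (illustrative stand-in for Cascade Mapping).

class TieredIndex:
    def __init__(self, mem_capacity):
        self.mem, self.flash, self.cap = {}, {}, mem_capacity

    def put(self, key, location):
        self.flash[key] = location          # authoritative mapping lives on flash

    def get(self, key):
        if key in self.mem:                 # fast path: hot mapping in DRAM
            return self.mem[key]
        loc = self.flash.get(key)           # slow path: read the on-flash index
        if loc is not None:
            if len(self.mem) >= self.cap:   # evict an arbitrary cold entry
                self.mem.pop(next(iter(self.mem)))
            self.mem[key] = loc             # promote the hot mapping
        return loc

idx = TieredIndex(mem_capacity=2)
idx.put("user:42", ("block", 7, 128))
print(idx.get("user:42"), idx.get("user:42"))   # second lookup is served from DRAM
```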
Key-value (KV) stores based on multi-stage structures are widely deployed in the cloud to ingest massive amounts of easily searchable user data. However, current KV storage systems inevitably sacrifice at least one performance objective, such as write performance, read performance, or space efficiency, to optimize others. To understand the root cause of, and ultimately remove, such performance disparities among representative existing KV stores, we analyze their enabling mechanisms and classify them into two models of data structures facilitating KV operations: the multi-stage tree (MS-tree), as represented by LevelDB, and the multi-stage forest (MS-forest), as typified by the size-tiered compaction in Cassandra. We then build a KV store on a novel split MS-forest structure, called SifrDB, that achieves the lowest write amplification across all workload patterns and minimizes space reservation for compaction. In addition, we design a highly efficient parallel search algorithm that fully exploits the access parallelism of modern flash-based storage devices to substantially boost read performance. Evaluation results show that under both micro and YCSB benchmarks, SifrDB outperforms its closest competitors, i.e., the popular MS-forest implementations, making it a highly desirable choice for modern large-dataset-driven KV stores.
Persistent key-value stores have emerged as a main component in the data access path of modern data processing systems. However, they exhibit high CPU and I/O overhead. Today, due to power limitations it is important to reduce CPU overheads for data processing.
In this paper, we propose Kreon, a key-value store that targets servers with flash-based storage, where CPU overhead and I/O amplification are more significant bottlenecks compared to I/O randomness. We first observe that two significant sources of overhead in state-of-the-art key-value stores are: (a) The use of compaction in LSM-Trees that constantly perform merging and sorting of large data segments and (b) the use of an I/O cache to access devices, which incurs overhead even for data that reside in memory. To avoid these, Kreon performs data movement from level to level by performing partial instead of full data reorganization via the use of a full index per-level. In addition, Kreon uses memory-mapped I/O via a custom kernel path with Copy-On-Write.
We implement Kreon as well as our custom memory-mapped I/O path in Linux and we evaluate Kreon using commodity SSDs with both small and large datasets (up to 6 billion keys). For a large dataset that stresses I/O, Kreon reduces CPU cycles/op by up to 5.8x, reduces I/O amplification for inserts by up to 4.61x, and increases insert ops/s by up to 5.3x, compared to RocksDB, a state-of-the-art key-value store that is broadly used today.
Deep learning (DL) is a popular and important artificial intelligence workload in datacenters. With the recent breakthroughs enabled by graphics accelerators and the popularity of DL frameworks, GPU server clusters dominate DL training in current practice. Cluster schedulers simply treat DL jobs as black boxes and allocate GPUs per the job request specified by the user. Other resources, e.g., CPU, are often allocated with workload-agnostic approaches: Kubeflow [1] performs heuristic static CPU assignment based on task types (e.g., worker, parameter server), while [2] evenly divides the CPUs of a server among its GPUs. Despite the traditional impression that the GPU is what matters in DL, our observations suggest that the importance of the CPU is undervalued. Identifying an appropriate number of CPU cores in a heterogeneous cluster is challenging yet performance-critical for DL jobs. The diversity of CPU usage is not well recognized in the following three aspects.
Heterogeneous CPU demand across jobs. Although powered by GPU accelerators, different workloads exhibit a tremendous gap in CPU demand. Figure 1a illustrates the CPU cores required to reach maximal training performance for different workloads: Overfeat and Resnet50 require 40 and 7 cores on a V100, respectively. Moreover, different workloads are not equally sensitive to insufficient resources. The training speed of Overfeat drops by 45% when going from 14 cores to 7, whereas Resnet50's drops by only 20%. Therefore, when resources are insufficient, sacrificing the performance of Resnet50 is more cost-effective than sacrificing that of Overfeat.
Better GPU, more CPU. Another key insight from Figure 1a is that the better the GPU allocated, the more CPUs are required. Moreover, with a better GPU, Overfeat requires many more CPUs than Resnet50 to maximize performance, showing different sensitivities. DL frameworks (e.g., TensorFlow) tend to overlap computation on the CPU (e.g., data pre-processing) and the GPU (e.g., convolution) to maximize resource utilization. With a better GPU allocated, the latency of GPU operators drops, making the latency of CPU operations relatively more prominent and calling for more CPUs. Furthermore, in contrast to the slowdown of CPU scaling, hardware accelerators (e.g., GPUs) are developing fast, which argues for careful assignment of CPU resources to coordinate execution.
Waving demand over time. DL training is a feedback-driven exploration that periodically switches between training and model validation. For some sequence-to-sequence models, such as text summarization, validating the generated output of the trained model requires computational effort different from training. Figure 1b profiles a neural machine translation (NMT) task with 1 GPU and 4 CPU cores allocated. The CPU and GPU utilization vary cyclically. During training, 4 cores are sufficient, as the average CPU utilization is only 104%. During validation, however, GPU utilization is only 8% while CPU utilization reaches 387%; latency is bounded by the CPU. Increasing the allocation to 24 CPU cores reduces validation time by 75%.
To address these CPU resource scheduling challenges, we present SAD, which maximizes cluster throughput through coarse-grained periodic rescheduling driven by a performance predictor based on optimal experiment design. SAD is adaptive and characteristic-aware, automatically inferring the appropriate CPU resources to allocate. Through lightweight profiling and continual monitoring, SAD captures the inter-job and intra-job resource demand heterogeneity of DL. Its performance predictor accurately estimates DL training speed for different CPU counts across various GPUs. Preliminary results on a small trace show that SAD improves overall utilization by 19% and reduces job completion time by 34% compared with workload-agnostic allocation.
We present Scavenger, a reactive batch workload manager that opportunistically runs containerized batch jobs next to customer Virtual Machines (VMs) in a public-cloud-like setting to improve utilization. Scavenger dynamically regulates the resource usage of batch jobs, including CPU usage, memory capacity, and LLC capacity, to ensure that the customer VMs' resource demand is met at all times. We experimentally evaluate Scavenger and show that it considerably increases resource usage without compromising the resource demand of customer VMs. Importantly, Scavenger does so without requiring any offline profiling or prior information about the customer workloads.
Containers have transformed cluster management into an application-oriented endeavor and are widely used as the deployment units (i.e., microservices) of large-scale cloud services. As opposed to VMs, containers allow for fine-grained resource provisioning, and their resource usage directly reflects microservice behavior. Container management systems like Kubernetes and Mesos provision resources to containers according to the capacity requested by developers. These requested values are grossly inaccurate: developers tend to be risk-averse and over-provision resources, as under-provisioning would cause poor runtime performance or failures.
Resource provisioning is challenging without actually running the workloads, yet benchmarking production workloads at scale requires huge manual effort. In this work, we leverage the IBM Monitoring service to profile the resource usage of production IBM Watson services in rolling windows, focusing both on how developers request resources and on characterizing the actual resource usage.
Our resource profiling study reveals two important characteristics of the cognitive workloads. 1. Stationarity. According to the Augmented Dickey-Fuller test with 95% confidence, more than 95% of the container instances have stationary CPU usage and more than 85% have stationary memory usage, indicating that resource usage statistics do not change over time. For the majority of containers, stationarity can be detected at an early stage of container execution and holds throughout their lifespans. In addition, containers with non-stationary CPU or memory usage are observed to exhibit predictable usage trends and patterns (e.g., trend stationarity or seasonality). 2. Predictability by container image. By clustering containers based on their images, container resource usages within the same cluster are observed to exhibit strong statistical similarity. This suggests that the resource usage history of one instance can be used to predict the usage of future instances that run the same container image.
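The stationarity check underlying the first finding can be reproduced in a few lines with statsmodels: run the Augmented Dickey-Fuller test on a container's CPU-usage series and call it stationary when the unit-root null is rejected at the 5% level. The series below is synthetic.

```python
# Augmented Dickey-Fuller stationarity check on a synthetic CPU-usage series.
import numpy as np
from statsmodels.tsa.stattools import adfuller

rng = np.random.default_rng(0)
cpu_usage = 0.4 + 0.05 * rng.standard_normal(500)   # stationary usage around 40%

pvalue = adfuller(cpu_usage)[1]
print(f"ADF p-value = {pvalue:.4f}, stationary = {pvalue < 0.05}")
```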
Based on profiling results of running containers in rolling windows, we propose a resource usage advisory system to refine the requested resource values of the running and arriving containers as illustrated in Fig. 1. Our system continuously retrieves the resource usage metrics of running containers from IBM monitoring service and predicts the resource usage profiles in a container resource usage prediction agent. Upon the arrival of a new pod, the resource profile advisor, proposed as a module in the web-hooked admission controller in Kubernetes, checks whether the resource profile of each container in the pod has been predicted with confidence. If a container's profile has been predicted and cached in the container resource profile database, the default requested values of containers are refined by the predicted ones; otherwise, containers are forwarded to the scheduler without any change. Similarly, a resource profile auto-scaler is proposed to update the requested resource values of containers for running pods as soon as the database is updated.
Our study shows that developers request at least 1 core-per-second (cps) of CPU and 1 GB of memory for ≥ 70% of the containers, while ≥ 80% of the containers actually use less than 1 cps and 1 GB. Additionally, ~20% of the containers are significantly under-provisioned. We use one day of resource usage data to generate container resource profiles and evaluate our approach against the actual usage on the following day. Without our system, the average CPU (memory) usage for >90% of containers lies outside 50%-100% (70%-100%) of the requested values. Our evaluation shows that our system can advise request values such that the average and 95th-percentile CPU (memory) usage for >90% of the containers falls within 50%-100% (70%-100%) of the requested values. Furthermore, the average CPU (memory) utilization across all pods is raised from 10% (26%) to 54% (88%).
The recent deceleration of Moore's law calls for new approaches to resource optimization. Machine learning has been applied to a wide variety of problems across multiple domains; however, the space of machine learning research for storage optimization is only lightly explored. In this paper, we focus on learning IO access patterns with the aim of improving the performance of flash-based devices. Flash-based storage devices provide orders of magnitude better performance than HDDs, but they suffer from high tail latencies due to garbage collection (GC), which causes variable IO latency. In flash devices, GC is the process of relocating existing data and deleting stale data in order to create empty blocks for new incoming data. By learning the temporal trends of IO accesses, we build workload-specific regression models for predicting the future time at which the SSD will be in GC mode. We test our models on synthetic traces (random read/write mixes with fixed block size) generated by the FIO workload generator. To determine when the SSD is in GC mode, we track IO completion times and classify completions that take more than 10 times the median completion time as periods when the SSD is in GC mode. Experiments on the SSD models we tested reveal that a GC phase usually lasts about 400 ms and occurs every 7000 ms on average. Results show that our workload-specific models are accurate in predicting the time of the next GC mode, achieving an RMSE of 10.61.
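The labeling rule and the prediction step can be sketched as follows; the simple linear model is an illustrative stand-in for the workload-specific regression models described above, and the function names are ours.

```python
# Sketch: label GC periods as completions slower than 10x the median, then
# fit occurrence index -> GC start time and extrapolate to the next GC phase.
import numpy as np
from sklearn.linear_model import LinearRegression

def label_gc(completion_ms: np.ndarray) -> np.ndarray:
    """Boolean mask of completions attributed to GC activity."""
    return completion_ms > 10 * np.median(completion_ms)

def predict_next_gc(gc_start_times_ms: np.ndarray) -> float:
    """Predict the start time of the next GC phase from past GC start times."""
    idx = np.arange(len(gc_start_times_ms)).reshape(-1, 1)
    model = LinearRegression().fit(idx, gc_start_times_ms)
    return float(model.predict([[len(gc_start_times_ms)]])[0])
```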
The performance of flash devices can be further improved via efficient prefetching by learning IO access patterns. We use a long short-term memory (LSTM) recurrent neural network (RNN) architecture to learn spatial patterns from block-level IO traces from SNIA, in order to predict the LBA to be requested ahead of time so it can be placed in primary memory. Preliminary results show that the neural-network-based prefetchers are quite effective at predicting the next requested LBA, achieving up to 82.5% accuracy. Our LSTM models are also very effective at predicting future IO operations (read/write), achieving high (91.6%) accuracy. We use a four-layer neural network architecture with an LSTM layer containing 512 neurons and three fully connected layers containing 256, 64, and 1000 neurons, respectively. Time series models such as LSTMs are efficient at learning local temporal trends in data, which is useful for learning storage IO patterns. This work opens up a new direction towards time-series neural-network-based prefetching and can be applied to a variety of problems in storage systems.
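A sketch of that four-layer architecture (one LSTM layer of 512 units followed by dense layers of 256, 64, and 1000 units) is shown below, assuming the LBAs are bucketed into 1000 classes so the final layer is a softmax over candidate addresses; the sequence length and optimizer are our assumptions.

```python
# Illustrative Keras model mirroring the described LSTM prefetcher architecture.
import tensorflow as tf

SEQ_LEN, NUM_LBA_CLASSES = 32, 1000   # assumed window length and LBA bucket count

model = tf.keras.Sequential([
    tf.keras.layers.LSTM(512, input_shape=(SEQ_LEN, 1)),
    tf.keras.layers.Dense(256, activation="relu"),
    tf.keras.layers.Dense(64, activation="relu"),
    tf.keras.layers.Dense(NUM_LBA_CLASSES, activation="softmax"),
])
model.compile(optimizer="adam",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
```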
Unsupervised machine learning techniques can be used to cluster IO accesses and store offsets in different blocks based on access patterns. The strategy is to cluster the offsets and store data in different physical blocks based on the frequency of writes to those blocks. Separating hot and cold data will minimize the write amplification associated with GC and improve performance. Newly launched multi-stream SSDs provide a perfect opportunity to exploit this idea to improve quality of service.
In this work, we propose a proactive online resource provisioning methodology that addresses the challenge of resource provisioning for IMC workloads in heterogeneous cloud platforms consisting of diverse types of servers. Because cloud platforms provide a wide range of server configuration choices [4], and applications' performance and power consumption change at run-time [3] and depend on the chosen configuration, resource provisioning in cloud platforms is a challenging optimization problem with a large search space to navigate. Our methodology proactively assigns a suitable hardware configuration to an IMC program for energy-efficiency optimization, measured by the energy-delay product (EDP), before any significant change occurs in the application's behavior. This helps save energy without sacrificing performance [2, 7]. We address these challenges by first characterizing diverse types of IMC workloads across different types of server architectures. This characterization helps accurately capture applications' behavior [1] and train machine learning models [5, 6]. We use a time-series neural network to predict the next phase of an application. Our approach then uses artificial neural networks to estimate the performance and power consumption of the predicted phase on various server configurations. Finally, we use a genetic algorithm to identify a close-to-optimal configuration that minimizes EDP. Compared to an oracle scheduler, our methodology achieves 93% accuracy in allocating the right resources for each phase of the program. Our methodology improves performance by 21% and EDP by 40% on average, compared to the default scheduler.
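The final selection step can be illustrated with the following sketch: given per-configuration predictions of execution time and power for the upcoming phase (produced by the neural-network models), pick the configuration with the lowest EDP. The exhaustive loop and the function name are illustrative; in the methodology above, a genetic algorithm plays this role when the configuration space is large.

```python
# Illustrative EDP-minimizing configuration selection.
def pick_config(predictions):
    """predictions: iterable of (config, predicted_time_s, predicted_power_w)."""
    best_cfg, best_edp = None, float("inf")
    for cfg, t, p in predictions:
        edp = p * t * t            # energy (p * t) times delay (t)
        if edp < best_edp:
            best_cfg, best_edp = cfg, edp
    return best_cfg
```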
Many security challenges arise when mutually untrusted tenants are co-located in the same virtualized network infrastructure. Cloud systems commonly employ different network isolation mechanisms to prevent interference among tenants' networks, which may rely on different and complementary isolation strategies. In this work, we define three complementary strategies for addressing multi-tenant isolation in cloud networks, observe that no current virtualization architecture implements all three strategies, and propose a novel architectural design to cover the identified gap.
Cloud computing offers consistent access to large-scale computing capabilities, bringing convenience to everyday life. However, virtualized cloud systems still struggle to maintain performance scalability and service agility when a task burst surges in without warning. A mounting body of research has addressed accurate workload prediction as well as effective resource reservation and arrangement. In practice, however, cloud providers commonly resort to strategies that deploy excessive resources, adding overhead cost and sacrificing the cloud's advantage of scalability, or otherwise fail to reconfigure in a timely and proper manner, causing dissatisfaction and even financial loss; neither outcome is acceptable to cloud providers or their clients.
In this paper, we present a holistic solution called the Workload Spike Targeted Cloud Management Solution (WITCAT) for virtualized cloud systems, which integrates three fundamental modules as a whole, a combination seldom proposed before. By learning historical taskflow patterns, WITCAT effectively classifies arriving tasks into clusters that feature respective workload traits. Two different prediction methods are then employed to continually forecast the arrival rate and attributes of workloads for each cluster, under two characteristic scenarios: normal and bursty. Finally, we employ a reservation strategy that makes full use of the available resources, strengthening the effectiveness of cloud service provisioning under workload spikes.
To the best of our knowledge, the contributions are three-fold.
• We improve the clustering method for task characterization, adopting a Mahalanobis-distance-based k-means clustering to eliminate the correlation among tasks' attributes (a minimal sketch follows this list).
• We employ a traffic-oriented, two-scenario integrated prediction method, with a control knob that monitors the growth in tasks and switches between prediction methods for the different workload scenarios.
• We develop a prediction-based heuristic algorithm for resource reservation and provisioning, reserving enough CPU and memory ahead of time for bursts without disabling the cloud's scalability.
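The Mahalanobis-distance-based k-means in the first contribution can be sketched as follows: whitening the task-attribute matrix with the inverse covariance makes standard Euclidean k-means equivalent to clustering under Mahalanobis distance, assuming a single global covariance. This is one common realization of the idea, not necessarily WITCAT's exact procedure.

```python
# Sketch: Mahalanobis-distance k-means via whitening + standard k-means.
import numpy as np
from sklearn.cluster import KMeans

def mahalanobis_kmeans(X: np.ndarray, k: int, seed: int = 0):
    """Cluster task-attribute rows of X under Mahalanobis distance."""
    cov = np.cov(X, rowvar=False)
    L = np.linalg.cholesky(np.linalg.inv(cov))   # inv(cov) = L @ L.T
    Xw = X @ L                                   # Euclidean distance in Xw == Mahalanobis in X
    return KMeans(n_clusters=k, n_init=10, random_state=seed).fit_predict(Xw)
```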
We conduct extensive experiments using Google cloud traces; the results show that our solution outperforms other scheduling algorithms in guarantee ratio (improved by 25.8%), total energy consumption (17.3% saved), and resource utilization (improved by 18.2%), which further demonstrates the advantages of the proposed solution under task traffic bursts.
In heterogeneous or shared clusters, distributed learning processes are slowed down by straggling workers. In this work, we propose LB-BSP, a new synchronization scheme that eliminates stragglers by adapting each worker's training load (batch size) to its processing capability. For training in shared production clusters, a prerequisite for deciding the workers' batch sizes is knowing their processing speeds before each iteration starts. To this end, we adopt NARX, an extended recurrent neural network that accounts for both historical speeds and driving factors such as CPU and memory usage in its predictions.
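The load-balancing rule implied above can be sketched as follows: once each worker's per-sample speed has been predicted for the next iteration, batch sizes are set proportional to those speeds so that all workers finish at roughly the same time. The proportional rule is our reading of the scheme, not the exact formula from the paper.

```python
# Sketch: assign batch sizes proportional to predicted worker speeds.
def assign_batch_sizes(global_batch: int, predicted_speeds: list) -> list:
    total = sum(predicted_speeds)
    sizes = [int(round(global_batch * s / total)) for s in predicted_speeds]
    sizes[-1] += global_batch - sum(sizes)   # absorb rounding so sizes sum to global_batch
    return sizes

# Example: a worker predicted to be twice as fast receives twice the samples.
print(assign_batch_sizes(512, [100.0, 200.0, 100.0]))  # -> [128, 256, 128]
```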
Ubiquitously connected devices, e.g., Internet of Things (IoT) devices, space telescopes, social networks, and GPS-enabled gadgets, are contributing to the perpetual and swift growth of data: 2.5 exabytes of data are produced daily, of which 60-80% is geo-referenced, and space telescopes broadcast about 140 GB of data weekly. The availability of such large amounts of data calls for new scalable query processing techniques. One technique that is getting attention is sketching, which summarizes the data and computes an approximate answer on the sketch. This general technique is used in partitioning [3], clustering [1], selectivity estimation [2], and visualization [4], among others. This paper introduces a sketching-based framework for big spatial data that provides four sketching methods and uses them to implement three common operations, namely, partitioning, clustering, and selectivity estimation. The framework executes in three phases, sketching, local operation, and generalization, which can apply to a wide range of operations on big spatial data.
Sampling is a widely used sketching technique, but other techniques, such as uniform and non-uniform histograms, are not well studied due to two challenges. First, each sketching method has a different representation and different creation parameters, e.g., sampling ratio or number of histogram cells, which makes it hard to compare their performance. Second, while existing algorithms can be used as-is with samples, other sketching methods might require tweaks to the algorithms to work. This work provides a comprehensive evaluation to understand the trade-offs among the different sketching techniques for big spatial data.
In this paper, we present a three-phase sketching-based framework for big data processing. The first phase uses Spark to efficiently compute four types of data sketches, namely, sampling, uniform histograms, non-uniform histograms, and enhanced histograms. To make the sketching methods comparable, we define a parameter B that indicates the memory budget; regardless of their representation, all sketching methods are designed to use up to that memory budget. The second phase uses a single machine to process the sketch and provide a partial answer to three popular and diverse operations, namely, partitioning, clustering, and selectivity estimation. Previous work mostly applied these operations with sample-based sketches, except for selectivity estimation, which also used histograms. In this paper, we propose histogram-based spatial partitioning and K-means clustering and show that they can outperform sampling-based methods. The third phase takes the partial answer and scans all the data in parallel to generalize the answer to the entire dataset.
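The memory-budget parameter B can be illustrated with the following sketch of a uniform 2-D histogram whose cell count is sized to fit within B bytes (8-byte counters assumed); the NumPy-based construction is a single-machine simplification of the Spark-based sketching phase.

```python
# Sketch: build a uniform 2-D histogram sketch within a memory budget of B bytes.
import numpy as np

def uniform_histogram_sketch(points: np.ndarray, budget_bytes: int):
    """points: (n, 2) array of (x, y) coordinates; returns counts and grid edges."""
    cells = max(budget_bytes // 8, 1)        # 8-byte counters fit in the budget
    side = max(int(np.sqrt(cells)), 1)       # square grid within the budget
    counts, x_edges, y_edges = np.histogram2d(points[:, 0], points[:, 1], bins=side)
    return counts, x_edges, y_edges
```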
In our experiments, we use both real and synthetic datasets of up to 2.7 billion records and 100 GB of data. We vary the memory budget used for sketching and study its effect on both the execution time and the quality of the results.
Mnemo is an application profiling tool specialized for data serving and caching workloads, which retrieve data from cloud in-memory key-value stores. The increasing demand to boost application performance via in-memory data retrieval, and the resulting spike in overall system hosting cost, suggest that cheaper but slower memory technologies, such as NVDIMMs (non-volatile memory), will co-exist with the currently predominant DRAM. In such future cloud systems, where the memory substrate will include heterogeneous hardware, Mnemo serves as the necessary memory sizing and data tiering consultant. Mnemo permits quick exploration of the trade-offs between system cost and application performance arising from the various possible sizings of the hybrid memory system components.
Modern data centers are increasingly being provisioned with compute accelerators such as GPUs, FPGAs, and ASICs to keep up with workload performance demands and reduce the total cost of ownership (TCO). By 2021, traffic within hyperscale datacenters is expected to quadruple, with 94% of workloads moving to cloud-based datacenters, according to Cisco's global cloud index. A majority of these workloads, including data mining, image processing, speech recognition, and gaming, use GPUs for high-throughput computing. This trend is evident as public cloud operators like Amazon and Microsoft have recently started to offer GPU-based infrastructure services.
GPU-bound applications, in general, can be either batch or latency-sensitive. Typically, latency-critical applications subscribe to datacenter resources in the form of queries, e.g., inference requests against a deep neural network (DNN) model. For example, a wearable health monitoring device aggregates data from several sensors through a mobile application. In case of a data anomaly, inference services can be triggered from the mobile device to the cloud, requesting a DNN model that fits the symptom. Such GPU-bound inference requests impose strict Service Level Agreements (SLAs), typically set around 150 to 500 ms. In contrast to regular datacenter batch workloads, these user-facing applications are typically hosted as services that occur and scale in short bursts. On the other hand, batch applications are HPC-style, compute-bound workloads that are throughput-oriented. In a typical datacenter, both kinds of applications might co-exist on the same device, depending on the orchestration and scheduling policy. With the expected increase in such workloads, this GPU resource management problem will only exacerbate. Hence, GPUs/accelerators are on the critical path to ensuring performance and meeting the end-to-end latency demands of such queries.
State-of-the-art resource orchestrators are agnostic of GPUs and their resource utilization footprints, and thus are not equipped to dynamically orchestrate these accelerator-bound containers. On the other hand, job schedulers in the datacenter are heavily optimized and tuned for CPU-based systems. Kubernetes and Mesos by default perform uniform task scheduling, which statically assigns GPU resources to applications. The scheduled tasks access the GPUs via PCIe pass-through, which gives the application complete access to the GPU, as seen in Figure 1. Hence the resource utilization of the GPU depends on the parallelism of the application scheduled to run on it. In the case of CPUs, Kubernetes supports dynamic orchestration with features such as node affinity, pod affinity, and pod preemption. However, these features cannot be extended to GPUs, because Kubernetes neither supports preemption of GPU-bound pods nor can it query real-time GPU metrics such as memory usage, symmetric multiprocessor (SM) utilization, and PCIe bandwidth. Moreover, containers often overstate their GPU resource requirements, such as memory, which leads to severe resource underutilization and, in turn, to multiple QoS violations caused by queuing delays. We identify that employing CPU-based scheduling policies for GPU-bound workloads fails to yield high accelerator utilization and leads to poor performance per watt per query. Motivated by this, we propose a GPU-aware resource orchestration layer that enables the resource scheduler to take advantage of GPUs by knowing their real-time utilization.
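The kind of real-time GPU telemetry such an orchestration layer needs can be sketched with NVIDIA's NVML Python bindings (pynvml); this is only an illustration of the class of driver-API queries referred to above (SM utilization, memory usage), not Knots' actual implementation.

```python
# Sketch: query per-GPU SM utilization and memory usage via NVML.
import pynvml

def gpu_snapshot():
    pynvml.nvmlInit()
    stats = []
    try:
        for i in range(pynvml.nvmlDeviceGetCount()):
            handle = pynvml.nvmlDeviceGetHandleByIndex(i)
            util = pynvml.nvmlDeviceGetUtilizationRates(handle)   # SM / memory-controller utilization (%)
            mem = pynvml.nvmlDeviceGetMemoryInfo(handle)          # bytes
            stats.append({"gpu": i, "sm_util": util.gpu,
                          "mem_used": mem.used, "mem_total": mem.total})
    finally:
        pynvml.nvmlShutdown()
    return stats
```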
We further discuss the ideal scheduler properties for a GPU-rich datacenter and list the challenges in developing such a production-grade GPU-based datacenter scheduler. We therefore extend Google's well-known Kubernetes datacenter-level resource orchestrator, making it GPU-aware by exposing GPU driver APIs. Based on our observations from Alibaba's cluster traces and experiments on a real GPU hardware cluster, we build Knots, a GPU-aware resource orchestration layer, and integrate it with the Kubernetes container orchestrator. In addition, we evaluate three GPU-based scheduling schemes that schedule datacenter-representative GPU workload mixes through Kube-Knots. Evaluations on a ten-node GPU cluster demonstrate that Knots, together with our proposed GPU-aware scheduling scheme, improves cluster-wide GPU utilization while significantly reducing cluster-wide power consumption across three different workload mixes, compared with Kubernetes' default uniform scheduler.
The correct implementation of network policies (e.g., routing, NAT, VPNs, load balancing, and IDS/IPS) for the underlying network functions is critical, as it determines the security, availability, and performance of a production network. However, it is notoriously challenging to ensure that network policies are correctly implemented, even for basic reachability policies. This becomes even more challenging in cloud environments featuring SDN-enabled NFV, where multiple tenants are hosted with richer in-network services in the form of chained, virtualized network functions with dynamic, customized network policies.
To address this problem, existing approaches model network behaviors, generate synthetic network traffic, and verify intended network policies. However, these solutions face a fundamental challenge in SDN-enabled NFV --- they fail to capture the dynamics of the production system. For example, virtual network functions (running in virtual machines) can be arbitrarily composed to realize service chaining on the fly; the chained network functions create more complex, unpredictable network policies. Further, the on-demand cloud service model compounds this complexity with dynamic loads and varying network function requirements.
One (seemingly straightforward) solution may be to extend existing network models to capture dynamic system behaviors and thus generate test traffic with broader coverage. However, even if doing so is possible, such model-based approaches easily result in state-space explosion, taking extensive time to complete even a simple verification task for a small network. On the other hand, focusing on a subset of "intended" policies may reduce the state space, but could fail to catch some critical sources of violations in practice --- in most cases, it is hard (or impossible) to know the intended policies without actually operating the network functions in a production environment (e.g., with improvised changes in NFV configurations/policies).
Ideally, conducting network verification in a live production environment --- complementary to model-based verification approaches --- is attractive, as production traffic captures the most exact, realistic dynamic state of the system, which model-based verification tools cannot provide. However, doing so brings potential risks of impacting or even damaging the live production environment, as mis-configured "inline" test network functions could wrongly manipulate network traffic --- numerous network outages are actually caused by (tiny) mis-configurations of a live production system. This becomes more problematic in multi-tenant clouds, where such misbehaviors could impact other tenants.
In this paper, we present ShadeNF, a novel online verification platform for testing in-cloud network functions in a production-like environment without disrupting the live production system. ShadeNF enables such a production-like test environment (i.e., the shadow system) with an exact clone of the production network functions to be tested, which captures the dynamic state and vulnerabilities of the live production system. Further, ShadeNF carefully steers live production traffic to the shadow system as test traffic, which captures the dynamic state of the production workloads. The actual verification is performed in a completely isolated environment with the desired resources (e.g., CPU, memory, and storage), thus not interfering with the production system.
We make three key contributions in ShadeNF. First, ShadeNF introduces a new live, consistent snapshot approach to clone chained, dependent network functions by leveraging programmable SDN virtual switches; this approach preserves a consistent snapshot and reduces performance overhead with no modifications to VM software or legacy network flows. Second, to capture the dynamics of the production workloads, ShadeNF creates a new traffic forwarding plane that selectively and unidirectionally steers production traffic to the test system with new "programmable forwarding pipes"; these forwarding pipes also enable the auto-chaining of arbitrary network functions. Last, to explore broader test coverage, ShadeNF advances existing model-based approaches by taking patterns of real production traffic into consideration --- ShadeNF populates synthetic test traffic with realistic traffic patterns that are captured and provided automatically.
We have implemented a ShadeNF prototype on OpenStack. Our evaluation in a real cloud testbed shows that: (1) ShadeNF captures the dynamics of a production system without affecting it; and (2) ShadeNF can effectively detect a variety of policy violations.
Application and platform security has always been critical to the success of any business. Traditionally, applications were deployed directly on physical servers, and a myriad of traditional security solutions were developed around this model to run as local agents on the systems they monitor and protect. These solutions have been refined and standardized through decades of experience. With the emergence of virtualization, cloud, and particularly containerization, using these solutions is becoming challenging under consolidation and scale. As we begin to deploy hundreds of cloud instances on a single node, traditional solutions designed for local execution do not scale out. At the same time, the clean separation of a virtual machine (VM) or a container from the platform itself, together with maturing introspection and inspection APIs, provides a simple, practical way to decouple the monitored from the monitors [3]. Furthermore, as the scope of cloud security expands from simple monitoring and auditing to more complex learning-based analytics, analytics components are further offloaded to separate data services, where they can burn extensive cycles and, in some cases, use specialized hardware for security analytics, out of the critical path of the monitored applications [5]. As a result, the traditional tightly-coupled, agent-based model is being replaced by a more disaggregated {system, observation, analytics, actions} architecture.
To implement such a disaggregated model in practice, the system state first needs to be transferred from the cloud platform to the analytics platform. The file system, more generally, is representative of the system state: it persists features of interest for security analytics, such as processes, metrics, configurations, and packages, across various files. Remote replication or snapshotting [1] of the whole file system is very inefficient, since only a small set of files is accessed during the analytics. As a result, a new family of cloud-native security solutions has recently emerged that uses various specialized data collection techniques [2, 4]. These techniques perform out-of-band introspection of systems to interpret and extract the required system features from the file system, essentially serializing system state into data. This data is then transferred to an analytics platform for analysis. Unlike traditional security solutions, which work locally against the system's standard POSIX-style file system interfaces, these emerging security analytics "work from data" on the analytics platform. However, since the target system is now available only as "data", existing agent-based security solutions become incompatible with it. One mitigating solution is to rewrite all existing solutions, which requires a huge amount of resources and effort.
In Drishti, we address this challenge from a fundamentally different perspective. Instead of rewriting security solutions to work from data, we make the data work for traditional security applications. We achieve this by developing a pseudo-system interface over the system data collected from cloud instances. With this approach, existing solutions run unmodified, as "black box" software over this system interface, as if they were running on the actual cloud instance. The Drishti framework is our realization of this approach. It is logically the inverse of the first step of cloud-native security analytics, which converts system state into data; with Drishti we transform data back into a system on the analytics platform by orchestrating two file system components. First, a standard native system interface is re-calibrated over the system data through our new FUSE file system, confuse, or ClOud Native Filesystem in UserSpacE. Second, we mimic the "effect" of an agent installation via an overlay file system based on the agent image. Within the Drishti framework, the underlying data looks like a standard POSIX system to each on-boarded security solution. This allows us to run existing agent-based security solutions as-is, yet decoupled from the actual system. Drishti also provides a standard and interoperable platform for designing new security analytics solutions.
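The "data back to system" idea can be illustrated with a minimal, hypothetical sketch: a read-only FUSE filesystem (using the fusepy bindings) that serves files out of an in-memory snapshot of collected system data, so an unmodified agent sees an ordinary POSIX tree. This illustrates the approach only; it is not Drishti's confuse implementation.

```python
# Sketch: read-only FUSE filesystem backed by collected system data.
import errno, stat
from fuse import FUSE, Operations, FuseOSError

class DataBackedFS(Operations):
    def __init__(self, snapshot):
        self.files = snapshot            # path -> bytes, e.g. {"/etc/passwd": b"..."}

    def getattr(self, path, fh=None):
        if path == "/" or any(p.startswith(path.rstrip("/") + "/") for p in self.files):
            return dict(st_mode=(stat.S_IFDIR | 0o555), st_nlink=2)
        if path in self.files:
            return dict(st_mode=(stat.S_IFREG | 0o444), st_nlink=1,
                        st_size=len(self.files[path]))
        raise FuseOSError(errno.ENOENT)

    def readdir(self, path, fh):
        prefix = path.rstrip("/") + "/"
        names = {p[len(prefix):].split("/")[0] for p in self.files if p.startswith(prefix)}
        return [".", ".."] + sorted(names)

    def read(self, path, size, offset, fh):
        return self.files[path][offset:offset + size]

# Example mount (illustrative):
# FUSE(DataBackedFS({"/etc/passwd": b"root:x:0:0::/root:/bin/sh\n"}), "/mnt/view", foreground=True)
```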
Overall, Drishti demonstrates a novel, pragmatic, and highly practical approach to bringing security analytics into the cloud. It enables us to leverage existing solutions built on decades of experience, eliminating the need to reinvent the wheel for the cloud.
Various hardware-based Erasure Coding (EC) schemes have been proposed [5, 6, 8, 12-14] to leverage the advanced compute capabilities of modern data centers. Currently, there is no unified and easy way for distributed storage systems to fully exploit multiple devices, such as CPUs, GPUs, and network devices (i.e., multi-rail support), to perform EC operations in parallel. In this paper, we show that it is time to design a unified library that efficiently exploits heterogeneous EC coders. HDFS co-designed with our proposed library outperforms the write performance of the replication scheme and the default HDFS EC coder by 2.7x-6.1x and 2.4x-3.3x, respectively, and improves the performance of reads with failure recovery by up to 2.6x and 5.1x compared to the replication scheme and the default HDFS EC coder, respectively.
There are many approaches in use today to either prevent or minimize the impact of inter-query interactions on a shared cluster. Preventive measures often provide query execution isolation at the resource allocation level to guarantee predictable query performance. Despite these measures, performance issues due to concurrent executions of mixed workloads are a common problem in large-scale data processing systems. As a result, answering questions like "who is causing my query to slow down?" is important for diagnosing resource conflicts and accurately attributing blame in a multi-tenant environment. However, accurate analysis of resource contention is challenging owing to the complex cause-effect relationship between resource utilization and the runtime of concurrent queries (see Figure 1). For example, when some tasks get delayed because of high demand for a particular resource (e.g., when they are blocked on CPU), they hold on to other resources (e.g., memory) as well, thus causing contention on the held resources for other concurrently running queries. Based on our user-study experience, this diagnosis is non-trivial and tedious, and involves hours of manual debugging through a cycle of query interactions.
Trusted Execution Environments (TEEs) ensure strong data confidentiality for applications running inside them, even on untrusted servers. In particular, TEEs are expected to bring significant benefits to enterprise blockchain workloads because they ensure the confidentiality and correctness of transaction records without any heavy-weight data verification process such as proof-of-work. For example, Coco [3] improves both the confidentiality and the transaction throughput of existing blockchain protocols by utilizing TEE features.
Although executing blockchain workloads in TEEs is a promising technique, running databases such as key-value stores (KVS) in TEEs is challenging. Databases are essential for blockchain systems to manage a large amount of transaction records. However, TEEs are not suitable for running databases due to their limitations. First, since databases are memory-intensive, large-scale workloads, putting them into TEEs increases the Trusted Computing Base (TCB) size, which makes the system more vulnerable. Second, holding the whole blockchain record set in TEEs is not realistic due to the small trusted memory (e.g., 128 MB provided by Intel SGX).
Secure database designs using TEEs have been proposed to guarantee data confidentiality and integrity, but existing approaches are not suitable for maintaining a large, immutable blockchain ledger. A common approach is to put the entire or partial database code into TEEs [1]. However, this approach runs into the TEE restrictions mentioned above, such as a large TCB and limited database functionality. Another approach is to place the whole KVS code outside of TEEs and let the TEE application load/store encrypted secret data from/to the KVS [2]. While this ensures the confidentiality of data stored in the untrusted region, it exposes the database code to attackers; a compromised KVS may intentionally delete data without being noticed.
This paper proposes TEE-KV, an immutable KVS database design that allows blockchain applications running in TEEs to securely store transaction records in a KVS whose code is placed entirely outside of TEEs. TEE-KV provides simple KVS APIs, such as Get and Put, for the in-TEE application to transparently communicate with the untrusted domain. To ensure the immutability of data stored in the untrusted domain, TEE-KV maintains an in-TEE keyset that holds all keys that have ever been stored in the KVS. If a Get request to the KVS does not return any value, TEE-KV verifies the correctness of the result by searching the keyset. TEE-KV is also equipped with an in-TEE cache to reduce data transfer costs between the trusted and untrusted domains.
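The in-TEE side of this design can be sketched, in simplified form, as Get/Put wrappers that keep an in-TEE keyset (to detect an untrusted KVS silently dropping stored data) and a small in-TEE cache. Encryption, attestation, and the actual SGX/LevelDB plumbing are omitted; the class and method names are ours.

```python
# Simplified, illustrative sketch of the in-TEE Get/Put logic with keyset and cache.
class TeeKvClient:
    def __init__(self, untrusted_kvs, cache_capacity: int = 1024):
        self.kvs = untrusted_kvs        # e.g., a KVS handle living outside the TEE
        self.keyset = set()             # all keys ever written, kept inside the TEE
        self.cache = {}                 # small in-TEE cache of hot entries
        self.cache_capacity = cache_capacity

    def put(self, key, value):
        self.keyset.add(key)
        self._cache_insert(key, value)
        self.kvs.put(key, value)        # value would be encrypted before leaving the TEE

    def get(self, key):
        if key in self.cache:
            return self.cache[key]
        value = self.kvs.get(key)
        if value is None and key in self.keyset:
            raise RuntimeError("integrity violation: untrusted KVS dropped a stored key")
        if value is not None:
            self._cache_insert(key, value)
        return value

    def _cache_insert(self, key, value):
        if len(self.cache) >= self.cache_capacity:
            self.cache.pop(next(iter(self.cache)))   # naive eviction, for illustration only
        self.cache[key] = value
```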
We implemented a prototype of TEE-KV with LevelDB 1.1.8 and Intel SGX functions. The lines of code (LOC) of the prototype's in-TEE domain amount to only 15.1% of the LevelDB code, which leads to a smaller TCB than an all-in-TEE approach that puts the whole KVS code in the TEE. In the experiments, we integrated TEE-KV into the Coco framework [3] and constructed a blockchain network with three nodes. We then evaluated the throughput of Ethereum blockchain transactions in the network. The results show that the transaction throughput without the in-TEE cache reaches 0.95x that of the all-in-TEE approach, and that a 4 MB in-TEE cache improves the throughput to 1.03x.