从Python到Rust - 4
第9章:并发编程
并发编程是现代软件开发中的重要议题。Rust以其独特的所有权系统和类型系统为并发编程提供了内存安全的保证。本章将深入探讨Rust的并发模型,并与Python进行对比。
9.1 并发模型对比
Python的并发模型
Python提供了多种并发编程方式,但受到GIL(全局解释器锁)的限制:
# Python - 线程并发(受GIL限制)
import threading
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import asyncio
# 多线程 - 适用于I/O密集型任务
def cpu_bound_task(n):
"""CPU密集型任务 - 由于GIL,多线程效果不佳"""
result = 0
for i in range(n):
result += i * i
return result
def io_bound_task(duration):
"""I/O密集型任务 - 多线程有效"""
time.sleep(duration)
return f"Task completed after {duration} seconds"
def demonstrate_python_threading():
print("=== Python Threading ===")
# CPU密集型任务 - 多线程效果不佳
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(cpu_bound_task, 1000000) for _ in range(4)]
results = [f.result() for f in futures]
threading_time = time.time() - start
# 单线程对比
start = time.time()
results = [cpu_bound_task(1000000) for _ in range(4)]
sequential_time = time.time() - start
print(f"Threading time: {threading_time:.2f}s")
print(f"Sequential time: {sequential_time:.2f}s")
print(f"Threading overhead: {threading_time / sequential_time:.2f}x")
# I/O密集型任务 - 多线程有效
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(io_bound_task, 0.5) for _ in range(4)]
results = [f.result() for f in futures]
print(f"I/O tasks with threading: {time.time() - start:.2f}s")
# 多进程 - 绕过GIL限制
def demonstrate_python_multiprocessing():
print("=== Python Multiprocessing ===")
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(cpu_bound_task, 1000000) for _ in range(4)]
results = [f.result() for f in futures]
multiprocessing_time = time.time() - start
print(f"Multiprocessing time: {multiprocessing_time:.2f}s")
# 异步编程 - 单线程并发
async def async_io_task(duration, task_id):
print(f"Task {task_id} started")
await asyncio.sleep(duration)
print(f"Task {task_id} completed")
return f"Task {task_id} result"
async def demonstrate_python_async():
print("=== Python Async/Await ===")
start = time.time()
tasks = [async_io_task(0.5, i) for i in range(4)]
results = await asyncio.gather(*tasks)
async_time = time.time() - start
print(f"Async tasks time: {async_time:.2f}s")
# 共享状态的问题
shared_counter = 0
lock = threading.Lock()
def unsafe_increment():
global shared_counter
for _ in range(100000):
shared_counter += 1 # 非线程安全
def safe_increment():
global shared_counter
for _ in range(100000):
with lock:
shared_counter += 1 # 线程安全,但有性能开销
def demonstrate_python_synchronization():
print("=== Python Synchronization ===")
global shared_counter
# 非安全版本
shared_counter = 0
threads = [threading.Thread(target=unsafe_increment) for _ in range(4)]
start = time.time()
for t in threads:
t.start()
for t in threads:
t.join()
unsafe_time = time.time() - start
print(f"Unsafe result: {shared_counter} (expected: 400000) in {unsafe_time:.2f}s")
# 安全版本
shared_counter = 0
threads = [threading.Thread(target=safe_increment) for _ in range(4)]
start = time.time()
for t in threads:
t.start()
for t in threads:
t.join()
safe_time = time.time() - start
print(f"Safe result: {shared_counter} (expected: 400000) in {safe_time:.2f}s")
if __name__ == "__main__":
demonstrate_python_threading()
demonstrate_python_multiprocessing()
asyncio.run(demonstrate_python_async())
demonstrate_python_synchronization()
Rust的并发模型
Rust通过所有权系统在编译时保证线程安全:
use std::thread;
use std::time::{Duration, Instant};
use std::sync::{Arc, Mutex, mpsc};
use std::sync::atomic::{AtomicUsize, Ordering};
// Rust的线程安全并发
fn cpu_bound_task(n: usize) -> usize {
let mut result = 0;
for i in 0..n {
result += i * i;
}
result
}
fn demonstrate_rust_threading() {
println!("=== Rust Threading ===");
// 多线程CPU密集型任务
let start = Instant::now();
let handles: Vec<_> = (0..4)
.map(|_| {
thread::spawn(|| cpu_bound_task(1_000_000))
})
.collect();
let results: Vec<_> = handles
.into_iter()
.map(|h| h.join().unwrap())
.collect();
let threading_time = start.elapsed();
// 单线程对比
let start = Instant::now();
let sequential_results: Vec<_> = (0..4)
.map(|_| cpu_bound_task(1_000_000))
.collect();
let sequential_time = start.elapsed();
println!("Threading time: {:?}", threading_time);
println!("Sequential time: {:?}", sequential_time);
println!("Speedup: {:.2}x", sequential_time.as_secs_f64() / threading_time.as_secs_f64());
}
// 安全的共享状态
fn demonstrate_rust_shared_state() {
println!("=== Rust Shared State ===");
// 使用Arc<Mutex<T>>共享状态
let counter = Arc::new(Mutex::new(0usize));
let mut handles = vec![];
let start = Instant::now();
for _ in 0..4 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
for _ in 0..100_000 {
let mut num = counter.lock().unwrap();
*num += 1;
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let mutex_time = start.elapsed();
println!("Mutex result: {} (expected: 400000) in {:?}",
*counter.lock().unwrap(), mutex_time);
// 使用原子类型获得更好性能
let atomic_counter = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
let start = Instant::now();
for _ in 0..4 {
let counter = Arc::clone(&atomic_counter);
let handle = thread::spawn(move || {
for _ in 0..100_000 {
counter.fetch_add(1, Ordering::SeqCst);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let atomic_time = start.elapsed();
println!("Atomic result: {} (expected: 400000) in {:?}",
atomic_counter.load(Ordering::SeqCst), atomic_time);
println!("Atomic speedup: {:.2}x", mutex_time.as_secs_f64() / atomic_time.as_secs_f64());
}
fn main() {
demonstrate_rust_threading();
demonstrate_rust_shared_state();
}
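上面的例子通过 Arc<Mutex<T>> 和原子类型共享状态。作为补充,下面给出一个基于作用域线程(std::thread::scope,Rust 1.63 起稳定)的独立小示意:编译器保证所有子线程在作用域结束前 join,因此闭包可以直接借用栈上的数据,不需要 Arc,也不需要 'static 约束。这段代码不属于上面的示例程序,在 main 中调用 scoped_threads_example() 即可运行。
use std::thread;
// 作用域线程示意:子线程可以安全地借用局部变量
fn scoped_threads_example() {
    let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
    let (left, right) = data.split_at(data.len() / 2);
    // thread::scope 保证两个子线程都在闭包返回前结束,
    // 所以借用 left / right 不会出现悬垂引用
    let (sum_left, sum_right) = thread::scope(|s| {
        let h1 = s.spawn(|| left.iter().sum::<i32>());
        let h2 = s.spawn(|| right.iter().sum::<i32>());
        (h1.join().unwrap(), h2.join().unwrap())
    });
    println!("left = {}, right = {}, total = {}",
        sum_left, sum_right, sum_left + sum_right);
}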
9.2 消息传递模型
Python的消息传递
# Python - 队列和管道
import queue
import threading
import multiprocessing
import time
def python_queue_example():
"""使用队列进行线程间通信"""
print("=== Python Queue Communication ===")
q = queue.Queue()
results = []
def producer(name, items):
for i in range(items):
item = f"{name}-item-{i}"
q.put(item)
time.sleep(0.01)
q.put(None) # 结束标记
def consumer(name):
while True:
item = q.get()
if item is None:
q.put(None) # 传递结束标记给其他消费者
break
results.append(f"{name} processed {item}")
q.task_done()
# 启动生产者和消费者
producer_thread = threading.Thread(target=producer, args=("Producer", 5))
consumer_threads = [
threading.Thread(target=consumer, args=(f"Consumer-{i}",))
for i in range(2)
]
producer_thread.start()
for t in consumer_threads:
t.start()
producer_thread.join()
for t in consumer_threads:
t.join()
for result in results:
print(result)
def python_pipe_example():
"""使用管道进行进程间通信"""
print("=== Python Pipe Communication ===")
    # 注意:在默认使用 spawn 启动方式的平台(如 Windows、macOS)上,
    # 传给子进程的函数必须定义在模块顶层才能被 pickle;这里为了示例简洁写成了嵌套函数
    def worker(conn, worker_id):
while True:
try:
data = conn.recv()
if data == "STOP":
break
result = f"Worker {worker_id} processed: {data}"
conn.send(result)
except EOFError:
break
conn.close()
# 创建管道
parent_conn, child_conn = multiprocessing.Pipe()
# 启动工作进程
process = multiprocessing.Process(target=worker, args=(child_conn, 1))
process.start()
# 发送数据
for i in range(3):
parent_conn.send(f"task-{i}")
result = parent_conn.recv()
print(result)
# 停止工作进程
parent_conn.send("STOP")
process.join()
parent_conn.close()
if __name__ == "__main__":
python_queue_example()
python_pipe_example()
Rust的消息传递
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn rust_channel_example() {
println!("=== Rust Channel Communication ===");
// 创建通道
let (tx, rx) = mpsc::channel();
// 生产者线程
let producer = thread::spawn(move || {
for i in 0..5 {
let msg = format!("Message {}", i);
tx.send(msg).unwrap();
thread::sleep(Duration::from_millis(100));
}
// tx会在这里自动drop,关闭通道
});
// 消费者线程(主线程)
for received in rx {
println!("Received: {}", received);
}
producer.join().unwrap();
}
fn rust_multiple_producers() {
println!("=== Rust Multiple Producers ===");
let (tx, rx) = mpsc::channel();
let mut handles = vec![];
// 创建多个生产者
for i in 0..3 {
let tx = tx.clone();
let handle = thread::spawn(move || {
for j in 0..3 {
let msg = format!("Producer {} - Message {}", i, j);
tx.send(msg).unwrap();
thread::sleep(Duration::from_millis(50));
}
});
handles.push(handle);
}
// 丢弃原始发送者
drop(tx);
// 接收消息
for received in rx {
println!("Received: {}", received);
}
// 等待所有生产者完成
for handle in handles {
handle.join().unwrap();
}
}
// 双向通信
fn rust_bidirectional_communication() {
println!("=== Rust Bidirectional Communication ===");
let (main_tx, worker_rx) = mpsc::channel();
let (worker_tx, main_rx) = mpsc::channel();
// 工作线程
let worker = thread::spawn(move || {
for msg in worker_rx {
match msg.as_str() {
"ping" => {
worker_tx.send("pong".to_string()).unwrap();
}
"stop" => {
worker_tx.send("stopping".to_string()).unwrap();
break;
}
_ => {
let response = format!("Processed: {}", msg);
worker_tx.send(response).unwrap();
}
}
}
});
// 主线程发送消息并接收响应
let messages = vec!["ping", "hello", "world", "stop"];
for msg in messages {
main_tx.send(msg.to_string()).unwrap();
if let Ok(response) = main_rx.recv() {
println!("Sent: {}, Received: {}", msg, response);
if response == "stopping" {
break;
}
}
}
worker.join().unwrap();
}
fn main() {
rust_channel_example();
rust_multiple_producers();
rust_bidirectional_communication();
}
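mpsc::channel 创建的是无界通道,发送端永远不会阻塞;如果想要类似 Python 中 queue.Queue(maxsize=N) 的背压效果,可以改用 mpsc::sync_channel 创建有界通道。下面是一个独立的补充示意,也可以把函数体并入上面的示例中调用:
use std::sync::mpsc;
use std::thread;
fn bounded_channel_example() {
    // 容量为 2 的有界通道:缓冲区满时 send 会阻塞,直到消费者取走消息
    let (tx, rx) = mpsc::sync_channel::<i32>(2);
    let producer = thread::spawn(move || {
        for i in 0..5 {
            tx.send(i).unwrap();
            println!("sent {}", i);
        }
    });
    for msg in rx {
        println!("received {}", msg);
    }
    producer.join().unwrap();
}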
9.3 并行计算模式
Rayon:数据并行
// 需要在Cargo.toml中添加: rayon = "1.7"
use rayon::prelude::*;
use std::time::Instant;
fn demonstrate_rayon() {
println!("=== Rayon Data Parallelism ===");
let data: Vec<i32> = (0..10_000_000).collect();
    // 串行计算(i32 的平方和会溢出,这里用 i128 累加)
    let start = Instant::now();
    let serial_sum: i128 = data.iter().map(|&x| (x as i128) * (x as i128)).sum();
    let serial_time = start.elapsed();
    // 并行计算
    let start = Instant::now();
    let parallel_sum: i128 = data.par_iter().map(|&x| (x as i128) * (x as i128)).sum();
let parallel_time = start.elapsed();
println!("Serial sum: {} in {:?}", serial_sum, serial_time);
println!("Parallel sum: {} in {:?}", parallel_sum, parallel_time);
println!("Speedup: {:.2}x", serial_time.as_secs_f64() / parallel_time.as_secs_f64());
// 并行排序
let mut data_clone = data.clone();
let start = Instant::now();
data_clone.sort();
let serial_sort_time = start.elapsed();
let mut data_clone = data.clone();
let start = Instant::now();
data_clone.par_sort();
let parallel_sort_time = start.elapsed();
println!("Serial sort: {:?}", serial_sort_time);
println!("Parallel sort: {:?}", parallel_sort_time);
println!("Sort speedup: {:.2}x",
serial_sort_time.as_secs_f64() / parallel_sort_time.as_secs_f64());
}
// 并行过滤和映射
fn parallel_data_processing() {
println!("=== Parallel Data Processing ===");
let numbers: Vec<i32> = (0..1_000_000).collect();
    // 串行处理(x * x 会超出 i32 范围,先转成 i64 再平方)
    let start = Instant::now();
    let serial_result: Vec<i64> = numbers
        .iter()
        .filter(|&&x| x % 2 == 0)
        .map(|&x| (x as i64) * (x as i64))
        .filter(|&x| x > 100)
        .collect();
    let serial_time = start.elapsed();
    // 并行处理
    let start = Instant::now();
    let parallel_result: Vec<i64> = numbers
        .par_iter()
        .filter(|&&x| x % 2 == 0)
        .map(|&x| (x as i64) * (x as i64))
        .filter(|&x| x > 100)
        .collect();
let parallel_time = start.elapsed();
println!("Serial processing: {} items in {:?}",
serial_result.len(), serial_time);
println!("Parallel processing: {} items in {:?}",
parallel_result.len(), parallel_time);
println!("Processing speedup: {:.2}x",
serial_time.as_secs_f64() / parallel_time.as_secs_f64());
}
fn main() {
demonstrate_rayon();
parallel_data_processing();
}
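par_iter 属于数据并行;Rayon 还提供 rayon::join 做"分而治之"式的任务并行:两个闭包会被放进同一个工作窃取线程池,可能在不同线程上同时执行。下面是一个补充示意(依赖与上面相同,只需要 rayon):
// 递归二分 + rayon::join 的并行求和示意
fn parallel_sum(slice: &[i64]) -> i64 {
    // 切片足够小就直接串行求和,避免任务切分的开销盖过收益
    if slice.len() <= 10_000 {
        return slice.iter().sum();
    }
    let (left, right) = slice.split_at(slice.len() / 2);
    let (l, r) = rayon::join(|| parallel_sum(left), || parallel_sum(right));
    l + r
}
在 main 中加一句 println!("{}", parallel_sum(&(0..1_000_000i64).collect::<Vec<_>>())); 即可验证结果与串行版本一致。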
9.4 异步编程
Python的异步编程
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
"""异步获取URL内容"""
try:
async with session.get(url) as response:
return await response.text()
except Exception as e:
return f"Error fetching {url}: {e}"
async def python_async_example():
"""Python异步编程示例"""
print("=== Python Async Programming ===")
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1"
]
# 异步并发请求
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
async_time = time.time() - start
print(f"Async requests completed in: {async_time:.2f}s")
print(f"Total response length: {sum(len(r) for r in results)}")
# 异步生产者-消费者模式
async def async_producer(queue, name, items):
"""异步生产者"""
for i in range(items):
item = f"{name}-item-{i}"
await queue.put(item)
await asyncio.sleep(0.1) # 模拟工作
await queue.put(None) # 结束标记
async def async_consumer(queue, name):
"""异步消费者"""
results = []
while True:
item = await queue.get()
if item is None:
break
result = f"{name} processed {item}"
results.append(result)
await asyncio.sleep(0.05) # 模拟处理时间
return results
async def python_async_queue():
"""异步队列示例"""
print("=== Python Async Queue ===")
queue = asyncio.Queue()
# 启动生产者和消费者
producer_task = asyncio.create_task(
async_producer(queue, "Producer", 5)
)
consumer_task = asyncio.create_task(
async_consumer(queue, "Consumer")
)
# 等待完成
await producer_task
results = await consumer_task
for result in results:
print(result)
if __name__ == "__main__":
asyncio.run(python_async_example())
asyncio.run(python_async_queue())
Rust的异步编程
// 需要在Cargo.toml中添加:
// tokio = { version = "1", features = ["full"] }
// reqwest = "0.11"
// futures = "0.3"(下面的 join_all 来自 futures crate)
use tokio::time::{sleep, Duration, Instant};
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
#[tokio::main]
async fn main() {
rust_async_example().await;
rust_async_channels().await;
    rust_async_shared_state().await;
    async_worker_pool().await;
}
async fn fetch_url(client: &reqwest::Client, url: &str) -> Result<String, Box<dyn std::error::Error>> {
let response = client.get(url).send().await?;
let body = response.text().await?;
Ok(body)
}
async fn rust_async_example() {
println!("=== Rust Async Programming ===");
let urls = vec![
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
];
let client = reqwest::Client::new();
let start = Instant::now();
// 并发发送请求
let tasks = urls.into_iter().map(|url| {
let client = &client;
async move {
fetch_url(client, url).await.unwrap_or_else(|e| format!("Error: {}", e))
}
});
let results = futures::future::join_all(tasks).await;
let async_time = start.elapsed();
println!("Async requests completed in: {:?}", async_time);
println!("Total response length: {}", results.iter().map(|r| r.len()).sum::<usize>());
}
// 异步通道
async fn rust_async_channels() {
println!("=== Rust Async Channels ===");
let (tx, mut rx) = mpsc::channel(32);
// 异步生产者
let producer = tokio::spawn(async move {
for i in 0..5 {
let msg = format!("Message {}", i);
if tx.send(msg).await.is_err() {
break;
}
sleep(Duration::from_millis(100)).await;
}
});
// 异步消费者
let consumer = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
println!("Received: {}", msg);
sleep(Duration::from_millis(50)).await;
}
});
// 等待任务完成
let _ = tokio::join!(producer, consumer);
}
// 异步共享状态
async fn rust_async_shared_state() {
println!("=== Rust Async Shared State ===");
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
// 创建多个异步任务
for i in 0..4 {
let counter = Arc::clone(&counter);
let handle = tokio::spawn(async move {
for j in 0..10 {
{
let mut num = counter.lock().await;
*num += 1;
}
println!("Task {} incremented counter (iteration {})", i, j);
sleep(Duration::from_millis(10)).await;
}
});
handles.push(handle);
}
// 等待所有任务完成
for handle in handles {
handle.await.unwrap();
}
let final_count = *counter.lock().await;
println!("Final counter value: {}", final_count);
}
// 异步工作池
use tokio::sync::Semaphore;
async fn async_worker_pool() {
println!("=== Async Worker Pool ===");
let semaphore = Arc::new(Semaphore::new(3)); // 最多3个并发任务
let mut handles = vec![];
for i in 0..10 {
        let semaphore = Arc::clone(&semaphore);
        let handle = tokio::spawn(async move {
            // 获取一个许可;许可被持有期间,同时运行的任务不会超过3个
            let _permit = semaphore.acquire().await.unwrap();
println!("Task {} started", i);
sleep(Duration::from_millis(500)).await; // 模拟工作
println!("Task {} completed", i);
});
handles.push(handle);
}
// 等待所有任务完成
for handle in handles {
handle.await.unwrap();
}
}
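实际的异步服务几乎都需要超时控制。tokio 提供 tokio::time::timeout,把任意 Future 包装成"限时完成"的 Future,作用类似 Python 的 asyncio.wait_for。下面是一个独立的补充示意(依赖仍是上面的 tokio):
use tokio::time::{sleep, timeout, Duration};
// 超时控制示意:任务最多允许执行 200ms
async fn timeout_example() {
    let slow_task = async {
        sleep(Duration::from_millis(500)).await;
        "done"
    };
    match timeout(Duration::from_millis(200), slow_task).await {
        Ok(result) => println!("task finished: {}", result),
        // 超时返回 Err(Elapsed),内部的 Future 会被直接丢弃(即被取消)
        Err(_) => println!("task timed out"),
    }
}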
9.5 实践练习
练习1:并发文件处理器
use std::fs;
use std::path::Path;
use std::time::Instant;
use rayon::prelude::*;
#[derive(Debug)]
struct FileStats {
total_files: usize,
total_size: u64,
largest_file: (String, u64),
file_types: std::collections::HashMap<String, usize>,
}
impl FileStats {
fn new() -> Self {
FileStats {
total_files: 0,
total_size: 0,
largest_file: (String::new(), 0),
file_types: std::collections::HashMap::new(),
}
}
fn merge(&mut self, other: FileStats) {
self.total_files += other.total_files;
self.total_size += other.total_size;
if other.largest_file.1 > self.largest_file.1 {
self.largest_file = other.largest_file;
}
for (ext, count) in other.file_types {
*self.file_types.entry(ext).or_insert(0) += count;
}
}
}
fn process_file(path: &Path) -> FileStats {
let mut stats = FileStats::new();
if let Ok(metadata) = path.metadata() {
if metadata.is_file() {
stats.total_files = 1;
stats.total_size = metadata.len();
stats.largest_file = (path.to_string_lossy().to_string(), metadata.len());
if let Some(extension) = path.extension() {
let ext = extension.to_string_lossy().to_lowercase();
stats.file_types.insert(ext, 1);
}
}
}
stats
}
fn collect_files(dir: &Path) -> Vec<std::path::PathBuf> {
let mut files = Vec::new();
if let Ok(entries) = fs::read_dir(dir) {
for entry in entries.flatten() {
let path = entry.path();
if path.is_file() {
files.push(path);
} else if path.is_dir() {
files.extend(collect_files(&path));
}
}
}
files
}
// 串行处理
fn process_files_sequential(files: &[std::path::PathBuf]) -> FileStats {
let mut total_stats = FileStats::new();
for file in files {
let stats = process_file(file);
total_stats.merge(stats);
}
total_stats
}
// 并行处理(使用Rayon)
fn process_files_parallel(files: &[std::path::PathBuf]) -> FileStats {
files
.par_iter()
.map(|file| process_file(file))
.reduce(FileStats::new, |mut acc, stats| {
acc.merge(stats);
acc
})
}
fn demonstrate_file_processor() {
println!("=== Concurrent File Processor ===");
let current_dir = std::env::current_dir().unwrap();
    let files = collect_files(&current_dir);
println!("Found {} files to process", files.len());
if files.is_empty() {
println!("No files found for processing");
return;
}
// 串行处理
let start = Instant::now();
let sequential_stats = process_files_sequential(&files);
let sequential_time = start.elapsed();
// 并行处理
let start = Instant::now();
let parallel_stats = process_files_parallel(&files);
let parallel_time = start.elapsed();
println!("Sequential processing: {:?}", sequential_time);
println!("Parallel processing: {:?}", parallel_time);
println!("Speedup: {:.2}x",
sequential_time.as_secs_f64() / parallel_time.as_secs_f64());
println!("\nFile Statistics:");
println!("Total files: {}", parallel_stats.total_files);
println!("Total size: {} bytes", parallel_stats.total_size);
println!("Largest file: {} ({} bytes)",
parallel_stats.largest_file.0, parallel_stats.largest_file.1);
println!("File types:");
    for (ext, count) in &parallel_stats.file_types {
println!(" .{}: {}", ext, count);
}
}
fn main() {
demonstrate_file_processor();
}
练习2:生产者-消费者系统
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
use std::time::{Duration, Instant};
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Debug, Clone)]
struct Task {
id: usize,
data: String,
priority: u8,
}
struct TaskProcessor {
processed_count: AtomicUsize,
processing_time: AtomicUsize, // 微秒
}
impl TaskProcessor {
fn new() -> Self {
TaskProcessor {
processed_count: AtomicUsize::new(0),
processing_time: AtomicUsize::new(0),
}
}
fn process_task(&self, task: Task) {
let start = Instant::now();
// 模拟处理时间(基于优先级)
let processing_duration = Duration::from_millis((10 - task.priority as u64) * 10);
thread::sleep(processing_duration);
let elapsed = start.elapsed();
self.processed_count.fetch_add(1, Ordering::SeqCst);
self.processing_time.fetch_add(elapsed.as_micros() as usize, Ordering::SeqCst);
println!("Processed task {} (priority: {}) in {:?}",
task.id, task.priority, elapsed);
}
fn get_stats(&self) -> (usize, Duration) {
let count = self.processed_count.load(Ordering::SeqCst);
let total_time = Duration::from_micros(
self.processing_time.load(Ordering::SeqCst) as u64
);
(count, total_time)
}
}
fn producer_consumer_system() {
println!("=== Producer-Consumer System ===");
    let (tx, rx) = mpsc::channel::<Task>();
    // std 的 mpsc 是"多生产者、单消费者",Receiver 不能 clone;
    // 要让多个消费者共享同一个接收端,需要用 Arc<Mutex<_>> 包装
    let rx = Arc::new(Mutex::new(rx));
    let processor = Arc::new(TaskProcessor::new());
    let num_consumers = 4;
    let num_tasks = 100;
    // 创建消费者线程
    let mut consumer_handles = vec![];
    for consumer_id in 0..num_consumers {
        let rx = Arc::clone(&rx);
        let processor = Arc::clone(&processor);
        let handle = thread::spawn(move || {
            loop {
                // 先锁住接收端取任务,取到后立即释放锁再处理,避免处理期间一直持锁
                let task = rx.lock().unwrap().recv();
                match task {
                    Ok(task) => {
                        processor.process_task(task);
                    }
                    Err(_) => {
                        println!("Consumer {} shutting down", consumer_id);
                        break;
                    }
                }
            }
        });
consumer_handles.push(handle);
}
// 生产者线程
let producer_handle = thread::spawn(move || {
for i in 0..num_tasks {
let task = Task {
id: i,
data: format!("Task data {}", i),
priority: (i % 10 + 1) as u8,
};
if tx.send(task).is_err() {
break;
}
// 模拟生产间隔
thread::sleep(Duration::from_millis(5));
}
println!("Producer finished generating {} tasks", num_tasks);
});
    // 等待生产者完成;tx 已被 move 进生产者线程,线程结束时自动 drop,
    // 通道随之关闭,消费者的 recv() 会返回 Err 并退出
    producer_handle.join().unwrap();
// 等待所有消费者完成
for handle in consumer_handles {
handle.join().unwrap();
}
// 显示统计信息
let (processed_count, total_processing_time) = processor.get_stats();
println!("\nSystem Statistics:");
println!("Tasks processed: {}/{}", processed_count, num_tasks);
println!("Total processing time: {:?}", total_processing_time);
println!("Average processing time: {:?}",
total_processing_time / processed_count as u32);
}
fn main() {
producer_consumer_system();
}
9.6 对比总结
特性 | Python | Rust |
---|---|---|
线程模型 | 受GIL限制的真线程 | 真正的并行线程 |
内存安全 | 运行时检查 | 编译时保证 |
共享状态 | 需要显式锁定 | 类型系统强制安全 |
消息传递 | queue, multiprocessing | mpsc channels |
异步编程 | asyncio | async/await + tokio |
数据并行 | multiprocessing | rayon |
性能开销 | GIL + 锁开销 | 零成本抽象 |
错误处理 | 运行时异常 | 编译时检查 |
总结
第9章深入探讨了Rust和Python的并发编程模型。Rust通过所有权系统在编译时保证线程安全,避免了数据竞争和内存安全问题。相比Python的GIL限制,Rust能够真正实现CPU密集型任务的并行处理。
在下一章中,我们将学习Rust的生态系统和实际项目开发实践。
第10章:生态系统与实践
本章将介绍Rust的生态系统、包管理、工具链,以及如何在实际项目中应用Rust。我们将通过一个完整的项目示例来展示Rust开发的最佳实践。
10.1 Cargo:包管理器
Cargo vs pip
Python的包管理
# requirements.txt
requests==2.28.1
numpy>=1.21.0
pandas~=1.4.0
pytest>=7.0.0
black
flake8
# 安装依赖
# pip install -r requirements.txt
# setup.py 或 pyproject.toml
from setuptools import setup, find_packages
setup(
name="my-python-project",
version="0.1.0",
packages=find_packages(),
install_requires=[
"requests>=2.28.0",
"numpy>=1.21.0",
],
extras_require={
"dev": ["pytest", "black", "flake8"],
"data": ["pandas", "matplotlib"],
},
)
Rust的包管理
# Cargo.toml
[package]
name = "my-rust-project"
version = "0.1.0"
edition = "2021"
authors = ["Your Name <your.email@example.com>"]
description = "A sample Rust project"
license = "MIT OR Apache-2.0"
repository = "https://github.com/yourusername/my-rust-project"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["full"] }
clap = { version = "4.0", features = ["derive"] }
anyhow = "1.0"
thiserror = "1.0"
[dev-dependencies]
criterion = "0.5"
proptest = "1.0"
[features]
default = ["json"]
json = ["serde_json"]
async = ["tokio"]
[[bin]]
name = "main"
path = "src/main.rs"
[[bin]]
name = "cli"
path = "src/bin/cli.rs"
[profile.release]
opt-level = 3
lto = true
codegen-units = 1
panic = "abort"
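与 "pip install -r requirements.txt" 对应的日常操作,在 Rust 里都由 cargo 一个工具完成,常用命令大致如下(示意):
# cargo new my-rust-project          # 创建新项目
# cargo add serde --features derive  # 添加依赖并写入 Cargo.toml
# cargo build --release              # 按 release 配置编译
# cargo run                          # 编译并运行
# cargo test                         # 运行测试
# cargo doc --open                   # 生成并打开文档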
Cargo工作空间
# 根目录的 Cargo.toml
[workspace]
members = [
"core",
"web",
"cli",
"shared",
]
resolver = "2"
[workspace.dependencies]
serde = "1.0"
tokio = "1.0"
anyhow = "1.0"
# core/Cargo.toml
[package]
name = "my-project-core"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { workspace = true, features = ["derive"] }
anyhow = { workspace = true }
# web/Cargo.toml
[package]
name = "my-project-web"
version = "0.1.0"
edition = "2021"
[dependencies]
my-project-core = { path = "../core" }
tokio = { workspace = true, features = ["full"] }
axum = "0.6"
# cli/Cargo.toml
[package]
name = "my-project-cli"
version = "0.1.0"
edition = "2021"
[dependencies]
my-project-core = { path = "../core" }
clap = { version = "4.0", features = ["derive"] }
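在工作空间里,这些命令默认作用于当前所在的成员 crate,也可以用 --workspace 作用于全部成员,或用 -p 指定某个成员(示意):
# cargo build --workspace            # 构建所有成员
# cargo test -p my-project-core      # 只测试 core
# cargo run -p my-project-cli        # 运行 CLI 可执行文件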
10.2 常用库和框架
Web开发
// 使用Axum构建Web API
// 需要在Cargo.toml中添加(axum::serve 需要 axum 0.7 及以上):
// axum = "0.7"
// tokio = { version = "1", features = ["full"] }
// serde = { version = "1", features = ["derive"] }
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::Json,
routing::{get, post},
Router,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::net::TcpListener;
#[derive(Debug, Serialize, Deserialize, Clone)]
struct User {
id: u32,
name: String,
email: String,
}
#[derive(Debug, Deserialize)]
struct CreateUser {
name: String,
email: String,
}
#[derive(Debug, Deserialize)]
struct UserQuery {
name: Option<String>,
}
type UserStore = Arc<Mutex<HashMap<u32, User>>>;
type AppState = UserStore;
async fn get_users(
Query(query): Query<UserQuery>,
State(store): State<AppState>,
) -> Json<Vec<User>> {
let users = store.lock().unwrap();
let filtered_users: Vec<User> = users
.values()
.filter(|user| {
query.name.as_ref().map_or(true, |name| {
user.name.to_lowercase().contains(&name.to_lowercase())
})
})
.cloned()
.collect();
Json(filtered_users)
}
async fn get_user(
Path(id): Path<u32>,
State(store): State<AppState>,
) -> Result<Json<User>, StatusCode> {
let users = store.lock().unwrap();
match users.get(&id) {
Some(user) => Ok(Json(user.clone())),
None => Err(StatusCode::NOT_FOUND),
}
}
async fn create_user(
State(store): State<AppState>,
Json(create_user): Json<CreateUser>,
) -> Result<Json<User>, StatusCode> {
let mut users = store.lock().unwrap();
let id = users.len() as u32 + 1;
let user = User {
id,
name: create_user.name,
email: create_user.email,
};
users.insert(id, user.clone());
Ok(Json(user))
}
#[tokio::main]
async fn main() {
let store: AppState = Arc::new(Mutex::new(HashMap::new()));
let app = Router::new()
.route("/users", get(get_users).post(create_user))
.route("/users/:id", get(get_user))
.with_state(store);
let listener = TcpListener::bind("127.0.0.1:3000")
.await
.unwrap();
println!("Server running on http://127.0.0.1:3000");
axum::serve(listener, app).await.unwrap();
}
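服务启动后,可以用 curl 简单验证这几个路由(示意命令,地址、路由与字段以上面的代码为准):
# curl http://127.0.0.1:3000/users
# curl -X POST http://127.0.0.1:3000/users \
#      -H "Content-Type: application/json" \
#      -d '{"name": "Alice", "email": "alice@example.com"}'
# curl http://127.0.0.1:3000/users/1
# curl "http://127.0.0.1:3000/users?name=ali"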
命令行工具
// 使用Clap构建CLI工具
use clap::{Parser, Subcommand};
use std::fs;
use std::path::PathBuf;
use anyhow::{Context, Result};
#[derive(Parser)]
#[command(name = "file-manager")]
#[command(about = "A file management CLI tool")]
#[command(version)]
struct Cli {
#[command(subcommand)]
command: Commands,
#[arg(short, long)]
verbose: bool,
}
#[derive(Subcommand)]
enum Commands {
/// List files in directory
List {
/// Directory to list
#[arg(default_value = ".")]
path: PathBuf,
/// Show hidden files
#[arg(short = 'a', long)]
all: bool,
/// Show detailed information
#[arg(short, long)]
long: bool,
},
/// Copy files
Copy {
/// Source file
source: PathBuf,
/// Destination
destination: PathBuf,
/// Overwrite existing files
#[arg(short, long)]
force: bool,
},
/// Search for files
Search {
/// Search pattern
pattern: String,
/// Search directory
#[arg(default_value = ".")]
directory: PathBuf,
/// Case insensitive search
#[arg(short, long)]
ignore_case: bool,
},
}
fn list_files(path: &PathBuf, show_all: bool, detailed: bool) -> Result<()> {
let entries = fs::read_dir(path)
.with_context(|| format!("Failed to read directory: {}", path.display()))?;
for entry in entries {
let entry = entry?;
let file_name = entry.file_name();
let file_name_str = file_name.to_string_lossy();
// 跳过隐藏文件(除非指定显示)
if !show_all && file_name_str.starts_with('.') {
continue;
}
if detailed {
let metadata = entry.metadata()?;
let file_type = if metadata.is_dir() { "DIR" } else { "FILE" };
let size = metadata.len();
println!("{:<6} {:>10} {}", file_type, size, file_name_str);
} else {
println!("{}", file_name_str);
}
}
Ok(())
}
fn copy_file(source: &PathBuf, destination: &PathBuf, force: bool) -> Result<()> {
if destination.exists() && !force {
anyhow::bail!("Destination exists. Use --force to overwrite.");
}
fs::copy(source, destination)
.with_context(|| {
format!("Failed to copy {} to {}",
source.display(), destination.display())
})?;
println!("Copied {} to {}", source.display(), destination.display());
Ok(())
}
fn search_files(pattern: &str, directory: &PathBuf, ignore_case: bool) -> Result<()> {
fn search_recursive(
dir: &PathBuf,
pattern: &str,
ignore_case: bool
) -> Result<Vec<PathBuf>> {
let mut results = Vec::new();
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
results.extend(search_recursive(&path, pattern, ignore_case)?);
} else {
let file_name = path.file_name()
.unwrap_or_default()
.to_string_lossy();
let matches = if ignore_case {
file_name.to_lowercase().contains(&pattern.to_lowercase())
} else {
file_name.contains(pattern)
};
if matches {
results.push(path);
}
}
}
Ok(results)
}
let results = search_recursive(directory, pattern, ignore_case)?;
if results.is_empty() {
println!("No files found matching '{}'", pattern);
} else {
println!("Found {} files:", results.len());
for path in results {
println!(" {}", path.display());
}
}
Ok(())
}
fn cli_example() -> Result<()> {
let cli = Cli::parse();
if cli.verbose {
println!("Running in verbose mode");
}
match cli.command {
Commands::List { path, all, long } => {
list_files(&path, all, long)?;
}
Commands::Copy { source, destination, force } => {
copy_file(&source, &destination, force)?;
}
Commands::Search { pattern, directory, ignore_case } => {
search_files(&pattern, &directory, ignore_case)?;
}
}
Ok(())
}
fn main() {
if let Err(e) = cli_example() {
eprintln!("Error: {}", e);
std::process::exit(1);
}
}
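按照上面的 derive 定义,clap 会自动生成子命令、参数解析和 --help 输出,典型的调用方式大致如下(示意,具体参数名以 --help 为准):
# cargo run -- list --all --long
# cargo run -- copy notes.txt backup/notes.txt --force
# cargo run -- search main --ignore-case
# cargo run -- --verbose list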
数据处理
// 使用Serde进行JSON处理
// 需要在Cargo.toml中添加: serde = { version = "1", features = ["derive"] }、serde_json = "1"、anyhow = "1"
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use anyhow::Result;
#[derive(Debug, Serialize, Deserialize)]
struct Config {
database: DatabaseConfig,
server: ServerConfig,
features: HashMap<String, bool>,
}
#[derive(Debug, Serialize, Deserialize)]
struct DatabaseConfig {
url: String,
max_connections: u32,
timeout_seconds: u64,
}
#[derive(Debug, Serialize, Deserialize)]
struct ServerConfig {
host: String,
port: u16,
workers: Option<u32>,
}
fn config_example() -> Result<()> {
// 创建配置
let mut features = HashMap::new();
features.insert("analytics".to_string(), true);
features.insert("caching".to_string(), false);
let config = Config {
database: DatabaseConfig {
url: "postgresql://localhost:5432/mydb".to_string(),
max_connections: 10,
timeout_seconds: 30,
},
server: ServerConfig {
host: "localhost".to_string(),
port: 8080,
workers: Some(4),
},
features,
};
// 序列化为JSON
let json_str = serde_json::to_string_pretty(&config)?;
println!("Configuration JSON:\n{}", json_str);
// 从JSON反序列化
let parsed_config: Config = serde_json::from_str(&json_str)?;
println!("Parsed config: {:?}", parsed_config);
Ok(())
}
// 使用CSV处理(需要在Cargo.toml中添加: csv = "1")
use csv::ReaderBuilder;
use std::io::Cursor;
#[derive(Debug, Deserialize)]
struct SalesRecord {
date: String,
product: String,
quantity: u32,
price: f64,
}
fn csv_example() -> Result<()> {
let csv_data = r#"date,product,quantity,price
2023-01-01,Widget A,10,29.99
2023-01-02,Widget B,5,49.99
2023-01-03,Widget A,8,29.99
2023-01-04,Widget C,3,99.99"#;
let mut reader = ReaderBuilder::new()
.has_headers(true)
.from_reader(Cursor::new(csv_data));
let mut total_revenue = 0.0;
let mut records = Vec::new();
for result in reader.deserialize() {
let record: SalesRecord = result?;
total_revenue += record.quantity as f64 * record.price;
records.push(record);
}
println!("Sales Records:");
for record in &records {
println!(" {:?}", record);
}
println!("Total Revenue: ${:.2}", total_revenue);
Ok(())
}
fn main() -> Result<()> {
config_example()?;
println!("\n{}", "=".repeat(50));
csv_example()?;
Ok(())
}
10.3 测试和文档
单元测试和集成测试
// src/lib.rs
pub struct Calculator {
memory: f64,
}
impl Calculator {
pub fn new() -> Self {
Calculator { memory: 0.0 }
}
pub fn add(&mut self, value: f64) -> f64 {
self.memory += value;
self.memory
}
pub fn subtract(&mut self, value: f64) -> f64 {
self.memory -= value;
self.memory
}
pub fn multiply(&mut self, value: f64) -> f64 {
self.memory *= value;
self.memory
}
pub fn divide(&mut self, value: f64) -> Result<f64, String> {
if value == 0.0 {
Err("Division by zero".to_string())
} else {
self.memory /= value;
Ok(self.memory)
}
}
pub fn clear(&mut self) {
self.memory = 0.0;
}
pub fn get_memory(&self) -> f64 {
self.memory
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_new_calculator() {
let calc = Calculator::new();
assert_eq!(calc.get_memory(), 0.0);
}
#[test]
fn test_basic_operations() {
let mut calc = Calculator::new();
assert_eq!(calc.add(5.0), 5.0);
assert_eq!(calc.subtract(2.0), 3.0);
assert_eq!(calc.multiply(4.0), 12.0);
assert_eq!(calc.divide(3.0).unwrap(), 4.0);
}
#[test]
fn test_division_by_zero() {
let mut calc = Calculator::new();
calc.add(10.0);
let result = calc.divide(0.0);
assert!(result.is_err());
assert_eq!(result.unwrap_err(), "Division by zero");
}
#[test]
fn test_clear() {
let mut calc = Calculator::new();
calc.add(100.0);
calc.clear();
assert_eq!(calc.get_memory(), 0.0);
}
#[test]
fn test_chained_operations() {
let mut calc = Calculator::new();
calc.add(10.0);
calc.multiply(2.0);
calc.subtract(5.0);
assert_eq!(calc.get_memory(), 15.0);
}
}
// tests/integration_tests.rs
use my_rust_project::Calculator;
#[test]
fn test_calculator_workflow() {
let mut calc = Calculator::new();
// 模拟实际使用场景
calc.add(100.0);
assert_eq!(calc.get_memory(), 100.0);
calc.subtract(25.0);
assert_eq!(calc.get_memory(), 75.0);
calc.multiply(2.0);
assert_eq!(calc.get_memory(), 150.0);
calc.divide(3.0).unwrap();
assert_eq!(calc.get_memory(), 50.0);
calc.clear();
assert_eq!(calc.get_memory(), 0.0);
}
#[test]
fn test_error_recovery() {
let mut calc = Calculator::new();
calc.add(10.0);
let result = calc.divide(0.0);
assert!(result.is_err());
// 确保错误操作不影响内存
assert_eq!(calc.get_memory(), 10.0);
// 继续正常操作
calc.multiply(2.0);
assert_eq!(calc.get_memory(), 20.0);
}
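上面 src 里的 #[cfg(test)] 单元测试和 tests/ 目录下的集成测试都由 cargo test 统一运行,也可以按名称过滤(示意):
# cargo test                            # 运行全部测试
# cargo test test_division_by_zero      # 只运行名称匹配的测试
# cargo test --test integration_tests   # 只运行 tests/integration_tests.rs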
属性测试
// 在Cargo.toml中添加:proptest = "1.0"
use proptest::prelude::*;
use my_rust_project::Calculator;
proptest! {
#[test]
fn test_add_commutative(a in -1000.0..1000.0f64, b in -1000.0..1000.0f64) {
let mut calc1 = Calculator::new();
let mut calc2 = Calculator::new();
calc1.add(a);
calc1.add(b);
calc2.add(b);
calc2.add(a);
prop_assert!((calc1.get_memory() - calc2.get_memory()).abs() < f64::EPSILON);
}
#[test]
fn test_multiply_by_zero(a in -1000.0..1000.0f64) {
let mut calc = Calculator::new();
calc.add(a);
calc.multiply(0.0);
prop_assert_eq!(calc.get_memory(), 0.0);
}
#[test]
fn test_add_subtract_inverse(a in -1000.0..1000.0f64, b in -1000.0..1000.0f64) {
let mut calc = Calculator::new();
calc.add(a);
calc.add(b);
calc.subtract(b);
        // 浮点运算下 (a + b) - b 并不精确等于 a,这里用一个较宽松的绝对误差界
        prop_assert!((calc.get_memory() - a).abs() < 1e-9);
}
}
基准测试
// benches/calculator_bench.rs
// 在Cargo.toml中添加:criterion = "0.5"
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use my_rust_project::Calculator;
fn bench_basic_operations(c: &mut Criterion) {
c.bench_function("add", |b| {
b.iter(|| {
let mut calc = Calculator::new();
for i in 0..1000 {
calc.add(black_box(i as f64));
}
})
});
c.bench_function("multiply", |b| {
b.iter(|| {
let mut calc = Calculator::new();
calc.add(1.0);
            for _ in 1..100 {
calc.multiply(black_box(1.01));
}
})
});
}
fn bench_mixed_operations(c: &mut Criterion) {
c.bench_function("mixed_operations", |b| {
b.iter(|| {
let mut calc = Calculator::new();
for i in 0..1000 {
                // 各分支的返回值类型不同(f64 / ()),统一丢弃返回值让 match 的类型一致
                match i % 4 {
                    0 => { calc.add(black_box(i as f64)); }
                    1 => { calc.subtract(black_box(i as f64 / 2.0)); }
                    2 => { calc.multiply(black_box(1.1)); }
                    3 => { let _ = calc.divide(black_box(2.0)); }
                    _ => unreachable!(),
                }
}
})
});
}
criterion_group!(benches, bench_basic_operations, bench_mixed_operations);
criterion_main!(benches);
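criterion 基准测试还需要在 Cargo.toml 中声明 bench 目标并关闭默认的 test harness,然后用 cargo bench 运行(示意):
# [[bench]]
# name = "calculator_bench"
# harness = false
#
# cargo bench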
文档生成
//! # Calculator Library
//!
//! 这是一个简单的计算器库,提供基本的数学运算功能。
//!
//! ## Examples
//!
//! ```
//! use my_rust_project::Calculator;
//!
//! let mut calc = Calculator::new();
//! calc.add(10.0);
//! calc.multiply(2.0);
//! assert_eq!(calc.get_memory(), 20.0);
//! ```
//!
//! ## Features
//!
//! - 基本数学运算(加、减、乘、除)
//! - 内存存储
//! - 错误处理(除零检测)
/// 一个简单的计算器,支持基本数学运算和内存功能。
///
/// `Calculator` 维护一个内部内存值,所有操作都会更新这个值。
///
/// # Examples
///
/// ```
/// use my_rust_project::Calculator;
///
/// let mut calc = Calculator::new();
/// calc.add(5.0);
/// calc.multiply(2.0);
/// assert_eq!(calc.get_memory(), 10.0);
/// ```
pub struct Calculator {
/// 计算器的内存值
memory: f64,
}
impl Calculator {
/// 创建一个新的计算器实例,内存初始化为0。
///
/// # Examples
///
/// ```
/// use my_rust_project::Calculator;
///
/// let calc = Calculator::new();
/// assert_eq!(calc.get_memory(), 0.0);
/// ```
pub fn new() -> Self {
Calculator { memory: 0.0 }
}
/// 将值加到内存中。
///
/// # Arguments
///
/// * `value` - 要加的值
///
/// # Returns
///
/// 更新后的内存值
///
/// # Examples
///
/// ```
/// use my_rust_project::Calculator;
///
/// let mut calc = Calculator::new();
/// assert_eq!(calc.add(5.0), 5.0);
/// assert_eq!(calc.add(3.0), 8.0);
/// ```
pub fn add(&mut self, value: f64) -> f64 {
self.memory += value;
self.memory
}
/// 从内存中减去值。
///
/// # Arguments
///
/// * `value` - 要减的值
///
/// # Returns
///
/// 更新后的内存值
///
/// # Examples
///
/// ```
/// use my_rust_project::Calculator;
///
/// let mut calc = Calculator::new();
/// calc.add(10.0);
/// assert_eq!(calc.subtract(3.0), 7.0);
/// ```
pub fn subtract(&mut self, value: f64) -> f64 {
self.memory -= value;
self.memory
}
/// 用值乘以内存。
///
/// # Arguments
///
/// * `value` - 要乘的值
///
/// # Returns
///
/// 更新后的内存值
///
/// # Examples
///
/// ```
/// use my_rust_project::Calculator;
///
/// let mut calc = Calculator::new();
/// calc.add(5.0);
/// assert_eq!(calc.multiply(3.0), 15.0);
/// ```
pub fn multiply(&mut self, value: f64) -> f64 {
self.memory *= value;
self.memory
}
    /// 将内存值除以给定的值。
///
/// # Arguments
///
/// * `value` - 要除的值
///
/// # Returns
///
/// 成功时返回更新后的内存值,除零时返回错误
///
/// # Errors
///
/// 当 `value` 为0时返回错误字符串 "Division by zero"
///
/// # Examples
///
/// ```
/// use my_rust_project::Calculator;
///
/// let mut calc = Calculator::new();
/// calc.add(10.0);
/// assert_eq!(calc.divide(2.0).unwrap(), 5.0);
///
/// // 除零错误
/// assert!(calc.divide(0.0).is_err());
/// ```
pub fn divide(&mut self, value: f64) -> Result<f64, String> {
if value == 0.0 {
Err("Division by zero".to_string())
} else {
self.memory /= value;
Ok(self.memory)
}
}
/// 清空内存,将其重置为0。
///
/// # Examples
///
/// ```
/// use my_rust_project::Calculator;
///
/// let mut calc = Calculator::new();
/// calc.add(100.0);
/// calc.clear();
/// assert_eq!(calc.get_memory(), 0.0);
/// ```
pub fn clear(&mut self) {
self.memory = 0.0;
}
/// 获取当前内存值。
///
/// # Returns
///
/// 当前内存中存储的值
///
/// # Examples
///
/// ```
/// use my_rust_project::Calculator;
///
/// let mut calc = Calculator::new();
/// calc.add(42.0);
/// assert_eq!(calc.get_memory(), 42.0);
/// ```
pub fn get_memory(&self) -> f64 {
self.memory
}
}
impl Default for Calculator {
fn default() -> Self {
Self::new()
}
}
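这些文档注释除了用于生成 API 文档,其中的示例代码还会作为 doc test 随测试一起执行,可以用下面的命令查看和验证(示意):
# cargo doc --open    # 生成 HTML 文档并在浏览器中打开
# cargo test --doc    # 只运行文档注释中的示例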
10.4 完整项目示例:任务管理器
项目结构
task_manager/
├── Cargo.toml
├── src/
│ ├── main.rs
│ ├── lib.rs
│ ├── task.rs
│ ├── storage.rs
│ ├── cli.rs
│ └── error.rs
├── tests/
│ └── integration_tests.rs
├── examples/
│ └── basic_usage.rs
└── README.md
Cargo.toml
[package]
name = "task_manager"
version = "0.1.0"
edition = "2021"
authors = ["Your Name <your.email@example.com>"]
description = "A command-line task management tool"
license = "MIT OR Apache-2.0"
[dependencies]
clap = { version = "4.0", features = ["derive"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
chrono = { version = "0.4", features = ["serde"] }
anyhow = "1.0"
thiserror = "1.0"
colored = "2.0"
uuid = { version = "1.0", features = ["v4", "serde"] }
[dev-dependencies]
tempfile = "3.0"
assert_cmd = "2.0"
predicates = "2.0"
错误处理
// src/error.rs
use thiserror::Error;
#[derive(Error, Debug)]
pub enum TaskError {
#[error("Task not found: {id}")]
TaskNotFound { id: String },
#[error("Invalid task status: {status}")]
InvalidStatus { status: String },
#[error("Storage error: {source}")]
Storage {
#[from]
source: std::io::Error,
},
#[error("Serialization error: {source}")]
Serialization {
#[from]
source: serde_json::Error,
},
#[error("Invalid date format: {date}")]
InvalidDate { date: String },
}
pub type Result<T> = std::result::Result<T, TaskError>;
任务模型
// src/task.rs
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::error::{TaskError, Result};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskStatus {
Todo,
InProgress,
Done,
}
impl std::fmt::Display for TaskStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TaskStatus::Todo => write!(f, "TODO"),
TaskStatus::InProgress => write!(f, "IN_PROGRESS"),
TaskStatus::Done => write!(f, "DONE"),
}
}
}
impl std::str::FromStr for TaskStatus {
type Err = TaskError;
fn from_str(s: &str) -> Result<Self> {
match s.to_uppercase().as_str() {
"TODO" => Ok(TaskStatus::Todo),
"IN_PROGRESS" | "INPROGRESS" => Ok(TaskStatus::InProgress),
"DONE" => Ok(TaskStatus::Done),
_ => Err(TaskError::InvalidStatus {
status: s.to_string()
}),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Priority {
Low,
Medium,
High,
Urgent,
}
impl std::fmt::Display for Priority {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Priority::Low => write!(f, "LOW"),
Priority::Medium => write!(f, "MEDIUM"),
Priority::High => write!(f, "HIGH"),
Priority::Urgent => write!(f, "URGENT"),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Task {
pub id: Uuid,
pub title: String,
pub description: Option<String>,
pub status: TaskStatus,
pub priority: Priority,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub due_date: Option<DateTime<Utc>>,
pub tags: Vec<String>,
}
impl Task {
pub fn new(title: String) -> Self {
let now = Utc::now();
Task {
id: Uuid::new_v4(),
title,
description: None,
status: TaskStatus::Todo,
priority: Priority::Medium,
created_at: now,
updated_at: now,
due_date: None,
tags: Vec::new(),
}
}
pub fn with_description(mut self, description: String) -> Self {
self.description = Some(description);
self.updated_at = Utc::now();
self
}
pub fn with_priority(mut self, priority: Priority) -> Self {
self.priority = priority;
self.updated_at = Utc::now();
self
}
pub fn with_due_date(mut self, due_date: DateTime<Utc>) -> Self {
self.due_date = Some(due_date);
self.updated_at = Utc::now();
self
}
pub fn with_tags(mut self, tags: Vec<String>) -> Self {
self.tags = tags;
self.updated_at = Utc::now();
self
}
pub fn update_status(&mut self, status: TaskStatus) {
self.status = status;
self.updated_at = Utc::now();
}
pub fn add_tag(&mut self, tag: String) {
if !self.tags.contains(&tag) {
self.tags.push(tag);
self.updated_at = Utc::now();
}
}
pub fn remove_tag(&mut self, tag: &str) {
self.tags.retain(|t| t != tag);
self.updated_at = Utc::now();
}
pub fn is_overdue(&self) -> bool {
if let Some(due_date) = self.due_date {
Utc::now() > due_date && self.status != TaskStatus::Done
} else {
false
}
}
}
#[derive(Debug, Clone)]
pub struct TaskFilter {
pub status: Option<TaskStatus>,
pub priority: Option<Priority>,
pub tags: Vec<String>,
pub overdue_only: bool,
}
impl TaskFilter {
pub fn new() -> Self {
TaskFilter {
status: None,
priority: None,
tags: Vec::new(),
overdue_only: false,
}
}
pub fn matches(&self, task: &Task) -> bool {
if let Some(status) = &self.status {
if &task.status != status {
return false;
}
}
if let Some(priority) = &self.priority {
if &task.priority != priority {
return false;
}
}
if !self.tags.is_empty() {
let has_all_tags = self.tags.iter()
.all(|tag| task.tags.contains(tag));
if !has_all_tags {
return false;
}
}
if self.overdue_only && !task.is_overdue() {
return false;
}
true
}
}
impl Default for TaskFilter {
fn default() -> Self {
Self::new()
}
}
存储层
// src/storage.rs
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use uuid::Uuid;
use crate::task::{Task, TaskFilter};
use crate::error::{TaskError, Result};
pub trait TaskStorage {
fn save_task(&mut self, task: &Task) -> Result<()>;
fn load_task(&self, id: Uuid) -> Result<Task>;
fn delete_task(&mut self, id: Uuid) -> Result<()>;
fn list_tasks(&self, filter: &TaskFilter) -> Result<Vec<Task>>;
fn update_task(&mut self, task: &Task) -> Result<()>;
}
pub struct JsonFileStorage {
file_path: PathBuf,
tasks: HashMap<Uuid, Task>,
}
impl JsonFileStorage {
pub fn new<P: AsRef<Path>>(file_path: P) -> Result<Self> {
let file_path = file_path.as_ref().to_path_buf();
let tasks = if file_path.exists() {
let content = fs::read_to_string(&file_path)?;
if content.trim().is_empty() {
HashMap::new()
} else {
serde_json::from_str(&content)?
}
} else {
HashMap::new()
};
Ok(JsonFileStorage { file_path, tasks })
}
fn save_to_file(&self) -> Result<()> {
let content = serde_json::to_string_pretty(&self.tasks)?;
fs::write(&self.file_path, content)?;
Ok(())
}
}
impl TaskStorage for JsonFileStorage {
fn save_task(&mut self, task: &Task) -> Result<()> {
self.tasks.insert(task.id, task.clone());
self.save_to_file()
}
fn load_task(&self, id: Uuid) -> Result<Task> {
self.tasks.get(&id)
.cloned()
.ok_or(TaskError::TaskNotFound {
id: id.to_string()
})
}
fn delete_task(&mut self, id: Uuid) -> Result<()> {
if self.tasks.remove(&id).is_some() {
self.save_to_file()
} else {
Err(TaskError::TaskNotFound {
id: id.to_string()
})
}
}
fn list_tasks(&self, filter: &TaskFilter) -> Result<Vec<Task>> {
let filtered_tasks: Vec<Task> = self.tasks
.values()
.filter(|task| filter.matches(task))
.cloned()
.collect();
Ok(filtered_tasks)
}
fn update_task(&mut self, task: &Task) -> Result<()> {
if self.tasks.contains_key(&task.id) {
self.tasks.insert(task.id, task.clone());
self.save_to_file()
} else {
Err(TaskError::TaskNotFound {
id: task.id.to_string()
})
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::NamedTempFile;
use crate::task::{Priority, TaskStatus};
#[test]
fn test_json_storage_crud() -> Result<()> {
let temp_file = NamedTempFile::new().unwrap();
let mut storage = JsonFileStorage::new(temp_file.path())?;
// Create
let task = Task::new("Test task".to_string())
.with_priority(Priority::High);
storage.save_task(&task)?;
// Read
let loaded_task = storage.load_task(task.id)?;
assert_eq!(loaded_task.title, "Test task");
assert_eq!(loaded_task.priority, Priority::High);
// Update
let mut updated_task = loaded_task;
updated_task.update_status(TaskStatus::Done);
storage.update_task(&updated_task)?;
let loaded_updated = storage.load_task(task.id)?;
assert_eq!(loaded_updated.status, TaskStatus::Done);
// Delete
storage.delete_task(task.id)?;
assert!(storage.load_task(task.id).is_err());
Ok(())
}
#[test]
fn test_task_filtering() -> Result<()> {
let temp_file = NamedTempFile::new().unwrap();
let mut storage = JsonFileStorage::new(temp_file.path())?;
let task1 = Task::new("Task 1".to_string())
.with_priority(Priority::High)
.with_tags(vec!["work".to_string()]);
let task2 = Task::new("Task 2".to_string())
.with_priority(Priority::Low)
.with_tags(vec!["personal".to_string()]);
storage.save_task(&task1)?;
storage.save_task(&task2)?;
// Filter by priority
let filter = TaskFilter {
priority: Some(Priority::High),
..TaskFilter::default()
};
let high_priority_tasks = storage.list_tasks(&filter)?;
assert_eq!(high_priority_tasks.len(), 1);
assert_eq!(high_priority_tasks[0].title, "Task 1");
// Filter by tag
let filter = TaskFilter {
tags: vec!["personal".to_string()],
..TaskFilter::default()
};
let personal_tasks = storage.list_tasks(&filter)?;
assert_eq!(personal_tasks.len(), 1);
assert_eq!(personal_tasks[0].title, "Task 2");
Ok(())
}
}
CLI接口
// src/cli.rs
use clap::{Parser, Subcommand};
use chrono::{DateTime, Utc};
use colored::*;
use uuid::Uuid;
use crate::task::{Priority, Task, TaskFilter, TaskStatus};
use crate::storage::{JsonFileStorage, TaskStorage};
use crate::error::Result;
#[derive(Parser)]
#[command(name = "task-manager")]
#[command(about = "A command-line task management tool")]
#[command(version)]
pub struct Cli {
#[command(subcommand)]
pub command: Commands,
#[arg(short, long, default_value = "tasks.json")]
pub file: String,
}
#[derive(Subcommand)]
pub enum Commands {
/// Add a new task
Add {
/// Task title
title: String,
/// Task description
#[arg(short, long)]
description: Option<String>,
/// Task priority (low, medium, high, urgent)
#[arg(short, long, default_value = "medium")]
priority: String,
/// Due date (ISO 8601 format)
#[arg(short = 'd', long)]
due_date: Option<String>,
/// Tags (comma-separated)
#[arg(short, long)]
tags: Option<String>,
},
/// List tasks
List {
/// Filter by status
#[arg(short, long)]
status: Option<String>,
/// Filter by priority
#[arg(short, long)]
priority: Option<String>,
/// Filter by tags
#[arg(short, long)]
tags: Option<String>,
/// Show only overdue tasks
#[arg(long)]
overdue: bool,
},
/// Update task status
Update {
/// Task ID
id: String,
/// New status (todo, in_progress, done)
status: String,
},
/// Delete a task
Delete {
/// Task ID
id: String,
},
/// Show task details
Show {
/// Task ID
id: String,
},
}
pub fn run_cli() -> Result<()> {
let cli = Cli::parse();
let mut storage = JsonFileStorage::new(&cli.file)?;
match cli.command {
Commands::Add {
title,
description,
priority,
due_date,
tags
} => {
add_task(&mut storage, title, description, priority, due_date, tags)?;
}
Commands::List {
status,
priority,
tags,
overdue
} => {
list_tasks(&storage, status, priority, tags, overdue)?;
}
Commands::Update { id, status } => {
update_task(&mut storage, id, status)?;
}
Commands::Delete { id } => {
delete_task(&mut storage, id)?;
}
Commands::Show { id } => {
show_task(&storage, id)?;
}
}
Ok(())
}
fn add_task(
storage: &mut JsonFileStorage,
title: String,
description: Option<String>,
priority: String,
due_date: Option<String>,
tags: Option<String>,
) -> Result<()> {
let priority: Priority = match priority.to_lowercase().as_str() {
"low" => Priority::Low,
"medium" => Priority::Medium,
"high" => Priority::High,
"urgent" => Priority::Urgent,
_ => Priority::Medium,
};
let mut task = Task::new(title).with_priority(priority);
if let Some(desc) = description {
task = task.with_description(desc);
}
if let Some(due_str) = due_date {
let due_date: DateTime<Utc> = due_str.parse()
.map_err(|_| crate::error::TaskError::InvalidDate {
date: due_str
})?;
task = task.with_due_date(due_date);
}
if let Some(tags_str) = tags {
let tags: Vec<String> = tags_str
.split(',')
.map(|s| s.trim().to_string())
.collect();
task = task.with_tags(tags);
}
storage.save_task(&task)?;
println!("{} Task added: {}", "✓".green(), task.id);
Ok(())
}
fn list_tasks(
storage: &JsonFileStorage,
status: Option<String>,
priority: Option<String>,
tags: Option<String>,
overdue: bool,
) -> Result<()> {
let mut filter = TaskFilter::new();
if let Some(status_str) = status {
filter.status = Some(status_str.parse()?);
}
if let Some(priority_str) = priority {
filter.priority = Some(match priority_str.to_lowercase().as_str() {
"low" => Priority::Low,
"medium" => Priority::Medium,
"high" => Priority::High,
"urgent" => Priority::Urgent,
_ => return Err(crate::error::TaskError::InvalidStatus {
status: priority_str
}),
});
}
if let Some(tags_str) = tags {
filter.tags = tags_str
.split(',')
.map(|s| s.trim().to_string())
.collect();
}
filter.overdue_only = overdue;
let tasks = storage.list_tasks(&filter)?;
if tasks.is_empty() {
println!("No tasks found.");
return Ok(());
}
println!("{:<8} {:<20} {:<12} {:<8} {:<10}",
"ID", "Title", "Status", "Priority", "Due Date");
println!("{}", "-".repeat(70));
for task in tasks {
let status_colored = match task.status {
TaskStatus::Todo => task.status.to_string().yellow(),
TaskStatus::InProgress => task.status.to_string().blue(),
TaskStatus::Done => task.status.to_string().green(),
};
let priority_colored = match task.priority {
Priority::Low => task.priority.to_string().white(),
Priority::Medium => task.priority.to_string().yellow(),
Priority::High => task.priority.to_string().red(),
Priority::Urgent => task.priority.to_string().red().bold(),
};
let due_date_str = task.due_date
.map(|d| d.format("%Y-%m-%d").to_string())
.unwrap_or_else(|| "-".to_string());
let due_date_colored = if task.is_overdue() {
due_date_str.red()
} else {
due_date_str.normal()
};
println!("{:<8} {:<20} {:<12} {:<8} {:<10}",
&task.id.to_string()[..8],
task.title,
status_colored,
priority_colored,
due_date_colored);
}
Ok(())
}
fn update_task(
storage: &mut JsonFileStorage,
id: String,
status: String,
) -> Result<()> {
let task_id: Uuid = id.parse()
.map_err(|_| crate::error::TaskError::TaskNotFound { id })?;
let mut task = storage.load_task(task_id)?;
    let new_status: TaskStatus = status.parse()?;
    // TaskStatus 没有实现 Copy,先 clone 一份传给 update_status,后面打印时还要用
    task.update_status(new_status.clone());
    storage.update_task(&task)?;
    println!("{} Task {} updated to {}",
        "✓".green(), task_id, new_status);
Ok(())
}
fn delete_task(storage: &mut JsonFileStorage, id: String) -> Result<()> {
let task_id: Uuid = id.parse()
.map_err(|_| crate::error::TaskError::TaskNotFound { id })?;
storage.delete_task(task_id)?;
println!("{} Task {} deleted", "✓".green(), task_id);
Ok(())
}
fn show_task(storage: &JsonFileStorage, id: String) -> Result<()> {
let task_id: Uuid = id.parse()
.map_err(|_| crate::error::TaskError::TaskNotFound { id })?;
let task = storage.load_task(task_id)?;
println!("Task Details:");
println!(" ID: {}", task.id);
println!(" Title: {}", task.title.bold());
if let Some(description) = &task.description {
println!(" Description: {}", description);
}
println!(" Status: {}", match task.status {
TaskStatus::Todo => task.status.to_string().yellow(),
TaskStatus::InProgress => task.status.to_string().blue(),
TaskStatus::Done => task.status.to_string().green(),
});
println!(" Priority: {}", match task.priority {
Priority::Low => task.priority.to_string().white(),
Priority::Medium => task.priority.to_string().yellow(),
Priority::High => task.priority.to_string().red(),
Priority::Urgent => task.priority.to_string().red().bold(),
});
println!(" Created: {}", task.created_at.format("%Y-%m-%d %H:%M:%S UTC"));
println!(" Updated: {}", task.updated_at.format("%Y-%m-%d %H:%M:%S UTC"));
if let Some(due_date) = task.due_date {
let due_str = due_date.format("%Y-%m-%d %H:%M:%S UTC").to_string();
if task.is_overdue() {
println!(" Due Date: {} {}", due_str.red(), "(OVERDUE)".red().bold());
} else {
println!(" Due Date: {}", due_str);
}
}
if !task.tags.is_empty() {
println!(" Tags: {}", task.tags.join(", ").cyan());
}
Ok(())
}
主程序
// src/main.rs
mod task;
mod storage;
mod cli;
mod error;
use cli::run_cli;
fn main() {
if let Err(e) = run_cli() {
eprintln!("Error: {}", e);
std::process::exit(1);
}
}
库入口
// src/lib.rs
pub mod task;
pub mod storage;
pub mod error;
pub use task::{Task, TaskStatus, Priority, TaskFilter};
pub use storage::{TaskStorage, JsonFileStorage};
pub use error::{TaskError, Result};
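项目结构里列出的 examples/basic_usage.rs 这里补一个示意版本(假设包名就是上面 Cargo.toml 中的 task_manager,只使用 lib.rs 导出的公共 API),用 cargo run --example basic_usage 运行:
// examples/basic_usage.rs(示意)
use task_manager::{JsonFileStorage, Priority, Task, TaskFilter, TaskStorage};
fn main() -> task_manager::Result<()> {
    // 打开(或创建)一个 JSON 存储文件
    let mut storage = JsonFileStorage::new("example_tasks.json")?;
    // 用构建器风格的 API 创建任务并保存
    let task = Task::new("写完第10章".to_string())
        .with_priority(Priority::High)
        .with_tags(vec!["book".to_string(), "rust".to_string()]);
    storage.save_task(&task)?;
    // 按优先级过滤并打印
    let filter = TaskFilter {
        priority: Some(Priority::High),
        ..TaskFilter::default()
    };
    for t in storage.list_tasks(&filter)? {
        println!("{} [{}] {}", t.id, t.priority, t.title);
    }
    Ok(())
}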
10.5 项目对比总结
Python vs Rust 项目开发对比
方面 | Python | Rust |
---|---|---|
项目初始化 | pip install + setup.py | cargo new + Cargo.toml |
依赖管理 | pip, poetry, pipenv | Cargo (内置) |
编译检查 | 运行时错误检查 | 编译时严格检查 |
测试框架 | pytest, unittest | 内置测试 + 第三方 |
文档生成 | sphinx, pydoc | rustdoc (内置) |
性能分析 | cProfile, line_profiler | criterion, perf |
错误处理 | 异常 + 类型提示 | Result<T,E> + Option<T> |
并发模型 | GIL限制 | 真正并行 |
内存管理 | 垃圾回收 | 所有权系统 |
部署 | 解释器 + 源码 | 单一可执行文件 |
总结
第10章展示了Rust完整的生态系统和开发实践。通过任务管理器项目,我们看到了Rust如何在实际开发中体现其优势:类型安全、性能、并发能力和优秀的工具链。
Rust的生态系统虽然相对年轻,但在Web服务、命令行工具、序列化、异步运行时等本章涉及的领域已经相当成熟,拥有高质量的库和工具。Cargo作为包管理器和构建工具,把依赖管理、测试、文档和基准测试统一在一套工作流里,提供了一致的开发体验。