- 首页 > it技术 > >
package mainimport (
"fmt"
"math/rand"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
)/*标题: 并发编程*/func hello() {
fmt.Println("Hello!")
}// 最简单的 Goroutine 示例
// Goroutine 类似于线程和协程
// 协程每次只有一个在执行, 而 Goroutine 可以有多个同时执行
// 线程创建后会同时存在, 而 Goroutine 只会挑选一部分(一般等于CPU逻辑核心数)
func basicRoutine() {
go hello() // 开启一个单独的 routine 执行 hello 函数
fmt.Println("main")
// main函数结束会杀死所有 routine
// 所以要先等一下才能看到输出
time.Sleep(time.Second)
}func sleepRandomTime(i int) {
rand.Seed(time.Now().UnixNano())
// 下面几行是为了学习随机数
//for i := 0;
i < 5;
i++ {
// r1 := rand.Int31() // int32
// r2 := rand.Intn(10) // 上限是10
// fmt.Println(-r1, -r2)
//} defer wg.Done()
time.Sleep(time.Millisecond * time.Duration(rand.Intn(500))) // 随机休眠0-500毫秒
fmt.Println("i =", i)
}var wg sync.WaitGroup// WaitGroup的使用方法. 类似于信号量
func waitGroupTest() {
wg.Add(10)
for i := 0;
i < 10;
i++ {
go sleepRandomTime(i)
}
wg.Wait()
}// GOMAXPROCS测试
// GOMAXPROCS是最多多少个线程同时运行. 默认是CPU逻辑核心数
func maxProcsTest() {
runtime.GOMAXPROCS(1) // 如果指定为1, 则下面按顺序执行, 如果换成2, 就可能会乱序了
wg.Add(2)
f := func(routineId int) {
defer wg.Done()
for i := 0;
i < 5;
i++ {
fmt.Printf("[%d]i=%d\n", routineId, i)
}
}
go f(1)
go f(2)
wg.Wait()
}// Channel用于线程交换数据
// 定义通道中的元素. 是引用类型, 默认是nil
// 通道中不要放大元素, 如果要放string, 放*string更好
var b chan int// 不带缓冲区的元素的发送和接收
// 不带缓冲区必须有人接收数据否则会阻塞线程
func channelWithBufferTest() {
b = make(chan int) // 通道中的元素一定要使用make初始化
defer close(b)// 关闭Channel. 有时候不关也没问题
// 不带缓冲区元素的发送和接收
wg.Add(1)
go func() {
defer wg.Done()
x := <-b // 接收值
<-b// 接收但是忽略结果
fmt.Println(x)
}()
// 发送值
b <- 18
b <- 10
wg.Wait()
}// 不带缓冲区的元素的发送和接收
func channelWithoutBufferTest() {
b = make(chan int, 1) // 带缓冲区的元素初始化. 第二个参数是容量
defer close(b)// 关闭Channel
b <- 10// 因为有一个缓冲所以不会阻塞
/*b <- 10*/// 缓冲区满了, 会阻塞
x, ok := <-b // 从通道中取出数据. 对于已经关闭的流也能取输出, 但是ok==false
fmt.Println(x, ok)
}// Channel 练习
// 1. 启动一个 goroutine, 生成 100 个数发送到 ch1
// 2. 启动一个 goroutine, 从 ch1 中取值, 计算平方放到 ch2 中
// 3. 在 printNumbers() 中从 ch2 取值并打印
// 单向通道: 只能发送或接收
//chan<- 表示只能发送
// <-chan表示只能接收
func numberGenerate(ch1 chan<- int) {
defer wg.Done()
for i := 0;
i < 100;
i++ {
ch1 <- i
}
close(ch1)
}
func compute(ch1 <-chan int, ch2 chan<- int) {
defer wg.Done()
for x := range ch1 {
ch2 <- x * x
}
close(ch2) // 必须要有一个结束信号, 否则后面的for循环会出错
fmt.Println("Compute Complete!")
}
func printNumbers() {
ch1 := make(chan int, 10)
ch2 := make(chan int, 1)
wg.Add(2)
go numberGenerate(ch1)
go compute(ch1, ch2)
for ret := range ch2 {
fmt.Println(ret)
}
wg.Wait()
}// 例子: 开启3个goroutine完成100个任务
// 因为开多个routine会增加routine切换的开销, 所以一般只会建少量routine
func doTasksWithFewRoutines() {
jobs := make(chan int, 100) // 实际场景会使用结构体定义Job
results := make(chan int, 100)
// 开启3个routine
for w := 0;
w < 3;
w++ {
go func(id int, jobs <-chan int, result chan<- int) {
for job := range jobs {
result <- job * job
fmt.Printf("Job %d is doing %d\n", id, job)
time.Sleep(time.Millisecond) // 增加一丢丢延迟
}
}(w, jobs, results)
}
// 100个任务
for j := 0;
j < 100;
j++ {
jobs <- j
}
close(jobs)
for j := 0;
j < 100;
j++ {
fmt.Println(<-results)
}
}// 使用select使通道多路复用
// select会从所有case中随机选择能执行的
// 下面的例子, i=0时, 只能执行 ch<-i
// i=1时, 只能执行 i<-ch
// 如果把make(chan int, 1)改成make(chan int, 10), 则结果会随机.
func selectMultiChannel() {
ch := make(chan int, 1)
for i := 0;
i < 10;
i++ {
select {
case x := <-ch: // [1]
fmt.Println(x)
case ch <- i: //[2]
default: // 所有case都执行不了
}
}
}// 互斥锁
func addUsingMutex() {
var mutex sync.Mutex
var wg sync.WaitGroup
x := 0
add := func() {
defer wg.Done()
for i := 0;
i < 1000;
i++ {
mutex.Lock()
x++
time.Sleep(time.Millisecond * 3) // 故意增加的延迟
mutex.Unlock()
}
}
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}// 读写互斥锁
// 很多场景下读的次数远多于写的次数
// 如果用互斥锁, 时间代价很大
// 用读写互斥锁会提高效率
func usingRWMutex() {
var rwMutex sync.RWMutex // 定义读写锁
var x int = 0
write := func() {
defer wg.Done()
rwMutex.Lock() // 写锁
x++
time.Sleep(time.Millisecond) // 故意增加一些延迟
rwMutex.Unlock()
}
read := func() {
defer wg.Done()
rwMutex.RLock() // 读锁
fmt.Println("reading x =", x)
rwMutex.RUnlock()
}
for i := 0;
i < 10;
i++ {
wg.Add(1)
go write()
}
for i := 0;
i < 1000;
i++ {
wg.Add(1)
go read()
}
wg.Wait()
}// sync.Once 确保某些操作在高并发场景下只执行一次
func syncOnceTest() {
var ch = make(chan int)
var once sync.Once
f := func() {
once.Do(func() {
close(ch)
})
}
go f()
go f() // 尽管调用了两次f, 但只会关闭一次通道
}// sync.Map 并发安全的map
func syncMapTest() {
var m sync.Map // 不需要用make初始化, 直接用
var wg sync.WaitGroup
for i := 0;
i < 22;
i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
key := strconv.Itoa(i) // int to ascii (整数转字符串)
m.Store(key, i)// map写入值
v, ok := m.Load(key)// map读取值
fmt.Printf("key=%v\tok=%v\tvalue=https://www.it610.com/article/%v/n", key, ok, v)
}(i)
}
wg.Wait()
}// 原子操作 atomic包
// 包含 Load Store Add Swap CompareAndSwap 5类操作
// The compare-and-swap operation, implemented by the CompareAndSwapT
// functions, is the atomic equivalent of:
//
// if *addr == old {
//*addr = new
//return true
// }
// return false
func atomicTest() {
var wg sync.WaitGroup
var v int64 = 0
for i := 0;
i < 100000;
i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&v, 1)
// atomic.StoreInt64(&v, 100) // 赋值
}()
}
wg.Wait()
t := atomic.LoadInt64(&v)
fmt.Println(t)
}
推荐阅读