golang 实现并发是通过通信共享内存,channel 是 go 语言中goroutine 的通信管道,通过 channel 将值从一个 goroutine 发送到另一个 goroutine。
创建 channel:
使用 make()
函数创建:
ch := make(chan int)
默认创建一个无缓存 channel。
发送数据到 channel:
ch <- x
从 channel 读取数据:
x := <-ch
关闭 channel:
使用 close()
函数关闭:
close(ch)
当你的程序不再需要往 channel 中发送数据时,可以关闭 channel。
如果往已经关闭的 channal 发送数据,程序发生异常。
如果当前没有一个 goroutine 对无缓冲 channel 接收数据,那么无缓冲 channel 会阻止发送数据。
类似队列机制,创建时需要设定缓冲大小。
创建一个有缓冲 channel:
ch := make(chan int ch, 10)
在有 goroutine 接收 channel 数据之前,可以先向 channel 中发送10个数据。
现在,你可能想知道何时使用这两种类型。 这完全取决于你希望 goroutine 之间的通信如何进行。 无缓冲 channel 同步通信。 它们保证每次发送数据时,程序都会被阻止,直到有人从 channel 中读取数据。
相反,有缓冲 channel 将发送和接收操作解耦。 它们不会阻止程序,但你必须小心使用,因为可能最终会导致死锁(如前文所述)。 使用无缓冲 channel 时,可以控制可并发运行的 goroutine 的数量。 例如,你可能要对 API 进行调用,并且想要控制每秒执行的调用次数。 否则,你可能会被阻止。
package main
import (
"fmt"
"io/ioutil"
)
func main() {
bytes, e := ioutil.ReadFile("/Users/dp/tmp/0811/03/file.txt")
if e != nil {
panic("read file error.")
}
fmt.Println(string(bytes))
}
package main
import (
"bufio"
"fmt"
"io"
"os"
)
func main() {
f, e := os.Open("C:/Users/dp/go/src/0811/03/file.txt")
if e != nil {
panic("read file error.")
}
defer f.Close()
r := bufio.NewReader(f)
for {
line, _, eof := r.ReadLine()
if eof == io.EOF {
break // read last line.
}
fmt.Println(string(line))
}
}
package main
import (
"log"
"net/http"
"os"
"os/signal"
"time"
"github.com/gin-gonic/gin"
)
func main() {
router := gin.Default()
router.GET("/", func(c *gin.Context) {
time.Sleep(5 * time.Second)
c.String(http.StatusOK, "Welcome Gin Server")
})
server := &http.Server{
Addr: ":8080",
Handler: router,
}
quit := make(chan os.Signal)
signal.Notify(quit, os.Interrupt)
go func() {
<-quit
log.Println("receive interrupt signal")
if err := server.Close(); err != nil {
log.Fatal("Server Close:", err)
}
}()
if err := server.ListenAndServe(); err != nil {
if err == http.ErrServerClosed {
log.Println("Server closed under request")
} else {
log.Fatal("Server closed unexpect")
}
}
log.Println("Server exiting")
}
package main
import (
"context"
"log"
"net/http"
"os/signal"
"syscall"
"time"
"github.com/gin-gonic/gin"
)
func main() {
// Create context that listens for the interrupt signal from the OS.
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
router := gin.Default()
router.GET("/", func(c *gin.Context) {
time.Sleep(10 * time.Second)
c.String(http.StatusOK, "Welcome Gin Server")
})
srv := &http.Server{
Addr: ":8080",
Handler: router,
}
// Initializing the server in a goroutine so that
// it won't block the graceful shutdown handling below
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("listen: %s\n", err)
}
}()
// Listen for the interrupt signal.
<-ctx.Done()
// Restore default behavior on the interrupt signal and notify user of shutdown.
stop()
log.Println("shutting down gracefully, press Ctrl+C again to force")
// The context is used to inform the server it has 5 seconds to finish
// the request it is currently handling
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Fatal("Server forced to shutdown: ", err)
}
log.Println("Server exiting")
}
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/gin-gonic/gin"
)
func main() {
router := gin.Default()
router.GET("/", func(c *gin.Context) {
time.Sleep(5 * time.Second)
c.String(http.StatusOK, "Welcome Gin Server")
})
srv := &http.Server{
Addr: ":8080",
Handler: router,
}
// Initializing the server in a goroutine so that
// it won't block the graceful shutdown handling below
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("listen: %s\n", err)
}
}()
// Wait for interrupt signal to gracefully shutdown the server with
// a timeout of 5 seconds.
quit := make(chan os.Signal, 1)
// kill (no param) default send syscall.SIGTERM
// kill -2 is syscall.SIGINT
// kill -9 is syscall.SIGKILL but can't be catch, so don't need add it
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("Shutting down server...")
// The context is used to inform the server it has 5 seconds to finish
// the request it is currently handling
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Fatal("Server forced to shutdown: ", err)
}
log.Println("Server exiting")
}
函数式选项模式:Functional Options Pattern,是 Golang 中一种常用的设计模式,用于解决构造函数参数过多的问题。
示例:
package main
func main() {
NewServer("test", SetHost("localhost"), SetPort(8081))
}
type Server struct {
Label string
Host string
Port int32
}
func NewServer(label string, opts ...func(s *Server)) *Server {
s := &Server{
Label: label,
}
for _, opt := range opts {
opt(s)
}
return s
}
type Option func(s *Server)
func SetHost(host string) Option {
return func(s *Server) {
s.Host = host
}
}
func SetPort(port int32) Option {
return func(s *Server) {
s.Port = port
}
}
使用 Golang 调用 API 代码示例:
func GetApi() {
resp, err := http.Get("https://baidu.com")
if err != nil {
fmt.Errorf("ERROR: %s\n", err.Error())
}
defer resp.Body.Close()
fmt.Printf("Status: %s\n", resp.Status)
fmt.Printf("StatusCode: %d\n", resp.StatusCode)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Errorf("ERROR: %s\n", err.Error())
}
fmt.Printf("StatusCode: %s\n", string(body))
}
1 简单 Post:
func PostApi1() {
json := `{"id":"u-001","name":"Jay","age":18}`
resp, err := http.Post("https://example.com/user", "application/json", bytes.NewBuffer([]byte(json)))
defer resp.Body.Close()
fmt.Printf("Status: %s\n", resp.Status)
fmt.Printf("StatusCode: %d\n", resp.StatusCode)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Errorf("ERROR: %s\n", err.Error())
}
fmt.Printf("StatusCode: %s\n", string(body))
}
2 附带 Header:
func PostApi2() {
// 1. json
//json := `{"id":"u-001","name":"Jay","age":18}`
//req, _ := http.NewRequest(http.MethodPost, "https://example.com/user", bytes.NewBuffer([]byte(json)))
// 2. map
reqBody, _ := json.Marshal(map[string]string{
"id": "u-001",
"name": "Jay",
"age": "18",
})
req, _ := http.NewRequest(http.MethodPost, "https://example.com/user", bytes.NewBuffer(reqBody))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("My_Custom_Header", "Value")
client := http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Errorf("ERROR: %s\n", err.Error())
}
defer resp.Body.Close()
fmt.Printf("Status: %s\n", resp.Status)
fmt.Printf("StatusCode: %d\n", resp.StatusCode)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Errorf("ERROR: %s\n", err.Error())
}
fmt.Printf("StatusCode: %s\n", string(body))
}
3 构造请求对象:
type User struct {
Id string `json:"id"`
Name string `json:"name"`
Age int `json:"age"`
}
func PostApi3() {
user := User{
Id: "u-001",
Name: "Jay",
Age: 18,
}
buf := new(bytes.Buffer)
json.NewEncoder(buf).Encode(user)
resp, err := http.Post("https://example.com/user", "application/json", buf)
defer resp.Body.Close()
fmt.Printf("Status: %s\n", resp.Status)
fmt.Printf("StatusCode: %d\n", resp.StatusCode)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Errorf("ERROR: %s\n", err.Error())
}
fmt.Printf("StatusCode: %s\n", string(body))
}
4 OAuth2:
// go get golang.org/x/oauth2
func getAccessToken()string{
var authCfg = &clientcredentials.Config{
ClientID: "xxx",
ClientSecret: "xxx",
TokenURL: "https://example.com/connect/token",
EndpointParams: url.Values{
"grant_type": {"client_credentials"},
},
}
token, err := authCfg.TokenSource(context.Background()).Token()
if err != nil {
fmt.Errorf("get access token failed. ERROR: %s\n", err.Error())
}
return token.AccessToken
}
func OAuth2Api(){
json := `{"id":"u-001","name":"Jay","age":18}`
req, _ := http.NewRequest(http.MethodPost, "https://example.com/user", bytes.NewBuffer([]byte(json)))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Bearer", getAccessToken())
client := http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Errorf("ERROR: %s\n", err.Error())
}
defer resp.Body.Close()
fmt.Printf("Status: %s\n", resp.Status)
fmt.Printf("StatusCode: %d\n", resp.StatusCode)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Errorf("ERROR: %s\n", err.Error())
}
fmt.Printf("StatusCode: %s\n", string(body))
}
5 上传文件:
func FileApi(){
file,err:=os.Open("hello.txt")
if err!=nil{
fmt.Errorf("ERROR: %s\n", err.Error())
}
defer file.Close()
var reqBody bytes.Buffer
multiPartWriter:=multipart.NewWriter(&reqBody)
fileWriter,err:=multiPartWriter.CreateFormFile("file_field","hello.txt")
if err!=nil{
fmt.Errorf("ERROR: %s\n", err.Error())
}
_,err=io.Copy(fileWriter,file)
if err!=nil{
fmt.Errorf("ERROR: %s\n", err.Error())
}
fieldWriter,err:=multiPartWriter.CreateFormField("normal_field")
if err!=nil{
fmt.Errorf("ERROR: %s\n", err.Error())
}
_,err=fieldWriter.Write([]byte("value"))
if err!=nil{
fmt.Errorf("ERROR: %s\n", err.Error())
}
multiPartWriter.Close()
req,err:=http.NewRequest(http.MethodPost,"http://example.com/file",&reqBody)
if err!=nil{
fmt.Errorf("ERROR: %s\n", err.Error())
}
req.Header.Set("Content-Type",multiPartWriter.FormDataContentType())
client:=http.Client{}
resp,err:=client.Do(req)
if err!=nil{
fmt.Errorf("ERROR: %s\n", err.Error())
}
defer resp.Body.Close()
fmt.Printf("Status: %s\n", resp.Status)
fmt.Printf("StatusCode: %d\n", resp.StatusCode)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Errorf("ERROR: %s\n", err.Error())
}
fmt.Printf("StatusCode: %s\n", string(body))
}
package main
import (
"fmt"
"net/http"
"strings"
)
type JenkinsConfig struct {
User string `json:"user"`
Token string `json:"token"`
}
var jenkinsConf *JenkinsConfig
func main() {
jenkinsConf := &JenkinsConfig{
User: "xxx",
Token: "xxxxxx"
}
job := "jenkins-job-1"
disableJenkins(job)
}
func disableJenkins(job string) {
// disabled jenkins job
r, err := http.NewRequest(http.MethodPost, fmt.Sprintf("https://jenkins.example.com/job/%s/disable", job), nil)
if err != nil {
log.Errorf("new jenkins request failed: %s", err)
}
r.SetBasicAuth(jenkinsConf.User, jenkinsConf.Token)
client := &http.Client{}
resp, err := client.Do(r)
if err != nil {
log.Errorf("request failed: %s", err)
}
if resp.StatusCode == http.StatusOK {
log.Healthf("disable jenkins job [%s] successfully", job)
} else {
log.Warnf("disable jenkins job [%s] status [%d]", job, resp.StatusCode)
}
}
package main
import "fmt"
func main() {
/*
位运算符:
将数值,转为二进制后,按位操作
按位&:
对应位的值如果都为 1 才为 1,有一个为 0 就为 0
按位|:
对应位的值如果都是 0 才为 0,有一个为 1 就为 1
异或^:
二元:a^b
对应位的值不同为 1,相同为 0
一元:^a
按位取反:
1--->0
0--->1
位清空:&^
对于 a &^ b
对于 b 上的每个数值
如果为 0,则取 a 对应位上的数值
如果为 1,则结果位就取 0
位移运算符:
<<:按位左移,将 a 转为二进制,向左移动 b 位
a << b
>>: 按位右移,将 a 转为二进制,向右移动 b 位
a >> b
*/
a := 60
b := 13
/*
a: 60 0011 1100
b: 13 0000 1101
& 0000 1100
| 0011 1101
^ 0011 0001
&^ 0011 0000
a : 0000 0000 ... 0011 1100
^ 1111 1111 ... 1100 0011
*/
fmt.Printf("a:%d, %b\n",a,a)
fmt.Printf("b:%d, %b\n",b,b)
res1 := a & b
fmt.Println(res1) // 12
res2 := a | b
fmt.Println(res2) // 61
res3 := a ^ b
fmt.Println(res3) // 49
res4 := a &^ b
fmt.Println(res4) // 48
res5 := ^a
fmt.Println(res5)
c:=8
/*
c : ... 0000 1000
0000 100000
>> 0000 10
*/
res6 := c << 2
fmt.Println(res6)
res7 := c >> 2
fmt.Println(res7)
}
实现类似 nc 的客户端命令工具:
$ nc www.baidu.com 80
GET / HTTP/1.1
注意:需要连续两次回车,才会连接
废话少说,上代码:
main.go
package main
import (
"io"
"log"
"net"
"os"
)
func main() {
if len(os.Args) != 3 {
log.Fatalln("Usage: nc [host] [port]")
}
host, port := os.Args[1], os.Args[2]
c, err := net.Dial("tcp", host+":"+port)
if err != nil {
log.Fatalln(err)
}
go func() {
io.Copy(c, os.Stdin)
}()
io.Copy(os.Stdout, c)
}
测试:
$ go build -o nc main.go
$ ./nc www.baidu.com 80
GET / HTTP/1.1
package gitutil
import (
"fmt"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/transport/http"
"os"
)
func Clone() {
_, err := git.PlainClone("/var/app/ash", false, &git.CloneOptions{
URL: "https://github.com/poneding/ash.git",
ReferenceName: plumbing.NewBranchReferenceName("master"),
Auth: &http.BasicAuth{
Username: "poneding",
Password: "xxxxxx",
},
Progress: os.Stdout,
})
if err != nil {
fmt.Errorf("ERROR: %s\n", err.Error())
}
}
package esutil
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/elastic/go-elasticsearch/v7"
"strings"
)
func NewESClient2(esAddress []string) (*elasticsearch.Client, error) {
if len(esAddress) == 0 {
panic("Invalid parameters: esAddress.")
}
esConfig := elasticsearch.Config{Addresses: esAddress}
esClient, err := elasticsearch.NewClient(esConfig)
if err != nil {
fmt.Errorf("GetESClient ERROR: %s\n", err)
panic(err)
}
return esClient, nil
}
func QueryLogs2(esClient *elasticsearch.Client, query QueryLogModel) ([]LogModel, error) {
var (
buf bytes.Buffer
r map[string]interface{}
)
index := query.App + "-*"
q := map[string]interface{}{
"query": map[string]interface{}{
"match_phrase": map[string]interface{}{
"kubernetes.labels.app": query.App,
},
},
}
if err := json.NewEncoder(&buf).Encode(q); err != nil {
fmt.Errorf("QueryLogs ERROR: %s\n", err)
return nil, err
}
searchRes, err := esClient.Search(
esClient.Search.WithIndex(index),
esClient.Search.WithBody(&buf),
esClient.Search.WithQuery(query.Filter),
esClient.Search.WithFilterPath("hits.hits"),
esClient.Search.WithSize(query.Size),
esClient.Search.WithSort("@timestamp:desc"),
esClient.Search.WithSource("@timestamp","level","log","msg"),
)
defer searchRes.Body.Close()
if err != nil || searchRes.IsError() {
fmt.Errorf("QueryLogs ERROR: %s\n", err.Error())
return nil, errors.New(strings.Join([]string{"es.Search ERROR:", searchRes.Status(), err.Error()}, " "))
}
if err := json.NewDecoder(searchRes.Body).Decode(&r); err != nil {
fmt.Errorf("QueryLogs ERROR: %s\n", err.Error())
return nil, err
}
res := make([]LogModel, 0)
for _, hit := range r["hits"].(map[string]interface{})["hits"].([]interface{}) {
source := hit.(map[string]interface{})["_source"].(map[string]interface{})
logModel := LogModel{
Timestamp: source["@timestamp"].(string),
Log: source["log"].(string),
}
level, ok := source["level"]
if ok {
logModel.Level = level.(string)
}
log, ok := source["msg"]
if ok {
logModel.Log = log.(string)
}
res = append(res, logModel)
}
return res, nil
}
package es
import (
"context"
"fmt"
"github.com/olivere/elastic/v7"
"reflect"
"time"
)
func NewESClient(esAddresses []string) (*elastic.Client, error) {
client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(esAddresses...))
if err != nil {
fmt.Errorf("NewESClient ERROR: %s\n", err)
panic(err)
}
return client, nil
}
func QueryLogs(esClient *elastic.Client, queryModel QueryLogModel) ([]LogModel, error) {
res := make([]LogModel, 0)
boolQuery := elastic.NewBoolQuery()
index := queryModel.App + "-*"
query := esClient.Search(index).FilterPath("hits.hits").Sort("@timestamp", false)
if len(queryModel.Filter) > 0 {
boolQuery.Filter(elastic.NewQueryStringQuery(queryModel.Filter))
}
if len(queryModel.Level) > 0 {
boolQuery.Filter(elastic.NewMatchPhraseQuery("level", queryModel.Level))
}
if queryModel.Page <= 0 {
queryModel.Page = 1
}
if queryModel.Size <= 0 {
queryModel.Size = 50
}
query = query.From((queryModel.Page-1)*queryModel.Size + 1).Size(queryModel.Size)
if queryModel.StartAt == (time.Time{}) {
queryModel.StartAt = time.Now().Add(-15 * time.Minute).UTC()
}
if queryModel.EndAt == (time.Time{}) {
queryModel.EndAt = time.Now().UTC()
}
boolQuery.Filter(elastic.NewRangeQuery("@timestamp").Gte(queryModel.StartAt).Lte(queryModel.EndAt))
query = query.Query(boolQuery)
queryRes, err := query.Do(context.Background())
if err != nil {
fmt.Errorf("QueryLogs ERROR: %s\n", err.Error())
return res, err
}
for _, item := range queryRes.Each(reflect.TypeOf(LogModel{})) {
if t, ok := item.(LogModel); ok {
res = append(res, LogModel{
Timestamp: t.Timestamp,
Level: t.Level,
// Msg storing source log here.
Log: t.Msg,
Msg: t.Log,
App: t.Kubernetes.Labels.App,
})
}
}
return res, nil
}
func QueryErrorLogs(esClient *elastic.Client) error {
query := esClient.Search("dev-core-*").FilterPath("hits.hits").Sort("@timestamp", false).Size(100).Query(elastic.NewMatchPhraseQuery("level", "error"))
queryRes, err := query.Do(context.Background())
if err != nil {
fmt.Errorf("QueryLogs ERROR: %s\n", err.Error())
}
for _, item := range queryRes.Each(reflect.TypeOf(LogModel{})) {
if t, ok := item.(LogModel); ok {
fmt.Println(t.Log)
}
}
return nil
}